2019-05-22 21:16:55 +00:00
package netstorage
import (
"container/heap"
2020-06-30 21:58:26 +00:00
"errors"
2020-11-22 22:39:34 +00:00
"flag"
2019-05-22 21:16:55 +00:00
"fmt"
2019-05-22 21:23:23 +00:00
"io"
2021-03-30 11:54:34 +00:00
"net"
2020-06-30 21:58:26 +00:00
"net/http"
2020-11-16 01:58:12 +00:00
"regexp"
2019-05-22 21:16:55 +00:00
"sort"
2020-06-30 21:58:26 +00:00
"strings"
2019-05-22 21:16:55 +00:00
"sync"
2020-09-27 20:17:14 +00:00
"sync/atomic"
2019-05-22 21:16:55 +00:00
"time"
2020-09-11 10:18:57 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-06-24 16:36:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
2020-06-30 21:58:26 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2020-11-23 08:25:28 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/metrics"
2020-11-23 10:33:17 +00:00
xxhash "github.com/cespare/xxhash/v2"
2021-07-15 21:34:33 +00:00
"github.com/valyala/fastrand"
2019-05-22 21:16:55 +00:00
)
2021-07-15 13:03:26 +00:00
var (
replicationFactor = flag . Int ( "replicationFactor" , 1 , "How many copies of every time series is available on vmstorage nodes. " +
"See -replicationFactor command-line flag for vminsert nodes" )
2021-07-28 14:40:09 +00:00
maxSamplesPerSeries = flag . Int ( "search.maxSamplesPerSeries" , 30e6 , "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery" )
maxSamplesPerQuery = flag . Int ( "search.maxSamplesPerQuery" , 1e9 , "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries" )
2021-07-15 13:03:26 +00:00
)
2020-11-22 22:39:34 +00:00
2019-05-22 21:16:55 +00:00
// Result is a single timeseries result.
//
// ProcessSearchQuery returns Result slice.
type Result struct {
// The name of the metric.
MetricName storage . MetricName
// Values are sorted by Timestamps.
Values [ ] float64
Timestamps [ ] int64
}
func ( r * Result ) reset ( ) {
r . MetricName . Reset ( )
r . Values = r . Values [ : 0 ]
r . Timestamps = r . Timestamps [ : 0 ]
}
// Results holds results returned from ProcessSearchQuery.
type Results struct {
2019-08-04 19:15:33 +00:00
at * auth . Token
tr storage . TimeRange
fetchData bool
2020-09-11 10:18:57 +00:00
deadline searchutils . Deadline
2019-05-22 21:16:55 +00:00
tbf * tmpBlocksFile
packedTimeseries [ ] packedTimeseries
}
// Len returns the number of results in rss.
func ( rss * Results ) Len ( ) int {
return len ( rss . packedTimeseries )
}
// Cancel cancels rss work.
func ( rss * Results ) Cancel ( ) {
putTmpBlocksFile ( rss . tbf )
rss . tbf = nil
}
2020-06-23 17:29:19 +00:00
type timeseriesWork struct {
2021-03-30 10:22:21 +00:00
mustStop * uint32
2020-09-27 20:17:14 +00:00
rss * Results
pts * packedTimeseries
f func ( rs * Result , workerID uint ) error
doneCh chan error
2020-06-23 17:29:19 +00:00
rowsProcessed int
}
2021-02-16 14:08:37 +00:00
func ( tsw * timeseriesWork ) reset ( ) {
2021-03-30 10:22:21 +00:00
tsw . mustStop = nil
2021-02-16 14:08:37 +00:00
tsw . rss = nil
tsw . pts = nil
tsw . f = nil
if n := len ( tsw . doneCh ) ; n > 0 {
logger . Panicf ( "BUG: tsw.doneCh must be empty during reset; it contains %d items instead" , n )
}
tsw . rowsProcessed = 0
}
func getTimeseriesWork ( ) * timeseriesWork {
v := tswPool . Get ( )
if v == nil {
v = & timeseriesWork {
doneCh : make ( chan error , 1 ) ,
}
}
return v . ( * timeseriesWork )
}
func putTimeseriesWork ( tsw * timeseriesWork ) {
tsw . reset ( )
tswPool . Put ( tsw )
}
var tswPool sync . Pool
2021-07-26 12:38:51 +00:00
func scheduleTimeseriesWork ( workChs [ ] chan * timeseriesWork , tsw * timeseriesWork ) {
if len ( workChs ) == 1 {
2021-07-30 09:02:09 +00:00
// Fast path for a single worker
2021-07-26 12:38:51 +00:00
workChs [ 0 ] <- tsw
2021-07-15 21:34:33 +00:00
return
}
2021-07-15 12:40:41 +00:00
attempts := 0
for {
2021-07-26 12:38:51 +00:00
idx := fastrand . Uint32n ( uint32 ( len ( workChs ) ) )
2021-07-15 12:40:41 +00:00
select {
2021-07-26 12:38:51 +00:00
case workChs [ idx ] <- tsw :
2021-07-15 12:40:41 +00:00
return
default :
attempts ++
2021-07-26 12:38:51 +00:00
if attempts >= len ( workChs ) {
workChs [ idx ] <- tsw
2021-07-15 12:40:41 +00:00
return
}
}
2020-06-23 17:29:19 +00:00
}
}
2021-07-30 09:02:09 +00:00
func ( tsw * timeseriesWork ) do ( r * Result , workerID uint ) error {
if atomic . LoadUint32 ( tsw . mustStop ) != 0 {
return nil
}
rss := tsw . rss
if rss . deadline . Exceeded ( ) {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "timeout exceeded during query execution: %s" , rss . deadline . String ( ) )
}
if err := tsw . pts . Unpack ( r , rss . tbf , rss . tr , rss . fetchData , rss . at ) ; err != nil {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "error during time series unpacking: %w" , err )
}
if len ( r . Timestamps ) > 0 || ! rss . fetchData {
if err := tsw . f ( r , workerID ) ; err != nil {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return err
}
}
tsw . rowsProcessed = len ( r . Values )
return nil
}
2021-07-15 12:40:41 +00:00
func timeseriesWorker ( ch <- chan * timeseriesWork , workerID uint ) {
2021-07-26 12:38:51 +00:00
v := resultPool . Get ( )
if v == nil {
v = & result { }
}
r := v . ( * result )
2021-07-15 12:40:41 +00:00
for tsw := range ch {
2021-07-30 09:02:09 +00:00
err := tsw . do ( & r . rs , workerID )
tsw . doneCh <- err
}
currentTime := fasttime . UnixTimestamp ( )
if cap ( r . rs . Values ) > 1024 * 1024 && 4 * len ( r . rs . Values ) < cap ( r . rs . Values ) && currentTime - r . lastResetTime > 10 {
// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
r . rs = Result { }
r . lastResetTime = currentTime
2020-06-23 17:29:19 +00:00
}
2021-07-26 12:38:51 +00:00
resultPool . Put ( r )
}
type result struct {
rs Result
lastResetTime uint64
2020-06-23 17:29:19 +00:00
}
2021-07-26 12:38:51 +00:00
var resultPool sync . Pool
2020-07-23 16:21:49 +00:00
// RunParallel runs f in parallel for all the results from rss.
2019-05-22 21:16:55 +00:00
//
// f shouldn't hold references to rs after returning.
2019-07-12 12:51:02 +00:00
// workerID is the id of the worker goroutine that calls f.
2020-09-27 20:17:14 +00:00
// Data processing is immediately stopped if f returns non-nil error.
2019-05-22 21:16:55 +00:00
//
// rss becomes unusable after the call to RunParallel.
2020-09-27 20:17:14 +00:00
func ( rss * Results ) RunParallel ( f func ( rs * Result , workerID uint ) error ) error {
2019-05-22 21:16:55 +00:00
defer func ( ) {
putTmpBlocksFile ( rss . tbf )
rss . tbf = nil
} ( )
2021-07-26 12:38:51 +00:00
// Spin up local workers.
//
// Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
// - RunParallel is called with f, which blocks without forward progress.
// - All the workers in the global pool became blocked in f.
// - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
workers := len ( rss . packedTimeseries )
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
workChs := make ( [ ] chan * timeseriesWork , workers )
var workChsWG sync . WaitGroup
for i := 0 ; i < workers ; i ++ {
workChs [ i ] = make ( chan * timeseriesWork , 16 )
workChsWG . Add ( 1 )
go func ( workerID int ) {
defer workChsWG . Done ( )
timeseriesWorker ( workChs [ workerID ] , uint ( workerID ) )
} ( i )
}
2019-05-22 21:16:55 +00:00
// Feed workers with work.
2020-06-23 17:29:19 +00:00
tsws := make ( [ ] * timeseriesWork , len ( rss . packedTimeseries ) )
2021-03-30 10:22:21 +00:00
var mustStop uint32
2019-05-22 21:16:55 +00:00
for i := range rss . packedTimeseries {
2021-02-16 14:08:37 +00:00
tsw := getTimeseriesWork ( )
tsw . rss = rss
tsw . pts = & rss . packedTimeseries [ i ]
tsw . f = f
2021-03-30 10:22:21 +00:00
tsw . mustStop = & mustStop
2021-07-26 12:38:51 +00:00
scheduleTimeseriesWork ( workChs , tsw )
2020-06-23 17:29:19 +00:00
tsws [ i ] = tsw
2019-05-22 21:16:55 +00:00
}
2019-11-23 11:22:55 +00:00
seriesProcessedTotal := len ( rss . packedTimeseries )
2019-05-22 21:16:55 +00:00
rss . packedTimeseries = rss . packedTimeseries [ : 0 ]
2020-06-23 17:29:19 +00:00
// Wait until work is complete.
var firstErr error
rowsProcessedTotal := 0
for _ , tsw := range tsws {
2021-03-30 10:22:21 +00:00
if err := <- tsw . doneCh ; err != nil && firstErr == nil {
// Return just the first error, since other errors are likely duplicate the first error.
2020-06-23 17:29:19 +00:00
firstErr = err
2019-05-22 21:16:55 +00:00
}
2020-06-23 17:29:19 +00:00
rowsProcessedTotal += tsw . rowsProcessed
2021-02-16 14:08:37 +00:00
putTimeseriesWork ( tsw )
2019-05-22 21:16:55 +00:00
}
2020-06-23 17:29:19 +00:00
2019-11-23 11:22:55 +00:00
perQueryRowsProcessed . Update ( float64 ( rowsProcessedTotal ) )
perQuerySeriesProcessed . Update ( float64 ( seriesProcessedTotal ) )
2021-07-26 12:38:51 +00:00
// Shut down local workers
for _ , workCh := range workChs {
close ( workCh )
}
workChsWG . Wait ( )
2020-06-23 17:29:19 +00:00
return firstErr
2019-05-22 21:16:55 +00:00
}
2019-11-23 11:22:55 +00:00
var perQueryRowsProcessed = metrics . NewHistogram ( ` vm_per_query_rows_processed_count ` )
var perQuerySeriesProcessed = metrics . NewHistogram ( ` vm_per_query_series_processed_count ` )
2020-12-08 18:49:32 +00:00
var gomaxprocs = cgroup . AvailableCPUs ( )
2019-05-22 21:16:55 +00:00
type packedTimeseries struct {
metricName string
addrs [ ] tmpBlockAddr
}
2020-08-06 14:42:15 +00:00
type unpackWorkItem struct {
addr tmpBlockAddr
tr storage . TimeRange
}
2020-06-23 17:29:19 +00:00
type unpackWork struct {
2020-09-24 17:16:19 +00:00
ws [ ] unpackWorkItem
tbf * tmpBlocksFile
at * auth . Token
sbs [ ] * sortBlock
doneCh chan error
2020-06-23 17:29:19 +00:00
}
2019-05-22 21:16:55 +00:00
2020-07-22 11:53:54 +00:00
func ( upw * unpackWork ) reset ( ) {
2020-08-06 14:42:15 +00:00
ws := upw . ws
for i := range ws {
w := & ws [ i ]
w . addr = tmpBlockAddr { }
w . tr = storage . TimeRange { }
}
upw . ws = upw . ws [ : 0 ]
2020-07-22 11:53:54 +00:00
upw . tbf = nil
upw . at = nil
2020-08-06 14:42:15 +00:00
sbs := upw . sbs
for i := range sbs {
sbs [ i ] = nil
}
upw . sbs = upw . sbs [ : 0 ]
2020-07-22 11:53:54 +00:00
if n := len ( upw . doneCh ) ; n > 0 {
logger . Panicf ( "BUG: upw.doneCh must be empty; it contains %d items now" , n )
}
}
2020-09-15 18:06:04 +00:00
func ( upw * unpackWork ) unpack ( tmpBlock * storage . Block ) {
2020-08-06 14:42:15 +00:00
for _ , w := range upw . ws {
sb := getSortBlock ( )
2020-09-24 17:16:19 +00:00
if err := sb . unpackFrom ( tmpBlock , upw . tbf , w . addr , w . tr , upw . at ) ; err != nil {
2020-08-06 14:42:15 +00:00
putSortBlock ( sb )
upw . doneCh <- fmt . Errorf ( "cannot unpack block: %w" , err )
return
}
upw . sbs = append ( upw . sbs , sb )
}
upw . doneCh <- nil
}
2020-07-22 11:53:54 +00:00
func getUnpackWork ( ) * unpackWork {
v := unpackWorkPool . Get ( )
if v != nil {
return v . ( * unpackWork )
}
return & unpackWork {
doneCh : make ( chan error , 1 ) ,
}
}
func putUnpackWork ( upw * unpackWork ) {
upw . reset ( )
unpackWorkPool . Put ( upw )
}
var unpackWorkPool sync . Pool
2021-07-30 09:02:09 +00:00
func scheduleUnpackWork ( workChs [ ] chan * unpackWork , uw * unpackWork ) {
if len ( workChs ) == 1 {
// Fast path for a single worker
workChs [ 0 ] <- uw
2021-07-15 21:34:33 +00:00
return
}
2021-07-15 12:40:41 +00:00
attempts := 0
for {
2021-07-30 09:02:09 +00:00
idx := fastrand . Uint32n ( uint32 ( len ( workChs ) ) )
2021-07-15 12:40:41 +00:00
select {
2021-07-30 09:02:09 +00:00
case workChs [ idx ] <- uw :
2021-07-15 12:40:41 +00:00
return
default :
attempts ++
2021-07-30 09:02:09 +00:00
if attempts >= len ( workChs ) {
workChs [ idx ] <- uw
2021-07-15 12:40:41 +00:00
return
}
}
2019-05-22 21:16:55 +00:00
}
2020-06-23 17:29:19 +00:00
}
2021-07-15 12:40:41 +00:00
func unpackWorker ( ch <- chan * unpackWork ) {
2021-07-30 09:02:09 +00:00
v := tmpBlockPool . Get ( )
if v == nil {
v = & storage . Block { }
}
tmpBlock := v . ( * storage . Block )
2021-07-15 12:40:41 +00:00
for upw := range ch {
2021-07-30 09:02:09 +00:00
upw . unpack ( tmpBlock )
2019-05-22 21:16:55 +00:00
}
2021-07-30 09:02:09 +00:00
tmpBlockPool . Put ( v )
2020-06-23 17:29:19 +00:00
}
2019-05-22 21:16:55 +00:00
2021-07-30 09:02:09 +00:00
var tmpBlockPool sync . Pool
2020-08-06 14:42:15 +00:00
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
//
2021-07-30 09:02:09 +00:00
// It is better to load a single goroutine for up to one second on a system with many CPU cores
// in order to reduce inter-CPU memory ping-pong.
// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
// So the batch size should be 40M / 8K = 5K.
var unpackBatchSize = 5000
2020-08-06 14:42:15 +00:00
2020-06-23 17:29:19 +00:00
// Unpack unpacks pts to dst.
2021-07-30 09:02:09 +00:00
func ( pts * packedTimeseries ) Unpack ( dst * Result , tbf * tmpBlocksFile , tr storage . TimeRange , fetchData bool , at * auth . Token ) error {
2020-06-23 17:29:19 +00:00
dst . reset ( )
if err := dst . MetricName . Unmarshal ( bytesutil . ToUnsafeBytes ( pts . metricName ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot unmarshal metricName %q: %w" , pts . metricName , err )
2019-05-22 21:16:55 +00:00
}
2020-09-24 17:16:19 +00:00
if ! fetchData {
// Do not spend resources on data reading and unpacking.
return nil
}
2019-05-22 21:16:55 +00:00
2021-07-30 09:02:09 +00:00
// Spin up local workers.
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores.
2020-09-22 20:50:55 +00:00
addrsLen := len ( pts . addrs )
2021-07-30 09:02:09 +00:00
workers := addrsLen / unpackBatchSize
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
workChs := make ( [ ] chan * unpackWork , workers )
var workChsWG sync . WaitGroup
for i := 0 ; i < workers ; i ++ {
// Use unbuffered channel on purpose, since there are high chances
// that only a single unpackWork is needed to unpack.
// The unbuffered channel should reduce inter-CPU ping-pong in this case,
// which should improve the performance in a system with many CPU cores.
workChs [ i ] = make ( chan * unpackWork )
workChsWG . Add ( 1 )
go func ( workerID int ) {
defer workChsWG . Done ( )
unpackWorker ( workChs [ workerID ] )
} ( i )
}
// Feed workers with work
2020-09-22 20:50:55 +00:00
upws := make ( [ ] * unpackWork , 0 , 1 + addrsLen / unpackBatchSize )
2020-08-06 14:42:15 +00:00
upw := getUnpackWork ( )
upw . tbf = tbf
upw . at = at
for _ , addr := range pts . addrs {
if len ( upw . ws ) >= unpackBatchSize {
2021-07-30 09:02:09 +00:00
scheduleUnpackWork ( workChs , upw )
2020-08-06 14:42:15 +00:00
upws = append ( upws , upw )
upw = getUnpackWork ( )
upw . tbf = tbf
upw . at = at
}
upw . ws = append ( upw . ws , unpackWorkItem {
addr : addr ,
tr : tr ,
} )
2019-05-22 21:16:55 +00:00
}
2021-07-30 09:02:09 +00:00
scheduleUnpackWork ( workChs , upw )
2020-08-06 14:42:15 +00:00
upws = append ( upws , upw )
2019-05-22 21:16:55 +00:00
pts . addrs = pts . addrs [ : 0 ]
2020-06-23 17:29:19 +00:00
// Wait until work is complete
2021-07-15 13:03:26 +00:00
samples := 0
2020-09-22 20:50:55 +00:00
sbs := make ( [ ] * sortBlock , 0 , addrsLen )
2020-06-23 17:29:19 +00:00
var firstErr error
for _ , upw := range upws {
if err := <- upw . doneCh ; err != nil && firstErr == nil {
// Return the first error only, since other errors are likely the same.
firstErr = err
}
if firstErr == nil {
2021-07-15 13:03:26 +00:00
for _ , sb := range upw . sbs {
samples += len ( sb . Timestamps )
}
2021-07-28 14:40:09 +00:00
if * maxSamplesPerSeries <= 0 || samples < * maxSamplesPerSeries {
2021-07-15 13:03:26 +00:00
sbs = append ( sbs , upw . sbs ... )
} else {
firstErr = fmt . Errorf ( "cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries " +
"or reduce time range for the query" , * maxSamplesPerSeries )
}
}
if firstErr != nil {
2020-08-06 14:42:15 +00:00
for _ , sb := range upw . sbs {
putSortBlock ( sb )
}
2019-05-22 21:16:55 +00:00
}
2020-07-22 11:53:54 +00:00
putUnpackWork ( upw )
2019-05-22 21:16:55 +00:00
}
2021-07-30 09:02:09 +00:00
// Shut down local workers
for _ , workCh := range workChs {
close ( workCh )
}
workChsWG . Wait ( )
2020-06-23 17:29:19 +00:00
if firstErr != nil {
return firstErr
2019-05-22 21:16:55 +00:00
}
mergeSortBlocks ( dst , sbs )
return nil
}
func getSortBlock ( ) * sortBlock {
v := sbPool . Get ( )
if v == nil {
return & sortBlock { }
}
return v . ( * sortBlock )
}
func putSortBlock ( sb * sortBlock ) {
sb . reset ( )
sbPool . Put ( sb )
}
var sbPool sync . Pool
var metricRowsSkipped = metrics . NewCounter ( ` vm_metric_rows_skipped_total { name="vmselect"} ` )
func mergeSortBlocks ( dst * Result , sbh sortBlocksHeap ) {
// Skip empty sort blocks, since they cannot be passed to heap.Init.
src := sbh
sbh = sbh [ : 0 ]
for _ , sb := range src {
if len ( sb . Timestamps ) == 0 {
putSortBlock ( sb )
continue
}
sbh = append ( sbh , sb )
}
if len ( sbh ) == 0 {
return
}
heap . Init ( & sbh )
for {
top := sbh [ 0 ]
heap . Pop ( & sbh )
if len ( sbh ) == 0 {
dst . Timestamps = append ( dst . Timestamps , top . Timestamps [ top . NextIdx : ] ... )
dst . Values = append ( dst . Values , top . Values [ top . NextIdx : ] ... )
putSortBlock ( top )
2020-01-30 23:09:44 +00:00
break
2019-05-22 21:16:55 +00:00
}
sbNext := sbh [ 0 ]
tsNext := sbNext . Timestamps [ sbNext . NextIdx ]
idxNext := len ( top . Timestamps )
if top . Timestamps [ idxNext - 1 ] > tsNext {
idxNext = top . NextIdx
for top . Timestamps [ idxNext ] <= tsNext {
idxNext ++
}
}
dst . Timestamps = append ( dst . Timestamps , top . Timestamps [ top . NextIdx : idxNext ] ... )
dst . Values = append ( dst . Values , top . Values [ top . NextIdx : idxNext ] ... )
if idxNext < len ( top . Timestamps ) {
top . NextIdx = idxNext
heap . Push ( & sbh , top )
} else {
// Return top to the pool.
putSortBlock ( top )
}
}
2020-01-30 23:09:44 +00:00
2020-02-27 21:47:05 +00:00
timestamps , values := storage . DeduplicateSamples ( dst . Timestamps , dst . Values )
dedups := len ( dst . Timestamps ) - len ( timestamps )
dedupsDuringSelect . Add ( dedups )
dst . Timestamps = timestamps
dst . Values = values
2019-05-22 21:16:55 +00:00
}
2020-02-27 21:47:05 +00:00
var dedupsDuringSelect = metrics . NewCounter ( ` vm_deduplicated_samples_total { type="select"} ` )
2019-05-22 21:16:55 +00:00
type sortBlock struct {
Timestamps [ ] int64
Values [ ] float64
NextIdx int
}
func ( sb * sortBlock ) reset ( ) {
sb . Timestamps = sb . Timestamps [ : 0 ]
sb . Values = sb . Values [ : 0 ]
sb . NextIdx = 0
}
2020-09-24 17:16:19 +00:00
func ( sb * sortBlock ) unpackFrom ( tmpBlock * storage . Block , tbf * tmpBlocksFile , addr tmpBlockAddr , tr storage . TimeRange , at * auth . Token ) error {
2020-09-15 18:06:04 +00:00
tmpBlock . Reset ( )
2020-09-15 21:10:28 +00:00
tbf . MustReadBlockAt ( tmpBlock , addr )
2020-09-24 17:16:19 +00:00
if err := tmpBlock . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block: %w" , err )
2019-05-22 21:16:55 +00:00
}
2020-09-26 01:29:45 +00:00
sb . Timestamps , sb . Values = tmpBlock . AppendRowsWithTimeRangeFilter ( sb . Timestamps [ : 0 ] , sb . Values [ : 0 ] , tr )
skippedRows := tmpBlock . RowsCount ( ) - len ( sb . Timestamps )
2019-05-22 21:16:55 +00:00
metricRowsSkipped . Add ( skippedRows )
return nil
}
type sortBlocksHeap [ ] * sortBlock
func ( sbh sortBlocksHeap ) Len ( ) int {
return len ( sbh )
}
func ( sbh sortBlocksHeap ) Less ( i , j int ) bool {
a := sbh [ i ]
b := sbh [ j ]
return a . Timestamps [ a . NextIdx ] < b . Timestamps [ b . NextIdx ]
}
func ( sbh sortBlocksHeap ) Swap ( i , j int ) {
sbh [ i ] , sbh [ j ] = sbh [ j ] , sbh [ i ]
}
func ( sbh * sortBlocksHeap ) Push ( x interface { } ) {
* sbh = append ( * sbh , x . ( * sortBlock ) )
}
func ( sbh * sortBlocksHeap ) Pop ( ) interface { } {
a := * sbh
v := a [ len ( a ) - 1 ]
* sbh = a [ : len ( a ) - 1 ]
return v
}
2020-11-23 10:33:17 +00:00
// RegisterMetricNames registers metric names from mrs in the storage.
func RegisterMetricNames ( at * auth . Token , mrs [ ] storage . MetricRow , deadline searchutils . Deadline ) error {
// Split mrs among available vmstorage nodes.
mrsPerNode := make ( [ ] [ ] storage . MetricRow , len ( storageNodes ) )
for _ , mr := range mrs {
idx := 0
if len ( storageNodes ) > 1 {
// There is no need in using the same hash as for time series distribution in vminsert,
// since RegisterMetricNames is used only in Graphite Tags API.
h := xxhash . Sum64 ( mr . MetricNameRaw )
idx = int ( h % uint64 ( len ( storageNodes ) ) )
}
mrsPerNode [ idx ] = append ( mrsPerNode [ idx ] , mr )
}
// Push mrs to storage nodes in parallel.
snr := startStorageNodesRequest ( true , func ( idx int , sn * storageNode ) interface { } {
sn . registerMetricNamesRequests . Inc ( )
err := sn . registerMetricNames ( mrsPerNode [ idx ] , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . registerMetricNamesErrors . Inc ( )
2020-11-23 10:33:17 +00:00
}
return & err
} )
// Collect results
err := snr . collectAllResults ( func ( result interface { } ) error {
errP := result . ( * error )
return * errP
} )
if err != nil {
return fmt . Errorf ( "cannot register series on all the vmstorage nodes: %w" , err )
}
return nil
}
2019-05-22 21:23:23 +00:00
// DeleteSeries deletes time series matching the given sq.
2020-09-11 10:18:57 +00:00
func DeleteSeries ( at * auth . Token , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( int , error ) {
2019-05-22 21:23:23 +00:00
requestData := sq . Marshal ( nil )
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
deletedCount int
err error
2019-05-22 21:16:55 +00:00
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( true , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . deleteSeriesRequests . Inc ( )
deletedCount , err := sn . deleteMetrics ( requestData , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . deleteSeriesErrors . Inc ( )
2020-11-23 08:51:40 +00:00
}
return & nodeResult {
deletedCount : deletedCount ,
err : err ,
}
} )
2019-05-22 21:23:23 +00:00
// Collect results
deletedTotal := 0
2020-11-23 10:33:17 +00:00
err := snr . collectAllResults ( func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2019-05-22 21:23:23 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2019-05-22 21:23:23 +00:00
}
deletedTotal += nr . deletedCount
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return deletedTotal , fmt . Errorf ( "cannot delete time series on all the vmstorage nodes: %w" , err )
2019-05-22 21:23:23 +00:00
}
return deletedTotal , nil
2019-05-22 21:16:55 +00:00
}
2020-11-04 22:15:43 +00:00
// GetLabelsOnTimeRange returns labels for the given tr until the given deadline.
2020-11-14 10:36:21 +00:00
func GetLabelsOnTimeRange ( at * auth . Token , denyPartialResponse bool , tr storage . TimeRange , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2020-11-04 22:15:43 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
labels [ ] string
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . labelsOnTimeRangeRequests . Inc ( )
labels , err := sn . getLabelsOnTimeRange ( at . AccountID , at . ProjectID , tr , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . labelsOnTimeRangeErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot get labels on time range from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
labels : labels ,
err : err ,
}
} )
2020-11-04 22:15:43 +00:00
// Collect results
var labels [ ] string
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialLabelsOnTimeRangeResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2020-11-04 22:15:43 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2020-11-04 22:15:43 +00:00
}
labels = append ( labels , nr . labels ... )
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch labels on time range from vmstorage nodes: %w" , err )
2020-11-04 22:15:43 +00:00
}
// Deduplicate labels
labels = deduplicateStrings ( labels )
// Substitute "" with "__name__"
for i := range labels {
if labels [ i ] == "" {
labels [ i ] = "__name__"
}
}
// Sort labels like Prometheus does
sort . Strings ( labels )
2020-11-14 10:36:21 +00:00
return labels , isPartial , nil
2020-11-04 22:15:43 +00:00
}
2020-11-15 23:25:38 +00:00
// GetGraphiteTags returns Graphite tags until the given deadline.
2020-11-16 01:58:12 +00:00
func GetGraphiteTags ( at * auth . Token , denyPartialResponse bool , filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-11-15 23:25:38 +00:00
labels , isPartial , err := GetLabels ( at , denyPartialResponse , deadline )
if err != nil {
return nil , false , err
}
2020-11-16 01:58:12 +00:00
// Substitute "__name__" with "name" for Graphite compatibility
for i := range labels {
2020-12-06 23:07:03 +00:00
if labels [ i ] != "__name__" {
continue
}
// Prevent from duplicate `name` tag.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
if hasString ( labels , "name" ) {
labels = append ( labels [ : i ] , labels [ i + 1 : ] ... )
} else {
2020-11-15 23:25:38 +00:00
labels [ i ] = "name"
2020-11-16 12:49:46 +00:00
sort . Strings ( labels )
2020-11-15 23:25:38 +00:00
}
2020-12-06 23:07:03 +00:00
break
2020-11-15 23:25:38 +00:00
}
2020-11-16 13:50:48 +00:00
if len ( filter ) > 0 {
labels , err = applyGraphiteRegexpFilter ( filter , labels )
if err != nil {
return nil , false , err
}
}
2020-11-16 01:58:12 +00:00
if limit > 0 && limit < len ( labels ) {
labels = labels [ : limit ]
}
2020-11-15 23:25:38 +00:00
return labels , isPartial , nil
}
2020-12-06 23:07:03 +00:00
func hasString ( a [ ] string , s string ) bool {
for _ , x := range a {
if x == s {
return true
}
}
return false
}
2019-05-22 21:16:55 +00:00
// GetLabels returns labels until the given deadline.
2020-11-14 10:36:21 +00:00
func GetLabels ( at * auth . Token , denyPartialResponse bool , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 21:23:23 +00:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
labels [ ] string
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . labelsRequests . Inc ( )
labels , err := sn . getLabels ( at . AccountID , at . ProjectID , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . labelsErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot get labels from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
labels : labels ,
err : err ,
}
} )
2019-05-22 21:16:55 +00:00
2019-05-22 21:23:23 +00:00
// Collect results
var labels [ ] string
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialLabelsResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2019-05-22 21:23:23 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2019-05-22 21:23:23 +00:00
}
labels = append ( labels , nr . labels ... )
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch labels from vmstorage nodes: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Deduplicate labels
labels = deduplicateStrings ( labels )
2019-05-22 21:16:55 +00:00
// Substitute "" with "__name__"
for i := range labels {
if labels [ i ] == "" {
labels [ i ] = "__name__"
}
}
// Sort labels like Prometheus does
sort . Strings ( labels )
2020-11-14 10:36:21 +00:00
return labels , isPartial , nil
2019-05-22 21:16:55 +00:00
}
2020-11-04 22:15:43 +00:00
// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr
// until the given deadline.
2020-11-14 10:36:21 +00:00
func GetLabelValuesOnTimeRange ( at * auth . Token , denyPartialResponse bool , labelName string , tr storage . TimeRange , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2020-11-04 22:15:43 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
if labelName == "__name__" {
labelName = ""
}
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
labelValues [ ] string
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . labelValuesOnTimeRangeRequests . Inc ( )
labelValues , err := sn . getLabelValuesOnTimeRange ( at . AccountID , at . ProjectID , labelName , tr , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . labelValuesOnTimeRangeErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot get label values on time range from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
labelValues : labelValues ,
err : err ,
}
} )
2020-11-04 22:15:43 +00:00
// Collect results
var labelValues [ ] string
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialLabelValuesOnTimeRangeResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2020-11-04 22:15:43 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2020-11-04 22:15:43 +00:00
}
labelValues = append ( labelValues , nr . labelValues ... )
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch label values on time range from vmstorage nodes: %w" , err )
2020-11-04 22:15:43 +00:00
}
// Deduplicate label values
labelValues = deduplicateStrings ( labelValues )
// Sort labelValues like Prometheus does
sort . Strings ( labelValues )
2020-11-14 10:36:21 +00:00
return labelValues , isPartial , nil
2020-11-04 22:15:43 +00:00
}
2020-11-16 01:31:09 +00:00
// GetGraphiteTagValues returns tag values for the given tagName until the given deadline.
2020-11-16 01:58:12 +00:00
func GetGraphiteTagValues ( at * auth . Token , denyPartialResponse bool , tagName , filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2020-11-16 01:31:09 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
if tagName == "name" {
tagName = ""
}
tagValues , isPartial , err := GetLabelValues ( at , denyPartialResponse , tagName , deadline )
if err != nil {
return nil , false , err
}
2020-11-16 01:58:12 +00:00
if len ( filter ) > 0 {
tagValues , err = applyGraphiteRegexpFilter ( filter , tagValues )
if err != nil {
return nil , false , err
}
}
if limit > 0 && limit < len ( tagValues ) {
2020-11-16 01:31:09 +00:00
tagValues = tagValues [ : limit ]
}
return tagValues , isPartial , nil
}
2019-05-22 21:16:55 +00:00
// GetLabelValues returns label values for the given labelName
// until the given deadline.
2020-11-14 10:36:21 +00:00
func GetLabelValues ( at * auth . Token , denyPartialResponse bool , labelName string , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 21:16:55 +00:00
if labelName == "__name__" {
labelName = ""
}
2019-05-22 21:23:23 +00:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
labelValues [ ] string
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . labelValuesRequests . Inc ( )
labelValues , err := sn . getLabelValues ( at . AccountID , at . ProjectID , labelName , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . labelValuesErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot get label values from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
labelValues : labelValues ,
err : err ,
}
} )
2019-05-22 21:16:55 +00:00
2019-05-22 21:23:23 +00:00
// Collect results
var labelValues [ ] string
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialLabelValuesResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2019-05-22 21:23:23 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2019-05-22 21:23:23 +00:00
}
labelValues = append ( labelValues , nr . labelValues ... )
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch label values from vmstorage nodes: %w" , err )
2019-05-22 21:23:23 +00:00
}
2019-06-10 15:55:20 +00:00
// Deduplicate label values
2019-05-22 21:23:23 +00:00
labelValues = deduplicateStrings ( labelValues )
2019-05-22 21:16:55 +00:00
// Sort labelValues like Prometheus does
sort . Strings ( labelValues )
2020-11-14 10:36:21 +00:00
return labelValues , isPartial , nil
2019-05-22 21:16:55 +00:00
}
2020-09-10 21:29:26 +00:00
// GetTagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
2020-11-14 10:36:21 +00:00
func GetTagValueSuffixes ( at * auth . Token , denyPartialResponse bool , tr storage . TimeRange , tagKey , tagValuePrefix string ,
delimiter byte , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2020-09-10 21:29:26 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
suffixes [ ] string
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . tagValueSuffixesRequests . Inc ( )
suffixes , err := sn . getTagValueSuffixes ( at . AccountID , at . ProjectID , tr , tagKey , tagValuePrefix , delimiter , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . tagValueSuffixesErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w" ,
tr . String ( ) , tagKey , tagValuePrefix , delimiter , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
suffixes : suffixes ,
err : err ,
}
} )
2020-09-10 21:29:26 +00:00
// Collect results
m := make ( map [ string ] struct { } )
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialTagValueSuffixesResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2020-09-10 21:29:26 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2020-09-10 21:29:26 +00:00
}
for _ , suffix := range nr . suffixes {
m [ suffix ] = struct { } { }
}
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch tag value suffixes from vmstorage nodes: %w" , err )
2020-09-10 21:29:26 +00:00
}
suffixes := make ( [ ] string , 0 , len ( m ) )
for suffix := range m {
suffixes = append ( suffixes , suffix )
}
2020-11-14 10:36:21 +00:00
return suffixes , isPartial , nil
2020-09-10 21:29:26 +00:00
}
2019-06-10 15:55:20 +00:00
// GetLabelEntries returns all the label entries for at until the given deadline.
2020-11-14 10:36:21 +00:00
func GetLabelEntries ( at * auth . Token , denyPartialResponse bool , deadline searchutils . Deadline ) ( [ ] storage . TagEntry , bool , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-06-10 15:55:20 +00:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
labelEntries [ ] storage . TagEntry
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . labelEntriesRequests . Inc ( )
labelEntries , err := sn . getLabelEntries ( at . AccountID , at . ProjectID , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . labelEntriesErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot get label entries from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
labelEntries : labelEntries ,
err : err ,
}
} )
2019-06-10 15:55:20 +00:00
// Collect results
var labelEntries [ ] storage . TagEntry
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialLabelEntriesResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2019-06-10 15:55:20 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2019-06-10 15:55:20 +00:00
}
labelEntries = append ( labelEntries , nr . labelEntries ... )
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot featch label etnries from vmstorage nodes: %w" , err )
2019-06-10 15:55:20 +00:00
}
// Substitute "" with "__name__"
for i := range labelEntries {
e := & labelEntries [ i ]
if e . Key == "" {
e . Key = "__name__"
}
}
2019-06-10 16:51:05 +00:00
// Deduplicate label entries
labelEntries = deduplicateLabelEntries ( labelEntries )
2019-06-10 15:55:20 +00:00
// Sort labelEntries by the number of label values in each entry.
sort . Slice ( labelEntries , func ( i , j int ) bool {
a , b := labelEntries [ i ] . Values , labelEntries [ j ] . Values
2019-12-14 22:07:09 +00:00
if len ( a ) != len ( b ) {
return len ( a ) > len ( b )
2019-06-10 15:55:20 +00:00
}
2019-12-14 22:07:09 +00:00
return labelEntries [ i ] . Key > labelEntries [ j ] . Key
2019-06-10 15:55:20 +00:00
} )
2020-11-14 10:36:21 +00:00
return labelEntries , isPartial , nil
2019-06-10 15:55:20 +00:00
}
func deduplicateLabelEntries ( src [ ] storage . TagEntry ) [ ] storage . TagEntry {
m := make ( map [ string ] [ ] string , len ( src ) )
for i := range src {
e := & src [ i ]
m [ e . Key ] = append ( m [ e . Key ] , e . Values ... )
}
dst := make ( [ ] storage . TagEntry , 0 , len ( m ) )
for key , values := range m {
values := deduplicateStrings ( values )
sort . Strings ( values )
dst = append ( dst , storage . TagEntry {
Key : key ,
Values : values ,
} )
}
return dst
}
2019-05-22 21:23:23 +00:00
func deduplicateStrings ( a [ ] string ) [ ] string {
m := make ( map [ string ] bool , len ( a ) )
for _ , s := range a {
m [ s ] = true
2019-05-22 21:16:55 +00:00
}
2019-05-22 21:23:23 +00:00
a = a [ : 0 ]
for s := range m {
a = append ( a , s )
}
return a
2019-05-22 21:16:55 +00:00
}
2020-04-22 16:57:36 +00:00
// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2020-11-14 10:36:21 +00:00
func GetTSDBStatusForDate ( at * auth . Token , denyPartialResponse bool , deadline searchutils . Deadline , date uint64 , topN int ) ( * storage . TSDBStatus , bool , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-04-22 16:57:36 +00:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
status * storage . TSDBStatus
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . tsdbStatusRequests . Inc ( )
status , err := sn . getTSDBStatusForDate ( at . AccountID , at . ProjectID , date , topN , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . tsdbStatusErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot obtain tsdb status from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
status : status ,
err : err ,
}
} )
2020-04-22 16:57:36 +00:00
// Collect results.
var statuses [ ] * storage . TSDBStatus
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialTSDBStatusResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2020-04-22 16:57:36 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2020-04-22 16:57:36 +00:00
}
statuses = append ( statuses , nr . status )
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch tsdb status from vmstorage nodes: %w" , err )
2020-04-22 16:57:36 +00:00
}
status := mergeTSDBStatuses ( statuses , topN )
2020-11-14 10:36:21 +00:00
return status , isPartial , nil
2020-04-22 16:57:36 +00:00
}
func mergeTSDBStatuses ( statuses [ ] * storage . TSDBStatus , topN int ) * storage . TSDBStatus {
seriesCountByMetricName := make ( map [ string ] uint64 )
labelValueCountByLabelName := make ( map [ string ] uint64 )
seriesCountByLabelValuePair := make ( map [ string ] uint64 )
for _ , st := range statuses {
for _ , e := range st . SeriesCountByMetricName {
seriesCountByMetricName [ e . Name ] += e . Count
}
for _ , e := range st . LabelValueCountByLabelName {
// Label values are copied among vmstorage nodes,
// so select the maximum label values count.
if e . Count > labelValueCountByLabelName [ e . Name ] {
labelValueCountByLabelName [ e . Name ] = e . Count
}
}
for _ , e := range st . SeriesCountByLabelValuePair {
seriesCountByLabelValuePair [ e . Name ] += e . Count
}
}
return & storage . TSDBStatus {
SeriesCountByMetricName : toTopHeapEntries ( seriesCountByMetricName , topN ) ,
LabelValueCountByLabelName : toTopHeapEntries ( labelValueCountByLabelName , topN ) ,
SeriesCountByLabelValuePair : toTopHeapEntries ( seriesCountByLabelValuePair , topN ) ,
}
}
func toTopHeapEntries ( m map [ string ] uint64 , topN int ) [ ] storage . TopHeapEntry {
a := make ( [ ] storage . TopHeapEntry , 0 , len ( m ) )
for name , count := range m {
a = append ( a , storage . TopHeapEntry {
Name : name ,
Count : count ,
} )
}
sort . Slice ( a , func ( i , j int ) bool {
if a [ i ] . Count != a [ j ] . Count {
return a [ i ] . Count > a [ j ] . Count
}
return a [ i ] . Name < a [ j ] . Name
} )
if len ( a ) > topN {
a = a [ : topN ]
}
return a
}
2021-05-12 12:18:45 +00:00
// GetTSDBStatusWithFilters returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
//
// It accepts aribtrary filters on time series in sq.
func GetTSDBStatusWithFilters ( at * auth . Token , denyPartialResponse bool , deadline searchutils . Deadline , sq * storage . SearchQuery , topN int ) ( * storage . TSDBStatus , bool , error ) {
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
requestData := sq . Marshal ( nil )
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
status * storage . TSDBStatus
err error
}
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
sn . tsdbStatusWithFiltersRequests . Inc ( )
status , err := sn . getTSDBStatusWithFilters ( requestData , topN , deadline )
if err != nil {
sn . tsdbStatusWithFiltersErrors . Inc ( )
err = fmt . Errorf ( "cannot obtain tsdb status with filters from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
status : status ,
err : err ,
}
} )
// Collect results.
var statuses [ ] * storage . TSDBStatus
isPartial , err := snr . collectResults ( partialTSDBStatusResults , func ( result interface { } ) error {
nr := result . ( * nodeResult )
if nr . err != nil {
return nr . err
}
statuses = append ( statuses , nr . status )
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch tsdb status with filters from vmstorage nodes: %w" , err )
}
status := mergeTSDBStatuses ( statuses , topN )
return status , isPartial , nil
}
2019-05-22 21:23:23 +00:00
// GetSeriesCount returns the number of unique series for the given at.
2020-11-14 10:36:21 +00:00
func GetSeriesCount ( at * auth . Token , denyPartialResponse bool , deadline searchutils . Deadline ) ( uint64 , bool , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return 0 , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 21:23:23 +00:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
n uint64
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . seriesCountRequests . Inc ( )
n , err := sn . getSeriesCount ( at . AccountID , at . ProjectID , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . seriesCountErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot get series count from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
n : n ,
err : err ,
}
} )
2019-05-22 21:16:55 +00:00
2019-05-22 21:23:23 +00:00
// Collect results
var n uint64
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialSeriesCountResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2019-05-22 21:23:23 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2019-05-22 21:23:23 +00:00
}
n += nr . n
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return 0 , isPartial , fmt . Errorf ( "cannot fetch series count from vmstorage nodes: %w" , err )
2019-05-22 21:23:23 +00:00
}
2020-11-14 10:36:21 +00:00
return n , isPartial , nil
2019-05-22 21:23:23 +00:00
}
2019-05-22 21:16:55 +00:00
2019-09-28 09:20:50 +00:00
type tmpBlocksFileWrapper struct {
2020-04-26 13:45:51 +00:00
mu sync . Mutex
tbf * tmpBlocksFile
m map [ string ] [ ] tmpBlockAddr
orderedMetricNames [ ] string
2019-09-28 09:20:50 +00:00
}
2020-09-24 17:16:19 +00:00
func ( tbfw * tmpBlocksFileWrapper ) RegisterEmptyBlock ( mb * storage . MetricBlock ) {
metricName := mb . MetricName
tbfw . mu . Lock ( )
if addrs := tbfw . m [ string ( metricName ) ] ; addrs == nil {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNames and tbfw.m.
tbfw . orderedMetricNames = append ( tbfw . orderedMetricNames , string ( metricName ) )
tbfw . m [ tbfw . orderedMetricNames [ len ( tbfw . orderedMetricNames ) - 1 ] ] = [ ] tmpBlockAddr { { } }
}
tbfw . mu . Unlock ( )
}
func ( tbfw * tmpBlocksFileWrapper ) RegisterAndWriteBlock ( mb * storage . MetricBlock ) error {
2019-09-28 17:38:24 +00:00
bb := tmpBufPool . Get ( )
2020-04-27 05:13:41 +00:00
bb . B = storage . MarshalBlock ( bb . B [ : 0 ] , & mb . Block )
2019-09-28 09:20:50 +00:00
tbfw . mu . Lock ( )
2019-09-28 17:38:24 +00:00
addr , err := tbfw . tbf . WriteBlockData ( bb . B )
tmpBufPool . Put ( bb )
if err == nil {
metricName := mb . MetricName
2021-07-30 06:55:08 +00:00
addrs := tbfw . m [ string ( metricName ) ]
2020-07-23 10:48:08 +00:00
addrs = append ( addrs , addr )
2020-07-23 11:11:08 +00:00
if len ( addrs ) > 1 {
2021-07-30 06:55:08 +00:00
tbfw . m [ string ( metricName ) ] = addrs
2020-07-23 10:48:08 +00:00
} else {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNames and tbfw.m.
2020-07-23 14:55:18 +00:00
tbfw . orderedMetricNames = append ( tbfw . orderedMetricNames , string ( metricName ) )
tbfw . m [ tbfw . orderedMetricNames [ len ( tbfw . orderedMetricNames ) - 1 ] ] = addrs
2020-04-26 13:45:51 +00:00
}
2019-09-28 17:38:24 +00:00
}
tbfw . mu . Unlock ( )
return err
2019-09-28 09:20:50 +00:00
}
2020-09-26 01:29:45 +00:00
var metricNamePool = & sync . Pool {
New : func ( ) interface { } {
return & storage . MetricName { }
} ,
}
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr.
2020-11-14 10:36:21 +00:00
func ExportBlocks ( at * auth . Token , sq * storage . SearchQuery , deadline searchutils . Deadline , f func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error ) error {
2020-09-26 01:29:45 +00:00
if deadline . Exceeded ( ) {
2020-11-14 10:36:21 +00:00
return fmt . Errorf ( "timeout exceeded before starting data export: %s" , deadline . String ( ) )
2020-09-26 01:29:45 +00:00
}
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
2020-11-23 08:25:28 +00:00
var wg syncwg . WaitGroup
var stopped uint32
2020-09-26 01:29:45 +00:00
processBlock := func ( mb * storage . MetricBlock ) error {
2020-11-23 08:25:28 +00:00
wg . Add ( 1 )
defer wg . Done ( )
if atomic . LoadUint32 ( & stopped ) != 0 {
return nil
}
2020-09-26 01:29:45 +00:00
mn := metricNamePool . Get ( ) . ( * storage . MetricName )
if err := mn . Unmarshal ( mb . MetricName ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal metricName: %w" , err )
}
if err := f ( mn , & mb . Block , tr ) ; err != nil {
return err
}
mn . Reset ( )
metricNamePool . Put ( mn )
return nil
}
2020-11-14 10:36:21 +00:00
_ , err := processSearchQuery ( at , true , sq , true , processBlock , deadline )
2020-11-23 08:25:28 +00:00
// Make sure processBlock isn't called anymore in order to prevent from data races.
atomic . StoreUint32 ( & stopped , 1 )
wg . Wait ( )
2020-09-26 01:29:45 +00:00
if err != nil {
2020-11-14 10:36:21 +00:00
return fmt . Errorf ( "error occured during export: %w" , err )
2020-09-26 01:29:45 +00:00
}
2020-11-14 10:36:21 +00:00
return nil
2020-09-26 01:29:45 +00:00
}
2020-11-16 08:55:55 +00:00
// SearchMetricNames returns all the metric names matching sq until the given deadline.
func SearchMetricNames ( at * auth . Token , denyPartialResponse bool , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( [ ] storage . MetricName , bool , error ) {
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting to search metric names: %s" , deadline . String ( ) )
}
requestData := sq . Marshal ( nil )
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
metricNames [ ] [ ] byte
err error
}
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . searchMetricNamesRequests . Inc ( )
metricNames , err := sn . processSearchMetricNames ( requestData , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . searchMetricNamesErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot search metric names on vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
metricNames : metricNames ,
err : err ,
}
} )
2020-11-16 08:55:55 +00:00
// Collect results.
metricNames := make ( map [ string ] struct { } )
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialSearchMetricNamesResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
nr := result . ( * nodeResult )
2020-11-16 08:55:55 +00:00
if nr . err != nil {
2020-11-22 22:15:51 +00:00
return nr . err
2020-11-16 08:55:55 +00:00
}
for _ , metricName := range nr . metricNames {
metricNames [ string ( metricName ) ] = struct { } { }
}
2020-11-22 22:15:51 +00:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch metric names from vmstorage nodes: %w" , err )
2020-11-16 08:55:55 +00:00
}
// Unmarshal metricNames
mns := make ( [ ] storage . MetricName , len ( metricNames ) )
i := 0
for metricName := range metricNames {
mn := & mns [ i ]
if err := mn . Unmarshal ( bytesutil . ToUnsafeBytes ( metricName ) ) ; err != nil {
return nil , false , fmt . Errorf ( "cannot unmarshal metric name obtained from vmstorage: %w; metricName=%q" , err , metricName )
}
i ++
}
return mns , isPartial , nil
}
2020-09-26 01:29:45 +00:00
// ProcessSearchQuery performs sq until the given deadline.
2020-09-15 17:39:34 +00:00
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
2020-11-14 10:36:21 +00:00
func ProcessSearchQuery ( at * auth . Token , denyPartialResponse bool , sq * storage . SearchQuery , fetchData bool , deadline searchutils . Deadline ) ( * Results , bool , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 21:16:55 +00:00
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
2019-09-28 09:20:50 +00:00
tbfw := & tmpBlocksFileWrapper {
tbf : getTmpBlocksFile ( ) ,
m : make ( map [ string ] [ ] tmpBlockAddr ) ,
}
2020-11-23 08:25:28 +00:00
var wg syncwg . WaitGroup
var stopped uint32
2021-07-28 14:40:09 +00:00
var samples uint64
2020-09-26 01:29:45 +00:00
processBlock := func ( mb * storage . MetricBlock ) error {
2020-11-23 08:25:28 +00:00
wg . Add ( 1 )
defer wg . Done ( )
if atomic . LoadUint32 ( & stopped ) != 0 {
return nil
}
2020-09-26 01:29:45 +00:00
if ! fetchData {
tbfw . RegisterEmptyBlock ( mb )
return nil
}
2021-07-28 14:40:09 +00:00
n := atomic . AddUint64 ( & samples , uint64 ( mb . Block . RowsCount ( ) ) )
if * maxSamplesPerQuery > 0 && n > uint64 ( * maxSamplesPerQuery ) {
return fmt . Errorf ( "cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: to increase the -search.maxSamplesPerQuery; to reduce time range for the query; to use more specific label filters in order to select lower number of series" , * maxSamplesPerQuery )
}
2020-09-26 01:29:45 +00:00
if err := tbfw . RegisterAndWriteBlock ( mb ) ; err != nil {
return fmt . Errorf ( "cannot write MetricBlock to temporary blocks file: %w" , err )
}
return nil
}
2020-11-14 10:36:21 +00:00
isPartial , err := processSearchQuery ( at , denyPartialResponse , sq , fetchData , processBlock , deadline )
2020-11-23 08:25:28 +00:00
// Make sure processBlock isn't called anymore in order to protect from data races.
atomic . StoreUint32 ( & stopped , 1 )
wg . Wait ( )
2020-09-26 01:29:45 +00:00
if err != nil {
putTmpBlocksFile ( tbfw . tbf )
2020-11-10 16:48:50 +00:00
return nil , false , fmt . Errorf ( "error occured during search: %w" , err )
2020-09-26 01:29:45 +00:00
}
if err := tbfw . tbf . Finalize ( ) ; err != nil {
putTmpBlocksFile ( tbfw . tbf )
return nil , false , fmt . Errorf ( "cannot finalize temporary blocks file with %d time series: %w" , len ( tbfw . m ) , err )
}
var rss Results
rss . at = at
rss . tr = tr
rss . fetchData = fetchData
rss . deadline = deadline
rss . tbf = tbfw . tbf
pts := make ( [ ] packedTimeseries , len ( tbfw . orderedMetricNames ) )
for i , metricName := range tbfw . orderedMetricNames {
pts [ i ] = packedTimeseries {
metricName : metricName ,
addrs : tbfw . m [ metricName ] ,
}
}
rss . packedTimeseries = pts
2020-11-14 10:36:21 +00:00
return & rss , isPartial , nil
2020-09-26 01:29:45 +00:00
}
2020-11-14 10:36:21 +00:00
func processSearchQuery ( at * auth . Token , denyPartialResponse bool , sq * storage . SearchQuery , fetchData bool ,
processBlock func ( mb * storage . MetricBlock ) error , deadline searchutils . Deadline ) ( bool , error ) {
2020-09-26 01:29:45 +00:00
requestData := sq . Marshal ( nil )
// Send the query to all the storage nodes in parallel.
2020-11-23 10:33:17 +00:00
snr := startStorageNodesRequest ( denyPartialResponse , func ( idx int , sn * storageNode ) interface { } {
2020-11-23 08:51:40 +00:00
sn . searchRequests . Inc ( )
err := sn . processSearchQuery ( requestData , fetchData , processBlock , deadline )
if err != nil {
2020-11-23 13:00:04 +00:00
sn . searchErrors . Inc ( )
2020-11-23 08:51:40 +00:00
err = fmt . Errorf ( "cannot perform search on vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & err
} )
2019-05-22 21:16:55 +00:00
2019-05-22 21:23:23 +00:00
// Collect results.
2020-11-23 08:51:40 +00:00
isPartial , err := snr . collectResults ( partialSearchResults , func ( result interface { } ) error {
2020-11-22 22:15:51 +00:00
errP := result . ( * error )
return * errP
} )
if err != nil {
return isPartial , fmt . Errorf ( "cannot fetch query results from vmstorage nodes: %w" , err )
}
return isPartial , nil
}
2020-11-23 08:51:40 +00:00
type storageNodesRequest struct {
denyPartialResponse bool
resultsCh chan interface { }
}
2020-11-23 10:33:17 +00:00
func startStorageNodesRequest ( denyPartialResponse bool , f func ( idx int , sn * storageNode ) interface { } ) * storageNodesRequest {
2020-11-23 08:51:40 +00:00
resultsCh := make ( chan interface { } , len ( storageNodes ) )
2020-11-23 10:33:17 +00:00
for idx , sn := range storageNodes {
go func ( idx int , sn * storageNode ) {
result := f ( idx , sn )
2020-11-23 08:51:40 +00:00
resultsCh <- result
2020-11-23 10:33:17 +00:00
} ( idx , sn )
2020-11-23 08:51:40 +00:00
}
return & storageNodesRequest {
denyPartialResponse : denyPartialResponse ,
resultsCh : resultsCh ,
}
}
2020-11-23 10:33:17 +00:00
func ( snr * storageNodesRequest ) collectAllResults ( f func ( result interface { } ) error ) error {
2020-11-23 08:51:40 +00:00
var errors [ ] error
for i := 0 ; i < len ( storageNodes ) ; i ++ {
result := <- snr . resultsCh
if err := f ( result ) ; err != nil {
errors = append ( errors , err )
continue
}
}
if len ( errors ) > 0 {
return errors [ 0 ]
}
return nil
}
func ( snr * storageNodesRequest ) collectResults ( partialResultsCounter * metrics . Counter , f func ( result interface { } ) error ) ( bool , error ) {
2019-05-22 21:23:23 +00:00
var errors [ ] error
2020-11-22 22:39:34 +00:00
resultsCollected := 0
2019-05-22 21:23:23 +00:00
for i := 0 ; i < len ( storageNodes ) ; i ++ {
2020-11-23 10:33:17 +00:00
// There is no need in timer here, since all the goroutines executing the f function
// passed to startStorageNodesRequest must be finished until the deadline.
2020-11-23 08:51:40 +00:00
result := <- snr . resultsCh
if err := f ( result ) ; err != nil {
2019-09-28 09:20:50 +00:00
errors = append ( errors , err )
2019-05-22 21:23:23 +00:00
continue
2019-05-22 21:16:55 +00:00
}
2020-11-22 22:39:34 +00:00
resultsCollected ++
if resultsCollected > len ( storageNodes ) - * replicationFactor {
// There is no need in waiting for the remaining results,
// because the collected results contain all the data according to the given -replicationFactor.
// This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
//
2020-11-23 08:51:40 +00:00
// It is expected that cap(snr.resultsCh) == len(storageNodes), otherwise goroutine leak is possible.
2020-11-22 22:39:34 +00:00
return false , nil
}
2019-05-22 21:16:55 +00:00
}
2020-11-14 10:36:21 +00:00
isPartial := false
2019-05-22 21:23:23 +00:00
if len ( errors ) > 0 {
2020-04-22 16:57:36 +00:00
if len ( errors ) == len ( storageNodes ) {
2020-11-22 22:15:51 +00:00
// All the vmstorage nodes returned error.
2019-05-22 21:23:23 +00:00
// Return only the first error, since it has no sense in returning all errors.
2020-11-10 16:48:50 +00:00
return false , errors [ 0 ]
2019-05-22 21:23:23 +00:00
}
2020-11-22 22:15:51 +00:00
// Return partial results.
2019-05-22 21:23:23 +00:00
// This allows gracefully degrade vmselect in the case
2020-11-22 22:15:51 +00:00
// if a part of storageNodes are temporarily unavailable.
2020-09-26 01:29:45 +00:00
// Do not return the error, since it may spam logs on busy vmselect
2020-11-10 16:48:50 +00:00
// serving high amounts of requests.
2020-11-22 22:15:51 +00:00
partialResultsCounter . Inc ( )
2020-11-23 08:51:40 +00:00
if snr . denyPartialResponse {
2020-11-14 10:36:21 +00:00
return true , errors [ 0 ]
}
isPartial = true
2019-05-22 21:16:55 +00:00
}
2020-11-14 10:36:21 +00:00
return isPartial , nil
2019-05-22 21:23:23 +00:00
}
type storageNode struct {
connPool * netutil . ConnPool
2021-05-24 16:11:35 +00:00
// The number of concurrent queries to storageNode.
concurrentQueries * metrics . Counter
2019-05-22 21:23:23 +00:00
2020-11-23 10:33:17 +00:00
// The number of RegisterMetricNames requests to storageNode.
registerMetricNamesRequests * metrics . Counter
// The number of RegisterMetricNames request errors to storageNode.
2020-11-23 13:00:04 +00:00
registerMetricNamesErrors * metrics . Counter
2020-11-23 10:33:17 +00:00
2019-05-22 21:23:23 +00:00
// The number of DeleteSeries requests to storageNode.
deleteSeriesRequests * metrics . Counter
// The number of DeleteSeries request errors to storageNode.
2020-11-23 13:00:04 +00:00
deleteSeriesErrors * metrics . Counter
2019-05-22 21:23:23 +00:00
2020-11-04 22:15:43 +00:00
// The number of requests to labels.
labelsOnTimeRangeRequests * metrics . Counter
2019-05-22 21:23:23 +00:00
// The number of requests to labels.
labelsRequests * metrics . Counter
2020-11-04 22:15:43 +00:00
// The number of errors during requests to labels.
2020-11-23 13:00:04 +00:00
labelsOnTimeRangeErrors * metrics . Counter
2020-11-04 22:15:43 +00:00
2019-05-22 21:23:23 +00:00
// The number of errors during requests to labels.
2020-11-23 13:00:04 +00:00
labelsErrors * metrics . Counter
2019-05-22 21:23:23 +00:00
2020-11-04 22:15:43 +00:00
// The number of requests to labelValuesOnTimeRange.
labelValuesOnTimeRangeRequests * metrics . Counter
2019-05-22 21:23:23 +00:00
// The number of requests to labelValues.
labelValuesRequests * metrics . Counter
2020-11-04 22:15:43 +00:00
// The number of errors during requests to labelValuesOnTimeRange.
2020-11-23 13:00:04 +00:00
labelValuesOnTimeRangeErrors * metrics . Counter
2020-11-04 22:15:43 +00:00
2019-05-22 21:23:23 +00:00
// The number of errors during requests to labelValues.
2020-11-23 13:00:04 +00:00
labelValuesErrors * metrics . Counter
2019-05-22 21:23:23 +00:00
2019-06-10 15:55:20 +00:00
// The number of requests to labelEntries.
labelEntriesRequests * metrics . Counter
// The number of errors during requests to labelEntries.
2020-11-23 13:00:04 +00:00
labelEntriesErrors * metrics . Counter
2019-06-10 15:55:20 +00:00
2020-09-10 21:29:26 +00:00
// The number of requests to tagValueSuffixes.
tagValueSuffixesRequests * metrics . Counter
// The number of errors during requests to tagValueSuffixes.
2020-11-23 13:00:04 +00:00
tagValueSuffixesErrors * metrics . Counter
2020-09-10 21:29:26 +00:00
2020-04-22 16:57:36 +00:00
// The number of requests to tsdb status.
tsdbStatusRequests * metrics . Counter
// The number of errors during requests to tsdb status.
2020-11-23 13:00:04 +00:00
tsdbStatusErrors * metrics . Counter
2020-04-22 16:57:36 +00:00
2021-05-12 12:18:45 +00:00
// The number of requests to tsdb status.
tsdbStatusWithFiltersRequests * metrics . Counter
// The number of errors during requests to tsdb status.
tsdbStatusWithFiltersErrors * metrics . Counter
2019-05-22 21:23:23 +00:00
// The number of requests to seriesCount.
seriesCountRequests * metrics . Counter
// The number of errors during requests to seriesCount.
2020-11-23 13:00:04 +00:00
seriesCountErrors * metrics . Counter
2019-05-22 21:23:23 +00:00
2020-11-16 08:55:55 +00:00
// The number of 'search metric names' requests to storageNode.
searchMetricNamesRequests * metrics . Counter
2019-05-22 21:23:23 +00:00
// The number of search requests to storageNode.
searchRequests * metrics . Counter
2020-11-16 08:55:55 +00:00
// The number of 'search metric names' errors to storageNode.
2020-11-23 13:00:04 +00:00
searchMetricNamesErrors * metrics . Counter
2020-11-16 08:55:55 +00:00
2019-05-22 21:23:23 +00:00
// The number of search request errors to storageNode.
2020-11-23 13:00:04 +00:00
searchErrors * metrics . Counter
2019-05-22 21:23:23 +00:00
// The number of metric blocks read.
metricBlocksRead * metrics . Counter
// The number of read metric rows.
metricRowsRead * metrics . Counter
}
2020-11-23 10:33:17 +00:00
func ( sn * storageNode ) registerMetricNames ( mrs [ ] storage . MetricRow , deadline searchutils . Deadline ) error {
if len ( mrs ) == 0 {
return nil
}
f := func ( bc * handshake . BufferedConn ) error {
return sn . registerMetricNamesOnConn ( bc , mrs )
}
2021-03-30 11:54:34 +00:00
return sn . execOnConnWithPossibleRetry ( "registerMetricNames_v1" , f , deadline )
2020-11-23 10:33:17 +00:00
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) deleteMetrics ( requestData [ ] byte , deadline searchutils . Deadline ) ( int , error ) {
2019-05-22 21:23:23 +00:00
var deletedCount int
f := func ( bc * handshake . BufferedConn ) error {
n , err := sn . deleteMetricsOnConn ( bc , requestData )
if err != nil {
return err
}
2021-03-30 11:54:34 +00:00
deletedCount = n
2019-05-22 21:23:23 +00:00
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "deleteMetrics_v3" , f , deadline ) ; err != nil {
return 0 , err
2019-05-22 21:23:23 +00:00
}
return deletedCount , nil
}
2020-11-04 22:15:43 +00:00
func ( sn * storageNode ) getLabelsOnTimeRange ( accountID , projectID uint32 , tr storage . TimeRange , deadline searchutils . Deadline ) ( [ ] string , error ) {
var labels [ ] string
f := func ( bc * handshake . BufferedConn ) error {
ls , err := sn . getLabelsOnTimeRangeOnConn ( bc , accountID , projectID , tr )
if err != nil {
return err
}
labels = ls
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "labelsOnTimeRange_v1" , f , deadline ) ; err != nil {
return nil , err
2020-11-04 22:15:43 +00:00
}
return labels , nil
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) getLabels ( accountID , projectID uint32 , deadline searchutils . Deadline ) ( [ ] string , error ) {
2019-05-22 21:23:23 +00:00
var labels [ ] string
f := func ( bc * handshake . BufferedConn ) error {
ls , err := sn . getLabelsOnConn ( bc , accountID , projectID )
if err != nil {
return err
}
labels = ls
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "labels_v2" , f , deadline ) ; err != nil {
return nil , err
2019-05-22 21:23:23 +00:00
}
return labels , nil
}
2020-11-04 22:15:43 +00:00
func ( sn * storageNode ) getLabelValuesOnTimeRange ( accountID , projectID uint32 , labelName string , tr storage . TimeRange , deadline searchutils . Deadline ) ( [ ] string , error ) {
var labelValues [ ] string
f := func ( bc * handshake . BufferedConn ) error {
lvs , err := sn . getLabelValuesOnTimeRangeOnConn ( bc , accountID , projectID , labelName , tr )
if err != nil {
return err
}
labelValues = lvs
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "labelValuesOnTimeRange_v1" , f , deadline ) ; err != nil {
return nil , err
2020-11-04 22:15:43 +00:00
}
return labelValues , nil
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) getLabelValues ( accountID , projectID uint32 , labelName string , deadline searchutils . Deadline ) ( [ ] string , error ) {
2019-05-22 21:23:23 +00:00
var labelValues [ ] string
f := func ( bc * handshake . BufferedConn ) error {
lvs , err := sn . getLabelValuesOnConn ( bc , accountID , projectID , labelName )
if err != nil {
return err
}
labelValues = lvs
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "labelValues_v2" , f , deadline ) ; err != nil {
return nil , err
2019-05-22 21:23:23 +00:00
}
return labelValues , nil
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) getTagValueSuffixes ( accountID , projectID uint32 , tr storage . TimeRange , tagKey , tagValuePrefix string ,
delimiter byte , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-09-10 21:29:26 +00:00
var suffixes [ ] string
f := func ( bc * handshake . BufferedConn ) error {
ss , err := sn . getTagValueSuffixesOnConn ( bc , accountID , projectID , tr , tagKey , tagValuePrefix , delimiter )
if err != nil {
return err
}
suffixes = ss
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "tagValueSuffixes_v1" , f , deadline ) ; err != nil {
return nil , err
2020-09-10 21:29:26 +00:00
}
return suffixes , nil
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) getLabelEntries ( accountID , projectID uint32 , deadline searchutils . Deadline ) ( [ ] storage . TagEntry , error ) {
2019-06-10 15:55:20 +00:00
var tagEntries [ ] storage . TagEntry
f := func ( bc * handshake . BufferedConn ) error {
tes , err := sn . getLabelEntriesOnConn ( bc , accountID , projectID )
if err != nil {
return err
}
tagEntries = tes
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "labelEntries_v2" , f , deadline ) ; err != nil {
return nil , err
2019-06-10 15:55:20 +00:00
}
return tagEntries , nil
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) getTSDBStatusForDate ( accountID , projectID uint32 , date uint64 , topN int , deadline searchutils . Deadline ) ( * storage . TSDBStatus , error ) {
2020-04-22 16:57:36 +00:00
var status * storage . TSDBStatus
f := func ( bc * handshake . BufferedConn ) error {
st , err := sn . getTSDBStatusForDateOnConn ( bc , accountID , projectID , date , topN )
if err != nil {
return err
}
status = st
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "tsdbStatus_v2" , f , deadline ) ; err != nil {
return nil , err
2020-04-22 16:57:36 +00:00
}
return status , nil
}
2021-05-12 12:18:45 +00:00
func ( sn * storageNode ) getTSDBStatusWithFilters ( requestData [ ] byte , topN int , deadline searchutils . Deadline ) ( * storage . TSDBStatus , error ) {
var status * storage . TSDBStatus
f := func ( bc * handshake . BufferedConn ) error {
st , err := sn . getTSDBStatusWithFiltersOnConn ( bc , requestData , topN )
if err != nil {
return err
}
status = st
return nil
}
if err := sn . execOnConnWithPossibleRetry ( "tsdbStatusWithFilters_v1" , f , deadline ) ; err != nil {
return nil , err
}
return status , nil
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) getSeriesCount ( accountID , projectID uint32 , deadline searchutils . Deadline ) ( uint64 , error ) {
2019-05-22 21:23:23 +00:00
var n uint64
f := func ( bc * handshake . BufferedConn ) error {
nn , err := sn . getSeriesCountOnConn ( bc , accountID , projectID )
if err != nil {
return err
}
n = nn
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "seriesCount_v2" , f , deadline ) ; err != nil {
return 0 , err
2019-05-22 21:23:23 +00:00
}
return n , nil
}
2020-11-16 08:55:55 +00:00
func ( sn * storageNode ) processSearchMetricNames ( requestData [ ] byte , deadline searchutils . Deadline ) ( [ ] [ ] byte , error ) {
var metricNames [ ] [ ] byte
f := func ( bc * handshake . BufferedConn ) error {
mns , err := sn . processSearchMetricNamesOnConn ( bc , requestData )
if err != nil {
return err
}
metricNames = mns
return nil
}
2021-03-30 11:54:34 +00:00
if err := sn . execOnConnWithPossibleRetry ( "searchMetricNames_v1" , f , deadline ) ; err != nil {
return nil , err
2020-11-16 08:55:55 +00:00
}
return metricNames , nil
}
2020-09-26 01:29:45 +00:00
func ( sn * storageNode ) processSearchQuery ( requestData [ ] byte , fetchData bool , processBlock func ( mb * storage . MetricBlock ) error , deadline searchutils . Deadline ) error {
2019-05-22 21:23:23 +00:00
f := func ( bc * handshake . BufferedConn ) error {
2021-02-24 09:43:09 +00:00
if err := sn . processSearchQueryOnConn ( bc , requestData , fetchData , processBlock ) ; err != nil {
2019-05-22 21:23:23 +00:00
return err
}
return nil
}
2021-03-30 11:54:34 +00:00
return sn . execOnConnWithPossibleRetry ( "search_v4" , f , deadline )
}
func ( sn * storageNode ) execOnConnWithPossibleRetry ( funcName string , f func ( bc * handshake . BufferedConn ) error , deadline searchutils . Deadline ) error {
err := sn . execOnConn ( funcName , f , deadline )
if err == nil {
return nil
2019-05-22 21:23:23 +00:00
}
2021-03-30 11:54:34 +00:00
var er * errRemote
var ne net . Error
if errors . As ( err , & er ) || errors . As ( err , & ne ) && ne . Timeout ( ) {
// There is no sense in repeating the query on errors induced by vmstorage (errRemote) or on network timeout errors.
return err
}
// Repeat the query in the hope the error was temporary.
return sn . execOnConn ( funcName , f , deadline )
2019-05-22 21:23:23 +00:00
}
2020-09-11 10:18:57 +00:00
func ( sn * storageNode ) execOnConn ( rpcName string , f func ( bc * handshake . BufferedConn ) error , deadline searchutils . Deadline ) error {
2021-05-24 16:11:35 +00:00
sn . concurrentQueries . Inc ( )
defer sn . concurrentQueries . Dec ( )
2019-05-22 21:23:23 +00:00
2020-09-16 18:03:51 +00:00
d := time . Unix ( int64 ( deadline . Deadline ( ) ) , 0 )
nowSecs := fasttime . UnixTimestamp ( )
currentTime := time . Unix ( int64 ( nowSecs ) , 0 )
timeout := d . Sub ( currentTime )
if timeout <= 0 {
2021-03-30 11:54:34 +00:00
return fmt . Errorf ( "request timeout reached: %s" , deadline . String ( ) )
2020-09-16 18:03:51 +00:00
}
2019-05-22 21:23:23 +00:00
bc , err := sn . connPool . Get ( )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot obtain connection from a pool: %w" , err )
2019-05-22 21:23:23 +00:00
}
2020-09-16 18:03:51 +00:00
// Extend the connection deadline by 2 seconds, so the remote storage could return `timeout` error
// without the need to break the connection.
connDeadline := d . Add ( 2 * time . Second )
if err := bc . SetDeadline ( connDeadline ) ; err != nil {
2019-05-22 21:23:23 +00:00
_ = bc . Close ( )
logger . Panicf ( "FATAL: cannot set connection deadline: %s" , err )
}
if err := writeBytes ( bc , [ ] byte ( rpcName ) ) ; err != nil {
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send rpcName=%q to the server: %w" , rpcName , err )
2019-05-22 21:23:23 +00:00
}
2020-07-23 17:42:57 +00:00
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
2020-09-16 18:03:51 +00:00
timeoutSecs := uint32 ( timeout . Seconds ( ) + 1 )
if err := writeUint32 ( bc , timeoutSecs ) ; err != nil {
2020-07-23 17:42:57 +00:00
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
return fmt . Errorf ( "cannot send timeout=%d for rpcName=%q to the server: %w" , timeout , rpcName , err )
}
2019-05-22 21:23:23 +00:00
if err := f ( bc ) ; err != nil {
remoteAddr := bc . RemoteAddr ( )
2020-06-30 21:58:26 +00:00
var er * errRemote
if errors . As ( err , & er ) {
2019-05-22 21:23:23 +00:00
// Remote error. The connection may be re-used. Return it to the pool.
sn . connPool . Put ( bc )
} else {
// Local error.
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
}
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot execute rpcName=%q on vmstorage %q with timeout %s: %w" , rpcName , remoteAddr , deadline . String ( ) , err )
2019-05-22 21:23:23 +00:00
}
// Return the connection back to the pool, assuming it is healthy.
sn . connPool . Put ( bc )
return nil
}
type errRemote struct {
msg string
}
func ( er * errRemote ) Error ( ) string {
return er . msg
}
2020-06-30 21:58:26 +00:00
func newErrRemote ( buf [ ] byte ) error {
err := & errRemote {
msg : string ( buf ) ,
}
if ! strings . Contains ( err . msg , "denyQueriesOutsideRetention" ) {
return err
}
return & httpserver . ErrorWithStatusCode {
Err : err ,
StatusCode : http . StatusServiceUnavailable ,
}
}
2020-11-23 10:33:17 +00:00
func ( sn * storageNode ) registerMetricNamesOnConn ( bc * handshake . BufferedConn , mrs [ ] storage . MetricRow ) error {
// Send the request to sn.
if err := writeUint64 ( bc , uint64 ( len ( mrs ) ) ) ; err != nil {
return fmt . Errorf ( "cannot send metricsCount to conn: %w" , err )
}
for i , mr := range mrs {
if err := writeBytes ( bc , mr . MetricNameRaw ) ; err != nil {
return fmt . Errorf ( "cannot send MetricNameRaw #%d to conn: %w" , i + 1 , err )
}
if err := writeUint64 ( bc , uint64 ( mr . Timestamp ) ) ; err != nil {
return fmt . Errorf ( "cannot send Timestamp #%d to conn: %w" , i + 1 , err )
}
}
if err := bc . Flush ( ) ; err != nil {
return fmt . Errorf ( "cannot flush registerMetricNames request to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return newErrRemote ( buf )
}
return nil
}
2019-05-22 21:23:23 +00:00
func ( sn * storageNode ) deleteMetricsOnConn ( bc * handshake . BufferedConn , requestData [ ] byte ) ( int , error ) {
// Send the request to sn
if err := writeBytes ( bc , requestData ) ; err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot send deleteMetrics request to conn: %w" , err )
2019-05-22 21:23:23 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot flush deleteMetrics request to conn: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) > 0 {
2020-06-30 21:58:26 +00:00
return 0 , newErrRemote ( buf )
2019-05-22 21:23:23 +00:00
}
// Read deletedCount
deletedCount , err := readUint64 ( bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot read deletedCount value: %w" , err )
2019-05-22 21:23:23 +00:00
}
return int ( deletedCount ) , nil
}
2019-06-10 15:55:20 +00:00
const maxLabelSize = 16 * 1024 * 1024
2019-05-22 21:23:23 +00:00
2020-11-04 22:15:43 +00:00
func ( sn * storageNode ) getLabelsOnTimeRangeOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 , tr storage . TimeRange ) ( [ ] string , error ) {
// Send the request to sn.
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
}
if err := writeTimeRange ( bc , tr ) ; err != nil {
return nil , err
}
if err := bc . Flush ( ) ; err != nil {
return nil , fmt . Errorf ( "cannot flush request to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return nil , newErrRemote ( buf )
}
// Read response
var labels [ ] string
for {
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read labels: %w" , err )
}
if len ( buf ) == 0 {
// Reached the end of the response
return labels , nil
}
labels = append ( labels , string ( buf ) )
}
}
2019-05-22 21:23:23 +00:00
func ( sn * storageNode ) getLabelsOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 ) ( [ ] string , error ) {
// Send the request to sn.
2020-09-10 21:29:26 +00:00
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
2019-05-22 21:23:23 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot flush request to conn: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) > 0 {
2020-06-30 21:58:26 +00:00
return nil , newErrRemote ( buf )
2019-05-22 21:23:23 +00:00
}
// Read response
var labels [ ] string
for {
2019-06-10 15:55:20 +00:00
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelSize )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read labels: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) == 0 {
// Reached the end of the response
return labels , nil
}
labels = append ( labels , string ( buf ) )
}
}
const maxLabelValueSize = 16 * 1024 * 1024
2020-11-04 22:15:43 +00:00
func ( sn * storageNode ) getLabelValuesOnTimeRangeOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 , labelName string , tr storage . TimeRange ) ( [ ] string , error ) {
// Send the request to sn.
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
}
if err := writeBytes ( bc , [ ] byte ( labelName ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send labelName=%q to conn: %w" , labelName , err )
}
if err := writeTimeRange ( bc , tr ) ; err != nil {
return nil , err
}
if err := bc . Flush ( ) ; err != nil {
return nil , fmt . Errorf ( "cannot flush labelName to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return nil , newErrRemote ( buf )
}
// Read response
labelValues , _ , err := readLabelValues ( buf , bc )
if err != nil {
return nil , err
}
return labelValues , nil
}
2019-05-22 21:23:23 +00:00
func ( sn * storageNode ) getLabelValuesOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 , labelName string ) ( [ ] string , error ) {
// Send the request to sn.
2020-09-10 21:29:26 +00:00
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
2019-05-22 21:23:23 +00:00
}
if err := writeBytes ( bc , [ ] byte ( labelName ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot send labelName=%q to conn: %w" , labelName , err )
2019-05-22 21:23:23 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot flush labelName to conn: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) > 0 {
2020-06-30 21:58:26 +00:00
return nil , newErrRemote ( buf )
2019-05-22 21:23:23 +00:00
}
// Read response
2019-06-10 15:55:20 +00:00
labelValues , _ , err := readLabelValues ( buf , bc )
if err != nil {
return nil , err
}
return labelValues , nil
}
func readLabelValues ( buf [ ] byte , bc * handshake . BufferedConn ) ( [ ] string , [ ] byte , error ) {
2019-05-22 21:23:23 +00:00
var labelValues [ ] string
for {
2019-06-10 15:55:20 +00:00
var err error
2019-05-22 21:23:23 +00:00
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelValueSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , buf , fmt . Errorf ( "cannot read labelValue: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) == 0 {
// Reached the end of the response
2019-06-10 15:55:20 +00:00
return labelValues , buf , nil
2019-05-22 21:23:23 +00:00
}
labelValues = append ( labelValues , string ( buf ) )
}
}
2020-09-10 21:29:26 +00:00
func ( sn * storageNode ) getTagValueSuffixesOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 ,
tr storage . TimeRange , tagKey , tagValuePrefix string , delimiter byte ) ( [ ] string , error ) {
2019-06-10 15:55:20 +00:00
// Send the request to sn.
2020-09-10 21:29:26 +00:00
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
2019-06-10 15:55:20 +00:00
}
2020-11-04 22:15:43 +00:00
if err := writeTimeRange ( bc , tr ) ; err != nil {
return nil , err
2020-09-10 21:29:26 +00:00
}
if err := writeBytes ( bc , [ ] byte ( tagKey ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send tagKey=%q to conn: %w" , tagKey , err )
}
if err := writeBytes ( bc , [ ] byte ( tagValuePrefix ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send tagValuePrefix=%q to conn: %w" , tagValuePrefix , err )
}
if err := writeByte ( bc , delimiter ) ; err != nil {
return nil , fmt . Errorf ( "cannot send delimiter=%c to conn: %w" , delimiter , err )
2019-06-10 15:55:20 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot flush request to conn: %w" , err )
2019-06-10 15:55:20 +00:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
2019-06-10 15:55:20 +00:00
}
if len ( buf ) > 0 {
2020-06-30 21:58:26 +00:00
return nil , newErrRemote ( buf )
2019-06-10 15:55:20 +00:00
}
2020-09-10 21:29:26 +00:00
// Read response.
// The response may contain empty suffix, so it is prepended with the number of the following suffixes.
suffixesCount , err := readUint64 ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read the number of tag value suffixes: %w" , err )
}
suffixes := make ( [ ] string , 0 , suffixesCount )
for i := 0 ; i < int ( suffixesCount ) ; i ++ {
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelValueSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read tag value suffix #%d: %w" , i + 1 , err )
}
suffixes = append ( suffixes , string ( buf ) )
}
return suffixes , nil
}
func ( sn * storageNode ) getLabelEntriesOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 ) ( [ ] storage . TagEntry , error ) {
// Send the request to sn.
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
}
if err := bc . Flush ( ) ; err != nil {
return nil , fmt . Errorf ( "cannot flush request to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return nil , newErrRemote ( buf )
}
// Read response.
2019-06-10 15:55:20 +00:00
var labelEntries [ ] storage . TagEntry
for {
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read label: %w" , err )
2019-06-10 15:55:20 +00:00
}
if len ( buf ) == 0 {
// Reached the end of the response
return labelEntries , nil
}
label := string ( buf )
var values [ ] string
values , buf , err = readLabelValues ( buf , bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read values for label %q: %w" , label , err )
2019-06-10 15:55:20 +00:00
}
labelEntries = append ( labelEntries , storage . TagEntry {
Key : label ,
Values : values ,
} )
}
}
2020-04-22 16:57:36 +00:00
func ( sn * storageNode ) getTSDBStatusForDateOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 , date uint64 , topN int ) ( * storage . TSDBStatus , error ) {
// Send the request to sn.
2020-09-10 21:29:26 +00:00
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
2020-04-22 16:57:36 +00:00
}
// date shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32 ( bc , uint32 ( date ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot send date=%d to conn: %w" , date , err )
2020-04-22 16:57:36 +00:00
}
// topN shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32 ( bc , uint32 ( topN ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot send topN=%d to conn: %w" , topN , err )
2020-04-22 16:57:36 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot flush tsdbStatus args to conn: %w" , err )
2020-04-22 16:57:36 +00:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
2020-04-22 16:57:36 +00:00
}
if len ( buf ) > 0 {
2020-06-30 21:58:26 +00:00
return nil , newErrRemote ( buf )
2020-04-22 16:57:36 +00:00
}
// Read response
seriesCountByMetricName , err := readTopHeapEntries ( bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read seriesCountByMetricName: %w" , err )
2020-04-22 16:57:36 +00:00
}
labelValueCountByLabelName , err := readTopHeapEntries ( bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read labelValueCountByLabelName: %w" , err )
2020-04-22 16:57:36 +00:00
}
seriesCountByLabelValuePair , err := readTopHeapEntries ( bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read seriesCountByLabelValuePair: %w" , err )
2020-04-22 16:57:36 +00:00
}
status := & storage . TSDBStatus {
SeriesCountByMetricName : seriesCountByMetricName ,
LabelValueCountByLabelName : labelValueCountByLabelName ,
SeriesCountByLabelValuePair : seriesCountByLabelValuePair ,
}
return status , nil
}
2021-05-12 12:18:45 +00:00
func ( sn * storageNode ) getTSDBStatusWithFiltersOnConn ( bc * handshake . BufferedConn , requestData [ ] byte , topN int ) ( * storage . TSDBStatus , error ) {
// Send the request to sn.
if err := writeBytes ( bc , requestData ) ; err != nil {
return nil , fmt . Errorf ( "cannot write requestData: %w" , err )
}
// topN shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32 ( bc , uint32 ( topN ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send topN=%d to conn: %w" , topN , err )
}
if err := bc . Flush ( ) ; err != nil {
return nil , fmt . Errorf ( "cannot flush tsdbStatusWithFilters args to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return nil , newErrRemote ( buf )
}
// Read response
seriesCountByMetricName , err := readTopHeapEntries ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read seriesCountByMetricName: %w" , err )
}
labelValueCountByLabelName , err := readTopHeapEntries ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read labelValueCountByLabelName: %w" , err )
}
seriesCountByLabelValuePair , err := readTopHeapEntries ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read seriesCountByLabelValuePair: %w" , err )
}
status := & storage . TSDBStatus {
SeriesCountByMetricName : seriesCountByMetricName ,
LabelValueCountByLabelName : labelValueCountByLabelName ,
SeriesCountByLabelValuePair : seriesCountByLabelValuePair ,
}
return status , nil
}
2020-04-22 16:57:36 +00:00
func readTopHeapEntries ( bc * handshake . BufferedConn ) ( [ ] storage . TopHeapEntry , error ) {
n , err := readUint64 ( bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read the number of topHeapEntries: %w" , err )
2020-04-22 16:57:36 +00:00
}
var a [ ] storage . TopHeapEntry
var buf [ ] byte
for i := uint64 ( 0 ) ; i < n ; i ++ {
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read label name: %w" , err )
2020-04-22 16:57:36 +00:00
}
count , err := readUint64 ( bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read label count: %w" , err )
2020-04-22 16:57:36 +00:00
}
a = append ( a , storage . TopHeapEntry {
Name : string ( buf ) ,
Count : count ,
} )
}
return a , nil
}
2019-05-22 21:23:23 +00:00
func ( sn * storageNode ) getSeriesCountOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 ) ( uint64 , error ) {
// Send the request to sn.
2020-09-10 21:29:26 +00:00
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return 0 , err
2019-05-22 21:23:23 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot flush seriesCount args to conn: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) > 0 {
2020-06-30 21:58:26 +00:00
return 0 , newErrRemote ( buf )
2019-05-22 21:23:23 +00:00
}
// Read response
n , err := readUint64 ( bc )
if err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot read series count: %w" , err )
2019-05-22 21:23:23 +00:00
}
return n , nil
}
// maxMetricBlockSize is the maximum size of serialized MetricBlock.
const maxMetricBlockSize = 1024 * 1024
// maxErrorMessageSize is the maximum size of error message received
// from vmstorage.
const maxErrorMessageSize = 64 * 1024
2020-11-16 08:55:55 +00:00
func ( sn * storageNode ) processSearchMetricNamesOnConn ( bc * handshake . BufferedConn , requestData [ ] byte ) ( [ ] [ ] byte , error ) {
// Send the requst to sn.
if err := writeBytes ( bc , requestData ) ; err != nil {
return nil , fmt . Errorf ( "cannot write requestData: %w" , err )
}
if err := bc . Flush ( ) ; err != nil {
return nil , fmt . Errorf ( "cannot flush requestData to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return nil , newErrRemote ( buf )
}
// Read metricNames from response.
metricNamesCount , err := readUint64 ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read metricNamesCount: %w" , err )
}
2020-11-16 13:54:40 +00:00
metricNames := make ( [ ] [ ] byte , metricNamesCount )
2020-11-16 08:55:55 +00:00
for i := int64 ( 0 ) ; i < int64 ( metricNamesCount ) ; i ++ {
buf , err = readBytes ( buf [ : 0 ] , bc , maxMetricNameSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read metricName #%d: %w" , i + 1 , err )
}
metricNames [ i ] = append ( metricNames [ i ] [ : 0 ] , buf ... )
}
return metricNames , nil
}
const maxMetricNameSize = 64 * 1024
2021-02-24 09:43:09 +00:00
func ( sn * storageNode ) processSearchQueryOnConn ( bc * handshake . BufferedConn , requestData [ ] byte , fetchData bool , processBlock func ( mb * storage . MetricBlock ) error ) error {
2019-05-22 21:23:23 +00:00
// Send the request to sn.
if err := writeBytes ( bc , requestData ) ; err != nil {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "cannot write requestData: %w" , err )
2019-05-22 21:23:23 +00:00
}
2019-08-04 19:15:33 +00:00
if err := writeBool ( bc , fetchData ) ; err != nil {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "cannot write fetchData=%v: %w" , fetchData , err )
2019-08-04 19:15:33 +00:00
}
2019-05-22 21:23:23 +00:00
if err := bc . Flush ( ) ; err != nil {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "cannot flush requestData to conn: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Read response error.
2020-11-16 08:55:55 +00:00
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
2019-05-22 21:23:23 +00:00
if err != nil {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) > 0 {
2021-02-24 09:43:09 +00:00
return newErrRemote ( buf )
2019-05-22 21:23:23 +00:00
}
// Read response. It may consist of multiple MetricBlocks.
2019-09-28 09:20:50 +00:00
blocksRead := 0
2020-04-27 05:13:41 +00:00
var mb storage . MetricBlock
2019-05-22 21:23:23 +00:00
for {
buf , err = readBytes ( buf [ : 0 ] , bc , maxMetricBlockSize )
if err != nil {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "cannot read MetricBlock #%d: %w" , blocksRead , err )
2019-05-22 21:23:23 +00:00
}
if len ( buf ) == 0 {
// Reached the end of the response
2021-02-24 09:43:09 +00:00
return nil
2019-05-22 21:23:23 +00:00
}
tail , err := mb . Unmarshal ( buf )
if err != nil {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "cannot unmarshal MetricBlock #%d: %w" , blocksRead , err )
2019-05-22 21:23:23 +00:00
}
if len ( tail ) != 0 {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q" , blocksRead , len ( tail ) , tail )
2019-05-22 21:23:23 +00:00
}
2019-09-28 09:20:50 +00:00
blocksRead ++
2019-05-22 21:23:23 +00:00
sn . metricBlocksRead . Inc ( )
sn . metricRowsRead . Add ( mb . Block . RowsCount ( ) )
2020-09-26 01:29:45 +00:00
if err := processBlock ( & mb ) ; err != nil {
2021-02-24 09:43:09 +00:00
return fmt . Errorf ( "cannot process MetricBlock #%d: %w" , blocksRead , err )
2019-09-28 09:20:50 +00:00
}
2019-05-22 21:23:23 +00:00
}
}
2020-11-04 22:15:43 +00:00
func writeTimeRange ( bc * handshake . BufferedConn , tr storage . TimeRange ) error {
if err := writeUint64 ( bc , uint64 ( tr . MinTimestamp ) ) ; err != nil {
return fmt . Errorf ( "cannot send minTimestamp=%d to conn: %w" , tr . MinTimestamp , err )
}
if err := writeUint64 ( bc , uint64 ( tr . MaxTimestamp ) ) ; err != nil {
return fmt . Errorf ( "cannot send maxTimestamp=%d to conn: %w" , tr . MaxTimestamp , err )
}
return nil
}
2019-05-22 21:23:23 +00:00
func writeBytes ( bc * handshake . BufferedConn , buf [ ] byte ) error {
sizeBuf := encoding . MarshalUint64 ( nil , uint64 ( len ( buf ) ) )
if _ , err := bc . Write ( sizeBuf ) ; err != nil {
return err
}
2020-09-10 21:29:26 +00:00
_ , err := bc . Write ( buf )
return err
2019-05-22 21:23:23 +00:00
}
func writeUint32 ( bc * handshake . BufferedConn , n uint32 ) error {
buf := encoding . MarshalUint32 ( nil , n )
2020-09-10 21:29:26 +00:00
_ , err := bc . Write ( buf )
return err
}
func writeUint64 ( bc * handshake . BufferedConn , n uint64 ) error {
buf := encoding . MarshalUint64 ( nil , n )
_ , err := bc . Write ( buf )
return err
2019-05-22 21:23:23 +00:00
}
2019-08-04 19:15:33 +00:00
func writeBool ( bc * handshake . BufferedConn , b bool ) error {
var buf [ 1 ] byte
if b {
buf [ 0 ] = 1
}
2020-09-10 21:29:26 +00:00
_ , err := bc . Write ( buf [ : ] )
return err
}
func writeByte ( bc * handshake . BufferedConn , b byte ) error {
var buf [ 1 ] byte
buf [ 0 ] = b
_ , err := bc . Write ( buf [ : ] )
return err
}
func sendAccountIDProjectID ( bc * handshake . BufferedConn , accountID , projectID uint32 ) error {
if err := writeUint32 ( bc , accountID ) ; err != nil {
return fmt . Errorf ( "cannot send accountID=%d to conn: %w" , accountID , err )
}
if err := writeUint32 ( bc , projectID ) ; err != nil {
return fmt . Errorf ( "cannot send projectID=%d to conn: %w" , projectID , err )
2019-08-04 19:15:33 +00:00
}
return nil
}
2019-05-22 21:23:23 +00:00
func readBytes ( buf [ ] byte , bc * handshake . BufferedConn , maxDataSize int ) ( [ ] byte , error ) {
buf = bytesutil . Resize ( buf , 8 )
2019-12-24 12:40:04 +00:00
if n , err := io . ReadFull ( bc , buf ) ; err != nil {
2020-06-30 19:58:18 +00:00
return buf , fmt . Errorf ( "cannot read %d bytes with data size: %w; read only %d bytes" , len ( buf ) , err , n )
2019-05-22 21:23:23 +00:00
}
dataSize := encoding . UnmarshalUint64 ( buf )
if dataSize > uint64 ( maxDataSize ) {
return buf , fmt . Errorf ( "too big data size: %d; it mustn't exceed %d bytes" , dataSize , maxDataSize )
}
buf = bytesutil . Resize ( buf , int ( dataSize ) )
if dataSize == 0 {
return buf , nil
}
2019-09-11 11:11:37 +00:00
if n , err := io . ReadFull ( bc , buf ) ; err != nil {
2020-06-30 19:58:18 +00:00
return buf , fmt . Errorf ( "cannot read data with size %d: %w; read only %d bytes" , dataSize , err , n )
2019-05-22 21:23:23 +00:00
}
return buf , nil
}
func readUint64 ( bc * handshake . BufferedConn ) ( uint64 , error ) {
var buf [ 8 ] byte
if _ , err := io . ReadFull ( bc , buf [ : ] ) ; err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot read uint64: %w" , err )
2019-05-22 21:23:23 +00:00
}
n := encoding . UnmarshalUint64 ( buf [ : ] )
return n , nil
}
var storageNodes [ ] * storageNode
// InitStorageNodes initializes storage nodes' connections to the given addrs.
func InitStorageNodes ( addrs [ ] string ) {
if len ( addrs ) == 0 {
logger . Panicf ( "BUG: addrs must be non-empty" )
}
for _ , addr := range addrs {
2021-09-15 15:04:28 +00:00
if _ , _ , err := net . SplitHostPort ( addr ) ; err != nil {
// Automatically add missing port.
addr += ":8401"
}
2019-05-22 21:23:23 +00:00
sn := & storageNode {
// There is no need in requests compression, since they are usually very small.
connPool : netutil . NewConnPool ( "vmselect" , addr , handshake . VMSelectClient , 0 ) ,
2021-05-24 16:11:35 +00:00
concurrentQueries : metrics . NewCounter ( fmt . Sprintf ( ` vm_concurrent_queries { name="vmselect", addr=%q} ` , addr ) ) ,
2019-05-22 21:23:23 +00:00
2020-11-23 13:00:04 +00:00
registerMetricNamesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
registerMetricNamesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
deleteSeriesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="deleteSeries", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
deleteSeriesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="deleteSeries", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelsOnTimeRangeRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="labelsOnTimeRange", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelsRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="labels", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelsOnTimeRangeErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="labelsOnTimeRange", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelsErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="labels", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelValuesOnTimeRangeRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="labelValuesOnTimeRange", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelValuesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="labelValues", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelValuesOnTimeRangeErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="labelValuesOnTimeRange", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelValuesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="labelValues", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelEntriesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="labelEntries", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelEntriesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="labelEntries", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tagValueSuffixesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tagValueSuffixesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tsdbStatusRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tsdbStatusErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
2021-05-12 12:18:45 +00:00
tsdbStatusWithFiltersRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tsdbStatusWithFiltersErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
2020-11-23 13:00:04 +00:00
seriesCountRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="seriesCount", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
seriesCountErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="seriesCount", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
searchMetricNamesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
searchRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="search", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
searchMetricNamesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
searchErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="search", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
metricBlocksRead : metrics . NewCounter ( fmt . Sprintf ( ` vm_metric_blocks_read_total { name="vmselect", addr=%q} ` , addr ) ) ,
metricRowsRead : metrics . NewCounter ( fmt . Sprintf ( ` vm_metric_rows_read_total { name="vmselect", addr=%q} ` , addr ) ) ,
2019-05-22 21:23:23 +00:00
}
storageNodes = append ( storageNodes , sn )
}
}
// Stop gracefully stops netstorage.
func Stop ( ) {
// Nothing to do at the moment.
2019-05-22 21:16:55 +00:00
}
2019-05-22 21:23:23 +00:00
var (
2020-11-22 22:15:51 +00:00
partialLabelsOnTimeRangeResults = metrics . NewCounter ( ` vm_partial_results_total { type="labels_on_time_range", name="vmselect"} ` )
partialLabelsResults = metrics . NewCounter ( ` vm_partial_results_total { type="labels", name="vmselect"} ` )
partialLabelValuesOnTimeRangeResults = metrics . NewCounter ( ` vm_partial_results_total { type="label_values_on_time_range", name="vmselect"} ` )
partialLabelValuesResults = metrics . NewCounter ( ` vm_partial_results_total { type="label_values", name="vmselect"} ` )
partialTagValueSuffixesResults = metrics . NewCounter ( ` vm_partial_results_total { type="tag_value_suffixes", name="vmselect"} ` )
partialLabelEntriesResults = metrics . NewCounter ( ` vm_partial_results_total { type="label_entries", name="vmselect"} ` )
partialTSDBStatusResults = metrics . NewCounter ( ` vm_partial_results_total { type="tsdb_status", name="vmselect"} ` )
2020-11-22 23:09:34 +00:00
partialSeriesCountResults = metrics . NewCounter ( ` vm_partial_results_total { type="series_count", name="vmselect"} ` )
2020-11-22 22:15:51 +00:00
partialSearchMetricNamesResults = metrics . NewCounter ( ` vm_partial_results_total { type="search_metric_names", name="vmselect"} ` )
partialSearchResults = metrics . NewCounter ( ` vm_partial_results_total { type="search", name="vmselect"} ` )
2019-05-22 21:23:23 +00:00
)
2020-11-16 01:58:12 +00:00
func applyGraphiteRegexpFilter ( filter string , ss [ ] string ) ( [ ] string , error ) {
// Anchor filter regexp to the beginning of the string as Graphite does.
// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157
filter = "^(?:" + filter + ")"
re , err := regexp . Compile ( filter )
if err != nil {
return nil , fmt . Errorf ( "cannot parse regexp filter=%q: %w" , filter , err )
}
dst := ss [ : 0 ]
for _ , s := range ss {
if re . MatchString ( s ) {
dst = append ( dst , s )
}
}
return dst , nil
}