2019-05-22 21:16:55 +00:00
package netstorage
import (
"container/heap"
2020-08-10 10:17:12 +00:00
"errors"
2019-05-22 21:16:55 +00:00
"flag"
"fmt"
2020-11-16 01:58:12 +00:00
"regexp"
2019-05-22 21:16:55 +00:00
"sort"
"sync"
2020-09-26 01:29:45 +00:00
"sync/atomic"
2021-03-16 23:12:28 +00:00
"time"
2019-05-22 21:16:55 +00:00
2020-09-11 10:18:57 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
2020-09-23 11:26:39 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-06-24 16:36:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-07-22 11:53:54 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2021-07-15 21:34:33 +00:00
"github.com/valyala/fastrand"
2019-05-22 21:16:55 +00:00
)
var (
2020-09-10 21:28:19 +00:00
maxTagKeysPerSearch = flag . Int ( "search.maxTagKeys" , 100e3 , "The maximum number of tag keys returned from /api/v1/labels" )
maxTagValuesPerSearch = flag . Int ( "search.maxTagValues" , 100e3 , "The maximum number of tag values returned from /api/v1/label/<label_name>/values" )
maxTagValueSuffixesPerSearch = flag . Int ( "search.maxTagValueSuffixesPerSearch" , 100e3 , "The maximum number of tag value suffixes returned from /metrics/find" )
2021-07-15 13:03:26 +00:00
maxMetricsPerSearch = flag . Int ( "search.maxUniqueTimeseries" , 300e3 , "The maximum number of unique time series each search can scan. This option allows limiting memory usage" )
maxSamplesPerSeries = flag . Int ( "search.maxSamplesPerSeries" , 30e6 , "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage" )
2021-07-28 14:40:09 +00:00
maxSamplesPerQuery = flag . Int ( "search.maxSamplesPerQuery" , 1e9 , "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries" )
2019-05-22 21:16:55 +00:00
)
// Result is a single timeseries result.
//
// ProcessSearchQuery returns Result slice.
type Result struct {
// The name of the metric.
MetricName storage . MetricName
// Values are sorted by Timestamps.
Values [ ] float64
Timestamps [ ] int64
// Marshaled MetricName. Used only for results sorting
// in app/vmselect/promql
MetricNameMarshaled [ ] byte
}
func ( r * Result ) reset ( ) {
r . MetricName . Reset ( )
r . Values = r . Values [ : 0 ]
r . Timestamps = r . Timestamps [ : 0 ]
r . MetricNameMarshaled = r . MetricNameMarshaled [ : 0 ]
}
// Results holds results returned from ProcessSearchQuery.
type Results struct {
2019-08-04 19:15:33 +00:00
tr storage . TimeRange
fetchData bool
2020-09-11 10:18:57 +00:00
deadline searchutils . Deadline
2019-05-22 21:16:55 +00:00
packedTimeseries [ ] packedTimeseries
2020-04-27 05:13:41 +00:00
sr * storage . Search
2020-11-04 14:46:10 +00:00
tbf * tmpBlocksFile
2019-05-22 21:16:55 +00:00
}
// Len returns the number of results in rss.
func ( rss * Results ) Len ( ) int {
return len ( rss . packedTimeseries )
}
// Cancel cancels rss work.
func ( rss * Results ) Cancel ( ) {
2020-04-27 05:13:41 +00:00
rss . mustClose ( )
}
func ( rss * Results ) mustClose ( ) {
putStorageSearch ( rss . sr )
rss . sr = nil
2020-11-04 14:46:10 +00:00
putTmpBlocksFile ( rss . tbf )
rss . tbf = nil
2019-05-22 21:16:55 +00:00
}
2020-06-23 17:29:19 +00:00
type timeseriesWork struct {
2021-03-30 10:22:21 +00:00
mustStop * uint32
2020-09-27 20:17:14 +00:00
rss * Results
pts * packedTimeseries
f func ( rs * Result , workerID uint ) error
doneCh chan error
2020-06-23 17:29:19 +00:00
rowsProcessed int
}
2021-02-16 14:08:37 +00:00
func ( tsw * timeseriesWork ) reset ( ) {
2021-03-30 10:22:21 +00:00
tsw . mustStop = nil
2021-02-16 14:08:37 +00:00
tsw . rss = nil
tsw . pts = nil
tsw . f = nil
if n := len ( tsw . doneCh ) ; n > 0 {
logger . Panicf ( "BUG: tsw.doneCh must be empty during reset; it contains %d items instead" , n )
}
tsw . rowsProcessed = 0
}
func getTimeseriesWork ( ) * timeseriesWork {
v := tswPool . Get ( )
if v == nil {
v = & timeseriesWork {
doneCh : make ( chan error , 1 ) ,
}
}
return v . ( * timeseriesWork )
}
func putTimeseriesWork ( tsw * timeseriesWork ) {
tsw . reset ( )
tswPool . Put ( tsw )
}
var tswPool sync . Pool
2021-07-26 12:38:51 +00:00
func scheduleTimeseriesWork ( workChs [ ] chan * timeseriesWork , tsw * timeseriesWork ) {
if len ( workChs ) == 1 {
2021-07-30 09:02:09 +00:00
// Fast path for a single worker
2021-07-26 12:38:51 +00:00
workChs [ 0 ] <- tsw
2021-07-15 21:34:33 +00:00
return
}
2021-07-15 12:40:41 +00:00
attempts := 0
for {
2021-07-26 12:38:51 +00:00
idx := fastrand . Uint32n ( uint32 ( len ( workChs ) ) )
2021-07-15 12:40:41 +00:00
select {
2021-07-26 12:38:51 +00:00
case workChs [ idx ] <- tsw :
2021-07-15 12:40:41 +00:00
return
default :
attempts ++
2021-07-26 12:38:51 +00:00
if attempts >= len ( workChs ) {
workChs [ idx ] <- tsw
2021-07-15 12:40:41 +00:00
return
}
}
2020-06-23 17:29:19 +00:00
}
}
2021-07-30 09:02:09 +00:00
func ( tsw * timeseriesWork ) do ( r * Result , workerID uint ) error {
if atomic . LoadUint32 ( tsw . mustStop ) != 0 {
return nil
}
rss := tsw . rss
if rss . deadline . Exceeded ( ) {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "timeout exceeded during query execution: %s" , rss . deadline . String ( ) )
}
if err := tsw . pts . Unpack ( r , rss . tbf , rss . tr , rss . fetchData ) ; err != nil {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "error during time series unpacking: %w" , err )
}
if len ( r . Timestamps ) > 0 || ! rss . fetchData {
if err := tsw . f ( r , workerID ) ; err != nil {
2021-03-30 10:22:21 +00:00
atomic . StoreUint32 ( tsw . mustStop , 1 )
2021-07-30 09:02:09 +00:00
return err
2020-06-23 17:29:19 +00:00
}
}
2021-07-30 09:02:09 +00:00
tsw . rowsProcessed = len ( r . Values )
return nil
2020-06-23 17:29:19 +00:00
}
2021-07-15 12:40:41 +00:00
func timeseriesWorker ( ch <- chan * timeseriesWork , workerID uint ) {
2021-07-26 12:38:51 +00:00
v := resultPool . Get ( )
if v == nil {
v = & result { }
}
r := v . ( * result )
2021-07-15 12:40:41 +00:00
for tsw := range ch {
2021-07-30 09:02:09 +00:00
err := tsw . do ( & r . rs , workerID )
tsw . doneCh <- err
}
currentTime := fasttime . UnixTimestamp ( )
if cap ( r . rs . Values ) > 1024 * 1024 && 4 * len ( r . rs . Values ) < cap ( r . rs . Values ) && currentTime - r . lastResetTime > 10 {
// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
r . rs = Result { }
r . lastResetTime = currentTime
2020-06-23 17:29:19 +00:00
}
2021-07-26 12:38:51 +00:00
resultPool . Put ( r )
}
type result struct {
rs Result
lastResetTime uint64
2020-06-23 17:29:19 +00:00
}
2021-07-26 12:38:51 +00:00
var resultPool sync . Pool
2020-07-23 16:21:49 +00:00
// RunParallel runs f in parallel for all the results from rss.
2019-05-22 21:16:55 +00:00
//
// f shouldn't hold references to rs after returning.
2019-07-12 12:51:02 +00:00
// workerID is the id of the worker goroutine that calls f.
2020-09-27 20:17:14 +00:00
// Data processing is immediately stopped if f returns non-nil error.
2019-05-22 21:16:55 +00:00
//
// rss becomes unusable after the call to RunParallel.
2020-09-27 20:17:14 +00:00
func ( rss * Results ) RunParallel ( f func ( rs * Result , workerID uint ) error ) error {
2020-04-27 05:13:41 +00:00
defer rss . mustClose ( )
2019-05-22 21:16:55 +00:00
2021-07-26 12:38:51 +00:00
// Spin up local workers.
//
// Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
// - RunParallel is called with f, which blocks without forward progress.
// - All the workers in the global pool became blocked in f.
// - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
workers := len ( rss . packedTimeseries )
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
workChs := make ( [ ] chan * timeseriesWork , workers )
var workChsWG sync . WaitGroup
for i := 0 ; i < workers ; i ++ {
workChs [ i ] = make ( chan * timeseriesWork , 16 )
workChsWG . Add ( 1 )
go func ( workerID int ) {
defer workChsWG . Done ( )
timeseriesWorker ( workChs [ workerID ] , uint ( workerID ) )
} ( i )
}
2019-05-22 21:16:55 +00:00
// Feed workers with work.
2020-06-23 17:29:19 +00:00
tsws := make ( [ ] * timeseriesWork , len ( rss . packedTimeseries ) )
2021-03-30 10:22:21 +00:00
var mustStop uint32
2019-05-22 21:16:55 +00:00
for i := range rss . packedTimeseries {
2021-02-16 14:08:37 +00:00
tsw := getTimeseriesWork ( )
tsw . rss = rss
tsw . pts = & rss . packedTimeseries [ i ]
tsw . f = f
2021-03-30 10:22:21 +00:00
tsw . mustStop = & mustStop
2021-07-26 12:38:51 +00:00
scheduleTimeseriesWork ( workChs , tsw )
2020-06-23 17:29:19 +00:00
tsws [ i ] = tsw
2019-05-22 21:16:55 +00:00
}
2019-11-23 11:22:55 +00:00
seriesProcessedTotal := len ( rss . packedTimeseries )
2019-05-22 21:16:55 +00:00
rss . packedTimeseries = rss . packedTimeseries [ : 0 ]
2020-06-23 17:29:19 +00:00
// Wait until work is complete.
var firstErr error
rowsProcessedTotal := 0
for _ , tsw := range tsws {
2021-03-30 10:22:21 +00:00
if err := <- tsw . doneCh ; err != nil && firstErr == nil {
// Return just the first error, since other errors are likely duplicate the first error.
2020-06-23 17:29:19 +00:00
firstErr = err
2019-05-22 21:16:55 +00:00
}
2020-06-23 17:29:19 +00:00
rowsProcessedTotal += tsw . rowsProcessed
2021-02-16 14:08:37 +00:00
putTimeseriesWork ( tsw )
2019-05-22 21:16:55 +00:00
}
2020-06-23 17:29:19 +00:00
2019-11-23 11:22:55 +00:00
perQueryRowsProcessed . Update ( float64 ( rowsProcessedTotal ) )
perQuerySeriesProcessed . Update ( float64 ( seriesProcessedTotal ) )
2021-07-26 12:38:51 +00:00
// Shut down local workers
for _ , workCh := range workChs {
close ( workCh )
}
workChsWG . Wait ( )
2020-06-23 17:29:19 +00:00
return firstErr
2019-05-22 21:16:55 +00:00
}
2019-11-23 11:22:55 +00:00
var perQueryRowsProcessed = metrics . NewHistogram ( ` vm_per_query_rows_processed_count ` )
var perQuerySeriesProcessed = metrics . NewHistogram ( ` vm_per_query_series_processed_count ` )
2020-12-08 18:49:32 +00:00
var gomaxprocs = cgroup . AvailableCPUs ( )
2019-05-22 21:16:55 +00:00
type packedTimeseries struct {
metricName string
2020-11-04 14:46:10 +00:00
brs [ ] blockRef
2020-09-23 11:26:39 +00:00
pd * promData
}
type promData struct {
values [ ] float64
timestamps [ ] int64
2019-05-22 21:16:55 +00:00
}
2020-08-06 14:42:15 +00:00
type unpackWorkItem struct {
2020-11-04 14:46:10 +00:00
br blockRef
2020-08-06 14:42:15 +00:00
tr storage . TimeRange
}
2020-06-23 17:29:19 +00:00
type unpackWork struct {
2020-11-04 14:46:10 +00:00
tbf * tmpBlocksFile
2020-09-24 17:16:19 +00:00
ws [ ] unpackWorkItem
sbs [ ] * sortBlock
doneCh chan error
2020-06-23 17:29:19 +00:00
}
2019-05-22 21:16:55 +00:00
2020-07-22 11:53:54 +00:00
func ( upw * unpackWork ) reset ( ) {
2020-11-04 14:46:10 +00:00
upw . tbf = nil
2020-08-06 14:42:15 +00:00
ws := upw . ws
for i := range ws {
w := & ws [ i ]
2020-11-04 14:46:10 +00:00
w . br = blockRef { }
2020-08-06 14:42:15 +00:00
w . tr = storage . TimeRange { }
}
upw . ws = upw . ws [ : 0 ]
sbs := upw . sbs
for i := range sbs {
sbs [ i ] = nil
}
upw . sbs = upw . sbs [ : 0 ]
2020-07-22 11:53:54 +00:00
if n := len ( upw . doneCh ) ; n > 0 {
logger . Panicf ( "BUG: upw.doneCh must be empty; it contains %d items now" , n )
}
}
2020-09-15 18:06:04 +00:00
func ( upw * unpackWork ) unpack ( tmpBlock * storage . Block ) {
2020-08-06 14:42:15 +00:00
for _ , w := range upw . ws {
sb := getSortBlock ( )
2020-11-04 14:46:10 +00:00
if err := sb . unpackFrom ( tmpBlock , upw . tbf , w . br , w . tr ) ; err != nil {
2020-08-06 14:42:15 +00:00
putSortBlock ( sb )
upw . doneCh <- fmt . Errorf ( "cannot unpack block: %w" , err )
return
}
upw . sbs = append ( upw . sbs , sb )
}
upw . doneCh <- nil
}
2020-07-22 11:53:54 +00:00
func getUnpackWork ( ) * unpackWork {
v := unpackWorkPool . Get ( )
if v != nil {
return v . ( * unpackWork )
}
return & unpackWork {
doneCh : make ( chan error , 1 ) ,
}
}
func putUnpackWork ( upw * unpackWork ) {
upw . reset ( )
unpackWorkPool . Put ( upw )
}
var unpackWorkPool sync . Pool
2021-07-30 09:02:09 +00:00
func scheduleUnpackWork ( workChs [ ] chan * unpackWork , uw * unpackWork ) {
if len ( workChs ) == 1 {
// Fast path for a single worker
workChs [ 0 ] <- uw
2021-07-15 21:34:33 +00:00
return
}
2021-07-15 12:40:41 +00:00
attempts := 0
for {
2021-07-30 09:02:09 +00:00
idx := fastrand . Uint32n ( uint32 ( len ( workChs ) ) )
2021-07-15 12:40:41 +00:00
select {
2021-07-30 09:02:09 +00:00
case workChs [ idx ] <- uw :
2021-07-15 12:40:41 +00:00
return
default :
attempts ++
2021-07-30 09:02:09 +00:00
if attempts >= len ( workChs ) {
workChs [ idx ] <- uw
2021-07-15 12:40:41 +00:00
return
}
}
2019-05-22 21:16:55 +00:00
}
2020-06-23 17:29:19 +00:00
}
2021-07-15 12:40:41 +00:00
func unpackWorker ( ch <- chan * unpackWork ) {
2021-07-30 09:02:09 +00:00
v := tmpBlockPool . Get ( )
if v == nil {
v = & storage . Block { }
2019-05-22 21:16:55 +00:00
}
2021-07-30 09:02:09 +00:00
tmpBlock := v . ( * storage . Block )
2021-07-15 12:40:41 +00:00
for upw := range ch {
2021-07-30 09:02:09 +00:00
upw . unpack ( tmpBlock )
2019-05-22 21:16:55 +00:00
}
2021-07-30 09:02:09 +00:00
tmpBlockPool . Put ( v )
2020-06-23 17:29:19 +00:00
}
2019-05-22 21:16:55 +00:00
2021-07-30 09:02:09 +00:00
var tmpBlockPool sync . Pool
2020-08-06 14:42:15 +00:00
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
//
2021-07-30 09:02:09 +00:00
// It is better to load a single goroutine for up to one second on a system with many CPU cores
// in order to reduce inter-CPU memory ping-pong.
// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
// So the batch size should be 40M / 8K = 5K.
var unpackBatchSize = 5000
2020-08-06 14:42:15 +00:00
2020-06-23 17:29:19 +00:00
// Unpack unpacks pts to dst.
2020-11-04 14:46:10 +00:00
func ( pts * packedTimeseries ) Unpack ( dst * Result , tbf * tmpBlocksFile , tr storage . TimeRange , fetchData bool ) error {
2020-06-23 17:29:19 +00:00
dst . reset ( )
if err := dst . MetricName . Unmarshal ( bytesutil . ToUnsafeBytes ( pts . metricName ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot unmarshal metricName %q: %w" , pts . metricName , err )
2019-05-22 21:16:55 +00:00
}
2020-09-24 17:16:19 +00:00
if ! fetchData {
// Do not spend resources on data reading and unpacking.
return nil
}
2019-05-22 21:16:55 +00:00
2021-07-30 09:02:09 +00:00
// Spin up local workers.
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores.
2020-09-22 20:49:47 +00:00
brsLen := len ( pts . brs )
2021-07-30 09:02:09 +00:00
workers := brsLen / unpackBatchSize
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
workChs := make ( [ ] chan * unpackWork , workers )
var workChsWG sync . WaitGroup
for i := 0 ; i < workers ; i ++ {
// Use unbuffered channel on purpose, since there are high chances
// that only a single unpackWork is needed to unpack.
// The unbuffered channel should reduce inter-CPU ping-pong in this case,
// which should improve the performance in a system with many CPU cores.
workChs [ i ] = make ( chan * unpackWork )
workChsWG . Add ( 1 )
go func ( workerID int ) {
defer workChsWG . Done ( )
unpackWorker ( workChs [ workerID ] )
} ( i )
}
// Feed workers with work
2020-09-22 20:49:47 +00:00
upws := make ( [ ] * unpackWork , 0 , 1 + brsLen / unpackBatchSize )
2020-08-06 14:42:15 +00:00
upw := getUnpackWork ( )
2020-11-04 14:46:10 +00:00
upw . tbf = tbf
2020-08-06 14:42:15 +00:00
for _ , br := range pts . brs {
if len ( upw . ws ) >= unpackBatchSize {
2021-07-30 09:02:09 +00:00
scheduleUnpackWork ( workChs , upw )
2020-08-06 14:42:15 +00:00
upws = append ( upws , upw )
upw = getUnpackWork ( )
2020-11-04 14:46:10 +00:00
upw . tbf = tbf
2020-08-06 14:42:15 +00:00
}
upw . ws = append ( upw . ws , unpackWorkItem {
br : br ,
tr : tr ,
} )
2019-05-22 21:16:55 +00:00
}
2021-07-30 09:02:09 +00:00
scheduleUnpackWork ( workChs , upw )
2020-08-06 14:42:15 +00:00
upws = append ( upws , upw )
2020-04-27 05:13:41 +00:00
pts . brs = pts . brs [ : 0 ]
2019-05-22 21:16:55 +00:00
2020-06-23 17:29:19 +00:00
// Wait until work is complete
2021-07-15 13:03:26 +00:00
samples := 0
2020-09-22 20:49:47 +00:00
sbs := make ( [ ] * sortBlock , 0 , brsLen )
2020-06-23 17:29:19 +00:00
var firstErr error
for _ , upw := range upws {
if err := <- upw . doneCh ; err != nil && firstErr == nil {
// Return the first error only, since other errors are likely the same.
firstErr = err
}
if firstErr == nil {
2021-07-15 13:03:26 +00:00
for _ , sb := range upw . sbs {
samples += len ( sb . Timestamps )
}
2021-07-28 14:40:09 +00:00
if * maxSamplesPerSeries <= 0 || samples < * maxSamplesPerSeries {
2021-07-15 13:03:26 +00:00
sbs = append ( sbs , upw . sbs ... )
} else {
firstErr = fmt . Errorf ( "cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries " +
"or reduce time range for the query" , * maxSamplesPerSeries )
}
}
if firstErr != nil {
2020-08-06 14:42:15 +00:00
for _ , sb := range upw . sbs {
putSortBlock ( sb )
}
2019-05-22 21:16:55 +00:00
}
2020-07-22 11:53:54 +00:00
putUnpackWork ( upw )
2019-05-22 21:16:55 +00:00
}
2021-07-30 09:02:09 +00:00
// Shut down local workers
for _ , workCh := range workChs {
close ( workCh )
}
workChsWG . Wait ( )
2020-06-23 17:29:19 +00:00
if firstErr != nil {
return firstErr
2019-05-22 21:16:55 +00:00
}
2020-09-23 11:26:39 +00:00
if pts . pd != nil {
// Add data from Prometheus to dst.
// It usually has smaller timestamps than the data from sbs, so put it first.
//
// There is no need in check for fetchData, since this is already checked when initializing pts.pd.
dst . Values = append ( dst . Values , pts . pd . values ... )
dst . Timestamps = append ( dst . Timestamps , pts . pd . timestamps ... )
}
2019-05-22 21:16:55 +00:00
mergeSortBlocks ( dst , sbs )
2020-09-23 11:26:39 +00:00
if pts . pd != nil {
if ! sort . IsSorted ( dst ) {
sort . Sort ( dst )
}
pts . pd = nil
}
2019-05-22 21:16:55 +00:00
return nil
}
2020-09-23 11:26:39 +00:00
// sort.Interface implementation for Result
// Len implements sort.Interface
func ( r * Result ) Len ( ) int {
return len ( r . Timestamps )
}
// Less implements sort.Interface
func ( r * Result ) Less ( i , j int ) bool {
timestamps := r . Timestamps
return timestamps [ i ] < timestamps [ j ]
}
// Swap implements sort.Interface
func ( r * Result ) Swap ( i , j int ) {
timestamps := r . Timestamps
values := r . Values
timestamps [ i ] , timestamps [ j ] = timestamps [ j ] , timestamps [ i ]
values [ i ] , values [ j ] = values [ j ] , values [ i ]
}
2019-05-22 21:16:55 +00:00
func getSortBlock ( ) * sortBlock {
v := sbPool . Get ( )
if v == nil {
return & sortBlock { }
}
return v . ( * sortBlock )
}
func putSortBlock ( sb * sortBlock ) {
sb . reset ( )
sbPool . Put ( sb )
}
var sbPool sync . Pool
var metricRowsSkipped = metrics . NewCounter ( ` vm_metric_rows_skipped_total { name="vmselect"} ` )
func mergeSortBlocks ( dst * Result , sbh sortBlocksHeap ) {
// Skip empty sort blocks, since they cannot be passed to heap.Init.
src := sbh
sbh = sbh [ : 0 ]
for _ , sb := range src {
if len ( sb . Timestamps ) == 0 {
putSortBlock ( sb )
continue
}
sbh = append ( sbh , sb )
}
if len ( sbh ) == 0 {
return
}
heap . Init ( & sbh )
for {
top := sbh [ 0 ]
heap . Pop ( & sbh )
if len ( sbh ) == 0 {
dst . Timestamps = append ( dst . Timestamps , top . Timestamps [ top . NextIdx : ] ... )
dst . Values = append ( dst . Values , top . Values [ top . NextIdx : ] ... )
putSortBlock ( top )
2020-01-30 23:09:44 +00:00
break
2019-05-22 21:16:55 +00:00
}
sbNext := sbh [ 0 ]
tsNext := sbNext . Timestamps [ sbNext . NextIdx ]
idxNext := len ( top . Timestamps )
if top . Timestamps [ idxNext - 1 ] > tsNext {
idxNext = top . NextIdx
for top . Timestamps [ idxNext ] <= tsNext {
idxNext ++
}
}
dst . Timestamps = append ( dst . Timestamps , top . Timestamps [ top . NextIdx : idxNext ] ... )
dst . Values = append ( dst . Values , top . Values [ top . NextIdx : idxNext ] ... )
if idxNext < len ( top . Timestamps ) {
top . NextIdx = idxNext
heap . Push ( & sbh , top )
} else {
// Return top to the pool.
putSortBlock ( top )
}
}
2020-01-30 23:09:44 +00:00
2020-02-27 21:47:05 +00:00
timestamps , values := storage . DeduplicateSamples ( dst . Timestamps , dst . Values )
dedups := len ( dst . Timestamps ) - len ( timestamps )
dedupsDuringSelect . Add ( dedups )
dst . Timestamps = timestamps
dst . Values = values
2019-05-22 21:16:55 +00:00
}
2020-02-27 21:47:05 +00:00
var dedupsDuringSelect = metrics . NewCounter ( ` vm_deduplicated_samples_total { type="select"} ` )
2019-05-22 21:16:55 +00:00
type sortBlock struct {
Timestamps [ ] int64
Values [ ] float64
NextIdx int
}
func ( sb * sortBlock ) reset ( ) {
sb . Timestamps = sb . Timestamps [ : 0 ]
sb . Values = sb . Values [ : 0 ]
sb . NextIdx = 0
}
2020-11-04 14:46:10 +00:00
func ( sb * sortBlock ) unpackFrom ( tmpBlock * storage . Block , tbf * tmpBlocksFile , br blockRef , tr storage . TimeRange ) error {
2020-09-15 18:06:04 +00:00
tmpBlock . Reset ( )
2020-11-04 14:46:10 +00:00
brReal := tbf . MustReadBlockRefAt ( br . partRef , br . addr )
brReal . MustReadBlock ( tmpBlock , true )
2020-09-24 17:16:19 +00:00
if err := tmpBlock . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block: %w" , err )
2019-05-22 21:16:55 +00:00
}
2020-09-26 01:29:45 +00:00
sb . Timestamps , sb . Values = tmpBlock . AppendRowsWithTimeRangeFilter ( sb . Timestamps [ : 0 ] , sb . Values [ : 0 ] , tr )
skippedRows := tmpBlock . RowsCount ( ) - len ( sb . Timestamps )
2019-05-22 21:16:55 +00:00
metricRowsSkipped . Add ( skippedRows )
return nil
}
type sortBlocksHeap [ ] * sortBlock
func ( sbh sortBlocksHeap ) Len ( ) int {
return len ( sbh )
}
func ( sbh sortBlocksHeap ) Less ( i , j int ) bool {
a := sbh [ i ]
b := sbh [ j ]
return a . Timestamps [ a . NextIdx ] < b . Timestamps [ b . NextIdx ]
}
func ( sbh sortBlocksHeap ) Swap ( i , j int ) {
sbh [ i ] , sbh [ j ] = sbh [ j ] , sbh [ i ]
}
func ( sbh * sortBlocksHeap ) Push ( x interface { } ) {
* sbh = append ( * sbh , x . ( * sortBlock ) )
}
func ( sbh * sortBlocksHeap ) Pop ( ) interface { } {
a := * sbh
v := a [ len ( a ) - 1 ]
* sbh = a [ : len ( a ) - 1 ]
return v
}
// DeleteSeries deletes time series matching the given tagFilterss.
2021-02-02 22:24:05 +00:00
func DeleteSeries ( sq * storage . SearchQuery , deadline searchutils . Deadline ) ( int , error ) {
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
tfss , err := setupTfss ( tr , sq . TagFilterss , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return 0 , err
}
return vmstorage . DeleteMetrics ( tfss )
}
2020-11-04 22:15:43 +00:00
// GetLabelsOnTimeRange returns labels for the given tr until the given deadline.
func GetLabelsOnTimeRange ( tr storage . TimeRange , deadline searchutils . Deadline ) ( [ ] string , error ) {
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
labels , err := vmstorage . SearchTagKeysOnTimeRange ( tr , * maxTagKeysPerSearch , deadline . Deadline ( ) )
if err != nil {
return nil , fmt . Errorf ( "error during labels search on time range: %w" , err )
}
// Substitute "" with "__name__"
for i := range labels {
if labels [ i ] == "" {
labels [ i ] = "__name__"
}
}
2020-11-05 00:51:08 +00:00
// Merge labels obtained from Prometheus storage.
promLabels , err := promdb . GetLabelNamesOnTimeRange ( tr , deadline )
if err != nil {
return nil , fmt . Errorf ( "cannot obtain labels from Prometheus storage: %w" , err )
}
labels = mergeStrings ( labels , promLabels )
2020-11-04 22:15:43 +00:00
// Sort labels like Prometheus does
sort . Strings ( labels )
return labels , nil
}
2020-11-15 23:25:38 +00:00
// GetGraphiteTags returns Graphite tags until the given deadline.
2020-11-16 01:58:12 +00:00
func GetGraphiteTags ( filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-11-15 23:25:38 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-11-16 01:58:12 +00:00
labels , err := GetLabels ( deadline )
2020-11-15 23:25:38 +00:00
if err != nil {
2020-11-16 01:58:12 +00:00
return nil , err
}
// Substitute "__name__" with "name" for Graphite compatibility
2020-11-15 23:25:38 +00:00
for i := range labels {
2020-12-06 23:07:03 +00:00
if labels [ i ] != "__name__" {
continue
}
// Prevent from duplicate `name` tag.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
if hasString ( labels , "name" ) {
labels = append ( labels [ : i ] , labels [ i + 1 : ] ... )
} else {
2020-11-15 23:25:38 +00:00
labels [ i ] = "name"
2020-11-16 12:49:46 +00:00
sort . Strings ( labels )
2020-11-15 23:25:38 +00:00
}
2020-12-06 23:07:03 +00:00
break
2020-11-15 23:25:38 +00:00
}
2020-11-16 13:50:48 +00:00
if len ( filter ) > 0 {
labels , err = applyGraphiteRegexpFilter ( filter , labels )
if err != nil {
return nil , err
}
}
2020-11-16 01:58:12 +00:00
if limit > 0 && limit < len ( labels ) {
labels = labels [ : limit ]
}
2020-11-15 23:25:38 +00:00
return labels , nil
}
2020-12-06 23:07:03 +00:00
func hasString ( a [ ] string , s string ) bool {
for _ , x := range a {
if x == s {
return true
}
}
return false
}
2019-05-22 21:16:55 +00:00
// GetLabels returns labels until the given deadline.
2020-09-11 10:18:57 +00:00
func GetLabels ( deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-09-11 10:18:57 +00:00
labels , err := vmstorage . SearchTagKeys ( * maxTagKeysPerSearch , deadline . Deadline ( ) )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "error during labels search: %w" , err )
2019-05-22 21:16:55 +00:00
}
// Substitute "" with "__name__"
for i := range labels {
if labels [ i ] == "" {
labels [ i ] = "__name__"
}
}
2020-09-23 19:42:44 +00:00
// Merge labels obtained from Prometheus storage.
promLabels , err := promdb . GetLabelNames ( deadline )
if err != nil {
return nil , fmt . Errorf ( "cannot obtain labels from Prometheus storage: %w" , err )
}
labels = mergeStrings ( labels , promLabels )
2019-05-22 21:16:55 +00:00
// Sort labels like Prometheus does
sort . Strings ( labels )
return labels , nil
}
2020-09-23 19:42:44 +00:00
func mergeStrings ( a , b [ ] string ) [ ] string {
if len ( a ) == 0 {
return b
}
if len ( b ) == 0 {
return a
}
m := make ( map [ string ] struct { } , len ( a ) + len ( b ) )
for _ , s := range a {
m [ s ] = struct { } { }
}
for _ , s := range b {
m [ s ] = struct { } { }
}
result := make ( [ ] string , 0 , len ( m ) )
for s := range m {
result = append ( result , s )
}
return result
}
2020-11-04 22:15:43 +00:00
// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr
// until the given deadline.
func GetLabelValuesOnTimeRange ( labelName string , tr storage . TimeRange , deadline searchutils . Deadline ) ( [ ] string , error ) {
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
if labelName == "__name__" {
labelName = ""
}
// Search for tag values
labelValues , err := vmstorage . SearchTagValuesOnTimeRange ( [ ] byte ( labelName ) , tr , * maxTagValuesPerSearch , deadline . Deadline ( ) )
if err != nil {
return nil , fmt . Errorf ( "error during label values search on time range for labelName=%q: %w" , labelName , err )
}
2020-11-05 00:51:08 +00:00
// Merge label values obtained from Prometheus storage.
promLabelValues , err := promdb . GetLabelValuesOnTimeRange ( labelName , tr , deadline )
if err != nil {
return nil , fmt . Errorf ( "cannot obtain label values on time range for %q from Prometheus storage: %w" , labelName , err )
}
labelValues = mergeStrings ( labelValues , promLabelValues )
2020-11-04 22:15:43 +00:00
// Sort labelValues like Prometheus does
sort . Strings ( labelValues )
return labelValues , nil
}
2020-11-16 01:31:09 +00:00
// GetGraphiteTagValues returns tag values for the given tagName until the given deadline.
2020-11-16 01:58:12 +00:00
func GetGraphiteTagValues ( tagName , filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-11-16 01:31:09 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
if tagName == "name" {
tagName = ""
}
2020-11-16 01:58:12 +00:00
tagValues , err := GetLabelValues ( tagName , deadline )
if err != nil {
return nil , err
2020-11-16 01:31:09 +00:00
}
2020-11-16 01:58:12 +00:00
if len ( filter ) > 0 {
tagValues , err = applyGraphiteRegexpFilter ( filter , tagValues )
if err != nil {
return nil , err
}
2020-11-16 01:31:09 +00:00
}
2020-11-16 01:58:12 +00:00
if limit > 0 && limit < len ( tagValues ) {
tagValues = tagValues [ : limit ]
2020-11-16 01:31:09 +00:00
}
return tagValues , nil
}
2019-05-22 21:16:55 +00:00
// GetLabelValues returns label values for the given labelName
// until the given deadline.
2020-09-11 10:18:57 +00:00
func GetLabelValues ( labelName string , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 21:16:55 +00:00
if labelName == "__name__" {
labelName = ""
}
// Search for tag values
2020-09-11 10:18:57 +00:00
labelValues , err := vmstorage . SearchTagValues ( [ ] byte ( labelName ) , * maxTagValuesPerSearch , deadline . Deadline ( ) )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "error during label values search for labelName=%q: %w" , labelName , err )
2019-05-22 21:16:55 +00:00
}
2020-09-23 19:42:44 +00:00
// Merge label values obtained from Prometheus storage.
promLabelValues , err := promdb . GetLabelValues ( labelName , deadline )
if err != nil {
return nil , fmt . Errorf ( "cannot obtain label values for %q from Prometheus storage: %w" , labelName , err )
}
labelValues = mergeStrings ( labelValues , promLabelValues )
2019-05-22 21:16:55 +00:00
// Sort labelValues like Prometheus does
sort . Strings ( labelValues )
return labelValues , nil
}
2020-09-10 21:28:19 +00:00
// GetTagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
2020-09-11 10:18:57 +00:00
func GetTagValueSuffixes ( tr storage . TimeRange , tagKey , tagValuePrefix string , delimiter byte , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-09-10 21:28:19 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-09-11 10:18:57 +00:00
suffixes , err := vmstorage . SearchTagValueSuffixes ( tr , [ ] byte ( tagKey ) , [ ] byte ( tagValuePrefix ) , delimiter , * maxTagValueSuffixesPerSearch , deadline . Deadline ( ) )
2020-09-10 21:28:19 +00:00
if err != nil {
return nil , fmt . Errorf ( "error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w" ,
tagKey , tagValuePrefix , delimiter , tr . String ( ) , err )
}
2021-02-02 22:24:05 +00:00
if len ( suffixes ) >= * maxTagValueSuffixesPerSearch {
return nil , fmt . Errorf ( "more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; " +
"either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value" ,
* maxTagValueSuffixesPerSearch , tagKey , tagValuePrefix , delimiter , tr . String ( ) )
}
2020-09-10 21:28:19 +00:00
return suffixes , nil
}
2019-06-10 15:55:20 +00:00
// GetLabelEntries returns all the label entries until the given deadline.
2020-09-11 10:18:57 +00:00
func GetLabelEntries ( deadline searchutils . Deadline ) ( [ ] storage . TagEntry , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-09-11 10:18:57 +00:00
labelEntries , err := vmstorage . SearchTagEntries ( * maxTagKeysPerSearch , * maxTagValuesPerSearch , deadline . Deadline ( ) )
2019-06-10 15:55:20 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "error during label entries request: %w" , err )
2019-06-10 15:55:20 +00:00
}
// Substitute "" with "__name__"
for i := range labelEntries {
e := & labelEntries [ i ]
if e . Key == "" {
e . Key = "__name__"
}
}
// Sort labelEntries by the number of label values in each entry.
sort . Slice ( labelEntries , func ( i , j int ) bool {
a , b := labelEntries [ i ] . Values , labelEntries [ j ] . Values
2019-12-14 22:07:09 +00:00
if len ( a ) != len ( b ) {
return len ( a ) > len ( b )
2019-06-10 15:55:20 +00:00
}
2019-12-14 22:07:09 +00:00
return labelEntries [ i ] . Key > labelEntries [ j ] . Key
2019-06-10 15:55:20 +00:00
} )
return labelEntries , nil
}
2020-04-22 16:57:36 +00:00
// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2020-09-11 10:18:57 +00:00
func GetTSDBStatusForDate ( deadline searchutils . Deadline , date uint64 , topN int ) ( * storage . TSDBStatus , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-09-11 10:18:57 +00:00
status , err := vmstorage . GetTSDBStatusForDate ( date , topN , deadline . Deadline ( ) )
2020-04-22 16:57:36 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "error during tsdb status request: %w" , err )
2020-04-22 16:57:36 +00:00
}
return status , nil
}
2021-05-12 12:18:45 +00:00
// GetTSDBStatusWithFilters returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2021-05-12 13:32:48 +00:00
//
// It accepts aribtrary filters on time series in sq.
2021-05-12 12:18:45 +00:00
func GetTSDBStatusWithFilters ( deadline searchutils . Deadline , sq * storage . SearchQuery , topN int ) ( * storage . TSDBStatus , error ) {
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
2021-05-12 13:32:48 +00:00
MaxTimestamp : sq . MaxTimestamp ,
2021-05-12 12:18:45 +00:00
}
tfss , err := setupTfss ( tr , sq . TagFilterss , deadline )
if err != nil {
return nil , err
}
2021-05-12 13:32:48 +00:00
date := uint64 ( tr . MinTimestamp ) / ( 3600 * 24 * 1000 )
status , err := vmstorage . GetTSDBStatusWithFiltersForDate ( tfss , date , topN , deadline . Deadline ( ) )
2021-05-12 12:18:45 +00:00
if err != nil {
2021-05-12 13:32:48 +00:00
return nil , fmt . Errorf ( "error during tsdb status with filters request: %w" , err )
2021-05-12 12:18:45 +00:00
}
return status , nil
2020-04-22 16:57:36 +00:00
}
2019-05-22 21:16:55 +00:00
// GetSeriesCount returns the number of unique series.
2020-09-11 10:18:57 +00:00
func GetSeriesCount ( deadline searchutils . Deadline ) ( uint64 , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return 0 , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-09-11 10:18:57 +00:00
n , err := vmstorage . GetSeriesCount ( deadline . Deadline ( ) )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "error during series count request: %w" , err )
2019-05-22 21:16:55 +00:00
}
return n , nil
}
func getStorageSearch ( ) * storage . Search {
v := ssPool . Get ( )
if v == nil {
return & storage . Search { }
}
return v . ( * storage . Search )
}
func putStorageSearch ( sr * storage . Search ) {
sr . MustClose ( )
ssPool . Put ( sr )
}
var ssPool sync . Pool
2020-09-26 01:29:45 +00:00
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
2020-09-27 20:17:14 +00:00
// Data processing is immediately stopped if f returns non-nil error.
2020-09-26 01:29:45 +00:00
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr.
func ExportBlocks ( sq * storage . SearchQuery , deadline searchutils . Deadline , f func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error ) error {
if deadline . Exceeded ( ) {
return fmt . Errorf ( "timeout exceeded before starting data export: %s" , deadline . String ( ) )
}
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
if err := vmstorage . CheckTimeRange ( tr ) ; err != nil {
return err
}
2021-02-02 22:24:05 +00:00
tfss , err := setupTfss ( tr , sq . TagFilterss , deadline )
if err != nil {
return err
}
2020-09-26 01:29:45 +00:00
vmstorage . WG . Add ( 1 )
defer vmstorage . WG . Done ( )
sr := getStorageSearch ( )
defer putStorageSearch ( sr )
2021-03-16 23:12:28 +00:00
startTime := time . Now ( )
2020-09-26 01:29:45 +00:00
sr . Init ( vmstorage . Storage , tfss , tr , * maxMetricsPerSearch , deadline . Deadline ( ) )
2021-03-16 23:12:28 +00:00
indexSearchDuration . UpdateDuration ( startTime )
2020-09-26 01:29:45 +00:00
// Start workers that call f in parallel on available CPU cores.
2020-12-08 18:49:32 +00:00
gomaxprocs := cgroup . AvailableCPUs ( )
2020-09-26 01:29:45 +00:00
workCh := make ( chan * exportWork , gomaxprocs * 8 )
var (
errGlobal error
errGlobalLock sync . Mutex
mustStop uint32
)
var wg sync . WaitGroup
wg . Add ( gomaxprocs )
for i := 0 ; i < gomaxprocs ; i ++ {
go func ( ) {
defer wg . Done ( )
for xw := range workCh {
if err := f ( & xw . mn , & xw . b , tr ) ; err != nil {
errGlobalLock . Lock ( )
if errGlobal != nil {
errGlobal = err
atomic . StoreUint32 ( & mustStop , 1 )
}
errGlobalLock . Unlock ( )
}
xw . reset ( )
exportWorkPool . Put ( xw )
}
} ( )
}
// Feed workers with work
blocksRead := 0
for sr . NextMetricBlock ( ) {
blocksRead ++
if deadline . Exceeded ( ) {
return fmt . Errorf ( "timeout exceeded while fetching data block #%d from storage: %s" , blocksRead , deadline . String ( ) )
}
if atomic . LoadUint32 ( & mustStop ) != 0 {
break
}
xw := exportWorkPool . Get ( ) . ( * exportWork )
if err := xw . mn . Unmarshal ( sr . MetricBlockRef . MetricName ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal metricName for block #%d: %w" , blocksRead , err )
}
sr . MetricBlockRef . BlockRef . MustReadBlock ( & xw . b , true )
workCh <- xw
}
close ( workCh )
// Wait for workers to finish.
wg . Wait ( )
// Check errors.
err = sr . Error ( )
if err == nil {
err = errGlobal
}
if err != nil {
if errors . Is ( err , storage . ErrDeadlineExceeded ) {
return fmt . Errorf ( "timeout exceeded during the query: %s" , deadline . String ( ) )
}
return fmt . Errorf ( "search error after reading %d data blocks: %w" , blocksRead , err )
}
return nil
}
type exportWork struct {
mn storage . MetricName
b storage . Block
}
func ( xw * exportWork ) reset ( ) {
xw . mn . Reset ( )
xw . b . Reset ( )
}
var exportWorkPool = & sync . Pool {
New : func ( ) interface { } {
return & exportWork { }
} ,
}
2020-11-16 08:55:55 +00:00
// SearchMetricNames returns all the metric names matching sq until the given deadline.
func SearchMetricNames ( sq * storage . SearchQuery , deadline searchutils . Deadline ) ( [ ] storage . MetricName , error ) {
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting to search metric names: %s" , deadline . String ( ) )
}
// Setup search.
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
if err := vmstorage . CheckTimeRange ( tr ) ; err != nil {
return nil , err
}
2021-02-02 22:24:05 +00:00
tfss , err := setupTfss ( tr , sq . TagFilterss , deadline )
if err != nil {
return nil , err
}
2020-11-16 08:55:55 +00:00
mns , err := vmstorage . SearchMetricNames ( tfss , tr , * maxMetricsPerSearch , deadline . Deadline ( ) )
if err != nil {
return nil , fmt . Errorf ( "cannot find metric names: %w" , err )
}
return mns , nil
}
2020-09-26 01:29:45 +00:00
// ProcessSearchQuery performs sq until the given deadline.
2020-04-27 05:13:41 +00:00
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
2020-09-11 10:18:57 +00:00
func ProcessSearchQuery ( sq * storage . SearchQuery , fetchData bool , deadline searchutils . Deadline ) ( * Results , error ) {
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 21:16:55 +00:00
// Setup search.
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
2020-06-30 21:20:13 +00:00
if err := vmstorage . CheckTimeRange ( tr ) ; err != nil {
return nil , err
}
2021-02-02 22:24:05 +00:00
tfss , err := setupTfss ( tr , sq . TagFilterss , deadline )
if err != nil {
return nil , err
}
2019-05-22 21:16:55 +00:00
vmstorage . WG . Add ( 1 )
defer vmstorage . WG . Done ( )
sr := getStorageSearch ( )
2021-03-16 23:12:28 +00:00
startTime := time . Now ( )
2020-09-11 10:18:57 +00:00
maxSeriesCount := sr . Init ( vmstorage . Storage , tfss , tr , * maxMetricsPerSearch , deadline . Deadline ( ) )
2021-03-16 23:12:28 +00:00
indexSearchDuration . UpdateDuration ( startTime )
2020-11-04 14:46:10 +00:00
m := make ( map [ string ] [ ] blockRef , maxSeriesCount )
2020-08-06 16:17:51 +00:00
orderedMetricNames := make ( [ ] string , 0 , maxSeriesCount )
2019-07-28 09:12:30 +00:00
blocksRead := 0
2021-07-28 14:40:09 +00:00
samples := 0
2020-11-04 14:46:10 +00:00
tbf := getTmpBlocksFile ( )
var buf [ ] byte
2019-05-22 21:16:55 +00:00
for sr . NextMetricBlock ( ) {
2019-07-28 09:12:30 +00:00
blocksRead ++
2020-07-21 15:34:59 +00:00
if deadline . Exceeded ( ) {
2020-11-04 14:46:10 +00:00
putTmpBlocksFile ( tbf )
2020-09-22 19:56:49 +00:00
putStorageSearch ( sr )
2020-01-22 13:50:34 +00:00
return nil , fmt . Errorf ( "timeout exceeded while fetching data block #%d from storage: %s" , blocksRead , deadline . String ( ) )
2019-05-22 21:16:55 +00:00
}
2021-07-28 14:40:09 +00:00
br := sr . MetricBlockRef . BlockRef
samples += br . RowsCount ( )
if * maxSamplesPerQuery > 0 && samples > * maxSamplesPerQuery {
putTmpBlocksFile ( tbf )
putStorageSearch ( sr )
return nil , fmt . Errorf ( "cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: to increase the -search.maxSamplesPerQuery; to reduce time range for the query; to use more specific label filters in order to select lower number of series" , * maxSamplesPerQuery )
}
buf = br . Marshal ( buf [ : 0 ] )
2020-11-04 14:46:10 +00:00
addr , err := tbf . WriteBlockRefData ( buf )
if err != nil {
putTmpBlocksFile ( tbf )
putStorageSearch ( sr )
return nil , fmt . Errorf ( "cannot write %d bytes to temporary file: %w" , len ( buf ) , err )
}
2020-04-27 05:13:41 +00:00
metricName := sr . MetricBlockRef . MetricName
2021-07-30 06:55:08 +00:00
brs := m [ string ( metricName ) ]
2020-11-04 14:46:10 +00:00
brs = append ( brs , blockRef {
2021-07-28 14:40:09 +00:00
partRef : br . PartRef ( ) ,
2020-11-04 14:46:10 +00:00
addr : addr ,
} )
2020-07-23 11:11:48 +00:00
if len ( brs ) > 1 {
2021-07-30 06:55:08 +00:00
m [ string ( metricName ) ] = brs
2020-07-23 10:53:30 +00:00
} else {
// An optimization for big number of time series with long metricName values:
// use only a single copy of metricName for both orderedMetricNames and m.
2020-07-23 14:53:52 +00:00
orderedMetricNames = append ( orderedMetricNames , string ( metricName ) )
m [ orderedMetricNames [ len ( orderedMetricNames ) - 1 ] ] = brs
2020-04-26 13:25:35 +00:00
}
2019-05-22 21:16:55 +00:00
}
if err := sr . Error ( ) ; err != nil {
2020-11-04 14:46:10 +00:00
putTmpBlocksFile ( tbf )
2020-09-22 19:56:49 +00:00
putStorageSearch ( sr )
2020-08-10 10:17:12 +00:00
if errors . Is ( err , storage . ErrDeadlineExceeded ) {
return nil , fmt . Errorf ( "timeout exceeded during the query: %s" , deadline . String ( ) )
}
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "search error after reading %d data blocks: %w" , blocksRead , err )
2019-05-22 21:16:55 +00:00
}
2020-11-04 14:46:10 +00:00
if err := tbf . Finalize ( ) ; err != nil {
putTmpBlocksFile ( tbf )
putStorageSearch ( sr )
return nil , fmt . Errorf ( "cannot finalize temporary file: %w" , err )
}
2019-05-22 21:16:55 +00:00
2020-09-23 11:26:39 +00:00
// Fetch data from promdb.
pm := make ( map [ string ] * promData )
err = promdb . VisitSeries ( sq , fetchData , deadline , func ( metricName [ ] byte , values [ ] float64 , timestamps [ ] int64 ) {
pd := pm [ string ( metricName ) ]
if pd == nil {
if _ , ok := m [ string ( metricName ) ] ; ! ok {
orderedMetricNames = append ( orderedMetricNames , string ( metricName ) )
}
pd = & promData { }
pm [ string ( metricName ) ] = pd
}
pd . values = append ( pd . values , values ... )
pd . timestamps = append ( pd . timestamps , timestamps ... )
} )
if err != nil {
2021-01-13 09:53:11 +00:00
putTmpBlocksFile ( tbf )
2020-09-23 11:26:39 +00:00
putStorageSearch ( sr )
return nil , fmt . Errorf ( "error when searching in Prometheus data: %w" , err )
}
2019-05-22 21:16:55 +00:00
var rss Results
rss . tr = tr
2019-08-04 19:15:33 +00:00
rss . fetchData = fetchData
2019-05-22 21:16:55 +00:00
rss . deadline = deadline
2020-04-26 13:25:35 +00:00
pts := make ( [ ] packedTimeseries , len ( orderedMetricNames ) )
for i , metricName := range orderedMetricNames {
pts [ i ] = packedTimeseries {
metricName : metricName ,
2020-04-27 05:13:41 +00:00
brs : m [ metricName ] ,
2020-09-23 11:26:39 +00:00
pd : pm [ metricName ] ,
2020-04-26 13:25:35 +00:00
}
2019-05-22 21:16:55 +00:00
}
2020-04-26 13:25:35 +00:00
rss . packedTimeseries = pts
2020-04-27 05:13:41 +00:00
rss . sr = sr
2020-11-04 14:46:10 +00:00
rss . tbf = tbf
2019-05-22 21:16:55 +00:00
return & rss , nil
}
2021-03-16 23:12:28 +00:00
var indexSearchDuration = metrics . NewHistogram ( ` vm_index_search_duration_seconds ` )
2020-11-04 14:46:10 +00:00
type blockRef struct {
partRef storage . PartRef
addr tmpBlockAddr
}
2021-02-02 22:24:05 +00:00
func setupTfss ( tr storage . TimeRange , tagFilterss [ ] [ ] storage . TagFilter , deadline searchutils . Deadline ) ( [ ] * storage . TagFilters , error ) {
2019-05-22 21:16:55 +00:00
tfss := make ( [ ] * storage . TagFilters , 0 , len ( tagFilterss ) )
for _ , tagFilters := range tagFilterss {
tfs := storage . NewTagFilters ( )
for i := range tagFilters {
tf := & tagFilters [ i ]
2021-02-02 22:24:05 +00:00
if string ( tf . Key ) == "__graphite__" {
query := tf . Value
paths , err := vmstorage . SearchGraphitePaths ( tr , query , * maxMetricsPerSearch , deadline . Deadline ( ) )
if err != nil {
return nil , fmt . Errorf ( "error when searching for Graphite paths for query %q: %w" , query , err )
}
if len ( paths ) >= * maxMetricsPerSearch {
return nil , fmt . Errorf ( "more than -search.maxUniqueTimeseries=%d time series match Graphite query %q; " +
"either narrow down the query or increase -search.maxUniqueTimeseries command-line flag value" , * maxMetricsPerSearch , query )
}
tfs . AddGraphiteQuery ( query , paths , tf . IsNegative )
continue
}
2019-05-22 21:16:55 +00:00
if err := tfs . Add ( tf . Key , tf . Value , tf . IsNegative , tf . IsRegexp ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot parse tag filter %s: %w" , tf , err )
2019-05-22 21:16:55 +00:00
}
}
tfss = append ( tfss , tfs )
}
return tfss , nil
}
2020-11-16 01:58:12 +00:00
func applyGraphiteRegexpFilter ( filter string , ss [ ] string ) ( [ ] string , error ) {
// Anchor filter regexp to the beginning of the string as Graphite does.
// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157
filter = "^(?:" + filter + ")"
re , err := regexp . Compile ( filter )
if err != nil {
return nil , fmt . Errorf ( "cannot parse regexp filter=%q: %w" , filter , err )
}
dst := ss [ : 0 ]
for _ , s := range ss {
if re . MatchString ( s ) {
dst = append ( dst , s )
}
}
return dst , nil
}