2019-05-22 21:16:55 +00:00
package promql
import (
"flag"
"fmt"
"math"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2020-04-28 12:28:22 +00:00
"github.com/VictoriaMetrics/metricsql"
2019-05-22 21:16:55 +00:00
)
// Command-line flags which limit resource usage during query evaluation.
var (
2020-04-28 09:51:36 +00:00
// maxPointsPerTimeseries is the limit enforced by ValidateMaxPointsPerTimeseries.
maxPointsPerTimeseries = flag . Int ( "search.maxPointsPerTimeseries" , 30e3 , "The maximum points per a single timeseries returned from the search" )
// estimatedSeriesCountAfterAggregation is used as a multiplier in the memory estimation
// performed by evalRollupFuncWithMetricExpr for incremental aggregates with grouping.
estimatedSeriesCountAfterAggregation = flag . Int ( "search.estimatedSeriesCountAfterAggregation" , 1000 , "Estimated number of series returned by aggregation with grouping " +
"such as `sum(...) by (...)`. Increase this value in order to reduce the probability of OOMs. Reduce this value in order to reduce 'not enough memory' errors " +
"for queries containing aggregation with grouping" )
2019-05-22 21:16:55 +00:00
)
2019-05-25 18:51:11 +00:00
// minTimeseriesPointsForTimeRounding is the minimum number of points per timeseries
2019-05-22 21:16:55 +00:00
// for enabling time rounding in AdjustStartEnd.
//
// The rounding improves cache hit ratio for frequently requested queries over
// big time ranges.
const minTimeseriesPointsForTimeRounding = 50
// ValidateMaxPointsPerTimeseries checks the maximum number of points that
// may be returned per each time series.
//
// The number mustn't exceed -search.maxPointsPerTimeseries.
func ValidateMaxPointsPerTimeseries ( start , end , step int64 ) error {
points := ( end - start ) / step + 1
if uint64 ( points ) > uint64 ( * maxPointsPerTimeseries ) {
2019-06-24 19:52:06 +00:00
return fmt . Errorf ( ` too many points for the given step=%d, start=%d and end=%d: %d; cannot exceed -search.maxPointsPerTimeseries=%d ` ,
2019-05-22 21:16:55 +00:00
step , start , end , uint64 ( points ) , * maxPointsPerTimeseries )
}
return nil
}
// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
//
// See EvalConfig.mayCache for details.
func AdjustStartEnd ( start , end , step int64 ) ( int64 , int64 ) {
points := ( end - start ) / step + 1
if points < minTimeseriesPointsForTimeRounding {
// Too small number of points for rounding.
return start , end
}
// Round start and end to values divisible by step in order
// to enable response caching (see EvalConfig.mayCache).
// Round start to the nearest smaller value divisible by step.
start -= start % step
// Round end to the nearest bigger value divisible by step.
adjust := end % step
if adjust > 0 {
end += step - adjust
}
2019-12-24 20:44:24 +00:00
// Make sure that the new number of points is the same as the initial number of points.
newPoints := ( end - start ) / step + 1
for newPoints > points {
end -= step
newPoints --
}
2019-05-22 21:16:55 +00:00
return start , end
}
// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
// Start, End and Step define the timestamps to evaluate the query at:
// Start, Start+Step, ... up to End (see getTimestamps).
// validate() requires Start <= End and Step > 0.
Start int64
End int64
Step int64
// Deadline is passed to netstorage.ProcessSearchQuery when fetching data.
Deadline netstorage . Deadline
// MayCache allows caching when Start and End are divisible by Step (see mayCache).
MayCache bool
2019-10-15 16:12:27 +00:00
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
2019-05-22 21:16:55 +00:00
// timestamps is lazily initialized by getSharedTimestamps via timestampsOnce.
timestamps [ ] int64
timestampsOnce sync . Once
}
// newEvalConfig returns new EvalConfig copy from src.
func newEvalConfig ( src * EvalConfig ) * EvalConfig {
var ec EvalConfig
ec . Start = src . Start
ec . End = src . End
ec . Step = src . Step
ec . Deadline = src . Deadline
ec . MayCache = src . MayCache
2019-10-15 16:12:27 +00:00
ec . LookbackDelta = src . LookbackDelta
2019-05-22 21:16:55 +00:00
// do not copy src.timestamps - they must be generated again.
return & ec
}
// validate panics via logger.Panicf if ec contains an invalid time range or step.
func (ec *EvalConfig) validate() {
	if start, end := ec.Start, ec.End; start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	if step := ec.Step; step <= 0 {
		logger.Panicf("BUG: step must be greater than 0; got %d", step)
	}
}
// mayCache returns true if caching is allowed for ec,
// i.e. MayCache is set and both Start and End are divisible by Step.
func (ec *EvalConfig) mayCache() bool {
	// MayCache is checked first, so Step isn't used as a divisor when caching is disabled.
	return ec.MayCache && ec.Start%ec.Step == 0 && ec.End%ec.Step == 0
}
// getSharedTimestamps returns timestamps for ec, generating them on the first call.
//
// The returned slice is shared among callers.
func (ec *EvalConfig) getSharedTimestamps() []int64 {
	ec.timestampsOnce.Do(func() {
		ec.timestampsInit()
	})
	return ec.timestamps
}
// timestampsInit generates ec.timestamps from ec.Start, ec.End and ec.Step.
func (ec *EvalConfig) timestampsInit() {
	start, end, step := ec.Start, ec.End, ec.Step
	ec.timestamps = getTimestamps(start, end, step)
}
// getTimestamps returns timestamps start, start+step, ... which don't exceed end.
//
// It panics via logger.Panicf if step isn't positive, if start exceeds end
// or if the number of timestamps doesn't pass ValidateMaxPointsPerTimeseries.
func getTimestamps(start, end, step int64) []int64 {
	// Sanity checks.
	if step <= 0 {
		logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
	}
	if start > end {
		logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
	}
	if err := ValidateMaxPointsPerTimeseries(start, end, step); err != nil {
		logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
	}

	// Generate timestamps.
	n := (end-start)/step + 1
	timestamps := make([]int64, n)
	for i := range timestamps {
		timestamps[i] = start + int64(i)*step
	}
	return timestamps
}
2019-12-25 19:35:47 +00:00
func evalExpr ( ec * EvalConfig , e metricsql . Expr ) ( [ ] * timeseries , error ) {
if me , ok := e . ( * metricsql . MetricExpr ) ; ok {
re := & metricsql . RollupExpr {
2019-05-22 21:16:55 +00:00
Expr : me ,
}
2020-01-03 18:42:51 +00:00
rv , err := evalRollupFunc ( ec , "default_rollup" , rollupDefault , e , re , nil )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %s ` , me . AppendString ( nil ) , err )
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if re , ok := e . ( * metricsql . RollupExpr ) ; ok {
2020-01-03 18:42:51 +00:00
rv , err := evalRollupFunc ( ec , "default_rollup" , rollupDefault , e , re , nil )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %s ` , re . AppendString ( nil ) , err )
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if fe , ok := e . ( * metricsql . FuncExpr ) ; ok {
2019-05-22 21:16:55 +00:00
nrf := getRollupFunc ( fe . Name )
if nrf == nil {
args , err := evalExprs ( ec , fe . Args )
if err != nil {
return nil , err
}
tf := getTransformFunc ( fe . Name )
if tf == nil {
return nil , fmt . Errorf ( ` unknown func %q ` , fe . Name )
}
tfa := & transformFuncArg {
ec : ec ,
fe : fe ,
args : args ,
}
rv , err := tf ( tfa )
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %s ` , fe . AppendString ( nil ) , err )
}
return rv , nil
}
args , re , err := evalRollupFuncArgs ( ec , fe )
if err != nil {
return nil , err
}
rf , err := nrf ( args )
if err != nil {
return nil , err
}
2020-01-03 18:42:51 +00:00
rv , err := evalRollupFunc ( ec , fe . Name , rf , e , re , nil )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %s ` , fe . AppendString ( nil ) , err )
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if ae , ok := e . ( * metricsql . AggrFuncExpr ) ; ok {
2019-07-10 09:57:27 +00:00
if callbacks := getIncrementalAggrFuncCallbacks ( ae . Name ) ; callbacks != nil {
fe , nrf := tryGetArgRollupFuncWithMetricExpr ( ae )
if fe != nil {
2019-12-25 19:35:47 +00:00
// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
2019-07-10 09:57:27 +00:00
// The optimized path saves RAM for aggregates over big number of time series.
args , re , err := evalRollupFuncArgs ( ec , fe )
if err != nil {
return nil , err
}
rf , err := nrf ( args )
if err != nil {
return nil , err
}
iafc := newIncrementalAggrFuncContext ( ae , callbacks )
2020-01-03 18:42:51 +00:00
return evalRollupFunc ( ec , fe . Name , rf , e , re , iafc )
2019-07-10 09:57:27 +00:00
}
}
2019-05-22 21:16:55 +00:00
args , err := evalExprs ( ec , ae . Args )
if err != nil {
return nil , err
}
af := getAggrFunc ( ae . Name )
if af == nil {
return nil , fmt . Errorf ( ` unknown func %q ` , ae . Name )
}
afa := & aggrFuncArg {
ae : ae ,
args : args ,
ec : ec ,
}
rv , err := af ( afa )
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %s ` , ae . AppendString ( nil ) , err )
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if be , ok := e . ( * metricsql . BinaryOpExpr ) ; ok {
2019-05-22 21:16:55 +00:00
left , err := evalExpr ( ec , be . Left )
if err != nil {
return nil , err
}
right , err := evalExpr ( ec , be . Right )
if err != nil {
return nil , err
}
bf := getBinaryOpFunc ( be . Op )
if bf == nil {
return nil , fmt . Errorf ( ` unknown binary op %q ` , be . Op )
}
bfa := & binaryOpFuncArg {
be : be ,
left : left ,
right : right ,
}
rv , err := bf ( bfa )
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %s ` , be . AppendString ( nil ) , err )
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if ne , ok := e . ( * metricsql . NumberExpr ) ; ok {
2019-05-22 21:16:55 +00:00
rv := evalNumber ( ec , ne . N )
return rv , nil
}
2019-12-25 19:35:47 +00:00
if se , ok := e . ( * metricsql . StringExpr ) ; ok {
2019-05-22 21:16:55 +00:00
rv := evalString ( ec , se . S )
return rv , nil
}
return nil , fmt . Errorf ( "unexpected expression %q" , e . AppendString ( nil ) )
}
2019-12-25 19:35:47 +00:00
func tryGetArgRollupFuncWithMetricExpr ( ae * metricsql . AggrFuncExpr ) ( * metricsql . FuncExpr , newRollupFunc ) {
2019-07-10 09:57:27 +00:00
if len ( ae . Args ) != 1 {
return nil , nil
}
e := ae . Args [ 0 ]
// Make sure e contains one of the following:
// - metricExpr
// - metricExpr[d]
// - rollupFunc(metricExpr)
// - rollupFunc(metricExpr[d])
2019-12-25 19:35:47 +00:00
if me , ok := e . ( * metricsql . MetricExpr ) ; ok {
2019-07-10 09:57:27 +00:00
// e = metricExpr
if me . IsEmpty ( ) {
return nil , nil
}
2019-12-25 19:35:47 +00:00
fe := & metricsql . FuncExpr {
2019-07-10 09:57:27 +00:00
Name : "default_rollup" ,
2019-12-25 19:35:47 +00:00
Args : [ ] metricsql . Expr { me } ,
2019-07-10 09:57:27 +00:00
}
nrf := getRollupFunc ( fe . Name )
return fe , nrf
}
2019-12-25 19:35:47 +00:00
if re , ok := e . ( * metricsql . RollupExpr ) ; ok {
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ! ok || me . IsEmpty ( ) || re . ForSubquery ( ) {
2019-07-10 09:57:27 +00:00
return nil , nil
}
2019-09-13 18:40:46 +00:00
// e = metricExpr[d]
2019-12-25 19:35:47 +00:00
fe := & metricsql . FuncExpr {
2019-07-10 09:57:27 +00:00
Name : "default_rollup" ,
2019-12-25 19:35:47 +00:00
Args : [ ] metricsql . Expr { re } ,
2019-07-10 09:57:27 +00:00
}
nrf := getRollupFunc ( fe . Name )
return fe , nrf
}
2019-12-25 19:35:47 +00:00
fe , ok := e . ( * metricsql . FuncExpr )
2019-07-10 09:57:27 +00:00
if ! ok {
return nil , nil
}
nrf := getRollupFunc ( fe . Name )
if nrf == nil {
return nil , nil
}
rollupArgIdx := getRollupArgIdx ( fe . Name )
2020-01-15 14:26:02 +00:00
if rollupArgIdx >= len ( fe . Args ) {
// Incorrect number of args for rollup func.
return nil , nil
}
2019-07-10 09:57:27 +00:00
arg := fe . Args [ rollupArgIdx ]
2019-12-25 19:35:47 +00:00
if me , ok := arg . ( * metricsql . MetricExpr ) ; ok {
2019-07-10 09:57:27 +00:00
if me . IsEmpty ( ) {
return nil , nil
}
2019-09-13 18:40:46 +00:00
// e = rollupFunc(metricExpr)
2019-12-25 19:35:47 +00:00
return & metricsql . FuncExpr {
2019-07-10 09:57:27 +00:00
Name : fe . Name ,
2019-12-25 19:35:47 +00:00
Args : [ ] metricsql . Expr { me } ,
2019-07-10 09:57:27 +00:00
} , nrf
}
2019-12-25 19:35:47 +00:00
if re , ok := arg . ( * metricsql . RollupExpr ) ; ok {
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ! ok || me . IsEmpty ( ) || re . ForSubquery ( ) {
2019-07-10 09:57:27 +00:00
return nil , nil
}
2019-09-13 18:40:46 +00:00
// e = rollupFunc(metricExpr[d])
2019-07-10 09:57:27 +00:00
return fe , nrf
}
return nil , nil
}
2019-12-25 19:35:47 +00:00
func evalExprs ( ec * EvalConfig , es [ ] metricsql . Expr ) ( [ ] [ ] * timeseries , error ) {
2019-05-22 21:16:55 +00:00
var rvs [ ] [ ] * timeseries
for _ , e := range es {
rv , err := evalExpr ( ec , e )
if err != nil {
return nil , err
}
rvs = append ( rvs , rv )
}
return rvs , nil
}
2019-12-25 19:35:47 +00:00
func evalRollupFuncArgs ( ec * EvalConfig , fe * metricsql . FuncExpr ) ( [ ] interface { } , * metricsql . RollupExpr , error ) {
var re * metricsql . RollupExpr
2019-05-22 21:16:55 +00:00
rollupArgIdx := getRollupArgIdx ( fe . Name )
2020-01-10 19:16:14 +00:00
if len ( fe . Args ) <= rollupArgIdx {
2020-01-15 14:26:02 +00:00
return nil , nil , fmt . Errorf ( "expecting at least %d args to %q; got %d args; expr: %q" , rollupArgIdx + 1 , fe . Name , len ( fe . Args ) , fe . AppendString ( nil ) )
2020-01-10 19:16:14 +00:00
}
2019-05-22 21:16:55 +00:00
args := make ( [ ] interface { } , len ( fe . Args ) )
for i , arg := range fe . Args {
if i == rollupArgIdx {
re = getRollupExprArg ( arg )
args [ i ] = re
continue
}
ts , err := evalExpr ( ec , arg )
if err != nil {
return nil , nil , fmt . Errorf ( "cannot evaluate arg #%d for %q: %s" , i + 1 , fe . AppendString ( nil ) , err )
}
args [ i ] = ts
}
return args , re , nil
}
2019-12-25 19:35:47 +00:00
func getRollupExprArg ( arg metricsql . Expr ) * metricsql . RollupExpr {
re , ok := arg . ( * metricsql . RollupExpr )
2019-05-22 21:16:55 +00:00
if ! ok {
2019-12-25 19:35:47 +00:00
// Wrap non-rollup arg into metricsql.RollupExpr.
return & metricsql . RollupExpr {
2019-05-22 21:16:55 +00:00
Expr : arg ,
}
}
2019-09-13 18:40:46 +00:00
if ! re . ForSubquery ( ) {
// Return standard rollup if it doesn't contain subquery.
2019-05-22 21:16:55 +00:00
return re
}
2019-12-25 19:35:47 +00:00
me , ok := re . Expr . ( * metricsql . MetricExpr )
2019-05-22 21:16:55 +00:00
if ! ok {
// arg contains subquery.
return re
}
// Convert me[w:step] -> default_rollup(me)[w:step]
reNew := * re
2019-12-25 19:35:47 +00:00
reNew . Expr = & metricsql . FuncExpr {
2019-05-22 21:16:55 +00:00
Name : "default_rollup" ,
2019-12-25 19:35:47 +00:00
Args : [ ] metricsql . Expr {
& metricsql . RollupExpr { Expr : me } ,
2019-05-22 21:16:55 +00:00
} ,
}
return & reNew
}
2020-01-03 18:42:51 +00:00
func evalRollupFunc ( ec * EvalConfig , name string , rf rollupFunc , expr metricsql . Expr , re * metricsql . RollupExpr , iafc * incrementalAggrFuncContext ) ( [ ] * timeseries , error ) {
2019-05-22 21:16:55 +00:00
ecNew := ec
var offset int64
if len ( re . Offset ) > 0 {
var err error
2019-12-25 19:35:47 +00:00
offset , err = metricsql . DurationValue ( re . Offset , ec . Step )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
2020-02-04 21:23:37 +00:00
ecNew = newEvalConfig ( ecNew )
2019-05-22 21:16:55 +00:00
ecNew . Start -= offset
ecNew . End -= offset
2019-12-24 20:44:24 +00:00
if ecNew . MayCache {
start , end := AdjustStartEnd ( ecNew . Start , ecNew . End , ecNew . Step )
offset += ecNew . Start - start
ecNew . Start = start
ecNew . End = end
}
2019-05-22 21:16:55 +00:00
}
2020-02-04 21:31:06 +00:00
if name == "rollup_candlestick" {
2020-02-04 21:23:37 +00:00
// Automatically apply `offset -step` to `rollup_candlestick` function
// in order to obtain expected OHLC results.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
step := ecNew . Step
ecNew = newEvalConfig ( ecNew )
ecNew . Start += step
ecNew . End += step
offset -= step
}
2019-05-22 21:16:55 +00:00
var rvs [ ] * timeseries
var err error
2019-12-25 19:35:47 +00:00
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ok {
2020-01-03 18:42:51 +00:00
rvs , err = evalRollupFuncWithMetricExpr ( ecNew , name , rf , expr , me , iafc , re . Window )
2019-05-22 21:16:55 +00:00
} else {
2019-07-10 09:57:27 +00:00
if iafc != nil {
logger . Panicf ( "BUG: iafc must be nil for rollup %q over subquery %q" , name , re . AppendString ( nil ) )
}
2020-01-10 19:16:14 +00:00
rvs , err = evalRollupFuncWithSubquery ( ecNew , name , rf , expr , re )
2019-05-22 21:16:55 +00:00
}
if err != nil {
return nil , err
}
if offset != 0 && len ( rvs ) > 0 {
// Make a copy of timestamps, since they may be used in other values.
srcTimestamps := rvs [ 0 ] . Timestamps
dstTimestamps := append ( [ ] int64 { } , srcTimestamps ... )
for i := range dstTimestamps {
dstTimestamps [ i ] += offset
}
for _ , ts := range rvs {
ts . Timestamps = dstTimestamps
}
}
return rvs , nil
}
2020-01-10 19:16:14 +00:00
func evalRollupFuncWithSubquery ( ec * EvalConfig , name string , rf rollupFunc , expr metricsql . Expr , re * metricsql . RollupExpr ) ( [ ] * timeseries , error ) {
2020-01-03 18:42:51 +00:00
// TODO: determine whether to use rollupResultCacheV here.
2019-05-22 21:16:55 +00:00
var step int64
if len ( re . Step ) > 0 {
var err error
2019-12-25 19:35:47 +00:00
step , err = metricsql . PositiveDurationValue ( re . Step , ec . Step )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
} else {
step = ec . Step
}
var window int64
if len ( re . Window ) > 0 {
var err error
2019-12-25 19:35:47 +00:00
window , err = metricsql . PositiveDurationValue ( re . Window , ec . Step )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
}
ecSQ := newEvalConfig ( ec )
2019-06-21 18:50:44 +00:00
ecSQ . Start -= window + maxSilenceInterval + step
2019-05-22 21:16:55 +00:00
ecSQ . Step = step
if err := ValidateMaxPointsPerTimeseries ( ecSQ . Start , ecSQ . End , ecSQ . Step ) ; err != nil {
return nil , err
}
ecSQ . Start , ecSQ . End = AdjustStartEnd ( ecSQ . Start , ecSQ . End , ecSQ . Step )
tssSQ , err := evalExpr ( ecSQ , re . Expr )
if err != nil {
return nil , err
}
2020-01-03 22:46:39 +00:00
if len ( tssSQ ) == 0 {
if name == "absent_over_time" {
tss := evalNumber ( ec , 1 )
return tss , nil
}
return nil , nil
}
2019-05-22 21:16:55 +00:00
sharedTimestamps := getTimestamps ( ec . Start , ec . End , ec . Step )
2020-01-10 19:16:14 +00:00
preFunc , rcs , err := getRollupConfigs ( name , rf , expr , ec . Start , ec . End , ec . Step , window , ec . LookbackDelta , sharedTimestamps )
if err != nil {
return nil , err
}
2019-05-22 21:16:55 +00:00
tss := make ( [ ] * timeseries , 0 , len ( tssSQ ) * len ( rcs ) )
var tssLock sync . Mutex
2019-07-25 18:48:12 +00:00
removeMetricGroup := ! rollupFuncsKeepMetricGroup [ name ]
2019-05-22 21:16:55 +00:00
doParallel ( tssSQ , func ( tsSQ * timeseries , values [ ] float64 , timestamps [ ] int64 ) ( [ ] float64 , [ ] int64 ) {
values , timestamps = removeNanValues ( values [ : 0 ] , timestamps [ : 0 ] , tsSQ . Values , tsSQ . Timestamps )
preFunc ( values , timestamps )
for _ , rc := range rcs {
2020-01-03 21:50:47 +00:00
if tsm := newTimeseriesMap ( name , sharedTimestamps , & tsSQ . MetricName ) ; tsm != nil {
rc . DoTimeseriesMap ( tsm , values , timestamps )
tssLock . Lock ( )
tss = tsm . AppendTimeseriesTo ( tss )
tssLock . Unlock ( )
continue
}
2019-05-22 21:16:55 +00:00
var ts timeseries
2019-07-25 18:48:12 +00:00
doRollupForTimeseries ( rc , & ts , & tsSQ . MetricName , values , timestamps , sharedTimestamps , removeMetricGroup )
2019-05-22 21:16:55 +00:00
tssLock . Lock ( )
tss = append ( tss , & ts )
tssLock . Unlock ( )
}
return values , timestamps
} )
return tss , nil
}
// doParallel calls f for every time series from tss on up to GOMAXPROCS concurrent goroutines.
//
// The values and timestamps returned from f are passed to the next call of f
// on the same goroutine, so f may use them as scratch buffers.
// doParallel returns after all the calls to f are finished.
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
	workers := runtime.GOMAXPROCS(-1)
	if n := len(tss); workers > n {
		workers = n
	}

	workCh := make(chan *timeseries, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var values []float64
			var timestamps []int64
			for ts := range workCh {
				values, timestamps = f(ts, values, timestamps)
			}
		}()
	}

	// Feed workers and wait until all the work is done.
	for i := range tss {
		workCh <- tss[i]
	}
	close(workCh)
	wg.Wait()
}
// removeNanValues appends non-NaN values and the corresponding timestamps
// to dstValues and dstTimestamps and returns the extended slices.
//
// timestamps must contain at least len(values) items.
func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
			// The first NaN is enough for choosing the slow path.
			break
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}

	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}
// Counters for rollup result cache lookups performed in evalRollupFuncWithMetricExpr.
var (
// Incremented when the whole requested time range is found in the cache.
rollupResultCacheFullHits = metrics . NewCounter ( ` vm_rollup_result_cache_full_hits_total ` )
// Incremented when only the beginning of the requested time range is found in the cache.
rollupResultCachePartialHits = metrics . NewCounter ( ` vm_rollup_result_cache_partial_hits_total ` )
// Incremented when the cache doesn't cover any part of the requested time range.
rollupResultCacheMiss = metrics . NewCounter ( ` vm_rollup_result_cache_miss_total ` )
)
2020-01-03 18:42:51 +00:00
// evalRollupFuncWithMetricExpr evaluates the rollup function rf with the given name
// over time series matching me on the time range from ec.
//
// The already cached part of the result is obtained from rollupResultCacheV, while
// the remaining part is calculated from data fetched via netstorage. The merged result
// is put back to the cache, unless there are no matching time series.
//
// windowStr is the optional lookbehind window, which is parsed with metricsql.PositiveDurationValue.
// Non-nil iafc enables incremental aggregation of the calculated rollups.
// An error is returned if the rollup memory limiter rejects the estimated size of the result.
func evalRollupFuncWithMetricExpr ( ec * EvalConfig , name string , rf rollupFunc ,
expr metricsql . Expr , me * metricsql . MetricExpr , iafc * incrementalAggrFuncContext , windowStr string ) ( [ ] * timeseries , error ) {
2019-07-10 09:57:27 +00:00
// Empty metric selector evaluates to a single series filled with NaNs.
if me . IsEmpty ( ) {
return evalNumber ( ec , nan ) , nil
}
var window int64
if len ( windowStr ) > 0 {
var err error
2019-12-25 19:35:47 +00:00
window , err = metricsql . PositiveDurationValue ( windowStr , ec . Step )
2019-07-10 09:57:27 +00:00
if err != nil {
return nil , err
}
}
2019-05-22 21:16:55 +00:00
// Search for partial results in cache.
// start is treated below as the first timestamp which must be calculated.
2020-01-03 18:42:51 +00:00
tssCached , start := rollupResultCacheV . Get ( ec , expr , window )
2019-05-22 21:16:55 +00:00
if start > ec . End {
// The result is fully cached.
rollupResultCacheFullHits . Inc ( )
return tssCached , nil
}
if start > ec . Start {
rollupResultCachePartialHits . Inc ( )
} else {
rollupResultCacheMiss . Inc ( )
}
2020-01-11 12:40:32 +00:00
// Obtain rollup configs before fetching data from db,
// so type errors can be caught earlier.
sharedTimestamps := getTimestamps ( start , ec . End , ec . Step )
preFunc , rcs , err := getRollupConfigs ( name , rf , expr , start , ec . End , ec . Step , window , ec . LookbackDelta , sharedTimestamps )
if err != nil {
return nil , err
}
2019-05-22 21:16:55 +00:00
// Fetch the remaining part of the result.
2019-12-25 19:35:47 +00:00
tfs := toTagFilters ( me . LabelFilters )
2020-02-05 17:20:54 +00:00
// Extend the fetched time range into the past by maxSilenceInterval
// plus the bigger of window and step.
minTimestamp := start - maxSilenceInterval
if window > ec . Step {
minTimestamp -= window
} else {
minTimestamp -= ec . Step
}
2019-05-22 21:16:55 +00:00
sq := & storage . SearchQuery {
2020-02-05 17:20:54 +00:00
MinTimestamp : minTimestamp ,
MaxTimestamp : ec . End ,
2019-12-25 19:35:47 +00:00
TagFilterss : [ ] [ ] storage . TagFilter { tfs } ,
2019-05-22 21:16:55 +00:00
}
2019-08-04 19:15:33 +00:00
rss , err := netstorage . ProcessSearchQuery ( sq , true , ec . Deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
rssLen := rss . Len ( )
if rssLen == 0 {
// rss is cancelled on every return path which doesn't run the rollup over it.
rss . Cancel ( )
2020-01-03 22:46:39 +00:00
var tss [ ] * timeseries
if name == "absent_over_time" {
tss = getAbsentTimeseries ( ec , me )
}
2019-05-22 21:16:55 +00:00
// Add missing points until ec.End.
// Do not cache the result, since missing points
// may be backfilled in the future.
2020-01-03 22:46:39 +00:00
tss = mergeTimeseries ( tssCached , tss , start , ec )
2019-05-22 21:16:55 +00:00
return tss , nil
}
// Verify timeseries fit available memory after the rollup.
// Take into account points from tssCached.
pointsPerTimeseries := 1 + ( ec . End - ec . Start ) / ec . Step
2019-11-08 16:45:25 +00:00
timeseriesLen := rssLen
if iafc != nil {
// Incremental aggregates require hold only GOMAXPROCS timeseries in memory.
timeseriesLen = runtime . GOMAXPROCS ( - 1 )
if iafc . ae . Modifier . Op != "" {
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
// since each group can have own set of time series in memory.
2020-04-28 09:51:36 +00:00
timeseriesLen *= * estimatedSeriesCountAfterAggregation
2019-11-08 16:45:25 +00:00
}
}
rollupPoints := mulNoOverflow ( pointsPerTimeseries , int64 ( timeseriesLen * len ( rcs ) ) )
2019-06-12 19:25:37 +00:00
// NOTE(review): 16 is presumably the number of bytes per point (8-byte value + 8-byte timestamp) - confirm.
rollupMemorySize := mulNoOverflow ( rollupPoints , 16 )
rml := getRollupMemoryLimiter ( )
if ! rml . Get ( uint64 ( rollupMemorySize ) ) {
2019-05-22 21:16:55 +00:00
rss . Cancel ( )
2019-06-12 19:25:37 +00:00
return nil , fmt . Errorf ( "not enough memory for processing %d data points across %d time series with %d points in each time series; " +
"possible solutions are: reducing the number of matching time series; switching to node with more RAM; " +
2020-04-28 09:51:36 +00:00
"increasing -memory.allowedPercent; increasing `step` query arg (%gs); reducing -search.estimatedSeriesCountAfterAggregation" ,
rollupPoints , timeseriesLen * len ( rcs ) , pointsPerTimeseries , float64 ( ec . Step ) / 1e3 )
2019-05-22 21:16:55 +00:00
}
2019-06-12 19:25:37 +00:00
// Return the reserved memory to the limiter when the function returns.
defer rml . Put ( uint64 ( rollupMemorySize ) )
2019-05-22 21:16:55 +00:00
// Evaluate rollup
2019-07-25 18:48:12 +00:00
removeMetricGroup := ! rollupFuncsKeepMetricGroup [ name ]
2019-07-10 09:57:27 +00:00
var tss [ ] * timeseries
if iafc != nil {
2020-01-03 21:50:47 +00:00
tss , err = evalRollupWithIncrementalAggregate ( name , iafc , rss , rcs , preFunc , sharedTimestamps , removeMetricGroup )
2019-07-10 09:57:27 +00:00
} else {
2020-01-03 21:50:47 +00:00
tss , err = evalRollupNoIncrementalAggregate ( name , rss , rcs , preFunc , sharedTimestamps , removeMetricGroup )
2019-07-10 09:57:27 +00:00
}
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
// Merge the cached part with the calculated part and store the merged result in the cache.
tss = mergeTimeseries ( tssCached , tss , start , ec )
2020-01-03 18:42:51 +00:00
rollupResultCacheV . Put ( ec , expr , window , tss )
2019-05-22 21:16:55 +00:00
return tss , nil
}
2019-06-12 19:25:37 +00:00
// rollupMemoryLimiter is the shared limiter returned by getRollupMemoryLimiter.
// Its MaxSize is set once under rollupMemoryLimiterOnce.
var (
rollupMemoryLimiter memoryLimiter
rollupMemoryLimiterOnce sync . Once
)
// getRollupMemoryLimiter returns the shared memory limiter for rollup results.
//
// The limiter is initialized on the first call with a quarter of memory.Allowed().
func getRollupMemoryLimiter() *memoryLimiter {
	initLimiter := func() {
		rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
	}
	rollupMemoryLimiterOnce.Do(initLimiter)
	return &rollupMemoryLimiter
}
2020-01-03 21:50:47 +00:00
// evalRollupWithIncrementalAggregate calculates rollups defined by rcs for every result from rss
// and feeds them into iafc instead of collecting all the rollup results.
//
// preFunc is applied to the raw values and timestamps of every result before the rollup.
// The returned time series are obtained from iafc.finalizeTimeseries after all the results are processed.
func evalRollupWithIncrementalAggregate ( name string , iafc * incrementalAggrFuncContext , rss * netstorage . Results , rcs [ ] * rollupConfig ,
2019-07-25 18:48:12 +00:00
preFunc func ( values [ ] float64 , timestamps [ ] int64 ) , sharedTimestamps [ ] int64 , removeMetricGroup bool ) ( [ ] * timeseries , error ) {
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-07-10 09:57:27 +00:00
preFunc ( rs . Values , rs . Timestamps )
// ts is a scratch timeseries, which is re-used for every rc below
// and is passed to putTimeseries when the callback returns.
ts := getTimeseries ( )
defer putTimeseries ( ts )
for _ , rc := range rcs {
2020-01-03 21:50:47 +00:00
// When newTimeseriesMap returns non-nil tsm, rc fills it via DoTimeseriesMap
// and every series from tsm.m is passed to iafc.
if tsm := newTimeseriesMap ( name , sharedTimestamps , & rs . MetricName ) ; tsm != nil {
rc . DoTimeseriesMap ( tsm , rs . Values , rs . Timestamps )
for _ , ts := range tsm . m {
iafc . updateTimeseries ( ts , workerID )
}
continue
}
2019-07-10 09:57:27 +00:00
ts . Reset ( )
2019-07-25 18:48:12 +00:00
doRollupForTimeseries ( rc , ts , & rs . MetricName , rs . Values , rs . Timestamps , sharedTimestamps , removeMetricGroup )
2019-07-12 12:51:02 +00:00
iafc . updateTimeseries ( ts , workerID )
2019-07-25 18:48:12 +00:00
// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
2019-07-10 09:57:27 +00:00
ts . Timestamps = nil
2019-07-25 18:48:12 +00:00
// doRollupForTimeseries sets denyReuse; clear it for the scratch timeseries.
ts . denyReuse = false
2019-07-10 09:57:27 +00:00
}
} )
if err != nil {
return nil , err
}
tss := iafc . finalizeTimeseries ( )
return tss , nil
}
2020-01-03 21:50:47 +00:00
func evalRollupNoIncrementalAggregate ( name string , rss * netstorage . Results , rcs [ ] * rollupConfig ,
2019-07-25 18:48:12 +00:00
preFunc func ( values [ ] float64 , timestamps [ ] int64 ) , sharedTimestamps [ ] int64 , removeMetricGroup bool ) ( [ ] * timeseries , error ) {
2019-07-10 09:57:27 +00:00
tss := make ( [ ] * timeseries , 0 , rss . Len ( ) * len ( rcs ) )
var tssLock sync . Mutex
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-07-10 09:57:27 +00:00
preFunc ( rs . Values , rs . Timestamps )
for _ , rc := range rcs {
2020-01-03 21:50:47 +00:00
if tsm := newTimeseriesMap ( name , sharedTimestamps , & rs . MetricName ) ; tsm != nil {
rc . DoTimeseriesMap ( tsm , rs . Values , rs . Timestamps )
tssLock . Lock ( )
tss = tsm . AppendTimeseriesTo ( tss )
tssLock . Unlock ( )
continue
}
2019-07-10 09:57:27 +00:00
var ts timeseries
2019-07-25 18:48:12 +00:00
doRollupForTimeseries ( rc , & ts , & rs . MetricName , rs . Values , rs . Timestamps , sharedTimestamps , removeMetricGroup )
2019-07-10 09:57:27 +00:00
tssLock . Lock ( )
tss = append ( tss , & ts )
tssLock . Unlock ( )
}
} )
if err != nil {
return nil , err
}
return tss , nil
}
2019-07-25 18:48:12 +00:00
// doRollupForTimeseries calculates the rollup rc over valuesSrc and timestampsSrc
// and stores the result in tsDst.
//
// The metric name for tsDst is copied from mnSrc. The `rollup` tag is added to it
// if rc.TagValue is set, and the metric group is dropped if removeMetricGroup is set.
// tsDst.Timestamps points to sharedTimestamps after the call, so tsDst is marked with denyReuse.
func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64,
	sharedTimestamps []int64, removeMetricGroup bool) {
	mn := &tsDst.MetricName
	mn.CopyFrom(mnSrc)
	if tagValue := rc.TagValue; len(tagValue) > 0 {
		mn.AddTag("rollup", tagValue)
	}
	if removeMetricGroup {
		mn.ResetMetricGroup()
	}

	// Re-use the existing tsDst.Values buffer for the rollup results.
	tsDst.Values = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
	tsDst.denyReuse = true
	tsDst.Timestamps = sharedTimestamps
}
2019-05-22 21:16:55 +00:00
// bbPool is a package-level pool of byte buffers.
// NOTE(review): it isn't referenced in the visible part of this file; presumably it is used elsewhere in the package.
var bbPool bytesutil . ByteBufferPool
// evalNumber returns a single time series with every value set to n
// at the shared timestamps from ec.
//
// The returned series is marked with denyReuse, since its timestamps are shared.
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
	timestamps := ec.getSharedTimestamps()
	values := make([]float64, len(timestamps))
	for i := range values {
		values[i] = n
	}
	ts := &timeseries{
		Values:     values,
		Timestamps: timestamps,
		denyReuse:  true,
	}
	return []*timeseries{ts}
}
// evalString returns a single time series filled with NaNs,
// which holds s in the metric group of its metric name.
func evalString(ec *EvalConfig, s string) []*timeseries {
	tss := evalNumber(ec, nan)
	mn := &tss[0].MetricName
	mn.MetricGroup = append(mn.MetricGroup[:0], s...)
	return tss
}
// evalTime returns a single time series, where every value equals
// to the corresponding timestamp multiplied by 1e-3.
func evalTime(ec *EvalConfig) []*timeseries {
	tss := evalNumber(ec, nan)
	ts := tss[0]
	for i := range ts.Timestamps {
		ts.Values[i] = float64(ts.Timestamps[i]) * 1e-3
	}
	return tss
}
2019-06-12 19:25:37 +00:00
// mulNoOverflow returns a*b, or math.MaxInt64 if the multiplication overflows int64.
//
// It is intended for non-negative a and b. Zero is returned for b == 0.
func mulNoOverflow(a, b int64) int64 {
	if b == 0 {
		// Avoid `integer divide by zero` panic in the overflow check below.
		return 0
	}
	if math.MaxInt64/b < a {
		// Overflow
		return math.MaxInt64
	}
	return a * b
}
2019-12-25 19:35:47 +00:00
// toTagFilters converts label filters from metricsql to tag filters for storage.
func toTagFilters(lfs []metricsql.LabelFilter) []storage.TagFilter {
	converted := make([]storage.TagFilter, len(lfs))
	for i := range converted {
		dst, src := &converted[i], &lfs[i]
		toTagFilter(dst, src)
	}
	return converted
}
// toTagFilter fills dst with the contents of src.
//
// The `__name__` label is converted to nil key.
func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) {
	// Nil key for `__name__` is required for storage.Search.
	dst.Key = nil
	if src.Label != "__name__" {
		dst.Key = []byte(src.Label)
	}
	dst.Value = []byte(src.Value)
	dst.IsNegative = src.IsNegative
	dst.IsRegexp = src.IsRegexp
}