2019-05-22 21:16:55 +00:00
package promql
import (
"flag"
"fmt"
"math"
2022-01-31 17:32:36 +00:00
"regexp"
"sort"
2021-09-17 20:33:15 +00:00
"strings"
2019-05-22 21:16:55 +00:00
"sync"
2022-06-28 16:26:17 +00:00
"sync/atomic"
2023-11-01 15:42:51 +00:00
"time"
2023-03-25 06:07:12 +00:00
"unsafe"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
2020-09-11 10:18:57 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2021-08-15 10:20:02 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
2023-10-31 18:24:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2022-10-07 22:07:42 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2022-05-31 23:29:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2023-11-11 11:30:08 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/metrics"
2020-04-28 12:28:22 +00:00
"github.com/VictoriaMetrics/metricsql"
2019-05-22 21:16:55 +00:00
)
var (
2024-02-08 12:47:21 +00:00
disableCache = flag . Bool ( "search.disableCache" , false , "Whether to disable response caching. This may be useful when ingesting historical data. " +
"See https://docs.victoriametrics.com/#backfilling . See also -search.resetRollupResultCacheOnStartup" )
2022-08-24 12:25:18 +00:00
maxPointsSubqueryPerTimeseries = flag . Int ( "search.maxPointsSubqueryPerTimeseries" , 100e3 , "The maximum number of points per series, which can be generated by subquery. " +
"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3" )
2022-10-07 22:07:42 +00:00
maxMemoryPerQuery = flagutil . NewBytes ( "search.maxMemoryPerQuery" , 0 , "The maximum amounts of memory a single query may consume. " +
2022-10-10 18:43:36 +00:00
"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated " +
2023-02-24 02:40:31 +00:00
"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . " +
"See also -search.logQueryMemoryUsage" )
2023-10-31 17:58:42 +00:00
logQueryMemoryUsage = flagutil . NewBytes ( "search.logQueryMemoryUsage" , 0 , "Log query and increment vm_memory_intensive_queries_total metric each time " +
"the query requires more memory than specified by this flag. " +
2023-02-24 02:40:31 +00:00
"This may help detecting and optimizing heavy queries. Query logging is disabled by default. " +
"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery" )
2022-10-07 22:07:42 +00:00
noStaleMarkers = flag . Bool ( "search.noStaleMarkers" , false , "Set this flag to true if the database doesn't contain Prometheus stale markers, " +
"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets" )
2023-11-11 11:09:25 +00:00
minWindowForInstantRollupOptimization = flagutil . NewDuration ( "search.minWindowForInstantRollupOptimization" , "3h" , "Enable cache-based optimization for repeated queries " +
2023-11-02 18:47:36 +00:00
"to /api/v1/query (aka instant queries), which contain rollup functions with lookbehind window exceeding the given value" )
2019-05-22 21:16:55 +00:00
)
2019-05-25 18:51:11 +00:00
// The minimum number of points per timeseries for enabling time rounding.
2019-05-22 21:16:55 +00:00
// This improves cache hit ratio for frequently requested queries over
// big time ranges.
const minTimeseriesPointsForTimeRounding = 50
2022-08-24 12:25:18 +00:00
// ValidateMaxPointsPerSeries validates that the number of points for the given start, end and step do not exceed maxPoints.
func ValidateMaxPointsPerSeries ( start , end , step int64 , maxPoints int ) error {
if step == 0 {
return fmt . Errorf ( "step can't be equal to zero" )
}
2019-05-22 21:16:55 +00:00
points := ( end - start ) / step + 1
2022-08-24 12:25:18 +00:00
if points > int64 ( maxPoints ) {
2022-09-06 10:25:59 +00:00
return fmt . Errorf ( "too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d" ,
2022-08-24 12:25:18 +00:00
start , end , step , points , maxPoints )
2019-05-22 21:16:55 +00:00
}
return nil
}
// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
//
2023-10-31 18:24:18 +00:00
// See EvalConfig.mayCache() for details.
2019-05-22 21:16:55 +00:00
func AdjustStartEnd ( start , end , step int64 ) ( int64 , int64 ) {
2020-07-30 20:14:15 +00:00
if * disableCache {
// Do not adjust start and end values when cache is disabled.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
return start , end
}
2019-05-22 21:16:55 +00:00
points := ( end - start ) / step + 1
if points < minTimeseriesPointsForTimeRounding {
// Too small number of points for rounding.
return start , end
}
// Round start and end to values divisible by step in order
// to enable response caching (see EvalConfig.mayCache).
2020-09-03 10:21:51 +00:00
start , end = alignStartEnd ( start , end , step )
2019-12-24 20:44:24 +00:00
// Make sure that the new number of points is the same as the initial number of points.
newPoints := ( end - start ) / step + 1
for newPoints > points {
end -= step
newPoints --
}
2019-05-22 21:16:55 +00:00
return start , end
}
2020-09-03 10:21:51 +00:00
func alignStartEnd ( start , end , step int64 ) ( int64 , int64 ) {
// Round start to the nearest smaller value divisible by step.
start -= start % step
// Round end to the nearest bigger value divisible by step.
adjust := end % step
if adjust > 0 {
end += step - adjust
}
return start , end
}
2019-05-22 21:16:55 +00:00
// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
Start int64
End int64
Step int64
2022-03-26 08:17:37 +00:00
// MaxSeries is the maximum number of time series, which can be scanned by the query.
// Zero means 'no limit'
MaxSeries int
2022-08-24 12:25:18 +00:00
// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
MaxPointsPerSeries int
2020-07-31 15:00:21 +00:00
// QuotedRemoteAddr contains quoted remote address.
QuotedRemoteAddr string
2020-09-11 10:18:57 +00:00
Deadline searchutils . Deadline
2019-05-22 21:16:55 +00:00
2022-05-31 23:29:19 +00:00
// Whether the response can be cached.
2019-05-22 21:16:55 +00:00
MayCache bool
2019-10-15 16:12:27 +00:00
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
2021-03-15 10:35:44 +00:00
// How many decimal digits after the point to leave in response.
RoundDigits int
2021-02-01 16:04:14 +00:00
2021-12-06 15:07:06 +00:00
// EnforcedTagFilterss may contain additional label filters to use in the query.
EnforcedTagFilterss [ ] [ ] storage . TagFilter
2021-03-15 10:35:44 +00:00
2023-02-24 02:40:31 +00:00
// The callback, which returns the request URI during logging.
// The request URI isn't stored here because its' construction may take non-trivial amounts of CPU.
GetRequestURI func ( ) string
2023-03-27 22:11:40 +00:00
// QueryStats contains various stats for the currently executed query.
//
2023-11-01 15:42:51 +00:00
// The caller must initialize QueryStats, otherwise it isn't collected.
2023-03-27 22:11:40 +00:00
QueryStats * QueryStats
2021-03-15 10:35:44 +00:00
timestamps [ ] int64
timestampsOnce sync . Once
2019-05-22 21:16:55 +00:00
}
2022-03-26 08:17:37 +00:00
// copyEvalConfig returns src copy.
func copyEvalConfig ( src * EvalConfig ) * EvalConfig {
2019-05-22 21:16:55 +00:00
var ec EvalConfig
ec . Start = src . Start
ec . End = src . End
ec . Step = src . Step
2022-03-26 08:17:37 +00:00
ec . MaxSeries = src . MaxSeries
2022-08-24 12:25:18 +00:00
ec . MaxPointsPerSeries = src . MaxPointsPerSeries
2019-05-22 21:16:55 +00:00
ec . Deadline = src . Deadline
ec . MayCache = src . MayCache
2019-10-15 16:12:27 +00:00
ec . LookbackDelta = src . LookbackDelta
2021-03-15 10:35:44 +00:00
ec . RoundDigits = src . RoundDigits
2021-12-06 15:07:06 +00:00
ec . EnforcedTagFilterss = src . EnforcedTagFilterss
2023-02-24 02:40:31 +00:00
ec . GetRequestURI = src . GetRequestURI
2023-03-27 22:11:40 +00:00
ec . QueryStats = src . QueryStats
2023-03-27 15:51:33 +00:00
2019-05-22 21:16:55 +00:00
// do not copy src.timestamps - they must be generated again.
return & ec
}
2023-03-27 22:11:40 +00:00
// QueryStats contains various stats for the query.
type QueryStats struct {
// SeriesFetched contains the number of series fetched from storage during the query evaluation.
2023-11-01 15:42:51 +00:00
SeriesFetched int64
// ExecutionTimeMsec contains the number of milliseconds the query took to execute.
ExecutionTimeMsec int64
2023-03-27 15:51:33 +00:00
}
2023-03-27 22:11:40 +00:00
func ( qs * QueryStats ) addSeriesFetched ( n int ) {
2023-11-01 15:42:51 +00:00
if qs == nil {
return
2023-03-27 15:51:33 +00:00
}
2023-11-01 15:42:51 +00:00
atomic . AddInt64 ( & qs . SeriesFetched , int64 ( n ) )
}
func ( qs * QueryStats ) addExecutionTimeMsec ( startTime time . Time ) {
if qs == nil {
return
}
d := time . Since ( startTime ) . Milliseconds ( )
atomic . AddInt64 ( & qs . ExecutionTimeMsec , d )
2023-03-27 15:51:33 +00:00
}
2019-05-22 21:16:55 +00:00
func ( ec * EvalConfig ) validate ( ) {
if ec . Start > ec . End {
logger . Panicf ( "BUG: start cannot exceed end; got %d vs %d" , ec . Start , ec . End )
}
if ec . Step <= 0 {
logger . Panicf ( "BUG: step must be greater than 0; got %d" , ec . Step )
}
}
func ( ec * EvalConfig ) mayCache ( ) bool {
2020-07-30 20:14:15 +00:00
if * disableCache {
return false
}
2019-05-22 21:16:55 +00:00
if ! ec . MayCache {
return false
}
2023-10-31 18:24:18 +00:00
if ec . Start == ec . End {
// There is no need in aligning start and end to step for instant query
// in order to cache its results.
return true
}
2019-05-22 21:16:55 +00:00
if ec . Start % ec . Step != 0 {
return false
}
if ec . End % ec . Step != 0 {
return false
}
return true
}
2022-06-27 10:32:47 +00:00
func ( ec * EvalConfig ) timeRangeString ( ) string {
start := storage . TimestampToHumanReadableFormat ( ec . Start )
end := storage . TimestampToHumanReadableFormat ( ec . End )
return fmt . Sprintf ( "[%s..%s]" , start , end )
}
2019-05-22 21:16:55 +00:00
func ( ec * EvalConfig ) getSharedTimestamps ( ) [ ] int64 {
ec . timestampsOnce . Do ( ec . timestampsInit )
return ec . timestamps
}
func ( ec * EvalConfig ) timestampsInit ( ) {
2022-08-24 12:25:18 +00:00
ec . timestamps = getTimestamps ( ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries )
2019-05-22 21:16:55 +00:00
}
2022-08-24 12:25:18 +00:00
func getTimestamps ( start , end , step int64 , maxPointsPerSeries int ) [ ] int64 {
2019-05-22 21:16:55 +00:00
// Sanity checks.
if step <= 0 {
logger . Panicf ( "BUG: Step must be bigger than 0; got %d" , step )
}
if start > end {
logger . Panicf ( "BUG: Start cannot exceed End; got %d vs %d" , start , end )
}
2022-08-24 12:25:18 +00:00
if err := ValidateMaxPointsPerSeries ( start , end , step , maxPointsPerSeries ) ; err != nil {
2019-05-22 21:16:55 +00:00
logger . Panicf ( "BUG: %s; this must be validated before the call to getTimestamps" , err )
}
// Prepare timestamps.
points := 1 + ( end - start ) / step
timestamps := make ( [ ] int64 , points )
for i := range timestamps {
timestamps [ i ] = start
start += step
}
return timestamps
}
2022-05-31 23:29:19 +00:00
func evalExpr ( qt * querytracer . Tracer , ec * EvalConfig , e metricsql . Expr ) ( [ ] * timeseries , error ) {
2022-06-08 18:05:17 +00:00
if qt . Enabled ( ) {
2022-06-30 15:17:07 +00:00
query := string ( e . AppendString ( nil ) )
2023-11-11 11:30:08 +00:00
query = stringsutil . LimitStringLen ( query , 300 )
2022-06-08 18:05:17 +00:00
mayCache := ec . mayCache ( )
2022-06-27 10:32:47 +00:00
qt = qt . NewChild ( "eval: query=%s, timeRange=%s, step=%d, mayCache=%v" , query , ec . timeRangeString ( ) , ec . Step , mayCache )
2022-06-08 18:05:17 +00:00
}
2022-05-31 23:29:19 +00:00
rv , err := evalExprInternal ( qt , ec , e )
if err != nil {
return nil , err
}
if qt . Enabled ( ) {
seriesCount := len ( rv )
pointsPerSeries := 0
if len ( rv ) > 0 {
pointsPerSeries = len ( rv [ 0 ] . Timestamps )
}
pointsCount := seriesCount * pointsPerSeries
2022-06-08 18:05:17 +00:00
qt . Donef ( "series=%d, points=%d, pointsPerSeries=%d" , seriesCount , pointsCount , pointsPerSeries )
2022-05-31 23:29:19 +00:00
}
return rv , nil
}
func evalExprInternal ( qt * querytracer . Tracer , ec * EvalConfig , e metricsql . Expr ) ( [ ] * timeseries , error ) {
2019-12-25 19:35:47 +00:00
if me , ok := e . ( * metricsql . MetricExpr ) ; ok {
re := & metricsql . RollupExpr {
2019-05-22 21:16:55 +00:00
Expr : me ,
}
2022-05-31 23:29:19 +00:00
rv , err := evalRollupFunc ( qt , ec , "default_rollup" , rollupDefault , e , re , nil )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , me . AppendString ( nil ) , err )
2019-05-22 21:16:55 +00:00
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if re , ok := e . ( * metricsql . RollupExpr ) ; ok {
2022-05-31 23:29:19 +00:00
rv , err := evalRollupFunc ( qt , ec , "default_rollup" , rollupDefault , e , re , nil )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , re . AppendString ( nil ) , err )
2019-05-22 21:16:55 +00:00
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if fe , ok := e . ( * metricsql . FuncExpr ) ; ok {
2019-05-22 21:16:55 +00:00
nrf := getRollupFunc ( fe . Name )
if nrf == nil {
2022-06-08 18:05:17 +00:00
qtChild := qt . NewChild ( "transform %s()" , fe . Name )
2022-05-31 23:29:19 +00:00
rv , err := evalTransformFunc ( qtChild , ec , fe )
2022-06-08 18:05:17 +00:00
qtChild . Donef ( "series=%d" , len ( rv ) )
2022-05-31 23:29:19 +00:00
return rv , err
2019-05-22 21:16:55 +00:00
}
2022-05-31 23:29:19 +00:00
args , re , err := evalRollupFuncArgs ( qt , ec , fe )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
rf , err := nrf ( args )
if err != nil {
return nil , err
}
2022-05-31 23:29:19 +00:00
rv , err := evalRollupFunc ( qt , ec , fe . Name , rf , e , re , nil )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , fe . AppendString ( nil ) , err )
2019-05-22 21:16:55 +00:00
}
return rv , nil
}
2019-12-25 19:35:47 +00:00
if ae , ok := e . ( * metricsql . AggrFuncExpr ) ; ok {
2022-06-08 18:05:17 +00:00
qtChild := qt . NewChild ( "aggregate %s()" , ae . Name )
2022-05-31 23:29:19 +00:00
rv , err := evalAggrFunc ( qtChild , ec , ae )
2022-06-08 18:05:17 +00:00
qtChild . Donef ( "series=%d" , len ( rv ) )
2022-05-31 23:29:19 +00:00
return rv , err
2019-05-22 21:16:55 +00:00
}
2019-12-25 19:35:47 +00:00
if be , ok := e . ( * metricsql . BinaryOpExpr ) ; ok {
2022-06-08 18:05:17 +00:00
qtChild := qt . NewChild ( "binary op %q" , be . Op )
2022-05-31 23:29:19 +00:00
rv , err := evalBinaryOp ( qtChild , ec , be )
2022-06-08 18:05:17 +00:00
qtChild . Donef ( "series=%d" , len ( rv ) )
2022-05-31 23:29:19 +00:00
return rv , err
2019-05-22 21:16:55 +00:00
}
2019-12-25 19:35:47 +00:00
if ne , ok := e . ( * metricsql . NumberExpr ) ; ok {
2019-05-22 21:16:55 +00:00
rv := evalNumber ( ec , ne . N )
return rv , nil
}
2019-12-25 19:35:47 +00:00
if se , ok := e . ( * metricsql . StringExpr ) ; ok {
2019-05-22 21:16:55 +00:00
rv := evalString ( ec , se . S )
return rv , nil
}
2021-07-12 14:16:38 +00:00
if de , ok := e . ( * metricsql . DurationExpr ) ; ok {
d := de . Duration ( ec . Step )
dSec := float64 ( d ) / 1000
rv := evalNumber ( ec , dSec )
return rv , nil
}
2019-05-22 21:16:55 +00:00
return nil , fmt . Errorf ( "unexpected expression %q" , e . AppendString ( nil ) )
}
2022-05-31 23:29:19 +00:00
func evalTransformFunc ( qt * querytracer . Tracer , ec * EvalConfig , fe * metricsql . FuncExpr ) ( [ ] * timeseries , error ) {
tf := getTransformFunc ( fe . Name )
if tf == nil {
2022-08-15 10:50:14 +00:00
return nil , & UserReadableError {
Err : fmt . Errorf ( ` unknown func %q ` , fe . Name ) ,
}
2022-05-31 23:29:19 +00:00
}
2022-09-02 16:46:25 +00:00
var args [ ] [ ] * timeseries
var err error
switch fe . Name {
case "" , "union" :
args , err = evalExprsInParallel ( qt , ec , fe . Args )
default :
args , err = evalExprsSequentially ( qt , ec , fe . Args )
}
2022-07-12 16:48:24 +00:00
if err != nil {
return nil , err
}
2022-05-31 23:29:19 +00:00
tfa := & transformFuncArg {
ec : ec ,
fe : fe ,
args : args ,
}
rv , err := tf ( tfa )
if err != nil {
2022-08-26 22:35:46 +00:00
return nil , & UserReadableError {
Err : fmt . Errorf ( ` cannot evaluate %q: %w ` , fe . AppendString ( nil ) , err ) ,
}
2022-05-31 23:29:19 +00:00
}
return rv , nil
}
func evalAggrFunc ( qt * querytracer . Tracer , ec * EvalConfig , ae * metricsql . AggrFuncExpr ) ( [ ] * timeseries , error ) {
if callbacks := getIncrementalAggrFuncCallbacks ( ae . Name ) ; callbacks != nil {
fe , nrf := tryGetArgRollupFuncWithMetricExpr ( ae )
if fe != nil {
// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
// The optimized path saves RAM for aggregates over big number of time series.
args , re , err := evalRollupFuncArgs ( qt , ec , fe )
if err != nil {
return nil , err
}
rf , err := nrf ( args )
if err != nil {
return nil , err
}
iafc := newIncrementalAggrFuncContext ( ae , callbacks )
return evalRollupFunc ( qt , ec , fe . Name , rf , ae , re , iafc )
}
}
2022-09-02 16:46:25 +00:00
args , err := evalExprsInParallel ( qt , ec , ae . Args )
2022-05-31 23:29:19 +00:00
if err != nil {
return nil , err
}
af := getAggrFunc ( ae . Name )
if af == nil {
2022-08-15 10:50:14 +00:00
return nil , & UserReadableError {
Err : fmt . Errorf ( ` unknown func %q ` , ae . Name ) ,
}
2022-05-31 23:29:19 +00:00
}
afa := & aggrFuncArg {
ae : ae ,
args : args ,
ec : ec ,
}
2023-02-24 04:05:11 +00:00
qtChild := qt . NewChild ( "eval %s" , ae . Name )
2022-05-31 23:29:19 +00:00
rv , err := af ( afa )
2023-02-24 04:05:11 +00:00
qtChild . Done ( )
2022-05-31 23:29:19 +00:00
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , ae . AppendString ( nil ) , err )
}
return rv , nil
}
func evalBinaryOp ( qt * querytracer . Tracer , ec * EvalConfig , be * metricsql . BinaryOpExpr ) ( [ ] * timeseries , error ) {
bf := getBinaryOpFunc ( be . Op )
if bf == nil {
return nil , fmt . Errorf ( ` unknown binary op %q ` , be . Op )
}
var err error
var tssLeft , tssRight [ ] * timeseries
switch strings . ToLower ( be . Op ) {
case "and" , "if" :
// Fetch right-side series at first, since it usually contains
// lower number of time series for `and` and `if` operator.
// This should produce more specific label filters for the left side of the query.
// This, in turn, should reduce the time to select series for the left side of the query.
tssRight , tssLeft , err = execBinaryOpArgs ( qt , ec , be . Right , be . Left , be )
default :
tssLeft , tssRight , err = execBinaryOpArgs ( qt , ec , be . Left , be . Right , be )
}
if err != nil {
return nil , fmt . Errorf ( "cannot execute %q: %w" , be . AppendString ( nil ) , err )
}
bfa := & binaryOpFuncArg {
be : be ,
left : tssLeft ,
right : tssRight ,
}
rv , err := bf ( bfa )
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , be . AppendString ( nil ) , err )
}
return rv , nil
}
2022-07-19 11:27:45 +00:00
func canPushdownCommonFilters ( be * metricsql . BinaryOpExpr ) bool {
switch strings . ToLower ( be . Op ) {
case "or" , "default" :
return false
}
if isAggrFuncWithoutGrouping ( be . Left ) || isAggrFuncWithoutGrouping ( be . Right ) {
return false
}
return true
}
func isAggrFuncWithoutGrouping ( e metricsql . Expr ) bool {
afe , ok := e . ( * metricsql . AggrFuncExpr )
if ! ok {
return false
}
return len ( afe . Modifier . Args ) == 0
}
2022-05-31 23:29:19 +00:00
func execBinaryOpArgs ( qt * querytracer . Tracer , ec * EvalConfig , exprFirst , exprSecond metricsql . Expr , be * metricsql . BinaryOpExpr ) ( [ ] * timeseries , [ ] * timeseries , error ) {
2022-07-19 11:27:45 +00:00
if ! canPushdownCommonFilters ( be ) {
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
// from exprFirst to exprSecond.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
qt = qt . NewChild ( "execute left and right sides of %q in parallel" , be . Op )
defer qt . Done ( )
var wg sync . WaitGroup
var tssFirst [ ] * timeseries
var errFirst error
qtFirst := qt . NewChild ( "expr1" )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
tssFirst , errFirst = evalExpr ( qtFirst , ec , exprFirst )
qtFirst . Done ( )
} ( )
var tssSecond [ ] * timeseries
var errSecond error
qtSecond := qt . NewChild ( "expr2" )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
tssSecond , errSecond = evalExpr ( qtSecond , ec , exprSecond )
qtSecond . Done ( )
} ( )
wg . Wait ( )
if errFirst != nil {
return nil , nil , errFirst
}
if errSecond != nil {
2022-07-20 14:44:28 +00:00
return nil , nil , errSecond
2022-07-19 11:27:45 +00:00
}
return tssFirst , tssSecond , nil
}
2022-01-31 17:32:36 +00:00
// Execute binary operation in the following way:
//
// 1) execute the exprFirst
// 2) get common label filters for series returned at step 1
// 3) push down the found common label filters to exprSecond. This filters out unneeded series
2023-02-13 12:27:13 +00:00
// during exprSecond execution instead of spending compute resources on extracting and processing these series
2022-01-31 17:32:36 +00:00
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
// 4) execute the exprSecond with possible additional filters found at step 3
//
// Typical use cases:
// - Kubernetes-related: show pod creation time with the node name:
//
// kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
//
// Without the optimization `kube_pod_info` would select and spend compute resources
// for more time series than needed. The selected time series would be dropped later
// when matching time series on the right and left sides of binary operand.
//
// - Generic alerting queries, which rely on `info` metrics.
// See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
//
// - Queries, which get additional labels from `info` metrics.
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus
2022-05-31 23:29:19 +00:00
tssFirst , err := evalExpr ( qt , ec , exprFirst )
2022-01-31 17:32:36 +00:00
if err != nil {
return nil , nil , err
}
2022-11-21 14:08:48 +00:00
if len ( tssFirst ) == 0 && strings . ToLower ( be . Op ) != "or" {
// Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result,
// since the "exprFirst op exprSecond" would return an empty result in any case.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349
return nil , nil , nil
}
2022-07-19 11:27:45 +00:00
lfs := getCommonLabelFilters ( tssFirst )
lfs = metricsql . TrimFiltersByGroupModifier ( lfs , be )
exprSecond = metricsql . PushdownBinaryOpFilters ( exprSecond , lfs )
2022-05-31 23:29:19 +00:00
tssSecond , err := evalExpr ( qt , ec , exprSecond )
2022-01-31 17:32:36 +00:00
if err != nil {
return nil , nil , err
}
return tssFirst , tssSecond , nil
}
func getCommonLabelFilters ( tss [ ] * timeseries ) [ ] metricsql . LabelFilter {
2023-01-15 20:55:37 +00:00
if len ( tss ) == 0 {
return nil
}
type valuesCounter struct {
values map [ string ] struct { }
count int
}
m := make ( map [ string ] * valuesCounter , len ( tss [ 0 ] . MetricName . Tags ) )
2022-01-31 17:32:36 +00:00
for _ , ts := range tss {
for _ , tag := range ts . MetricName . Tags {
2023-01-15 20:55:37 +00:00
vc , ok := m [ string ( tag . Key ) ]
if ! ok {
2023-10-10 11:44:02 +00:00
k := string ( tag . Key )
v := string ( tag . Value )
2023-01-15 20:55:37 +00:00
m [ k ] = & valuesCounter {
values : map [ string ] struct { } {
v : { } ,
} ,
count : 1 ,
}
continue
}
if len ( vc . values ) > 100 {
// Too many unique values found for the given tag.
// Do not make a filter on such values, since it may slow down
// search for matching time series.
continue
}
vc . count ++
if _ , ok := vc . values [ string ( tag . Value ) ] ; ! ok {
2023-10-10 11:44:02 +00:00
vc . values [ string ( tag . Value ) ] = struct { } { }
2023-01-15 20:55:37 +00:00
}
2022-01-31 17:32:36 +00:00
}
}
lfs := make ( [ ] metricsql . LabelFilter , 0 , len ( m ) )
2023-01-15 20:55:37 +00:00
var values [ ] string
for k , vc := range m {
if vc . count != len ( tss ) {
2022-01-31 17:32:36 +00:00
// Skip the tag, since it doesn't belong to all the time series.
continue
}
2023-01-15 20:55:37 +00:00
values = values [ : 0 ]
for s := range vc . values {
values = append ( values , s )
2022-02-02 21:37:35 +00:00
}
2022-01-31 17:32:36 +00:00
lf := metricsql . LabelFilter {
2023-01-15 20:55:37 +00:00
Label : k ,
2022-01-31 17:32:36 +00:00
}
if len ( values ) == 1 {
lf . Value = values [ 0 ]
} else {
2022-02-02 21:37:35 +00:00
sort . Strings ( values )
2022-01-31 17:32:36 +00:00
lf . Value = joinRegexpValues ( values )
lf . IsRegexp = true
}
lfs = append ( lfs , lf )
}
sort . Slice ( lfs , func ( i , j int ) bool {
return lfs [ i ] . Label < lfs [ j ] . Label
} )
return lfs
}
func joinRegexpValues ( a [ ] string ) string {
var b [ ] byte
for i , s := range a {
sQuoted := regexp . QuoteMeta ( s )
b = append ( b , sQuoted ... )
if i < len ( a ) - 1 {
b = append ( b , '|' )
}
}
return string ( b )
}
2019-12-25 19:35:47 +00:00
func tryGetArgRollupFuncWithMetricExpr ( ae * metricsql . AggrFuncExpr ) ( * metricsql . FuncExpr , newRollupFunc ) {
2019-07-10 09:57:27 +00:00
if len ( ae . Args ) != 1 {
return nil , nil
}
e := ae . Args [ 0 ]
// Make sure e contains one of the following:
// - metricExpr
// - metricExpr[d]
// - rollupFunc(metricExpr)
// - rollupFunc(metricExpr[d])
2019-12-25 19:35:47 +00:00
if me , ok := e . ( * metricsql . MetricExpr ) ; ok {
2019-07-10 09:57:27 +00:00
// e = metricExpr
if me . IsEmpty ( ) {
return nil , nil
}
2019-12-25 19:35:47 +00:00
fe := & metricsql . FuncExpr {
2019-07-10 09:57:27 +00:00
Name : "default_rollup" ,
2019-12-25 19:35:47 +00:00
Args : [ ] metricsql . Expr { me } ,
2019-07-10 09:57:27 +00:00
}
nrf := getRollupFunc ( fe . Name )
return fe , nrf
}
2019-12-25 19:35:47 +00:00
if re , ok := e . ( * metricsql . RollupExpr ) ; ok {
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ! ok || me . IsEmpty ( ) || re . ForSubquery ( ) {
2019-07-10 09:57:27 +00:00
return nil , nil
}
2019-09-13 18:40:46 +00:00
// e = metricExpr[d]
2019-12-25 19:35:47 +00:00
fe := & metricsql . FuncExpr {
2019-07-10 09:57:27 +00:00
Name : "default_rollup" ,
2019-12-25 19:35:47 +00:00
Args : [ ] metricsql . Expr { re } ,
2019-07-10 09:57:27 +00:00
}
nrf := getRollupFunc ( fe . Name )
return fe , nrf
}
2019-12-25 19:35:47 +00:00
fe , ok := e . ( * metricsql . FuncExpr )
2019-07-10 09:57:27 +00:00
if ! ok {
return nil , nil
}
nrf := getRollupFunc ( fe . Name )
if nrf == nil {
return nil , nil
}
2022-01-13 20:12:02 +00:00
rollupArgIdx := metricsql . GetRollupArgIdx ( fe )
2020-01-15 14:26:02 +00:00
if rollupArgIdx >= len ( fe . Args ) {
// Incorrect number of args for rollup func.
return nil , nil
}
2019-07-10 09:57:27 +00:00
arg := fe . Args [ rollupArgIdx ]
2019-12-25 19:35:47 +00:00
if me , ok := arg . ( * metricsql . MetricExpr ) ; ok {
2019-07-10 09:57:27 +00:00
if me . IsEmpty ( ) {
return nil , nil
}
2019-09-13 18:40:46 +00:00
// e = rollupFunc(metricExpr)
2023-12-14 10:38:54 +00:00
return fe , nrf
2019-07-10 09:57:27 +00:00
}
2019-12-25 19:35:47 +00:00
if re , ok := arg . ( * metricsql . RollupExpr ) ; ok {
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ! ok || me . IsEmpty ( ) || re . ForSubquery ( ) {
2019-07-10 09:57:27 +00:00
return nil , nil
}
2019-09-13 18:40:46 +00:00
// e = rollupFunc(metricExpr[d])
2019-07-10 09:57:27 +00:00
return fe , nrf
}
return nil , nil
}
2022-09-02 16:46:25 +00:00
func evalExprsSequentially ( qt * querytracer . Tracer , ec * EvalConfig , es [ ] metricsql . Expr ) ( [ ] [ ] * timeseries , error ) {
2019-05-22 21:16:55 +00:00
var rvs [ ] [ ] * timeseries
for _ , e := range es {
2022-05-31 23:29:19 +00:00
rv , err := evalExpr ( qt , ec , e )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
rvs = append ( rvs , rv )
}
return rvs , nil
}
2022-09-02 16:46:25 +00:00
func evalExprsInParallel ( qt * querytracer . Tracer , ec * EvalConfig , es [ ] metricsql . Expr ) ( [ ] [ ] * timeseries , error ) {
if len ( es ) < 2 {
return evalExprsSequentially ( qt , ec , es )
}
rvs := make ( [ ] [ ] * timeseries , len ( es ) )
errs := make ( [ ] error , len ( es ) )
2023-01-11 06:23:27 +00:00
qt . Printf ( "eval function args in parallel" )
2022-09-02 16:46:25 +00:00
var wg sync . WaitGroup
for i , e := range es {
wg . Add ( 1 )
qtChild := qt . NewChild ( "eval arg %d" , i )
go func ( e metricsql . Expr , i int ) {
defer func ( ) {
qtChild . Done ( )
wg . Done ( )
} ( )
rv , err := evalExpr ( qtChild , ec , e )
rvs [ i ] = rv
errs [ i ] = err
} ( e , i )
}
wg . Wait ( )
for _ , err := range errs {
if err != nil {
return nil , err
}
}
return rvs , nil
}
2022-05-31 23:29:19 +00:00
func evalRollupFuncArgs ( qt * querytracer . Tracer , ec * EvalConfig , fe * metricsql . FuncExpr ) ( [ ] interface { } , * metricsql . RollupExpr , error ) {
2019-12-25 19:35:47 +00:00
var re * metricsql . RollupExpr
2022-01-13 20:12:02 +00:00
rollupArgIdx := metricsql . GetRollupArgIdx ( fe )
2020-01-10 19:16:14 +00:00
if len ( fe . Args ) <= rollupArgIdx {
2020-01-15 14:26:02 +00:00
return nil , nil , fmt . Errorf ( "expecting at least %d args to %q; got %d args; expr: %q" , rollupArgIdx + 1 , fe . Name , len ( fe . Args ) , fe . AppendString ( nil ) )
2020-01-10 19:16:14 +00:00
}
2019-05-22 21:16:55 +00:00
args := make ( [ ] interface { } , len ( fe . Args ) )
for i , arg := range fe . Args {
if i == rollupArgIdx {
re = getRollupExprArg ( arg )
args [ i ] = re
continue
}
2022-05-31 23:29:19 +00:00
ts , err := evalExpr ( qt , ec , arg )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , nil , fmt . Errorf ( "cannot evaluate arg #%d for %q: %w" , i + 1 , fe . AppendString ( nil ) , err )
2019-05-22 21:16:55 +00:00
}
args [ i ] = ts
}
return args , re , nil
}
2019-12-25 19:35:47 +00:00
func getRollupExprArg ( arg metricsql . Expr ) * metricsql . RollupExpr {
re , ok := arg . ( * metricsql . RollupExpr )
2019-05-22 21:16:55 +00:00
if ! ok {
2019-12-25 19:35:47 +00:00
// Wrap non-rollup arg into metricsql.RollupExpr.
return & metricsql . RollupExpr {
2019-05-22 21:16:55 +00:00
Expr : arg ,
}
}
2019-09-13 18:40:46 +00:00
if ! re . ForSubquery ( ) {
// Return standard rollup if it doesn't contain subquery.
2019-05-22 21:16:55 +00:00
return re
}
2019-12-25 19:35:47 +00:00
me , ok := re . Expr . ( * metricsql . MetricExpr )
2019-05-22 21:16:55 +00:00
if ! ok {
// arg contains subquery.
return re
}
// Convert me[w:step] -> default_rollup(me)[w:step]
reNew := * re
2019-12-25 19:35:47 +00:00
reNew . Expr = & metricsql . FuncExpr {
2019-05-22 21:16:55 +00:00
Name : "default_rollup" ,
2019-12-25 19:35:47 +00:00
Args : [ ] metricsql . Expr {
& metricsql . RollupExpr { Expr : me } ,
2019-05-22 21:16:55 +00:00
} ,
}
return & reNew
}
2022-01-17 13:27:00 +00:00
// expr may contain:
// - rollupFunc(m) if iafc is nil
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
2022-05-31 23:29:19 +00:00
func evalRollupFunc ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc , expr metricsql . Expr ,
re * metricsql . RollupExpr , iafc * incrementalAggrFuncContext ) ( [ ] * timeseries , error ) {
2022-01-13 20:12:02 +00:00
if re . At == nil {
2022-05-31 23:29:19 +00:00
return evalRollupFuncWithoutAt ( qt , ec , funcName , rf , expr , re , iafc )
2022-01-13 20:12:02 +00:00
}
2022-05-31 23:29:19 +00:00
tssAt , err := evalExpr ( qt , ec , re . At )
2022-01-13 20:12:02 +00:00
if err != nil {
2022-08-15 10:50:14 +00:00
return nil , & UserReadableError {
Err : fmt . Errorf ( "cannot evaluate `@` modifier: %w" , err ) ,
}
2022-01-13 20:12:02 +00:00
}
if len ( tssAt ) != 1 {
2022-08-15 10:50:14 +00:00
return nil , & UserReadableError {
Err : fmt . Errorf ( "`@` modifier must return a single series; it returns %d series instead" , len ( tssAt ) ) ,
}
2022-01-13 20:12:02 +00:00
}
atTimestamp := int64 ( tssAt [ 0 ] . Values [ 0 ] * 1000 )
2022-03-26 08:17:37 +00:00
ecNew := copyEvalConfig ( ec )
2022-01-13 20:12:02 +00:00
ecNew . Start = atTimestamp
ecNew . End = atTimestamp
2022-05-31 23:29:19 +00:00
tss , err := evalRollupFuncWithoutAt ( qt , ecNew , funcName , rf , expr , re , iafc )
2022-01-13 20:12:02 +00:00
if err != nil {
return nil , err
}
2022-01-14 02:05:39 +00:00
// expand single-point tss to the original time range.
2022-01-13 20:12:02 +00:00
timestamps := ec . getSharedTimestamps ( )
for _ , ts := range tss {
v := ts . Values [ 0 ]
values := make ( [ ] float64 , len ( timestamps ) )
for i := range timestamps {
values [ i ] = v
}
ts . Timestamps = timestamps
ts . Values = values
}
return tss , nil
}
2022-05-31 23:29:19 +00:00
func evalRollupFuncWithoutAt ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc ,
expr metricsql . Expr , re * metricsql . RollupExpr , iafc * incrementalAggrFuncContext ) ( [ ] * timeseries , error ) {
2021-09-17 20:33:15 +00:00
funcName = strings . ToLower ( funcName )
2019-05-22 21:16:55 +00:00
ecNew := ec
var offset int64
2021-07-12 14:16:38 +00:00
if re . Offset != nil {
offset = re . Offset . Duration ( ec . Step )
2022-03-26 08:17:37 +00:00
ecNew = copyEvalConfig ( ecNew )
2019-05-22 21:16:55 +00:00
ecNew . Start -= offset
ecNew . End -= offset
2020-12-27 12:09:22 +00:00
// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
// since the time range alignment has been already performed by the caller,
// so cache hit rate should be quite good.
// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
2019-05-22 21:16:55 +00:00
}
2021-09-17 20:33:15 +00:00
if funcName == "rollup_candlestick" {
2020-02-04 21:23:37 +00:00
// Automatically apply `offset -step` to `rollup_candlestick` function
// in order to obtain expected OHLC results.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
step := ecNew . Step
2022-03-26 08:17:37 +00:00
ecNew = copyEvalConfig ( ecNew )
2020-02-04 21:23:37 +00:00
ecNew . Start += step
ecNew . End += step
offset -= step
}
2019-05-22 21:16:55 +00:00
var rvs [ ] * timeseries
var err error
2019-12-25 19:35:47 +00:00
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ok {
2022-05-31 23:29:19 +00:00
rvs , err = evalRollupFuncWithMetricExpr ( qt , ecNew , funcName , rf , expr , me , iafc , re . Window )
2019-05-22 21:16:55 +00:00
} else {
2019-07-10 09:57:27 +00:00
if iafc != nil {
2021-09-17 20:33:15 +00:00
logger . Panicf ( "BUG: iafc must be nil for rollup %q over subquery %q" , funcName , re . AppendString ( nil ) )
2019-07-10 09:57:27 +00:00
}
2022-05-31 23:29:19 +00:00
rvs , err = evalRollupFuncWithSubquery ( qt , ecNew , funcName , rf , expr , re )
2019-05-22 21:16:55 +00:00
}
if err != nil {
2022-08-15 10:50:14 +00:00
return nil , & UserReadableError {
Err : err ,
}
2019-05-22 21:16:55 +00:00
}
2022-02-12 13:45:06 +00:00
if funcName == "absent_over_time" {
2023-11-16 14:52:38 +00:00
rvs = aggregateAbsentOverTime ( ecNew , re . Expr , rvs )
2022-02-12 13:45:06 +00:00
}
2019-05-22 21:16:55 +00:00
if offset != 0 && len ( rvs ) > 0 {
// Make a copy of timestamps, since they may be used in other values.
srcTimestamps := rvs [ 0 ] . Timestamps
dstTimestamps := append ( [ ] int64 { } , srcTimestamps ... )
for i := range dstTimestamps {
dstTimestamps [ i ] += offset
}
for _ , ts := range rvs {
ts . Timestamps = dstTimestamps
}
}
return rvs , nil
}
2022-02-12 13:45:06 +00:00
// aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values.
//
// Values for returned series are set to nan if at least a single tss series contains nan at that point.
// This means that tss contains a series with non-empty results at that point.
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
func aggregateAbsentOverTime ( ec * EvalConfig , expr metricsql . Expr , tss [ ] * timeseries ) [ ] * timeseries {
rvs := getAbsentTimeseries ( ec , expr )
if len ( tss ) == 0 {
return rvs
}
for i := range tss [ 0 ] . Values {
for _ , ts := range tss {
if math . IsNaN ( ts . Values [ i ] ) {
rvs [ 0 ] . Values [ i ] = nan
break
}
}
}
return rvs
}
2022-05-31 23:29:19 +00:00
func evalRollupFuncWithSubquery ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc , expr metricsql . Expr , re * metricsql . RollupExpr ) ( [ ] * timeseries , error ) {
2020-01-03 18:42:51 +00:00
// TODO: determine whether to use rollupResultCacheV here.
2022-06-08 18:05:17 +00:00
qt = qt . NewChild ( "subquery" )
defer qt . Done ( )
2023-08-12 11:47:50 +00:00
step , err := re . Step . NonNegativeDuration ( ec . Step )
if err != nil {
return nil , fmt . Errorf ( "cannot parse step in square brackets at %s: %w" , expr . AppendString ( nil ) , err )
}
2021-07-12 14:16:38 +00:00
if step == 0 {
2019-05-22 21:16:55 +00:00
step = ec . Step
}
2023-08-12 11:47:50 +00:00
window , err := re . Window . NonNegativeDuration ( ec . Step )
if err != nil {
return nil , fmt . Errorf ( "cannot parse lookbehind window in square brackets at %s: %w" , expr . AppendString ( nil ) , err )
}
2019-05-22 21:16:55 +00:00
2022-03-26 08:17:37 +00:00
ecSQ := copyEvalConfig ( ec )
2023-12-06 12:15:48 +00:00
ecSQ . Start -= window + step + maxSilenceInterval ( )
2020-09-03 10:21:51 +00:00
ecSQ . End += step
2019-05-22 21:16:55 +00:00
ecSQ . Step = step
2022-08-24 12:25:18 +00:00
ecSQ . MaxPointsPerSeries = * maxPointsSubqueryPerTimeseries
if err := ValidateMaxPointsPerSeries ( ecSQ . Start , ecSQ . End , ecSQ . Step , ecSQ . MaxPointsPerSeries ) ; err != nil {
2022-09-06 10:25:59 +00:00
return nil , fmt . Errorf ( "%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)" , err )
2019-05-22 21:16:55 +00:00
}
2020-09-03 10:21:51 +00:00
// unconditionally align start and end args to step for subquery as Prometheus does.
ecSQ . Start , ecSQ . End = alignStartEnd ( ecSQ . Start , ecSQ . End , ecSQ . Step )
2022-05-31 23:29:19 +00:00
tssSQ , err := evalExpr ( qt , ecSQ , re . Expr )
2019-05-22 21:16:55 +00:00
if err != nil {
return nil , err
}
2020-01-03 22:46:39 +00:00
if len ( tssSQ ) == 0 {
return nil , nil
}
2022-08-24 12:25:18 +00:00
sharedTimestamps := getTimestamps ( ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries )
preFunc , rcs , err := getRollupConfigs ( funcName , rf , expr , ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries , window , ec . LookbackDelta , sharedTimestamps )
2020-01-10 19:16:14 +00:00
if err != nil {
return nil , err
}
2023-03-25 06:07:12 +00:00
2022-06-28 16:26:17 +00:00
var samplesScannedTotal uint64
2022-01-14 02:05:39 +00:00
keepMetricNames := getKeepMetricNames ( expr )
2023-03-25 06:34:34 +00:00
tsw := getTimeseriesByWorkerID ( )
seriesByWorkerID := tsw . byWorkerID
2023-03-21 03:54:54 +00:00
doParallel ( tssSQ , func ( tsSQ * timeseries , values [ ] float64 , timestamps [ ] int64 , workerID uint ) ( [ ] float64 , [ ] int64 ) {
2019-05-22 21:16:55 +00:00
values , timestamps = removeNanValues ( values [ : 0 ] , timestamps [ : 0 ] , tsSQ . Values , tsSQ . Timestamps )
preFunc ( values , timestamps )
for _ , rc := range rcs {
2022-01-14 02:05:39 +00:00
if tsm := newTimeseriesMap ( funcName , keepMetricNames , sharedTimestamps , & tsSQ . MetricName ) ; tsm != nil {
2022-06-28 16:26:17 +00:00
samplesScanned := rc . DoTimeseriesMap ( tsm , values , timestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2023-03-25 06:07:12 +00:00
seriesByWorkerID [ workerID ] . tss = tsm . AppendTimeseriesTo ( seriesByWorkerID [ workerID ] . tss )
2020-01-03 21:50:47 +00:00
continue
}
2019-05-22 21:16:55 +00:00
var ts timeseries
2022-06-28 16:26:17 +00:00
samplesScanned := doRollupForTimeseries ( funcName , keepMetricNames , rc , & ts , & tsSQ . MetricName , values , timestamps , sharedTimestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2023-03-25 06:07:12 +00:00
seriesByWorkerID [ workerID ] . tss = append ( seriesByWorkerID [ workerID ] . tss , & ts )
2019-05-22 21:16:55 +00:00
}
return values , timestamps
} )
2023-03-25 06:07:12 +00:00
tss := make ( [ ] * timeseries , 0 , len ( tssSQ ) * len ( rcs ) )
for i := range seriesByWorkerID {
tss = append ( tss , seriesByWorkerID [ i ] . tss ... )
}
2023-03-25 06:34:34 +00:00
putTimeseriesByWorkerID ( tsw )
2023-03-25 06:07:12 +00:00
2022-06-28 17:18:08 +00:00
rowsScannedPerQuery . Update ( float64 ( samplesScannedTotal ) )
2022-06-28 16:26:17 +00:00
qt . Printf ( "rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d" , funcName , len ( tssSQ ) , len ( tss ) , samplesScannedTotal )
2019-05-22 21:16:55 +00:00
return tss , nil
}
2022-06-28 17:18:08 +00:00
var rowsScannedPerQuery = metrics . NewHistogram ( ` vm_rows_scanned_per_query ` )
2022-01-14 02:05:39 +00:00
func getKeepMetricNames ( expr metricsql . Expr ) bool {
2022-01-17 13:27:00 +00:00
if ae , ok := expr . ( * metricsql . AggrFuncExpr ) ; ok {
// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
// This case is possible when optimized aggrFunc calculations are used
// such as `sum(rate(...))`
if len ( ae . Args ) != 1 {
return false
}
expr = ae . Args [ 0 ]
}
2022-01-14 02:05:39 +00:00
if fe , ok := expr . ( * metricsql . FuncExpr ) ; ok {
return fe . KeepMetricNames
}
return false
}
2023-03-21 03:54:54 +00:00
func doParallel ( tss [ ] * timeseries , f func ( ts * timeseries , values [ ] float64 , timestamps [ ] int64 , workerID uint ) ( [ ] float64 , [ ] int64 ) ) {
workers := netstorage . MaxWorkers ( )
if workers > len ( tss ) {
workers = len ( tss )
2019-05-22 21:16:55 +00:00
}
2023-03-21 03:54:54 +00:00
seriesPerWorker := ( len ( tss ) + workers - 1 ) / workers
workChs := make ( [ ] chan * timeseries , workers )
for i := range workChs {
workChs [ i ] = make ( chan * timeseries , seriesPerWorker )
}
for i , ts := range tss {
idx := i % len ( workChs )
workChs [ idx ] <- ts
}
for _ , workCh := range workChs {
close ( workCh )
}
2019-05-22 21:16:55 +00:00
var wg sync . WaitGroup
2023-03-21 03:54:54 +00:00
wg . Add ( workers )
for i := 0 ; i < workers ; i ++ {
go func ( workerID uint ) {
2019-05-22 21:16:55 +00:00
defer wg . Done ( )
var tmpValues [ ] float64
var tmpTimestamps [ ] int64
2023-03-21 03:54:54 +00:00
for ts := range workChs [ workerID ] {
tmpValues , tmpTimestamps = f ( ts , tmpValues , tmpTimestamps , workerID )
2019-05-22 21:16:55 +00:00
}
2023-03-21 03:54:54 +00:00
} ( uint ( i ) )
2019-05-22 21:16:55 +00:00
}
wg . Wait ( )
}
func removeNanValues ( dstValues [ ] float64 , dstTimestamps [ ] int64 , values [ ] float64 , timestamps [ ] int64 ) ( [ ] float64 , [ ] int64 ) {
hasNan := false
for _ , v := range values {
if math . IsNaN ( v ) {
hasNan = true
}
}
if ! hasNan {
// Fast path - no NaNs.
dstValues = append ( dstValues , values ... )
dstTimestamps = append ( dstTimestamps , timestamps ... )
return dstValues , dstTimestamps
}
// Slow path - remove NaNs.
for i , v := range values {
if math . IsNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , timestamps [ i ] )
}
return dstValues , dstTimestamps
}
2023-10-31 18:24:18 +00:00
// evalInstantRollup evaluates instant rollup where ec.Start == ec.End.
func evalInstantRollup ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc ,
expr metricsql . Expr , me * metricsql . MetricExpr , iafc * incrementalAggrFuncContext , window int64 ) ( [ ] * timeseries , error ) {
if ec . Start != ec . End {
logger . Panicf ( "BUG: evalInstantRollup cannot be called on non-empty time range; got %s" , ec . timeRangeString ( ) )
}
timestamp := ec . Start
if qt . Enabled ( ) {
qt = qt . NewChild ( "instant rollup %s; time=%s, window=%d" , expr . AppendString ( nil ) , storage . TimestampToHumanReadableFormat ( timestamp ) , window )
defer qt . Done ( )
}
evalAt := func ( qt * querytracer . Tracer , timestamp , window int64 ) ( [ ] * timeseries , error ) {
ecCopy := copyEvalConfig ( ec )
ecCopy . Start = timestamp
ecCopy . End = timestamp
pointsPerSeries := int64 ( 1 )
return evalRollupFuncNoCache ( qt , ecCopy , funcName , rf , expr , me , iafc , window , pointsPerSeries )
}
tooBigOffset := func ( offset int64 ) bool {
maxOffset := window / 2
2023-11-01 19:16:15 +00:00
if maxOffset > 1800 * 1000 {
maxOffset = 1800 * 1000
2023-10-31 18:24:18 +00:00
}
return offset >= maxOffset
}
2023-11-11 11:09:25 +00:00
deleteCachedSeries := func ( qt * querytracer . Tracer ) {
rollupResultCacheV . DeleteInstantValues ( qt , expr , window , ec . Step , ec . EnforcedTagFilterss )
}
getCachedSeries := func ( qt * querytracer . Tracer ) ( [ ] * timeseries , int64 , error ) {
again :
offset := int64 ( 0 )
tssCached := rollupResultCacheV . GetInstantValues ( qt , expr , window , ec . Step , ec . EnforcedTagFilterss )
ec . QueryStats . addSeriesFetched ( len ( tssCached ) )
if len ( tssCached ) == 0 {
// Cache miss. Re-populate the missing data.
start := int64 ( fasttime . UnixTimestamp ( ) * 1000 ) - cacheTimestampOffset . Milliseconds ( )
offset = timestamp - start
if offset < 0 {
start = timestamp
offset = 0
}
if tooBigOffset ( offset ) {
qt . Printf ( "cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big " +
"for the requested time=%s and window=%d" , cacheTimestampOffset , storage . TimestampToHumanReadableFormat ( timestamp ) , window )
tss , err := evalAt ( qt , timestamp , window )
return tss , 0 , err
}
qt . Printf ( "calculating the rollup at time=%s, because it is missing in the cache" , storage . TimestampToHumanReadableFormat ( start ) )
tss , err := evalAt ( qt , start , window )
if err != nil {
return nil , 0 , err
}
if hasDuplicateSeries ( tss ) {
qt . Printf ( "cannot apply instant rollup optimization because the result contains duplicate series" )
tss , err := evalAt ( qt , timestamp , window )
return tss , 0 , err
}
rollupResultCacheV . PutInstantValues ( qt , expr , window , ec . Step , ec . EnforcedTagFilterss , tss )
return tss , offset , nil
}
// Cache hit. Verify whether it is OK to use the cached data.
offset = timestamp - tssCached [ 0 ] . Timestamps [ 0 ]
if offset < 0 {
qt . Printf ( "do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s" ,
storage . TimestampToHumanReadableFormat ( tssCached [ 0 ] . Timestamps [ 0 ] ) , storage . TimestampToHumanReadableFormat ( timestamp ) )
// Delete the outdated cached values, so the cache could be re-populated with newer values.
deleteCachedSeries ( qt )
goto again
}
if tooBigOffset ( offset ) {
qt . Printf ( "do not apply instant rollup optimization because the offset=%d between the requested timestamp " +
"and the cached values is too big comparing to window=%d" , offset , window )
// Delete the outdated cached values, so the cache could be re-populated with newer values.
deleteCachedSeries ( qt )
goto again
}
return tssCached , offset , nil
}
2023-10-31 18:24:18 +00:00
if ! ec . mayCache ( ) {
qt . Printf ( "do not apply instant rollup optimization because of disabled cache" )
return evalAt ( qt , timestamp , window )
}
2023-11-01 19:16:15 +00:00
if window < minWindowForInstantRollupOptimization . Milliseconds ( ) {
qt . Printf ( "do not apply instant rollup optimization because of too small window=%d; must be equal or bigger than %d" ,
window , minWindowForInstantRollupOptimization . Milliseconds ( ) )
2023-10-31 18:24:18 +00:00
return evalAt ( qt , timestamp , window )
}
switch funcName {
case "avg_over_time" :
if iafc != nil {
qt . Printf ( "do not apply instant rollup optimization for incremental aggregate %s()" , iafc . ae . Name )
return evalAt ( qt , timestamp , window )
}
qt . Printf ( "optimized calculation for instant rollup avg_over_time(m[d]) as (sum_over_time(m[d]) / count_over_time(m[d]))" )
fe := expr . ( * metricsql . FuncExpr )
feSum := * fe
feSum . Name = "sum_over_time"
feCount := * fe
feCount . Name = "count_over_time"
be := & metricsql . BinaryOpExpr {
Op : "/" ,
KeepMetricNames : fe . KeepMetricNames ,
Left : & feSum ,
Right : & feCount ,
}
return evalExpr ( qt , ec , be )
case "rate" :
if iafc != nil {
if strings . ToLower ( iafc . ae . Name ) != "sum" {
qt . Printf ( "do not apply instant rollup optimization for incremental aggregate %s()" , iafc . ae . Name )
return evalAt ( qt , timestamp , window )
}
qt . Printf ( "optimized calculation for sum(rate(m[d])) as (sum(increase(m[d])) / d)" )
afe := expr . ( * metricsql . AggrFuncExpr )
fe := afe . Args [ 0 ] . ( * metricsql . FuncExpr )
feIncrease := * fe
feIncrease . Name = "increase"
re := fe . Args [ 0 ] . ( * metricsql . RollupExpr )
d := re . Window . Duration ( ec . Step )
if d == 0 {
d = ec . Step
}
afeIncrease := * afe
afeIncrease . Args = [ ] metricsql . Expr { & feIncrease }
be := & metricsql . BinaryOpExpr {
Op : "/" ,
KeepMetricNames : true ,
Left : & afeIncrease ,
Right : & metricsql . NumberExpr {
N : float64 ( d ) / 1000 ,
} ,
}
return evalExpr ( qt , ec , be )
}
qt . Printf ( "optimized calculation for instant rollup rate(m[d]) as (increase(m[d]) / d)" )
fe := expr . ( * metricsql . FuncExpr )
feIncrease := * fe
feIncrease . Name = "increase"
re := fe . Args [ 0 ] . ( * metricsql . RollupExpr )
d := re . Window . Duration ( ec . Step )
if d == 0 {
d = ec . Step
}
be := & metricsql . BinaryOpExpr {
Op : "/" ,
KeepMetricNames : fe . KeepMetricNames ,
Left : & feIncrease ,
Right : & metricsql . NumberExpr {
N : float64 ( d ) / 1000 ,
} ,
}
return evalExpr ( qt , ec , be )
2023-11-11 11:09:25 +00:00
case "max_over_time" :
if iafc != nil {
if strings . ToLower ( iafc . ae . Name ) != "max" {
qt . Printf ( "do not apply instant rollup optimization for non-max incremental aggregate %s()" , iafc . ae . Name )
return evalAt ( qt , timestamp , window )
}
}
// Calculate
//
// max_over_time(m[window] @ timestamp)
//
// as the maximum of
//
// - max_over_time(m[window] @ (timestamp-offset))
// - max_over_time(m[offset] @ timestamp)
//
// if max_over_time(m[offset] @ (timestamp-window)) < max_over_time(m[window] @ (timestamp-offset))
// otherwise do not apply the optimization
//
// where
//
// - max_over_time(m[window] @ (timestamp-offset)) is obtained from cache
// - max_over_time(m[offset] @ timestamp) and max_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
// These rollups are calculated faster than max_over_time(m[window]) because offset is smaller than window.
qtChild := qt . NewChild ( "optimized calculation for instant rollup %s at time=%s with lookbehind window=%d" ,
expr . AppendString ( nil ) , storage . TimestampToHumanReadableFormat ( timestamp ) , window )
defer qtChild . Done ( )
tssCached , offset , err := getCachedSeries ( qtChild )
if err != nil {
return nil , err
}
if offset == 0 {
return tssCached , nil
}
2023-11-14 01:52:39 +00:00
// Calculate max_over_time(m[offset] @ timestamp)
tssStart , err := evalAt ( qtChild , timestamp , offset )
if err != nil {
return nil , err
}
if hasDuplicateSeries ( tssStart ) {
qtChild . Printf ( "cannot apply instant rollup optimization, since tssStart contains duplicate series" )
return evalAt ( qtChild , timestamp , window )
}
2023-11-11 11:09:25 +00:00
// Calculate max_over_time(m[offset] @ (timestamp - window))
tssEnd , err := evalAt ( qtChild , timestamp - window , offset )
if err != nil {
return nil , err
}
if hasDuplicateSeries ( tssEnd ) {
qtChild . Printf ( "cannot apply instant rollup optimization, since tssEnd contains duplicate series" )
return evalAt ( qtChild , timestamp , window )
}
2023-11-14 01:52:39 +00:00
// Calculate the result
2024-01-30 20:03:34 +00:00
tss , ok := getMaxInstantValues ( qtChild , tssCached , tssStart , tssEnd , timestamp )
2023-11-14 01:52:39 +00:00
if ! ok {
2023-11-11 11:09:25 +00:00
qtChild . Printf ( "cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached" )
deleteCachedSeries ( qtChild )
return evalAt ( qt , timestamp , window )
}
return tss , nil
case "min_over_time" :
if iafc != nil {
if strings . ToLower ( iafc . ae . Name ) != "min" {
qt . Printf ( "do not apply instant rollup optimization for non-min incremental aggregate %s()" , iafc . ae . Name )
return evalAt ( qt , timestamp , window )
}
}
// Calculate
//
// min_over_time(m[window] @ timestamp)
//
// as the minimum of
//
// - min_over_time(m[window] @ (timestamp-offset))
// - min_over_time(m[offset] @ timestamp)
//
// if min_over_time(m[offset] @ (timestamp-window)) > min_over_time(m[window] @ (timestamp-offset))
// otherwise do not apply the optimization
//
// where
//
// - min_over_time(m[window] @ (timestamp-offset)) is obtained from cache
// - min_over_time(m[offset] @ timestamp) and min_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
// These rollups are calculated faster than min_over_time(m[window]) because offset is smaller than window.
qtChild := qt . NewChild ( "optimized calculation for instant rollup %s at time=%s with lookbehind window=%d" ,
expr . AppendString ( nil ) , storage . TimestampToHumanReadableFormat ( timestamp ) , window )
defer qtChild . Done ( )
tssCached , offset , err := getCachedSeries ( qtChild )
if err != nil {
return nil , err
}
if offset == 0 {
return tssCached , nil
}
2023-11-14 01:52:39 +00:00
// Calculate min_over_time(m[offset] @ timestamp)
tssStart , err := evalAt ( qtChild , timestamp , offset )
if err != nil {
return nil , err
}
if hasDuplicateSeries ( tssStart ) {
qtChild . Printf ( "cannot apply instant rollup optimization, since tssStart contains duplicate series" )
return evalAt ( qtChild , timestamp , window )
}
2023-11-11 11:09:25 +00:00
// Calculate min_over_time(m[offset] @ (timestamp - window))
tssEnd , err := evalAt ( qtChild , timestamp - window , offset )
if err != nil {
return nil , err
}
if hasDuplicateSeries ( tssEnd ) {
qtChild . Printf ( "cannot apply instant rollup optimization, since tssEnd contains duplicate series" )
return evalAt ( qtChild , timestamp , window )
}
2023-11-14 01:52:39 +00:00
// Calculate the result
2024-01-30 20:03:34 +00:00
tss , ok := getMinInstantValues ( qtChild , tssCached , tssStart , tssEnd , timestamp )
2023-11-14 01:52:39 +00:00
if ! ok {
2023-11-11 11:09:25 +00:00
qtChild . Printf ( "cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached" )
deleteCachedSeries ( qtChild )
return evalAt ( qt , timestamp , window )
}
return tss , nil
2023-11-01 08:57:55 +00:00
case
"count_eq_over_time" ,
"count_gt_over_time" ,
"count_le_over_time" ,
"count_ne_over_time" ,
"count_over_time" ,
"increase" ,
"increase_pure" ,
"sum_over_time" :
2023-10-31 18:24:18 +00:00
if iafc != nil && strings . ToLower ( iafc . ae . Name ) != "sum" {
qt . Printf ( "do not apply instant rollup optimization for non-sum incremental aggregate %s()" , iafc . ae . Name )
return evalAt ( qt , timestamp , window )
}
// Calculate
//
// rf(m[window] @ timestamp)
//
// as
//
// rf(m[window] @ (timestamp-offset)) + rf(m[offset] @ timestamp) - rf(m[offset] @ (timestamp-window))
//
// where
//
// - rf is count_over_time, sum_over_time or increase
// - rf(m[window] @ (timestamp-offset)) is obtained from cache
// - rf(m[offset] @ timestamp) and rf(m[offset] @ (timestamp-window)) are calculated from the storage
// These rollups are calculated faster than rf(m[window]) because offset is smaller than window.
qtChild := qt . NewChild ( "optimized calculation for instant rollup %s at time=%s with lookbehind window=%d" ,
expr . AppendString ( nil ) , storage . TimestampToHumanReadableFormat ( timestamp ) , window )
defer qtChild . Done ( )
2023-11-11 11:09:25 +00:00
tssCached , offset , err := getCachedSeries ( qtChild )
if err != nil {
return nil , err
2023-10-31 18:24:18 +00:00
}
if offset == 0 {
return tssCached , nil
}
2023-11-11 11:09:25 +00:00
// Calculate rf(m[offset] @ timestamp)
2023-10-31 18:24:18 +00:00
tssStart , err := evalAt ( qtChild , timestamp , offset )
if err != nil {
return nil , err
}
2023-11-11 11:09:25 +00:00
if hasDuplicateSeries ( tssStart ) {
qtChild . Printf ( "cannot apply instant rollup optimization, since tssStart contains duplicate series" )
return evalAt ( qtChild , timestamp , window )
}
// Calculate rf(m[offset] @ (timestamp - window))
2023-10-31 18:24:18 +00:00
tssEnd , err := evalAt ( qtChild , timestamp - window , offset )
if err != nil {
return nil , err
}
2023-11-11 11:09:25 +00:00
if hasDuplicateSeries ( tssEnd ) {
qtChild . Printf ( "cannot apply instant rollup optimization, since tssEnd contains duplicate series" )
return evalAt ( qtChild , timestamp , window )
2023-10-31 18:24:18 +00:00
}
2023-11-11 11:09:25 +00:00
// Calculate the result
2024-01-30 20:03:34 +00:00
tss := getSumInstantValues ( qtChild , tssCached , tssStart , tssEnd , timestamp )
2023-10-31 18:24:18 +00:00
return tss , nil
default :
qt . Printf ( "instant rollup optimization isn't implemented for %s()" , funcName )
return evalAt ( qt , timestamp , window )
}
}
2023-11-11 11:09:25 +00:00
func hasDuplicateSeries ( tss [ ] * timeseries ) bool {
2023-11-11 11:30:08 +00:00
if len ( tss ) <= 1 {
return false
}
2023-11-11 11:09:25 +00:00
m := make ( map [ string ] struct { } , len ( tss ) )
bb := bbPool . Get ( )
defer bbPool . Put ( bb )
for _ , ts := range tss {
bb . B = marshalMetricNameSorted ( bb . B [ : 0 ] , & ts . MetricName )
if _ , ok := m [ string ( bb . B ) ] ; ok {
return true
}
m [ string ( bb . B ) ] = struct { } { }
}
return false
}
2024-01-30 20:03:34 +00:00
func getMinInstantValues ( qt * querytracer . Tracer , tssCached , tssStart , tssEnd [ ] * timeseries , timestamp int64 ) ( [ ] * timeseries , bool ) {
qt = qt . NewChild ( "calculate the minimum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d" , len ( tssCached ) , len ( tssStart ) , len ( tssEnd ) , timestamp )
2023-11-11 11:09:25 +00:00
defer qt . Done ( )
getMin := func ( a , b float64 ) float64 {
if a < b {
return a
}
return b
}
2024-01-30 20:03:34 +00:00
tss , ok := getMinMaxInstantValues ( tssCached , tssStart , tssEnd , timestamp , getMin )
2023-11-14 01:52:39 +00:00
qt . Printf ( "resulting series=%d; ok=%v" , len ( tss ) , ok )
return tss , ok
2023-11-11 11:09:25 +00:00
}
2024-01-30 20:03:34 +00:00
func getMaxInstantValues ( qt * querytracer . Tracer , tssCached , tssStart , tssEnd [ ] * timeseries , timestamp int64 ) ( [ ] * timeseries , bool ) {
qt = qt . NewChild ( "calculate the maximum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d" , len ( tssCached ) , len ( tssStart ) , len ( tssEnd ) , timestamp )
2023-11-11 11:09:25 +00:00
defer qt . Done ( )
getMax := func ( a , b float64 ) float64 {
if a > b {
return a
}
return b
}
2024-01-30 20:03:34 +00:00
tss , ok := getMinMaxInstantValues ( tssCached , tssStart , tssEnd , timestamp , getMax )
2023-11-11 11:09:25 +00:00
qt . Printf ( "resulting series=%d" , len ( tss ) )
2023-11-14 01:52:39 +00:00
return tss , ok
2023-11-11 11:09:25 +00:00
}
2024-01-30 20:03:34 +00:00
func getMinMaxInstantValues ( tssCached , tssStart , tssEnd [ ] * timeseries , timestamp int64 , f func ( a , b float64 ) float64 ) ( [ ] * timeseries , bool ) {
2023-11-11 11:09:25 +00:00
assertInstantValues ( tssCached )
assertInstantValues ( tssStart )
2023-11-14 01:52:39 +00:00
assertInstantValues ( tssEnd )
2023-11-11 11:09:25 +00:00
bb := bbPool . Get ( )
defer bbPool . Put ( bb )
2023-11-14 01:52:39 +00:00
m := make ( map [ string ] * timeseries , len ( tssCached ) )
2023-11-11 11:09:25 +00:00
for _ , ts := range tssCached {
bb . B = marshalMetricNameSorted ( bb . B [ : 0 ] , & ts . MetricName )
if _ , ok := m [ string ( bb . B ) ] ; ok {
logger . Panicf ( "BUG: duplicate series found: %s" , & ts . MetricName )
}
m [ string ( bb . B ) ] = ts
}
2023-11-14 01:52:39 +00:00
mStart := make ( map [ string ] * timeseries , len ( tssStart ) )
2023-11-11 11:09:25 +00:00
for _ , ts := range tssStart {
bb . B = marshalMetricNameSorted ( bb . B [ : 0 ] , & ts . MetricName )
2023-11-14 02:34:09 +00:00
if _ , ok := mStart [ string ( bb . B ) ] ; ok {
2023-11-14 01:52:39 +00:00
logger . Panicf ( "BUG: duplicate series found: %s" , & ts . MetricName )
}
2023-11-14 02:34:09 +00:00
mStart [ string ( bb . B ) ] = ts
2023-11-11 11:09:25 +00:00
tsCached := m [ string ( bb . B ) ]
if tsCached != nil && ! math . IsNaN ( tsCached . Values [ 0 ] ) {
if ! math . IsNaN ( ts . Values [ 0 ] ) {
tsCached . Values [ 0 ] = f ( ts . Values [ 0 ] , tsCached . Values [ 0 ] )
}
} else {
m [ string ( bb . B ) ] = ts
}
}
2023-11-14 01:52:39 +00:00
for _ , ts := range tssEnd {
bb . B = marshalMetricNameSorted ( bb . B [ : 0 ] , & ts . MetricName )
tsCached := m [ string ( bb . B ) ]
if tsCached != nil && ! math . IsNaN ( tsCached . Values [ 0 ] ) && ! math . IsNaN ( ts . Values [ 0 ] ) {
if ts . Values [ 0 ] == f ( ts . Values [ 0 ] , tsCached . Values [ 0 ] ) {
tsStart := mStart [ string ( bb . B ) ]
if tsStart == nil || math . IsNaN ( tsStart . Values [ 0 ] ) || tsStart . Values [ 0 ] != f ( ts . Values [ 0 ] , tsStart . Values [ 0 ] ) {
return nil , false
}
}
}
}
2023-11-11 11:09:25 +00:00
rvs := make ( [ ] * timeseries , 0 , len ( m ) )
for _ , ts := range m {
rvs = append ( rvs , ts )
}
2024-01-30 20:03:34 +00:00
setInstantTimestamp ( rvs , timestamp )
2023-11-14 01:52:39 +00:00
return rvs , true
2023-11-11 11:09:25 +00:00
}
2024-01-30 20:03:34 +00:00
// getSumInstantValues aggregates tssCached, tssStart, tssEnd time series
// into a new time series with value = tssCached + tssStart - tssEnd
func getSumInstantValues ( qt * querytracer . Tracer , tssCached , tssStart , tssEnd [ ] * timeseries , timestamp int64 ) [ ] * timeseries {
qt = qt . NewChild ( "calculate the sum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d" , len ( tssCached ) , len ( tssStart ) , len ( tssEnd ) , timestamp )
2023-10-31 18:24:18 +00:00
defer qt . Done ( )
assertInstantValues ( tssCached )
assertInstantValues ( tssStart )
assertInstantValues ( tssEnd )
m := make ( map [ string ] * timeseries , len ( tssCached ) )
bb := bbPool . Get ( )
defer bbPool . Put ( bb )
for _ , ts := range tssCached {
bb . B = marshalMetricNameSorted ( bb . B [ : 0 ] , & ts . MetricName )
2023-11-11 11:09:25 +00:00
if _ , ok := m [ string ( bb . B ) ] ; ok {
logger . Panicf ( "BUG: duplicate series found: %s" , & ts . MetricName )
2023-10-31 18:24:18 +00:00
}
m [ string ( bb . B ) ] = ts
}
for _ , ts := range tssStart {
bb . B = marshalMetricNameSorted ( bb . B [ : 0 ] , & ts . MetricName )
tsCached := m [ string ( bb . B ) ]
if tsCached != nil && ! math . IsNaN ( tsCached . Values [ 0 ] ) {
if ! math . IsNaN ( ts . Values [ 0 ] ) {
tsCached . Values [ 0 ] += ts . Values [ 0 ]
}
} else {
m [ string ( bb . B ) ] = ts
}
}
for _ , ts := range tssEnd {
bb . B = marshalMetricNameSorted ( bb . B [ : 0 ] , & ts . MetricName )
tsCached := m [ string ( bb . B ) ]
if tsCached != nil && ! math . IsNaN ( tsCached . Values [ 0 ] ) {
if ! math . IsNaN ( ts . Values [ 0 ] ) {
tsCached . Values [ 0 ] -= ts . Values [ 0 ]
}
}
}
rvs := make ( [ ] * timeseries , 0 , len ( m ) )
for _ , ts := range m {
rvs = append ( rvs , ts )
}
2024-01-30 20:03:34 +00:00
setInstantTimestamp ( rvs , timestamp )
2023-10-31 18:24:18 +00:00
qt . Printf ( "resulting series=%d" , len ( rvs ) )
2023-11-11 11:09:25 +00:00
return rvs
2023-10-31 18:24:18 +00:00
}
2024-01-30 20:03:34 +00:00
func setInstantTimestamp ( tss [ ] * timeseries , timestamp int64 ) {
for _ , ts := range tss {
ts . Timestamps [ 0 ] = timestamp
}
}
2023-10-31 18:24:18 +00:00
func assertInstantValues ( tss [ ] * timeseries ) {
for _ , ts := range tss {
if len ( ts . Values ) != 1 {
logger . Panicf ( "BUG: instant series must contain a single value; got %d values" , len ( ts . Values ) )
}
if len ( ts . Timestamps ) != 1 {
logger . Panicf ( "BUG: instant series must contain a single timestamp; got %d timestamps" , len ( ts . Timestamps ) )
}
}
}
2019-05-22 21:16:55 +00:00
var (
rollupResultCacheFullHits = metrics . NewCounter ( ` vm_rollup_result_cache_full_hits_total ` )
rollupResultCachePartialHits = metrics . NewCounter ( ` vm_rollup_result_cache_partial_hits_total ` )
rollupResultCacheMiss = metrics . NewCounter ( ` vm_rollup_result_cache_miss_total ` )
2023-10-31 12:31:09 +00:00
memoryIntensiveQueries = metrics . NewCounter ( ` vm_memory_intensive_queries_total ` )
2019-05-22 21:16:55 +00:00
)
2022-05-31 23:29:19 +00:00
func evalRollupFuncWithMetricExpr ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc ,
2021-07-12 14:16:38 +00:00
expr metricsql . Expr , me * metricsql . MetricExpr , iafc * incrementalAggrFuncContext , windowExpr * metricsql . DurationExpr ) ( [ ] * timeseries , error ) {
2023-08-12 11:47:50 +00:00
window , err := windowExpr . NonNegativeDuration ( ec . Step )
if err != nil {
return nil , fmt . Errorf ( "cannot parse lookbehind window in square brackets at %s: %w" , expr . AppendString ( nil ) , err )
}
2019-07-10 09:57:27 +00:00
if me . IsEmpty ( ) {
return evalNumber ( ec , nan ) , nil
}
2023-10-31 18:24:18 +00:00
if ec . Start == ec . End {
rvs , err := evalInstantRollup ( qt , ec , funcName , rf , expr , me , iafc , window )
if err != nil {
err = & UserReadableError {
Err : err ,
}
return nil , err
}
return rvs , nil
}
2023-11-16 14:52:38 +00:00
pointsPerSeries := 1 + ( ec . End - ec . Start ) / ec . Step
evalWithConfig := func ( ec * EvalConfig ) ( [ ] * timeseries , error ) {
tss , err := evalRollupFuncNoCache ( qt , ec , funcName , rf , expr , me , iafc , window , pointsPerSeries )
if err != nil {
err = & UserReadableError {
Err : err ,
}
return nil , err
}
return tss , nil
}
if ! ec . mayCache ( ) {
qt . Printf ( "do not fetch series from cache, since it is disabled in the current context" )
return evalWithConfig ( ec )
}
2023-10-31 18:24:18 +00:00
2023-11-16 14:52:38 +00:00
// Search for cached results.
2023-10-31 18:24:18 +00:00
tssCached , start := rollupResultCacheV . GetSeries ( qt , ec , expr , window )
ec . QueryStats . addSeriesFetched ( len ( tssCached ) )
2019-05-22 21:16:55 +00:00
if start > ec . End {
2023-10-31 21:22:34 +00:00
qt . Printf ( "the result is fully cached" )
2019-05-22 21:16:55 +00:00
rollupResultCacheFullHits . Inc ( )
return tssCached , nil
}
if start > ec . Start {
2023-10-31 21:22:34 +00:00
qt . Printf ( "partial cache hit" )
2019-05-22 21:16:55 +00:00
rollupResultCachePartialHits . Inc ( )
} else {
2023-10-31 21:22:34 +00:00
qt . Printf ( "cache miss" )
2019-05-22 21:16:55 +00:00
rollupResultCacheMiss . Inc ( )
}
2023-11-16 14:52:38 +00:00
// Fetch missing results, which aren't cached yet.
ecNew := ec
if start != ec . Start {
ecNew = copyEvalConfig ( ec )
ecNew . Start = start
2023-10-31 18:24:18 +00:00
}
2023-11-16 14:52:38 +00:00
tss , err := evalWithConfig ( ecNew )
2023-10-31 18:24:18 +00:00
if err != nil {
2023-11-16 14:52:38 +00:00
return nil , err
2023-10-31 18:24:18 +00:00
}
2023-11-16 14:52:38 +00:00
// Merge cached results with the fetched additional results.
rvs , ok := mergeSeries ( qt , tssCached , tss , start , ec )
if ! ok {
// Cannot merge series - fall back to non-cached querying.
qt . Printf ( "fall back to non-caching querying" )
rvs , err = evalWithConfig ( ec )
if err != nil {
return nil , err
}
2023-10-31 18:24:18 +00:00
}
2023-11-16 14:52:38 +00:00
rollupResultCacheV . PutSeries ( qt , ec , expr , window , rvs )
2023-10-31 18:24:18 +00:00
return rvs , nil
}
// evalRollupFuncNoCache calculates the given rf with the given lookbehind window.
//
// pointsPerSeries is used only for estimating the needed memory for query processing
func evalRollupFuncNoCache ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc ,
expr metricsql . Expr , me * metricsql . MetricExpr , iafc * incrementalAggrFuncContext , window , pointsPerSeries int64 ) ( [ ] * timeseries , error ) {
if qt . Enabled ( ) {
qt = qt . NewChild ( "rollup %s: timeRange=%s, step=%d, window=%d" , expr . AppendString ( nil ) , ec . timeRangeString ( ) , ec . Step , window )
defer qt . Done ( )
}
2023-10-31 21:22:34 +00:00
if window < 0 {
2023-10-31 18:24:18 +00:00
return nil , nil
}
// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
sharedTimestamps := getTimestamps ( ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries )
preFunc , rcs , err := getRollupConfigs ( funcName , rf , expr , ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries , window , ec . LookbackDelta , sharedTimestamps )
2020-01-11 12:40:32 +00:00
if err != nil {
return nil , err
}
2023-12-06 12:15:48 +00:00
// Fetch the result.
2023-07-16 06:48:21 +00:00
tfss := searchutils . ToTagFilterss ( me . LabelFilterss )
tfss = searchutils . JoinTagFilterss ( tfss , ec . EnforcedTagFilterss )
2023-10-31 18:24:18 +00:00
minTimestamp := ec . Start
if needSilenceIntervalForRollupFunc ( funcName ) {
2023-12-06 12:15:48 +00:00
minTimestamp -= maxSilenceInterval ( )
2023-10-31 18:24:18 +00:00
}
2020-02-05 17:20:54 +00:00
if window > ec . Step {
minTimestamp -= window
} else {
minTimestamp -= ec . Step
}
2022-03-26 08:17:37 +00:00
sq := storage . NewSearchQuery ( minTimestamp , ec . End , tfss , ec . MaxSeries )
2022-06-28 09:55:20 +00:00
rss , err := netstorage . ProcessSearchQuery ( qt , sq , ec . Deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
2023-10-31 18:24:18 +00:00
return nil , err
2019-05-22 21:16:55 +00:00
}
rssLen := rss . Len ( )
if rssLen == 0 {
rss . Cancel ( )
2023-10-31 18:24:18 +00:00
return nil , nil
2019-05-22 21:16:55 +00:00
}
2023-03-27 22:11:40 +00:00
ec . QueryStats . addSeriesFetched ( rssLen )
2019-05-22 21:16:55 +00:00
2023-10-31 18:24:18 +00:00
// Verify timeseries fit available memory during rollup calculations.
2019-11-08 16:45:25 +00:00
timeseriesLen := rssLen
if iafc != nil {
2020-05-12 16:06:54 +00:00
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
2020-12-08 18:49:32 +00:00
timeseriesLen = cgroup . AvailableCPUs ( )
2019-11-08 16:45:25 +00:00
if iafc . ae . Modifier . Op != "" {
2020-05-12 16:06:54 +00:00
if iafc . ae . Limit > 0 {
// There is an explicit limit on the number of output time series.
timeseriesLen *= iafc . ae . Limit
} else {
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
// since each group can have own set of time series in memory.
timeseriesLen *= 1000
}
2020-04-28 21:18:47 +00:00
}
// The maximum number of output time series is limited by rssLen.
if timeseriesLen > rssLen {
timeseriesLen = rssLen
2019-11-08 16:45:25 +00:00
}
}
2023-10-31 18:24:18 +00:00
rollupPoints := mulNoOverflow ( pointsPerSeries , int64 ( timeseriesLen * len ( rcs ) ) )
rollupMemorySize := sumNoOverflow ( mulNoOverflow ( int64 ( rssLen ) , 1000 ) , mulNoOverflow ( rollupPoints , 16 ) )
2023-02-24 02:40:31 +00:00
if maxMemory := int64 ( logQueryMemoryUsage . N ) ; maxMemory > 0 && rollupMemorySize > maxMemory {
2023-10-31 12:31:09 +00:00
memoryIntensiveQueries . Inc ( )
2023-02-24 02:40:31 +00:00
requestURI := ec . GetRequestURI ( )
logger . Warnf ( "remoteAddr=%s, requestURI=%s: the %s requires %d bytes of memory for processing; " +
"logging this query, since it exceeds the -search.logQueryMemoryUsage=%d; " +
"the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series" ,
ec . QuotedRemoteAddr , requestURI , expr . AppendString ( nil ) , rollupMemorySize , maxMemory , timeseriesLen * len ( rcs ) , rollupPoints )
}
2022-10-12 06:23:43 +00:00
if maxMemory := int64 ( maxMemoryPerQuery . N ) ; maxMemory > 0 && rollupMemorySize > maxMemory {
2019-05-22 21:16:55 +00:00
rss . Cancel ( )
2023-10-31 18:24:18 +00:00
err := fmt . Errorf ( "not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series " +
"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; " +
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); " +
"increasing -search.maxMemoryPerQuery" ,
expr . AppendString ( nil ) , rollupPoints , timeseriesLen * len ( rcs ) , pointsPerSeries , maxMemory , rollupMemorySize , float64 ( ec . Step ) / 1e3 )
return nil , err
2019-05-22 21:16:55 +00:00
}
2022-10-10 18:43:36 +00:00
rml := getRollupMemoryLimiter ( )
if ! rml . Get ( uint64 ( rollupMemorySize ) ) {
rss . Cancel ( )
2023-10-31 18:24:18 +00:00
err := fmt . Errorf ( "not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; " +
"total available memory for concurrent requests: %d bytes; requested memory: %d bytes; " +
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); " +
"switching to node with more RAM; increasing -memory.allowedPercent" ,
expr . AppendString ( nil ) , rollupPoints , timeseriesLen * len ( rcs ) , pointsPerSeries , rml . MaxSize , uint64 ( rollupMemorySize ) , float64 ( ec . Step ) / 1e3 )
return nil , err
2022-10-10 18:43:36 +00:00
}
defer rml . Put ( uint64 ( rollupMemorySize ) )
2023-10-31 18:24:18 +00:00
qt . Printf ( "the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)" ,
rollupMemorySize , timeseriesLen , pointsPerSeries , rollupPoints )
2019-05-22 21:16:55 +00:00
// Evaluate rollup
2022-01-14 02:05:39 +00:00
keepMetricNames := getKeepMetricNames ( expr )
2019-07-10 09:57:27 +00:00
if iafc != nil {
2023-10-31 18:24:18 +00:00
return evalRollupWithIncrementalAggregate ( qt , funcName , keepMetricNames , iafc , rss , rcs , preFunc , sharedTimestamps )
2019-07-10 09:57:27 +00:00
}
2023-10-31 18:24:18 +00:00
return evalRollupNoIncrementalAggregate ( qt , funcName , keepMetricNames , rss , rcs , preFunc , sharedTimestamps )
2019-05-22 21:16:55 +00:00
}
2022-10-10 18:43:36 +00:00
var (
rollupMemoryLimiter memoryLimiter
rollupMemoryLimiterOnce sync . Once
)
2019-06-12 19:25:37 +00:00
2022-10-10 18:43:36 +00:00
func getRollupMemoryLimiter ( ) * memoryLimiter {
rollupMemoryLimiterOnce . Do ( func ( ) {
rollupMemoryLimiter . MaxSize = uint64 ( memory . Allowed ( ) ) / 4
} )
return & rollupMemoryLimiter
2019-06-12 19:25:37 +00:00
}
2023-12-06 12:15:48 +00:00
func maxSilenceInterval ( ) int64 {
d := minStalenessInterval . Milliseconds ( )
if d <= 0 {
d = 5 * 60 * 1000
}
return d
}
2023-10-31 18:24:18 +00:00
func needSilenceIntervalForRollupFunc ( funcName string ) bool {
2023-12-06 12:15:48 +00:00
// All the rollup functions, which do not rely on the previous sample
// before the lookbehind window (aka prevValue and realPrevValue), do not need silence interval.
2023-10-31 18:24:18 +00:00
switch strings . ToLower ( funcName ) {
2023-12-06 12:15:48 +00:00
case "default_rollup" :
// The default_rollup implicitly relies on the previous samples in order to fill gaps.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5388
return true
2023-10-31 18:24:18 +00:00
case
"absent_over_time" ,
"avg_over_time" ,
"count_eq_over_time" ,
"count_gt_over_time" ,
"count_le_over_time" ,
"count_ne_over_time" ,
"count_over_time" ,
"first_over_time" ,
"histogram_over_time" ,
"hoeffding_bound_lower" ,
"hoeffding_bound_upper" ,
"last_over_time" ,
"mad_over_time" ,
"max_over_time" ,
"median_over_time" ,
"min_over_time" ,
"predict_linear" ,
"present_over_time" ,
"quantile_over_time" ,
"quantiles_over_time" ,
"range_over_time" ,
"share_gt_over_time" ,
"share_le_over_time" ,
"share_eq_over_time" ,
"stale_samples_over_time" ,
"stddev_over_time" ,
"stdvar_over_time" ,
"sum_over_time" ,
"tfirst_over_time" ,
"timestamp" ,
"timestamp_with_name" ,
"tlast_over_time" ,
"tmax_over_time" ,
"tmin_over_time" ,
"zscore_over_time" :
return false
default :
return true
}
}
2022-05-31 23:29:19 +00:00
func evalRollupWithIncrementalAggregate ( qt * querytracer . Tracer , funcName string , keepMetricNames bool ,
iafc * incrementalAggrFuncContext , rss * netstorage . Results , rcs [ ] * rollupConfig ,
2021-09-17 20:33:15 +00:00
preFunc func ( values [ ] float64 , timestamps [ ] int64 ) , sharedTimestamps [ ] int64 ) ( [ ] * timeseries , error ) {
2022-06-28 16:26:17 +00:00
qt = qt . NewChild ( "rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s" , funcName , iafc . ae . Name , rss . Len ( ) , rcs )
2022-06-08 18:05:17 +00:00
defer qt . Done ( )
2022-06-28 16:26:17 +00:00
var samplesScannedTotal uint64
2022-05-31 23:29:19 +00:00
err := rss . RunParallel ( qt , func ( rs * netstorage . Result , workerID uint ) error {
2021-09-17 20:33:15 +00:00
rs . Values , rs . Timestamps = dropStaleNaNs ( funcName , rs . Values , rs . Timestamps )
2019-07-10 09:57:27 +00:00
preFunc ( rs . Values , rs . Timestamps )
ts := getTimeseries ( )
defer putTimeseries ( ts )
for _ , rc := range rcs {
2022-01-14 02:05:39 +00:00
if tsm := newTimeseriesMap ( funcName , keepMetricNames , sharedTimestamps , & rs . MetricName ) ; tsm != nil {
2022-06-28 16:26:17 +00:00
samplesScanned := rc . DoTimeseriesMap ( tsm , rs . Values , rs . Timestamps )
2020-01-03 21:50:47 +00:00
for _ , ts := range tsm . m {
iafc . updateTimeseries ( ts , workerID )
}
2022-06-28 16:26:17 +00:00
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2020-01-03 21:50:47 +00:00
continue
}
2019-07-10 09:57:27 +00:00
ts . Reset ( )
2022-06-28 16:26:17 +00:00
samplesScanned := doRollupForTimeseries ( funcName , keepMetricNames , rc , ts , & rs . MetricName , rs . Values , rs . Timestamps , sharedTimestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2019-07-12 12:51:02 +00:00
iafc . updateTimeseries ( ts , workerID )
2019-07-25 18:48:12 +00:00
// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
2019-07-10 09:57:27 +00:00
ts . Timestamps = nil
2019-07-25 18:48:12 +00:00
ts . denyReuse = false
2019-07-10 09:57:27 +00:00
}
2020-09-27 20:17:14 +00:00
return nil
2019-07-10 09:57:27 +00:00
} )
if err != nil {
return nil , err
}
tss := iafc . finalizeTimeseries ( )
2022-06-28 17:18:08 +00:00
rowsScannedPerQuery . Update ( float64 ( samplesScannedTotal ) )
2022-06-28 16:26:17 +00:00
qt . Printf ( "series after aggregation with %s(): %d; samplesScanned=%d" , iafc . ae . Name , len ( tss ) , samplesScannedTotal )
2019-07-10 09:57:27 +00:00
return tss , nil
}
2022-05-31 23:29:19 +00:00
func evalRollupNoIncrementalAggregate ( qt * querytracer . Tracer , funcName string , keepMetricNames bool , rss * netstorage . Results , rcs [ ] * rollupConfig ,
2021-09-17 20:33:15 +00:00
preFunc func ( values [ ] float64 , timestamps [ ] int64 ) , sharedTimestamps [ ] int64 ) ( [ ] * timeseries , error ) {
2022-06-28 16:26:17 +00:00
qt = qt . NewChild ( "rollup %s() over %d series; rollupConfigs=%s" , funcName , rss . Len ( ) , rcs )
2022-06-08 18:05:17 +00:00
defer qt . Done ( )
2023-03-25 06:07:12 +00:00
2022-06-28 16:26:17 +00:00
var samplesScannedTotal uint64
2023-03-25 06:34:34 +00:00
tsw := getTimeseriesByWorkerID ( )
seriesByWorkerID := tsw . byWorkerID
seriesLen := rss . Len ( )
2022-05-31 23:29:19 +00:00
err := rss . RunParallel ( qt , func ( rs * netstorage . Result , workerID uint ) error {
2021-09-17 20:33:15 +00:00
rs . Values , rs . Timestamps = dropStaleNaNs ( funcName , rs . Values , rs . Timestamps )
2019-07-10 09:57:27 +00:00
preFunc ( rs . Values , rs . Timestamps )
for _ , rc := range rcs {
2022-01-14 02:05:39 +00:00
if tsm := newTimeseriesMap ( funcName , keepMetricNames , sharedTimestamps , & rs . MetricName ) ; tsm != nil {
2022-06-28 16:26:17 +00:00
samplesScanned := rc . DoTimeseriesMap ( tsm , rs . Values , rs . Timestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2023-03-25 06:07:12 +00:00
seriesByWorkerID [ workerID ] . tss = tsm . AppendTimeseriesTo ( seriesByWorkerID [ workerID ] . tss )
2020-01-03 21:50:47 +00:00
continue
}
2019-07-10 09:57:27 +00:00
var ts timeseries
2022-06-28 16:26:17 +00:00
samplesScanned := doRollupForTimeseries ( funcName , keepMetricNames , rc , & ts , & rs . MetricName , rs . Values , rs . Timestamps , sharedTimestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2023-03-25 06:07:12 +00:00
seriesByWorkerID [ workerID ] . tss = append ( seriesByWorkerID [ workerID ] . tss , & ts )
2019-07-10 09:57:27 +00:00
}
2020-09-27 20:17:14 +00:00
return nil
2019-07-10 09:57:27 +00:00
} )
if err != nil {
return nil , err
}
2023-03-25 06:34:34 +00:00
tss := make ( [ ] * timeseries , 0 , seriesLen * len ( rcs ) )
2023-03-25 06:07:12 +00:00
for i := range seriesByWorkerID {
tss = append ( tss , seriesByWorkerID [ i ] . tss ... )
}
2023-03-25 06:34:34 +00:00
putTimeseriesByWorkerID ( tsw )
2023-03-25 06:07:12 +00:00
2022-06-28 17:18:08 +00:00
rowsScannedPerQuery . Update ( float64 ( samplesScannedTotal ) )
qt . Printf ( "samplesScanned=%d" , samplesScannedTotal )
2019-07-10 09:57:27 +00:00
return tss , nil
}
2022-01-14 02:05:39 +00:00
func doRollupForTimeseries ( funcName string , keepMetricNames bool , rc * rollupConfig , tsDst * timeseries , mnSrc * storage . MetricName ,
2022-06-28 16:26:17 +00:00
valuesSrc [ ] float64 , timestampsSrc [ ] int64 , sharedTimestamps [ ] int64 ) uint64 {
2019-07-25 18:48:12 +00:00
tsDst . MetricName . CopyFrom ( mnSrc )
if len ( rc . TagValue ) > 0 {
tsDst . MetricName . AddTag ( "rollup" , rc . TagValue )
}
2022-01-14 02:05:39 +00:00
if ! keepMetricNames && ! rollupFuncsKeepMetricName [ funcName ] {
2019-07-25 18:48:12 +00:00
tsDst . MetricName . ResetMetricGroup ( )
}
2022-06-28 16:26:17 +00:00
var samplesScanned uint64
tsDst . Values , samplesScanned = rc . Do ( tsDst . Values [ : 0 ] , valuesSrc , timestampsSrc )
2019-07-25 18:48:12 +00:00
tsDst . Timestamps = sharedTimestamps
tsDst . denyReuse = true
2022-06-28 16:26:17 +00:00
return samplesScanned
2019-07-25 18:48:12 +00:00
}
2023-03-25 06:34:34 +00:00
type timeseriesWithPadding struct {
tss [ ] * timeseries
// The padding prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
_ [ 128 - unsafe . Sizeof ( [ ] * timeseries { } ) % 128 ] byte
}
type timeseriesByWorkerID struct {
byWorkerID [ ] timeseriesWithPadding
}
func ( tsw * timeseriesByWorkerID ) reset ( ) {
byWorkerID := tsw . byWorkerID
for i := range byWorkerID {
2023-03-25 06:46:30 +00:00
byWorkerID [ i ] . tss = nil
2023-03-25 06:34:34 +00:00
}
}
func getTimeseriesByWorkerID ( ) * timeseriesByWorkerID {
v := timeseriesByWorkerIDPool . Get ( )
if v == nil {
return & timeseriesByWorkerID {
byWorkerID : make ( [ ] timeseriesWithPadding , netstorage . MaxWorkers ( ) ) ,
}
}
return v . ( * timeseriesByWorkerID )
}
func putTimeseriesByWorkerID ( tsw * timeseriesByWorkerID ) {
tsw . reset ( )
timeseriesByWorkerIDPool . Put ( tsw )
}
var timeseriesByWorkerIDPool sync . Pool
2019-05-22 21:16:55 +00:00
var bbPool bytesutil . ByteBufferPool
func evalNumber ( ec * EvalConfig , n float64 ) [ ] * timeseries {
var ts timeseries
ts . denyReuse = true
timestamps := ec . getSharedTimestamps ( )
values := make ( [ ] float64 , len ( timestamps ) )
for i := range timestamps {
values [ i ] = n
}
ts . Values = values
ts . Timestamps = timestamps
return [ ] * timeseries { & ts }
}
func evalString ( ec * EvalConfig , s string ) [ ] * timeseries {
rv := evalNumber ( ec , nan )
rv [ 0 ] . MetricName . MetricGroup = append ( rv [ 0 ] . MetricName . MetricGroup [ : 0 ] , s ... )
return rv
}
func evalTime ( ec * EvalConfig ) [ ] * timeseries {
rv := evalNumber ( ec , nan )
timestamps := rv [ 0 ] . Timestamps
values := rv [ 0 ] . Values
for i , ts := range timestamps {
2020-09-08 11:00:47 +00:00
values [ i ] = float64 ( ts ) / 1e3
2019-05-22 21:16:55 +00:00
}
return rv
}
2019-06-12 19:25:37 +00:00
func mulNoOverflow ( a , b int64 ) int64 {
if math . MaxInt64 / b < a {
// Overflow
return math . MaxInt64
}
return a * b
}
2019-12-25 19:35:47 +00:00
2022-10-07 22:07:42 +00:00
func sumNoOverflow ( a , b int64 ) int64 {
if math . MaxInt64 - a < b {
// Overflow
return math . MaxInt64
}
return a + b
}
2021-09-17 20:33:15 +00:00
func dropStaleNaNs ( funcName string , values [ ] float64 , timestamps [ ] int64 ) ( [ ] float64 , [ ] int64 ) {
2022-01-13 23:48:04 +00:00
if * noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" {
2021-08-17 08:00:17 +00:00
// Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function,
// since it uses them for Prometheus-style staleness detection.
2022-01-13 23:48:04 +00:00
// Do not drop staleness marks for stale_samples_over_time() function, since it needs
// to calculate the number of staleness markers.
2021-08-17 08:00:17 +00:00
return values , timestamps
}
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
2021-08-15 10:20:02 +00:00
hasStaleSamples := false
for _ , v := range values {
if decimal . IsStaleNaN ( v ) {
hasStaleSamples = true
break
}
}
if ! hasStaleSamples {
// Fast path: values have no Prometheus staleness marks.
return values , timestamps
}
// Slow path: drop Prometheus staleness marks from values.
dstValues := values [ : 0 ]
dstTimestamps := timestamps [ : 0 ]
for i , v := range values {
if decimal . IsStaleNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , timestamps [ i ] )
}
return dstValues , dstTimestamps
}