package promql import ( "flag" "fmt" "math" "regexp" "sort" "strings" "sync" "sync/atomic" "time" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" ) var ( disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful when ingesting historical data. "+ "See https://docs.victoriametrics.com/#backfilling . See also -search.resetRollupResultCacheOnStartup") maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+ "See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3") maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+ "Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+ "as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+ "See also -search.logQueryMemoryUsage") logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log query and increment vm_memory_intensive_queries_total metric each time "+ "the query requires more memory than specified by this flag. "+ "This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+ "See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery") noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+ "so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets") minWindowForInstantRollupOptimization = flagutil.NewDuration("search.minWindowForInstantRollupOptimization", "3h", "Enable cache-based optimization for repeated queries "+ "to /api/v1/query (aka instant queries), which contain rollup functions with lookbehind window exceeding the given value") ) // The minimum number of points per timeseries for enabling time rounding. // This improves cache hit ratio for frequently requested queries over // big time ranges. const minTimeseriesPointsForTimeRounding = 50 // ValidateMaxPointsPerSeries validates that the number of points for the given start, end and step do not exceed maxPoints. func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error { if step == 0 { return fmt.Errorf("step can't be equal to zero") } points := (end-start)/step + 1 if points > int64(maxPoints) { return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d", start, end, step, points, maxPoints) } return nil } // AdjustStartEnd adjusts start and end values, so response caching may be enabled. // // See EvalConfig.mayCache() for details. func AdjustStartEnd(start, end, step int64) (int64, int64) { if *disableCache { // Do not adjust start and end values when cache is disabled. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563 return start, end } points := (end-start)/step + 1 if points < minTimeseriesPointsForTimeRounding { // Too small number of points for rounding. return start, end } // Round start and end to values divisible by step in order // to enable response caching (see EvalConfig.mayCache). start, end = alignStartEnd(start, end, step) // Make sure that the new number of points is the same as the initial number of points. newPoints := (end-start)/step + 1 for newPoints > points { end -= step newPoints-- } return start, end } func alignStartEnd(start, end, step int64) (int64, int64) { // Round start to the nearest smaller value divisible by step. start -= start % step // Round end to the nearest bigger value divisible by step. adjust := end % step if adjust > 0 { end += step - adjust } return start, end } // EvalConfig is the configuration required for query evaluation via Exec type EvalConfig struct { Start int64 End int64 Step int64 // MaxSeries is the maximum number of time series, which can be scanned by the query. // Zero means 'no limit' MaxSeries int // MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series. MaxPointsPerSeries int // QuotedRemoteAddr contains quoted remote address. QuotedRemoteAddr string Deadline searchutils.Deadline // Whether the response can be cached. MayCache bool // LookbackDelta is analog to `-query.lookback-delta` from Prometheus. LookbackDelta int64 // How many decimal digits after the point to leave in response. RoundDigits int // EnforcedTagFilterss may contain additional label filters to use in the query. EnforcedTagFilterss [][]storage.TagFilter // The callback, which returns the request URI during logging. // The request URI isn't stored here because its' construction may take non-trivial amounts of CPU. GetRequestURI func() string // QueryStats contains various stats for the currently executed query. // // The caller must initialize QueryStats, otherwise it isn't collected. QueryStats *QueryStats timestamps []int64 timestampsOnce sync.Once } // copyEvalConfig returns src copy. func copyEvalConfig(src *EvalConfig) *EvalConfig { var ec EvalConfig ec.Start = src.Start ec.End = src.End ec.Step = src.Step ec.MaxSeries = src.MaxSeries ec.MaxPointsPerSeries = src.MaxPointsPerSeries ec.Deadline = src.Deadline ec.MayCache = src.MayCache ec.LookbackDelta = src.LookbackDelta ec.RoundDigits = src.RoundDigits ec.EnforcedTagFilterss = src.EnforcedTagFilterss ec.GetRequestURI = src.GetRequestURI ec.QueryStats = src.QueryStats // do not copy src.timestamps - they must be generated again. return &ec } // QueryStats contains various stats for the query. type QueryStats struct { // SeriesFetched contains the number of series fetched from storage during the query evaluation. SeriesFetched atomic.Int64 // ExecutionTimeMsec contains the number of milliseconds the query took to execute. ExecutionTimeMsec atomic.Int64 } func (qs *QueryStats) addSeriesFetched(n int) { if qs == nil { return } qs.SeriesFetched.Add(int64(n)) } func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) { if qs == nil { return } d := time.Since(startTime).Milliseconds() qs.ExecutionTimeMsec.Add(d) } func (ec *EvalConfig) validate() { if ec.Start > ec.End { logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End) } if ec.Step <= 0 { logger.Panicf("BUG: step must be greater than 0; got %d", ec.Step) } } func (ec *EvalConfig) mayCache() bool { if *disableCache { return false } if !ec.MayCache { return false } if ec.Start == ec.End { // There is no need in aligning start and end to step for instant query // in order to cache its results. return true } if ec.Start%ec.Step != 0 { return false } if ec.End%ec.Step != 0 { return false } return true } func (ec *EvalConfig) timeRangeString() string { start := storage.TimestampToHumanReadableFormat(ec.Start) end := storage.TimestampToHumanReadableFormat(ec.End) return fmt.Sprintf("[%s..%s]", start, end) } func (ec *EvalConfig) getSharedTimestamps() []int64 { ec.timestampsOnce.Do(ec.timestampsInit) return ec.timestamps } func (ec *EvalConfig) timestampsInit() { ec.timestamps = getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries) } func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 { // Sanity checks. if step <= 0 { logger.Panicf("BUG: Step must be bigger than 0; got %d", step) } if start > end { logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end) } if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil { logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err) } // Prepare timestamps. points := 1 + (end-start)/step timestamps := make([]int64, points) for i := range timestamps { timestamps[i] = start start += step } return timestamps } func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { if qt.Enabled() { query := string(e.AppendString(nil)) query = stringsutil.LimitStringLen(query, 300) mayCache := ec.mayCache() qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache) } rv, err := evalExprInternal(qt, ec, e) if err != nil { return nil, err } if qt.Enabled() { seriesCount := len(rv) pointsPerSeries := 0 if len(rv) > 0 { pointsPerSeries = len(rv[0].Timestamps) } pointsCount := seriesCount * pointsPerSeries qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, pointsCount, pointsPerSeries) } return rv, nil } func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { if me, ok := e.(*metricsql.MetricExpr); ok { re := &metricsql.RollupExpr{ Expr: me, } rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, me.AppendString(nil), err) } return rv, nil } if re, ok := e.(*metricsql.RollupExpr); ok { rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, re.AppendString(nil), err) } return rv, nil } if fe, ok := e.(*metricsql.FuncExpr); ok { nrf := getRollupFunc(fe.Name) if nrf == nil { qtChild := qt.NewChild("transform %s()", fe.Name) rv, err := evalTransformFunc(qtChild, ec, fe) qtChild.Donef("series=%d", len(rv)) return rv, err } args, re, err := evalRollupFuncArgs(qt, ec, fe) if err != nil { return nil, err } rf, err := nrf(args) if err != nil { return nil, fmt.Errorf("cannot evaluate args for %q: %w", fe.AppendString(nil), err) } rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err) } return rv, nil } if ae, ok := e.(*metricsql.AggrFuncExpr); ok { qtChild := qt.NewChild("aggregate %s()", ae.Name) rv, err := evalAggrFunc(qtChild, ec, ae) qtChild.Donef("series=%d", len(rv)) return rv, err } if be, ok := e.(*metricsql.BinaryOpExpr); ok { qtChild := qt.NewChild("binary op %q", be.Op) rv, err := evalBinaryOp(qtChild, ec, be) qtChild.Donef("series=%d", len(rv)) return rv, err } if ne, ok := e.(*metricsql.NumberExpr); ok { rv := evalNumber(ec, ne.N) return rv, nil } if se, ok := e.(*metricsql.StringExpr); ok { rv := evalString(ec, se.S) return rv, nil } if de, ok := e.(*metricsql.DurationExpr); ok { d := de.Duration(ec.Step) dSec := float64(d) / 1000 rv := evalNumber(ec, dSec) return rv, nil } return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil)) } func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) { tf := getTransformFunc(fe.Name) if tf == nil { return nil, &UserReadableError{ Err: fmt.Errorf(`unknown func %q`, fe.Name), } } var args [][]*timeseries var err error switch fe.Name { case "", "union": args, err = evalExprsInParallel(qt, ec, fe.Args) default: args, err = evalExprsSequentially(qt, ec, fe.Args) } if err != nil { return nil, err } tfa := &transformFuncArg{ ec: ec, fe: fe, args: args, } rv, err := tf(tfa) if err != nil { return nil, &UserReadableError{ Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err), } } return rv, nil } func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) { if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil { fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae) if fe != nil { // There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr. // The optimized path saves RAM for aggregates over big number of time series. args, re, err := evalRollupFuncArgs(qt, ec, fe) if err != nil { return nil, err } rf, err := nrf(args) if err != nil { return nil, fmt.Errorf("cannot evaluate args for aggregate func %q: %w", ae.AppendString(nil), err) } iafc := newIncrementalAggrFuncContext(ae, callbacks) return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc) } } args, err := evalExprsInParallel(qt, ec, ae.Args) if err != nil { return nil, err } af := getAggrFunc(ae.Name) if af == nil { return nil, &UserReadableError{ Err: fmt.Errorf(`unknown func %q`, ae.Name), } } afa := &aggrFuncArg{ ae: ae, args: args, ec: ec, } qtChild := qt.NewChild("eval %s", ae.Name) rv, err := af(afa) qtChild.Done() if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err) } return rv, nil } func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) { bf := getBinaryOpFunc(be.Op) if bf == nil { return nil, fmt.Errorf(`unknown binary op %q`, be.Op) } var err error var tssLeft, tssRight []*timeseries switch strings.ToLower(be.Op) { case "and", "if": // Fetch right-side series at first, since it usually contains // lower number of time series for `and` and `if` operator. // This should produce more specific label filters for the left side of the query. // This, in turn, should reduce the time to select series for the left side of the query. tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be) default: tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be) } if err != nil { return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err) } bfa := &binaryOpFuncArg{ be: be, left: tssLeft, right: tssRight, } rv, err := bf(bfa) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err) } return rv, nil } func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool { switch strings.ToLower(be.Op) { case "or", "default": return false } if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) { return false } return true } func isAggrFuncWithoutGrouping(e metricsql.Expr) bool { afe, ok := e.(*metricsql.AggrFuncExpr) if !ok { return false } return len(afe.Modifier.Args) == 0 } func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) { if !canPushdownCommonFilters(be) { // Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters // from exprFirst to exprSecond. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886 qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op) defer qt.Done() var wg sync.WaitGroup var tssFirst []*timeseries var errFirst error qtFirst := qt.NewChild("expr1") wg.Add(1) go func() { defer wg.Done() tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst) qtFirst.Done() }() var tssSecond []*timeseries var errSecond error qtSecond := qt.NewChild("expr2") wg.Add(1) go func() { defer wg.Done() tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond) qtSecond.Done() }() wg.Wait() if errFirst != nil { return nil, nil, errFirst } if errSecond != nil { return nil, nil, errSecond } return tssFirst, tssSecond, nil } // Execute binary operation in the following way: // // 1) execute the exprFirst // 2) get common label filters for series returned at step 1 // 3) push down the found common label filters to exprSecond. This filters out unneeded series // during exprSecond execution instead of spending compute resources on extracting and processing these series // before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching // 4) execute the exprSecond with possible additional filters found at step 3 // // Typical use cases: // - Kubernetes-related: show pod creation time with the node name: // // kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info // // Without the optimization `kube_pod_info` would select and spend compute resources // for more time series than needed. The selected time series would be dropped later // when matching time series on the right and left sides of binary operand. // // - Generic alerting queries, which rely on `info` metrics. // See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/ // // - Queries, which get additional labels from `info` metrics. // See https://www.robustperception.io/exposing-the-software-version-to-prometheus tssFirst, err := evalExpr(qt, ec, exprFirst) if err != nil { return nil, nil, err } if len(tssFirst) == 0 && !strings.EqualFold(be.Op, "or") { // Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result, // since the "exprFirst op exprSecond" would return an empty result in any case. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349 return nil, nil, nil } lfs := getCommonLabelFilters(tssFirst) lfs = metricsql.TrimFiltersByGroupModifier(lfs, be) exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs) tssSecond, err := evalExpr(qt, ec, exprSecond) if err != nil { return nil, nil, err } return tssFirst, tssSecond, nil } func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter { if len(tss) == 0 { return nil } type valuesCounter struct { values map[string]struct{} count int } m := make(map[string]*valuesCounter, len(tss[0].MetricName.Tags)) for _, ts := range tss { for _, tag := range ts.MetricName.Tags { vc, ok := m[string(tag.Key)] if !ok { k := string(tag.Key) v := string(tag.Value) m[k] = &valuesCounter{ values: map[string]struct{}{ v: {}, }, count: 1, } continue } if len(vc.values) > 100 { // Too many unique values found for the given tag. // Do not make a filter on such values, since it may slow down // search for matching time series. continue } vc.count++ if _, ok := vc.values[string(tag.Value)]; !ok { vc.values[string(tag.Value)] = struct{}{} } } } lfs := make([]metricsql.LabelFilter, 0, len(m)) var values []string for k, vc := range m { if vc.count != len(tss) { // Skip the tag, since it doesn't belong to all the time series. continue } values = values[:0] for s := range vc.values { values = append(values, s) } lf := metricsql.LabelFilter{ Label: k, } if len(values) == 1 { lf.Value = values[0] } else { sort.Strings(values) lf.Value = joinRegexpValues(values) lf.IsRegexp = true } lfs = append(lfs, lf) } sort.Slice(lfs, func(i, j int) bool { return lfs[i].Label < lfs[j].Label }) return lfs } func joinRegexpValues(a []string) string { var b []byte for i, s := range a { sQuoted := regexp.QuoteMeta(s) b = append(b, sQuoted...) if i < len(a)-1 { b = append(b, '|') } } return string(b) } func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) { if len(ae.Args) != 1 { return nil, nil } e := ae.Args[0] // Make sure e contains one of the following: // - metricExpr // - metricExpr[d] // - rollupFunc(metricExpr) // - rollupFunc(metricExpr[d]) if me, ok := e.(*metricsql.MetricExpr); ok { // e = metricExpr if me.IsEmpty() { return nil, nil } fe := &metricsql.FuncExpr{ Name: "default_rollup", Args: []metricsql.Expr{me}, } nrf := getRollupFunc(fe.Name) return fe, nrf } if re, ok := e.(*metricsql.RollupExpr); ok { if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() { return nil, nil } // e = metricExpr[d] fe := &metricsql.FuncExpr{ Name: "default_rollup", Args: []metricsql.Expr{re}, } nrf := getRollupFunc(fe.Name) return fe, nrf } fe, ok := e.(*metricsql.FuncExpr) if !ok { return nil, nil } nrf := getRollupFunc(fe.Name) if nrf == nil { return nil, nil } rollupArgIdx := metricsql.GetRollupArgIdx(fe) if rollupArgIdx >= len(fe.Args) { // Incorrect number of args for rollup func. return nil, nil } arg := fe.Args[rollupArgIdx] if me, ok := arg.(*metricsql.MetricExpr); ok { if me.IsEmpty() { return nil, nil } // e = rollupFunc(metricExpr) return fe, nrf } if re, ok := arg.(*metricsql.RollupExpr); ok { if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() { return nil, nil } // e = rollupFunc(metricExpr[d]) return fe, nrf } return nil, nil } func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) { var rvs [][]*timeseries for _, e := range es { rv, err := evalExpr(qt, ec, e) if err != nil { return nil, err } rvs = append(rvs, rv) } return rvs, nil } func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) { if len(es) < 2 { return evalExprsSequentially(qt, ec, es) } rvs := make([][]*timeseries, len(es)) errs := make([]error, len(es)) qt.Printf("eval function args in parallel") var wg sync.WaitGroup for i, e := range es { wg.Add(1) qtChild := qt.NewChild("eval arg %d", i) go func(e metricsql.Expr, i int) { defer func() { qtChild.Done() wg.Done() }() rv, err := evalExpr(qtChild, ec, e) rvs[i] = rv errs[i] = err }(e, i) } wg.Wait() for _, err := range errs { if err != nil { return nil, err } } return rvs, nil } func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) { var re *metricsql.RollupExpr rollupArgIdx := metricsql.GetRollupArgIdx(fe) if len(fe.Args) <= rollupArgIdx { return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil)) } args := make([]interface{}, len(fe.Args)) for i, arg := range fe.Args { if i == rollupArgIdx { re = getRollupExprArg(arg) args[i] = re continue } ts, err := evalExpr(qt, ec, arg) if err != nil { return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err) } args[i] = ts } return args, re, nil } func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr { re, ok := arg.(*metricsql.RollupExpr) if !ok { // Wrap non-rollup arg into metricsql.RollupExpr. return &metricsql.RollupExpr{ Expr: arg, } } if !re.ForSubquery() { // Return standard rollup if it doesn't contain subquery. return re } me, ok := re.Expr.(*metricsql.MetricExpr) if !ok { // arg contains subquery. return re } // Convert me[w:step] -> default_rollup(me)[w:step] reNew := *re reNew.Expr = &metricsql.FuncExpr{ Name: "default_rollup", Args: []metricsql.Expr{ &metricsql.RollupExpr{Expr: me}, }, } return &reNew } // expr may contain: // - rollupFunc(m) if iafc is nil // - aggrFunc(rollupFunc(m)) if iafc isn't nil func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { if re.At == nil { return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc) } tssAt, err := evalExpr(qt, ec, re.At) if err != nil { return nil, &UserReadableError{ Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err), } } if len(tssAt) != 1 { return nil, &UserReadableError{ Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)), } } atTimestamp := int64(tssAt[0].Values[0] * 1000) ecNew := copyEvalConfig(ec) ecNew.Start = atTimestamp ecNew.End = atTimestamp tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc) if err != nil { return nil, err } // expand single-point tss to the original time range. timestamps := ec.getSharedTimestamps() for _, ts := range tss { v := ts.Values[0] values := make([]float64, len(timestamps)) for i := range timestamps { values[i] = v } ts.Timestamps = timestamps ts.Values = values } return tss, nil } func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { funcName = strings.ToLower(funcName) ecNew := ec var offset int64 if re.Offset != nil { offset = re.Offset.Duration(ec.Step) ecNew = copyEvalConfig(ecNew) ecNew.Start -= offset ecNew.End -= offset // There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true, // since the time range alignment has been already performed by the caller, // so cache hit rate should be quite good. // See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976 } if funcName == "rollup_candlestick" { // Automatically apply `offset -step` to `rollup_candlestick` function // in order to obtain expected OHLC results. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462 step := ecNew.Step ecNew = copyEvalConfig(ecNew) ecNew.Start += step ecNew.End += step offset -= step } var rvs []*timeseries var err error if me, ok := re.Expr.(*metricsql.MetricExpr); ok { rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window) } else { if iafc != nil { logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil)) } rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re) } if err != nil { return nil, &UserReadableError{ Err: err, } } if funcName == "absent_over_time" { rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs) } if offset != 0 && len(rvs) > 0 { // Make a copy of timestamps, since they may be used in other values. srcTimestamps := rvs[0].Timestamps dstTimestamps := append([]int64{}, srcTimestamps...) for i := range dstTimestamps { dstTimestamps[i] += offset } for _, ts := range rvs { ts.Timestamps = dstTimestamps } } return rvs, nil } // aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values. // // Values for returned series are set to nan if at least a single tss series contains nan at that point. // This means that tss contains a series with non-empty results at that point. // This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130 func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries { rvs := getAbsentTimeseries(ec, expr) if len(tss) == 0 { return rvs } for i := range tss[0].Values { for _, ts := range tss { if math.IsNaN(ts.Values[i]) { rvs[0].Values[i] = nan break } } } return rvs } func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) { // TODO: determine whether to use rollupResultCacheV here. qt = qt.NewChild("subquery") defer qt.Done() step, err := re.Step.NonNegativeDuration(ec.Step) if err != nil { return nil, fmt.Errorf("cannot parse step in square brackets at %s: %w", expr.AppendString(nil), err) } if step == 0 { step = ec.Step } window, err := re.Window.NonNegativeDuration(ec.Step) if err != nil { return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err) } ecSQ := copyEvalConfig(ec) ecSQ.Start -= window + step + maxSilenceInterval() ecSQ.End += step ecSQ.Step = step ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil { return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err) } // unconditionally align start and end args to step for subquery as Prometheus does. ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step) tssSQ, err := evalExpr(qt, ecSQ, re.Expr) if err != nil { return nil, err } if len(tssSQ) == 0 { return nil, nil } sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries) preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps) if err != nil { return nil, err } var samplesScannedTotal atomic.Uint64 keepMetricNames := getKeepMetricNames(expr) tsw := getTimeseriesByWorkerID() seriesByWorkerID := tsw.byWorkerID doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) { values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps) preFunc(values, timestamps) for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps) samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps) samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return values, timestamps }) tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) for i := range seriesByWorkerID { tss = append(tss, seriesByWorkerID[i].tss...) } putTimeseriesByWorkerID(tsw) rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal.Load()) return tss, nil } var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`) func getKeepMetricNames(expr metricsql.Expr) bool { if ae, ok := expr.(*metricsql.AggrFuncExpr); ok { // Extract rollupFunc(...) from aggrFunc(rollupFunc(...)). // This case is possible when optimized aggrFunc calculations are used // such as `sum(rate(...))` if len(ae.Args) != 1 { return false } expr = ae.Args[0] } if fe, ok := expr.(*metricsql.FuncExpr); ok { return fe.KeepMetricNames } return false } func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) { workers := netstorage.MaxWorkers() if workers > len(tss) { workers = len(tss) } seriesPerWorker := (len(tss) + workers - 1) / workers workChs := make([]chan *timeseries, workers) for i := range workChs { workChs[i] = make(chan *timeseries, seriesPerWorker) } for i, ts := range tss { idx := i % len(workChs) workChs[idx] <- ts } for _, workCh := range workChs { close(workCh) } var wg sync.WaitGroup wg.Add(workers) for i := 0; i < workers; i++ { go func(workerID uint) { defer wg.Done() var tmpValues []float64 var tmpTimestamps []int64 for ts := range workChs[workerID] { tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID) } }(uint(i)) } wg.Wait() } func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) { hasNan := false for _, v := range values { if math.IsNaN(v) { hasNan = true } } if !hasNan { // Fast path - no NaNs. dstValues = append(dstValues, values...) dstTimestamps = append(dstTimestamps, timestamps...) return dstValues, dstTimestamps } // Slow path - remove NaNs. for i, v := range values { if math.IsNaN(v) { continue } dstValues = append(dstValues, v) dstTimestamps = append(dstTimestamps, timestamps[i]) } return dstValues, dstTimestamps } // evalInstantRollup evaluates instant rollup where ec.Start == ec.End. func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) ([]*timeseries, error) { if ec.Start != ec.End { logger.Panicf("BUG: evalInstantRollup cannot be called on non-empty time range; got %s", ec.timeRangeString()) } timestamp := ec.Start if qt.Enabled() { qt = qt.NewChild("instant rollup %s; time=%s, window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) defer qt.Done() } evalAt := func(qt *querytracer.Tracer, timestamp, window int64) ([]*timeseries, error) { ecCopy := copyEvalConfig(ec) ecCopy.Start = timestamp ecCopy.End = timestamp pointsPerSeries := int64(1) return evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries) } tooBigOffset := func(offset int64) bool { maxOffset := window / 2 if maxOffset > 1800*1000 { maxOffset = 1800 * 1000 } return offset >= maxOffset } deleteCachedSeries := func(qt *querytracer.Tracer) { rollupResultCacheV.DeleteInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss) } getCachedSeries := func(qt *querytracer.Tracer) ([]*timeseries, int64, error) { again: offset := int64(0) tssCached := rollupResultCacheV.GetInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss) ec.QueryStats.addSeriesFetched(len(tssCached)) if len(tssCached) == 0 { // Cache miss. Re-populate the missing data. start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() offset = timestamp - start if offset < 0 { start = timestamp offset = 0 } if tooBigOffset(offset) { qt.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+ "for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window) tss, err := evalAt(qt, timestamp, window) return tss, 0, err } qt.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start)) tss, err := evalAt(qt, start, window) if err != nil { return nil, 0, err } if hasDuplicateSeries(tss) { qt.Printf("cannot apply instant rollup optimization because the result contains duplicate series") tss, err := evalAt(qt, timestamp, window) return tss, 0, err } rollupResultCacheV.PutInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss, tss) return tss, offset, nil } // Cache hit. Verify whether it is OK to use the cached data. offset = timestamp - tssCached[0].Timestamps[0] if offset < 0 { qt.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s", storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp)) // Delete the outdated cached values, so the cache could be re-populated with newer values. deleteCachedSeries(qt) goto again } if tooBigOffset(offset) { qt.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+ "and the cached values is too big comparing to window=%d", offset, window) // Delete the outdated cached values, so the cache could be re-populated with newer values. deleteCachedSeries(qt) goto again } return tssCached, offset, nil } if !ec.mayCache() { qt.Printf("do not apply instant rollup optimization because of disabled cache") return evalAt(qt, timestamp, window) } if window < minWindowForInstantRollupOptimization.Milliseconds() { qt.Printf("do not apply instant rollup optimization because of too small window=%d; must be equal or bigger than %d", window, minWindowForInstantRollupOptimization.Milliseconds()) return evalAt(qt, timestamp, window) } switch funcName { case "avg_over_time": if iafc != nil { qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } qt.Printf("optimized calculation for instant rollup avg_over_time(m[d]) as (sum_over_time(m[d]) / count_over_time(m[d]))") fe := expr.(*metricsql.FuncExpr) feSum := *fe feSum.Name = "sum_over_time" feCount := *fe feCount.Name = "count_over_time" be := &metricsql.BinaryOpExpr{ Op: "/", KeepMetricNames: fe.KeepMetricNames, Left: &feSum, Right: &feCount, } return evalExpr(qt, ec, be) case "rate": if iafc != nil { if !strings.EqualFold(iafc.ae.Name, "sum") { qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } qt.Printf("optimized calculation for sum(rate(m[d])) as (sum(increase(m[d])) / d)") afe := expr.(*metricsql.AggrFuncExpr) fe := afe.Args[0].(*metricsql.FuncExpr) feIncrease := *fe feIncrease.Name = "increase" re := fe.Args[0].(*metricsql.RollupExpr) d := re.Window.Duration(ec.Step) if d == 0 { d = ec.Step } afeIncrease := *afe afeIncrease.Args = []metricsql.Expr{&feIncrease} be := &metricsql.BinaryOpExpr{ Op: "/", KeepMetricNames: true, Left: &afeIncrease, Right: &metricsql.NumberExpr{ N: float64(d) / 1000, }, } return evalExpr(qt, ec, be) } qt.Printf("optimized calculation for instant rollup rate(m[d]) as (increase(m[d]) / d)") fe := expr.(*metricsql.FuncExpr) feIncrease := *fe feIncrease.Name = "increase" re := fe.Args[0].(*metricsql.RollupExpr) d := re.Window.Duration(ec.Step) if d == 0 { d = ec.Step } be := &metricsql.BinaryOpExpr{ Op: "/", KeepMetricNames: fe.KeepMetricNames, Left: &feIncrease, Right: &metricsql.NumberExpr{ N: float64(d) / 1000, }, } return evalExpr(qt, ec, be) case "max_over_time": if iafc != nil { if !strings.EqualFold(iafc.ae.Name, "max") { qt.Printf("do not apply instant rollup optimization for non-max incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } } // Calculate // // max_over_time(m[window] @ timestamp) // // as the maximum of // // - max_over_time(m[window] @ (timestamp-offset)) // - max_over_time(m[offset] @ timestamp) // // if max_over_time(m[offset] @ (timestamp-window)) < max_over_time(m[window] @ (timestamp-offset)) // otherwise do not apply the optimization // // where // // - max_over_time(m[window] @ (timestamp-offset)) is obtained from cache // - max_over_time(m[offset] @ timestamp) and max_over_time(m[offset] @ (timestamp-window)) are calculated from the storage // These rollups are calculated faster than max_over_time(m[window]) because offset is smaller than window. qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) defer qtChild.Done() tssCached, offset, err := getCachedSeries(qtChild) if err != nil { return nil, err } if offset == 0 { return tssCached, nil } // Calculate max_over_time(m[offset] @ timestamp) tssStart, err := evalAt(qtChild, timestamp, offset) if err != nil { return nil, err } if hasDuplicateSeries(tssStart) { qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series") return evalAt(qtChild, timestamp, window) } // Calculate max_over_time(m[offset] @ (timestamp - window)) tssEnd, err := evalAt(qtChild, timestamp-window, offset) if err != nil { return nil, err } if hasDuplicateSeries(tssEnd) { qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series") return evalAt(qtChild, timestamp, window) } // Calculate the result tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp) if !ok { qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached") deleteCachedSeries(qtChild) return evalAt(qt, timestamp, window) } return tss, nil case "min_over_time": if iafc != nil { if !strings.EqualFold(iafc.ae.Name, "min") { qt.Printf("do not apply instant rollup optimization for non-min incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } } // Calculate // // min_over_time(m[window] @ timestamp) // // as the minimum of // // - min_over_time(m[window] @ (timestamp-offset)) // - min_over_time(m[offset] @ timestamp) // // if min_over_time(m[offset] @ (timestamp-window)) > min_over_time(m[window] @ (timestamp-offset)) // otherwise do not apply the optimization // // where // // - min_over_time(m[window] @ (timestamp-offset)) is obtained from cache // - min_over_time(m[offset] @ timestamp) and min_over_time(m[offset] @ (timestamp-window)) are calculated from the storage // These rollups are calculated faster than min_over_time(m[window]) because offset is smaller than window. qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) defer qtChild.Done() tssCached, offset, err := getCachedSeries(qtChild) if err != nil { return nil, err } if offset == 0 { return tssCached, nil } // Calculate min_over_time(m[offset] @ timestamp) tssStart, err := evalAt(qtChild, timestamp, offset) if err != nil { return nil, err } if hasDuplicateSeries(tssStart) { qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series") return evalAt(qtChild, timestamp, window) } // Calculate min_over_time(m[offset] @ (timestamp - window)) tssEnd, err := evalAt(qtChild, timestamp-window, offset) if err != nil { return nil, err } if hasDuplicateSeries(tssEnd) { qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series") return evalAt(qtChild, timestamp, window) } // Calculate the result tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp) if !ok { qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached") deleteCachedSeries(qtChild) return evalAt(qt, timestamp, window) } return tss, nil case "count_eq_over_time", "count_gt_over_time", "count_le_over_time", "count_ne_over_time", "count_over_time", "increase", "increase_pure", "sum_over_time": if iafc != nil && !strings.EqualFold(iafc.ae.Name, "sum") { qt.Printf("do not apply instant rollup optimization for non-sum incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } // Calculate // // rf(m[window] @ timestamp) // // as // // rf(m[window] @ (timestamp-offset)) + rf(m[offset] @ timestamp) - rf(m[offset] @ (timestamp-window)) // // where // // - rf is count_over_time, sum_over_time or increase // - rf(m[window] @ (timestamp-offset)) is obtained from cache // - rf(m[offset] @ timestamp) and rf(m[offset] @ (timestamp-window)) are calculated from the storage // These rollups are calculated faster than rf(m[window]) because offset is smaller than window. qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) defer qtChild.Done() tssCached, offset, err := getCachedSeries(qtChild) if err != nil { return nil, err } if offset == 0 { return tssCached, nil } // Calculate rf(m[offset] @ timestamp) tssStart, err := evalAt(qtChild, timestamp, offset) if err != nil { return nil, err } if hasDuplicateSeries(tssStart) { qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series") return evalAt(qtChild, timestamp, window) } // Calculate rf(m[offset] @ (timestamp - window)) tssEnd, err := evalAt(qtChild, timestamp-window, offset) if err != nil { return nil, err } if hasDuplicateSeries(tssEnd) { qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series") return evalAt(qtChild, timestamp, window) } // Calculate the result tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp) return tss, nil default: qt.Printf("instant rollup optimization isn't implemented for %s()", funcName) return evalAt(qt, timestamp, window) } } func hasDuplicateSeries(tss []*timeseries) bool { if len(tss) <= 1 { return false } m := make(map[string]struct{}, len(tss)) bb := bbPool.Get() defer bbPool.Put(bb) for _, ts := range tss { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) if _, ok := m[string(bb.B)]; ok { return true } m[string(bb.B)] = struct{}{} } return false } func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) ([]*timeseries, bool) { qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp) defer qt.Done() getMin := func(a, b float64) float64 { if a < b { return a } return b } tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMin) qt.Printf("resulting series=%d; ok=%v", len(tss), ok) return tss, ok } func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) ([]*timeseries, bool) { qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp) defer qt.Done() getMax := func(a, b float64) float64 { if a > b { return a } return b } tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMax) qt.Printf("resulting series=%d", len(tss)) return tss, ok } func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, timestamp int64, f func(a, b float64) float64) ([]*timeseries, bool) { assertInstantValues(tssCached) assertInstantValues(tssStart) assertInstantValues(tssEnd) bb := bbPool.Get() defer bbPool.Put(bb) m := make(map[string]*timeseries, len(tssCached)) for _, ts := range tssCached { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) if _, ok := m[string(bb.B)]; ok { logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName) } m[string(bb.B)] = ts } mStart := make(map[string]*timeseries, len(tssStart)) for _, ts := range tssStart { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) if _, ok := mStart[string(bb.B)]; ok { logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName) } mStart[string(bb.B)] = ts tsCached := m[string(bb.B)] if tsCached != nil && !math.IsNaN(tsCached.Values[0]) { if !math.IsNaN(ts.Values[0]) { tsCached.Values[0] = f(ts.Values[0], tsCached.Values[0]) } } else { m[string(bb.B)] = ts } } for _, ts := range tssEnd { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) tsCached := m[string(bb.B)] if tsCached != nil && !math.IsNaN(tsCached.Values[0]) && !math.IsNaN(ts.Values[0]) { if ts.Values[0] == f(ts.Values[0], tsCached.Values[0]) { tsStart := mStart[string(bb.B)] if tsStart == nil || math.IsNaN(tsStart.Values[0]) || tsStart.Values[0] != f(ts.Values[0], tsStart.Values[0]) { return nil, false } } } } rvs := make([]*timeseries, 0, len(m)) for _, ts := range m { rvs = append(rvs, ts) } setInstantTimestamp(rvs, timestamp) return rvs, true } // getSumInstantValues aggregates tssCached, tssStart, tssEnd time series // into a new time series with value = tssCached + tssStart - tssEnd func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) []*timeseries { qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp) defer qt.Done() assertInstantValues(tssCached) assertInstantValues(tssStart) assertInstantValues(tssEnd) m := make(map[string]*timeseries, len(tssCached)) bb := bbPool.Get() defer bbPool.Put(bb) for _, ts := range tssCached { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) if _, ok := m[string(bb.B)]; ok { logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName) } m[string(bb.B)] = ts } for _, ts := range tssStart { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) tsCached := m[string(bb.B)] if tsCached != nil && !math.IsNaN(tsCached.Values[0]) { if !math.IsNaN(ts.Values[0]) { tsCached.Values[0] += ts.Values[0] } } else { m[string(bb.B)] = ts } } for _, ts := range tssEnd { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) tsCached := m[string(bb.B)] if tsCached != nil && !math.IsNaN(tsCached.Values[0]) { if !math.IsNaN(ts.Values[0]) { tsCached.Values[0] -= ts.Values[0] } } } rvs := make([]*timeseries, 0, len(m)) for _, ts := range m { rvs = append(rvs, ts) } setInstantTimestamp(rvs, timestamp) qt.Printf("resulting series=%d", len(rvs)) return rvs } func setInstantTimestamp(tss []*timeseries, timestamp int64) { for _, ts := range tss { ts.Timestamps[0] = timestamp } } func assertInstantValues(tss []*timeseries) { for _, ts := range tss { if len(ts.Values) != 1 { logger.Panicf("BUG: instant series must contain a single value; got %d values", len(ts.Values)) } if len(ts.Timestamps) != 1 { logger.Panicf("BUG: instant series must contain a single timestamp; got %d timestamps", len(ts.Timestamps)) } } } var ( rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`) rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`) rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`) memoryIntensiveQueries = metrics.NewCounter(`vm_memory_intensive_queries_total`) ) func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) { window, err := windowExpr.NonNegativeDuration(ec.Step) if err != nil { return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err) } if me.IsEmpty() { return evalNumber(ec, nan), nil } if ec.Start == ec.End { rvs, err := evalInstantRollup(qt, ec, funcName, rf, expr, me, iafc, window) if err != nil { err = &UserReadableError{ Err: err, } return nil, err } return rvs, nil } pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step evalWithConfig := func(ec *EvalConfig) ([]*timeseries, error) { tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries) if err != nil { err = &UserReadableError{ Err: err, } return nil, err } return tss, nil } if !ec.mayCache() { qt.Printf("do not fetch series from cache, since it is disabled in the current context") return evalWithConfig(ec) } // Search for cached results. tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window) ec.QueryStats.addSeriesFetched(len(tssCached)) if start > ec.End { qt.Printf("the result is fully cached") rollupResultCacheFullHits.Inc() return tssCached, nil } if start > ec.Start { qt.Printf("partial cache hit") rollupResultCachePartialHits.Inc() } else { qt.Printf("cache miss") rollupResultCacheMiss.Inc() } // Fetch missing results, which aren't cached yet. ecNew := ec if start != ec.Start { ecNew = copyEvalConfig(ec) ecNew.Start = start } tss, err := evalWithConfig(ecNew) if err != nil { return nil, err } // Merge cached results with the fetched additional results. rvs, ok := mergeSeries(qt, tssCached, tss, start, ec) if !ok { // Cannot merge series - fall back to non-cached querying. qt.Printf("fall back to non-caching querying") rvs, err = evalWithConfig(ec) if err != nil { return nil, err } } rollupResultCacheV.PutSeries(qt, ec, expr, window, rvs) return rvs, nil } // evalRollupFuncNoCache calculates the given rf with the given lookbehind window. // // pointsPerSeries is used only for estimating the needed memory for query processing func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, pointsPerSeries int64) ([]*timeseries, error) { if qt.Enabled() { qt = qt.NewChild("rollup %s: timeRange=%s, step=%d, window=%d", expr.AppendString(nil), ec.timeRangeString(), ec.Step, window) defer qt.Done() } if window < 0 { return nil, nil } // Obtain rollup configs before fetching data from db, so type errors could be caught earlier. sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries) preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps) if err != nil { return nil, err } // Fetch the result. tfss := searchutils.ToTagFilterss(me.LabelFilterss) tfss = searchutils.JoinTagFilterss(tfss, ec.EnforcedTagFilterss) minTimestamp := ec.Start if needSilenceIntervalForRollupFunc[funcName] { minTimestamp -= maxSilenceInterval() } if window > ec.Step { minTimestamp -= window } else { minTimestamp -= ec.Step } sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries) rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline) if err != nil { return nil, err } rssLen := rss.Len() if rssLen == 0 { rss.Cancel() return nil, nil } ec.QueryStats.addSeriesFetched(rssLen) // Verify timeseries fit available memory during rollup calculations. timeseriesLen := rssLen if iafc != nil { // Incremental aggregates require holding only GOMAXPROCS timeseries in memory. timeseriesLen = cgroup.AvailableCPUs() if iafc.ae.Modifier.Op != "" { if iafc.ae.Limit > 0 { // There is an explicit limit on the number of output time series. timeseriesLen *= iafc.ae.Limit } else { // Increase the number of timeseries for non-empty group list: `aggr() by (something)`, // since each group can have own set of time series in memory. timeseriesLen *= 1000 } } // The maximum number of output time series is limited by rssLen. if timeseriesLen > rssLen { timeseriesLen = rssLen } } rollupPoints := mulNoOverflow(pointsPerSeries, int64(timeseriesLen*len(rcs))) rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16)) if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory { memoryIntensiveQueries.Inc() requestURI := ec.GetRequestURI() logger.Warnf("remoteAddr=%s, requestURI=%s: the %s requires %d bytes of memory for processing; "+ "logging this query, since it exceeds the -search.logQueryMemoryUsage=%d; "+ "the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series", ec.QuotedRemoteAddr, requestURI, expr.AppendString(nil), rollupMemorySize, maxMemory, timeseriesLen*len(rcs), rollupPoints) } if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory { rss.Cancel() err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+ "according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+ "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ "increasing -search.maxMemoryPerQuery", expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3) return nil, err } rml := getRollupMemoryLimiter() if !rml.Get(uint64(rollupMemorySize)) { rss.Cancel() err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+ "total available memory for concurrent requests: %d bytes; requested memory: %d bytes; "+ "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ "switching to node with more RAM; increasing -memory.allowedPercent", expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3) return nil, err } defer rml.Put(uint64(rollupMemorySize)) qt.Printf("the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)", rollupMemorySize, timeseriesLen, pointsPerSeries, rollupPoints) // Evaluate rollup keepMetricNames := getKeepMetricNames(expr) if iafc != nil { return evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps) } return evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps) } var ( rollupMemoryLimiter memoryLimiter rollupMemoryLimiterOnce sync.Once ) func getRollupMemoryLimiter() *memoryLimiter { rollupMemoryLimiterOnce.Do(func() { rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4 }) return &rollupMemoryLimiter } func maxSilenceInterval() int64 { d := minStalenessInterval.Milliseconds() if d <= 0 { d = 5 * 60 * 1000 } return d } func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs) defer qt.Done() var samplesScannedTotal atomic.Uint64 err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) for _, ts := range tsm.m { iafc.updateTimeseries(ts, workerID) } samplesScannedTotal.Add(samplesScanned) continue } ts.Reset() samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) samplesScannedTotal.Add(samplesScanned) iafc.updateTimeseries(ts, workerID) // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used. ts.Timestamps = nil ts.denyReuse = false } return nil }) if err != nil { return nil, err } tss := iafc.finalizeTimeseries() rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal.Load()) return tss, nil } func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs) defer qt.Done() var samplesScannedTotal atomic.Uint64 tsw := getTimeseriesByWorkerID() seriesByWorkerID := tsw.byWorkerID seriesLen := rss.Len() err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return nil }) if err != nil { return nil, err } tss := make([]*timeseries, 0, seriesLen*len(rcs)) for i := range seriesByWorkerID { tss = append(tss, seriesByWorkerID[i].tss...) } putTimeseriesByWorkerID(tsw) rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) qt.Printf("samplesScanned=%d", samplesScannedTotal.Load()) return tss, nil } func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 { tsDst.MetricName.CopyFrom(mnSrc) if len(rc.TagValue) > 0 { tsDst.MetricName.AddTag("rollup", rc.TagValue) } if !keepMetricNames && !rollupFuncsKeepMetricName[funcName] { tsDst.MetricName.ResetMetricGroup() } var samplesScanned uint64 tsDst.Values, samplesScanned = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc) tsDst.Timestamps = sharedTimestamps tsDst.denyReuse = true return samplesScanned } type timeseriesWithPadding struct { tss []*timeseries // The padding prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . _ [128 - unsafe.Sizeof([]*timeseries{})%128]byte } type timeseriesByWorkerID struct { byWorkerID []timeseriesWithPadding } func (tsw *timeseriesByWorkerID) reset() { byWorkerID := tsw.byWorkerID for i := range byWorkerID { byWorkerID[i].tss = nil } } func getTimeseriesByWorkerID() *timeseriesByWorkerID { v := timeseriesByWorkerIDPool.Get() if v == nil { return ×eriesByWorkerID{ byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()), } } return v.(*timeseriesByWorkerID) } func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) { tsw.reset() timeseriesByWorkerIDPool.Put(tsw) } var timeseriesByWorkerIDPool sync.Pool var bbPool bytesutil.ByteBufferPool func evalNumber(ec *EvalConfig, n float64) []*timeseries { var ts timeseries ts.denyReuse = true timestamps := ec.getSharedTimestamps() values := make([]float64, len(timestamps)) for i := range timestamps { values[i] = n } ts.Values = values ts.Timestamps = timestamps return []*timeseries{&ts} } func evalString(ec *EvalConfig, s string) []*timeseries { rv := evalNumber(ec, nan) rv[0].MetricName.MetricGroup = append(rv[0].MetricName.MetricGroup[:0], s...) return rv } func evalTime(ec *EvalConfig) []*timeseries { rv := evalNumber(ec, nan) timestamps := rv[0].Timestamps values := rv[0].Values for i, ts := range timestamps { values[i] = float64(ts) / 1e3 } return rv } func mulNoOverflow(a, b int64) int64 { if math.MaxInt64/b < a { // Overflow return math.MaxInt64 } return a * b } func sumNoOverflow(a, b int64) int64 { if math.MaxInt64-a < b { // Overflow return math.MaxInt64 } return a + b } func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) { if *noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" { // Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function, // since it uses them for Prometheus-style staleness detection. // Do not drop staleness marks for stale_samples_over_time() function, since it needs // to calculate the number of staleness markers. return values, timestamps } // Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values. hasStaleSamples := false for _, v := range values { if decimal.IsStaleNaN(v) { hasStaleSamples = true break } } if !hasStaleSamples { // Fast path: values have no Prometheus staleness marks. return values, timestamps } // Slow path: drop Prometheus staleness marks from values. dstValues := values[:0] dstTimestamps := timestamps[:0] for i, v := range values { if decimal.IsStaleNaN(v) { continue } dstValues = append(dstValues, v) dstTimestamps = append(dstTimestamps, timestamps[i]) } return dstValues, dstTimestamps }