diff --git a/app/vmselect/promql/aggr_incremental.go b/app/vmselect/promql/aggr_incremental.go index c87ba7bb3..4ae701b45 100644 --- a/app/vmselect/promql/aggr_incremental.go +++ b/app/vmselect/promql/aggr_incremental.go @@ -79,6 +79,13 @@ type incrementalAggrFuncContext struct { callbacks *incrementalAggrFuncCallbacks } +func (iafc *incrementalAggrFuncContext) resetState() { + byWorkerID := iafc.byWorkerID + for i := range byWorkerID { + byWorkerID[i].m = make(map[string]*incrementalAggrContext, len(byWorkerID[i].m)) + } +} + func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext { return &incrementalAggrFuncContext{ ae: ae, @@ -154,6 +161,8 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { finalizeAggrFunc(iac) tss = append(tss, iac.ts) } + // reset iafc state, so it could be re-used + iafc.resetState() return tss } diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 2240a7055..5d00efadc 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -61,7 +62,7 @@ func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error { // AdjustStartEnd adjusts start and end values, so response caching may be enabled. // -// See EvalConfig.mayCache for details. +// See EvalConfig.mayCache() for details. func AdjustStartEnd(start, end, step int64) (int64, int64) { if *disableCache { // Do not adjust start and end values when cache is disabled. 
@@ -191,6 +192,11 @@ func (ec *EvalConfig) mayCache() bool { if !ec.MayCache { return false } + if ec.Start == ec.End { + // There is no need in aligning start and end to step for instant query + // in order to cache its results. + return true + } if ec.Start%ec.Step != 0 { return false } @@ -1039,6 +1045,262 @@ func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float6 return dstValues, dstTimestamps } +// minWindowForInstantRollupOptimization is the minimum lookbehind window in milliseconds +// for enabling smart caching of instant rollup function results. +const minWindowForInstantRollupOptimization = 24 * 3600 * 1000 + +// evalInstantRollup evaluates instant rollup where ec.Start == ec.End. +func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, + expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) ([]*timeseries, error) { + if ec.Start != ec.End { + logger.Panicf("BUG: evalInstantRollup cannot be called on non-empty time range; got %s", ec.timeRangeString()) + } + timestamp := ec.Start + if qt.Enabled() { + qt = qt.NewChild("instant rollup %s; time=%s, window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) + defer qt.Done() + } + + evalAt := func(qt *querytracer.Tracer, timestamp, window int64) ([]*timeseries, error) { + ecCopy := copyEvalConfig(ec) + ecCopy.Start = timestamp + ecCopy.End = timestamp + pointsPerSeries := int64(1) + return evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries) + } + tooBigOffset := func(offset int64) bool { + maxOffset := window / 2 + if maxOffset > 3600*1000 { + maxOffset = 3600 * 1000 + } + return offset >= maxOffset + } + + if !ec.mayCache() { + qt.Printf("do not apply instant rollup optimization because of disabled cache") + return evalAt(qt, timestamp, window) + } + if window < minWindowForInstantRollupOptimization { + qt.Printf("do not apply 
instant rollup optimization because of too small window=%d; must be equal or bigger than %d", window, minWindowForInstantRollupOptimization) + return evalAt(qt, timestamp, window) + } + switch funcName { + case "avg_over_time": + if iafc != nil { + qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name) + return evalAt(qt, timestamp, window) + } + qt.Printf("optimized calculation for instant rollup avg_over_time(m[d]) as (sum_over_time(m[d]) / count_over_time(m[d]))") + fe := expr.(*metricsql.FuncExpr) + feSum := *fe + feSum.Name = "sum_over_time" + feCount := *fe + feCount.Name = "count_over_time" + be := &metricsql.BinaryOpExpr{ + Op: "/", + KeepMetricNames: fe.KeepMetricNames, + Left: &feSum, + Right: &feCount, + } + return evalExpr(qt, ec, be) + case "rate": + if iafc != nil { + if strings.ToLower(iafc.ae.Name) != "sum" { + qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name) + return evalAt(qt, timestamp, window) + } + qt.Printf("optimized calculation for sum(rate(m[d])) as (sum(increase(m[d])) / d)") + afe := expr.(*metricsql.AggrFuncExpr) + fe := afe.Args[0].(*metricsql.FuncExpr) + feIncrease := *fe + feIncrease.Name = "increase" + re := fe.Args[0].(*metricsql.RollupExpr) + d := re.Window.Duration(ec.Step) + if d == 0 { + d = ec.Step + } + afeIncrease := *afe + afeIncrease.Args = []metricsql.Expr{&feIncrease} + be := &metricsql.BinaryOpExpr{ + Op: "/", + KeepMetricNames: true, + Left: &afeIncrease, + Right: &metricsql.NumberExpr{ + N: float64(d) / 1000, + }, + } + return evalExpr(qt, ec, be) + } + qt.Printf("optimized calculation for instant rollup rate(m[d]) as (increase(m[d]) / d)") + fe := expr.(*metricsql.FuncExpr) + feIncrease := *fe + feIncrease.Name = "increase" + re := fe.Args[0].(*metricsql.RollupExpr) + d := re.Window.Duration(ec.Step) + if d == 0 { + d = ec.Step + } + be := &metricsql.BinaryOpExpr{ + Op: "/", + KeepMetricNames: fe.KeepMetricNames, + Left: 
&feIncrease, + Right: &metricsql.NumberExpr{ + N: float64(d) / 1000, + }, + } + return evalExpr(qt, ec, be) + case "count_over_time", "sum_over_time", "increase": + if iafc != nil && strings.ToLower(iafc.ae.Name) != "sum" { + qt.Printf("do not apply instant rollup optimization for non-sum incremental aggregate %s()", iafc.ae.Name) + return evalAt(qt, timestamp, window) + } + + // Calculate + // + // rf(m[window] @ timestamp) + // + // as + // + // rf(m[window] @ (timestamp-offset)) + rf(m[offset] @ timestamp) - rf(m[offset] @ (timestamp-window)) + // + // where + // + // - rf is count_over_time, sum_over_time or increase + // - rf(m[window] @ (timestamp-offset)) is obtained from cache + // - rf(m[offset] @ timestamp) and rf(m[offset] @ (timestamp-window)) are calculated from the storage + // These rollups are calculated faster than rf(m[window]) because offset is smaller than window. + qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d", + expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) + defer qtChild.Done() + + again: + offset := int64(0) + tssCached := rollupResultCacheV.GetInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss) + ec.QueryStats.addSeriesFetched(len(tssCached)) + if len(tssCached) == 0 { + // Cache miss. 
Re-populate it + start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() + offset = timestamp - start + if offset < 0 { + start = timestamp + offset = 0 + } + if tooBigOffset(offset) { + qtChild.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+ + "for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window) + return evalAt(qtChild, timestamp, window) + } + qtChild.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start)) + tss, err := evalAt(qtChild, start, window) + if err != nil { + return nil, err + } + rollupResultCacheV.PutInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss, tss) + tssCached = tss + } else { + offset = timestamp - tssCached[0].Timestamps[0] + if offset < 0 { + qtChild.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s", + storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp)) + // Delete the outdated cached values, so the cache could be re-populated with newer values. + rollupResultCacheV.DeleteInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss) + goto again + } + if tooBigOffset(offset) { + qtChild.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+ + "and the cached values is too big comparing to window=%d", offset, window) + // Delete the outdated cached values, so the cache could be re-populated with newer values. 
+ rollupResultCacheV.DeleteInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss) + goto again + } + } + if offset == 0 { + qtChild.Printf("return cached values, since they have the requested timestamp=%s", storage.TimestampToHumanReadableFormat(timestamp)) + return tssCached, nil + } + // Calculate count_over_time(m[offset] @ timestamp) + tssStart, err := evalAt(qtChild, timestamp, offset) + if err != nil { + return nil, err + } + // Calculate count_over_time(m[offset] @ (timestamp - window)) + tssEnd, err := evalAt(qtChild, timestamp-window, offset) + if err != nil { + return nil, err + } + tss, err := mergeInstantValues(qtChild, tssCached, tssStart, tssEnd) + if err != nil { + return nil, fmt.Errorf("cannot merge instant series: %w", err) + } + return tss, nil + default: + qt.Printf("instant rollup optimization isn't implemented for %s()", funcName) + return evalAt(qt, timestamp, window) + } +} + +// mergeInstantValues calculates tssCached + tssStart - tssEnd +func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, error) { + qt = qt.NewChild("merge instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) + defer qt.Done() + + assertInstantValues(tssCached) + assertInstantValues(tssStart) + assertInstantValues(tssEnd) + + m := make(map[string]*timeseries, len(tssCached)) + bb := bbPool.Get() + defer bbPool.Put(bb) + + for _, ts := range tssCached { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + if tsExisting := m[string(bb.B)]; tsExisting != nil { + return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName) + } + m[string(bb.B)] = ts + } + + for _, ts := range tssStart { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + tsCached := m[string(bb.B)] + if tsCached != nil && !math.IsNaN(tsCached.Values[0]) { + if !math.IsNaN(ts.Values[0]) { + tsCached.Values[0] += ts.Values[0] + } + } else { + m[string(bb.B)] = ts + } + 
} + + for _, ts := range tssEnd { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + tsCached := m[string(bb.B)] + if tsCached != nil && !math.IsNaN(tsCached.Values[0]) { + if !math.IsNaN(ts.Values[0]) { + tsCached.Values[0] -= ts.Values[0] + } + } + } + + rvs := make([]*timeseries, 0, len(m)) + for _, ts := range m { + rvs = append(rvs, ts) + } + qt.Printf("resulting series=%d", len(rvs)) + return rvs, nil +} + +func assertInstantValues(tss []*timeseries) { + for _, ts := range tss { + if len(ts.Values) != 1 { + logger.Panicf("BUG: instant series must contain a single value; got %d values", len(ts.Values)) + } + if len(ts.Timestamps) != 1 { + logger.Panicf("BUG: instant series must contain a single timestamp; got %d timestamps", len(ts.Timestamps)) + } + } +} + var ( rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`) rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`) @@ -1049,23 +1311,28 @@ var ( func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) { - var rollupMemorySize int64 window, err := windowExpr.NonNegativeDuration(ec.Step) if err != nil { return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err) } - if qt.Enabled() { - qt = qt.NewChild("rollup %s(): timeRange=%s, step=%d, window=%d", funcName, ec.timeRangeString(), ec.Step, window) - defer func() { - qt.Donef("neededMemoryBytes=%d", rollupMemorySize) - }() - } if me.IsEmpty() { return evalNumber(ec, nan), nil } + if ec.Start == ec.End { + rvs, err := evalInstantRollup(qt, ec, funcName, rf, expr, me, iafc, window) + if err != nil { + err = &UserReadableError{ + Err: err, + } + return nil, err + } + return rvs, nil + } + // Search for partial results in cache. 
-	tssCached, start := rollupResultCacheV.Get(qt, ec, expr, window)
+	tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window)
+	ec.QueryStats.addSeriesFetched(len(tssCached))
 	if start > ec.End {
 		// The result is fully cached.
 		rollupResultCacheFullHits.Inc()
@@ -1077,10 +1344,41 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
 		rollupResultCacheMiss.Inc()
 	}
 
-	// Obtain rollup configs before fetching data from db,
-	// so type errors can be caught earlier.
-	sharedTimestamps := getTimestamps(start, ec.End, ec.Step, ec.MaxPointsPerSeries)
-	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
+	ecCopy := copyEvalConfig(ec)
+	ecCopy.Start = start
+	pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step // memory estimate covers the full requested range, including points taken from tssCached
+	tss, err := evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries) // ecCopy starts at `start`, so only the part missing from the cache is evaluated
+	if err != nil {
+		err = &UserReadableError{
+			Err: err,
+		}
+		return nil, err
+	}
+	rvs, err := mergeTimeseries(qt, tssCached, tss, start, ec)
+	if err != nil {
+		return nil, fmt.Errorf("cannot merge series: %w", err)
+	}
+	if tss != nil {
+		rollupResultCacheV.PutSeries(qt, ec, expr, window, tss)
+	}
+	return rvs, nil
+}
+
+// evalRollupFuncNoCache calculates the given rf with the given lookbehind window.
+//
+// pointsPerSeries is used only for estimating the needed memory for query processing
+func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
+	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, pointsPerSeries int64) ([]*timeseries, error) {
+	if qt.Enabled() {
+		qt = qt.NewChild("rollup %s: timeRange=%s, step=%d, window=%d", expr.AppendString(nil), ec.timeRangeString(), ec.Step, window)
+		defer qt.Done()
+	}
+	if window < 0 { // NOTE(review): window == 0 appears to mean "no explicit lookbehind window" and must still be evaluated - confirm against callers
+		return nil, nil
+	}
+	// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
+ sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries) + preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps) if err != nil { return nil, err } @@ -1088,7 +1386,10 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa // Fetch the remaining part of the result. tfss := searchutils.ToTagFilterss(me.LabelFilterss) tfss = searchutils.JoinTagFilterss(tfss, ec.EnforcedTagFilterss) - minTimestamp := start - maxSilenceInterval + minTimestamp := ec.Start + if needSilenceIntervalForRollupFunc(funcName) { + minTimestamp -= maxSilenceInterval + } if window > ec.Step { minTimestamp -= window } else { @@ -1100,21 +1401,16 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries) rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline) if err != nil { - return nil, &UserReadableError{ - Err: err, - } + return nil, err } rssLen := rss.Len() if rssLen == 0 { rss.Cancel() - tss := mergeTimeseries(tssCached, nil, start, ec) - return tss, nil + return nil, nil } ec.QueryStats.addSeriesFetched(rssLen) - // Verify timeseries fit available memory after the rollup. - // Take into account points from tssCached. - pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step + // Verify timeseries fit available memory during rollup calculations. timeseriesLen := rssLen if iafc != nil { // Incremental aggregates require holding only GOMAXPROCS timeseries in memory. 
@@ -1134,8 +1430,8 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa timeseriesLen = rssLen } } - rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(timeseriesLen*len(rcs))) - rollupMemorySize = sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16)) + rollupPoints := mulNoOverflow(pointsPerSeries, int64(timeseriesLen*len(rcs))) + rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16)) if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory { memoryIntensiveQueries.Inc() requestURI := ec.GetRequestURI() @@ -1146,44 +1442,33 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa } if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory { rss.Cancel() - return nil, &UserReadableError{ - Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+ - "according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+ - "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ - "increasing -search.maxMemoryPerQuery", - expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3), - } + err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+ + "according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+ + "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ + "increasing -search.maxMemoryPerQuery", + expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3) + return nil, err } rml := getRollupMemoryLimiter() if 
!rml.Get(uint64(rollupMemorySize)) { rss.Cancel() - return nil, &UserReadableError{ - Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+ - "total available memory for concurrent requests: %d bytes; "+ - "requested memory: %d bytes; "+ - "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ - "switching to node with more RAM; increasing -memory.allowedPercent", - expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3), - } + err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+ + "total available memory for concurrent requests: %d bytes; requested memory: %d bytes; "+ + "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ + "switching to node with more RAM; increasing -memory.allowedPercent", + expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3) + return nil, err } defer rml.Put(uint64(rollupMemorySize)) + qt.Printf("the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)", + rollupMemorySize, timeseriesLen, pointsPerSeries, rollupPoints) // Evaluate rollup keepMetricNames := getKeepMetricNames(expr) - var tss []*timeseries if iafc != nil { - tss, err = evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps) - } else { - tss, err = evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps) + return evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps) } - if err != nil { - return nil, 
&UserReadableError{ - Err: err, - } - } - tss = mergeTimeseries(tssCached, tss, start, ec) - rollupResultCacheV.Put(qt, ec, expr, window, tss) - return tss, nil + return evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps) } var ( @@ -1198,6 +1483,53 @@ func getRollupMemoryLimiter() *memoryLimiter { return &rollupMemoryLimiter } +func needSilenceIntervalForRollupFunc(funcName string) bool { + // All rollup the functions, which do not rely on the previous sample + // before the lookbehind window (aka prevValue), do not need silence interval. + switch strings.ToLower(funcName) { + case + "absent_over_time", + "avg_over_time", + "count_eq_over_time", + "count_gt_over_time", + "count_le_over_time", + "count_ne_over_time", + "count_over_time", + "default_rollup", + "first_over_time", + "histogram_over_time", + "hoeffding_bound_lower", + "hoeffding_bound_upper", + "last_over_time", + "mad_over_time", + "max_over_time", + "median_over_time", + "min_over_time", + "predict_linear", + "present_over_time", + "quantile_over_time", + "quantiles_over_time", + "range_over_time", + "share_gt_over_time", + "share_le_over_time", + "share_eq_over_time", + "stale_samples_over_time", + "stddev_over_time", + "stdvar_over_time", + "sum_over_time", + "tfirst_over_time", + "timestamp", + "timestamp_with_name", + "tlast_over_time", + "tmax_over_time", + "tmin_over_time", + "zscore_over_time": + return false + default: + return true + } +} + func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index b5ecaf147..d47490e4a 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ 
b/app/vmselect/promql/rollup_result_cache.go @@ -202,11 +202,74 @@ func ResetRollupResultCache() { logger.Infof("rollupResult cache has been cleared") } -func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) { +func (rrc *rollupResultCache) GetInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter) []*timeseries { if qt.Enabled() { query := string(expr.AppendString(nil)) query = bytesutil.LimitStringLen(query, 300) - qt = qt.NewChild("rollup cache get: query=%s, timeRange=%s, step=%d, window=%d", query, ec.timeRangeString(), ec.Step, window) + qt = qt.NewChild("rollup cache get instant values: query=%s, window=%d, step=%d", query, window, step) + defer qt.Done() + } + + // Obtain instant values from the cache + bb := bbPool.Get() + defer bbPool.Put(bb) + + bb.B = marshalRollupResultCacheKeyForInstantValues(bb.B[:0], expr, window, step, etfss) + tss, ok := rrc.getSeriesFromCache(qt, bb.B) + if !ok || len(tss) == 0 { + return nil + } + assertInstantValues(tss) + qt.Printf("found %d series for time=%s", len(tss), storage.TimestampToHumanReadableFormat(tss[0].Timestamps[0])) + return tss +} + +func (rrc *rollupResultCache) PutInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter, tss []*timeseries) { + if qt.Enabled() { + query := string(expr.AppendString(nil)) + query = bytesutil.LimitStringLen(query, 300) + startStr := "" + if len(tss) > 0 { + startStr = storage.TimestampToHumanReadableFormat(tss[0].Timestamps[0]) + } + qt = qt.NewChild("rollup cache put instant values: query=%s, window=%d, step=%d, series=%d, time=%s", query, window, step, len(tss), startStr) + defer qt.Done() + } + if len(tss) == 0 { + qt.Printf("do not cache empty series list") + return + } + + assertInstantValues(tss) + + bb := bbPool.Get() + defer bbPool.Put(bb) + + bb.B = 
marshalRollupResultCacheKeyForInstantValues(bb.B[:0], expr, window, step, etfss) + _ = rrc.putSeriesToCache(qt, bb.B, step, tss) +} + +func (rrc *rollupResultCache) DeleteInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter) { + bb := bbPool.Get() + defer bbPool.Put(bb) + + bb.B = marshalRollupResultCacheKeyForInstantValues(bb.B[:0], expr, window, step, etfss) + if !rrc.putSeriesToCache(qt, bb.B, step, nil) { + logger.Panicf("BUG: cannot store zero series to cache") + } + + if qt.Enabled() { + query := string(expr.AppendString(nil)) + query = bytesutil.LimitStringLen(query, 300) + qt.Printf("rollup result cache delete instant values: query=%s, window=%d, step=%d", query, window, step) + } +} + +func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) { + if qt.Enabled() { + query := string(expr.AppendString(nil)) + query = bytesutil.LimitStringLen(query, 300) + qt = qt.NewChild("rollup cache get series: query=%s, timeRange=%s, window=%d, step=%d", query, ec.timeRangeString(), window, ec.Step) defer qt.Done() } if !ec.mayCache() { @@ -218,7 +281,7 @@ func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr m bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) + bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) metainfoBuf := rrc.c.Get(nil, bb.B) if len(metainfoBuf) == 0 { qt.Printf("nothing found") @@ -233,31 +296,17 @@ func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr m qt.Printf("nothing found on the timeRange") return nil, ec.Start } + + var ok bool bb.B = key.Marshal(bb.B[:0]) - compressedResultBuf := resultBufPool.Get() - defer resultBufPool.Put(compressedResultBuf) - compressedResultBuf.B = rrc.c.GetBig(compressedResultBuf.B[:0], 
bb.B) - if len(compressedResultBuf.B) == 0 { + tss, ok = rrc.getSeriesFromCache(qt, bb.B) + if !ok { mi.RemoveKey(key) metainfoBuf = mi.Marshal(metainfoBuf[:0]) - bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) + bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) rrc.c.Set(bb.B, metainfoBuf) - qt.Printf("missing cache entry") return nil, ec.Start } - // Decompress into newly allocated byte slice, since tss returned from unmarshalTimeseriesFast - // refers to the byte slice, so it cannot be returned to the resultBufPool. - qt.Printf("load compressed entry from cache with size %d bytes", len(compressedResultBuf.B)) - resultBuf, err := encoding.DecompressZSTD(nil, compressedResultBuf.B) - if err != nil { - logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err) - } - qt.Printf("unpack the entry into %d bytes", len(resultBuf)) - tss, err = unmarshalTimeseriesFast(resultBuf) - if err != nil { - logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err) - } - qt.Printf("unmarshal %d series", len(tss)) // Extract values for the matching timestamps timestamps := tss[0].Timestamps @@ -303,17 +352,21 @@ func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr m var resultBufPool bytesutil.ByteBufferPool -func (rrc *rollupResultCache) Put(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) { +func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) { if qt.Enabled() { query := string(expr.AppendString(nil)) query = bytesutil.LimitStringLen(query, 300) - qt = qt.NewChild("rollup cache put: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss)) + qt = 
qt.NewChild("rollup cache put series: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss)) defer qt.Done() } - if len(tss) == 0 || !ec.mayCache() { + if !ec.mayCache() { qt.Printf("do not store series to cache, since it is disabled in the current context") return } + if len(tss) == 0 { + qt.Printf("do not store empty series list") + return + } // Remove values up to currentTime - step - cacheTimestampOffset, // since these values may be added later. @@ -346,7 +399,7 @@ func (rrc *rollupResultCache) Put(qt *querytracer.Tracer, ec *EvalConfig, expr m metainfoBuf := bbPool.Get() defer bbPool.Put(metainfoBuf) - metainfoKey.B = marshalRollupResultCacheKey(metainfoKey.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) + metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B) var mi rollupResultCacheMetainfo if len(metainfoBuf.B) > 0 { @@ -365,31 +418,17 @@ func (rrc *rollupResultCache) Put(qt *querytracer.Tracer, ec *EvalConfig, expr m return } - maxMarshaledSize := getRollupResultCacheSize() / 4 - resultBuf := resultBufPool.Get() - defer resultBufPool.Put(resultBuf) - resultBuf.B = marshalTimeseriesFast(resultBuf.B[:0], tss, maxMarshaledSize, ec.Step) - if len(resultBuf.B) == 0 { - tooBigRollupResults.Inc() - qt.Printf("cannot store series in the cache, since they would occupy more than %d bytes", maxMarshaledSize) - return - } - if qt.Enabled() { - startString := storage.TimestampToHumanReadableFormat(start) - endString := storage.TimestampToHumanReadableFormat(end) - qt.Printf("marshal %d series on a timeRange=[%s..%s] into %d bytes", len(tss), startString, endString, len(resultBuf.B)) - } - compressedResultBuf := resultBufPool.Get() - defer resultBufPool.Put(compressedResultBuf) - compressedResultBuf.B = encoding.CompressZSTDLevel(compressedResultBuf.B[:0], resultBuf.B, 1) - 
qt.Printf("compress %d bytes into %d bytes", len(resultBuf.B), len(compressedResultBuf.B)) - var key rollupResultCacheKey key.prefix = rollupResultCacheKeyPrefix key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1) - rollupResultKey := key.Marshal(nil) - rrc.c.SetBig(rollupResultKey, compressedResultBuf.B) - qt.Printf("store %d bytes in the cache", len(compressedResultBuf.B)) + + bb := bbPool.Get() + bb.B = key.Marshal(bb.B[:0]) + ok := rrc.putSeriesToCache(qt, bb.B, ec.Step, tss) + bbPool.Put(bb) + if !ok { + return + } mi.AddKey(key, timestamps[0], timestamps[len(timestamps)-1]) metainfoBuf.B = mi.Marshal(metainfoBuf.B[:0]) @@ -401,6 +440,52 @@ var ( rollupResultCacheKeySuffix = uint64(time.Now().UnixNano()) ) +func (rrc *rollupResultCache) getSeriesFromCache(qt *querytracer.Tracer, key []byte) ([]*timeseries, bool) { + compressedResultBuf := resultBufPool.Get() + compressedResultBuf.B = rrc.c.GetBig(compressedResultBuf.B[:0], key) + if len(compressedResultBuf.B) == 0 { + qt.Printf("nothing found in the cache") + resultBufPool.Put(compressedResultBuf) + return nil, false + } + qt.Printf("load compressed entry from cache with size %d bytes", len(compressedResultBuf.B)) + // Decompress into newly allocated byte slice, since tss returned from unmarshalTimeseriesFast + // refers to the byte slice, so it cannot be re-used. 
+ resultBuf, err := encoding.DecompressZSTD(nil, compressedResultBuf.B) + if err != nil { + logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err) + } + resultBufPool.Put(compressedResultBuf) + qt.Printf("unpack the entry into %d bytes", len(resultBuf)) + tss, err := unmarshalTimeseriesFast(resultBuf) + if err != nil { + logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err) + } + qt.Printf("unmarshal %d series", len(tss)) + return tss, true +} + +func (rrc *rollupResultCache) putSeriesToCache(qt *querytracer.Tracer, key []byte, step int64, tss []*timeseries) bool { + maxMarshaledSize := getRollupResultCacheSize() / 4 + resultBuf := resultBufPool.Get() + defer resultBufPool.Put(resultBuf) + resultBuf.B = marshalTimeseriesFast(resultBuf.B[:0], tss, maxMarshaledSize, step) + if len(resultBuf.B) == 0 { + tooBigRollupResults.Inc() + qt.Printf("cannot store %d series in the cache, since they would occupy more than %d bytes", len(tss), maxMarshaledSize) + return false + } + qt.Printf("marshal %d series into %d bytes", len(tss), len(resultBuf.B)) + compressedResultBuf := resultBufPool.Get() + defer resultBufPool.Put(compressedResultBuf) + compressedResultBuf.B = encoding.CompressZSTDLevel(compressedResultBuf.B[:0], resultBuf.B, 1) + qt.Printf("compress %d bytes into %d bytes", len(resultBuf.B), len(compressedResultBuf.B)) + + rrc.c.SetBig(key, compressedResultBuf.B) + qt.Printf("store %d bytes in the cache", len(compressedResultBuf.B)) + return true +} + func newRollupResultCacheKeyPrefix() uint64 { var buf [8]byte if _, err := rand.Read(buf[:]); err != nil { @@ -439,14 +524,36 @@ func mustSaveRollupResultCacheKeyPrefix(path string) { var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total") // Increment this value every time the format of the cache changes. 
-const rollupResultCacheVersion = 9 +const rollupResultCacheVersion = 10 -func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { +const ( + rollupResultCacheTypeSeries = 0 + rollupResultCacheTypeInstantValues = 1 +) + +func marshalRollupResultCacheKeyForSeries(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { dst = append(dst, rollupResultCacheVersion) dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix) + dst = append(dst, rollupResultCacheTypeSeries) dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, step) + dst = marshalTagFiltersForRollupResultCacheKey(dst, etfs) dst = expr.AppendString(dst) + return dst +} + +func marshalRollupResultCacheKeyForInstantValues(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { + dst = append(dst, rollupResultCacheVersion) + dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix) + dst = append(dst, rollupResultCacheTypeInstantValues) + dst = encoding.MarshalInt64(dst, window) + dst = encoding.MarshalInt64(dst, step) + dst = marshalTagFiltersForRollupResultCacheKey(dst, etfs) + dst = expr.AppendString(dst) + return dst +} + +func marshalTagFiltersForRollupResultCacheKey(dst []byte, etfs [][]storage.TagFilter) []byte { for i, etf := range etfs { for _, f := range etf { dst = f.Marshal(dst) @@ -461,12 +568,15 @@ func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step i // mergeTimeseries concatenates b with a and returns the result. // // Preconditions: -// - a mustn't intersect with b. +// - a mustn't intersect with b by timestamps. // - a timestamps must be smaller than b timestamps. // // Postconditions: // - a and b cannot be used after returning from the call. 
-func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timeseries { +func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, error) { + qt = qt.NewChild("merge series len(a)=%d, len(b)=%d", len(a), len(b)) + defer qt.Done() + sharedTimestamps := ec.getSharedTimestamps() if bStart == ec.Start { // Nothing to merge - b covers all the time range. @@ -478,7 +588,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese logger.Panicf("BUG: unexpected number of values in b; got %d; want %d", len(tsB.Values), len(tsB.Timestamps)) } } - return b + return b, nil } m := make(map[string]*timeseries, len(a)) @@ -486,6 +596,9 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese defer bbPool.Put(bb) for _, ts := range a { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + if _, ok := m[string(bb.B)]; ok { + return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName) + } m[string(bb.B)] = ts } @@ -536,7 +649,8 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese } rvs = append(rvs, &tmp) } - return rvs + qt.Printf("resulting series=%d", len(rvs)) + return rvs, nil } type rollupResultCacheMetainfo struct { diff --git a/app/vmselect/promql/rollup_result_cache_test.go b/app/vmselect/promql/rollup_result_cache_test.go index d72a9f8d4..3bac0deb1 100644 --- a/app/vmselect/promql/rollup_result_cache_test.go +++ b/app/vmselect/promql/rollup_result_cache_test.go @@ -61,7 +61,7 @@ func TestRollupResultCache(t *testing.T) { // Try obtaining an empty value. 
t.Run("empty", func(t *testing.T) { - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != ec.Start { t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start) } @@ -79,8 +79,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) } @@ -100,8 +100,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(nil, ec, ae, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, ae, window) + rollupResultCacheV.PutSeries(nil, ec, ae, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, ae, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) } @@ -123,8 +123,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{333, 0, 1, 2}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -142,8 +142,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -161,8 +161,8 @@ func 
TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -180,8 +180,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -199,8 +199,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2, 3, 4, 5, 6, 7}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -222,8 +222,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{1, 2, 3, 4, 5, 6}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -247,8 +247,8 @@ func TestRollupResultCache(t *testing.T) { } tss = append(tss, ts) } - rollupResultCacheV.Put(nil, ec, fe, window, tss) - tssResult, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tssResult, newStart := 
rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -276,10 +276,10 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(nil, ec, fe, window, tss1) - rollupResultCacheV.Put(nil, ec, fe, window, tss2) - rollupResultCacheV.Put(nil, ec, fe, window, tss3) - tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss1) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss2) + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss3) + tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) } @@ -311,7 +311,10 @@ func TestMergeTimeseries(t *testing.T) { Values: []float64{1, 2, 3, 4, 5, 6}, }, } - tss := mergeTimeseries(a, b, 1000, ec) + tss, err := mergeTimeseries(nil, a, b, 1000, ec) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } tssExpected := []*timeseries{ { Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, @@ -328,7 +331,10 @@ func TestMergeTimeseries(t *testing.T) { Values: []float64{3, 4, 5, 6}, }, } - tss := mergeTimeseries(a, b, bStart, ec) + tss, err := mergeTimeseries(nil, a, b, bStart, ec) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } tssExpected := []*timeseries{ { Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, @@ -345,7 +351,10 @@ func TestMergeTimeseries(t *testing.T) { }, } b := []*timeseries{} - tss := mergeTimeseries(a, b, bStart, ec) + tss, err := mergeTimeseries(nil, a, b, bStart, ec) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } tssExpected := []*timeseries{ { Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, @@ -367,7 +376,10 @@ func TestMergeTimeseries(t *testing.T) { Values: []float64{3, 4, 5, 6}, }, } - tss := mergeTimeseries(a, b, bStart, ec) + tss, err := mergeTimeseries(nil, a, b, 
bStart, ec) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } tssExpected := []*timeseries{ { Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, @@ -391,7 +403,10 @@ func TestMergeTimeseries(t *testing.T) { }, } b[0].MetricName.MetricGroup = []byte("foo") - tss := mergeTimeseries(a, b, bStart, ec) + tss, err := mergeTimeseries(nil, a, b, bStart, ec) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } tssExpected := []*timeseries{ { MetricName: storage.MetricName{ diff --git a/app/vmselect/promql/timeseries.go b/app/vmselect/promql/timeseries.go index 2294f622b..2aea3e3f5 100644 --- a/app/vmselect/promql/timeseries.go +++ b/app/vmselect/promql/timeseries.go @@ -79,7 +79,10 @@ var timeseriesPool sync.Pool func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int64) []byte { if len(tss) == 0 { - logger.Panicf("BUG: tss cannot be empty") + // marshal zero timeseries and zero timestamps + dst = encoding.MarshalUint64(dst, 0) + dst = encoding.MarshalUint64(dst, 0) + return dst } // timestamps are stored only once for all the tss, since they must be identical diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a6eb5d61a..36428ff5e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -33,6 +33,13 @@ The sandbox cluster installation is running under the constant load generated by * SECURITY: upgrade Go builder from Go1.21.1 to Go1.21.3. See [the list of issues addressed in Go1.21.2](https://github.com/golang/go/issues?q=milestone%3AGo1.21.2+label%3ACherryPickApproved) and [the list of issues addressed in Go1.21.3](https://github.com/golang/go/issues?q=milestone%3AGo1.21.3+label%3ACherryPickApproved). 
+* FEATURE: `vmselect`: improve performance for repeated [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query) if they contain one of the following [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) with a lookbehind window in square brackets greater than or equal to 1 day: + - [sum_over_time](https://docs.victoriametrics.com/MetricsQL.html#sum_over_time) + - [count_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_over_time) + - [avg_over_time](https://docs.victoriametrics.com/MetricsQL.html#avg_over_time) + - [increase](https://docs.victoriametrics.com/MetricsQL.html#increase) + - [rate](https://docs.victoriametrics.com/MetricsQL.html#rate) + These functions are usually used in SLO/SLI queries such as `avg_over_time(up[30d])` or `sum(rate(http_request_errors_total[3d])) / sum(rate(http_requests_total[3d]))`. * FEATURE: `vmselect`: improve query performance on systems with big number of CPU cores (`>=32`). Add `-search.maxWorkersPerQuery` command-line flag, which can be used for fine-tuning query performance on systems with big number of CPU cores. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5195). * FEATURE: `vmselect`: expose `vm_memory_intensive_queries_total` counter metric which gets increased each time `-search.logQueryMemoryUsage` memory limit is exceeded by a query. This metric should help to identify expensive and heavy queries without inspecting the logs. * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [drop_empty_series()](https://docs.victoriametrics.com/MetricsQL.html#drop_empty_series) function, which can be used for filtering out empty series before performing additional calculations as shown in [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5071).