From 7ca8ebef202594e40081567520b545cb9e2aa41b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 16 Nov 2023 15:52:38 +0100 Subject: [PATCH] app/vmselect/promql: properly handle duplicate series when merging cached results with the results obtained from the database evalRollupFuncNoCache() may return time series with identical labels (aka duplicate series) when performing queries satisfying all the following conditions: - It must select time series with multiple metric names. For example, {__name__=~"foo|bar"} - The series selector must be wrapped into rollup function, which drops metric names. For example, rate({__name__=~"foo|bar"}) - The rollup function must be wrapped into aggregate function, which has no streaming optimization. For example, quantile(0.9, rate({__name__=~"foo|bar"}) In this case VictoriaMetrics shouldn't return `cannot merge series: duplicate series found` error. Instead, it should fall back to query execution with disabled cache. Also properly store the merged results. Previously they were incorrectly stored because of a typo introduced in the commit 41a0fdaf39ea99708cf1c899332910965d62dcfc Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5337 --- app/vmselect/promql/eval.go | 53 ++++-- app/vmselect/promql/rollup_result_cache.go | 153 ++++++++++++------ .../promql/rollup_result_cache_test.go | 103 ++++++++++-- docs/CHANGELOG.md | 2 + 4 files changed, 228 insertions(+), 83 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 4be1931c7..596083bf8 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -890,7 +890,7 @@ func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName st } } if funcName == "absent_over_time" { - rvs = aggregateAbsentOverTime(ec, re.Expr, rvs) + rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs) } ec.updateIsPartialResponse(ecNew.IsPartialResponse.Load()) if offset != 0 && len(rvs) > 0 { @@ -1618,8 +1618,23 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa } return rvs, nil } + pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step + evalWithConfig := func(ec *EvalConfig) ([]*timeseries, error) { + tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries) + if err != nil { + err = &UserReadableError{ + Err: err, + } + return nil, err + } + return tss, nil + } + if !ec.mayCache() { + qt.Printf("do not fetch series from cache, since it is disabled in the current context") + return evalWithConfig(ec) + } - // Search for partial results in cache. + // Search for cached results. tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window) ec.QueryStats.addSeriesFetched(len(tssCached)) if start > ec.End { @@ -1635,23 +1650,31 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa rollupResultCacheMiss.Inc() } - ecCopy := copyEvalConfig(ec) - ecCopy.Start = start - pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step - tss, err := evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries) + // Fetch missing results, which aren't cached yet. + ecNew := ec + if start != ec.Start { + ecNew = copyEvalConfig(ec) + ecNew.Start = start + } + tss, err := evalWithConfig(ecNew) if err != nil { - err = &UserReadableError{ - Err: err, - } return nil, err } - isPartial := ecCopy.IsPartialResponse.Load() - ec.updateIsPartialResponse(isPartial) - rvs, err := mergeTimeseries(qt, tssCached, tss, start, ec) - if err != nil { - return nil, fmt.Errorf("cannot merge series: %w", err) + isPartial := ecNew.IsPartialResponse.Load() + + // Merge cached results with the fetched additional results. + rvs, ok := mergeSeries(qt, tssCached, tss, start, ec) + if !ok { + // Cannot merge series - fall back to non-cached querying. + qt.Printf("fall back to non-caching querying") + rvs, err = evalWithConfig(ec) + if err != nil { + return nil, err + } + isPartial = ec.IsPartialResponse.Load() } - if tss != nil && !isPartial { + ec.updateIsPartialResponse(isPartial) + if !isPartial { rollupResultCacheV.PutSeries(qt, ec, expr, window, tss) } return rvs, nil diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 693e0a7ac..2f7229bfa 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -227,10 +227,6 @@ func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig, qt = qt.NewChild("rollup cache get series: query=%s, timeRange=%s, window=%d, step=%d", query, ec.timeRangeString(), window, ec.Step) defer qt.Done() } - if !ec.mayCache() { - qt.Printf("do not fetch series from cache, since it is disabled in the current context") - return nil, ec.Start - } // Obtain tss from the cache. bb := bbPool.Get() @@ -312,13 +308,25 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, qt = qt.NewChild("rollup cache put series: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss)) defer qt.Done() } - if !ec.mayCache() { - qt.Printf("do not store series to cache, since it is disabled in the current context") + if len(tss) == 0 { + qt.Printf("do not cache empty series list") return } - if len(tss) == 0 { - qt.Printf("do not store empty series list") - return + + if len(tss) > 1 { + // Verify whether tss contains series with duplicate naming. + // There is little sense in storing such series in the cache, since they cannot be merged in mergeSeries() later. + bb := bbPool.Get() + m := make(map[string]struct{}, len(tss)) + for _, ts := range tss { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + if _, ok := m[string(bb.B)]; ok { + qt.Printf("do not cache series with duplicate naming %s", &ts.MetricName) + return + } + m[string(bb.B)] = struct{}{} + } + bbPool.Put(bb) } // Remove values up to currentTime - step - cacheTimestampOffset, @@ -477,7 +485,7 @@ func mustSaveRollupResultCacheKeyPrefix(path string) { var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total") // Increment this value every time the format of the cache changes. -const rollupResultCacheVersion = 10 +const rollupResultCacheVersion = 11 const ( rollupResultCacheTypeSeries = 0 @@ -522,74 +530,115 @@ func marshalTagFiltersForRollupResultCacheKey(dst []byte, etfs [][]storage.TagFi return dst } -// mergeTimeseries concatenates b with a and returns the result. +func equalTimestamps(a, b []int64) bool { + if len(a) != len(b) { + return false + } + for i, tsA := range a { + tsB := b[i] + if tsA != tsB { + return false + } + } + return true +} + +// mergeSeries concatenates a with b and returns the result. +// +// true is returned on successful concatenation, false otherwise. // // Preconditions: -// - a mustn't intersect with b by timestamps. -// - a timestamps must be smaller than b timestamps. +// - bStart must be in the range [ec.Start .. ec.End] +// - a must contain series with all the samples on the range [ec.Start ... bStart - ec.Step] with ec.Step interval between them +// - b must contain series with all the samples on the range [bStart .. ec.End] with ec.Step interval between them // // Postconditions: +// - the returned series contain all the samples on the range [ec.Start .. ec.End] with ec.Step interval between them // - a and b cannot be used after returning from the call. -func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, error) { - qt = qt.NewChild("merge series len(a)=%d, len(b)=%d", len(a), len(b)) - defer qt.Done() +func mergeSeries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, bool) { + if qt.Enabled() { + qt = qt.NewChild("merge series on time range %s with step=%dms; len(a)=%d, len(b)=%d, bStart=%s", + ec.timeRangeString(), ec.Step, len(a), len(b), storage.TimestampToHumanReadableFormat(bStart)) + defer qt.Done() + } sharedTimestamps := ec.getSharedTimestamps() - if bStart == ec.Start { - // Nothing to merge - b covers all the time range. - // Verify b is correct. + i := 0 + for i < len(sharedTimestamps) && sharedTimestamps[i] < bStart { + i++ + } + aTimestamps := sharedTimestamps[:i] + bTimestamps := sharedTimestamps[i:] + + if len(bTimestamps) == len(sharedTimestamps) { + // Nothing to merge - just return b to the caller for _, tsB := range b { + if !equalTimestamps(tsB.Timestamps, bTimestamps) { + logger.Panicf("BUG: invalid timestamps in b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps) + } tsB.denyReuse = true tsB.Timestamps = sharedTimestamps - if len(tsB.Values) != len(tsB.Timestamps) { - logger.Panicf("BUG: unexpected number of values in b; got %d; want %d", len(tsB.Values), len(tsB.Timestamps)) - } } - return b, nil + return b, true } - m := make(map[string]*timeseries, len(a)) bb := bbPool.Get() defer bbPool.Put(bb) + + mA := make(map[string]*timeseries, len(a)) for _, ts := range a { - bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) - if _, ok := m[string(bb.B)]; ok { - return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName) + if !equalTimestamps(ts.Timestamps, aTimestamps) { + logger.Panicf("BUG: invalid timestamps in a series %s; got %d; want %d", &ts.MetricName, ts.Timestamps, aTimestamps) } - m[string(bb.B)] = ts + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + if _, ok := mA[string(bb.B)]; ok { + qt.Printf("cannot merge series because a series contain duplicate %s", &ts.MetricName) + return nil, false + } + mA[string(bb.B)] = ts } + mB := make(map[string]struct{}, len(b)) rvs := make([]*timeseries, 0, len(a)) + var aNaNs []float64 for _, tsB := range b { + if !equalTimestamps(tsB.Timestamps, bTimestamps) { + logger.Panicf("BUG: invalid timestamps for b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps) + } + bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName) + if _, ok := mB[string(bb.B)]; ok { + qt.Printf("cannot merge series because b series contain duplicate %s", &tsB.MetricName) + return nil, false + } + mB[string(bb.B)] = struct{}{} + var tmp timeseries tmp.denyReuse = true tmp.Timestamps = sharedTimestamps tmp.Values = make([]float64, 0, len(tmp.Timestamps)) tmp.MetricName.MoveFrom(&tsB.MetricName) - bb.B = marshalMetricNameSorted(bb.B[:0], &tmp.MetricName) - k := string(bb.B) - tsA := m[k] + tsA := mA[string(bb.B)] if tsA == nil { - tStart := ec.Start - for tStart < bStart { - tmp.Values = append(tmp.Values, nan) - tStart += ec.Step + if aNaNs == nil { + tStart := ec.Start + for tStart < bStart { + aNaNs = append(aNaNs, nan) + tStart += ec.Step + } } + tmp.Values = append(tmp.Values, aNaNs...) } else { tmp.Values = append(tmp.Values, tsA.Values...) - delete(m, k) + delete(mA, string(bb.B)) } tmp.Values = append(tmp.Values, tsB.Values...) - if len(tmp.Values) != len(tmp.Timestamps) { - logger.Panicf("BUG: unexpected values after merging new values; got %d; want %d; len(a.Values)=%d; len(b.Values)=%d", - len(tmp.Values), len(tmp.Timestamps), len(tsA.Values), len(tsB.Values)) - } rvs = append(rvs, &tmp) } - // Copy the remaining timeseries from m. - for _, tsA := range m { + // Copy the remaining timeseries from mA. + var bNaNs []float64 + for _, tsA := range mA { var tmp timeseries tmp.denyReuse = true tmp.Timestamps = sharedTimestamps @@ -597,18 +646,18 @@ func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, e tmp.MetricName.MoveFrom(&tsA.MetricName) tmp.Values = append(tmp.Values, tsA.Values...) - tStart := bStart - for tStart <= ec.End { - tmp.Values = append(tmp.Values, nan) - tStart += ec.Step - } - if len(tmp.Values) != len(tmp.Timestamps) { - logger.Panicf("BUG: unexpected values in the result after adding cached values; got %d; want %d", len(tmp.Values), len(tmp.Timestamps)) + if bNaNs == nil { + tStart := bStart + for tStart <= ec.End { + bNaNs = append(bNaNs, nan) + tStart += ec.Step + } } + tmp.Values = append(tmp.Values, bNaNs...) rvs = append(rvs, &tmp) } qt.Printf("resulting series=%d", len(rvs)) - return rvs, nil + return rvs, true } type rollupResultCacheMetainfo struct { @@ -691,9 +740,9 @@ func (mi *rollupResultCacheMetainfo) AddKey(key rollupResultCacheKey, start, end end: end, key: key, }) - if len(mi.entries) > 30 { + if len(mi.entries) > 10 { // Remove old entries. - mi.entries = append(mi.entries[:0], mi.entries[10:]...) + mi.entries = append(mi.entries[:0], mi.entries[5:]...) } } diff --git a/app/vmselect/promql/rollup_result_cache_test.go b/app/vmselect/promql/rollup_result_cache_test.go index 69cddbfa6..03430ae3a 100644 --- a/app/vmselect/promql/rollup_result_cache_test.go +++ b/app/vmselect/promql/rollup_result_cache_test.go @@ -1,6 +1,7 @@ package promql import ( + "fmt" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" @@ -251,6 +252,7 @@ func TestRollupResultCache(t *testing.T) { Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, Values: []float64{1, 2, 3, 4, 5, 6}, } + ts.MetricName.MetricGroup = []byte(fmt.Sprintf("metric %d", i)) tss = append(tss, ts) } rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) @@ -261,6 +263,29 @@ func TestRollupResultCache(t *testing.T) { testTimeseriesEqual(t, tssResult, tss) }) + // Store series with identical naming (they shouldn't be stored) + t.Run("duplicate-series", func(t *testing.T) { + ResetRollupResultCache() + tss := []*timeseries{ + { + Timestamps: []int64{800, 1000, 1200}, + Values: []float64{0, 1, 2}, + }, + { + Timestamps: []int64{800, 1000, 1200}, + Values: []float64{0, 1, 2}, + }, + } + rollupResultCacheV.PutSeries(nil, ec, fe, window, tss) + tssResult, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window) + if newStart != ec.Start { + t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start) + } + if len(tssResult) != 0 { + t.Fatalf("unexpected non-empty series returned") + } + }) + // Store multiple time series t.Run("multi-timeseries", func(t *testing.T) { ResetRollupResultCache() @@ -300,7 +325,7 @@ func TestRollupResultCache(t *testing.T) { } -func TestMergeTimeseries(t *testing.T) { +func TestMergeSeries(t *testing.T) { ec := &EvalConfig{ Start: 1000, End: 2000, @@ -317,9 +342,9 @@ func TestMergeTimeseries(t *testing.T) { Values: []float64{1, 2, 3, 4, 5, 6}, }, } - tss, err := mergeTimeseries(nil, a, b, 1000, ec) - if err != nil { - t.Fatalf("unexpected error: %s", err) + tss, ok := mergeSeries(nil, a, b, 1000, ec) + if !ok { + t.Fatalf("unexpected failure to merge series") } tssExpected := []*timeseries{ { @@ -337,9 +362,9 @@ func TestMergeTimeseries(t *testing.T) { Values: []float64{3, 4, 5, 6}, }, } - tss, err := mergeTimeseries(nil, a, b, bStart, ec) - if err != nil { - t.Fatalf("unexpected error: %s", err) + tss, ok := mergeSeries(nil, a, b, bStart, ec) + if !ok { + t.Fatalf("unexpected failure to merge series") } tssExpected := []*timeseries{ { @@ -357,9 +382,9 @@ func TestMergeTimeseries(t *testing.T) { }, } b := []*timeseries{} - tss, err := mergeTimeseries(nil, a, b, bStart, ec) - if err != nil { - t.Fatalf("unexpected error: %s", err) + tss, ok := mergeSeries(nil, a, b, bStart, ec) + if !ok { + t.Fatalf("unexpected failure to merge series") } tssExpected := []*timeseries{ { @@ -382,9 +407,9 @@ func TestMergeTimeseries(t *testing.T) { Values: []float64{3, 4, 5, 6}, }, } - tss, err := mergeTimeseries(nil, a, b, bStart, ec) - if err != nil { - t.Fatalf("unexpected error: %s", err) + tss, ok := mergeSeries(nil, a, b, bStart, ec) + if !ok { + t.Fatalf("unexpected failure to merge series") } tssExpected := []*timeseries{ { @@ -409,9 +434,9 @@ func TestMergeTimeseries(t *testing.T) { }, } b[0].MetricName.MetricGroup = []byte("foo") - tss, err := mergeTimeseries(nil, a, b, bStart, ec) - if err != nil { - t.Fatalf("unexpected error: %s", err) + tss, ok := mergeSeries(nil, a, b, bStart, ec) + if !ok { + t.Fatalf("unexpected failure to merge series") } tssExpected := []*timeseries{ { @@ -431,6 +456,52 @@ func TestMergeTimeseries(t *testing.T) { } testTimeseriesEqual(t, tss, tssExpected) }) + t.Run("duplicate-series-a", func(t *testing.T) { + a := []*timeseries{ + { + Timestamps: []int64{1000, 1200}, + Values: []float64{2, 1}, + }, + { + Timestamps: []int64{1000, 1200}, + Values: []float64{3, 3}, + }, + } + b := []*timeseries{ + { + Timestamps: []int64{1400, 1600, 1800, 2000}, + Values: []float64{3, 4, 5, 6}, + }, + } + tss, ok := mergeSeries(nil, a, b, bStart, ec) + if ok { + t.Fatalf("expecting failre to merge series") + } + testTimeseriesEqual(t, tss, nil) + }) + t.Run("duplicate-series-b", func(t *testing.T) { + a := []*timeseries{ + { + Timestamps: []int64{1000, 1200}, + Values: []float64{1, 2}, + }, + } + b := []*timeseries{ + { + Timestamps: []int64{1400, 1600, 1800, 2000}, + Values: []float64{3, 4, 5, 6}, + }, + { + Timestamps: []int64{1400, 1600, 1800, 2000}, + Values: []float64{13, 14, 15, 16}, + }, + } + tss, ok := mergeSeries(nil, a, b, bStart, ec) + if ok { + t.Fatalf("expecting failre to merge series") + } + testTimeseriesEqual(t, tss, nil) + }) } func testTimeseriesEqual(t *testing.T, tss, tssExpected []*timeseries) { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 39e0c6dd3..465092344 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,8 @@ The sandbox cluster installation is running under the constant load generated by * FEATURE: dashboards: use `version` instead of `short_version` in version change annotation for single/cluster dashboards. The update should reflect version changes even if different flavours of the same release were applied (custom builds). +* BUGFIX: fix a bug, which could result in improper results and/or to `cannot merge series: duplicate series found` error during [range query](https://docs.victoriametrics.com/keyConcepts.html#range-query) query execution. The issue has been introduced in [v1.95.0](https://docs.victoriametrics.com/CHANGELOG.html#v1950). See [this bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332) for details. + ## [v1.95.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.0) Released at 2023-11-15