diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 4be1931c7..596083bf8 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -890,7 +890,7 @@ func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName st
 		}
 	}
 	if funcName == "absent_over_time" {
-		rvs = aggregateAbsentOverTime(ec, re.Expr, rvs)
+		rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs)
 	}
 	ec.updateIsPartialResponse(ecNew.IsPartialResponse.Load())
 	if offset != 0 && len(rvs) > 0 {
@@ -1618,8 +1618,23 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
 		}
 		return rvs, nil
 	}
+	pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
+	evalWithConfig := func(ec *EvalConfig) ([]*timeseries, error) {
+		tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries)
+		if err != nil {
+			err = &UserReadableError{
+				Err: err,
+			}
+			return nil, err
+		}
+		return tss, nil
+	}
+	if !ec.mayCache() {
+		qt.Printf("do not fetch series from cache, since it is disabled in the current context")
+		return evalWithConfig(ec)
+	}
 
-	// Search for partial results in cache.
+	// Search for cached results.
 	tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window)
 	ec.QueryStats.addSeriesFetched(len(tssCached))
 	if start > ec.End {
@@ -1635,23 +1650,31 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
 		rollupResultCacheMiss.Inc()
 	}
 
-	ecCopy := copyEvalConfig(ec)
-	ecCopy.Start = start
-	pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
-	tss, err := evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries)
+	// Fetch missing results, which aren't cached yet.
+	ecNew := ec
+	if start != ec.Start {
+		ecNew = copyEvalConfig(ec)
+		ecNew.Start = start
+	}
+	tss, err := evalWithConfig(ecNew)
 	if err != nil {
-		err = &UserReadableError{
-			Err: err,
-		}
 		return nil, err
 	}
-	isPartial := ecCopy.IsPartialResponse.Load()
-	ec.updateIsPartialResponse(isPartial)
-	rvs, err := mergeTimeseries(qt, tssCached, tss, start, ec)
-	if err != nil {
-		return nil, fmt.Errorf("cannot merge series: %w", err)
+	isPartial := ecNew.IsPartialResponse.Load()
+
+	// Merge cached results with the fetched additional results.
+	rvs, ok := mergeSeries(qt, tssCached, tss, start, ec)
+	if !ok {
+		// Cannot merge series - fall back to non-cached querying.
+		qt.Printf("fall back to non-cached querying")
+		rvs, err = evalWithConfig(ec)
+		if err != nil {
+			return nil, err
+		}
+		isPartial = ec.IsPartialResponse.Load()
 	}
-	if tss != nil && !isPartial {
+	ec.updateIsPartialResponse(isPartial)
+	if !isPartial {
 		rollupResultCacheV.PutSeries(qt, ec, expr, window, tss)
 	}
 	return rvs, nil
diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go
index 693e0a7ac..2f7229bfa 100644
--- a/app/vmselect/promql/rollup_result_cache.go
+++ b/app/vmselect/promql/rollup_result_cache.go
@@ -227,10 +227,6 @@ func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig,
 		qt = qt.NewChild("rollup cache get series: query=%s, timeRange=%s, window=%d, step=%d", query, ec.timeRangeString(), window, ec.Step)
 		defer qt.Done()
 	}
-	if !ec.mayCache() {
-		qt.Printf("do not fetch series from cache, since it is disabled in the current context")
-		return nil, ec.Start
-	}
 
 	// Obtain tss from the cache.
 	bb := bbPool.Get()
@@ -312,13 +308,25 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig,
 		qt = qt.NewChild("rollup cache put series: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss))
 		defer qt.Done()
 	}
-	if !ec.mayCache() {
-		qt.Printf("do not store series to cache, since it is disabled in the current context")
+	if len(tss) == 0 {
+		qt.Printf("do not cache empty series list")
 		return
 	}
-	if len(tss) == 0 {
-		qt.Printf("do not store empty series list")
-		return
+
+	if len(tss) > 1 {
+		// Verify whether tss contains series with duplicate naming.
+		// There is little sense in storing such series in the cache, since they cannot be merged in mergeSeries() later.
+		bb := bbPool.Get()
+		defer bbPool.Put(bb)
+		m := make(map[string]struct{}, len(tss))
+		for _, ts := range tss {
+			bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
+			if _, ok := m[string(bb.B)]; ok {
+				qt.Printf("do not cache series with duplicate naming %s", &ts.MetricName)
+				return
+			}
+			m[string(bb.B)] = struct{}{}
+		}
 	}
 
 	// Remove values up to currentTime - step - cacheTimestampOffset,
@@ -477,7 +485,7 @@ func mustSaveRollupResultCacheKeyPrefix(path string) {
 
 var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
 
 // Increment this value every time the format of the cache changes.
-const rollupResultCacheVersion = 10
+const rollupResultCacheVersion = 11
 
 const (
 	rollupResultCacheTypeSeries = 0
@@ -522,74 +530,115 @@ func marshalTagFiltersForRollupResultCacheKey(dst []byte, etfs [][]storage.TagFi
 	return dst
 }
 
-// mergeTimeseries concatenates b with a and returns the result.
+func equalTimestamps(a, b []int64) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i, tsA := range a {
+		tsB := b[i]
+		if tsA != tsB {
+			return false
+		}
+	}
+	return true
+}
+
+// mergeSeries concatenates a with b and returns the result.
+//
+// true is returned on successful concatenation, false otherwise.
 //
 // Preconditions:
-// - a mustn't intersect with b by timestamps.
-// - a timestamps must be smaller than b timestamps.
+// - bStart must be in the range [ec.Start .. ec.End]
+// - a must contain series with all the samples on the range [ec.Start ... bStart - ec.Step] with ec.Step interval between them
+// - b must contain series with all the samples on the range [bStart .. ec.End] with ec.Step interval between them
 //
 // Postconditions:
+// - the returned series contain all the samples on the range [ec.Start .. ec.End] with ec.Step interval between them
 // - a and b cannot be used after returning from the call.
-func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, error) {
-	qt = qt.NewChild("merge series len(a)=%d, len(b)=%d", len(a), len(b))
-	defer qt.Done()
+func mergeSeries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, bool) {
+	if qt.Enabled() {
+		qt = qt.NewChild("merge series on time range %s with step=%dms; len(a)=%d, len(b)=%d, bStart=%s",
+			ec.timeRangeString(), ec.Step, len(a), len(b), storage.TimestampToHumanReadableFormat(bStart))
+		defer qt.Done()
+	}
 	sharedTimestamps := ec.getSharedTimestamps()
-	if bStart == ec.Start {
-		// Nothing to merge - b covers all the time range.
-		// Verify b is correct.
+	i := 0
+	for i < len(sharedTimestamps) && sharedTimestamps[i] < bStart {
+		i++
+	}
+	aTimestamps := sharedTimestamps[:i]
+	bTimestamps := sharedTimestamps[i:]
+
+	if len(bTimestamps) == len(sharedTimestamps) {
+		// Nothing to merge - just return b to the caller
 		for _, tsB := range b {
+			if !equalTimestamps(tsB.Timestamps, bTimestamps) {
+				logger.Panicf("BUG: invalid timestamps in b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps)
+			}
 			tsB.denyReuse = true
 			tsB.Timestamps = sharedTimestamps
-			if len(tsB.Values) != len(tsB.Timestamps) {
-				logger.Panicf("BUG: unexpected number of values in b; got %d; want %d", len(tsB.Values), len(tsB.Timestamps))
-			}
 		}
-		return b, nil
+		return b, true
 	}
 
-	m := make(map[string]*timeseries, len(a))
 	bb := bbPool.Get()
 	defer bbPool.Put(bb)
+
+	mA := make(map[string]*timeseries, len(a))
 	for _, ts := range a {
-		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
-		if _, ok := m[string(bb.B)]; ok {
-			return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName)
+		if !equalTimestamps(ts.Timestamps, aTimestamps) {
+			logger.Panicf("BUG: invalid timestamps in a series %s; got %d; want %d", &ts.MetricName, ts.Timestamps, aTimestamps)
 		}
-		m[string(bb.B)] = ts
+		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
+		if _, ok := mA[string(bb.B)]; ok {
+			qt.Printf("cannot merge series because a series contain duplicate %s", &ts.MetricName)
+			return nil, false
+		}
+		mA[string(bb.B)] = ts
 	}
 
+	mB := make(map[string]struct{}, len(b))
 	rvs := make([]*timeseries, 0, len(a))
+	var aNaNs []float64
 	for _, tsB := range b {
+		if !equalTimestamps(tsB.Timestamps, bTimestamps) {
+			logger.Panicf("BUG: invalid timestamps for b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps)
+		}
+		bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName)
+		if _, ok := mB[string(bb.B)]; ok {
+			qt.Printf("cannot merge series because b series contain duplicate %s", &tsB.MetricName)
+			return nil, false
+		}
+		mB[string(bb.B)] = struct{}{}
+
 		var tmp timeseries
 		tmp.denyReuse = true
 		tmp.Timestamps = sharedTimestamps
 		tmp.Values = make([]float64, 0, len(tmp.Timestamps))
 		tmp.MetricName.MoveFrom(&tsB.MetricName)
-		bb.B = marshalMetricNameSorted(bb.B[:0], &tmp.MetricName)
-		k := string(bb.B)
-		tsA := m[k]
+		tsA := mA[string(bb.B)]
 		if tsA == nil {
-			tStart := ec.Start
-			for tStart < bStart {
-				tmp.Values = append(tmp.Values, nan)
-				tStart += ec.Step
+			if aNaNs == nil {
+				tStart := ec.Start
+				for tStart < bStart {
+					aNaNs = append(aNaNs, nan)
+					tStart += ec.Step
+				}
 			}
+			tmp.Values = append(tmp.Values, aNaNs...)
 		} else {
 			tmp.Values = append(tmp.Values, tsA.Values...)
-			delete(m, k)
+			delete(mA, string(bb.B))
 		}
 		tmp.Values = append(tmp.Values, tsB.Values...)
-		if len(tmp.Values) != len(tmp.Timestamps) {
-			logger.Panicf("BUG: unexpected values after merging new values; got %d; want %d; len(a.Values)=%d; len(b.Values)=%d",
-				len(tmp.Values), len(tmp.Timestamps), len(tsA.Values), len(tsB.Values))
-		}
 		rvs = append(rvs, &tmp)
 	}
 
-	// Copy the remaining timeseries from m.
-	for _, tsA := range m {
+	// Copy the remaining timeseries from mA.
+	var bNaNs []float64
+	for _, tsA := range mA {
 		var tmp timeseries
 		tmp.denyReuse = true
 		tmp.Timestamps = sharedTimestamps
@@ -597,18 +646,18 @@ func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, e
 		tmp.MetricName.MoveFrom(&tsA.MetricName)
 		tmp.Values = append(tmp.Values, tsA.Values...)
-		tStart := bStart
-		for tStart <= ec.End {
-			tmp.Values = append(tmp.Values, nan)
-			tStart += ec.Step
-		}
-		if len(tmp.Values) != len(tmp.Timestamps) {
-			logger.Panicf("BUG: unexpected values in the result after adding cached values; got %d; want %d", len(tmp.Values), len(tmp.Timestamps))
+		if bNaNs == nil {
+			tStart := bStart
+			for tStart <= ec.End {
+				bNaNs = append(bNaNs, nan)
+				tStart += ec.Step
+			}
 		}
+		tmp.Values = append(tmp.Values, bNaNs...)
 		rvs = append(rvs, &tmp)
 	}
 	qt.Printf("resulting series=%d", len(rvs))
-	return rvs, nil
+	return rvs, true
 }
 
 type rollupResultCacheMetainfo struct {
@@ -691,9 +740,9 @@ func (mi *rollupResultCacheMetainfo) AddKey(key rollupResultCacheKey, start, end
 		end:   end,
 		key:   key,
 	})
-	if len(mi.entries) > 30 {
+	if len(mi.entries) > 10 {
 		// Remove old entries.
-		mi.entries = append(mi.entries[:0], mi.entries[10:]...)
+		mi.entries = append(mi.entries[:0], mi.entries[5:]...)
 	}
 }
diff --git a/app/vmselect/promql/rollup_result_cache_test.go b/app/vmselect/promql/rollup_result_cache_test.go
index 69cddbfa6..03430ae3a 100644
--- a/app/vmselect/promql/rollup_result_cache_test.go
+++ b/app/vmselect/promql/rollup_result_cache_test.go
@@ -1,6 +1,7 @@
 package promql
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@@ -251,6 +252,7 @@ func TestRollupResultCache(t *testing.T) {
 				Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000},
 				Values:     []float64{1, 2, 3, 4, 5, 6},
 			}
+			ts.MetricName.MetricGroup = []byte(fmt.Sprintf("metric %d", i))
 			tss = append(tss, ts)
 		}
 		rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
@@ -261,6 +263,29 @@
 		testTimeseriesEqual(t, tssResult, tss)
 	})
 
+	// Store series with identical naming (they shouldn't be stored)
+	t.Run("duplicate-series", func(t *testing.T) {
+		ResetRollupResultCache()
+		tss := []*timeseries{
+			{
+				Timestamps: []int64{800, 1000, 1200},
+				Values:     []float64{0, 1, 2},
+			},
+			{
+				Timestamps: []int64{800, 1000, 1200},
+				Values:     []float64{0, 1, 2},
+			},
+		}
+		rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
+		tssResult, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
+		if newStart != ec.Start {
+			t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start)
+		}
+		if len(tssResult) != 0 {
+			t.Fatalf("unexpected non-empty series returned")
+		}
+	})
+
 	// Store multiple time series
 	t.Run("multi-timeseries", func(t *testing.T) {
 		ResetRollupResultCache()
@@ -300,7 +325,7 @@
 	}
 }
 
-func TestMergeTimeseries(t *testing.T) {
+func TestMergeSeries(t *testing.T) {
 	ec := &EvalConfig{
 		Start: 1000,
 		End:   2000,
@@ -317,9 +342,9 @@
 				Values:     []float64{1, 2, 3, 4, 5, 6},
 			},
 		}
-		tss, err := mergeTimeseries(nil, a, b, 1000, ec)
-		if err != nil {
-			t.Fatalf("unexpected error: %s", err)
+		tss, ok := mergeSeries(nil, a, b, 1000, ec)
+		if !ok {
+			t.Fatalf("unexpected failure to merge series")
 		}
 		tssExpected := []*timeseries{
 			{
@@ -337,9 +362,9 @@
 				Values:     []float64{3, 4, 5, 6},
 			},
 		}
-		tss, err := mergeTimeseries(nil, a, b, bStart, ec)
-		if err != nil {
-			t.Fatalf("unexpected error: %s", err)
+		tss, ok := mergeSeries(nil, a, b, bStart, ec)
+		if !ok {
+			t.Fatalf("unexpected failure to merge series")
 		}
 		tssExpected := []*timeseries{
 			{
@@ -357,9 +382,9 @@
 			},
 		}
 		b := []*timeseries{}
-		tss, err := mergeTimeseries(nil, a, b, bStart, ec)
-		if err != nil {
-			t.Fatalf("unexpected error: %s", err)
+		tss, ok := mergeSeries(nil, a, b, bStart, ec)
+		if !ok {
+			t.Fatalf("unexpected failure to merge series")
 		}
 		tssExpected := []*timeseries{
 			{
@@ -382,9 +407,9 @@
 				Values:     []float64{3, 4, 5, 6},
 			},
 		}
-		tss, err := mergeTimeseries(nil, a, b, bStart, ec)
-		if err != nil {
-			t.Fatalf("unexpected error: %s", err)
+		tss, ok := mergeSeries(nil, a, b, bStart, ec)
+		if !ok {
+			t.Fatalf("unexpected failure to merge series")
 		}
 		tssExpected := []*timeseries{
 			{
@@ -409,9 +434,9 @@
 			},
 		}
 		b[0].MetricName.MetricGroup = []byte("foo")
-		tss, err := mergeTimeseries(nil, a, b, bStart, ec)
-		if err != nil {
-			t.Fatalf("unexpected error: %s", err)
+		tss, ok := mergeSeries(nil, a, b, bStart, ec)
+		if !ok {
+			t.Fatalf("unexpected failure to merge series")
 		}
 		tssExpected := []*timeseries{
 			{
@@ -431,6 +456,52 @@
 		}
 		testTimeseriesEqual(t, tss, tssExpected)
 	})
+	t.Run("duplicate-series-a", func(t *testing.T) {
+		a := []*timeseries{
+			{
+				Timestamps: []int64{1000, 1200},
+				Values:     []float64{2, 1},
+			},
+			{
+				Timestamps: []int64{1000, 1200},
+				Values:     []float64{3, 3},
+			},
+		}
+		b := []*timeseries{
+			{
+				Timestamps: []int64{1400, 1600, 1800, 2000},
+				Values:     []float64{3, 4, 5, 6},
+			},
+		}
+		tss, ok := mergeSeries(nil, a, b, bStart, ec)
+		if ok {
+			t.Fatalf("expecting failure to merge series")
+		}
+		testTimeseriesEqual(t, tss, nil)
+	})
+	t.Run("duplicate-series-b", func(t *testing.T) {
+		a := []*timeseries{
+			{
+				Timestamps: []int64{1000, 1200},
+				Values:     []float64{1, 2},
+			},
+		}
+		b := []*timeseries{
+			{
+				Timestamps: []int64{1400, 1600, 1800, 2000},
+				Values:     []float64{3, 4, 5, 6},
+			},
+			{
+				Timestamps: []int64{1400, 1600, 1800, 2000},
+				Values:     []float64{13, 14, 15, 16},
+			},
+		}
+		tss, ok := mergeSeries(nil, a, b, bStart, ec)
+		if ok {
+			t.Fatalf("expecting failure to merge series")
+		}
+		testTimeseriesEqual(t, tss, nil)
+	})
 }
 
 func testTimeseriesEqual(t *testing.T, tss, tssExpected []*timeseries) {
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 39e0c6dd3..465092344 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -30,6 +30,8 @@ The sandbox cluster installation is running under the constant load generated by
 
 * FEATURE: dashboards: use `version` instead of `short_version` in version change annotation for single/cluster dashboards. The update should reflect version changes even if different flavours of the same release were applied (custom builds).
 
+* BUGFIX: fix a bug, which could result in incorrect results and/or the `cannot merge series: duplicate series found` error during [range query](https://docs.victoriametrics.com/keyConcepts.html#range-query) execution. The issue was introduced in [v1.95.0](https://docs.victoriametrics.com/CHANGELOG.html#v1950). See [this bug report](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332) for details.
+
 ## [v1.95.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.0)
 
 Released at 2023-11-15
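
---

For reviewers tracing the new merge path: the sketch below distills the semantics that mergeSeries() now guarantees, using hypothetical toy types. It keys series by a plain name string instead of marshalMetricNameSorted() output, and uses explicit slot counts instead of ec.getSharedTimestamps(). Cached series cover the head of the shared timestamp grid, freshly fetched series cover the tail, series present on only one side are padded with NaN on the other, and any duplicate naming on either side aborts the merge so the caller can fall back to non-cached querying.

```go
package main

import (
	"fmt"
	"math"
)

// series is a toy stand-in for the timeseries type in app/vmselect/promql.
type series struct {
	name   string    // stands in for the canonical (label-sorted) metric name
	values []float64 // one value per slot of the shared timestamp grid
}

// merge concatenates cached values (covering the first aSlots points of the
// shared grid) with freshly fetched values (covering the remaining slots).
// A series seen on only one side is padded with NaN on the other side.
// It returns false when either side contains duplicate names, mirroring the
// fallback to non-cached querying in evalRollupFuncWithMetricExpr().
func merge(a, b []series, aSlots, totalSlots int) ([]series, bool) {
	nans := func(n int) []float64 {
		vs := make([]float64, n)
		for i := range vs {
			vs[i] = math.NaN()
		}
		return vs
	}
	mA := make(map[string][]float64, len(a))
	for _, s := range a {
		if _, ok := mA[s.name]; ok {
			return nil, false // duplicate among cached series - cannot merge
		}
		mA[s.name] = s.values
	}
	seenB := make(map[string]bool, len(b))
	rvs := make([]series, 0, len(a))
	for _, s := range b {
		if seenB[s.name] {
			return nil, false // duplicate among fetched series - cannot merge
		}
		seenB[s.name] = true
		head, ok := mA[s.name]
		if !ok {
			head = nans(aSlots) // the series appeared only in the fresh part
		} else {
			delete(mA, s.name)
		}
		rvs = append(rvs, series{s.name, append(head, s.values...)})
	}
	// Series that vanished in the fresh part get a NaN tail.
	for name, head := range mA {
		rvs = append(rvs, series{name, append(head, nans(totalSlots-aSlots)...)})
	}
	return rvs, true
}

func main() {
	a := []series{{"up", []float64{1, 1}}, {"old", []float64{7, 7}}}       // cached: 2 slots
	b := []series{{"up", []float64{1, 0, 1}}, {"new", []float64{5, 5, 5}}} // fresh: 3 slots
	rvs, ok := merge(a, b, 2, 5)
	fmt.Println("merged:", ok)
	for _, s := range rvs {
		fmt.Println(s.name, s.values) // every series ends up with 5 values
	}
}
```

Note the aNaNs/bNaNs buffers in the real patch serve the same purpose as the nans() helper here, except they are computed once and reused across all padded series.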
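Similarly, the duplicate check added to PutSeries() hinges on keying each series by a canonically marshaled (label-sorted) metric name, so two series that differ only in label order still collide. Below is a minimal sketch of that keying under toy assumptions: a plain string key and a hypothetical label struct stand in for marshalMetricNameSorted() and the byte-buffer pool used in the real code.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// label is a toy stand-in for a single metric-name label.
type label struct{ name, value string }

// canonicalKey mimics what marshalMetricNameSorted() achieves: the same set
// of labels always yields the same key, regardless of label order.
func canonicalKey(labels []label) string {
	ls := append([]label(nil), labels...) // copy before sorting
	sort.Slice(ls, func(i, j int) bool { return ls[i].name < ls[j].name })
	var sb strings.Builder
	for _, l := range ls {
		sb.WriteString(l.name)
		sb.WriteByte('=')
		sb.WriteString(l.value)
		sb.WriteByte(';')
	}
	return sb.String()
}

// hasDuplicateNaming reports whether two series share the same canonical name.
// PutSeries() now refuses to cache such result sets, since mergeSeries()
// could not merge them with fresh results later.
func hasDuplicateNaming(series [][]label) bool {
	seen := make(map[string]struct{}, len(series))
	for _, labels := range series {
		k := canonicalKey(labels)
		if _, ok := seen[k]; ok {
			return true
		}
		seen[k] = struct{}{}
	}
	return false
}

func main() {
	s1 := []label{{"a", "1"}, {"b", "2"}}
	s2 := []label{{"b", "2"}, {"a", "1"}} // same series, different label order
	fmt.Println(hasDuplicateNaming([][]label{s1, s2})) // true
}
```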