app/vmselect/promql: properly handle duplicate series when merging cached results with the results obtained from the database

evalRollupFuncNoCache() may return time series with identical labels (aka duplicate series)
when performing queries satisfying all the following conditions:

- It must select time series with multiple metric names. For example, {__name__=~"foo|bar"}
- The series selector must be wrapped into rollup function, which drops metric names. For example, rate({__name__=~"foo|bar"})
- The rollup function must be wrapped into aggregate function, which has no streaming optimization.
  For example, quantile(0.9, rate({__name__=~"foo|bar"})

In this case VictoriaMetrics shouldn't return `cannot merge series: duplicate series found` error.
Instead, it should fall back to query execution with disabled cache.

Also properly store the merged results. Previously they were incorrectly stored because of a typo
introduced in the commit 41a0fdaf39

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5337
This commit is contained in:
Aliaksandr Valialkin 2023-11-16 15:52:38 +01:00
parent 16a41593e6
commit 7ca8ebef20
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 228 additions and 83 deletions

View file

@ -890,7 +890,7 @@ func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName st
}
}
if funcName == "absent_over_time" {
rvs = aggregateAbsentOverTime(ec, re.Expr, rvs)
rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs)
}
ec.updateIsPartialResponse(ecNew.IsPartialResponse.Load())
if offset != 0 && len(rvs) > 0 {
@ -1618,8 +1618,23 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
}
return rvs, nil
}
pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
evalWithConfig := func(ec *EvalConfig) ([]*timeseries, error) {
tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries)
if err != nil {
err = &UserReadableError{
Err: err,
}
return nil, err
}
return tss, nil
}
if !ec.mayCache() {
qt.Printf("do not fetch series from cache, since it is disabled in the current context")
return evalWithConfig(ec)
}
// Search for partial results in cache.
// Search for cached results.
tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window)
ec.QueryStats.addSeriesFetched(len(tssCached))
if start > ec.End {
@ -1635,23 +1650,31 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
rollupResultCacheMiss.Inc()
}
ecCopy := copyEvalConfig(ec)
ecCopy.Start = start
pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
tss, err := evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries)
// Fetch missing results, which aren't cached yet.
ecNew := ec
if start != ec.Start {
ecNew = copyEvalConfig(ec)
ecNew.Start = start
}
tss, err := evalWithConfig(ecNew)
if err != nil {
err = &UserReadableError{
Err: err,
}
return nil, err
}
isPartial := ecCopy.IsPartialResponse.Load()
ec.updateIsPartialResponse(isPartial)
rvs, err := mergeTimeseries(qt, tssCached, tss, start, ec)
if err != nil {
return nil, fmt.Errorf("cannot merge series: %w", err)
isPartial := ecNew.IsPartialResponse.Load()
// Merge cached results with the fetched additional results.
rvs, ok := mergeSeries(qt, tssCached, tss, start, ec)
if !ok {
// Cannot merge series - fall back to non-cached querying.
qt.Printf("fall back to non-caching querying")
rvs, err = evalWithConfig(ec)
if err != nil {
return nil, err
}
isPartial = ec.IsPartialResponse.Load()
}
if tss != nil && !isPartial {
ec.updateIsPartialResponse(isPartial)
if !isPartial {
rollupResultCacheV.PutSeries(qt, ec, expr, window, tss)
}
return rvs, nil

View file

@ -227,10 +227,6 @@ func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig,
qt = qt.NewChild("rollup cache get series: query=%s, timeRange=%s, window=%d, step=%d", query, ec.timeRangeString(), window, ec.Step)
defer qt.Done()
}
if !ec.mayCache() {
qt.Printf("do not fetch series from cache, since it is disabled in the current context")
return nil, ec.Start
}
// Obtain tss from the cache.
bb := bbPool.Get()
@ -312,13 +308,25 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig,
qt = qt.NewChild("rollup cache put series: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss))
defer qt.Done()
}
if !ec.mayCache() {
qt.Printf("do not store series to cache, since it is disabled in the current context")
if len(tss) == 0 {
qt.Printf("do not cache empty series list")
return
}
if len(tss) == 0 {
qt.Printf("do not store empty series list")
return
if len(tss) > 1 {
// Verify whether tss contains series with duplicate naming.
// There is little sense in storing such series in the cache, since they cannot be merged in mergeSeries() later.
bb := bbPool.Get()
m := make(map[string]struct{}, len(tss))
for _, ts := range tss {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
qt.Printf("do not cache series with duplicate naming %s", &ts.MetricName)
return
}
m[string(bb.B)] = struct{}{}
}
bbPool.Put(bb)
}
// Remove values up to currentTime - step - cacheTimestampOffset,
@ -477,7 +485,7 @@ func mustSaveRollupResultCacheKeyPrefix(path string) {
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes.
const rollupResultCacheVersion = 10
const rollupResultCacheVersion = 11
const (
rollupResultCacheTypeSeries = 0
@ -522,74 +530,115 @@ func marshalTagFiltersForRollupResultCacheKey(dst []byte, etfs [][]storage.TagFi
return dst
}
// mergeTimeseries concatenates b with a and returns the result.
func equalTimestamps(a, b []int64) bool {
if len(a) != len(b) {
return false
}
for i, tsA := range a {
tsB := b[i]
if tsA != tsB {
return false
}
}
return true
}
// mergeSeries concatenates a with b and returns the result.
//
// true is returned on successful concatenation, false otherwise.
//
// Preconditions:
// - a mustn't intersect with b by timestamps.
// - a timestamps must be smaller than b timestamps.
// - bStart must be in the range [ec.Start .. ec.End]
// - a must contain series with all the samples on the range [ec.Start ... bStart - ec.Step] with ec.Step interval between them
// - b must contain series with all the samples on the range [bStart .. ec.End] with ec.Step interval between them
//
// Postconditions:
// - the returned series contain all the samples on the range [ec.Start .. ec.End] with ec.Step interval between them
// - a and b cannot be used after returning from the call.
func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, error) {
qt = qt.NewChild("merge series len(a)=%d, len(b)=%d", len(a), len(b))
defer qt.Done()
func mergeSeries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, bool) {
if qt.Enabled() {
qt = qt.NewChild("merge series on time range %s with step=%dms; len(a)=%d, len(b)=%d, bStart=%s",
ec.timeRangeString(), ec.Step, len(a), len(b), storage.TimestampToHumanReadableFormat(bStart))
defer qt.Done()
}
sharedTimestamps := ec.getSharedTimestamps()
if bStart == ec.Start {
// Nothing to merge - b covers all the time range.
// Verify b is correct.
i := 0
for i < len(sharedTimestamps) && sharedTimestamps[i] < bStart {
i++
}
aTimestamps := sharedTimestamps[:i]
bTimestamps := sharedTimestamps[i:]
if len(bTimestamps) == len(sharedTimestamps) {
// Nothing to merge - just return b to the caller
for _, tsB := range b {
if !equalTimestamps(tsB.Timestamps, bTimestamps) {
logger.Panicf("BUG: invalid timestamps in b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps)
}
tsB.denyReuse = true
tsB.Timestamps = sharedTimestamps
if len(tsB.Values) != len(tsB.Timestamps) {
logger.Panicf("BUG: unexpected number of values in b; got %d; want %d", len(tsB.Values), len(tsB.Timestamps))
}
}
return b, nil
return b, true
}
m := make(map[string]*timeseries, len(a))
bb := bbPool.Get()
defer bbPool.Put(bb)
mA := make(map[string]*timeseries, len(a))
for _, ts := range a {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName)
if !equalTimestamps(ts.Timestamps, aTimestamps) {
logger.Panicf("BUG: invalid timestamps in a series %s; got %d; want %d", &ts.MetricName, ts.Timestamps, aTimestamps)
}
m[string(bb.B)] = ts
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := mA[string(bb.B)]; ok {
qt.Printf("cannot merge series because a series contain duplicate %s", &ts.MetricName)
return nil, false
}
mA[string(bb.B)] = ts
}
mB := make(map[string]struct{}, len(b))
rvs := make([]*timeseries, 0, len(a))
var aNaNs []float64
for _, tsB := range b {
if !equalTimestamps(tsB.Timestamps, bTimestamps) {
logger.Panicf("BUG: invalid timestamps for b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps)
}
bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName)
if _, ok := mB[string(bb.B)]; ok {
qt.Printf("cannot merge series because b series contain duplicate %s", &tsB.MetricName)
return nil, false
}
mB[string(bb.B)] = struct{}{}
var tmp timeseries
tmp.denyReuse = true
tmp.Timestamps = sharedTimestamps
tmp.Values = make([]float64, 0, len(tmp.Timestamps))
tmp.MetricName.MoveFrom(&tsB.MetricName)
bb.B = marshalMetricNameSorted(bb.B[:0], &tmp.MetricName)
k := string(bb.B)
tsA := m[k]
tsA := mA[string(bb.B)]
if tsA == nil {
tStart := ec.Start
for tStart < bStart {
tmp.Values = append(tmp.Values, nan)
tStart += ec.Step
if aNaNs == nil {
tStart := ec.Start
for tStart < bStart {
aNaNs = append(aNaNs, nan)
tStart += ec.Step
}
}
tmp.Values = append(tmp.Values, aNaNs...)
} else {
tmp.Values = append(tmp.Values, tsA.Values...)
delete(m, k)
delete(mA, string(bb.B))
}
tmp.Values = append(tmp.Values, tsB.Values...)
if len(tmp.Values) != len(tmp.Timestamps) {
logger.Panicf("BUG: unexpected values after merging new values; got %d; want %d; len(a.Values)=%d; len(b.Values)=%d",
len(tmp.Values), len(tmp.Timestamps), len(tsA.Values), len(tsB.Values))
}
rvs = append(rvs, &tmp)
}
// Copy the remaining timeseries from m.
for _, tsA := range m {
// Copy the remaining timeseries from mA.
var bNaNs []float64
for _, tsA := range mA {
var tmp timeseries
tmp.denyReuse = true
tmp.Timestamps = sharedTimestamps
@ -597,18 +646,18 @@ func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, e
tmp.MetricName.MoveFrom(&tsA.MetricName)
tmp.Values = append(tmp.Values, tsA.Values...)
tStart := bStart
for tStart <= ec.End {
tmp.Values = append(tmp.Values, nan)
tStart += ec.Step
}
if len(tmp.Values) != len(tmp.Timestamps) {
logger.Panicf("BUG: unexpected values in the result after adding cached values; got %d; want %d", len(tmp.Values), len(tmp.Timestamps))
if bNaNs == nil {
tStart := bStart
for tStart <= ec.End {
bNaNs = append(bNaNs, nan)
tStart += ec.Step
}
}
tmp.Values = append(tmp.Values, bNaNs...)
rvs = append(rvs, &tmp)
}
qt.Printf("resulting series=%d", len(rvs))
return rvs, nil
return rvs, true
}
type rollupResultCacheMetainfo struct {
@ -691,9 +740,9 @@ func (mi *rollupResultCacheMetainfo) AddKey(key rollupResultCacheKey, start, end
end: end,
key: key,
})
if len(mi.entries) > 30 {
if len(mi.entries) > 10 {
// Remove old entries.
mi.entries = append(mi.entries[:0], mi.entries[10:]...)
mi.entries = append(mi.entries[:0], mi.entries[5:]...)
}
}

View file

@ -1,6 +1,7 @@
package promql
import (
"fmt"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@ -251,6 +252,7 @@ func TestRollupResultCache(t *testing.T) {
Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000},
Values: []float64{1, 2, 3, 4, 5, 6},
}
ts.MetricName.MetricGroup = []byte(fmt.Sprintf("metric %d", i))
tss = append(tss, ts)
}
rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
@ -261,6 +263,29 @@ func TestRollupResultCache(t *testing.T) {
testTimeseriesEqual(t, tssResult, tss)
})
// Store series with identical naming (they shouldn't be stored)
t.Run("duplicate-series", func(t *testing.T) {
ResetRollupResultCache()
tss := []*timeseries{
{
Timestamps: []int64{800, 1000, 1200},
Values: []float64{0, 1, 2},
},
{
Timestamps: []int64{800, 1000, 1200},
Values: []float64{0, 1, 2},
},
}
rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tssResult, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != ec.Start {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start)
}
if len(tssResult) != 0 {
t.Fatalf("unexpected non-empty series returned")
}
})
// Store multiple time series
t.Run("multi-timeseries", func(t *testing.T) {
ResetRollupResultCache()
@ -300,7 +325,7 @@ func TestRollupResultCache(t *testing.T) {
}
func TestMergeTimeseries(t *testing.T) {
func TestMergeSeries(t *testing.T) {
ec := &EvalConfig{
Start: 1000,
End: 2000,
@ -317,9 +342,9 @@ func TestMergeTimeseries(t *testing.T) {
Values: []float64{1, 2, 3, 4, 5, 6},
},
}
tss, err := mergeTimeseries(nil, a, b, 1000, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
tss, ok := mergeSeries(nil, a, b, 1000, ec)
if !ok {
t.Fatalf("unexpected failure to merge series")
}
tssExpected := []*timeseries{
{
@ -337,9 +362,9 @@ func TestMergeTimeseries(t *testing.T) {
Values: []float64{3, 4, 5, 6},
},
}
tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
tss, ok := mergeSeries(nil, a, b, bStart, ec)
if !ok {
t.Fatalf("unexpected failure to merge series")
}
tssExpected := []*timeseries{
{
@ -357,9 +382,9 @@ func TestMergeTimeseries(t *testing.T) {
},
}
b := []*timeseries{}
tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
tss, ok := mergeSeries(nil, a, b, bStart, ec)
if !ok {
t.Fatalf("unexpected failure to merge series")
}
tssExpected := []*timeseries{
{
@ -382,9 +407,9 @@ func TestMergeTimeseries(t *testing.T) {
Values: []float64{3, 4, 5, 6},
},
}
tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
tss, ok := mergeSeries(nil, a, b, bStart, ec)
if !ok {
t.Fatalf("unexpected failure to merge series")
}
tssExpected := []*timeseries{
{
@ -409,9 +434,9 @@ func TestMergeTimeseries(t *testing.T) {
},
}
b[0].MetricName.MetricGroup = []byte("foo")
tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
tss, ok := mergeSeries(nil, a, b, bStart, ec)
if !ok {
t.Fatalf("unexpected failure to merge series")
}
tssExpected := []*timeseries{
{
@ -431,6 +456,52 @@ func TestMergeTimeseries(t *testing.T) {
}
testTimeseriesEqual(t, tss, tssExpected)
})
t.Run("duplicate-series-a", func(t *testing.T) {
a := []*timeseries{
{
Timestamps: []int64{1000, 1200},
Values: []float64{2, 1},
},
{
Timestamps: []int64{1000, 1200},
Values: []float64{3, 3},
},
}
b := []*timeseries{
{
Timestamps: []int64{1400, 1600, 1800, 2000},
Values: []float64{3, 4, 5, 6},
},
}
tss, ok := mergeSeries(nil, a, b, bStart, ec)
if ok {
t.Fatalf("expecting failre to merge series")
}
testTimeseriesEqual(t, tss, nil)
})
t.Run("duplicate-series-b", func(t *testing.T) {
a := []*timeseries{
{
Timestamps: []int64{1000, 1200},
Values: []float64{1, 2},
},
}
b := []*timeseries{
{
Timestamps: []int64{1400, 1600, 1800, 2000},
Values: []float64{3, 4, 5, 6},
},
{
Timestamps: []int64{1400, 1600, 1800, 2000},
Values: []float64{13, 14, 15, 16},
},
}
tss, ok := mergeSeries(nil, a, b, bStart, ec)
if ok {
t.Fatalf("expecting failre to merge series")
}
testTimeseriesEqual(t, tss, nil)
})
}
func testTimeseriesEqual(t *testing.T, tss, tssExpected []*timeseries) {

View file

@ -30,6 +30,8 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: dashboards: use `version` instead of `short_version` in version change annotation for single/cluster dashboards. The update should reflect version changes even if different flavours of the same release were applied (custom builds).
* BUGFIX: fix a bug, which could result in improper results and/or to `cannot merge series: duplicate series found` error during [range query](https://docs.victoriametrics.com/keyConcepts.html#range-query) query execution. The issue has been introduced in [v1.95.0](https://docs.victoriametrics.com/CHANGELOG.html#v1950). See [this bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332) for details.
## [v1.95.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.0)
Released at 2023-11-15