diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 8813b724c..9175fda16 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -68,9 +68,10 @@ func (rss *Results) Cancel() { // RunParallel runs in parallel f for all the results from rss. // // f shouldn't hold references to rs after returning. +// workerID is the id of the worker goroutine that calls f. // // rss becomes unusable after the call to RunParallel. -func (rss *Results) RunParallel(f func(rs *Result)) error { +func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error { defer func() { putTmpBlocksFile(rss.tbf) rss.tbf = nil @@ -88,7 +89,7 @@ func (rss *Results) RunParallel(f func(rs *Result)) error { // Start workers. for i := 0; i < workersCount; i++ { - go func() { + go func(workerID uint) { rs := getResult() defer putResult(rs) maxWorkersCount := gomaxprocs / workersCount @@ -106,13 +107,13 @@ func (rss *Results) RunParallel(f func(rs *Result)) error { // Skip empty blocks. continue } - f(rs) + f(rs, workerID) } // Drain the remaining work for range workCh { } doneCh <- err - }() + }(uint(i)) } // Feed workers with work. 
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 29f5e9248..71638815f 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -83,7 +83,7 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err resultsCh := make(chan *quicktemplate.ByteBuffer) doneCh := make(chan error) go func() { - err := rss.RunParallel(func(rs *netstorage.Result) { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { bb := quicktemplate.AcquireByteBuffer() WriteFederate(bb, rs) resultsCh <- bb @@ -181,7 +181,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, star resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1)) doneCh := make(chan error) go func() { - err := rss.RunParallel(func(rs *netstorage.Result) { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { bb := quicktemplate.AcquireByteBuffer() writeLineFunc(bb, rs) resultsCh <- bb @@ -413,7 +413,7 @@ func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error resultsCh := make(chan *quicktemplate.ByteBuffer) doneCh := make(chan error) go func() { - err := rss.RunParallel(func(rs *netstorage.Result) { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { bb := quicktemplate.AcquireByteBuffer() writemetricNameObject(bb, &rs.MetricName) resultsCh <- bb diff --git a/app/vmselect/promql/aggr_incremental.go b/app/vmselect/promql/aggr_incremental.go index 07ec8dcfd..8b0fe4a66 100644 --- a/app/vmselect/promql/aggr_incremental.go +++ b/app/vmselect/promql/aggr_incremental.go @@ -13,30 +13,37 @@ import ( var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{ "sum": { updateAggrFunc: updateAggrSum, + mergeAggrFunc: mergeAggrSum, finalizeAggrFunc: finalizeAggrCommon, }, "min": { updateAggrFunc: updateAggrMin, + mergeAggrFunc: mergeAggrMin, finalizeAggrFunc: finalizeAggrCommon, }, "max": { 
updateAggrFunc: updateAggrMax, + mergeAggrFunc: mergeAggrMax, finalizeAggrFunc: finalizeAggrCommon, }, "avg": { updateAggrFunc: updateAggrAvg, + mergeAggrFunc: mergeAggrAvg, finalizeAggrFunc: finalizeAggrAvg, }, "count": { updateAggrFunc: updateAggrCount, + mergeAggrFunc: mergeAggrCount, finalizeAggrFunc: finalizeAggrCount, }, "sum2": { updateAggrFunc: updateAggrSum2, + mergeAggrFunc: mergeAggrSum2, finalizeAggrFunc: finalizeAggrCommon, }, "geomean": { updateAggrFunc: updateAggrGeomean, + mergeAggrFunc: mergeAggrGeomean, finalizeAggrFunc: finalizeAggrGeomean, }, } @@ -44,8 +51,8 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{ type incrementalAggrFuncContext struct { ae *aggrFuncExpr - mu sync.Mutex - m map[string]*incrementalAggrContext + mLock sync.Mutex + m map[uint]map[string]*incrementalAggrContext callbacks *incrementalAggrFuncCallbacks } @@ -53,17 +60,24 @@ type incrementalAggrFuncContext struct { func newIncrementalAggrFuncContext(ae *aggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext { return &incrementalAggrFuncContext{ ae: ae, - m: make(map[string]*incrementalAggrContext, 1), + m: make(map[uint]map[string]*incrementalAggrContext), callbacks: callbacks, } } -func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries) { +func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerID uint) { + iafc.mLock.Lock() + m := iafc.m[workerID] + if m == nil { + m = make(map[string]*incrementalAggrContext, 1) + iafc.m[workerID] = m + } + iafc.mLock.Unlock() + removeGroupTags(&ts.MetricName, &iafc.ae.Modifier) bb := bbPool.Get() bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) - iafc.mu.Lock() - iac := iafc.m[string(bb.B)] + iac := m[string(bb.B)] if iac == nil { tsAggr := &timeseries{ Values: make([]float64, len(ts.Values)), @@ -75,19 +89,30 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries) { ts: tsAggr, values: make([]float64,
len(ts.Values)), } - iafc.m[string(bb.B)] = iac + m[string(bb.B)] = iac } - iafc.callbacks.updateAggrFunc(iac, ts.Values) - iafc.mu.Unlock() bbPool.Put(bb) + iafc.callbacks.updateAggrFunc(iac, ts.Values) } func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { - // There is no need in iafc.mu.Lock here, since getTimeseries must be called + // There is no need in iafc.mLock.Lock here, since finalizeTimeseries must be called // without concurrent goroutines touching iafc. - tss := make([]*timeseries, 0, len(iafc.m)) + mGlobal := make(map[string]*incrementalAggrContext) + mergeAggrFunc := iafc.callbacks.mergeAggrFunc + for _, m := range iafc.m { + for k, iac := range m { + iacGlobal := mGlobal[k] + if iacGlobal == nil { + mGlobal[k] = iac + continue + } + mergeAggrFunc(iacGlobal, iac) + } + } + tss := make([]*timeseries, 0, len(mGlobal)) finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc - for _, iac := range iafc.m { + for _, iac := range mGlobal { finalizeAggrFunc(iac) tss = append(tss, iac.ts) } @@ -96,6 +121,7 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { type incrementalAggrFuncCallbacks struct { updateAggrFunc func(iac *incrementalAggrContext, values []float64) + mergeAggrFunc func(dst, src *incrementalAggrContext) finalizeAggrFunc func(iac *incrementalAggrContext) } @@ -129,8 +155,33 @@ func updateAggrSum(iac *incrementalAggrContext, values []float64) { if math.IsNaN(v) { continue } + if dstCounts[i] == 0 { + dstValues[i] = v + dstCounts[i] = 1 + continue + } + dstValues[i] += v + } +} + +func mergeAggrSum(dst, src *incrementalAggrContext) { + srcValues := src.ts.Values + dstValues := dst.ts.Values + srcCounts := src.values + dstCounts := dst.values + _ = srcCounts[len(srcValues)-1] + _ = dstCounts[len(srcValues)-1] + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + if srcCounts[i] == 0 { + continue + } + if dstCounts[i] == 0 { + dstValues[i] = v + dstCounts[i] = 1 + continue + } 
dstValues[i] += v - dstCounts[i] = 1 } } @@ -154,6 +205,29 @@ func updateAggrMin(iac *incrementalAggrContext, values []float64) { } } +func mergeAggrMin(dst, src *incrementalAggrContext) { + srcValues := src.ts.Values + dstValues := dst.ts.Values + srcCounts := src.values + dstCounts := dst.values + _ = srcCounts[len(srcValues)-1] + _ = dstCounts[len(srcValues)-1] + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + if srcCounts[i] == 0 { + continue + } + if dstCounts[i] == 0 { + dstValues[i] = v + dstCounts[i] = 1 + continue + } + if v < dstValues[i] { + dstValues[i] = v + } + } +} + func updateAggrMax(iac *incrementalAggrContext, values []float64) { dstValues := iac.ts.Values dstCounts := iac.values @@ -174,6 +248,29 @@ func updateAggrMax(iac *incrementalAggrContext, values []float64) { } } +func mergeAggrMax(dst, src *incrementalAggrContext) { + srcValues := src.ts.Values + dstValues := dst.ts.Values + srcCounts := src.values + dstCounts := dst.values + _ = srcCounts[len(srcValues)-1] + _ = dstCounts[len(srcValues)-1] + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + if srcCounts[i] == 0 { + continue + } + if dstCounts[i] == 0 { + dstValues[i] = v + dstCounts[i] = 1 + continue + } + if v > dstValues[i] { + dstValues[i] = v + } + } +} + func updateAggrAvg(iac *incrementalAggrContext, values []float64) { // Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation, // since it is slower and has no obvious benefits in increased precision. 
@@ -195,6 +292,28 @@ func updateAggrAvg(iac *incrementalAggrContext, values []float64) { } } +func mergeAggrAvg(dst, src *incrementalAggrContext) { + srcValues := src.ts.Values + dstValues := dst.ts.Values + srcCounts := src.values + dstCounts := dst.values + _ = srcCounts[len(srcValues)-1] + _ = dstCounts[len(srcValues)-1] + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + if srcCounts[i] == 0 { + continue + } + if dstCounts[i] == 0 { + dstValues[i] = v + dstCounts[i] = srcCounts[i] + continue + } + dstValues[i] += v + dstCounts[i] += srcCounts[i] + } +} + func finalizeAggrAvg(iac *incrementalAggrContext) { dstValues := iac.ts.Values counts := iac.values @@ -219,6 +338,15 @@ func updateAggrCount(iac *incrementalAggrContext, values []float64) { } } +func mergeAggrCount(dst, src *incrementalAggrContext) { + srcValues := src.ts.Values + dstValues := dst.ts.Values + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + dstValues[i] += v + } +} + func finalizeAggrCount(iac *incrementalAggrContext) { // Nothing to do } @@ -232,8 +360,33 @@ func updateAggrSum2(iac *incrementalAggrContext, values []float64) { if math.IsNaN(v) { continue } + if dstCounts[i] == 0 { + dstValues[i] = v * v + dstCounts[i] = 1 + continue + } dstValues[i] += v * v - dstCounts[i] = 1 + } +} + +func mergeAggrSum2(dst, src *incrementalAggrContext) { + srcValues := src.ts.Values + dstValues := dst.ts.Values + srcCounts := src.values + dstCounts := dst.values + _ = srcCounts[len(srcValues)-1] + _ = dstCounts[len(srcValues)-1] + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + if srcCounts[i] == 0 { + continue + } + if dstCounts[i] == 0 { + dstValues[i] = v + dstCounts[i] = 1 + continue + } + dstValues[i] += v } } @@ -256,6 +409,28 @@ func updateAggrGeomean(iac *incrementalAggrContext, values []float64) { } } +func mergeAggrGeomean(dst, src *incrementalAggrContext) { + srcValues := src.ts.Values + dstValues := dst.ts.Values + srcCounts := src.values + 
dstCounts := dst.values + _ = srcCounts[len(srcValues)-1] + _ = dstCounts[len(srcValues)-1] + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + if srcCounts[i] == 0 { + continue + } + if dstCounts[i] == 0 { + dstValues[i] = v + dstCounts[i] = srcCounts[i] + continue + } + dstValues[i] *= v + dstCounts[i] += srcCounts[i] + } +} + func finalizeAggrGeomean(iac *incrementalAggrContext) { dstValues := iac.ts.Values counts := iac.values diff --git a/app/vmselect/promql/aggr_incremental_test.go b/app/vmselect/promql/aggr_incremental_test.go new file mode 100644 index 000000000..7e779a124 --- /dev/null +++ b/app/vmselect/promql/aggr_incremental_test.go @@ -0,0 +1,187 @@ +package promql + +import ( + "fmt" + "math" + "reflect" + "runtime" + "sync" + "testing" +) + +func TestIncrementalAggr(t *testing.T) { + defaultTimestamps := []int64{100e3, 200e3, 300e3, 400e3} + values := [][]float64{ + {1, nan, 2, nan}, + {3, nan, nan, 4}, + {nan, nan, 5, 6}, + {7, nan, 8, 9}, + {4, nan, nan, nan}, + {2, nan, 3, 2}, + {0, nan, 1, 1}, + } + tssSrc := make([]*timeseries, len(values)) + for i, vs := range values { + ts := &timeseries{ + Timestamps: defaultTimestamps, + Values: vs, + } + tssSrc[i] = ts + } + + copyTimeseries := func(tssSrc []*timeseries) []*timeseries { + tssDst := make([]*timeseries, len(tssSrc)) + for i, tsSrc := range tssSrc { + var tsDst timeseries + tsDst.CopyFromShallowTimestamps(tsSrc) + tssDst[i] = &tsDst + } + return tssDst + } + + f := func(name string, valuesExpected []float64) { + t.Helper() + callbacks := getIncrementalAggrFuncCallbacks(name) + ae := &aggrFuncExpr{ + Name: name, + } + tssExpected := []*timeseries{{ + Timestamps: defaultTimestamps, + Values: valuesExpected, + }} + // run the test multiple times to make sure there are no side effects on concurrency + for i := 0; i < 10; i++ { + iafc := newIncrementalAggrFuncContext(ae, callbacks) + tssSrcCopy := copyTimeseries(tssSrc) + if err := testIncrementalParallelAggr(iafc, tssSrcCopy,
tssExpected); err != nil { + t.Fatalf("unexpected error on iteration %d: %s", i, err) + } + } + } + + t.Run("sum", func(t *testing.T) { + t.Parallel() + valuesExpected := []float64{17, nan, 19, 22} + f("sum", valuesExpected) + }) + t.Run("min", func(t *testing.T) { + t.Parallel() + valuesExpected := []float64{0, nan, 1, 1} + f("min", valuesExpected) + }) + t.Run("max", func(t *testing.T) { + t.Parallel() + valuesExpected := []float64{7, nan, 8, 9} + f("max", valuesExpected) + }) + t.Run("avg", func(t *testing.T) { + t.Parallel() + valuesExpected := []float64{2.8333333333333335, nan, 3.8, 4.4} + f("avg", valuesExpected) + }) + t.Run("count", func(t *testing.T) { + t.Parallel() + valuesExpected := []float64{6, 0, 5, 5} + f("count", valuesExpected) + }) + t.Run("sum2", func(t *testing.T) { + t.Parallel() + valuesExpected := []float64{79, nan, 103, 138} + f("sum2", valuesExpected) + }) + t.Run("geomean", func(t *testing.T) { + t.Parallel() + valuesExpected := []float64{0, nan, 2.9925557394776896, 3.365865436338599} + f("geomean", valuesExpected) + }) +} + +func testIncrementalParallelAggr(iafc *incrementalAggrFuncContext, tssSrc, tssExpected []*timeseries) error { + const workersCount = 3 + tsCh := make(chan *timeseries) + var wg sync.WaitGroup + wg.Add(workersCount) + for i := 0; i < workersCount; i++ { + go func(workerID uint) { + defer wg.Done() + for ts := range tsCh { + runtime.Gosched() // allow other goroutines performing the work + iafc.updateTimeseries(ts, workerID) + } + }(uint(i)) + } + for _, ts := range tssSrc { + tsCh <- ts + } + close(tsCh) + wg.Wait() + tssActual := iafc.finalizeTimeseries() + if err := expectTimeseriesEqual(tssActual, tssExpected); err != nil { + return fmt.Errorf("%s; tssActual=%v, tssExpected=%v", err, tssActual, tssExpected) + } + return nil +} + +func expectTimeseriesEqual(actual, expected []*timeseries) error { + if len(actual) != len(expected) { + return fmt.Errorf("unexpected number of time series; got %d; want %d", len(actual), 
len(expected)) + } + mActual := timeseriesToMap(actual) + mExpected := timeseriesToMap(expected) + if len(mActual) != len(mExpected) { + return fmt.Errorf("unexpected number of time series after converting to map; got %d; want %d", len(mActual), len(mExpected)) + } + for k, tsExpected := range mExpected { + tsActual := mActual[k] + if tsActual == nil { + return fmt.Errorf("missing time series for key=%q", k) + } + if err := expectTsEqual(tsActual, tsExpected); err != nil { + return err + } + } + return nil +} + +func timeseriesToMap(tss []*timeseries) map[string]*timeseries { + m := make(map[string]*timeseries, len(tss)) + for _, ts := range tss { + k := ts.MetricName.Marshal(nil) + m[string(k)] = ts + } + return m +} + +func expectTsEqual(actual, expected *timeseries) error { + mnActual := actual.MetricName.Marshal(nil) + mnExpected := expected.MetricName.Marshal(nil) + if string(mnActual) != string(mnExpected) { + return fmt.Errorf("unexpected metric name; got %q; want %q", mnActual, mnExpected) + } + if !reflect.DeepEqual(actual.Timestamps, expected.Timestamps) { + return fmt.Errorf("unexpected timestamps; got %v; want %v", actual.Timestamps, expected.Timestamps) + } + if err := compareValues(actual.Values, expected.Values); err != nil { + return fmt.Errorf("%s; actual %v; expected %v", err, actual.Values, expected.Values) + } + return nil +} + +func compareValues(vs1, vs2 []float64) error { + if len(vs1) != len(vs2) { + return fmt.Errorf("unexpected number of values; got %d; want %d", len(vs1), len(vs2)) + } + for i, v1 := range vs1 { + v2 := vs2[i] + if math.IsNaN(v1) { + if !math.IsNaN(v2) { + return fmt.Errorf("unexpected value; got %v; want %v", v1, v2) + } + continue + } + if v1 != v2 { + return fmt.Errorf("unexpected value; got %v; want %v", v1, v2) + } + } + return nil +} diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index c8f255273..21704b2ee 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -663,7 
+663,7 @@ func getRollupMemoryLimiter() *memoryLimiter { func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { - err := rss.RunParallel(func(rs *netstorage.Result) { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) @@ -675,7 +675,7 @@ func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *n } ts.Values = rc.Do(ts.Values[:0], rs.Values, rs.Timestamps) ts.Timestamps = sharedTimestamps - iafc.updateTimeseries(ts) + iafc.updateTimeseries(ts, workerID) ts.Timestamps = nil } }) @@ -690,7 +690,7 @@ func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConf preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { tss := make([]*timeseries, 0, rss.Len()*len(rcs)) var tssLock sync.Mutex - err := rss.RunParallel(func(rs *netstorage.Result) { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { preFunc(rs.Values, rs.Timestamps) for _, rc := range rcs { var ts timeseries