From ab44787a70d732a73b82eaed9ffd6c3cec72593e Mon Sep 17 00:00:00 2001 From: rbizos <58781501+rbizos@users.noreply.github.com> Date: Tue, 26 Mar 2024 14:48:58 +0100 Subject: [PATCH] adding AggregateSeriesLists graphite function (#5809) * adding aggregate series list graphite function adding also aliases for sum diff and multiply * Adding tests for aggregateSeriesLists and aliases --- app/vmselect/graphite/eval_test.go | 96 +++++++++++++++++++++ app/vmselect/graphite/functions.json | 124 ++++++++++++++++++++++++++- app/vmselect/graphite/transform.go | 114 ++++++++++++++++++------ 3 files changed, 307 insertions(+), 27 deletions(-) diff --git a/app/vmselect/graphite/eval_test.go b/app/vmselect/graphite/eval_test.go index a817120ed..17f37e075 100644 --- a/app/vmselect/graphite/eval_test.go +++ b/app/vmselect/graphite/eval_test.go @@ -3281,6 +3281,102 @@ func TestExecExprSuccess(t *testing.T) { Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"}, }, }) + f(`aggregateSeriesLists( + summarize( + time('foo.bar.baz',10), + '45s' + ), + summarize( + time('bar.foo.bad',10), + '45s' + ), 'sum')`, []*series{ + { + Timestamps: []int64{120000, 165000}, + Values: []float64{1170, 2000}, + Name: `sumSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`, + Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"}, + }, + }) + f(`sumSeriesLists( + summarize( + time('foo.bar.baz',10), + '45s' + ), + summarize( + time('bar.foo.bad',10), + '45s' + ))`, []*series{ + { + Timestamps: []int64{120000, 165000}, + Values: []float64{1170, 2000}, + Name: `sumSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`, + Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"}, + }, + }) + f(`aggregateSeriesLists( + summarize( + time('foo.bar.baz',10), + '45s' + ), + summarize( + time('bar.foo.bad',10), + '45s' + ), 'diff')`, []*series{ 
+ { + Timestamps: []int64{120000, 165000}, + Values: []float64{0, 0}, + Name: `diffSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`, + Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"}, + }, + }) + f(`diffSeriesLists( + summarize( + time('foo.bar.baz',10), + '45s' + ), + summarize( + time('bar.foo.bad',10), + '45s' + ))`, []*series{ + { + Timestamps: []int64{120000, 165000}, + Values: []float64{0, 0}, + Name: `diffSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`, + Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"}, + }, + }) + f(`aggregateSeriesLists( + summarize( + time('foo.bar.baz',10), + '45s' + ), + summarize( + time('bar.foo.bad',10), + '45s' + ), 'multiply')`, []*series{ + { + Timestamps: []int64{120000, 165000}, + Values: []float64{342225, 1e+06}, + Name: `multiplySeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`, + Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"}, + }, + }) + f(`multiplySeriesLists( + summarize( + time('foo.bar.baz',10), + '45s' + ), + summarize( + time('bar.foo.bad',10), + '45s' + ))`, []*series{ + { + Timestamps: []int64{120000, 165000}, + Values: []float64{342225, 1e+06}, + Name: `multiplySeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`, + Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"}, + }, + }) f(`weightedAverage( summarize( group( diff --git a/app/vmselect/graphite/functions.json b/app/vmselect/graphite/functions.json index 3279a3b7a..387527615 100644 --- a/app/vmselect/graphite/functions.json +++ b/app/vmselect/graphite/functions.json @@ -40,6 +40,52 @@ } ] }, + "aggregateSeriesLists": { + "name": "aggregateSeriesLists", + "function": "aggregateSeriesLists(seriesListFirstPos, seriesListSecondPos, func, xFilesFactor=None)", 
"description": "Iterates over two lists and aggregates, using the specified function, list1[0] with list2[0], list1[1] with list2[1] and so on. The lists must be the same length.\n\nThe position of each seriesList matters. For example, with the “sum” function, aggregateSeriesLists(list1[0..n], list2[0..n], \"sum\") computes the sum for each pair of list members: list1[0] + list2[0], list1[1] + list2[1], ..., list1[n] + list2[n].", + "module": "graphite.render.functions", + "group": "Combine", + "params": [ + { + "name": "seriesListFirstPos", + "type": "seriesList", + "required": true + }, + { + "name": "seriesListSecondPos", + "type": "seriesList", + "required": true + }, + { + "name": "func", + "type": "aggFunc", + "required": true, + "options": [ + "average", + "avg", + "avg_zero", + "count", + "current", + "diff", + "last", + "max", + "median", + "min", + "multiply", + "range", + "rangeOf", + "stddev", + "sum", + "total" + ] + }, + { + "name": "xFilesFactor", + "type": "float" + } + ] + }, "aggregateWithWildcards": { "name": "aggregateWithWildcards", "function": "aggregateWithWildcards(seriesList, func, *positions)", @@ -211,6 +257,25 @@ } ] }, + "diffSeriesLists": { + "name": "diffSeriesLists", + "function": "diffSeriesLists(seriesListFirstPos, seriesListSecondPos)", + "description": "Iterates over two lists and subtracts each series of the second list from the series at the same position in the first list: list1[0] - list2[0], list1[1] - list2[1] and so on. 
The lists will need to be the same length", + "module": "graphite.render.functions", + "group": "Combine", + "params": [ + { + "name": "seriesListFirstPos", + "type": "seriesList", + "required": true + }, + { + "name": "seriesListSecondPos", + "type": "seriesList", + "required": true + } + ] + }, "divideSeries": { "name": "divideSeries", "function": "divideSeries(dividendSeriesList, divisorSeries)", @@ -529,6 +594,25 @@ } ] }, + "multiplySeriesLists": { + "name": "multiplySeriesLists", + "function": "multiplySeriesLists(seriesListFirstPos, seriesListSecondPos)", + "description": "Iterates over two lists and multiplies the series at the same position in each list: list1[0] * list2[0], list1[1] * list2[1] and so on. The lists will need to be the same length", + "module": "graphite.render.functions", + "group": "Combine", + "params": [ + { + "name": "seriesListFirstPos", + "type": "seriesList", + "required": true + }, + { + "name": "seriesListSecondPos", + "type": "seriesList", + "required": true + } + ] + }, "multiplySeriesWithWildcards": { "name": "multiplySeriesWithWildcards", "function": "multiplySeriesWithWildcards(seriesList, *position)", @@ -715,6 +799,26 @@ } ] }, + "sumSeriesLists": { + "name": "sumSeriesLists", + "function": "sumSeriesLists(seriesListFirstPos, seriesListSecondPos)", + "description": "Iterates over two lists and sums the series at the same position in each list: list1[0] + list2[0], list1[1] + list2[1] and so on. 
The lists will need to be the same length", + "module": "graphite.render.functions", + "group": "Combine", + "params": [ + { + "name": "seriesListFirstPos", + "type": "seriesList", + "required": true + }, + { + "name": "seriesListSecondPos", + "type": "seriesList", + "required": true + } + ] + }, + "sumSeriesWithWildcards": { "name": "sumSeriesWithWildcards", "function": "sumSeriesWithWildcards(seriesList, *position)", diff --git a/app/vmselect/graphite/transform.go b/app/vmselect/graphite/transform.go index f3d96328a..ff64ffcdb 100644 --- a/app/vmselect/graphite/transform.go +++ b/app/vmselect/graphite/transform.go @@ -36,6 +36,7 @@ func init() { "add": transformAdd, "aggregate": transformAggregate, "aggregateLine": transformAggregateLine, + "aggregateSeriesLists": transformAggregateSeriesLists, "aggregateWithWildcards": transformAggregateWithWildcards, "alias": transformAlias, "aliasByMetric": transformAliasByMetric, @@ -66,6 +67,7 @@ func init() { "delay": transformDelay, "derivative": transformDerivative, "diffSeries": transformDiffSeries, + "diffSeriesLists": transformDiffSeriesLists, "divideSeries": transformDivideSeries, "divideSeriesLists": transformDivideSeriesLists, "drawAsInfinite": transformDrawAsInfinite, @@ -125,6 +127,7 @@ func init() { "movingSum": transformMovingSum, 
"movingWindow": transformMovingWindow, "multiplySeries": transformMultiplySeries, + "multiplySeriesLists": transformMultiplySeriesLists, "multiplySeriesWithWildcards": transformMultiplySeriesWithWildcards, "nPercentile": transformNPercentile, "nonNegativeDerivative": transformNonNegativeDerivative, @@ -172,6 +175,7 @@ func init() { "substr": transformSubstr, "sum": transformSumSeries, "sumSeries": transformSumSeries, + "sumSeriesLists": transformSumSeriesLists, "sumSeriesWithWildcards": transformSumSeriesWithWildcards, "summarize": transformSummarize, "threshold": transformThreshold, @@ -1316,6 +1320,86 @@ func transformDivideSeries(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesF return f, nil } +func aggregateSeriesListsGeneric(ec *evalConfig, fe *graphiteql.FuncExpr, funcName string) (nextSeriesFunc, error) { + args := fe.Args + agg, err := getAggrFunc(funcName) + if err != nil { + return nil, err + } + nextSeriesFirst, err := evalSeriesList(ec, args, "seriesListFirstPos", 0) + if err != nil { + return nil, err + } + nextSeriesSecond, err := evalSeriesList(ec, args, "seriesListSecondPos", 1) + if err != nil { + return nil, err + } + return aggregateSeriesList(ec, fe, nextSeriesFirst, nextSeriesSecond, agg, funcName) +} + +// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.aggregateSeriesLists +func transformAggregateSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) { + args := fe.Args + if len(args) != 3 && len(args) != 4 { + return nil, fmt.Errorf("unexpected number of args; got %d; want 3 or 4", len(args)) + } + + funcName, err := getString(args, "func", 2) + if err != nil { + return nil, err + } + + return aggregateSeriesListsGeneric(ec, fe, funcName) +} + +// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.sumSeriesLists +func transformSumSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) { + return 
aggregateSeriesListsGeneric(ec, fe, "sum") +} + +// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.multiplySeriesLists +func transformMultiplySeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) { + return aggregateSeriesListsGeneric(ec, fe, "multiply") +} + +// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.diffSeriesLists +func transformDiffSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) { + return aggregateSeriesListsGeneric(ec, fe, "diff") +} + +func aggregateSeriesList(ec *evalConfig, fe *graphiteql.FuncExpr, nextSeriesFirst, nextSeriesSecond nextSeriesFunc, agg aggrFunc, funcName string) (nextSeriesFunc, error) { + ssFirst, stepFirst, err := fetchNormalizedSeries(ec, nextSeriesFirst, false) + if err != nil { + return nil, err + } + ssSecond, stepSecond, err := fetchNormalizedSeries(ec, nextSeriesSecond, false) + if err != nil { + return nil, err + } + + if len(ssFirst) != len(ssSecond) { + return nil, fmt.Errorf("first and second lists must have equal number of series; got %d vs %d series", len(ssFirst), len(ssSecond)) + } + if stepFirst != stepSecond { + return nil, fmt.Errorf("step mismatch for first and second: %d vs %d", stepFirst, stepSecond) + } + + valuePair := make([]float64, 2) + for i, s := range ssFirst { + sSecond := ssSecond[i] + values := s.Values + secondValues := sSecond.Values + for j, v := range values { + valuePair[0], valuePair[1] = v, secondValues[j] + values[j] = agg(valuePair) + } + s.Name = fmt.Sprintf("%sSeries(%s,%s)", funcName, s.Name, sSecond.Name) + s.expr = fe + s.pathExpression = s.Name + } + return multiSeriesFunc(ssFirst), nil +} + // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.divideSeriesLists func transformDivideSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) { args := fe.Args @@ -1326,36 +1410,14 @@ func 
transformDivideSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSe if err != nil { return nil, err } - ssDividend, stepDivident, err := fetchNormalizedSeries(ec, nextDividend, false) - if err != nil { - return nil, err - } nextDivisor, err := evalSeriesList(ec, args, "divisorSeriesList", 1) if err != nil { return nil, err } - ssDivisor, stepDivisor, err := fetchNormalizedSeries(ec, nextDivisor, false) - if err != nil { - return nil, err - } - if len(ssDividend) != len(ssDivisor) { - return nil, fmt.Errorf("divident and divisor must have equal number of series; got %d vs %d series", len(ssDividend), len(ssDivisor)) - } - if stepDivident != stepDivisor { - return nil, fmt.Errorf("step mismatch for divident and divisor: %d vs %d", stepDivident, stepDivisor) - } - for i, s := range ssDividend { - sDivisor := ssDivisor[i] - values := s.Values - divisorValues := sDivisor.Values - for j, v := range values { - values[j] = v / divisorValues[j] - } - s.Name = fmt.Sprintf("divideSeries(%s,%s)", s.Name, sDivisor.Name) - s.expr = fe - s.pathExpression = s.Name - } - return multiSeriesFunc(ssDividend), nil + + return aggregateSeriesList(ec, fe, nextDividend, nextDivisor, func(values []float64) float64 { + return values[0] / values[1] + }, "divide") } // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.drawAsInfinite