From 23ab8650359fb35a442c77303a117eebc0d3aa04 Mon Sep 17 00:00:00 2001
From: rbizos <58781501+rbizos@users.noreply.github.com>
Date: Tue, 26 Mar 2024 14:48:58 +0100
Subject: [PATCH] adding AggregateSeriesLists graphite function (#5809)

* adding aggregate series list graphite function

adding also aliases for sum diff and multiply

* Adding tests for aggregateSeriesLists and aliases
---
 app/vmselect/graphite/eval_test.go   |  96 +++++++++++++++++++++
 app/vmselect/graphite/functions.json | 124 ++++++++++++++++++++++++++-
 app/vmselect/graphite/transform.go   | 114 ++++++++++++++++++------
 3 files changed, 307 insertions(+), 27 deletions(-)

diff --git a/app/vmselect/graphite/eval_test.go b/app/vmselect/graphite/eval_test.go
index 4570d83389..0022699c8e 100644
--- a/app/vmselect/graphite/eval_test.go
+++ b/app/vmselect/graphite/eval_test.go
@@ -3279,6 +3279,102 @@ func TestExecExprSuccess(t *testing.T) {
 			Tags:       map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
 		},
 	})
+	f(`aggregateSeriesLists(
+    summarize(
+               time('foo.bar.baz',10),
+               '45s'
+       ),
+    summarize(
+               time('bar.foo.bad',10),
+               '45s'
+       ), 'sum')`, []*series{
+		{
+			Timestamps: []int64{120000, 165000},
+			Values:     []float64{1170, 2000},
+			Name:       `sumSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
+			Tags:       map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
+		},
+	})
+	f(`sumSeriesLists(
+    summarize(
+               time('foo.bar.baz',10),
+               '45s'
+       ),
+    summarize(
+               time('bar.foo.bad',10),
+               '45s'
+       ))`, []*series{
+		{
+			Timestamps: []int64{120000, 165000},
+			Values:     []float64{1170, 2000},
+			Name:       `sumSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
+			Tags:       map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
+		},
+	})
+	f(`aggregateSeriesLists(
+    summarize(
+               time('foo.bar.baz',10),
+               '45s'
+       ),
+    summarize(
+               time('bar.foo.bad',10),
+               '45s'
+       ), 'diff')`, []*series{
+		{
+			Timestamps: []int64{120000, 165000},
+			Values:     []float64{0, 0},
+			Name:       `diffSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
+			Tags:       map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
+		},
+	})
+	f(`diffSeriesLists(
+    summarize(
+               time('foo.bar.baz',10),
+               '45s'
+       ),
+    summarize(
+               time('bar.foo.bad',10),
+               '45s'
+       ))`, []*series{
+		{
+			Timestamps: []int64{120000, 165000},
+			Values:     []float64{0, 0},
+			Name:       `diffSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
+			Tags:       map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
+		},
+	})
+	f(`aggregateSeriesLists(
+    summarize(
+               time('foo.bar.baz',10),
+               '45s'
+       ),
+    summarize(
+               time('bar.foo.bad',10),
+               '45s'
+       ), 'multiply')`, []*series{
+		{
+			Timestamps: []int64{120000, 165000},
+			Values:     []float64{342225, 1e+06},
+			Name:       `multiplySeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
+			Tags:       map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
+		},
+	})
+	f(`multiplySeriesLists(
+    summarize(
+               time('foo.bar.baz',10),
+               '45s'
+       ),
+    summarize(
+               time('bar.foo.bad',10),
+               '45s'
+       ))`, []*series{
+		{
+			Timestamps: []int64{120000, 165000},
+			Values:     []float64{342225, 1e+06},
+			Name:       `multiplySeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
+			Tags:       map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
+		},
+	})
 	f(`weightedAverage(
     summarize(
                group(
diff --git a/app/vmselect/graphite/functions.json b/app/vmselect/graphite/functions.json
index 3279a3b7a2..3875276159 100644
--- a/app/vmselect/graphite/functions.json
+++ b/app/vmselect/graphite/functions.json
@@ -40,6 +40,52 @@
       }
     ]
   },
+  "aggregateSeriesLists": {
+    "name": "aggregateSeriesLists",
+    "function": "aggregateSeriesLists(seriesListFirstPos, seriesListSecondPos, func, xFilesFactor=None)",
+    "description": "Iterates over a two lists and aggregates using specified function list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length\n\nPosition of seriesList matters. For example using “sum” function aggregateSeriesLists(list1[0..n], list2[0..n], \"sum\") it would find sum for each member of the list list1[0] + list2[0], list1[1] + list2[1], list1[n] + list2[n].",
+    "module": "graphite.render.functions",
+    "group": "Combine",
+    "params": [
+      {
+        "name": "seriesListFirstPos",
+        "type": "seriesList",
+        "required": true
+      },
+      {
+        "name": "seriesListSecondPos",
+        "type": "seriesList",
+        "required": true
+      },
+      {
+        "name": "func",
+        "type": "aggFunc",
+        "required": true,
+        "options": [
+          "average",
+          "avg",
+          "avg_zero",
+          "count",
+          "current",
+          "diff",
+          "last",
+          "max",
+          "median",
+          "min",
+          "multiply",
+          "range",
+          "rangeOf",
+          "stddev",
+          "sum",
+          "total"
+        ]
+      },
+      {
+        "name": "xFilesFactor",
+        "type": "float"
+      }
+    ]
+  },
   "aggregateWithWildcards": {
     "name": "aggregateWithWildcards",
     "function": "aggregateWithWildcards(seriesList, func, *positions)",
@@ -211,6 +257,25 @@
       }
     ]
   },
+  "DiffSeriesLists": {
+    "name": "DiffSeriesLists",
+    "function": "DiffSeriesLists(seriesListFirstPos, seriesListSecondPos)",
+    "description": "Iterates over a two lists and subtracts series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
+    "module": "graphite.render.functions",
+    "group": "Combine",
+    "params": [
+      {
+        "name": "seriesListFirstPos",
+        "type": "seriesList",
+        "required": true
+      },
+      {
+        "name": "seriesListSecondPos",
+        "type": "seriesList",
+        "required": true
+      }
+    ]
+  },
   "divideSeries": {
     "name": "divideSeries",
     "function": "divideSeries(dividendSeriesList, divisorSeries)",
@@ -529,6 +594,25 @@
       }
     ]
   },
+  "MultiplySeriesLists": {
+    "name": "MultiplySeriesLists",
+    "function": "MultiplySeriesLists(seriesListFirstPos, seriesListSecondPos)",
+    "description": "Iterates over a two lists and multiply series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
+    "module": "graphite.render.functions",
+    "group": "Combine",
+    "params": [
+      {
+        "name": "seriesListFirstPos",
+        "type": "seriesList",
+        "required": true
+      },
+      {
+        "name": "seriesListSecondPos",
+        "type": "seriesList",
+        "required": true
+      }
+    ]
+  },
   "multiplySeriesWithWildcards": {
     "name": "multiplySeriesWithWildcards",
     "function": "multiplySeriesWithWildcards(seriesList, *position)",
@@ -715,6 +799,26 @@
       }
     ]
   },
+  "SumSeriesLists": {
+    "name": "SumSeriesLists",
+    "function": "sumSeriesLists(seriesListFirstPos, seriesListSecondPos)",
+    "description": "Iterates over a two lists and sums series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
+    "module": "graphite.render.functions",
+    "group": "Combine",
+    "params": [
+      {
+        "name": "seriesListFirstPos",
+        "type": "seriesList",
+        "required": true
+      },
+      {
+        "name": "seriesListSecondPos",
+        "type": "seriesList",
+        "required": true
+      }
+    ]
+  },
+
   "sumSeriesWithWildcards": {
     "name": "sumSeriesWithWildcards",
     "function": "sumSeriesWithWildcards(seriesList, *position)",
@@ -728,7 +832,25 @@
         "required": true
       },
       {
-        "name": "position",
+        "name": "position",  "SumSeriesLists": {
+        "name": "SumSeriesLists",
+        "function": "sumSeriesLists(seriesListFirstPos, seriesListSecondPos)",
+        "description": "Iterates over a two lists and sums series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
+        "module": "graphite.render.functions",
+        "group": "Combine",
+        "params": [
+          {
+            "name": "seriesListFirstPos",
+            "type": "seriesList",
+            "required": true
+          },
+          {
+            "name": "seriesListSecondPos",
+            "type": "seriesList",
+            "required": true
+          }
+        ]
+      },
         "type": "node",
         "multiple": true
       }
diff --git a/app/vmselect/graphite/transform.go b/app/vmselect/graphite/transform.go
index 6d25d0394b..dc16aba044 100644
--- a/app/vmselect/graphite/transform.go
+++ b/app/vmselect/graphite/transform.go
@@ -36,6 +36,7 @@ func init() {
 		"add":                         transformAdd,
 		"aggregate":                   transformAggregate,
 		"aggregateLine":               transformAggregateLine,
+		"aggregateSeriesLists":        transformAggregateSeriesLists,
 		"aggregateWithWildcards":      transformAggregateWithWildcards,
 		"alias":                       transformAlias,
 		"aliasByMetric":               transformAliasByMetric,
@@ -66,6 +67,7 @@ func init() {
 		"delay":                       transformDelay,
 		"derivative":                  transformDerivative,
 		"diffSeries":                  transformDiffSeries,
+		"diffSeriesLists":             transformDiffSeriesLists,
 		"divideSeries":                transformDivideSeries,
 		"divideSeriesLists":           transformDivideSeriesLists,
 		"drawAsInfinite":              transformDrawAsInfinite,
@@ -125,6 +127,7 @@ func init() {
 		"movingSum":                   transformMovingSum,
 		"movingWindow":                transformMovingWindow,
 		"multiplySeries":              transformMultiplySeries,
+		"multiplySeriesLists":         transformMultiplySeriesLists,
 		"multiplySeriesWithWildcards": transformMultiplySeriesWithWildcards,
 		"nPercentile":                 transformNPercentile,
 		"nonNegativeDerivative":       transformNonNegativeDerivative,
@@ -172,6 +175,7 @@ func init() {
 		"substr":                  transformSubstr,
 		"sum":                     transformSumSeries,
 		"sumSeries":               transformSumSeries,
+		"sumSeriesLists":          transformSumSeriesLists,
 		"sumSeriesWithWildcards":  transformSumSeriesWithWildcards,
 		"summarize":               transformSummarize,
 		"threshold":               transformThreshold,
@@ -1316,6 +1320,86 @@ func transformDivideSeries(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesF
 	return f, nil
 }
 
+func aggregateSeriesListsGeneric(ec *evalConfig, fe *graphiteql.FuncExpr, funcName string) (nextSeriesFunc, error) {
+	args := fe.Args
+	agg, err := getAggrFunc(funcName)
+	if err != nil {
+		return nil, err
+	}
+	nextSeriesFirst, err := evalSeriesList(ec, args, "seriesListFirstPos", 0)
+	if err != nil {
+		return nil, err
+	}
+	nextSeriesSecond, err := evalSeriesList(ec, args, "seriesListSecondPos", 1)
+	if err != nil {
+		return nil, err
+	}
+	return aggregateSeriesList(ec, fe, nextSeriesFirst, nextSeriesSecond, agg, funcName)
+}
+
+// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.aggregateSeriesLists
+func transformAggregateSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
+	args := fe.Args
+	if len(args) != 3 && len(args) != 4 {
+		return nil, fmt.Errorf("unexpected number of args; got %d; want 3 or 4", len(args))
+	}
+
+	funcName, err := getString(args, "func", 2)
+	if err != nil {
+		return nil, err
+	}
+
+	return aggregateSeriesListsGeneric(ec, fe, funcName)
+}
+
+// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.sumSeriesLists
+func transformSumSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
+	return aggregateSeriesListsGeneric(ec, fe, "sum")
+}
+
+// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.multiplySeriesLists
+func transformMultiplySeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
+	return aggregateSeriesListsGeneric(ec, fe, "multiply")
+}
+
+// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.diffSeriesLists
+func transformDiffSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
+	return aggregateSeriesListsGeneric(ec, fe, "diff")
+}
+
+func aggregateSeriesList(ec *evalConfig, fe *graphiteql.FuncExpr, nextSeriesFirst, nextSeriesSecond nextSeriesFunc, agg aggrFunc, funcName string) (nextSeriesFunc, error) {
+	ssFirst, stepFirst, err := fetchNormalizedSeries(ec, nextSeriesFirst, false)
+	if err != nil {
+		return nil, err
+	}
+	ssSecond, stepSecond, err := fetchNormalizedSeries(ec, nextSeriesSecond, false)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(ssFirst) != len(ssSecond) {
+		return nil, fmt.Errorf("First and second lists must have equal number of series; got %d vs %d series", len(ssFirst), len(ssSecond))
+	}
+	if stepFirst != stepSecond {
+		return nil, fmt.Errorf("step mismatch for first and second: %d vs %d", stepFirst, stepSecond)
+	}
+
+	valuePair := make([]float64, 2)
+	for i, s := range ssFirst {
+		sSecond := ssSecond[i]
+		values := s.Values
+		secondValues := sSecond.Values
+		for j, v := range values {
+			valuePair[0], valuePair[1] = v, secondValues[j]
+			values[j] = agg(valuePair)
+		}
+		s.Name = fmt.Sprintf("%sSeries(%s,%s)", funcName, s.Name, sSecond.Name)
+		s.expr = fe
+		s.pathExpression = s.Name
+	}
+	return multiSeriesFunc(ssFirst), nil
+}
+
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.divideSeriesLists
 func transformDivideSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
 	args := fe.Args
@@ -1326,36 +1410,14 @@ func transformDivideSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSe
 	if err != nil {
 		return nil, err
 	}
-	ssDividend, stepDivident, err := fetchNormalizedSeries(ec, nextDividend, false)
-	if err != nil {
-		return nil, err
-	}
 	nextDivisor, err := evalSeriesList(ec, args, "divisorSeriesList", 1)
 	if err != nil {
 		return nil, err
 	}
-	ssDivisor, stepDivisor, err := fetchNormalizedSeries(ec, nextDivisor, false)
-	if err != nil {
-		return nil, err
-	}
-	if len(ssDividend) != len(ssDivisor) {
-		return nil, fmt.Errorf("divident and divisor must have equal number of series; got %d vs %d series", len(ssDividend), len(ssDivisor))
-	}
-	if stepDivident != stepDivisor {
-		return nil, fmt.Errorf("step mismatch for divident and divisor: %d vs %d", stepDivident, stepDivisor)
-	}
-	for i, s := range ssDividend {
-		sDivisor := ssDivisor[i]
-		values := s.Values
-		divisorValues := sDivisor.Values
-		for j, v := range values {
-			values[j] = v / divisorValues[j]
-		}
-		s.Name = fmt.Sprintf("divideSeries(%s,%s)", s.Name, sDivisor.Name)
-		s.expr = fe
-		s.pathExpression = s.Name
-	}
-	return multiSeriesFunc(ssDividend), nil
+
+	return aggregateSeriesList(ec, fe, nextDividend, nextDivisor, func(values []float64) float64 {
+		return values[0] / values[1]
+	}, "divide")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.drawAsInfinite