From 538fdfe1332e6df8dcb109301b44504576182e3e Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Tue, 19 May 2020 16:10:52 +0300
Subject: [PATCH] app/vmselect/promql: move common code from aggrFuncOutliersK
 and newAggrFuncRangeTopK into getRangeTopKTimeseries

---
 app/vmselect/promql/aggr.go | 91 +++++++++++++++----------------------
 1 file changed, 36 insertions(+), 55 deletions(-)

diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go
index 314916c4f6..f53fd9f4bb 100644
--- a/app/vmselect/promql/aggr.go
+++ b/app/vmselect/promql/aggr.go
@@ -484,11 +484,6 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
 	}
 }
 
-type tsWithValue struct {
-	ts    *timeseries
-	value float64
-}
-
 func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc {
 	return func(afa *aggrFuncArg) ([]*timeseries, error) {
 		args := afa.args
@@ -500,34 +495,42 @@ func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggr
 			return nil, err
 		}
 		afe := func(tss []*timeseries) []*timeseries {
-			maxs := make([]tsWithValue, len(tss))
-			for i, ts := range tss {
-				value := f(ts.Values)
-				maxs[i] = tsWithValue{
-					ts:    ts,
-					value: value,
-				}
-			}
-			sort.Slice(maxs, func(i, j int) bool {
-				a := maxs[i].value
-				b := maxs[j].value
-				if isReverse {
-					a, b = b, a
-				}
-				return lessWithNaNs(a, b)
-			})
-			for i := range maxs {
-				tss[i] = maxs[i].ts
-			}
-			for i, k := range ks {
-				fillNaNsAtIdx(i, k, tss)
-			}
-			return removeNaNs(tss)
+			return getRangeTopKTimeseries(tss, ks, f, isReverse)
 		}
 		return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
 	}
 }
 
+func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []float64) float64, isReverse bool) []*timeseries {
+	type tsWithValue struct {
+		ts    *timeseries
+		value float64
+	}
+	maxs := make([]tsWithValue, len(tss))
+	for i, ts := range tss {
+		value := f(ts.Values)
+		maxs[i] = tsWithValue{
+			ts:    ts,
+			value: value,
+		}
+	}
+	sort.Slice(maxs, func(i, j int) bool {
+		a := maxs[i].value
+		b := maxs[j].value
+		if isReverse {
+			a, b = b, a
+		}
+		return lessWithNaNs(a, b)
+	})
+	for i := range maxs {
+		tss[i] = maxs[i].ts
+	}
+	for i, k := range ks {
+		fillNaNsAtIdx(i, k, tss)
+	}
+	return removeNaNs(tss)
+}
+
 func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) {
 	if math.IsNaN(k) {
 		k = 0
@@ -623,38 +626,16 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
 		}
 		histogram.PutFast(h)
 
-		// Calculate variation-like value for each tss.
-		type variation struct {
-			sum2 float64
-			ts   *timeseries
-		}
-		variations := make([]variation, len(tss))
-		for i, ts := range tss {
+		// Return topK time series with the highest variance from median.
+		f := func(values []float64) float64 {
 			sum2 := float64(0)
-			for n, v := range ts.Values {
+			for n, v := range values {
 				d := v - medians[n]
 				sum2 += d * d
 			}
-			variations[i] = variation{
-				sum2: sum2,
-				ts:   ts,
-			}
+			return sum2
 		}
-
-		// Sort variations by sum2.
-		sort.Slice(variations, func(i, j int) bool {
-			a, b := variations[i], variations[j]
-			return lessWithNaNs(a.sum2, b.sum2)
-		})
-
-		// Return only up to k time series with the highest variation.
-		for i := range variations {
-			tss[i] = variations[i].ts
-		}
-		for i, k := range ks {
-			fillNaNsAtIdx(i, k, tss)
-		}
-		return removeNaNs(tss)
+		return getRangeTopKTimeseries(tss, ks, f, false)
 	}
 	return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
 }