From ada6a3da8d8f97d3f130a217ad6f27cb62578910 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Tue, 4 Feb 2020 22:42:10 +0200
Subject: [PATCH] app/vmselect/promql: adjust rollup_candlestick calculations
 to the exepcted results

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309
---
 app/vmselect/promql/exec_test.go | 10 ++---
 app/vmselect/promql/rollup.go    | 76 ++++++++++++++++++++++++++++++--
 2 files changed, 77 insertions(+), 9 deletions(-)

diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go
index ea793e2618..bc462a3d13 100644
--- a/app/vmselect/promql/exec_test.go
+++ b/app/vmselect/promql/exec_test.go
@@ -4698,25 +4698,25 @@ func TestExecSuccess(t *testing.T) {
 		}}
 		r2 := netstorage.Result{
 			MetricName: metricNameExpected,
-			Values:     []float64{0.85, 0.15, 0.43, 0.76, 0.47, 0.21},
+			Values:     []float64{0.1, 0.04, 0.49, 0.46, 0.57, 0.92},
 			Timestamps: timestampsExpected,
 		}
 		r2.MetricName.Tags = []storage.Tag{{
 			Key:   []byte("rollup"),
-			Value: []byte("open"),
+			Value: []byte("close"),
 		}}
 		r3 := netstorage.Result{
 			MetricName: metricNameExpected,
-			Values:     []float64{0.32, 0.82, 0.13, 0.28, 0.86, 0.57},
+			Values:     []float64{0.9, 0.32, 0.82, 0.13, 0.28, 0.86},
 			Timestamps: timestampsExpected,
 		}
 		r3.MetricName.Tags = []storage.Tag{{
 			Key:   []byte("rollup"),
-			Value: []byte("close"),
+			Value: []byte("open"),
 		}}
 		r4 := netstorage.Result{
 			MetricName: metricNameExpected,
-			Values:     []float64{0.85, 0.94, 0.97, 0.93, 0.98, 0.92},
+			Values:     []float64{0.9, 0.94, 0.97, 0.93, 0.98, 0.92},
 			Timestamps: timestampsExpected,
 		}
 		r4.MetricName.Tags = []storage.Tag{{
diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go
index 0175b734cc..ca9a988ec4 100644
--- a/app/vmselect/promql/rollup.go
+++ b/app/vmselect/promql/rollup.go
@@ -259,10 +259,10 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en
 		}
 		rcs = appendRollupConfigs(rcs)
 	case "rollup_candlestick":
-		rcs = append(rcs, newRollupConfig(rollupFirst, "open"))
-		rcs = append(rcs, newRollupConfig(rollupLast, "close"))
-		rcs = append(rcs, newRollupConfig(rollupMin, "low"))
-		rcs = append(rcs, newRollupConfig(rollupMax, "high"))
+		rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
+		rcs = append(rcs, newRollupConfig(rollupClose, "close"))
+		rcs = append(rcs, newRollupConfig(rollupLow, "low"))
+		rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
 	case "aggr_over_time":
 		aggrFuncNames, err := getRollupAggrFuncNames(expr)
 		if err != nil {
@@ -1419,6 +1419,74 @@ func rollupResets(rfa *rollupFuncArg) float64 {
 	return float64(n)
 }
 
+// getCandlestickValues returns a subset of rfa.values suitable for rollup_candlestick
+//
+// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309 for details.
+func getCandlestickValues(rfa *rollupFuncArg) []float64 {
+	currTimestamp := rfa.currTimestamp
+	timestamps := rfa.timestamps
+	for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= currTimestamp {
+		timestamps = timestamps[:len(timestamps)-1]
+	}
+	if len(timestamps) == 0 {
+		return nil
+	}
+	return rfa.values[:len(timestamps)]
+}
+
+func rollupOpen(rfa *rollupFuncArg) float64 {
+	if !math.IsNaN(rfa.prevValue) {
+		return rfa.prevValue
+	}
+	values := getCandlestickValues(rfa)
+	if len(values) == 0 {
+		return nan
+	}
+	return values[0]
+}
+
+func rollupClose(rfa *rollupFuncArg) float64 {
+	values := getCandlestickValues(rfa)
+	if len(values) == 0 {
+		return rfa.prevValue
+	}
+	return values[len(values)-1]
+}
+
+func rollupHigh(rfa *rollupFuncArg) float64 {
+	values := getCandlestickValues(rfa)
+	max := rfa.prevValue
+	if math.IsNaN(max) {
+		if len(values) == 0 {
+			return nan
+		}
+		max = values[0]
+	}
+	for _, v := range values {
+		if v > max {
+			max = v
+		}
+	}
+	return max
+}
+
+func rollupLow(rfa *rollupFuncArg) float64 {
+	values := getCandlestickValues(rfa)
+	min := rfa.prevValue
+	if math.IsNaN(min) {
+		if len(values) == 0 {
+			return nan
+		}
+		min = values[0]
+	}
+	for _, v := range values {
+		if v < min {
+			min = v
+		}
+	}
+	return min
+}
+
 func rollupFirst(rfa *rollupFuncArg) float64 {
 	// There is no need in handling NaNs here, since they must be cleaned up
 	// before calling rollup funcs.