From 796b010139bc3b19c6e6220a38d363bc88c4365d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 27 Jun 2019 18:45:45 +0300 Subject: [PATCH] app/vmselect: add `candlestick(m[d])` func for returning `open`, `close`, `low` and `high` rollups on the given time range `d` This function is frequently used in financial apps. See https://en.wikipedia.org/wiki/Candlestick_chart --- app/vmselect/promql/eval.go | 6 +++++ app/vmselect/promql/exec_test.go | 42 ++++++++++++++++++++++++++++++++ app/vmselect/promql/rollup.go | 1 + 3 files changed, 49 insertions(+) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 68d7c2485..c7d49bb24 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -406,6 +406,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re * ts.Values = rc.Do(ts.Values[:0], values, timestamps) ts.Timestamps = sharedTimestamps ts.denyReuse = true + tssLock.Lock() tss = append(tss, &ts) tssLock.Unlock() @@ -618,6 +619,11 @@ func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64 deltaValues(values) } rcs = appendRollupConfigs(rcs) + case "candlestick": + rcs = append(rcs, newRollupConfig(rollupFirst, "open")) + rcs = append(rcs, newRollupConfig(rollupLast, "close")) + rcs = append(rcs, newRollupConfig(rollupMin, "low")) + rcs = append(rcs, newRollupConfig(rollupMax, "high")) default: rcs = append(rcs, newRollupConfig(rf, "")) } diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 50e4a7b5f..40e63ca31 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3203,6 +3203,48 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`candlestick()`, func(t *testing.T) { + t.Parallel() + q := `sort(candlestick(round(rand(0),0.01)[:10s]))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.02, 0.02, 0.03, 0, 0.03, 0.02}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("low"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.32, 0.82, 0.13, 0.28, 0.86, 0.57}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("close"), + }} + r3 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.9, 0.32, 0.82, 0.13, 0.28, 0.86}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("open"), + }} + r4 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.85, 0.94, 0.97, 0.93, 0.98, 0.92}, + Timestamps: timestampsExpected, + } + r4.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("high"), + }} + resultExpected := []netstorage.Result{r1, r2, r3, r4} + f(q, resultExpected) + }) t.Run(`rollup_increase()`, func(t *testing.T) { t.Parallel() q := `sort(rollup_increase(time()))` diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 1a866d2b6..f0d707bd5 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -50,6 +50,7 @@ var rollupFuncs = map[string]newRollupFunc{ "rollup_deriv": newRollupFuncOneArg(rollupFake), "rollup_delta": newRollupFuncOneArg(rollupFake), "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets + "candlestick": newRollupFuncOneArg(rollupFake), } var rollupFuncsMayAdjustWindow = map[string]bool{