app/vmselect: add candlestick(m[d]) func for returning open, close, low and high rollups on the given time range d

This function is frequently used in financial apps. See https://en.wikipedia.org/wiki/Candlestick_chart
This commit is contained in:
Aliaksandr Valialkin 2019-06-27 18:45:45 +03:00
parent 0c8a09c8e1
commit 796b010139
3 changed files with 49 additions and 0 deletions

View file

@ -406,6 +406,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *
ts.Values = rc.Do(ts.Values[:0], values, timestamps)
ts.Timestamps = sharedTimestamps
ts.denyReuse = true
tssLock.Lock()
tss = append(tss, &ts)
tssLock.Unlock()
@ -618,6 +619,11 @@ func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64
deltaValues(values)
}
rcs = appendRollupConfigs(rcs)
case "candlestick":
rcs = append(rcs, newRollupConfig(rollupFirst, "open"))
rcs = append(rcs, newRollupConfig(rollupLast, "close"))
rcs = append(rcs, newRollupConfig(rollupMin, "low"))
rcs = append(rcs, newRollupConfig(rollupMax, "high"))
default:
rcs = append(rcs, newRollupConfig(rf, ""))
}

View file

@ -3203,6 +3203,48 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`candlestick()`, func(t *testing.T) {
t.Parallel()
q := `sort(candlestick(round(rand(0),0.01)[:10s]))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.02, 0.02, 0.03, 0, 0.03, 0.02},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("low"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.32, 0.82, 0.13, 0.28, 0.86, 0.57},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("close"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.9, 0.32, 0.82, 0.13, 0.28, 0.86},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("open"),
}}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.85, 0.94, 0.97, 0.93, 0.98, 0.92},
Timestamps: timestampsExpected,
}
r4.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("high"),
}}
resultExpected := []netstorage.Result{r1, r2, r3, r4}
f(q, resultExpected)
})
t.Run(`rollup_increase()`, func(t *testing.T) {
t.Parallel()
q := `sort(rollup_increase(time()))`

View file

@ -50,6 +50,7 @@ var rollupFuncs = map[string]newRollupFunc{
"rollup_deriv": newRollupFuncOneArg(rollupFake),
"rollup_delta": newRollupFuncOneArg(rollupFake),
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"candlestick": newRollupFuncOneArg(rollupFake),
}
var rollupFuncsMayAdjustWindow = map[string]bool{