app/vmselect/promql: adjust rollup_candlestick calculations to the exepcted results

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309
This commit is contained in:
Aliaksandr Valialkin 2020-02-04 22:42:10 +02:00
parent 8b360a25e9
commit 4f7116d1ee
2 changed files with 77 additions and 9 deletions

View file

@ -4708,25 +4708,25 @@ func TestExecSuccess(t *testing.T) {
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.85, 0.15, 0.43, 0.76, 0.47, 0.21},
Values: []float64{0.1, 0.04, 0.49, 0.46, 0.57, 0.92},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("open"),
Value: []byte("close"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.32, 0.82, 0.13, 0.28, 0.86, 0.57},
Values: []float64{0.9, 0.32, 0.82, 0.13, 0.28, 0.86},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("close"),
Value: []byte("open"),
}}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.85, 0.94, 0.97, 0.93, 0.98, 0.92},
Values: []float64{0.9, 0.94, 0.97, 0.93, 0.98, 0.92},
Timestamps: timestampsExpected,
}
r4.MetricName.Tags = []storage.Tag{{

View file

@ -259,10 +259,10 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en
}
rcs = appendRollupConfigs(rcs)
case "rollup_candlestick":
rcs = append(rcs, newRollupConfig(rollupFirst, "open"))
rcs = append(rcs, newRollupConfig(rollupLast, "close"))
rcs = append(rcs, newRollupConfig(rollupMin, "low"))
rcs = append(rcs, newRollupConfig(rollupMax, "high"))
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
case "aggr_over_time":
aggrFuncNames, err := getRollupAggrFuncNames(expr)
if err != nil {
@ -1419,6 +1419,74 @@ func rollupResets(rfa *rollupFuncArg) float64 {
return float64(n)
}
// getCandlestickValues returns a subset of rfa.values suitable for rollup_candlestick
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309 for details.
func getCandlestickValues(rfa *rollupFuncArg) []float64 {
currTimestamp := rfa.currTimestamp
timestamps := rfa.timestamps
for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= currTimestamp {
timestamps = timestamps[:len(timestamps)-1]
}
if len(timestamps) == 0 {
return nil
}
return rfa.values[:len(timestamps)]
}
func rollupOpen(rfa *rollupFuncArg) float64 {
if !math.IsNaN(rfa.prevValue) {
return rfa.prevValue
}
values := getCandlestickValues(rfa)
if len(values) == 0 {
return nan
}
return values[0]
}
func rollupClose(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
if len(values) == 0 {
return rfa.prevValue
}
return values[len(values)-1]
}
func rollupHigh(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
max := rfa.prevValue
if math.IsNaN(max) {
if len(values) == 0 {
return nan
}
max = values[0]
}
for _, v := range values {
if v > max {
max = v
}
}
return max
}
func rollupLow(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
min := rfa.prevValue
if math.IsNaN(min) {
if len(values) == 0 {
return nan
}
min = values[0]
}
for _, v := range values {
if v < min {
min = v
}
}
return min
}
func rollupFirst(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.