From 3490160fd0b4243f51b33264940bf7d797ab9b38 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Thu, 3 Sep 2020 13:21:51 +0300 Subject: [PATCH] app/vmselect: unconditionally align time range boundaries to step for subqueries as Prometheus does --- app/vmselect/promql/eval.go | 25 +++++++++++-------- app/vmselect/promql/exec_test.go | 42 ++++++++++++++++---------------- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 4e94775052..2a49ae3653 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -57,14 +57,7 @@ func AdjustStartEnd(start, end, step int64) (int64, int64) { // Round start and end to values divisible by step in order // to enable response caching (see EvalConfig.mayCache). - - // Round start to the nearest smaller value divisible by step. - start -= start % step - // Round end to the nearest bigger value divisible by step. - adjust := end % step - if adjust > 0 { - end += step - adjust - } + start, end = alignStartEnd(start, end, step) // Make sure that the new number of points is the same as the initial number of points. newPoints := (end-start)/step + 1 @@ -76,6 +69,17 @@ func AdjustStartEnd(start, end, step int64) (int64, int64) { return start, end } +func alignStartEnd(start, end, step int64) (int64, int64) { + // Round start to the nearest smaller value divisible by step. + start -= start % step + // Round end to the nearest bigger value divisible by step. + adjust := end % step + if adjust > 0 { + end += step - adjust + } + return start, end +} + // EvalConfig is the configuration required for query evaluation via Exec type EvalConfig struct { AuthToken *auth.Token @@ -507,11 +511,13 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr ecSQ := newEvalConfig(ec) ecSQ.Start -= window + maxSilenceInterval + step + ecSQ.End += step ecSQ.Step = step if err := ValidateMaxPointsPerTimeseries(ecSQ.Start, ecSQ.End, ecSQ.Step); err != nil { return nil, err } - ecSQ.Start, ecSQ.End = AdjustStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step) + // unconditionally align start and end args to step for subquery as Prometheus does. + ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step) tssSQ, err := evalExpr(ecSQ, re.Expr) if err != nil { return nil, err @@ -523,7 +529,6 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr } return nil, nil } - sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step) preFunc, rcs, err := getRollupConfigs(name, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) if err != nil { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index d17c8ef699..468034c46c 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -118,7 +118,7 @@ func TestExecSuccess(t *testing.T) { q := `time() offset 0s` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{900, 1100, 1300, 1500, 1700, 1900}, + Values: []float64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -129,7 +129,7 @@ func TestExecSuccess(t *testing.T) { q := `sort((label_set(time(), "foo", "bar"), label_set(time()+10, "foo", "baz")) offset 0s)` r1 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{900, 1100, 1300, 1500, 1700, 1900}, + Values: []float64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: timestampsExpected, } r1.MetricName.Tags = []storage.Tag{{ @@ -138,7 +138,7 @@ func TestExecSuccess(t *testing.T) { }} r2 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{910, 1110, 1310, 1510, 1710, 1910}, + Values: []float64{1010, 1210, 1410, 1610, 1810, 2010}, Timestamps: timestampsExpected, } r2.MetricName.Tags = []storage.Tag{{ @@ -219,7 +219,7 @@ func TestExecSuccess(t *testing.T) { }} r2 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{860, 1060, 1260, 1460, 1660, 1860}, + Values: []float64{810, 1010, 1210, 1410, 1610, 1810}, Timestamps: timestampsExpected, } r2.MetricName.Tags = []storage.Tag{{ @@ -234,7 +234,7 @@ func TestExecSuccess(t *testing.T) { q := `sort((label_set(time() offset 100s, "foo", "bar"), label_set(time()+10, "foo", "baz") offset 50s) offset 400s)` r1 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{300, 500, 700, 900, 1100, 1300}, + Values: []float64{400, 600, 800, 1000, 1200, 1400}, Timestamps: timestampsExpected, } r1.MetricName.Tags = []storage.Tag{{ @@ -243,7 +243,7 @@ func TestExecSuccess(t *testing.T) { }} r2 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{360, 560, 760, 960, 1160, 1360}, + Values: []float64{410, 610, 810, 1010, 1210, 1410}, Timestamps: timestampsExpected, } r2.MetricName.Tags = []storage.Tag{{ @@ -258,21 +258,21 @@ func TestExecSuccess(t *testing.T) { q := `sort((label_set(time() offset -100s, "foo", "bar"), label_set(time()+10, "foo", "baz") offset -50s) offset -400s)` r1 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{1260, 1460, 1660, 1860, 2060, 2260}, + Values: []float64{1400, 1600, 1800, 2000, 2200, 2400}, Timestamps: timestampsExpected, } r1.MetricName.Tags = []storage.Tag{{ Key: []byte("foo"), - Value: []byte("baz"), + Value: []byte("bar"), }} r2 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{1300, 1500, 1700, 1900, 2100, 2300}, + Values: []float64{1410, 1610, 1810, 2010, 2210, 2410}, Timestamps: timestampsExpected, } r2.MetricName.Tags = []storage.Tag{{ Key: []byte("foo"), - Value: []byte("bar"), + Value: []byte("baz"), }} resultExpected := []netstorage.Result{r1, r2} f(q, resultExpected) @@ -315,7 +315,7 @@ func TestExecSuccess(t *testing.T) { q := `time()[300s] offset 100s` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{900, 1100, 1300, 1500, 1700, 1900}, + Values: []float64{800, 1000, 1200, 1400, 1600, 1800}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -348,7 +348,7 @@ func TestExecSuccess(t *testing.T) { q := `timestamp(123)` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{900, 1100, 1300, 1500, 1700, 1900}, + Values: []float64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -359,7 +359,7 @@ func TestExecSuccess(t *testing.T) { q := `timestamp(time())` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{900, 1100, 1300, 1500, 1700, 1900}, + Values: []float64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -370,7 +370,7 @@ func TestExecSuccess(t *testing.T) { q := `timestamp(456/time()+123)` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{900, 1100, 1300, 1500, 1700, 1900}, + Values: []float64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -381,7 +381,7 @@ func TestExecSuccess(t *testing.T) { q := `timestamp(time()>=1600)` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{nan, nan, nan, nan, 1700, 1900}, + Values: []float64{nan, nan, nan, 1600, 1800, 2000}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -3640,7 +3640,7 @@ func TestExecSuccess(t *testing.T) { q := `round(geomean_over_time(alias(time()/100, "foobar")[3i]), 0.1)` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{6.8, 8.8, 10.9, 12.9, 14.9, 16.9}, + Values: []float64{7.8, 9.9, 11.9, 13.9, 15.9, 17.9}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -3662,7 +3662,7 @@ func TestExecSuccess(t *testing.T) { q := `sum2_over_time(alias(time()/100, "foobar")[3i])` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{155, 251, 371, 515, 683, 875}, + Values: []float64{200, 308, 440, 596, 776, 980}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -4723,10 +4723,10 @@ func TestExecSuccess(t *testing.T) { }) t.Run(`ru(time() offset 1i, 2000)`, func(t *testing.T) { t.Parallel() - q := `ru(time() offset 1i, 2000)` + q := `ru(time() offset 1.5i, 2000)` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{65, 55.00000000000001, 45, 35, 25, 15}, + Values: []float64{70, 60, 50, 40, 30, 20}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -4814,7 +4814,7 @@ func TestExecSuccess(t *testing.T) { q := `integrate(time()*1e-3)` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{160, 200, 240.00000000000003, 280, 320, 360}, + Values: []float64{180, 220.00000000000003, 260, 300, 340.00000000000006, 380}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r} @@ -4913,7 +4913,7 @@ func TestExecSuccess(t *testing.T) { q := `increase(2000-time())` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{1100, 900, 700, 500, 300, 100}, + Values: []float64{1000, 800, 600, 400, 200, 0}, Timestamps: timestampsExpected, } resultExpected := []netstorage.Result{r}