app/vmselect: unconditionally align time range boundaries to step for subqueries as Prometheus does

This commit is contained in:
Aliaksandr Valialkin 2020-09-03 13:21:51 +03:00
parent 7ab7ae79c7
commit 3490160fd0
2 changed files with 36 additions and 31 deletions

View file

@ -57,14 +57,7 @@ func AdjustStartEnd(start, end, step int64) (int64, int64) {
// Round start and end to values divisible by step in order
// to enable response caching (see EvalConfig.mayCache).
// Round start to the nearest smaller value divisible by step.
start -= start % step
// Round end to the nearest bigger value divisible by step.
adjust := end % step
if adjust > 0 {
end += step - adjust
}
start, end = alignStartEnd(start, end, step)
// Make sure that the new number of points is the same as the initial number of points.
newPoints := (end-start)/step + 1
@ -76,6 +69,17 @@ func AdjustStartEnd(start, end, step int64) (int64, int64) {
return start, end
}
func alignStartEnd(start, end, step int64) (int64, int64) {
// Round start to the nearest smaller value divisible by step.
start -= start % step
// Round end to the nearest bigger value divisible by step.
adjust := end % step
if adjust > 0 {
end += step - adjust
}
return start, end
}
// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
AuthToken *auth.Token
@ -507,11 +511,13 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr
ecSQ := newEvalConfig(ec)
ecSQ.Start -= window + maxSilenceInterval + step
ecSQ.End += step
ecSQ.Step = step
if err := ValidateMaxPointsPerTimeseries(ecSQ.Start, ecSQ.End, ecSQ.Step); err != nil {
return nil, err
}
ecSQ.Start, ecSQ.End = AdjustStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
// unconditionally align start and end args to step for subquery as Prometheus does.
ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
tssSQ, err := evalExpr(ecSQ, re.Expr)
if err != nil {
return nil, err
@ -523,7 +529,6 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr
}
return nil, nil
}
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step)
preFunc, rcs, err := getRollupConfigs(name, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
if err != nil {

View file

@ -118,7 +118,7 @@ func TestExecSuccess(t *testing.T) {
q := `time() offset 0s`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -129,7 +129,7 @@ func TestExecSuccess(t *testing.T) {
q := `sort((label_set(time(), "foo", "bar"), label_set(time()+10, "foo", "baz")) offset 0s)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
@ -138,7 +138,7 @@ func TestExecSuccess(t *testing.T) {
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{910, 1110, 1310, 1510, 1710, 1910},
Values: []float64{1010, 1210, 1410, 1610, 1810, 2010},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
@ -219,7 +219,7 @@ func TestExecSuccess(t *testing.T) {
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{860, 1060, 1260, 1460, 1660, 1860},
Values: []float64{810, 1010, 1210, 1410, 1610, 1810},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
@ -234,7 +234,7 @@ func TestExecSuccess(t *testing.T) {
q := `sort((label_set(time() offset 100s, "foo", "bar"), label_set(time()+10, "foo", "baz") offset 50s) offset 400s)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{300, 500, 700, 900, 1100, 1300},
Values: []float64{400, 600, 800, 1000, 1200, 1400},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
@ -243,7 +243,7 @@ func TestExecSuccess(t *testing.T) {
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{360, 560, 760, 960, 1160, 1360},
Values: []float64{410, 610, 810, 1010, 1210, 1410},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
@ -258,21 +258,21 @@ func TestExecSuccess(t *testing.T) {
q := `sort((label_set(time() offset -100s, "foo", "bar"), label_set(time()+10, "foo", "baz") offset -50s) offset -400s)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1260, 1460, 1660, 1860, 2060, 2260},
Values: []float64{1400, 1600, 1800, 2000, 2200, 2400},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("baz"),
Value: []byte("bar"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1300, 1500, 1700, 1900, 2100, 2300},
Values: []float64{1410, 1610, 1810, 2010, 2210, 2410},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
Value: []byte("baz"),
}}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
@ -315,7 +315,7 @@ func TestExecSuccess(t *testing.T) {
q := `time()[300s] offset 100s`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Values: []float64{800, 1000, 1200, 1400, 1600, 1800},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -348,7 +348,7 @@ func TestExecSuccess(t *testing.T) {
q := `timestamp(123)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -359,7 +359,7 @@ func TestExecSuccess(t *testing.T) {
q := `timestamp(time())`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -370,7 +370,7 @@ func TestExecSuccess(t *testing.T) {
q := `timestamp(456/time()+123)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -381,7 +381,7 @@ func TestExecSuccess(t *testing.T) {
q := `timestamp(time()>=1600)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, nan, 1700, 1900},
Values: []float64{nan, nan, nan, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -3640,7 +3640,7 @@ func TestExecSuccess(t *testing.T) {
q := `round(geomean_over_time(alias(time()/100, "foobar")[3i]), 0.1)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{6.8, 8.8, 10.9, 12.9, 14.9, 16.9},
Values: []float64{7.8, 9.9, 11.9, 13.9, 15.9, 17.9},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -3662,7 +3662,7 @@ func TestExecSuccess(t *testing.T) {
q := `sum2_over_time(alias(time()/100, "foobar")[3i])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{155, 251, 371, 515, 683, 875},
Values: []float64{200, 308, 440, 596, 776, 980},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -4723,10 +4723,10 @@ func TestExecSuccess(t *testing.T) {
})
t.Run(`ru(time() offset 1i, 2000)`, func(t *testing.T) {
t.Parallel()
q := `ru(time() offset 1i, 2000)`
q := `ru(time() offset 1.5i, 2000)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{65, 55.00000000000001, 45, 35, 25, 15},
Values: []float64{70, 60, 50, 40, 30, 20},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -4814,7 +4814,7 @@ func TestExecSuccess(t *testing.T) {
q := `integrate(time()*1e-3)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{160, 200, 240.00000000000003, 280, 320, 360},
Values: []float64{180, 220.00000000000003, 260, 300, 340.00000000000006, 380},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -4913,7 +4913,7 @@ func TestExecSuccess(t *testing.T) {
q := `increase(2000-time())`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1100, 900, 700, 500, 300, 100},
Values: []float64{1000, 800, 600, 400, 200, 0},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}