app/vmselect/prometheus: small fixes on top of 8bb762124a

This commit is contained in:
Aliaksandr Valialkin 2020-07-05 18:17:02 +03:00
parent 8bb762124a
commit acf828a759
2 changed files with 22 additions and 44 deletions

View file

@ -791,8 +791,8 @@ func queryRangeHandler(w http.ResponseWriter, query string, start, end, step int
return fmt.Errorf("cannot execute query: %w", err) return fmt.Errorf("cannot execute query: %w", err)
} }
queryOffset := getLatencyOffsetMilliseconds() queryOffset := getLatencyOffsetMilliseconds()
if ct-end < queryOffset { if ct-queryOffset < end {
result = adjustLastPoints(result, ct, queryOffset) result = adjustLastPoints(result, ct-queryOffset, end)
} }
// Remove NaN values as Prometheus does. // Remove NaN values as Prometheus does.
@ -837,46 +837,24 @@ func removeNaNValuesInplace(tss []netstorage.Result) {
var queryRangeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query_range"}`) var queryRangeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query_range"}`)
// adjustLastPoints substitutes the last point values with the previous // adjustLastPoints substitutes the last point values on the time range (start..end]
// point values, since the last points may contain garbage. // with the previous point values, since these points may contain garbage.
func adjustLastPoints(tss []netstorage.Result, ct, queryOffset int64) []netstorage.Result { func adjustLastPoints(tss []netstorage.Result, start, end int64) []netstorage.Result {
if len(tss) == 0 {
return nil
}
// Search for the last non-NaN value across all the timeseries.
lastNonNaNIdx := -1
for i := range tss { for i := range tss {
values := tss[i].Values ts := &tss[i]
j := len(values) - 1 values := ts.Values
for j >= 0 && math.IsNaN(values[j]) { timestamps := ts.Timestamps
j := len(timestamps) - 1
for j >= 0 && timestamps[j] > start {
j-- j--
} }
if j > lastNonNaNIdx { j++
lastNonNaNIdx = j if j <= 0 {
}
}
if lastNonNaNIdx == -1 {
// All timeseries contain only NaNs.
return nil
}
// Substitute the last two values starting from lastNonNaNIdx
// with the previous values for each timeseries.
for i := range tss {
values := tss[i].Values
if lastNonNaNIdx > len(values) {
continue continue
} }
end := tss[i].Timestamps[lastNonNaNIdx] for j < len(timestamps) && timestamps[j] <= end {
if ct-end < queryOffset { values[j] = values[j-1]
for j := 1; j < 3; j++ { j++
idx := lastNonNaNIdx + j
if idx <= 0 || idx >= len(values) || math.IsNaN(values[idx-1]) {
continue
}
values[idx] = values[idx-1]
}
} }
} }
return tss return tss

View file

@ -115,9 +115,9 @@ func TestGetTimeError(t *testing.T) {
} }
func TestAdjustLastPoints(t *testing.T) { func TestAdjustLastPoints(t *testing.T) {
f := func(tss []netstorage.Result, ct, queryOffset int64, tssExpected []netstorage.Result) { f := func(tss []netstorage.Result, start, end int64, tssExpected []netstorage.Result) {
t.Helper() t.Helper()
tss = adjustLastPoints(tss, ct, queryOffset) tss = adjustLastPoints(tss, start, end)
for i, ts := range tss { for i, ts := range tss {
for j, value := range ts.Values { for j, value := range ts.Values {
expectedValue := tssExpected[i].Values[j] expectedValue := tssExpected[i].Values[j]
@ -138,7 +138,7 @@ func TestAdjustLastPoints(t *testing.T) {
nan := math.NaN() nan := math.NaN()
f(nil, 500, 300, nil) f(nil, 300, 500, nil)
f([]netstorage.Result{ f([]netstorage.Result{
{ {
@ -149,7 +149,7 @@ func TestAdjustLastPoints(t *testing.T) {
Timestamps: []int64{100, 200, 300, 400, 500}, Timestamps: []int64{100, 200, 300, 400, 500},
Values: []float64{1, 2, 3, nan, nan}, Values: []float64{1, 2, 3, nan, nan},
}, },
}, 500, 300, []netstorage.Result{ }, 400, 500, []netstorage.Result{
{ {
Timestamps: []int64{100, 200, 300, 400, 500}, Timestamps: []int64{100, 200, 300, 400, 500},
Values: []float64{1, 2, 3, 4, 4}, Values: []float64{1, 2, 3, 4, 4},
@ -169,7 +169,7 @@ func TestAdjustLastPoints(t *testing.T) {
Timestamps: []int64{100, 200, 300, 400, 500}, Timestamps: []int64{100, 200, 300, 400, 500},
Values: []float64{1, 2, nan, nan, nan}, Values: []float64{1, 2, nan, nan, nan},
}, },
}, 500, 300, []netstorage.Result{ }, 300, 500, []netstorage.Result{
{ {
Timestamps: []int64{100, 200, 300, 400, 500}, Timestamps: []int64{100, 200, 300, 400, 500},
Values: []float64{1, 2, 3, 3, 3}, Values: []float64{1, 2, 3, 3, 3},
@ -209,7 +209,7 @@ func TestAdjustLastPoints(t *testing.T) {
Timestamps: []int64{100, 200, 300, 400}, Timestamps: []int64{100, 200, 300, 400},
Values: []float64{1, 2, 3, 4}, Values: []float64{1, 2, 3, 4},
}, },
}, 500, 300, []netstorage.Result{ }, 400, 500, []netstorage.Result{
{ {
Timestamps: []int64{100, 200, 300, 400, 500}, Timestamps: []int64{100, 200, 300, 400, 500},
Values: []float64{1, 2, 3, 4, 4}, Values: []float64{1, 2, 3, 4, 4},
@ -229,7 +229,7 @@ func TestAdjustLastPoints(t *testing.T) {
Timestamps: []int64{100, 200, 300}, Timestamps: []int64{100, 200, 300},
Values: []float64{1, 2, nan}, Values: []float64{1, 2, nan},
}, },
}, 500, 300, []netstorage.Result{ }, 300, 600, []netstorage.Result{
{ {
Timestamps: []int64{100, 200, 300, 400, 500}, Timestamps: []int64{100, 200, 300, 400, 500},
Values: []float64{1, 2, 3, 3, 3}, Values: []float64{1, 2, 3, 3, 3},