app/vmselect/promql: properly handle instant query optimization conrner cases for min_over_time() and max_over_time()

- If min_over_time(m[offset] @ timestamp) <= min_over_time(m[offset] @ (timestamp-window)),
  then the optimization can be applied.

- If max_over_time(m[offset] @ timestamp) >= max_over_time(m[offset] @ (timestamp-window)),
  then the optimization can be applied.
This commit is contained in:
Aliaksandr Valialkin 2023-11-14 02:52:39 +01:00
parent 0116a333fb
commit 7cf7740d18
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -1249,23 +1249,6 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
if offset == 0 {
return tssCached, nil
}
// Calculate max_over_time(m[offset] @ (timestamp - window))
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Verify whether tssCached values are bigger than tssEnd values.
// If this isn't the case, then the optimization cannot be applied.
if !isLowerInstantValues(tssEnd, tssCached) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
deleteCachedSeries(qtChild)
return evalAt(qt, timestamp, window)
}
// Calculate max_over_time(m[offset] @ timestamp)
tssStart, err := evalAt(qtChild, timestamp, offset)
if err != nil {
@ -1275,8 +1258,22 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate max_over_time(m[offset] @ (timestamp - window))
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate the result
tss := getMaxInstantValues(qtChild, tssCached, tssStart)
tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd)
if !ok {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
deleteCachedSeries(qtChild)
return evalAt(qt, timestamp, window)
}
return tss, nil
case "min_over_time":
if iafc != nil {
@ -1314,23 +1311,6 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
if offset == 0 {
return tssCached, nil
}
// Calculate min_over_time(m[offset] @ (timestamp - window))
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Verify whether tssCached values are smaller than tssEnd values.
// If this isn't the case, then the optimization cannot be applied.
if !isLowerInstantValues(tssCached, tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
deleteCachedSeries(qtChild)
return evalAt(qt, timestamp, window)
}
// Calculate min_over_time(m[offset] @ timestamp)
tssStart, err := evalAt(qtChild, timestamp, offset)
if err != nil {
@ -1340,8 +1320,22 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate min_over_time(m[offset] @ (timestamp - window))
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate the result
tss := getMinInstantValues(qtChild, tssCached, tssStart)
tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd)
if !ok {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
deleteCachedSeries(qtChild)
return evalAt(qt, timestamp, window)
}
return tss, nil
case
"count_eq_over_time",
@ -1428,37 +1422,8 @@ func hasDuplicateSeries(tss []*timeseries) bool {
return false
}
// isLowerInstantValues verifies that tssA contains lower values than tssB
func isLowerInstantValues(tssA, tssB []*timeseries) bool {
assertInstantValues(tssA)
assertInstantValues(tssB)
m := make(map[string]*timeseries, len(tssA))
bb := bbPool.Get()
defer bbPool.Put(bb)
for _, ts := range tssA {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
}
m[string(bb.B)] = ts
}
for _, tsB := range tssB {
bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName)
tsA := m[string(bb.B)]
if tsA != nil && !math.IsNaN(tsA.Values[0]) && !math.IsNaN(tsB.Values[0]) {
if tsA.Values[0] >= tsB.Values[0] {
return false
}
}
}
return true
}
func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
defer qt.Done()
getMin := func(a, b float64) float64 {
@ -1467,13 +1432,13 @@ func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseri
}
return b
}
tss := getMinMaxInstantValues(tssCached, tssStart, getMin)
qt.Printf("resulting series=%d", len(tss))
return tss
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMin)
qt.Printf("resulting series=%d; ok=%v", len(tss), ok)
return tss, ok
}
func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
defer qt.Done()
getMax := func(a, b float64) float64 {
@ -1482,19 +1447,20 @@ func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseri
}
return b
}
tss := getMinMaxInstantValues(tssCached, tssStart, getMax)
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMax)
qt.Printf("resulting series=%d", len(tss))
return tss
return tss, ok
}
func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float64) float64) []*timeseries {
func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a, b float64) float64) ([]*timeseries, bool) {
assertInstantValues(tssCached)
assertInstantValues(tssStart)
assertInstantValues(tssEnd)
m := make(map[string]*timeseries, len(tssCached))
bb := bbPool.Get()
defer bbPool.Put(bb)
m := make(map[string]*timeseries, len(tssCached))
for _, ts := range tssCached {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
@ -1503,8 +1469,13 @@ func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float
m[string(bb.B)] = ts
}
mStart := make(map[string]*timeseries, len(tssStart))
for _, ts := range tssStart {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
}
m[string(bb.B)] = ts
tsCached := m[string(bb.B)]
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
if !math.IsNaN(ts.Values[0]) {
@ -1515,11 +1486,24 @@ func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float
}
}
for _, ts := range tssEnd {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
tsCached := m[string(bb.B)]
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) && !math.IsNaN(ts.Values[0]) {
if ts.Values[0] == f(ts.Values[0], tsCached.Values[0]) {
tsStart := mStart[string(bb.B)]
if tsStart == nil || math.IsNaN(tsStart.Values[0]) || tsStart.Values[0] != f(ts.Values[0], tsStart.Values[0]) {
return nil, false
}
}
}
}
rvs := make([]*timeseries, 0, len(m))
for _, ts := range m {
rvs = append(rvs, ts)
}
return rvs
return rvs, true
}
// getSumInstantValues calculates tssCached + tssStart - tssEnd