mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: follow-up after 526dd93b32
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612
This commit is contained in:
parent
526dd93b32
commit
c4c77aa2dd
4 changed files with 94 additions and 73 deletions
|
@ -2,15 +2,15 @@ package promql
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
"github.com/valyala/histogram"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
var aggrFuncs = map[string]aggrFunc{
|
||||
|
@ -803,36 +803,25 @@ func medianValue(values []float64) float64 {
|
|||
return quantile(0.5, values)
|
||||
}
|
||||
|
||||
// quantiles calculates the given phis from originValues
|
||||
// without modifying originValues
|
||||
func quantiles(phis []float64, originValues []float64) []float64 {
|
||||
a := float64sPool.Get().(*float64s)
|
||||
// quantiles calculates the given phis from originValues without modifying originValues, appends them to qs and returns the result.
|
||||
func quantiles(qs, phis []float64, originValues []float64) []float64 {
|
||||
a := getFloat64s()
|
||||
a.A = prepareForQuantileFloat64(a.A[:0], originValues)
|
||||
res := quantilesSorted(phis, a.A)
|
||||
float64sPool.Put(a)
|
||||
return res
|
||||
qs = quantilesSorted(qs, phis, a.A)
|
||||
putFloat64s(a)
|
||||
return qs
|
||||
}
|
||||
|
||||
func quantilesSorted(phis []float64, values []float64) []float64 {
|
||||
res := make([]float64, len(phis))
|
||||
for i, phi := range phis {
|
||||
res[i] = quantileSorted(phi, values)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// quantile calculates the given phi from originValues
|
||||
// without modifying originValues
|
||||
// quantile calculates the given phi from originValues without modifying originValues
|
||||
func quantile(phi float64, originValues []float64) float64 {
|
||||
a := float64sPool.Get().(*float64s)
|
||||
a := getFloat64s()
|
||||
a.A = prepareForQuantileFloat64(a.A[:0], originValues)
|
||||
res := quantileSorted(phi, a.A)
|
||||
float64sPool.Put(a)
|
||||
return res
|
||||
q := quantileSorted(phi, a.A)
|
||||
putFloat64s(a)
|
||||
return q
|
||||
}
|
||||
|
||||
// prepareForQuantileFloat64 copies items from src
|
||||
// to dst but removes NaNs and sorts the dst
|
||||
// prepareForQuantileFloat64 copies items from src to dst but removes NaNs and sorts the dst
|
||||
func prepareForQuantileFloat64(dst, src []float64) []float64 {
|
||||
for _, v := range src {
|
||||
if math.IsNaN(v) {
|
||||
|
@ -844,7 +833,20 @@ func prepareForQuantileFloat64(dst, src []float64) []float64 {
|
|||
return dst
|
||||
}
|
||||
|
||||
// quantileSorted calculates the given quantile of a sorted list of values.
|
||||
// quantilesSorted calculates the given phis over a sorted list of values, appends them to qs and returns the result.
|
||||
//
|
||||
// It is expected that values won't contain NaN items.
|
||||
// The implementation mimics Prometheus implementation for compatibility's sake.
|
||||
func quantilesSorted(qs, phis []float64, values []float64) []float64 {
|
||||
for _, phi := range phis {
|
||||
q := quantileSorted(phi, values)
|
||||
qs = append(qs, q)
|
||||
}
|
||||
return qs
|
||||
}
|
||||
|
||||
// quantileSorted calculates the given quantile over a sorted list of values.
|
||||
//
|
||||
// It is expected that values won't contain NaN items.
|
||||
// The implementation mimics Prometheus implementation for compatibility's sake.
|
||||
func quantileSorted(phi float64, values []float64) float64 {
|
||||
|
@ -944,36 +946,40 @@ func getPerPointMedians(tss []*timeseries) []float64 {
|
|||
logger.Panicf("BUG: expecting non-empty tss")
|
||||
}
|
||||
medians := make([]float64, len(tss[0].Values))
|
||||
h := histogram.GetFast()
|
||||
a := getFloat64s()
|
||||
values := a.A
|
||||
for n := range medians {
|
||||
h.Reset()
|
||||
values = values[:0]
|
||||
for j := range tss {
|
||||
v := tss[j].Values[n]
|
||||
if !math.IsNaN(v) {
|
||||
h.Update(v)
|
||||
values = append(values, v)
|
||||
}
|
||||
}
|
||||
medians[n] = h.Quantile(0.5)
|
||||
medians[n] = quantile(0.5, values)
|
||||
}
|
||||
histogram.PutFast(h)
|
||||
a.A = values
|
||||
putFloat64s(a)
|
||||
return medians
|
||||
}
|
||||
|
||||
func getPerPointMADs(tss []*timeseries, medians []float64) []float64 {
|
||||
mads := make([]float64, len(medians))
|
||||
h := histogram.GetFast()
|
||||
a := getFloat64s()
|
||||
values := a.A
|
||||
for n, median := range medians {
|
||||
h.Reset()
|
||||
values = values[:0]
|
||||
for j := range tss {
|
||||
v := tss[j].Values[n]
|
||||
if !math.IsNaN(v) {
|
||||
ad := math.Abs(v - median)
|
||||
h.Update(ad)
|
||||
values = append(values, ad)
|
||||
}
|
||||
}
|
||||
mads[n] = h.Quantile(0.5)
|
||||
mads[n] = quantile(0.5, values)
|
||||
}
|
||||
histogram.PutFast(h)
|
||||
a.A = values
|
||||
putFloat64s(a)
|
||||
return mads
|
||||
}
|
||||
|
||||
|
@ -1043,24 +1049,24 @@ func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) {
|
|||
tssDst[j] = ts
|
||||
}
|
||||
|
||||
var qs []float64
|
||||
values := float64sPool.Get().(*float64s)
|
||||
b := getFloat64s()
|
||||
qs := b.A
|
||||
a := getFloat64s()
|
||||
values := a.A
|
||||
for n := range tss[0].Values {
|
||||
values.A = values.A[:0]
|
||||
values = values[:0]
|
||||
for j := range tss {
|
||||
v := tss[j].Values[n]
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
values.A = append(values.A, v)
|
||||
values = append(values, tss[j].Values[n])
|
||||
}
|
||||
sort.Float64s(values.A)
|
||||
qs = quantilesSorted(phis, values.A)
|
||||
qs = quantiles(qs[:0], phis, values)
|
||||
for j := range tssDst {
|
||||
tssDst[j].Values[n] = qs[j]
|
||||
}
|
||||
}
|
||||
float64sPool.Put(values)
|
||||
a.A = values
|
||||
putFloat64s(a)
|
||||
b.A = qs
|
||||
putFloat64s(b)
|
||||
return tssDst
|
||||
}
|
||||
return aggrFuncExt(afe, argOrig, &afa.ae.Modifier, afa.ae.Limit, false)
|
||||
|
@ -1092,20 +1098,17 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
|
|||
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
|
||||
return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
|
||||
dst := tss[0]
|
||||
var values []float64
|
||||
a := getFloat64s()
|
||||
values := a.A
|
||||
for n := range dst.Values {
|
||||
values = values[:0]
|
||||
for j := range tss {
|
||||
v := tss[j].Values[n]
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
values = append(values, v)
|
||||
values = append(values, tss[j].Values[n])
|
||||
}
|
||||
phi := phis[n]
|
||||
sort.Float64s(values)
|
||||
dst.Values[n] = quantileSorted(phi, values)
|
||||
dst.Values[n] = quantile(phis[n], values)
|
||||
}
|
||||
a.A = values
|
||||
putFloat64s(a)
|
||||
tss[0] = dst
|
||||
return tss[:1]
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
"github.com/valyala/histogram"
|
||||
)
|
||||
|
||||
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
|
||||
|
@ -643,18 +642,20 @@ func getScrapeInterval(timestamps []int64) int64 {
|
|||
}
|
||||
|
||||
// Estimate scrape interval as 0.6 quantile for the first 20 intervals.
|
||||
h := histogram.GetFast()
|
||||
tsPrev := timestamps[0]
|
||||
timestamps = timestamps[1:]
|
||||
if len(timestamps) > 20 {
|
||||
timestamps = timestamps[:20]
|
||||
}
|
||||
a := getFloat64s()
|
||||
intervals := a.A[:0]
|
||||
for _, ts := range timestamps {
|
||||
h.Update(float64(ts - tsPrev))
|
||||
intervals = append(intervals, float64(ts-tsPrev))
|
||||
tsPrev = ts
|
||||
}
|
||||
scrapeInterval := int64(h.Quantile(0.6))
|
||||
histogram.PutFast(h)
|
||||
scrapeInterval := int64(quantile(0.6, intervals))
|
||||
a.A = intervals
|
||||
putFloat64s(a)
|
||||
if scrapeInterval <= 0 {
|
||||
return int64(maxSilenceInterval)
|
||||
}
|
||||
|
@ -1066,13 +1067,15 @@ func newRollupQuantiles(args []interface{}) (rollupFunc, error) {
|
|||
// Fast path - only a single value.
|
||||
return values[0]
|
||||
}
|
||||
qs := quantiles(phis, values)
|
||||
qs := getFloat64s()
|
||||
qs.A = quantiles(qs.A[:0], phis, values)
|
||||
idx := rfa.idx
|
||||
tsm := rfa.tsm
|
||||
for i, phiStr := range phiStrs {
|
||||
ts := tsm.GetOrCreateTimeseries(phiLabel, phiStr)
|
||||
ts.Values[idx] = qs[i]
|
||||
ts.Values[idx] = qs.A[i]
|
||||
}
|
||||
putFloat64s(qs)
|
||||
return nan
|
||||
}
|
||||
return rf, nil
|
||||
|
@ -1772,19 +1775,28 @@ func rollupModeOverTime(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
|
||||
// Copy rfa.values to a.A, since modeNoNaNs modifies a.A contents.
|
||||
a := float64sPool.Get().(*float64s)
|
||||
a := getFloat64s()
|
||||
a.A = append(a.A[:0], rfa.values...)
|
||||
result := modeNoNaNs(rfa.prevValue, a.A)
|
||||
float64sPool.Put(a)
|
||||
putFloat64s(a)
|
||||
return result
|
||||
}
|
||||
|
||||
var float64sPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &float64s{}
|
||||
},
|
||||
func getFloat64s() *float64s {
|
||||
v := float64sPool.Get()
|
||||
if v == nil {
|
||||
v = &float64s{}
|
||||
}
|
||||
return v.(*float64s)
|
||||
}
|
||||
|
||||
func putFloat64s(a *float64s) {
|
||||
a.A = a.A[:0]
|
||||
float64sPool.Put(a)
|
||||
}
|
||||
|
||||
var float64sPool sync.Pool
|
||||
|
||||
type float64s struct {
|
||||
A []float64
|
||||
}
|
||||
|
|
|
@ -1177,7 +1177,8 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||
}
|
||||
phi := phis[0]
|
||||
rvs := args[1]
|
||||
var values []float64
|
||||
a := getFloat64s()
|
||||
values := a.A[:0]
|
||||
for _, ts := range rvs {
|
||||
lastIdx := -1
|
||||
originValues := ts.Values
|
||||
|
@ -1194,6 +1195,8 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||
originValues[lastIdx] = quantileSorted(phi, values)
|
||||
}
|
||||
}
|
||||
a.A = values
|
||||
putFloat64s(a)
|
||||
setLastValues(rvs)
|
||||
return rvs, nil
|
||||
}
|
||||
|
|
|
@ -6,6 +6,9 @@ sort: 15
|
|||
|
||||
## tip
|
||||
|
||||
* FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading from [Apache Kafka](https://kafka.apache.org/).
|
||||
* FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues.
|
||||
|
||||
* BUGFIX: align behavior of the queries `a or on (labels) b`, `a and on (labels) b` and `a unless on (labels) b` where `b` has multiple time series with the given `labels` to Prometheus behavior. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1643).
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue