mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect: add interpolate
function for filling gaps with linearly interpolated values
See https://stackoverflow.com/q/62565021/274937 for details
This commit is contained in:
parent
4dd3de9286
commit
5d3db3ff7c
7 changed files with 92 additions and 4 deletions
|
@ -4103,6 +4103,44 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`interpolate()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `interpolate(label_set(time() < 1300 default time() > 1700, "__name__", "foobar", "x", "y"))`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.MetricGroup = []byte("foobar")
|
||||
r1.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("x"),
|
||||
Value: []byte("y"),
|
||||
}}
|
||||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`interpolate(tail)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `interpolate(time() < 1300)`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1000, 1200, 1200, 1200, 1200, 1200},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`interpolate(head)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `interpolate(time() > 1500)`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1600, 1600, 1600, 1600, 1800, 2000},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`distinct_over_time([500s])`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `distinct_over_time((time() < 1700)[500s])`
|
||||
|
@ -5595,6 +5633,7 @@ func TestExecError(t *testing.T) {
|
|||
f(`median("foo", "bar")`)
|
||||
f(`keep_last_value()`)
|
||||
f(`keep_next_value()`)
|
||||
f(`interpolate()`)
|
||||
f(`distinct_over_time()`)
|
||||
f(`distinct()`)
|
||||
f(`alias()`)
|
||||
|
|
|
@ -72,6 +72,7 @@ var transformFuncs = map[string]transformFunc{
|
|||
"": transformUnion, // empty func is a synonim to union
|
||||
"keep_last_value": transformKeepLastValue,
|
||||
"keep_next_value": transformKeepNextValue,
|
||||
"interpolate": transformInterpolate,
|
||||
"start": newTransformFuncZeroArgs(transformStart),
|
||||
"end": newTransformFuncZeroArgs(transformEnd),
|
||||
"step": newTransformFuncZeroArgs(transformStep),
|
||||
|
@ -764,6 +765,52 @@ func transformKeepNextValue(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||
return rvs, nil
|
||||
}
|
||||
|
||||
func transformInterpolate(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||
args := tfa.args
|
||||
if err := expectTransformArgsNum(args, 1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rvs := args[0]
|
||||
for _, ts := range rvs {
|
||||
values := ts.Values
|
||||
if len(values) == 0 {
|
||||
continue
|
||||
}
|
||||
prevValue := nan
|
||||
var nextValue float64
|
||||
for i := 0; i < len(values); i++ {
|
||||
if !math.IsNaN(values[i]) {
|
||||
continue
|
||||
}
|
||||
if i > 0 {
|
||||
prevValue = values[i-1]
|
||||
}
|
||||
j := i + 1
|
||||
for j < len(values) {
|
||||
if !math.IsNaN(values[j]) {
|
||||
break
|
||||
}
|
||||
j++
|
||||
}
|
||||
if j >= len(values) {
|
||||
nextValue = prevValue
|
||||
} else {
|
||||
nextValue = values[j]
|
||||
}
|
||||
if math.IsNaN(prevValue) {
|
||||
prevValue = nextValue
|
||||
}
|
||||
delta := (nextValue - prevValue) / float64(j-i+1)
|
||||
for i < j {
|
||||
prevValue += delta
|
||||
values[i] = prevValue
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
return rvs, nil
|
||||
}
|
||||
|
||||
func newTransformFuncRunning(rf func(a, b float64, idx int) float64) transformFunc {
|
||||
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||
args := tfa.args
|
||||
|
|
|
@ -80,6 +80,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
|
|||
Use `limitk(1, q)` if you need retaining all the labels from `q`.
|
||||
- `keep_last_value(q)` - fills missing data (gaps) in `q` with the previous non-empty value.
|
||||
- `keep_next_value(q)` - fills missing data (gaps) in `q` with the next non-empty value.
|
||||
- `interpolate(q)` - fills missing data (gaps) in `q` with linearly interpolated values.
|
||||
- `distinct_over_time(m[d])` - returns distinct number of values for `m` data points over `d` duration.
|
||||
- `distinct(q)` - returns a time series with the number of unique values for each timestamp in `q`.
|
||||
- `sum2_over_time(m[d])` - returns sum of squares for all the `m` values over `d` duration.
|
||||
|
|
2
go.mod
2
go.mod
|
@ -9,7 +9,7 @@ require (
|
|||
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.1
|
||||
github.com/VictoriaMetrics/metrics v1.11.3
|
||||
github.com/VictoriaMetrics/metricsql v0.2.3
|
||||
github.com/VictoriaMetrics/metricsql v0.2.4
|
||||
github.com/aws/aws-sdk-go v1.32.13
|
||||
github.com/cespare/xxhash/v2 v2.1.1
|
||||
github.com/golang/snappy v0.0.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t
|
|||
github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
|
||||
github.com/VictoriaMetrics/metrics v1.11.3 h1:eSfXc0CrquKa1VTNUvhP+dhNjLUZHQGTFfp19mYCQWE=
|
||||
github.com/VictoriaMetrics/metrics v1.11.3/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.3 h1:xGscDmLoeIV7+8qX/mdHnOY0vu4m+wHIVGMoy/nBovY=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.3/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.4 h1:240YwT8B4KITVFE7EOrf1rVvsY+5fYsAzyb+bI6/q50=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.4/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
|
||||
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
|
||||
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
|
||||
github.com/aws/aws-sdk-go v1.32.13 h1:zzyXF7SUxJcJa3hTcYCl1/Ey+kh2N8TjK5tWnL0wieo=
|
||||
|
|
1
vendor/github.com/VictoriaMetrics/metricsql/transform.go
generated
vendored
1
vendor/github.com/VictoriaMetrics/metricsql/transform.go
generated
vendored
|
@ -51,6 +51,7 @@ var transformFuncs = map[string]bool{
|
|||
"": true, // empty func is a synonim to union
|
||||
"keep_last_value": true,
|
||||
"keep_next_value": true,
|
||||
"interpolate": true,
|
||||
"start": true,
|
||||
"end": true,
|
||||
"step": true,
|
||||
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
|
|||
github.com/VictoriaMetrics/fasthttp/stackless
|
||||
# github.com/VictoriaMetrics/metrics v1.11.3
|
||||
github.com/VictoriaMetrics/metrics
|
||||
# github.com/VictoriaMetrics/metricsql v0.2.3
|
||||
# github.com/VictoriaMetrics/metricsql v0.2.4
|
||||
github.com/VictoriaMetrics/metricsql
|
||||
github.com/VictoriaMetrics/metricsql/binaryop
|
||||
# github.com/aws/aws-sdk-go v1.32.13
|
||||
|
|
Loading…
Reference in a new issue