app/vmselect: add interpolate function for filling gaps with linearly interpolated values

See https://stackoverflow.com/q/62565021/274937 for details
This commit is contained in:
Aliaksandr Valialkin 2020-07-02 14:54:18 +03:00
parent 2361ad8ab4
commit f10e8809c0
7 changed files with 92 additions and 4 deletions

View file

@ -4113,6 +4113,44 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1} resultExpected := []netstorage.Result{r1}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`interpolate()`, func(t *testing.T) {
t.Parallel()
q := `interpolate(label_set(time() < 1300 default time() > 1700, "__name__", "foobar", "x", "y"))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
r1.MetricName.MetricGroup = []byte("foobar")
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("x"),
Value: []byte("y"),
}}
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`interpolate(tail)`, func(t *testing.T) {
t.Parallel()
q := `interpolate(time() < 1300)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1200, 1200, 1200, 1200},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`interpolate(head)`, func(t *testing.T) {
t.Parallel()
q := `interpolate(time() > 1500)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1600, 1600, 1600, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`distinct_over_time([500s])`, func(t *testing.T) { t.Run(`distinct_over_time([500s])`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `distinct_over_time((time() < 1700)[500s])` q := `distinct_over_time((time() < 1700)[500s])`
@ -5609,6 +5647,7 @@ func TestExecError(t *testing.T) {
f(`median("foo", "bar")`) f(`median("foo", "bar")`)
f(`keep_last_value()`) f(`keep_last_value()`)
f(`keep_next_value()`) f(`keep_next_value()`)
f(`interpolate()`)
f(`distinct_over_time()`) f(`distinct_over_time()`)
f(`distinct()`) f(`distinct()`)
f(`alias()`) f(`alias()`)

View file

@ -72,6 +72,7 @@ var transformFuncs = map[string]transformFunc{
"": transformUnion, // empty func is a synonim to union "": transformUnion, // empty func is a synonim to union
"keep_last_value": transformKeepLastValue, "keep_last_value": transformKeepLastValue,
"keep_next_value": transformKeepNextValue, "keep_next_value": transformKeepNextValue,
"interpolate": transformInterpolate,
"start": newTransformFuncZeroArgs(transformStart), "start": newTransformFuncZeroArgs(transformStart),
"end": newTransformFuncZeroArgs(transformEnd), "end": newTransformFuncZeroArgs(transformEnd),
"step": newTransformFuncZeroArgs(transformStep), "step": newTransformFuncZeroArgs(transformStep),
@ -764,6 +765,52 @@ func transformKeepNextValue(tfa *transformFuncArg) ([]*timeseries, error) {
return rvs, nil return rvs, nil
} }
// transformInterpolate implements the `interpolate(q)` transform function.
//
// It expects exactly one argument and returns an error otherwise.
// For every series in that argument, each run of consecutive NaN values (a gap)
// is overwritten in place:
//
//   - a gap with real values on both sides is filled with evenly spaced values
//     lying on the straight line between the two neighbours;
//   - a gap at the start of the series is filled with the first non-NaN value;
//   - a gap at the end of the series is filled with the last non-NaN value;
//   - if the series has no non-NaN values at all, every slot is rewritten
//     with the package-level `nan` value (presumably NaN - declared elsewhere).
//
// The series are modified in place and the same slice is returned.
func transformInterpolate(tfa *transformFuncArg) ([]*timeseries, error) {
	if err := expectTransformArgsNum(tfa.args, 1); err != nil {
		return nil, err
	}
	series := tfa.args[0]
	for _, ts := range series {
		vs := ts.Values
		n := len(vs)
		// left tracks the value immediately preceding the current gap.
		// It stays at the initial `nan` only while the gap starts at index 0.
		left := nan
		for pos := 0; pos < n; pos++ {
			if !math.IsNaN(vs[pos]) {
				// Not a gap - nothing to fill.
				continue
			}
			if pos > 0 {
				left = vs[pos-1]
			}

			// Find the end of the gap: the first index after pos
			// that holds a real value, or n if the gap reaches the end.
			gapEnd := pos + 1
			for gapEnd < n && math.IsNaN(vs[gapEnd]) {
				gapEnd++
			}

			// A trailing gap has no right neighbour, so reuse the left one.
			right := left
			if gapEnd < n {
				right = vs[gapEnd]
			}
			// A leading gap has no left neighbour, so reuse the right one.
			if math.IsNaN(left) {
				left = right
			}

			// The gap holds gapEnd-pos points, which split the distance between
			// the neighbours into gapEnd-pos+1 equal steps.
			step := (right - left) / float64(gapEnd-pos+1)
			for ; pos < gapEnd; pos++ {
				left += step
				vs[pos] = left
			}
			// pos now equals gapEnd; the outer loop's increment skips
			// the non-NaN value that terminated the gap.
		}
	}
	return series, nil
}
func newTransformFuncRunning(rf func(a, b float64, idx int) float64) transformFunc { func newTransformFuncRunning(rf func(a, b float64, idx int) float64) transformFunc {
return func(tfa *transformFuncArg) ([]*timeseries, error) { return func(tfa *transformFuncArg) ([]*timeseries, error) {
args := tfa.args args := tfa.args

View file

@ -80,6 +80,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
Use `limitk(1, q)` if you need retaining all the labels from `q`. Use `limitk(1, q)` if you need retaining all the labels from `q`.
- `keep_last_value(q)` - fills missing data (gaps) in `q` with the previous non-empty value. - `keep_last_value(q)` - fills missing data (gaps) in `q` with the previous non-empty value.
- `keep_next_value(q)` - fills missing data (gaps) in `q` with the next non-empty value. - `keep_next_value(q)` - fills missing data (gaps) in `q` with the next non-empty value.
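- `interpolate(q)` - fills missing data (gaps) in `q` with linearly interpolated values. Gaps at the beginning of `q` are filled with the first non-empty value, while gaps at the end of `q` are filled with the last non-empty value.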
- `distinct_over_time(m[d])` - returns distinct number of values for `m` data points over `d` duration. - `distinct_over_time(m[d])` - returns distinct number of values for `m` data points over `d` duration.
- `distinct(q)` - returns a time series with the number of unique values for each timestamp in `q`. - `distinct(q)` - returns a time series with the number of unique values for each timestamp in `q`.
- `sum2_over_time(m[d])` - returns sum of squares for all the `m` values over `d` duration. - `sum2_over_time(m[d])` - returns sum of squares for all the `m` values over `d` duration.

2
go.mod
View file

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.0.1 github.com/VictoriaMetrics/fasthttp v1.0.1
github.com/VictoriaMetrics/metrics v1.11.3 github.com/VictoriaMetrics/metrics v1.11.3
github.com/VictoriaMetrics/metricsql v0.2.3 github.com/VictoriaMetrics/metricsql v0.2.4
github.com/aws/aws-sdk-go v1.32.13 github.com/aws/aws-sdk-go v1.32.13
github.com/cespare/xxhash/v2 v2.1.1 github.com/cespare/xxhash/v2 v2.1.1
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1

4
go.sum
View file

@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t
github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ= github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
github.com/VictoriaMetrics/metrics v1.11.3 h1:eSfXc0CrquKa1VTNUvhP+dhNjLUZHQGTFfp19mYCQWE= github.com/VictoriaMetrics/metrics v1.11.3 h1:eSfXc0CrquKa1VTNUvhP+dhNjLUZHQGTFfp19mYCQWE=
github.com/VictoriaMetrics/metrics v1.11.3/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ= github.com/VictoriaMetrics/metrics v1.11.3/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
github.com/VictoriaMetrics/metricsql v0.2.3 h1:xGscDmLoeIV7+8qX/mdHnOY0vu4m+wHIVGMoy/nBovY= github.com/VictoriaMetrics/metricsql v0.2.4 h1:240YwT8B4KITVFE7EOrf1rVvsY+5fYsAzyb+bI6/q50=
github.com/VictoriaMetrics/metricsql v0.2.3/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= github.com/VictoriaMetrics/metricsql v0.2.4/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/aws/aws-sdk-go v1.32.13 h1:zzyXF7SUxJcJa3hTcYCl1/Ey+kh2N8TjK5tWnL0wieo= github.com/aws/aws-sdk-go v1.32.13 h1:zzyXF7SUxJcJa3hTcYCl1/Ey+kh2N8TjK5tWnL0wieo=

View file

@ -51,6 +51,7 @@ var transformFuncs = map[string]bool{
"": true, // empty func is a synonim to union "": true, // empty func is a synonim to union
"keep_last_value": true, "keep_last_value": true,
"keep_next_value": true, "keep_next_value": true,
"interpolate": true,
"start": true, "start": true,
"end": true, "end": true,
"step": true, "step": true,

2
vendor/modules.txt vendored
View file

@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.11.3 # github.com/VictoriaMetrics/metrics v1.11.3
github.com/VictoriaMetrics/metrics github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.2.3 # github.com/VictoriaMetrics/metricsql v0.2.4
github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop github.com/VictoriaMetrics/metricsql/binaryop
# github.com/aws/aws-sdk-go v1.32.13 # github.com/aws/aws-sdk-go v1.32.13