VictoriaMetrics/lib/streamaggr/streamaggr_timing_test.go
Andrii Chubatiuk 937ae2ca90
lib/streamaggr: added stale samples metric, added metrics labels (#6462)
### Describe Your Changes

- added stale metrics counters for input and output samples
- added labels for aggregator metrics =>
`name="{rwctx}:{aggrId}:{aggrSuffix}"`
   - rwctx - global or number starting from 1
   - aggrid - aggregator id starting from 1
   - aggrSuffix - <interval>_(by|without)_label1_label2_labeln
   e.g: `name="global:1:1m_without_instance_pod"`

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>

(cherry picked from commit 861852f262)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2024-07-01 15:01:49 +02:00

115 lines
2.7 KiB
Go

package streamaggr
import (
"fmt"
"strconv"
"strings"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
var benchOutputs = []string{
"total",
"total_prometheus",
"increase",
"increase_prometheus",
"rate_sum",
"rate_avg",
"count_series",
"count_samples",
"unique_samples",
"sum_samples",
"last",
"min",
"max",
"avg",
"stddev",
"stdvar",
"histogram_bucket",
"quantiles(0, 0.5, 1)",
}
func BenchmarkAggregatorsPush(b *testing.B) {
for _, output := range benchOutputs {
b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) {
benchmarkAggregatorsPush(b, output)
})
}
}
func BenchmarkAggregatorsFlushSerial(b *testing.B) {
outputs := []string{
"total", "sum_samples", "count_samples", "min",
"max", "avg", "increase", "count_series",
"last", "stddev", "stdvar", "total_prometheus", "increase_prometheus",
}
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators(outputs, pushFunc)
defer a.MustStop()
_ = a.Push(benchSeries, nil)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * len(outputs)))
for i := 0; i < b.N; i++ {
for _, aggr := range a.as {
aggr.flush(pushFunc, time.Hour, false)
}
}
}
func benchmarkAggregatorsPush(b *testing.B, output string) {
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators([]string{output}, pushFunc)
defer a.MustStop()
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []byte
for pb.Next() {
for i := 0; i < loops; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
}
}
})
}
func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputsQuoted := make([]string, len(outputs))
for i := range outputs {
outputsQuoted[i] = strconv.Quote(outputs[i])
}
config := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
by: [job]
outputs: [%s]
`, strings.Join(outputsQuoted, ","))
a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{})
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a
}
func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries {
a := make([]string, seriesCount)
for j := 0; j < seriesCount; j++ {
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
a = append(a, s)
}
metrics := strings.Join(a, "\n")
return mustParsePromMetrics(metrics)
}
const seriesCount = 10_000
var benchSeries = newBenchSeries(seriesCount)