VictoriaMetrics/app/vmagent/remotewrite/remotewrite_test.go
Roman Khavronenko 3ec0247ee3
lib/prompbmarshal: move MustParsePromMetrics to protoparser/prometheus (#8405)
`MustParsePromMetrics` imports `lib/protoparser/prometheus`, and this
package exposes the following metrics:
```
vm_protoparser_rows_read_total{type="promscrape"}
vm_rows_invalid_total{type="prometheus"}
```

This means every package that uses `lib/prompbmarshal` starts exposing
these metrics. For example, vlogs imports `lib/protoparser/common`, which
uses `lib/prompbmarshal.Label`, and solely because of this vlogs starts
exposing unrelated Prometheus metrics on its /metrics page.

Moving `MustParsePromMetrics` to `lib/protoparser/prometheus` seems like
the least intrusive change.
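
For illustration, a minimal sketch of a call site after the move. The snippet is hypothetical (the `main` wrapper and the sample input are not part of this change); it only mirrors how the test below invokes the helper:

```go
package main

import (
	"fmt"
	"time"

	// MustParsePromMetrics is now imported from lib/protoparser/prometheus
	// instead of lib/prompbmarshal.
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)

func main() {
	// Parse Prometheus text-format samples; offsetMsecs mirrors the timestamp
	// offset the test below passes to MustParsePromMetrics.
	offsetMsecs := time.Now().UnixMilli()
	tss := prometheus.MustParsePromMetrics(`metric{env="dev"} 10`, offsetMsecs)
	fmt.Printf("parsed %d time series\n", len(tss))
}
```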


-----------

Depends on another change
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8403

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-27 22:55:32 +01:00


package remotewrite

import (
"fmt"
"math"
"reflect"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
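
// TestGetLabelsHash_Distribution checks that getLabelsHash() spreads label sets
// roughly evenly across a varying number of buckets.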
func TestGetLabelsHash_Distribution(t *testing.T) {
f := func(bucketsCount int) {
t.Helper()
// Distribute itemsCount hashes returned by getLabelsHash() across bucketsCount buckets.
itemsCount := 1_000 * bucketsCount
m := make([]int, bucketsCount)
var labels []prompbmarshal.Label
for i := 0; i < itemsCount; i++ {
labels = append(labels[:0], prompbmarshal.Label{
Name: "__name__",
Value: fmt.Sprintf("some_name_%d", i),
})
for j := 0; j < 10; j++ {
labels = append(labels, prompbmarshal.Label{
Name: fmt.Sprintf("label_%d", j),
Value: fmt.Sprintf("value_%d_%d", i, j),
})
}
h := getLabelsHash(labels)
m[h%uint64(bucketsCount)]++
}
// Verify that the distribution is even
expectedItemsPerBucket := itemsCount / bucketsCount
for _, n := range m {
if math.Abs(1-float64(n)/float64(expectedItemsPerBucket)) > 0.04 {
t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want around %d", bucketsCount, n, expectedItemsPerBucket)
}
}
}
f(2)
f(3)
f(4)
f(5)
f(10)
}
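
// TestRemoteWriteContext_TryPush_ImmutableTimeseries checks that TryPush leaves the
// input time series unmodified under various relabeling, deduplication and
// stream aggregation settings.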
func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
f := func(streamAggrConfig, relabelConfig string, enableWindows bool, dedupInterval time.Duration, keepInput, dropInput bool, input string) {
t.Helper()
perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig))
if err != nil {
t.Fatalf("cannot load relabel configs: %s", err)
}
rcs := &relabelConfigs{
perURL: []*promrelabel.ParsedConfigs{
perURLRelabel,
},
}
allRelabelConfigs.Store(rcs)
pss := make([]*pendingSeries, 1)
pss[0] = newPendingSeries(nil, true, 0, 100)
rwctx := &remoteWriteCtx{
idx: 0,
streamAggrKeepInput: keepInput,
streamAggrDropInput: dropInput,
pss: pss,
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(`foo`),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`),
}
if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(nil, enableWindows, dedupInterval, nil, "dedup-global")
}
if streamAggrConfig != "" {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
opts := streamaggr.Options{
EnableWindows: enableWindows,
}
sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), pushNoop, &opts, "global")
if err != nil {
t.Fatalf("cannot load streamaggr configs: %s", err)
}
defer sas.MustStop()
rwctx.sas.Store(sas)
}
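// Parse the input samples in Prometheus text exposition format, using the current time as the timestamp offset.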
offsetMsecs := time.Now().UnixMilli()
inputTss := prometheus.MustParsePromMetrics(input, offsetMsecs)
expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss))
// copy inputTss to make sure it is not mutated during TryPush call
copy(expectedTss, inputTss)
if !rwctx.TryPush(inputTss, false) {
t.Fatalf("cannot push samples to rwctx")
}
if !reflect.DeepEqual(expectedTss, inputTss) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
}
}
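// stream aggregation combined with keep-relabeling; windows and deduplication disabled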
f(`
- interval: 1m
outputs: [sum_samples]
- interval: 2m
outputs: [count_series]
`, `
- action: keep
source_labels: [env]
regex: "dev"
`, false, 0, false, false, `
metric{env="dev"} 10
metric{env="bar"} 20
metric{env="dev"} 15
metric{env="bar"} 25
`)
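// deduplication only (1h interval) with windows enabled; no relabeling or aggregation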
f(``, ``, true, time.Hour, false, false, `
metric{env="dev"} 10
metric{env="foo"} 20
metric{env="dev"} 15
metric{env="foo"} 25
`)
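// deduplication plus keep-relabeling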
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, true, time.Hour, false, false, `
metric{env="dev"} 10
metric{env="bar"} 20
metric{env="dev"} 15
metric{env="bar"} 25
`)
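// deduplication plus keep-relabeling with streamAggrKeepInput=true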
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, true, time.Hour, true, false, `
metric{env="test"} 10
metric{env="dev"} 20
metric{env="foo"} 15
metric{env="dev"} 25
`)
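// deduplication plus keep-relabeling with streamAggrDropInput=true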
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, true, time.Hour, false, true, `
metric{env="foo"} 10
metric{env="dev"} 20
metric{env="foo"} 15
metric{env="dev"} 25
`)
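// deduplication plus keep-relabeling with both streamAggrKeepInput and streamAggrDropInput set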
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, true, time.Hour, true, true, `
metric{env="dev"} 10
metric{env="test"} 20
metric{env="dev"} 15
metric{env="bar"} 25
`)
}