VictoriaMetrics/app/vmagent/remotewrite/remotewrite_test.go
Aliaksandr Valialkin bc1f92d7f5
app/vmagent/remotewrite: follow-up for 87fd400dfc
- Drop samples and return true from remotewrite.TryPush() at fast path when all the remote storage
  systems are configured with the disabled on-disk queue, every in-memory queue is full
  and -remoteWrite.dropSamplesOnOverload is set to true. This case is quite common,
  so it should be optimized. Previously additional CPU time was spent on per-remoteWriteCtx
  relabeling and other processing in this case.

- Properly count the number of dropped samples inside remoteWriteCtx.pushInternalTrackDropped().
  Previously dropped samples were counted only if -remoteWrite.dropSamplesOnOverload flag is set.
  In reality, the samples are dropped when they couldn't be sent to the queue because in-memory queue is full
  and on-disk queue is disabled.
  The remoteWriteCtx.pushInternalTrackDropped() function is called by streaming aggregation for pushing
  the aggregated data to the remote storage. Streaming aggregation cannot wait until the remote storage
  processes pending data, so it drops aggregated samples in this case.

- Clarify the description for -remoteWrite.disableOnDiskQueue command-line flag at -help output,
  so it is clear that this flag can be set individually per each -remoteWrite.url.

- Make the -remoteWrite.dropSamplesOnOverload flag global. If some of the remote storage systems
  are configured with the disabled on-disk queue, then there is no sense in keeping samples
  on some of these systems, while dropping samples on the remaining systems, since this
  will result in global stall on the remote storage system with the disabled on-disk queue
  and with the -remoteWrite.dropSamplesOnOverload=false flag. vmagent will always return false
  from remotewrite.TryPush() in this case. This will result in infinite duplicate samples
  written to the remaining remote storage systems. That's why the -remoteWrite.dropSamplesOnOverload
  is forcibly set to true if more than one -remoteWrite.disableOnDiskQueue flag is set.
  This allows proceeding with newly scraped / pushed samples by sending them to the remaining
  remote storage systems, while dropping them on overloaded systems with the -remoteWrite.disableOnDiskQueue flag set.

- Verify that the remoteWriteCtx.TryPush() returns true in the TestRemoteWriteContext_TryPush_ImmutableTimeseries test.

- Mention in vmagent docs that the -remoteWrite.disableOnDiskQueue command-line flag can be set individually per each -remoteWrite.url.
  See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6248
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065
2024-07-13 02:30:10 +02:00

167 lines
4.1 KiB
Go

package remotewrite
import (
"fmt"
"math"
"reflect"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
func TestGetLabelsHash_Distribution(t *testing.T) {
f := func(bucketsCount int) {
t.Helper()
// Distribute itemsCount hashes returned by getLabelsHash() across bucketsCount buckets.
itemsCount := 1_000 * bucketsCount
m := make([]int, bucketsCount)
var labels []prompbmarshal.Label
for i := 0; i < itemsCount; i++ {
labels = append(labels[:0], prompbmarshal.Label{
Name: "__name__",
Value: fmt.Sprintf("some_name_%d", i),
})
for j := 0; j < 10; j++ {
labels = append(labels, prompbmarshal.Label{
Name: fmt.Sprintf("label_%d", j),
Value: fmt.Sprintf("value_%d_%d", i, j),
})
}
h := getLabelsHash(labels)
m[h%uint64(bucketsCount)]++
}
// Verify that the distribution is even
expectedItemsPerBucket := itemsCount / bucketsCount
for _, n := range m {
if math.Abs(1-float64(n)/float64(expectedItemsPerBucket)) > 0.04 {
t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want around %d", bucketsCount, n, expectedItemsPerBucket)
}
}
}
f(2)
f(3)
f(4)
f(5)
f(10)
}
func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) {
t.Helper()
perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig))
if err != nil {
t.Fatalf("cannot load relabel configs: %s", err)
}
rcs := &relabelConfigs{
perURL: []*promrelabel.ParsedConfigs{
perURLRelabel,
},
}
allRelabelConfigs.Store(rcs)
pss := make([]*pendingSeries, 1)
pss[0] = newPendingSeries(nil, true, 0, 100)
rwctx := &remoteWriteCtx{
idx: 0,
streamAggrKeepInput: keepInput,
streamAggrDropInput: dropInput,
pss: pss,
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(`foo`),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`),
}
if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "global")
}
if len(streamAggrConfig) > 0 {
sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), nil, streamaggr.Options{})
if err != nil {
t.Fatalf("cannot load streamaggr configs: %s", err)
}
rwctx.sas.Store(sas)
}
offsetMsecs := time.Now().UnixMilli()
inputTss := prompbmarshal.MustParsePromMetrics(input, offsetMsecs)
expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss))
// copy inputTss to make sure it is not mutated during TryPush call
copy(expectedTss, inputTss)
if !rwctx.TryPush(inputTss, false) {
t.Fatalf("cannot push samples to rwctx")
}
if !reflect.DeepEqual(expectedTss, inputTss) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
}
}
f(`
- interval: 1m
outputs: [sum_samples]
- interval: 2m
outputs: [count_series]
`, `
- action: keep
source_labels: [env]
regex: "dev"
`, 0, false, false, `
metric{env="dev"} 10
metric{env="bar"} 20
metric{env="dev"} 15
metric{env="bar"} 25
`)
f(``, ``, time.Hour, false, false, `
metric{env="dev"} 10
metric{env="foo"} 20
metric{env="dev"} 15
metric{env="foo"} 25
`)
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, time.Hour, false, false, `
metric{env="dev"} 10
metric{env="bar"} 20
metric{env="dev"} 15
metric{env="bar"} 25
`)
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, time.Hour, true, false, `
metric{env="test"} 10
metric{env="dev"} 20
metric{env="foo"} 15
metric{env="dev"} 25
`)
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, time.Hour, false, true, `
metric{env="foo"} 10
metric{env="dev"} 20
metric{env="foo"} 15
metric{env="dev"} 25
`)
f(``, `
- action: keep
source_labels: [env]
regex: "dev"
`, time.Hour, true, true, `
metric{env="dev"} 10
metric{env="test"} 20
metric{env="dev"} 15
metric{env="bar"} 25
`)
}