From a9283e06a315c41e22d89668c131c8fc42d7b12c Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Wed, 8 May 2024 14:10:53 +0300 Subject: [PATCH] streamaggr: made labels compressor shared (#6173) Though labels compressor is quite resource intensive, each aggregator and deduplicator instance has it's own compressor. Made it shared across all aggregators to consume less resources while using multiple aggregators. Co-authored-by: Roman Khavronenko --- docs/CHANGELOG.md | 1 + lib/streamaggr/dedup_timing_test.go | 4 +-- lib/streamaggr/deduplicator.go | 15 +++-------- lib/streamaggr/streamaggr.go | 41 +++++++++++++---------------- 4 files changed, 23 insertions(+), 38 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9a83a07ed5..d5a8da1d02 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -52,6 +52,7 @@ Released at 2024-04-26 * FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/): support disable or log [implicit conversions](https://docs.victoriametrics.com/metricsql/#implicit-query-conversions) for subquery with cmd-line flags `-search.disableImplicitConversion` and `-search.logImplicitConversion`. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4338). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support data replication additionally to sharding among remote storage systems if `-remoteWrite.shardByURLReplicas=N` command-line flag is set additionally to `-remoteWrite.shardByURL` command-line flag, where `N` is desired replication factor. This allows setting up data replication among failure domains when the replication factor is smaller than the number of failure domains. See [these docs](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): reduce CPU usage when [sharding among remote storage systems](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) is enabled. +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage during stream aggregation if multiple aggregation configs are used for the same set of data. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) addresses in `-remoteWrite.url` command-line option and in scrape target urls. For example, `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends the collected metrics to these TCP addresses. See [these docs](https://docs.victoriametrics.com/vmagent/#srv-urls) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow skipping first N aggregation intervals via cmd-line flag `-streamAggr.ignoreFirstIntervals` for [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) or `-remoteWrite.streamAggr.ignoreFirstIntervals` for [vmagent](https://docs.victoriametrics.com/vmagent/). See more details [here](https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support automatic discovering and load balancing for TCP addresses behind DNS SRV addresses. These addresses can be put inside `url_prefix` urls in the form `http://srv+addr/path`, where the `addr` is the [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address, which is automatically resolved to hostnames with TCP ports. See [these docs](https://docs.victoriametrics.com/vmauth/#srv-urls) for details. diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go index 2b906fe39a..abb1d04517 100644 --- a/lib/streamaggr/dedup_timing_test.go +++ b/lib/streamaggr/dedup_timing_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) func BenchmarkDedupAggr(b *testing.B) { @@ -50,7 +49,6 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { } func newBenchSamples(count int) []pushSample { - var lc promutils.LabelsCompressor labels := []prompbmarshal.Label{ { Name: "app", @@ -82,7 +80,7 @@ func newBenchSamples(count int) []pushSample { Name: "app", Value: fmt.Sprintf("instance-%d", i), }) - keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:]) + keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:]) sample.key = string(keyBuf) sample.value = float64(i) } diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index 9cc3fee222..a96d765c21 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -15,7 +15,6 @@ import ( // Deduplicator deduplicates samples per each time series. type Deduplicator struct { da *dedupAggr - lc promutils.LabelsCompressor dropLabels []string @@ -38,8 +37,7 @@ type Deduplicator struct { // MustStop must be called on the returned deduplicator in order to free up occupied resources. func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator { d := &Deduplicator{ - da: newDedupAggr(), - + da: newDedupAggr(), dropLabels: dropLabels, stopCh: make(chan struct{}), @@ -54,13 +52,6 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels return float64(d.da.itemsCount()) }) - _ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 { - return float64(d.lc.SizeBytes()) - }) - _ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 { - return float64(d.lc.ItemsCount()) - }) - d.dedupFlushDuration = ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`) d.dedupFlushTimeouts = ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`) @@ -103,7 +94,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { } labels.Sort() - buf = d.lc.Compress(buf[:0], labels.Labels) + buf = lc.Compress(buf[:0], labels.Labels) key := bytesutil.InternBytes(buf) for _, s := range ts.Samples { pss = append(pss, pushSample{ @@ -155,7 +146,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) { samples := ctx.samples for _, ps := range pss { labelsLen := len(labels) - labels = decompressLabels(labels, &d.lc, ps.key) + labels = decompressLabels(labels, ps.key) samplesLen := len(samples) samples = append(samples, prompbmarshal.Sample{ diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 5cb480aad2..3e5b72ecd9 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -45,6 +45,19 @@ var supportedOutputs = []string{ "quantiles(phi1, ..., phiN)", } +var ( + // lc contains information about all compressed labels for streaming aggregation + lc promutils.LabelsCompressor + + _ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 { + return float64(lc.SizeBytes()) + }) + + _ = metrics.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 { + return float64(lc.ItemsCount()) + }) +) + // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. // // opts can contain additional options. If opts is nil, then default options are used. @@ -269,21 +282,6 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg return float64(n) }) - _ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 { - n := uint64(0) - for _, aggr := range as { - n += aggr.lc.SizeBytes() - } - return float64(n) - }) - _ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 { - n := uint64(0) - for _, aggr := range as { - n += aggr.lc.ItemsCount() - } - return float64(n) - }) - metrics.RegisterSet(ms) return &Aggregators{ as: as, @@ -361,9 +359,6 @@ type aggregator struct { // aggrStates contains aggregate states for the given outputs aggrStates []aggrState - // lc is used for compressing series keys before passing them to dedupAggr and aggrState - lc promutils.LabelsCompressor - // minTimestamp is used for ignoring old samples when ignoreOldSamples is set minTimestamp atomic.Int64 @@ -822,7 +817,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { outputLabels.Labels = append(outputLabels.Labels, labels.Labels...) } - buf = compressLabels(buf[:0], &a.lc, inputLabels.Labels, outputLabels.Labels) + buf = compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels) key := bytesutil.InternBytes(buf) for _, sample := range ts.Samples { if math.IsNaN(sample.Value) { @@ -850,7 +845,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { } } -func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, outputLabels []prompbmarshal.Label) []byte { +func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte { bb := bbPool.Get() bb.B = lc.Compress(bb.B, inputLabels) dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B))) @@ -860,7 +855,7 @@ func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, out return dst } -func decompressLabels(dst []prompbmarshal.Label, lc *promutils.LabelsCompressor, key string) []prompbmarshal.Label { +func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label { return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key)) } @@ -1035,7 +1030,7 @@ func (ctx *flushCtx) flushSeries() { func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) - ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key) + ctx.labels = decompressLabels(ctx.labels, key) if !ctx.a.keepMetricNames { ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) } @@ -1058,7 +1053,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) - ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key) + ctx.labels = decompressLabels(ctx.labels, key) if !ctx.a.keepMetricNames { ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) }