diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 9a83a07ed..d5a8da1d0 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -52,6 +52,7 @@ Released at 2024-04-26
 * FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/): support disable or log [implicit conversions](https://docs.victoriametrics.com/metricsql/#implicit-query-conversions) for subquery with cmd-line flags `-search.disableImplicitConversion` and `-search.logImplicitConversion`. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4338).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support data replication additionally to sharding among remote storage systems if `-remoteWrite.shardByURLReplicas=N` command-line flag is set additionally to `-remoteWrite.shardByURL` command-line flag, where `N` is desired replication factor. This allows setting up data replication among failure domains when the replication factor is smaller than the number of failure domains. See [these docs](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): reduce CPU usage when [sharding among remote storage systems](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) is enabled.
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage during stream aggregation if multiple aggregation configs are used for the same set of data.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) addresses in `-remoteWrite.url` command-line option and in scrape target urls. For example, `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends the collected metrics to these TCP addresses. See [these docs](https://docs.victoriametrics.com/vmagent/#srv-urls) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053).
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow skipping first N aggregation intervals via cmd-line flag `-streamAggr.ignoreFirstIntervals` for [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) or `-remoteWrite.streamAggr.ignoreFirstIntervals` for [vmagent](https://docs.victoriametrics.com/vmagent/). See more details [here](https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support automatic discovering and load balancing for TCP addresses behind DNS SRV addresses. These addresses can be put inside `url_prefix` urls in the form `http://srv+addr/path`, where the `addr` is the [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address, which is automatically resolved to hostnames with TCP ports. See [these docs](https://docs.victoriametrics.com/vmauth/#srv-urls) for details.
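The changelog entry corresponds to the `lib/streamaggr` refactoring below: instead of keeping one `promutils.LabelsCompressor` per `aggregator` and per `Deduplicator`, a single package-level compressor is shared by all of them, so identical series pushed through several aggregation configs are interned once. A minimal sketch of the effect, assuming only the `LabelsCompressor` methods visible in this diff (`Compress`, `ItemsCount`, `SizeBytes`):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)

func main() {
	// One shared compressor, like the package-level `lc` in lib/streamaggr.
	var lc promutils.LabelsCompressor

	series := []prompbmarshal.Label{
		{Name: "__name__", Value: "http_requests_total"},
		{Name: "job", Value: "webapp"},
	}

	// Two aggregation configs compressing the same series reuse the same
	// label->id entries instead of building two private dictionaries, so
	// the per-series memory cost is paid once rather than once per config.
	keyA := lc.Compress(nil, series)
	keyB := lc.Compress(nil, series)

	fmt.Println("same key:", bytes.Equal(keyA, keyB))
	fmt.Println("items:", lc.ItemsCount(), "size bytes:", lc.SizeBytes())
}
```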
diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go
index 2b906fe39..abb1d0451 100644
--- a/lib/streamaggr/dedup_timing_test.go
+++ b/lib/streamaggr/dedup_timing_test.go
@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 )
 
 func BenchmarkDedupAggr(b *testing.B) {
@@ -50,7 +49,6 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 }
 
 func newBenchSamples(count int) []pushSample {
-	var lc promutils.LabelsCompressor
 	labels := []prompbmarshal.Label{
 		{
 			Name: "app",
@@ -82,7 +80,7 @@ func newBenchSamples(count int) []pushSample {
 			Name:  "app",
 			Value: fmt.Sprintf("instance-%d", i),
 		})
-		keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:])
+		keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
 		sample.key = string(keyBuf)
 		sample.value = float64(i)
 	}
diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go
index 9cc3fee22..a96d765c2 100644
--- a/lib/streamaggr/deduplicator.go
+++ b/lib/streamaggr/deduplicator.go
@@ -15,7 +15,6 @@ import (
 
 // Deduplicator deduplicates samples per each time series.
 type Deduplicator struct {
 	da *dedupAggr
-	lc promutils.LabelsCompressor
 
 	dropLabels []string
@@ -38,8 +37,7 @@ type Deduplicator struct {
 // MustStop must be called on the returned deduplicator in order to free up occupied resources.
 func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator {
 	d := &Deduplicator{
-		da: newDedupAggr(),
-
+		da:         newDedupAggr(),
 		dropLabels: dropLabels,
 
 		stopCh: make(chan struct{}),
@@ -54,13 +52,6 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
 		return float64(d.da.itemsCount())
 	})
 
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
-		return float64(d.lc.SizeBytes())
-	})
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
-		return float64(d.lc.ItemsCount())
-	})
-
 	d.dedupFlushDuration = ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`)
 	d.dedupFlushTimeouts = ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`)
 
@@ -103,7 +94,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		}
 		labels.Sort()
 
-		buf = d.lc.Compress(buf[:0], labels.Labels)
+		buf = lc.Compress(buf[:0], labels.Labels)
 		key := bytesutil.InternBytes(buf)
 		for _, s := range ts.Samples {
 			pss = append(pss, pushSample{
@@ -155,7 +146,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 		samples := ctx.samples
 		for _, ps := range pss {
 			labelsLen := len(labels)
-			labels = decompressLabels(labels, &d.lc, ps.key)
+			labels = decompressLabels(labels, ps.key)
 
 			samplesLen := len(samples)
 			samples = append(samples, prompbmarshal.Sample{
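Note that the two `vm_streamaggr_labels_compressor_*` gauges are removed from the per-`Deduplicator` metrics set `ms` here and reappear once in the package-level `var` block of `streamaggr.go` below: a process-wide compressor should only be reported once. The external `Deduplicator` API is unchanged by the refactoring; a usage sketch, assuming `PushFunc` is `func([]prompbmarshal.TimeSeries)` as its call sites in this diff suggest:

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// pushFunc receives the deduplicated series once per dedup interval.
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		fmt.Printf("flushed %d series\n", len(tss))
	}

	// Keep at most one sample per series per 30s window; drop no labels.
	d := streamaggr.NewDeduplicator(pushFunc, 30*time.Second, nil)
	defer d.MustStop()

	d.Push([]prompbmarshal.TimeSeries{{
		Labels:  []prompbmarshal.Label{{Name: "__name__", Value: "foo"}},
		Samples: []prompbmarshal.Sample{{Value: 42, Timestamp: time.Now().UnixMilli()}},
	}})
}
```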
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 5cb480aad..3e5b72ecd 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -45,6 +45,19 @@ var supportedOutputs = []string{
 	"quantiles(phi1, ..., phiN)",
 }
 
+var (
+	// lc contains information about all compressed labels for streaming aggregation
+	lc promutils.LabelsCompressor
+
+	_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
+		return float64(lc.SizeBytes())
+	})
+
+	_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
+		return float64(lc.ItemsCount())
+	})
+)
+
 // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
 // opts can contain additional options. If opts is nil, then default options are used.
@@ -269,21 +282,6 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg
 		return float64(n)
 	})
 
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
-		n := uint64(0)
-		for _, aggr := range as {
-			n += aggr.lc.SizeBytes()
-		}
-		return float64(n)
-	})
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
-		n := uint64(0)
-		for _, aggr := range as {
-			n += aggr.lc.ItemsCount()
-		}
-		return float64(n)
-	})
-
 	metrics.RegisterSet(ms)
 	return &Aggregators{
 		as: as,
@@ -361,9 +359,6 @@ type aggregator struct {
 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState
 
-	// lc is used for compressing series keys before passing them to dedupAggr and aggrState
-	lc promutils.LabelsCompressor
-
 	// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
 	minTimestamp atomic.Int64
 
@@ -822,7 +817,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 			outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
 		}
 
-		buf = compressLabels(buf[:0], &a.lc, inputLabels.Labels, outputLabels.Labels)
+		buf = compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels)
 		key := bytesutil.InternBytes(buf)
 		for _, sample := range ts.Samples {
 			if math.IsNaN(sample.Value) {
@@ -850,7 +845,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 	}
 }
 
-func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, outputLabels []prompbmarshal.Label) []byte {
+func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
 	bb := bbPool.Get()
 	bb.B = lc.Compress(bb.B, inputLabels)
 	dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
@@ -860,7 +855,7 @@ func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, out
 	return dst
 }
 
-func decompressLabels(dst []prompbmarshal.Label, lc *promutils.LabelsCompressor, key string) []prompbmarshal.Label {
+func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label {
 	return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
 }
@@ -1035,7 +1030,7 @@ func (ctx *flushCtx) flushSeries() {
 func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
 	labelsLen := len(ctx.labels)
 	samplesLen := len(ctx.samples)
-	ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key)
+	ctx.labels = decompressLabels(ctx.labels, key)
 	if !ctx.a.keepMetricNames {
 		ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
 	}
@@ -1058,7 +1053,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
 func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) {
 	labelsLen := len(ctx.labels)
 	samplesLen := len(ctx.samples)
-	ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key)
+	ctx.labels = decompressLabels(ctx.labels, key)
 	if !ctx.a.keepMetricNames {
 		ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
 	}
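After this change `compressLabels` and `decompressLabels` close over the package-level `lc` instead of threading a compressor through every call site. The key layout itself is unchanged: a varuint length prefix for the compressed input labels, followed by the compressed input labels and then the compressed output labels, so the two parts can be split apart later. A standalone sketch of that framing, using standard-library varints in place of `lib/encoding.MarshalVarUint64` (the `splitKey` helper is hypothetical, not part of the diff):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// splitKey undoes the framing used by compressLabels: read the varuint
// length of the input part, then slice the key into input and output parts.
func splitKey(key []byte) (inputPart, outputPart []byte) {
	n, size := binary.Uvarint(key)
	rest := key[size:]
	return rest[:n], rest[n:]
}

func main() {
	// Stand-ins for the compressed input and output label sets.
	input := []byte("compressed-input-labels")
	output := []byte("compressed-output-labels")

	// Varuint length prefix, input part, output part.
	key := binary.AppendUvarint(nil, uint64(len(input)))
	key = append(key, input...)
	key = append(key, output...)

	in, out := splitKey(key)
	fmt.Printf("%s | %s\n", in, out)
}
```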