From 78121642df2afd16b4e356986bb71c12284aa3b7 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Fri, 7 Jun 2024 15:45:52 +0200 Subject: [PATCH] lib/streamaggr: reduce number of inuse objects (#6402) The main change is getting rid of interning of sample key. It was discovered that for cases with many unique time series aggregated by vmagent interned keys could grow up to hundreds of millions of objects. This has a negative impact on the following aspects: 1. It slows down garbage collection cycles, as GC has to scan all inuse objects periodically. The higher the number of inuse objects, the longer it takes and the more CPU it consumes. 2. It slows down the hot path of samples aggregation where each key needs to be looked up in the map first. The change makes code more fragile, but is supposed to provide a performance optimization for heavy-loaded vmagents with stream aggregation enabled. --------- Signed-off-by: hagen1778 Co-authored-by: Aliaksandr Valialkin --- docs/CHANGELOG.md | 1 + lib/streamaggr/avg.go | 3 ++- lib/streamaggr/count_samples.go | 3 ++- lib/streamaggr/count_series.go | 3 ++- lib/streamaggr/dedup.go | 15 +++++++-------- lib/streamaggr/histogram_bucket.go | 3 ++- lib/streamaggr/last.go | 3 ++- lib/streamaggr/max.go | 3 ++- lib/streamaggr/min.go | 3 ++- lib/streamaggr/quantiles.go | 3 ++- lib/streamaggr/stddev.go | 3 ++- lib/streamaggr/stdvar.go | 3 ++- lib/streamaggr/streamaggr.go | 9 +++++++-- lib/streamaggr/sum_samples.go | 3 ++- lib/streamaggr/total.go | 6 +++++- lib/streamaggr/unique_samples.go | 3 ++- 16 files changed, 44 insertions(+), 23 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e41464b3a7..33c2a57607 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -51,6 +51,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). 
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation! * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs. +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change is supposed to reduce pressure on the Garbage Collector, as it will need to scan fewer objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add `datasource.idleConnTimeout`, `remoteWrite.idleConnTimeout` and `remoteRead.idleConnTimeout` flags. These flags are set to 50s by default and should reduce the probability of `broken pipe` or `connection reset by peer` errors in vmalert logs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add auto request retry for trivial network errors, such as `broken pipe` and `connection reset` for requests to `remoteRead`, `remoteWrite` and `datasource` URLs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details. 
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules. diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 131c9a8a0c..e3fdfaa4d7 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -35,7 +36,7 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { sum: s.value, count: 1, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The entry has been successfully stored continue diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index 30099d4bd7..b238684371 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { v = &countSamplesStateValue{ n: 1, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index a80c69dbba..6348d38f50 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -41,7 +42,7 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { h: {}, }, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The entry has been added to the map. 
continue diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index c9ac3dc676..2354499d3b 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "unsafe" @@ -24,7 +25,7 @@ type dedupAggrShard struct { type dedupAggrShardNopad struct { mu sync.Mutex - m map[string]dedupAggrSample + m map[string]*dedupAggrSample } type dedupAggrSample struct { @@ -169,13 +170,13 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { m := das.m if m == nil { - m = make(map[string]dedupAggrSample, len(samples)) + m = make(map[string]*dedupAggrSample, len(samples)) das.m = m } for _, sample := range samples { s, ok := m[sample.key] if !ok { - m[sample.key] = dedupAggrSample{ + m[strings.Clone(sample.key)] = &dedupAggrSample{ value: sample.value, timestamp: sample.timestamp, } @@ -183,10 +184,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { } // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { - m[sample.key] = dedupAggrSample{ - value: sample.value, - timestamp: sample.timestamp, - } + s.value = sample.value + s.timestamp = sample.timestamp } } } @@ -196,7 +195,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample m := das.m if resetState && len(m) > 0 { - das.m = make(map[string]dedupAggrSample, len(m)) + das.m = make(map[string]*dedupAggrSample, len(m)) } das.mu.Unlock() diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index aba933ae47..519332fd0f 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "time" @@ -42,7 +43,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. 
Try creating it. v = &histogramBucketStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index c8c7881edf..32410f9d3b 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -35,7 +36,7 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { last: s.value, timestamp: s.timestamp, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index f92036c66a..7a3d6176f6 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { v = &maxStateValue{ max: s.value, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index 17137e6cdb..af81d92ede 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *minAggrState) pushSamples(samples []pushSample) { v = &minStateValue{ min: s.value, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. 
continue diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index b97697ebbb..0da0f1faee 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -2,6 +2,7 @@ package streamaggr import ( "strconv" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -41,7 +42,7 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { v = &quantilesStateValue{ h: h, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. histogram.PutFast(h) diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 0eb6df6fec..483a9e4420 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +35,7 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stddevStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index c5d46af2ae..a0c4d6df28 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stdvarStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. 
v = vNew diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 625a0b570d..c2465f8082 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -845,8 +845,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { outputLabels.Labels = append(outputLabels.Labels, labels.Labels...) } - buf = compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels) - key := bytesutil.InternBytes(buf) + bufLen := len(buf) + buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels) + // key remains valid only by the end of this function and can't be reused after + // do not intern key because number of unique keys could be too high + key := bytesutil.ToUnsafeString(buf[bufLen:]) for _, sample := range ts.Samples { if math.IsNaN(sample.Value) { a.ignoredNanSamples.Inc() @@ -942,6 +945,8 @@ func (ctx *pushCtx) reset() { } type pushSample struct { + // key identifies a sample that belongs to unique series + // key value can't be re-used key string value float64 timestamp int64 diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index bbd63fd0ef..0e72ebaffd 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { v = &sumSamplesStateValue{ sum: s.value, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. 
continue diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index b53d326d32..ceca2d2234 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "time" @@ -80,7 +81,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { v = &totalStateValue{ lastValues: make(map[string]totalLastValueState), } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew @@ -108,6 +109,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { lv.value = s.value lv.timestamp = s.timestamp lv.deleteDeadline = deleteDeadline + if !ok { + inputKey = strings.Clone(inputKey) + } sv.lastValues[inputKey] = lv sv.deleteDeadline = deleteDeadline } diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index 4f10b5c02b..7c11a38838 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -35,7 +36,7 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { s.value: {}, }, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue