diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e41464b3a7..33c2a57607 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -51,6 +51,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation! * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs. +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add `datasource.idleConnTimeout`, `remoteWrite.idleConnTimeout` and `remoteRead.idleConnTimeout` flags. These flags are set to 50s by default and should reduce the probability of `broken pipe` or `connection reset by peer` errors in vmalert logs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add auto request retry for trivial network errors, such as `broken pipe` and `connection reset` for requests to `remoteRead`, `remoteWrite` and `datasource` URLs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules. diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 131c9a8a0c..e3fdfaa4d7 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -35,7 +36,7 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { sum: s.value, count: 1, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The entry has been successfully stored continue diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index 30099d4bd7..b238684371 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { v = &countSamplesStateValue{ n: 1, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index a80c69dbba..6348d38f50 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -41,7 +42,7 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { h: {}, }, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The entry has been added to the map. continue diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index c9ac3dc676..2354499d3b 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "unsafe" @@ -24,7 +25,7 @@ type dedupAggrShard struct { type dedupAggrShardNopad struct { mu sync.Mutex - m map[string]dedupAggrSample + m map[string]*dedupAggrSample } type dedupAggrSample struct { @@ -169,13 +170,13 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { m := das.m if m == nil { - m = make(map[string]dedupAggrSample, len(samples)) + m = make(map[string]*dedupAggrSample, len(samples)) das.m = m } for _, sample := range samples { s, ok := m[sample.key] if !ok { - m[sample.key] = dedupAggrSample{ + m[strings.Clone(sample.key)] = &dedupAggrSample{ value: sample.value, timestamp: sample.timestamp, } @@ -183,10 +184,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { } // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { - m[sample.key] = dedupAggrSample{ - value: sample.value, - timestamp: sample.timestamp, - } + s.value = sample.value + s.timestamp = sample.timestamp } } } @@ -196,7 +195,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample m := das.m if resetState && len(m) > 0 { - das.m = make(map[string]dedupAggrSample, len(m)) + das.m = make(map[string]*dedupAggrSample, len(m)) } das.mu.Unlock() diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index aba933ae47..519332fd0f 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "time" @@ -42,7 +43,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &histogramBucketStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index c8c7881edf..32410f9d3b 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -35,7 +36,7 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { last: s.value, timestamp: s.timestamp, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index f92036c66a..7a3d6176f6 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { v = &maxStateValue{ max: s.value, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index 17137e6cdb..af81d92ede 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *minAggrState) pushSamples(samples []pushSample) { v = &minStateValue{ min: s.value, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index b97697ebbb..0da0f1faee 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -2,6 +2,7 @@ package streamaggr import ( "strconv" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -41,7 +42,7 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { v = &quantilesStateValue{ h: h, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. histogram.PutFast(h) diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 0eb6df6fec..483a9e4420 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +35,7 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stddevStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index c5d46af2ae..a0c4d6df28 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stdvarStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 625a0b570d..c2465f8082 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -845,8 +845,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { outputLabels.Labels = append(outputLabels.Labels, labels.Labels...) } - buf = compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels) - key := bytesutil.InternBytes(buf) + bufLen := len(buf) + buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels) + // key remains valid only by the end of this function and can't be reused after + // do not intern key because number of unique keys could be too high + key := bytesutil.ToUnsafeString(buf[bufLen:]) for _, sample := range ts.Samples { if math.IsNaN(sample.Value) { a.ignoredNanSamples.Inc() @@ -942,6 +945,8 @@ func (ctx *pushCtx) reset() { } type pushSample struct { + // key identifies a sample that belongs to unique series + // key value can't be re-used key string value float64 timestamp int64 diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index bbd63fd0ef..0e72ebaffd 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -33,7 +34,7 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { v = &sumSamplesStateValue{ sum: s.value, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index b53d326d32..ceca2d2234 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "time" @@ -80,7 +81,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { v = &totalStateValue{ lastValues: make(map[string]totalLastValueState), } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew @@ -108,6 +109,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { lv.value = s.value lv.timestamp = s.timestamp lv.deleteDeadline = deleteDeadline + if !ok { + inputKey = strings.Clone(inputKey) + } sv.lastValues[inputKey] = lv sv.deleteDeadline = deleteDeadline } diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index 4f10b5c02b..7c11a38838 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -35,7 +36,7 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { s.value: {}, }, } - vNew, loaded := as.m.LoadOrStore(outputKey, v) + vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) if !loaded { // The new entry has been successfully created. continue