diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 33c2a5760..6a261ca86 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -51,7 +51,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation! * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs. -* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add `datasource.idleConnTimeout`, `remoteWrite.idleConnTimeout` and `remoteRead.idleConnTimeout` flags. These flags are set to 50s by default and should reduce the probability of `broken pipe` or `connection reset by peer` errors in vmalert logs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add auto request retry for trivial network errors, such as `broken pipe` and `connection reset` for requests to `remoteRead`, `remoteWrite` and `datasource` URLs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules. diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index e3fdfaa4d..2d0546058 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -36,7 +36,8 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { sum: s.value, count: 1, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The entry has been successfully stored continue diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index b23868437..f48ae44dd 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -34,7 +34,8 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { v = &countSamplesStateValue{ n: 1, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index 6348d38f5..edf4a95a4 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -42,7 +41,8 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { h: {}, }, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The entry has been added to the map. continue diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 2354499d3..399e9a3dc 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "unsafe" @@ -25,7 +24,7 @@ type dedupAggrShard struct { type dedupAggrShardNopad struct { mu sync.Mutex - m map[string]*dedupAggrSample + m map[string]dedupAggrSample } type dedupAggrSample struct { @@ -170,13 +169,14 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { m := das.m if m == nil { - m = make(map[string]*dedupAggrSample, len(samples)) + m = make(map[string]dedupAggrSample, len(samples)) das.m = m } for _, sample := range samples { s, ok := m[sample.key] if !ok { - m[strings.Clone(sample.key)] = &dedupAggrSample{ + key := bytesutil.InternString(sample.key) + m[key] = dedupAggrSample{ value: sample.value, timestamp: sample.timestamp, } @@ -184,8 +184,11 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { } // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { - s.value = sample.value - s.timestamp = sample.timestamp + key := bytesutil.InternString(sample.key) + m[key] = dedupAggrSample{ + value: sample.value, + timestamp: sample.timestamp, + } } } } @@ -195,7 +198,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample m := das.m if resetState && len(m) > 0 { - das.m = make(map[string]*dedupAggrSample, len(m)) + das.m = make(map[string]dedupAggrSample, len(m)) } das.mu.Unlock() diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index a96d765c2..3ec25f5fa 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -94,8 +94,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { } labels.Sort() - buf = lc.Compress(buf[:0], labels.Labels) - key := bytesutil.InternBytes(buf) + bufLen := len(buf) + buf = lc.Compress(buf, labels.Labels) + key := bytesutil.ToUnsafeString(buf[bufLen:]) for _, s := range ts.Samples { pss = append(pss, pushSample{ key: key, diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 519332fd0..4cce32c2d 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -2,10 +2,10 @@ package streamaggr import ( "math" - "strings" "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/metrics" ) @@ -43,7 +43,8 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &histogramBucketStateValue{} - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index 32410f9d3..9bf4e08e0 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -36,7 +36,8 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { last: s.value, timestamp: s.timestamp, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index 7a3d6176f..a106d3300 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -34,7 +34,8 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { v = &maxStateValue{ max: s.value, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index af81d92ed..b03105c92 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -34,7 +34,8 @@ func (as *minAggrState) pushSamples(samples []pushSample) { v = &minStateValue{ min: s.value, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index 0da0f1fae..edaa128f5 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -2,7 +2,6 @@ package streamaggr import ( "strconv" - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -42,7 +41,8 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { v = &quantilesStateValue{ h: h, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. histogram.PutFast(h) diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 1eedf1f1d..44e6bcea4 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -59,6 +60,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { v = &rateStateValue{ lastValues: make(map[string]rateLastValueState), } + outputKey = bytesutil.InternString(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. @@ -89,6 +91,8 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { lv.value = s.value lv.timestamp = s.timestamp lv.deleteDeadline = deleteDeadline + + inputKey = bytesutil.InternString(inputKey) sv.lastValues[inputKey] = lv sv.deleteDeadline = deleteDeadline } diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 483a9e442..ef10177a0 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -2,9 +2,9 @@ package streamaggr import ( "math" - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -35,7 +35,8 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stddevStateValue{} - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index a0c4d6df2..dd4c21efe 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -34,7 +34,8 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stdvarStateValue{} - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index c2465f808..92fdba57a 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -398,7 +398,11 @@ type aggregator struct { } type aggrState interface { + // pushSamples must push samples to the aggrState. + // + // samples[].key must be cloned by aggrState, since it may change after returning from pushSamples. pushSamples(samples []pushSample) + flushState(ctx *flushCtx, resetState bool) } diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index 0e72ebaff..eb621ce5c 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -34,7 +34,8 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { v = &sumSamplesStateValue{ sum: s.value, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. continue diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index ceca2d223..365fdb35d 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -2,10 +2,10 @@ package streamaggr import ( "math" - "strings" "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -81,7 +81,8 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { v = &totalStateValue{ lastValues: make(map[string]totalLastValueState), } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. v = vNew @@ -109,9 +110,8 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { lv.value = s.value lv.timestamp = s.timestamp lv.deleteDeadline = deleteDeadline - if !ok { - inputKey = strings.Clone(inputKey) - } + + inputKey = bytesutil.InternString(inputKey) sv.lastValues[inputKey] = lv sv.deleteDeadline = deleteDeadline } diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index 7c11a3883..310617cc7 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -1,9 +1,9 @@ package streamaggr import ( - "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -36,7 +36,8 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { s.value: {}, }, } - vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. continue