lib/streamaggr: reduce number of inuse objects (#6402)

The main change is getting rid of interning of sample key. It was
discovered that for cases with many unique time series aggregated by
vmagent interned keys could grow up to hundreds of millions of objects.
This has negative impact on the following aspects:
1. It slows down garbage collection cycles, as GC has to scan all inuse
objects periodically. The higher is the number of inuse objects, the
longer it takes/the more CPU it takes.
2. It slows down the hot path of samples aggregation where each key
needs to be looked up in the map first.

The change makes code more fragile, but suppose to provide performance
optimization for heavy-loaded vmagents with stream aggregation enabled.

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2024-06-07 15:45:52 +02:00 committed by GitHub
parent 5f46f8a11d
commit 7cb894a777
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 44 additions and 23 deletions

View file

@ -51,6 +51,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation!
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add `datasource.idleConnTimeout`, `remoteWrite.idleConnTimeout` and `remoteRead.idleConnTimeout` flags. These flags are set to 50s by default and should reduce the probability of `broken pipe` or `connection reset by peer` errors in vmalert logs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add auto request retry for trivial network errors, such as `broken pipe` and `connection reset` for requests to `remoteRead`, `remoteWrite` and `datasource` URLs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules.

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -35,7 +36,7 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
sum: s.value,
count: 1,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The entry has been successfully stored
continue

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -33,7 +34,7 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
v = &countSamplesStateValue{
n: 1,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The new entry has been successfully created.
continue

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -41,7 +42,7 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
h: {},
},
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The entry has been added to the map.
continue

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"unsafe"
@ -24,7 +25,7 @@ type dedupAggrShard struct {
type dedupAggrShardNopad struct {
mu sync.Mutex
m map[string]dedupAggrSample
m map[string]*dedupAggrSample
}
type dedupAggrSample struct {
@ -169,13 +170,13 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
m := das.m
if m == nil {
m = make(map[string]dedupAggrSample, len(samples))
m = make(map[string]*dedupAggrSample, len(samples))
das.m = m
}
for _, sample := range samples {
s, ok := m[sample.key]
if !ok {
m[sample.key] = dedupAggrSample{
m[strings.Clone(sample.key)] = &dedupAggrSample{
value: sample.value,
timestamp: sample.timestamp,
}
@ -183,10 +184,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
}
// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
m[sample.key] = dedupAggrSample{
value: sample.value,
timestamp: sample.timestamp,
}
s.value = sample.value
s.timestamp = sample.timestamp
}
}
}
@ -196,7 +195,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
m := das.m
if resetState && len(m) > 0 {
das.m = make(map[string]dedupAggrSample, len(m))
das.m = make(map[string]*dedupAggrSample, len(m))
}
das.mu.Unlock()

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"math"
"strings"
"sync"
"time"
@ -42,7 +43,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
if !ok {
// The entry is missing in the map. Try creating it.
v = &histogramBucketStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -35,7 +36,7 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
last: s.value,
timestamp: s.timestamp,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The new entry has been successfully created.
continue

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -33,7 +34,7 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
v = &maxStateValue{
max: s.value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The new entry has been successfully created.
continue

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -33,7 +34,7 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
v = &minStateValue{
min: s.value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The new entry has been successfully created.
continue

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"strconv"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -41,7 +42,7 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
v = &quantilesStateValue{
h: h,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if loaded {
// Use the entry created by a concurrent goroutine.
histogram.PutFast(h)

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"math"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -34,7 +35,7 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
if !ok {
// The entry is missing in the map. Try creating it.
v = &stddevStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -33,7 +34,7 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
if !ok {
// The entry is missing in the map. Try creating it.
v = &stdvarStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew

View file

@ -845,8 +845,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
}
buf = compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels)
key := bytesutil.InternBytes(buf)
bufLen := len(buf)
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
// key remains valid only by the end of this function and can't be reused after
// do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, sample := range ts.Samples {
if math.IsNaN(sample.Value) {
a.ignoredNanSamples.Inc()
@ -942,6 +945,8 @@ func (ctx *pushCtx) reset() {
}
type pushSample struct {
// key identifies a sample that belongs to unique series
// key value can't be re-used
key string
value float64
timestamp int64

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -33,7 +34,7 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
v = &sumSamplesStateValue{
sum: s.value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The new entry has been successfully created.
continue

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"math"
"strings"
"sync"
"time"
@ -80,7 +81,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
v = &totalStateValue{
lastValues: make(map[string]totalLastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
@ -108,6 +109,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
lv.value = s.value
lv.timestamp = s.timestamp
lv.deleteDeadline = deleteDeadline
if !ok {
inputKey = strings.Clone(inputKey)
}
sv.lastValues[inputKey] = lv
sv.deleteDeadline = deleteDeadline
}

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -35,7 +36,7 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
s.value: {},
},
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
if !loaded {
// The new entry has been successfully created.
continue