mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/streamaggr: reduce number of inuse objects (#6402)
The main change is getting rid of interning of sample key. It was discovered that for cases with many unique time series aggregated by vmagent interned keys could grow up to hundreds of millions of objects. This has negative impact on the following aspects: 1. It slows down garbage collection cycles, as GC has to scan all inuse objects periodically. The higher is the number of inuse objects, the longer it takes/the more CPU it takes. 2. It slows down the hot path of samples aggregation where each key needs to be looked up in the map first. The change makes code more fragile, but suppose to provide performance optimization for heavy-loaded vmagents with stream aggregation enabled. --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
fae589bb83
commit
78121642df
16 changed files with 44 additions and 23 deletions
|
@ -51,6 +51,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation!
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402).
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add `datasource.idleConnTimeout`, `remoteWrite.idleConnTimeout` and `remoteRead.idleConnTimeout` flags. These flags are set to 50s by default and should reduce the probability of `broken pipe` or `connection reset by peer` errors in vmalert logs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add auto request retry for trivial network errors, such as `broken pipe` and `connection reset` for requests to `remoteRead`, `remoteWrite` and `datasource` URLs. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5661) for details.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -35,7 +36,7 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
|
|||
sum: s.value,
|
||||
count: 1,
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The entry has been successfully stored
|
||||
continue
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -33,7 +34,7 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
|
|||
v = &countSamplesStateValue{
|
||||
n: 1,
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -41,7 +42,7 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
|
|||
h: {},
|
||||
},
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The entry has been added to the map.
|
||||
continue
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
||||
|
@ -24,7 +25,7 @@ type dedupAggrShard struct {
|
|||
|
||||
type dedupAggrShardNopad struct {
|
||||
mu sync.Mutex
|
||||
m map[string]dedupAggrSample
|
||||
m map[string]*dedupAggrSample
|
||||
}
|
||||
|
||||
type dedupAggrSample struct {
|
||||
|
@ -169,13 +170,13 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
|
|||
|
||||
m := das.m
|
||||
if m == nil {
|
||||
m = make(map[string]dedupAggrSample, len(samples))
|
||||
m = make(map[string]*dedupAggrSample, len(samples))
|
||||
das.m = m
|
||||
}
|
||||
for _, sample := range samples {
|
||||
s, ok := m[sample.key]
|
||||
if !ok {
|
||||
m[sample.key] = dedupAggrSample{
|
||||
m[strings.Clone(sample.key)] = &dedupAggrSample{
|
||||
value: sample.value,
|
||||
timestamp: sample.timestamp,
|
||||
}
|
||||
|
@ -183,10 +184,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
|
|||
}
|
||||
// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
|
||||
if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
|
||||
m[sample.key] = dedupAggrSample{
|
||||
value: sample.value,
|
||||
timestamp: sample.timestamp,
|
||||
}
|
||||
s.value = sample.value
|
||||
s.timestamp = sample.timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -196,7 +195,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
|
|||
|
||||
m := das.m
|
||||
if resetState && len(m) > 0 {
|
||||
das.m = make(map[string]dedupAggrSample, len(m))
|
||||
das.m = make(map[string]*dedupAggrSample, len(m))
|
||||
}
|
||||
|
||||
das.mu.Unlock()
|
||||
|
|
|
@ -2,6 +2,7 @@ package streamaggr
|
|||
|
||||
import (
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -42,7 +43,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
|
|||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &histogramBucketStateValue{}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -35,7 +36,7 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
|
|||
last: s.value,
|
||||
timestamp: s.timestamp,
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -33,7 +34,7 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
|
|||
v = &maxStateValue{
|
||||
max: s.value,
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -33,7 +34,7 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
|
|||
v = &minStateValue{
|
||||
min: s.value,
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
|
|
|
@ -2,6 +2,7 @@ package streamaggr
|
|||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -41,7 +42,7 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
|
|||
v = &quantilesStateValue{
|
||||
h: h,
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
histogram.PutFast(h)
|
||||
|
|
|
@ -2,6 +2,7 @@ package streamaggr
|
|||
|
||||
import (
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -34,7 +35,7 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
|
|||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &stddevStateValue{}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -33,7 +34,7 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
|
|||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &stdvarStateValue{}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
|
|
|
@ -845,8 +845,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
|||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
}
|
||||
|
||||
buf = compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels)
|
||||
key := bytesutil.InternBytes(buf)
|
||||
bufLen := len(buf)
|
||||
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
|
||||
// key remains valid only by the end of this function and can't be reused after
|
||||
// do not intern key because number of unique keys could be too high
|
||||
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
for _, sample := range ts.Samples {
|
||||
if math.IsNaN(sample.Value) {
|
||||
a.ignoredNanSamples.Inc()
|
||||
|
@ -942,6 +945,8 @@ func (ctx *pushCtx) reset() {
|
|||
}
|
||||
|
||||
type pushSample struct {
|
||||
// key identifies a sample that belongs to unique series
|
||||
// key value can't be re-used
|
||||
key string
|
||||
value float64
|
||||
timestamp int64
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -33,7 +34,7 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
|
|||
v = &sumSamplesStateValue{
|
||||
sum: s.value,
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
|
|
|
@ -2,6 +2,7 @@ package streamaggr
|
|||
|
||||
import (
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -80,7 +81,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
|||
v = &totalStateValue{
|
||||
lastValues: make(map[string]totalLastValueState),
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
|
@ -108,6 +109,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
|||
lv.value = s.value
|
||||
lv.timestamp = s.timestamp
|
||||
lv.deleteDeadline = deleteDeadline
|
||||
if !ok {
|
||||
inputKey = strings.Clone(inputKey)
|
||||
}
|
||||
sv.lastValues[inputKey] = lv
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package streamaggr
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -35,7 +36,7 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
|
|||
s.value: {},
|
||||
},
|
||||
}
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
|
|
Loading…
Reference in a new issue