lib/streamaggr: follow-up for 7cb894a777

- Use bytesutil.InternString() instead of strings.Clone() for inputKey and outputKey in aggregatorpushSamples().
  This should reduce string allocation rate, since strings can be re-used between aggrState flushes.
- Reduce memory allocations at dedupAggrShard by storing dedupAggrSample by value in the active series map.
- Remove duplicate call to bytesutil.InternBytes() at Deduplicator, since it is already called inside dedupAggr.pushSamples().
- Add missing string interning at rateAggrState.pushSamples().

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402
This commit is contained in:
Aliaksandr Valialkin 2024-06-07 16:24:09 +02:00
parent 78121642df
commit 32aa0751a1
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
18 changed files with 61 additions and 39 deletions

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -36,7 +36,8 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
sum: s.value, sum: s.value,
count: 1, count: 1,
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The entry has been successfully stored // The entry has been successfully stored
continue continue

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -34,7 +34,8 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
v = &countSamplesStateValue{ v = &countSamplesStateValue{
n: 1, n: 1,
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The new entry has been successfully created. // The new entry has been successfully created.
continue continue

View file

@ -1,7 +1,6 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -42,7 +41,8 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
h: {}, h: {},
}, },
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The entry has been added to the map. // The entry has been added to the map.
continue continue

View file

@ -1,7 +1,6 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"unsafe" "unsafe"
@ -25,7 +24,7 @@ type dedupAggrShard struct {
type dedupAggrShardNopad struct { type dedupAggrShardNopad struct {
mu sync.Mutex mu sync.Mutex
m map[string]*dedupAggrSample m map[string]dedupAggrSample
} }
type dedupAggrSample struct { type dedupAggrSample struct {
@ -170,13 +169,14 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
m := das.m m := das.m
if m == nil { if m == nil {
m = make(map[string]*dedupAggrSample, len(samples)) m = make(map[string]dedupAggrSample, len(samples))
das.m = m das.m = m
} }
for _, sample := range samples { for _, sample := range samples {
s, ok := m[sample.key] s, ok := m[sample.key]
if !ok { if !ok {
m[strings.Clone(sample.key)] = &dedupAggrSample{ key := bytesutil.InternString(sample.key)
m[key] = dedupAggrSample{
value: sample.value, value: sample.value,
timestamp: sample.timestamp, timestamp: sample.timestamp,
} }
@ -184,8 +184,11 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
} }
// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
s.value = sample.value key := bytesutil.InternString(sample.key)
s.timestamp = sample.timestamp m[key] = dedupAggrSample{
value: sample.value,
timestamp: sample.timestamp,
}
} }
} }
} }
@ -195,7 +198,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
m := das.m m := das.m
if resetState && len(m) > 0 { if resetState && len(m) > 0 {
das.m = make(map[string]*dedupAggrSample, len(m)) das.m = make(map[string]dedupAggrSample, len(m))
} }
das.mu.Unlock() das.mu.Unlock()

View file

@ -94,8 +94,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
} }
labels.Sort() labels.Sort()
buf = lc.Compress(buf[:0], labels.Labels) bufLen := len(buf)
key := bytesutil.InternBytes(buf) buf = lc.Compress(buf, labels.Labels)
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples { for _, s := range ts.Samples {
pss = append(pss, pushSample{ pss = append(pss, pushSample{
key: key, key: key,

View file

@ -2,10 +2,10 @@ package streamaggr
import ( import (
"math" "math"
"strings"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -43,7 +43,8 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
if !ok { if !ok {
// The entry is missing in the map. Try creating it. // The entry is missing in the map. Try creating it.
v = &histogramBucketStateValue{} v = &histogramBucketStateValue{}
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded { if loaded {
// Use the entry created by a concurrent goroutine. // Use the entry created by a concurrent goroutine.
v = vNew v = vNew

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -36,7 +36,8 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
last: s.value, last: s.value,
timestamp: s.timestamp, timestamp: s.timestamp,
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The new entry has been successfully created. // The new entry has been successfully created.
continue continue

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -34,7 +34,8 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
v = &maxStateValue{ v = &maxStateValue{
max: s.value, max: s.value,
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The new entry has been successfully created. // The new entry has been successfully created.
continue continue

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -34,7 +34,8 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
v = &minStateValue{ v = &minStateValue{
min: s.value, min: s.value,
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The new entry has been successfully created. // The new entry has been successfully created.
continue continue

View file

@ -2,7 +2,6 @@ package streamaggr
import ( import (
"strconv" "strconv"
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -42,7 +41,8 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
v = &quantilesStateValue{ v = &quantilesStateValue{
h: h, h: h,
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded { if loaded {
// Use the entry created by a concurrent goroutine. // Use the entry created by a concurrent goroutine.
histogram.PutFast(h) histogram.PutFast(h)

View file

@ -4,6 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -59,6 +60,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
v = &rateStateValue{ v = &rateStateValue{
lastValues: make(map[string]rateLastValueState), lastValues: make(map[string]rateLastValueState),
} }
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v) vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded { if loaded {
// Use the entry created by a concurrent goroutine. // Use the entry created by a concurrent goroutine.
@ -89,6 +91,8 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
lv.value = s.value lv.value = s.value
lv.timestamp = s.timestamp lv.timestamp = s.timestamp
lv.deleteDeadline = deleteDeadline lv.deleteDeadline = deleteDeadline
inputKey = bytesutil.InternString(inputKey)
sv.lastValues[inputKey] = lv sv.lastValues[inputKey] = lv
sv.deleteDeadline = deleteDeadline sv.deleteDeadline = deleteDeadline
} }

View file

@ -2,9 +2,9 @@ package streamaggr
import ( import (
"math" "math"
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -35,7 +35,8 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
if !ok { if !ok {
// The entry is missing in the map. Try creating it. // The entry is missing in the map. Try creating it.
v = &stddevStateValue{} v = &stddevStateValue{}
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded { if loaded {
// Use the entry created by a concurrent goroutine. // Use the entry created by a concurrent goroutine.
v = vNew v = vNew

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -34,7 +34,8 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
if !ok { if !ok {
// The entry is missing in the map. Try creating it. // The entry is missing in the map. Try creating it.
v = &stdvarStateValue{} v = &stdvarStateValue{}
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded { if loaded {
// Use the entry created by a concurrent goroutine. // Use the entry created by a concurrent goroutine.
v = vNew v = vNew

View file

@ -398,7 +398,11 @@ type aggregator struct {
} }
type aggrState interface { type aggrState interface {
// pushSamples must push samples to the aggrState.
//
// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
pushSamples(samples []pushSample) pushSamples(samples []pushSample)
flushState(ctx *flushCtx, resetState bool) flushState(ctx *flushCtx, resetState bool)
} }

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -34,7 +34,8 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
v = &sumSamplesStateValue{ v = &sumSamplesStateValue{
sum: s.value, sum: s.value,
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The new entry has been successfully created. // The new entry has been successfully created.
continue continue

View file

@ -2,10 +2,10 @@ package streamaggr
import ( import (
"math" "math"
"strings"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -81,7 +81,8 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
v = &totalStateValue{ v = &totalStateValue{
lastValues: make(map[string]totalLastValueState), lastValues: make(map[string]totalLastValueState),
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded { if loaded {
// Use the entry created by a concurrent goroutine. // Use the entry created by a concurrent goroutine.
v = vNew v = vNew
@ -109,9 +110,8 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
lv.value = s.value lv.value = s.value
lv.timestamp = s.timestamp lv.timestamp = s.timestamp
lv.deleteDeadline = deleteDeadline lv.deleteDeadline = deleteDeadline
if !ok {
inputKey = strings.Clone(inputKey) inputKey = bytesutil.InternString(inputKey)
}
sv.lastValues[inputKey] = lv sv.lastValues[inputKey] = lv
sv.deleteDeadline = deleteDeadline sv.deleteDeadline = deleteDeadline
} }

View file

@ -1,9 +1,9 @@
package streamaggr package streamaggr
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
@ -36,7 +36,8 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
s.value: {}, s.value: {},
}, },
} }
vNew, loaded := as.m.LoadOrStore(strings.Clone(outputKey), v) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if !loaded {
// The new entry has been successfully created. // The new entry has been successfully created.
continue continue