lib/streamaggr: use strings.Clone() instead of bytesutil.InternString() for creating series key in dedupAggr

Our internal testing shows that this reduces GC overhead when deduplicating tens of millions of active series.
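Background on the trade-off: bytesutil.InternString() returns a canonical copy of the key from a process-wide cache, so every distinct series key stays reachable from that cache and keeps contributing to GC scan work for as long as the cache holds it. strings.Clone() just makes a private copy that becomes garbage as soon as the shard map drops the entry. A rough sketch of the two approaches (the map-based pool below is a simplified stand-in; the real bytesutil.InternString() is concurrency-safe and periodically cleaned):

    package example

    import "strings"

    // internPool stands in for a global intern cache: every distinct key
    // stays reachable from here, so the GC must keep scanning the pool
    // even after the dedup map no longer needs the key.
    var internPool = map[string]string{}

    func internKey(s string) string {
    	if v, ok := internPool[s]; ok {
    		return v
    	}
    	s = strings.Clone(s)
    	internPool[s] = s
    	return s
    }

    // cloneKey makes a private copy with no global state attached; the copy
    // becomes collectable as soon as the dedup map entry is dropped.
    func cloneKey(s string) string {
    	return strings.Clone(s)
    }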
Aliaksandr Valialkin 2024-06-10 16:05:49 +02:00
parent 9ed9e766e8
commit d269a95da3

@@ -1,6 +1,7 @@
 package streamaggr
 
 import (
+	"strings"
 	"sync"
 	"sync/atomic"
 	"unsafe"
@@ -26,7 +27,7 @@ type dedupAggrShard struct {
 type dedupAggrShardNopad struct {
 	mu sync.Mutex
-	m  map[string]dedupAggrSample
+	m  map[string]*dedupAggrSample
 
 	sizeBytes  atomic.Uint64
 	itemsCount atomic.Uint64
 }
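Storing *dedupAggrSample instead of dedupAggrSample is what makes the in-place update in pushSamples() below possible: Go forbids assigning to a field of a struct value held directly in a map, so the old value-typed map forced every update to build and re-insert a complete entry. A small illustration (the field types of dedupAggrSample are assumed):

    package example

    // dedupAggrSample mirrors the struct from this diff; field types are guesses.
    type dedupAggrSample struct {
    	value     float64
    	timestamp int64
    }

    func inPlaceUpdate() {
    	m1 := map[string]dedupAggrSample{"k": {value: 1}}
    	_ = m1
    	// m1["k"].value = 2 // does not compile: cannot assign to struct field m1["k"].value in map

    	m2 := map[string]*dedupAggrSample{"k": {value: 1}}
    	m2["k"].value = 2 // fine: the map stores a pointer to a heap-allocated struct
    }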
@@ -157,28 +158,25 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 
 	m := das.m
 	if m == nil {
-		m = make(map[string]dedupAggrSample, len(samples))
+		m = make(map[string]*dedupAggrSample, len(samples))
 		das.m = m
 	}
 	for _, sample := range samples {
 		s, ok := m[sample.key]
 		if !ok {
-			key := bytesutil.InternString(sample.key)
-			m[key] = dedupAggrSample{
+			key := strings.Clone(sample.key)
+			m[key] = &dedupAggrSample{
 				value:     sample.value,
 				timestamp: sample.timestamp,
 			}
 			das.itemsCount.Add(1)
-			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{})))
+			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)))
 			continue
 		}
 		// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
 		if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
-			key := bytesutil.InternString(sample.key)
-			m[key] = dedupAggrSample{
-				value:     sample.value,
-				timestamp: sample.timestamp,
-			}
+			s.value = sample.value
+			s.timestamp = sample.timestamp
 		}
 	}
 }
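A note on the new sizeBytes accounting: unsafe.Sizeof(key) measures the string header (16 bytes on 64-bit platforms: data pointer plus length; the key bytes themselves are covered by len(key)), and unsafe.Sizeof(s) measures the *dedupAggrSample pointer (8 bytes on 64-bit), not the struct behind it. A quick check, assuming a 64-bit platform and the field layout from this diff:

    package example

    import (
    	"fmt"
    	"unsafe"
    )

    func sizes() {
    	var key string
    	var s *struct{ value float64; timestamp int64 } // same shape as *dedupAggrSample
    	fmt.Println(unsafe.Sizeof(key)) // 16 on 64-bit: string header only (data pointer + length)
    	fmt.Println(unsafe.Sizeof(s))   // 8 on 64-bit: the pointer itself, not the pointed-to struct
    }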
@@ -188,7 +186,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 
 	m := das.m
 	if len(m) > 0 {
-		das.m = make(map[string]dedupAggrSample, len(m))
+		das.m = make(map[string]*dedupAggrSample, len(m))
 		das.sizeBytes.Store(0)
 		das.itemsCount.Store(0)
 	}
@@ -208,7 +206,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 	})
 
 	// Limit the number of samples per each flush in order to limit memory usage.
-	if len(dstSamples) >= 100_000 {
+	if len(dstSamples) >= 10_000 {
 		f(dstSamples)
 		clear(dstSamples)
 		dstSamples = dstSamples[:0]
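The threshold drop from 100_000 to 10_000 bounds how large each flushed batch can grow, and the clear() before reslicing is what lets the GC reclaim the flushed key strings while the backing array is reused. A standalone sketch of this bounded-batch pattern; the type definitions and the flushBatched name are assumed for illustration:

    package example

    // Types assumed from the diff; field types are guesses.
    type dedupAggrSample struct {
    	value     float64
    	timestamp int64
    }

    type pushSample struct {
    	key       string
    	value     float64
    	timestamp int64
    }

    // flushBatched is a hypothetical standalone version of the flush loop
    // above: dstSamples never grows past the threshold, and flushed entries
    // are cleared so their key strings become collectable.
    func flushBatched(m map[string]*dedupAggrSample, f func([]pushSample)) {
    	dstSamples := make([]pushSample, 0, 10_000)
    	for key, s := range m {
    		dstSamples = append(dstSamples, pushSample{key: key, value: s.value, timestamp: s.timestamp})
    		if len(dstSamples) >= 10_000 {
    			f(dstSamples)               // hand off the full batch
    			clear(dstSamples)           // zero the entries (Go 1.21+) so flushed keys don't stay reachable
    			dstSamples = dstSamples[:0] // reuse the backing array for the next batch
    		}
    	}
    	if len(dstSamples) > 0 {
    		f(dstSamples) // flush the remainder
    	}
    }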