From 0b7c47a40ca7cc277d48dfb471df41b059dafd8b Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 10 Jun 2024 16:05:49 +0200
Subject: [PATCH] lib/streamaggr: use strings.Clone() instead of
 bytesutil.InternString() for creating series key in dedupAggr

Our internal testing shows that this reduces GC overhead when deduplicating
tens of millions of active series.
---
 lib/streamaggr/dedup.go | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index f17487e0e..f419311d0 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -1,6 +1,7 @@
 package streamaggr
 
 import (
+	"strings"
 	"sync"
 	"sync/atomic"
 	"unsafe"
@@ -26,7 +27,7 @@ type dedupAggrShard struct {
 type dedupAggrShardNopad struct {
 	mu sync.Mutex
-	m  map[string]dedupAggrSample
+	m  map[string]*dedupAggrSample
 
 	sizeBytes  atomic.Uint64
 	itemsCount atomic.Uint64
@@ -157,28 +158,25 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 	m := das.m
 	if m == nil {
-		m = make(map[string]dedupAggrSample, len(samples))
+		m = make(map[string]*dedupAggrSample, len(samples))
 		das.m = m
 	}
 	for _, sample := range samples {
 		s, ok := m[sample.key]
 		if !ok {
-			key := bytesutil.InternString(sample.key)
-			m[key] = dedupAggrSample{
+			key := strings.Clone(sample.key)
+			m[key] = &dedupAggrSample{
 				value:     sample.value,
 				timestamp: sample.timestamp,
 			}
 			das.itemsCount.Add(1)
-			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{})))
+			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)))
			continue
 		}
 		// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
 		if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
-			key := bytesutil.InternString(sample.key)
-			m[key] = dedupAggrSample{
-				value:     sample.value,
-				timestamp: sample.timestamp,
-			}
+			s.value = sample.value
+			s.timestamp = sample.timestamp
 		}
 	}
 }
@@ -188,7 +186,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 	m := das.m
 	if len(m) > 0 {
-		das.m = make(map[string]dedupAggrSample, len(m))
+		das.m = make(map[string]*dedupAggrSample, len(m))
 		das.sizeBytes.Store(0)
 		das.itemsCount.Store(0)
 	}
@@ -208,7 +206,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 	})
 
 	// Limit the number of samples per each flush in order to limit memory usage.
-	if len(dstSamples) >= 100_000 {
+	if len(dstSamples) >= 10_000 {
 		f(dstSamples)
 		clear(dstSamples)
 		dstSamples = dstSamples[:0]
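
For illustration, below is a minimal, self-contained Go sketch of the pattern this patch introduces. It is not the actual lib/streamaggr code; the `sample`, `pushSample` and `pushSamples` names are simplified stand-ins, and locking, sharding and size accounting are omitted. It shows the two key points of the change: the map key is detached from the caller's buffer with strings.Clone() instead of being interned, and because the map now stores pointers, the update path mutates the existing entry in place instead of re-inserting it (so the key is copied only once per series).

// sketch.go - simplified stand-in for the dedup map pattern, not VictoriaMetrics code
package main

import (
	"fmt"
	"strings"
)

type sample struct {
	value     float64
	timestamp int64
}

type pushSample struct {
	key       string // may alias a reusable buffer, so it must be cloned before storing
	value     float64
	timestamp int64
}

func pushSamples(m map[string]*sample, samples []pushSample) {
	for _, ps := range samples {
		s, ok := m[ps.key]
		if !ok {
			// Clone the key so the map does not keep the caller's buffer alive
			// and no interning cache is involved.
			key := strings.Clone(ps.key)
			m[key] = &sample{value: ps.value, timestamp: ps.timestamp}
			continue
		}
		// Keep the sample with the bigger timestamp (or the bigger value on a tie),
		// mutating through the pointer so no new map entry or key copy is created.
		if ps.timestamp > s.timestamp || (ps.timestamp == s.timestamp && ps.value > s.value) {
			s.value = ps.value
			s.timestamp = ps.timestamp
		}
	}
}

func main() {
	m := make(map[string]*sample)
	pushSamples(m, []pushSample{
		{key: `metric{job="a"}`, value: 1, timestamp: 100},
		{key: `metric{job="a"}`, value: 2, timestamp: 200},
		{key: `metric{job="a"}`, value: 3, timestamp: 150},
	})
	fmt.Printf("%+v\n", *m[`metric{job="a"}`]) // prints {value:2 timestamp:200}
}

Per the commit message, dropping the interning in favor of plain cloned keys is what reduces GC overhead when tens of millions of active series are deduplicated; storing pointers additionally avoids rewriting the whole struct value (and re-deriving the key) on every accepted update.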