diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index f17487e0ee..f419311d0b 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "sync/atomic" "unsafe" @@ -26,7 +27,7 @@ type dedupAggrShard struct { type dedupAggrShardNopad struct { mu sync.Mutex - m map[string]dedupAggrSample + m map[string]*dedupAggrSample sizeBytes atomic.Uint64 itemsCount atomic.Uint64 @@ -157,28 +158,25 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { m := das.m if m == nil { - m = make(map[string]dedupAggrSample, len(samples)) + m = make(map[string]*dedupAggrSample, len(samples)) das.m = m } for _, sample := range samples { s, ok := m[sample.key] if !ok { - key := bytesutil.InternString(sample.key) - m[key] = dedupAggrSample{ + key := strings.Clone(sample.key) + m[key] = &dedupAggrSample{ value: sample.value, timestamp: sample.timestamp, } das.itemsCount.Add(1) - das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{}))) + das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s))) continue } // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { - key := bytesutil.InternString(sample.key) - m[key] = dedupAggrSample{ - value: sample.value, - timestamp: sample.timestamp, - } + s.value = sample.value + s.timestamp = sample.timestamp } } } @@ -188,7 +186,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample m := das.m if len(m) > 0 { - das.m = make(map[string]dedupAggrSample, len(m)) + das.m = make(map[string]*dedupAggrSample, len(m)) das.sizeBytes.Store(0) das.itemsCount.Store(0) } @@ -208,7 +206,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample }) // Limit the number of samples per each flush in order to limit memory usage. - if len(dstSamples) >= 100_000 { + if len(dstSamples) >= 10_000 { f(dstSamples) clear(dstSamples) dstSamples = dstSamples[:0]