diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index f419311d0..03b84b4ec 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -9,6 +9,7 @@ import ( "github.com/cespare/xxhash/v2" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) const dedupAggrShardsCount = 128 @@ -29,6 +30,8 @@ type dedupAggrShardNopad struct { mu sync.Mutex m map[string]*dedupAggrSample + samplesBuf []dedupAggrSample + sizeBytes atomic.Uint64 itemsCount atomic.Uint64 } @@ -161,16 +164,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { m = make(map[string]*dedupAggrSample, len(samples)) das.m = m } + samplesBuf := das.samplesBuf for _, sample := range samples { s, ok := m[sample.key] if !ok { + samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1) + s = &samplesBuf[len(samplesBuf)-1] + s.value = sample.value + s.timestamp = sample.timestamp + key := strings.Clone(sample.key) - m[key] = &dedupAggrSample{ - value: sample.value, - timestamp: sample.timestamp, - } + m[key] = s + das.itemsCount.Add(1) - das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s))) + das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s))) continue } // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication @@ -179,6 +186,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { s.timestamp = sample.timestamp } } + das.samplesBuf = samplesBuf } func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) { @@ -189,6 +197,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample das.m = make(map[string]*dedupAggrSample, len(m)) das.sizeBytes.Store(0) das.itemsCount.Store(0) + das.samplesBuf = das.samplesBuf[:0] } das.mu.Unlock() diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go index 38e03678b..91ce09e13 100644 --- a/lib/streamaggr/dedup_test.go +++ b/lib/streamaggr/dedup_test.go @@ -23,8 +23,8 @@ func TestDedupAggrSerial(t *testing.T) { da.pushSamples(samples) } - if n := da.sizeBytes(); n > 4_200_000 { - t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n) + if n := da.sizeBytes(); n > 5_000_000 { + t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n) } if n := da.itemsCount(); n != seriesCount { t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)