lib/streamaggr: reduce memory allocations by using dedupAggrSample buffer per each dedupAggrShard

Aliaksandr Valialkin 2024-06-10 16:38:17 +02:00
parent a1e8003754
commit 253c0cffbe
2 changed files with 16 additions and 7 deletions

lib/streamaggr/dedup.go

@@ -9,6 +9,7 @@ import (
 	"github.com/cespare/xxhash/v2"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
 )
 
 const dedupAggrShardsCount = 128
@@ -29,6 +30,8 @@ type dedupAggrShardNopad struct {
 	mu sync.Mutex
 	m  map[string]*dedupAggrSample
 
+	samplesBuf []dedupAggrSample
+
 	sizeBytes  atomic.Uint64
 	itemsCount atomic.Uint64
 }
@@ -161,16 +164,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 		m = make(map[string]*dedupAggrSample, len(samples))
 		das.m = m
 	}
+	samplesBuf := das.samplesBuf
 	for _, sample := range samples {
 		s, ok := m[sample.key]
 		if !ok {
+			samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
+			s = &samplesBuf[len(samplesBuf)-1]
+			s.value = sample.value
+			s.timestamp = sample.timestamp
+
 			key := strings.Clone(sample.key)
-			m[key] = &dedupAggrSample{
-				value:     sample.value,
-				timestamp: sample.timestamp,
-			}
+			m[key] = s
+
 			das.itemsCount.Add(1)
-			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)))
+			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
 			continue
 		}
 		// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
@@ -179,6 +186,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 			s.timestamp = sample.timestamp
 		}
 	}
+	das.samplesBuf = samplesBuf
 }
 
 func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
@@ -189,6 +197,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
 		das.m = make(map[string]*dedupAggrSample, len(m))
 		das.sizeBytes.Store(0)
 		das.itemsCount.Store(0)
+		das.samplesBuf = das.samplesBuf[:0]
 	}
 	das.mu.Unlock()
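
The core idea of the change: instead of heap-allocating every dedupAggrSample individually, pushSamples now carves new samples out of a per-shard samplesBuf slice, so the cost per new series drops to an amortized slice append; flush resets the buffer via samplesBuf[:0] for reuse. Below is a minimal, self-contained sketch of the pattern, not the actual implementation: shard, sample, and setLength are illustrative stand-ins for dedupAggrShard, dedupAggrSample, and slicesutil.SetLength, and the real code additionally takes a mutex and maintains the sizeBytes/itemsCount counters.

package main

import (
	"fmt"
	"strings"
)

// sample mirrors the shape of dedupAggrSample: a value plus a timestamp.
type sample struct {
	value     float64
	timestamp int64
}

// shard holds map values that point into samplesBuf instead of
// individually heap-allocated structs.
type shard struct {
	m          map[string]*sample
	samplesBuf []sample
}

// push stores a newly seen key by carving its sample out of samplesBuf.
func (s *shard) push(key string, value float64, timestamp int64) {
	if _, ok := s.m[key]; ok {
		return // the update-in-place dedup logic is elided in this sketch
	}
	s.samplesBuf = setLength(s.samplesBuf, len(s.samplesBuf)+1)
	smp := &s.samplesBuf[len(s.samplesBuf)-1]
	smp.value = value
	smp.timestamp = timestamp
	s.m[strings.Clone(key)] = smp
}

// setLength is a simplified stand-in for slicesutil.SetLength: it grows buf
// to n items, reallocating only when the capacity is exhausted.
func setLength(buf []sample, n int) []sample {
	if n <= cap(buf) {
		return buf[:n]
	}
	newBuf := make([]sample, n, 2*n)
	copy(newBuf, buf)
	return newBuf
}

func main() {
	sh := &shard{m: make(map[string]*sample)}
	sh.push(`metric{job="a"}`, 42, 1718030297000)
	sh.push(`metric{job="a"}`, 43, 1718030298000) // duplicate key: no allocation
	fmt.Println(len(sh.m), len(sh.samplesBuf))    // 1 1
}

Note that when the buffer reallocates on growth, pointers already stored in the map keep referring to elements of the old backing array; those stay alive under Go's garbage collector and remain valid, which is why storing pointers into a growing slice is safe here.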

lib/streamaggr/dedup_test.go

@@ -23,8 +23,8 @@ func TestDedupAggrSerial(t *testing.T) {
 		da.pushSamples(samples)
 	}
 
-	if n := da.sizeBytes(); n > 4_200_000 {
-		t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n)
+	if n := da.sizeBytes(); n > 5_000_000 {
+		t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
 	}
 	if n := da.itemsCount(); n != seriesCount {
 		t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)