From e8bb4359bbe8df977f42118e081d21cc0cff0a11 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 10 Jun 2024 15:58:03 +0200
Subject: [PATCH] lib/streamaggr: improve performance for dedupAggr.sizeBytes()
 and dedupAggr.itemsCount()

These functions are called every time the `/metrics` page is scraped, so it is
worth speeding them up for the case when dedupAggr tracks tens of millions of
active time series.
---
 lib/streamaggr/dedup.go | 29 ++++++++++-------------------
 1 file changed, 10 insertions(+), 19 deletions(-)

diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index 7bb44262ef..f17487e0ee 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -2,6 +2,7 @@ package streamaggr
 
 import (
 	"sync"
+	"sync/atomic"
 	"unsafe"
 
 	"github.com/cespare/xxhash/v2"
@@ -26,6 +27,9 @@ type dedupAggrShard struct {
 type dedupAggrShardNopad struct {
 	mu sync.Mutex
 	m  map[string]dedupAggrSample
+
+	sizeBytes  atomic.Uint64
+	itemsCount atomic.Uint64
 }
 
 type dedupAggrSample struct {
@@ -43,7 +47,7 @@ func newDedupAggr() *dedupAggr {
 func (da *dedupAggr) sizeBytes() uint64 {
 	n := uint64(unsafe.Sizeof(*da))
 	for i := range da.shards {
-		n += da.shards[i].sizeBytes()
+		n += da.shards[i].sizeBytes.Load()
 	}
 	return n
 }
@@ -51,28 +55,11 @@ func (da *dedupAggr) sizeBytes() uint64 {
 func (da *dedupAggr) itemsCount() uint64 {
 	n := uint64(0)
 	for i := range da.shards {
-		n += da.shards[i].itemsCount()
+		n += da.shards[i].itemsCount.Load()
 	}
 	return n
 }
 
-func (das *dedupAggrShard) sizeBytes() uint64 {
-	das.mu.Lock()
-	n := uint64(unsafe.Sizeof(*das))
-	for k, s := range das.m {
-		n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s))
-	}
-	das.mu.Unlock()
-	return n
-}
-
-func (das *dedupAggrShard) itemsCount() uint64 {
-	das.mu.Lock()
-	n := uint64(len(das.m))
-	das.mu.Unlock()
-	return n
-}
-
 func (da *dedupAggr) pushSamples(samples []pushSample) {
 	pss := getPerShardSamples()
 	shards := pss.shards
@@ -181,6 +168,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 				value:     sample.value,
 				timestamp: sample.timestamp,
 			}
+			das.itemsCount.Add(1)
+			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{})))
 			continue
 		}
 		// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
@@ -200,6 +189,8 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 	m := das.m
 	if len(m) > 0 {
 		das.m = make(map[string]dedupAggrSample, len(m))
+		das.sizeBytes.Store(0)
+		das.itemsCount.Store(0)
 	}
 	das.mu.Unlock()
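
The patch replaces per-scrape map walks under each shard mutex with counters that are maintained incrementally: every new map entry bumps two atomics, and a flush resets them together with the map. Below is a minimal, self-contained Go sketch of that pattern, not the actual VictoriaMetrics code: the names (shardedMap, shard, sample, put, flush) and the FNV-based shard selection are illustrative assumptions, whereas the real code uses xxhash and the dedupAggr types from the diff above.

```go
// Sketch of the counter-maintenance pattern applied by this patch:
// stats are accounted at insertion time and reset at flush time, so
// reading them never locks a shard or iterates a map.
package main

import (
	"fmt"
	"hash/fnv" // illustrative; the real code shards by xxhash
	"sync"
	"sync/atomic"
	"unsafe"
)

type sample struct {
	value     float64
	timestamp int64
}

type shard struct {
	mu sync.Mutex
	m  map[string]sample

	// Updated on every insert, reset on flush, read lock-free.
	sizeBytes  atomic.Uint64
	itemsCount atomic.Uint64
}

type shardedMap struct {
	shards []shard
}

func newShardedMap(shardsCount int) *shardedMap {
	sm := &shardedMap{shards: make([]shard, shardsCount)}
	for i := range sm.shards {
		sm.shards[i].m = make(map[string]sample)
	}
	return sm
}

func (sm *shardedMap) put(key string, s sample) {
	h := fnv.New64a()
	h.Write([]byte(key))
	sh := &sm.shards[h.Sum64()%uint64(len(sm.shards))]

	sh.mu.Lock()
	if _, ok := sh.m[key]; !ok {
		// Account for the new entry exactly once, at insertion time,
		// using the same estimate as the old per-scrape walk: key bytes
		// plus the string header and value struct sizes.
		sh.itemsCount.Add(1)
		sh.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)))
	}
	sh.m[key] = s
	sh.mu.Unlock()
}

// sizeBytes performs O(shards) atomic loads instead of O(items) map iterations.
func (sm *shardedMap) sizeBytes() uint64 {
	n := uint64(unsafe.Sizeof(*sm))
	for i := range sm.shards {
		n += sm.shards[i].sizeBytes.Load()
	}
	return n
}

func (sm *shardedMap) itemsCount() uint64 {
	n := uint64(0)
	for i := range sm.shards {
		n += sm.shards[i].itemsCount.Load()
	}
	return n
}

// flush drains each shard and zeroes its counters together with the map,
// mirroring the Store(0) calls added to dedupAggrShard.flush in the diff.
func (sm *shardedMap) flush(f func(key string, s sample)) {
	for i := range sm.shards {
		sh := &sm.shards[i]
		sh.mu.Lock()
		m := sh.m
		if len(m) > 0 {
			sh.m = make(map[string]sample, len(m))
			sh.sizeBytes.Store(0)
			sh.itemsCount.Store(0)
		}
		sh.mu.Unlock()
		for k, s := range m {
			f(k, s)
		}
	}
}

func main() {
	sm := newShardedMap(4)
	sm.put("foo", sample{value: 1, timestamp: 100})
	sm.put("bar", sample{value: 2, timestamp: 200})
	sm.put("foo", sample{value: 3, timestamp: 300}) // overwrite: counters unchanged
	fmt.Println("items:", sm.itemsCount(), "bytes:", sm.sizeBytes())

	sm.flush(func(k string, s sample) { fmt.Println("flushed:", k, s.value) })
	fmt.Println("items after flush:", sm.itemsCount())
}
```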
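The trade-off in this design: each scrape of `/metrics` now costs a handful of atomic loads per shard rather than a mutex acquisition plus a full map walk per shard, at the price of two extra atomic increments on the insert path. The byte accounting stays an estimate in the same way as before the patch; it charges key bytes plus the string-header and entry-struct sizes, not the map's internal bucket overhead.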