lib/streamaggr: improve performance for dedupAggr.sizeBytes() and dedupAggr.itemsCount()

These functions are called every time `/metrics` page is scraped, so it would be great
if they could be sped up for the cases when dedupAggr tracks tens of millions of active time series.
This commit is contained in:
Aliaksandr Valialkin 2024-06-10 15:58:03 +02:00
parent 387c22da49
commit 9ed9e766e8
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"unsafe"
"github.com/cespare/xxhash/v2"
@ -26,6 +27,9 @@ type dedupAggrShard struct {
type dedupAggrShardNopad struct {
mu sync.Mutex
m map[string]dedupAggrSample
sizeBytes atomic.Uint64
itemsCount atomic.Uint64
}
type dedupAggrSample struct {
@ -43,7 +47,7 @@ func newDedupAggr() *dedupAggr {
func (da *dedupAggr) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*da))
for i := range da.shards {
n += da.shards[i].sizeBytes()
n += da.shards[i].sizeBytes.Load()
}
return n
}
@ -51,28 +55,11 @@ func (da *dedupAggr) sizeBytes() uint64 {
func (da *dedupAggr) itemsCount() uint64 {
n := uint64(0)
for i := range da.shards {
n += da.shards[i].itemsCount()
n += da.shards[i].itemsCount.Load()
}
return n
}
func (das *dedupAggrShard) sizeBytes() uint64 {
das.mu.Lock()
n := uint64(unsafe.Sizeof(*das))
for k, s := range das.m {
n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s))
}
das.mu.Unlock()
return n
}
func (das *dedupAggrShard) itemsCount() uint64 {
das.mu.Lock()
n := uint64(len(das.m))
das.mu.Unlock()
return n
}
func (da *dedupAggr) pushSamples(samples []pushSample) {
pss := getPerShardSamples()
shards := pss.shards
@ -181,6 +168,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
value: sample.value,
timestamp: sample.timestamp,
}
das.itemsCount.Add(1)
das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{})))
continue
}
// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
@ -200,6 +189,8 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
m := das.m
if len(m) > 0 {
das.m = make(map[string]dedupAggrSample, len(m))
das.sizeBytes.Store(0)
das.itemsCount.Store(0)
}
das.mu.Unlock()