lib/streamaggr: improve performance for dedupAggr.sizeBytes() and dedupAggr.itemsCount()

These functions are called every time the `/metrics` page is scraped, so it would be great
if they could be sped up for the cases when dedupAggr tracks tens of millions of active time series.
Aliaksandr Valialkin 2024-06-10 15:58:03 +02:00
parent f45d02a243
commit e8bb4359bb
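
The change replaces per-call locking and map iteration with per-shard atomic counters that are updated on insert and reset on flush, so the read path takes no locks at all. Below is a minimal sketch of the pattern with simplified names and a single counter (not the exact VictoriaMetrics types; the real dedupAggrShard carries more state):

package sketch

import (
	"sync"
	"sync/atomic"
)

// shard is a simplified stand-in for dedupAggrShard.
type shard struct {
	mu sync.Mutex
	m  map[string]float64

	itemsCount atomic.Uint64 // maintained incrementally instead of computing len(m) under mu
}

func newShard() *shard {
	return &shard{m: make(map[string]float64)}
}

func (s *shard) add(key string, v float64) {
	s.mu.Lock()
	if _, ok := s.m[key]; !ok {
		s.itemsCount.Add(1) // updated under the same lock that guards the map
	}
	s.m[key] = v
	s.mu.Unlock()
}

// items takes no lock, so frequent /metrics scrapes no longer
// contend with concurrent writers or iterate the map.
func (s *shard) items() uint64 {
	return s.itemsCount.Load()
}

This turns the read side from O(items) under a mutex into a single atomic load per shard, at the cost of one extra atomic add per inserted entry.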

@@ -2,6 +2,7 @@ package streamaggr
 
 import (
 	"sync"
+	"sync/atomic"
 	"unsafe"
 
 	"github.com/cespare/xxhash/v2"
@@ -26,6 +27,9 @@ type dedupAggrShard struct {
 type dedupAggrShardNopad struct {
 	mu sync.Mutex
 	m  map[string]dedupAggrSample
+
+	sizeBytes  atomic.Uint64
+	itemsCount atomic.Uint64
 }
 
 type dedupAggrSample struct {
@@ -43,7 +47,7 @@ func newDedupAggr() *dedupAggr {
 func (da *dedupAggr) sizeBytes() uint64 {
 	n := uint64(unsafe.Sizeof(*da))
 	for i := range da.shards {
-		n += da.shards[i].sizeBytes()
+		n += da.shards[i].sizeBytes.Load()
 	}
 	return n
 }
@@ -51,28 +55,11 @@ func (da *dedupAggr) sizeBytes() uint64 {
 func (da *dedupAggr) itemsCount() uint64 {
 	n := uint64(0)
 	for i := range da.shards {
-		n += da.shards[i].itemsCount()
+		n += da.shards[i].itemsCount.Load()
 	}
 	return n
 }
 
-func (das *dedupAggrShard) sizeBytes() uint64 {
-	das.mu.Lock()
-	n := uint64(unsafe.Sizeof(*das))
-	for k, s := range das.m {
-		n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s))
-	}
-	das.mu.Unlock()
-	return n
-}
-
-func (das *dedupAggrShard) itemsCount() uint64 {
-	das.mu.Lock()
-	n := uint64(len(das.m))
-	das.mu.Unlock()
-	return n
-}
-
 func (da *dedupAggr) pushSamples(samples []pushSample) {
 	pss := getPerShardSamples()
 	shards := pss.shards
@@ -181,6 +168,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 				value:     sample.value,
 				timestamp: sample.timestamp,
 			}
+			das.itemsCount.Add(1)
+			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{})))
 			continue
 		}
 		// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
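
The amount added per new entry mirrors what the removed per-shard sizeBytes() loop charged for each map entry: the key's backing bytes plus the string header plus the fixed-size sample struct. A hedged illustration of the arithmetic, assuming a 64-bit platform and that dedupAggrSample holds only the float64 value and int64 timestamp set above:

package sketch

import "unsafe"

// Assumed layout; the real dedupAggrSample may differ.
type dedupAggrSample struct {
	value     float64 // 8 bytes
	timestamp int64   // 8 bytes
}

// entrySize repeats the accounting from pushSamples:
//   len(key)                         -> key's backing bytes
//   unsafe.Sizeof("")                -> 16-byte string header (pointer + length)
//   unsafe.Sizeof(dedupAggrSample{}) -> 16 bytes for the two fields
// e.g. a 40-byte key accounts for 40 + 16 + 16 = 72 bytes.
func entrySize(key string) uint64 {
	return uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{}))
}

Neither the old nor the new accounting includes Go map bucket overhead, so the result is a cheap approximation of heap usage rather than an exact measurement.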
@@ -200,6 +189,8 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 	m := das.m
 	if len(m) > 0 {
 		das.m = make(map[string]dedupAggrSample, len(m))
+		das.sizeBytes.Store(0)
+		das.itemsCount.Store(0)
 	}
 	das.mu.Unlock()
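
flush swaps in a fresh map and zeroes both counters while the shard mutex is still held, so writers always observe a consistent map/counter pair. Readers stay lock-free and may briefly see a counter that is slightly ahead of or behind the new map, which is harmless for metrics reporting. Continuing the simplified sketch from above:

// flush detaches the current map and resets the counter under the lock,
// mirroring dedupAggrShard.flush in the diff above.
func (s *shard) flush(f func(m map[string]float64)) {
	s.mu.Lock()
	m := s.m
	if len(m) > 0 {
		s.m = make(map[string]float64, len(m)) // old size doubles as a capacity hint
		s.itemsCount.Store(0)                  // reset together with the map swap
	}
	s.mu.Unlock()

	f(m) // process the detached map outside the lock
}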