mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
04981c7a7f
The resetState arg was used only for the BenchmarkAggregatorsFlushInternalSerial benchmark. This benchmark was testing aggregate state flush performance by keeping the same state across flushes. The benhmark didn't reflect the performance and scalability of stream aggregation in production, while it led to non-trivial code changes related to resetState arg handling. So let's drop the benchmark together with all the code related to resetState handling, in order to simplify the code at lib/streamaggr a bit. Thanks to @AndrewChubatiuk for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6314
86 lines
2.1 KiB
Go
86 lines
2.1 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/cespare/xxhash/v2"
|
|
)
|
|
|
|
// countSeriesAggrState calculates output=count_series, e.g. the number of unique series.
|
|
type countSeriesAggrState struct {
|
|
m sync.Map
|
|
}
|
|
|
|
type countSeriesStateValue struct {
|
|
mu sync.Mutex
|
|
m map[uint64]struct{}
|
|
deleted bool
|
|
}
|
|
|
|
func newCountSeriesAggrState() *countSeriesAggrState {
|
|
return &countSeriesAggrState{}
|
|
}
|
|
|
|
func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
|
|
for i := range samples {
|
|
s := &samples[i]
|
|
inputKey, outputKey := getInputOutputKey(s.key)
|
|
|
|
// Count unique hashes over the inputKeys instead of unique inputKey values.
|
|
// This reduces memory usage at the cost of possible hash collisions for distinct inputKey values.
|
|
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(inputKey))
|
|
|
|
again:
|
|
v, ok := as.m.Load(outputKey)
|
|
if !ok {
|
|
// The entry is missing in the map. Try creating it.
|
|
v = &countSeriesStateValue{
|
|
m: map[uint64]struct{}{
|
|
h: {},
|
|
},
|
|
}
|
|
outputKey = bytesutil.InternString(outputKey)
|
|
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
|
if !loaded {
|
|
// The entry has been added to the map.
|
|
continue
|
|
}
|
|
// Update the entry created by a concurrent goroutine.
|
|
v = vNew
|
|
}
|
|
sv := v.(*countSeriesStateValue)
|
|
sv.mu.Lock()
|
|
deleted := sv.deleted
|
|
if !deleted {
|
|
if _, ok := sv.m[h]; !ok {
|
|
sv.m[h] = struct{}{}
|
|
}
|
|
}
|
|
sv.mu.Unlock()
|
|
if deleted {
|
|
// The entry has been deleted by the concurrent call to flushState
|
|
// Try obtaining and updating the entry again.
|
|
goto again
|
|
}
|
|
}
|
|
}
|
|
|
|
func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
|
|
m := &as.m
|
|
m.Range(func(k, v any) bool {
|
|
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
|
m.Delete(k)
|
|
|
|
sv := v.(*countSeriesStateValue)
|
|
sv.mu.Lock()
|
|
n := len(sv.m)
|
|
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
|
sv.deleted = true
|
|
sv.mu.Unlock()
|
|
|
|
key := k.(string)
|
|
ctx.appendSeries(key, "count_series", float64(n))
|
|
return true
|
|
})
|
|
}
|