VictoriaMetrics/lib/streamaggr/quantiles.go
Aliaksandr Valialkin 29d526e20a
lib/streamaggr: remove resetState arg from aggrState.flushState()
The resetState arg was used only for the BenchmarkAggregatorsFlushInternalSerial benchmark.
This benchmark was testing aggregate state flush performance by keeping the same state across flushes.
The benhmark didn't reflect the performance and scalability of stream aggregation in production,
while it led to non-trivial code changes related to resetState arg handling.

So let's drop the benchmark together with all the code related to resetState handling,
in order to simplify the code at lib/streamaggr a bit.

Thanks to @AndrewChubatiuk for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6314
2024-08-07 11:46:49 +02:00

91 lines
2.1 KiB
Go

package streamaggr
import (
"strconv"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/valyala/histogram"
)
// quantilesAggrState calculates output=quantiles, e.g. the given quantiles over the input samples.
type quantilesAggrState struct {
m sync.Map
phis []float64
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
deleted bool
}
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
return &quantilesAggrState{
phis: phis,
}
}
func (as *quantilesAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
h := histogram.GetFast()
v = &quantilesStateValue{
h: h,
}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
histogram.PutFast(h)
v = vNew
}
}
sv := v.(*quantilesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *quantilesAggrState) flushState(ctx *flushCtx) {
m := &as.m
phis := as.phis
var quantiles []float64
var b []byte
m.Range(func(k, v any) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*quantilesStateValue)
sv.mu.Lock()
quantiles = sv.h.Quantiles(quantiles[:0], phis)
histogram.PutFast(sv.h)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
}
return true
})
}