lib/streamaggr: benchmark only flush routines in BenchmarkDedupAggrFlushSerial and BenchmarkAggregatorsFlushSerial

Aliaksandr Valialkin 2024-03-04 19:12:06 +02:00
parent 074abd5bee
commit 6319d029a8
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
18 changed files with 163 additions and 113 deletions
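In short: every per-output state's flushState() method and the dedupAggr/aggregator flush() helpers gain a resetState flag. The regular flusher keeps passing true (unchanged behavior), while the flush benchmarks pass false so they can flush the same pre-populated state on every iteration and measure only the flush routine. The updated interface, as it appears in the lib/streamaggr/streamaggr.go hunk below:

    type aggrState interface {
        pushSamples(samples []pushSample)
        flushState(ctx *flushCtx, resetState bool)
    }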

lib/streamaggr/avg.go View file

@@ -59,19 +59,24 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
}
}
func (as *avgAggrState) flushState(ctx *flushCtx) {
func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*avgStateValue)
sv.mu.Lock()
avg := sv.sum / float64(sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "avg", currentTimeMsec, avg)
return true

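The same resetState guard repeats in the per-output states below (count_samples, count_series, last, max, min, quantiles, stddev, stdvar, sum_samples, unique_samples): deleting the map entry and marking the state value as deleted now happens only when resetState is true. A condensed sketch of the shared pattern, with the hypothetical names someAggrState, someStateValue, sv.value and "some_output" standing in for the concrete types, fields and suffixes of each output:

    func (as *someAggrState) flushState(ctx *flushCtx, resetState bool) {
        currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
        m := &as.m
        m.Range(func(k, v interface{}) bool {
            if resetState {
                // Atomically delete the entry from the map, so a new entry is created for the next flush.
                m.Delete(k)
            }
            sv := v.(*someStateValue)
            sv.mu.Lock()
            value := sv.value // read the aggregated value under the lock (the concrete field differs per output)
            if resetState {
                // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
                sv.deleted = true
            }
            sv.mu.Unlock()
            ctx.appendSeries(k.(string), "some_output", currentTimeMsec, value)
            return true
        })
    }

Only histogram_bucket.go and total.go (further down) handle the flag in their own way, since they keep their entries across flushes anyway.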
lib/streamaggr/count_samples.go View file

@@ -56,19 +56,24 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
n := sv.n
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
return true

lib/streamaggr/count_series.go View file

@@ -66,19 +66,24 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := len(sv.m)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
return true

lib/streamaggr/dedup.go View file

@@ -112,7 +112,7 @@ func (ctx *dedupFlushCtx) reset() {
ctx.samples = ctx.samples[:0]
}
func (da *dedupAggr) flush(f func(samples []pushSample)) {
func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) {
var wg sync.WaitGroup
for i := range da.shards {
flushConcurrencyCh <- struct{}{}
@@ -124,7 +124,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) {
}()
ctx := getDedupFlushCtx()
shard.flush(ctx, f)
shard.flush(ctx, f, resetState)
putDedupFlushCtx(ctx)
}(&da.shards[i])
}
@@ -178,11 +178,11 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
}
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample), resetState bool) {
das.mu.Lock()
m := das.m
if len(m) != 0 {
if resetState && len(m) > 0 {
das.m = make(map[string]dedupAggrSample, len(m))
}

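With the flag threaded through dedupAggr.flush() and dedupAggrShard.flush(), a caller now chooses between a resetting flush and a read-only one; both call sites appear in the hunks further down:

    // Production path (aggregator.dedupFlush in streamaggr.go): shard maps are replaced,
    // so the next dedup interval starts from empty state.
    a.da.flush(a.pushSamples, true)

    // Benchmark path (BenchmarkDedupAggrFlushSerial in dedup_timing_test.go): state is kept,
    // so the same deduplicated samples can be flushed on every iteration.
    da.flush(as.pushSamples, false)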
lib/streamaggr/dedup_test.go View file

@@ -4,7 +4,6 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
)
@@ -40,7 +39,7 @@ func TestDedupAggrSerial(t *testing.T) {
}
mu.Unlock()
}
da.flush(flushSamples)
da.flush(flushSamples, true)
if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
@@ -59,11 +58,6 @@ func TestDedupAggrConcurrent(t *testing.T) {
const seriesCount = 10_000
da := newDedupAggr()
var samplesFlushed atomic.Int64
flushSamples := func(samples []pushSample) {
samplesFlushed.Add(int64(len(samples)))
}
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
@@ -78,12 +72,7 @@ func TestDedupAggrConcurrent(t *testing.T) {
}
da.pushSamples(samples)
}
da.flush(flushSamples)
}()
}
wg.Wait()
if n := samplesFlushed.Load(); n < seriesCount {
t.Fatalf("too small number of series flushed; got %d; want at least %d", n, seriesCount)
}
}

lib/streamaggr/dedup_timing_test.go View file

@@ -22,12 +22,13 @@ func BenchmarkDedupAggrFlushSerial(b *testing.B) {
as := newTotalAggrState(time.Hour, true, true)
benchSamples := newBenchSamples(100_000)
da := newDedupAggr()
da.pushSamples(benchSamples)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSamples)))
for i := 0; i < b.N; i++ {
da.pushSamples(benchSamples)
da.flush(as.pushSamples)
da.flush(as.pushSamples, false)
}
}
@@ -36,6 +37,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr()
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(samplesPerPush * loops))
b.RunParallel(func(pb *testing.PB) {
@@ -51,8 +53,8 @@ func newBenchSamples(count int) []pushSample {
var lc promutils.LabelsCompressor
labels := []prompbmarshal.Label{
{
Name: "instance",
Value: "host-123",
Name: "app",
Value: "app-123",
},
{
Name: "job",
@@ -77,8 +79,8 @@ func newBenchSamples(count int) []pushSample {
for i := range samples {
sample := &samples[i]
labels = append(labels[:labelsLen], prompbmarshal.Label{
Name: "app",
Value: fmt.Sprintf("app-%d", i%10),
Name: "app",
Value: fmt.Sprintf("instance-%d", i),
})
keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:])
sample.key = string(keyBuf)

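The reworked BenchmarkDedupAggrFlushSerial above shows the pattern this commit applies to both flush benchmarks: the state is populated once outside the timed region, and each iteration performs only a non-destructive flush, so b.N iterations measure the flush routine alone rather than push+flush:

    da := newDedupAggr()
    da.pushSamples(benchSamples) // populate the dedup state once, before the timer starts
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // resetState=false keeps the shard maps intact, so every iteration flushes the same state.
        da.flush(as.pushSamples, false)
    }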
lib/streamaggr/histogram_bucket.go View file

@@ -84,7 +84,8 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
})
}
func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, resetState bool) {
_ = resetState // it isn't used here
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000

lib/streamaggr/last.go View file

@@ -56,19 +56,24 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
}
}
func (as *lastAggrState) flushState(ctx *flushCtx) {
func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*lastStateValue)
sv.mu.Lock()
last := sv.last
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "last", currentTimeMsec, last)
return true

lib/streamaggr/max.go View file

@@ -58,19 +58,24 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
}
}
func (as *maxAggrState) flushState(ctx *flushCtx) {
func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*maxStateValue)
sv.mu.Lock()
max := sv.max
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "max", currentTimeMsec, max)
return true

lib/streamaggr/min.go View file

@@ -58,18 +58,22 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
}
}
func (as *minAggrState) flushState(ctx *flushCtx) {
func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*minStateValue)
sv.mu.Lock()
min := sv.min
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "min", currentTimeMsec, min)

lib/streamaggr/quantiles.go View file

@@ -63,22 +63,26 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *quantilesAggrState) flushState(ctx *flushCtx) {
func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
phis := as.phis
var quantiles []float64
var b []byte
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*quantilesStateValue)
sv.mu.Lock()
quantiles = sv.h.Quantiles(quantiles[:0], phis)
histogram.PutFast(sv.h)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)

lib/streamaggr/stddev.go View file

@@ -59,19 +59,24 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
}
}
func (as *stddevAggrState) flushState(ctx *flushCtx) {
func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*stddevStateValue)
sv.mu.Lock()
stddev := math.Sqrt(sv.q / sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
return true

lib/streamaggr/stdvar.go View file

@@ -58,19 +58,24 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
}
}
func (as *stdvarAggrState) flushState(ctx *flushCtx) {
func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*stdvarStateValue)
sv.mu.Lock()
stdvar := sv.q / sv.count
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
return true

lib/streamaggr/streamaggr.go View file

@@ -361,7 +361,7 @@ type aggregator struct {
type aggrState interface {
pushSamples(samples []pushSample)
flushState(ctx *flushCtx)
flushState(ctx *flushCtx, resetState bool)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
@@ -606,18 +606,18 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
defer t.Stop()
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil, interval)
a.flush(nil, interval, true)
}
for tickerWait(t) {
a.flush(pushFunc, interval)
a.flush(pushFunc, interval, true)
if alignFlushToInterval {
select {
case <-t.C:
if skipIncompleteFlush && tickerWait(t) {
logger.Warnf("drop incomplete aggregation state because the previous flush took longer than interval=%s", interval)
a.flush(nil, interval)
a.flush(nil, interval, true)
}
default:
}
@@ -637,10 +637,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval)
a.flush(nil, interval, true)
isSkippedFirstFlush = true
} else {
a.flush(pushFunc, interval)
a.flush(pushFunc, interval, true)
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
@@ -658,7 +658,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if !skipIncompleteFlush {
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval)
a.flush(pushFunc, interval, true)
}
}
@@ -670,7 +670,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
startTime := time.Now()
a.da.flush(a.pushSamples)
a.da.flush(a.pushSamples, true)
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
@@ -682,7 +682,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
}
}
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration) {
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) {
startTime := time.Now()
var wg sync.WaitGroup
@@ -696,7 +696,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration) {
}()
ctx := getFlushCtx(a, pushFunc)
as.flushState(ctx)
as.flushState(ctx, resetState)
ctx.flushSeries()
ctx.resetSeries()
putFlushCtx(ctx)

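Note that all production flush paths in streamaggr.go keep their previous semantics by passing resetState=true; only the benchmarks pass false. The relevant call sites from the hunks above:

    a.flush(pushFunc, interval, true) // periodic flush in runFlusher: aggregated state is reset, as before
    as.flushState(ctx, resetState)    // aggregator.flush() forwards the flag to every configured output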
lib/streamaggr/streamaggr_timing_test.go View file

@@ -2,6 +2,7 @@ package streamaggr
import (
"fmt"
"strconv"
"strings"
"testing"
"time"
@@ -37,37 +38,34 @@ func BenchmarkAggregatorsPush(b *testing.B) {
}
func BenchmarkAggregatorsFlushSerial(b *testing.B) {
for _, output := range benchOutputs {
b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) {
benchmarkAggregatorsFlushSerial(b, output)
})
outputs := []string{
"total", "sum_samples", "count_samples", "min",
"max", "avg", "increase", "count_series",
"last", "stddev", "stdvar", "total_prometheus", "increase_prometheus",
}
}
func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
a := newBenchAggregators(output, pushFunc)
a := newBenchAggregators(outputs, pushFunc)
defer a.MustStop()
_ = a.Push(benchSeries, nil)
var matchIdxs []byte
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries)))
b.SetBytes(int64(len(benchSeries) * len(outputs)))
for i := 0; i < b.N; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
for _, aggr := range a.as {
aggr.flush(pushFunc, time.Hour)
aggr.flush(pushFunc, time.Hour, false)
}
}
}
func benchmarkAggregatorsPush(b *testing.B, output string) {
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
a := newBenchAggregators(output, pushFunc)
a := newBenchAggregators([]string{output}, pushFunc)
defer a.MustStop()
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
@@ -80,13 +78,18 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
})
}
func newBenchAggregators(output string, pushFunc PushFunc) *Aggregators {
func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputsQuoted := make([]string, len(outputs))
for i := range outputs {
outputsQuoted[i] = strconv.Quote(outputs[i])
}
config := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
without: [job]
outputs: [%q]
`, output)
by: [job]
outputs: [%s]
`, strings.Join(outputsQuoted, ","))
a, err := newAggregatorsFromData([]byte(config), pushFunc, nil)
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
@@ -98,7 +101,7 @@ func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries {
a := make([]string, seriesCount)
for j := 0; j < seriesCount; j++ {
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%10, j*1000)
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
a = append(a, s)
}
metrics := strings.Join(a, "\n")

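newBenchAggregators now takes a slice of outputs and quotes each one into a single aggregation rule, so BenchmarkAggregatorsFlushSerial exercises every output through one aggregator set. With the outputs list above, the generated config is roughly:

    - match: http_requests_total
      interval: 24h
      by: [job]
      outputs: ["total", "sum_samples", "count_samples", "min", "max", "avg", "increase", "count_series", "last", "stddev", "stdvar", "total_prometheus", "increase_prometheus"]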
lib/streamaggr/sum_samples.go View file

@@ -56,19 +56,24 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
return true

lib/streamaggr/total.go View file

@@ -137,7 +137,7 @@ func (as *totalAggrState) removeOldEntries(currentTime uint64) {
})
}
func (as *totalAggrState) flushState(ctx *flushCtx) {
func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
@@ -148,11 +148,13 @@ func (as *totalAggrState) flushState(ctx *flushCtx) {
sv := v.(*totalStateValue)
sv.mu.Lock()
total := sv.total
if as.resetTotalOnFlush {
sv.total = 0
} else if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
if resetState {
if as.resetTotalOnFlush {
sv.total = 0
} else if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
}
deleted := sv.deleted
sv.mu.Unlock()

lib/streamaggr/unique_samples.go View file

@@ -60,19 +60,24 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock()
n := len(sv.m)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n))
return true