lib/streamaggr: flush dedup state and aggregation state in parallel on all the available CPU cores

This should reduce the time needed for aggregation state flush on systems with many CPU cores
This commit is contained in:
Aliaksandr Valialkin 2024-03-04 01:21:39 +02:00
parent 3c06b3af92
commit 0422ae01ba
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
16 changed files with 151 additions and 78 deletions

View file

@ -54,14 +54,14 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *avgAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -51,14 +51,14 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -61,14 +61,14 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -89,6 +89,21 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
putPerShardSamples(pss)
}
func getDedupFlushCtx() *dedupFlushCtx {
v := dedupFlushCtxPool.Get()
if v == nil {
return &dedupFlushCtx{}
}
return v.(*dedupFlushCtx)
}
func putDedupFlushCtx(ctx *dedupFlushCtx) {
ctx.reset()
dedupFlushCtxPool.Put(ctx)
}
var dedupFlushCtxPool sync.Pool
type dedupFlushCtx struct {
samples []pushSample
}
@ -99,12 +114,22 @@ func (ctx *dedupFlushCtx) reset() {
}
func (da *dedupAggr) flush(f func(samples []pushSample)) {
ctx := &dedupFlushCtx{}
shards := da.shards
for i := range shards {
ctx.reset()
shards[i].flush(ctx, f)
var wg sync.WaitGroup
for i := range da.shards {
flushConcurrencyCh <- struct{}{}
wg.Add(1)
go func(shard *dedupAggrShard) {
defer func() {
<-flushConcurrencyCh
wg.Done()
}()
ctx := getDedupFlushCtx()
shard.flush(ctx, f)
putDedupFlushCtx(ctx)
}(&da.shards[i])
}
wg.Wait()
}
type perShardSamples struct {
@ -178,8 +203,14 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
key: key,
value: s.value,
})
}
ctx.samples = dstSamples
// Limit the number of samples per each flush in order to limit memory usage.
if len(dstSamples) >= 100_000 {
f(dstSamples)
clear(dstSamples)
dstSamples = dstSamples[:0]
}
}
f(dstSamples)
ctx.samples = dstSamples
}

View file

@ -33,11 +33,14 @@ func TestDedupAggrSerial(t *testing.T) {
}
flushedSamplesMap := make(map[string]pushSample)
var mu sync.Mutex
flushSamples := func(samples []pushSample) {
mu.Lock()
for _, sample := range samples {
sample.key = strings.Clone(sample.key)
flushedSamplesMap[sample.key] = sample
}
mu.Unlock()
}
da.flush(flushSamples)

View file

@ -59,7 +59,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
@ -86,7 +86,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
})
}
func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000

View file

@ -51,14 +51,14 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *lastAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -53,14 +53,14 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *maxAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -53,14 +53,14 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *minAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -58,14 +58,14 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *quantilesAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
phis := as.phis

View file

@ -54,14 +54,14 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *stddevAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -53,14 +53,14 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *stdvarAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -304,6 +304,7 @@ type aggregator struct {
// lc is used for compressing series keys before passing them to dedupAggr and aggrState.
lc promutils.LabelsCompressor
// pushFunc is the callback, which is called by aggrState when flushing its state.
pushFunc PushFunc
// suffix contains a suffix, which should be added to aggregate metric names
@ -329,7 +330,7 @@ type aggregator struct {
type aggrState interface {
pushSamples(samples []pushSample)
appendSeriesForFlush(ctx *flushCtx)
flushState(ctx *flushCtx)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
@ -544,11 +545,7 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
return
case <-tickerFlush.C:
startTime := time.Now()
flushConcurrencyCh <- struct{}{}
a.flush()
<-flushConcurrencyCh
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
if d > interval {
@ -559,11 +556,7 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
}
case <-dedupTickerCh:
startTime := time.Now()
flushConcurrencyCh <- struct{}{}
a.dedupFlush()
<-flushConcurrencyCh
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
if d > dedupInterval {
@ -576,49 +569,32 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
}
}
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
func (a *aggregator) dedupFlush() {
a.da.flush(a.pushSamples)
}
func (a *aggregator) flush() {
ctx := &flushCtx{
a: a,
}
var wg sync.WaitGroup
for _, as := range a.aggrStates {
ctx.reset()
as.appendSeriesForFlush(ctx)
flushConcurrencyCh <- struct{}{}
wg.Add(1)
go func(as aggrState) {
defer func() {
<-flushConcurrencyCh
wg.Done()
}()
tss := ctx.tss
if a.outputRelabeling == nil {
// Fast path - push the output metrics.
a.pushFunc(tss)
continue
}
// Slower path - apply output relabeling and then push the output metrics.
auxLabels := promutils.GetLabels()
dstLabels := auxLabels.Labels[:0]
dst := tss[:0]
for _, ts := range tss {
dstLabelsLen := len(dstLabels)
dstLabels = append(dstLabels, ts.Labels...)
dstLabels = a.outputRelabeling.Apply(dstLabels, dstLabelsLen)
if len(dstLabels) == dstLabelsLen {
// The metric has been deleted by the relabeling
continue
}
ts.Labels = dstLabels[dstLabelsLen:]
dst = append(dst, ts)
}
a.pushFunc(dst)
auxLabels.Labels = dstLabels
promutils.PutLabels(auxLabels)
ctx := getFlushCtx(a)
as.flushState(ctx)
ctx.flushSeries()
putFlushCtx(ctx)
}(as)
}
wg.Wait()
}
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
// MustStop stops the aggregator.
//
// The aggregator stops pushing the aggregated metrics after this call.
@ -631,12 +607,10 @@ func (a *aggregator) MustStop() {
}
// Flush the remaining data from the last interval if needed.
flushConcurrencyCh <- struct{}{}
if a.da != nil {
a.dedupFlush()
}
a.flush()
<-flushConcurrencyCh
}
// Push pushes tss to a.
@ -796,6 +770,23 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
return dstInput, dstOutput
}
func getFlushCtx(a *aggregator) *flushCtx {
v := flushCtxPool.Get()
if v == nil {
v = &flushCtx{}
}
ctx := v.(*flushCtx)
ctx.a = a
return ctx
}
func putFlushCtx(ctx *flushCtx) {
ctx.reset()
flushCtxPool.Put(ctx)
}
var flushCtxPool sync.Pool
type flushCtx struct {
a *aggregator
@ -805,12 +796,55 @@ type flushCtx struct {
}
func (ctx *flushCtx) reset() {
ctx.a = nil
ctx.resetSeries()
}
func (ctx *flushCtx) resetSeries() {
ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss)
promrelabel.CleanLabels(ctx.labels)
clear(ctx.labels)
ctx.labels = ctx.labels[:0]
ctx.samples = ctx.samples[:0]
}
func (ctx *flushCtx) flushSeries() {
tss := ctx.tss
if len(tss) == 0 {
// nothing to flush
return
}
outputRelabeling := ctx.a.outputRelabeling
if outputRelabeling == nil {
// Fast path - push the output metrics.
ctx.a.pushFunc(tss)
return
}
// Slow path - apply output relabeling and then push the output metrics.
auxLabels := promutils.GetLabels()
dstLabels := auxLabels.Labels[:0]
dst := tss[:0]
for _, ts := range tss {
dstLabelsLen := len(dstLabels)
dstLabels = append(dstLabels, ts.Labels...)
dstLabels = outputRelabeling.Apply(dstLabels, dstLabelsLen)
if len(dstLabels) == dstLabelsLen {
// The metric has been deleted by the relabeling
continue
}
ts.Labels = dstLabels[dstLabelsLen:]
dst = append(dst, ts)
}
ctx.a.pushFunc(dst)
auxLabels.Labels = dstLabels
promutils.PutLabels(auxLabels)
ctx.resetSeries()
}
func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
@ -826,6 +860,11 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
Labels: ctx.labels[labelsLen:],
Samples: ctx.samples[samplesLen:],
})
// Limit the maximum length of ctx.tss in order to limit memory usage.
if len(ctx.tss) >= 10_000 {
ctx.flushSeries()
}
}
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) {

View file

@ -51,14 +51,14 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {

View file

@ -90,7 +90,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
@ -125,7 +125,7 @@ func (as *totalAggrState) removeOldEntries(currentTime uint64) {
})
}
func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *totalAggrState) flushState(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000

View file

@ -55,14 +55,14 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// The entry has been deleted by the concurrent call to flushState
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *uniqueSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {