diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go
index 9a5e6df5f..c629af8d9 100644
--- a/lib/streamaggr/avg.go
+++ b/lib/streamaggr/avg.go
@@ -54,14 +54,14 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *avgAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go
index cd59829ae..807fbdfba 100644
--- a/lib/streamaggr/count_samples.go
+++ b/lib/streamaggr/count_samples.go
@@ -51,14 +51,14 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go
index 04b7383c8..e78ce9273 100644
--- a/lib/streamaggr/count_series.go
+++ b/lib/streamaggr/count_series.go
@@ -61,14 +61,14 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index 7de6ca908..e4570f065 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -89,6 +89,21 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
 	putPerShardSamples(pss)
 }
 
+func getDedupFlushCtx() *dedupFlushCtx {
+	v := dedupFlushCtxPool.Get()
+	if v == nil {
+		return &dedupFlushCtx{}
+	}
+	return v.(*dedupFlushCtx)
+}
+
+func putDedupFlushCtx(ctx *dedupFlushCtx) {
+	ctx.reset()
+	dedupFlushCtxPool.Put(ctx)
+}
+
+var dedupFlushCtxPool sync.Pool
+
 type dedupFlushCtx struct {
 	samples []pushSample
 }
@@ -99,12 +114,22 @@ func (ctx *dedupFlushCtx) reset() {
 }
 
 func (da *dedupAggr) flush(f func(samples []pushSample)) {
-	ctx := &dedupFlushCtx{}
-	shards := da.shards
-	for i := range shards {
-		ctx.reset()
-		shards[i].flush(ctx, f)
+	var wg sync.WaitGroup
+	for i := range da.shards {
+		flushConcurrencyCh <- struct{}{}
+		wg.Add(1)
+		go func(shard *dedupAggrShard) {
+			defer func() {
+				<-flushConcurrencyCh
+				wg.Done()
+			}()
+
+			ctx := getDedupFlushCtx()
+			shard.flush(ctx, f)
+			putDedupFlushCtx(ctx)
+		}(&da.shards[i])
 	}
+	wg.Wait()
 }
 
 type perShardSamples struct {
@@ -178,8 +203,14 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 			key:   key,
 			value: s.value,
 		})
-	}
-	ctx.samples = dstSamples
 
+		// Limit the number of samples per each flush in order to limit memory usage.
+		if len(dstSamples) >= 100_000 {
+			f(dstSamples)
+			clear(dstSamples)
+			dstSamples = dstSamples[:0]
+		}
+	}
 	f(dstSamples)
+	ctx.samples = dstSamples
 }
diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go
index 247e3ccc4..c3892a5f8 100644
--- a/lib/streamaggr/dedup_test.go
+++ b/lib/streamaggr/dedup_test.go
@@ -33,11 +33,14 @@ func TestDedupAggrSerial(t *testing.T) {
 	}
 
 	flushedSamplesMap := make(map[string]pushSample)
+	var mu sync.Mutex
 	flushSamples := func(samples []pushSample) {
+		mu.Lock()
 		for _, sample := range samples {
 			sample.key = strings.Clone(sample.key)
 			flushedSamplesMap[sample.key] = sample
 		}
+		mu.Unlock()
 	}
 
 	da.flush(flushSamples)
diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go
index bc91d2f23..1430b2e58 100644
--- a/lib/streamaggr/histogram_bucket.go
+++ b/lib/streamaggr/histogram_bucket.go
@@ -59,7 +59,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
@@ -86,7 +86,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
 	})
 }
 
-func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
 	currentTime := fasttime.UnixTimestamp()
 	currentTimeMsec := int64(currentTime) * 1000
 
diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go
index b4710b21f..667251076 100644
--- a/lib/streamaggr/last.go
+++ b/lib/streamaggr/last.go
@@ -51,14 +51,14 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *lastAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go
index 45d248cbb..4450e210a 100644
--- a/lib/streamaggr/max.go
+++ b/lib/streamaggr/max.go
@@ -53,14 +53,14 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *maxAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go
index 866b95ff9..42d8a2314 100644
--- a/lib/streamaggr/min.go
+++ b/lib/streamaggr/min.go
@@ -53,14 +53,14 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *minAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go
index 5142e6558..4e605cc91 100644
--- a/lib/streamaggr/quantiles.go
+++ b/lib/streamaggr/quantiles.go
@@ -58,14 +58,14 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *quantilesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	phis := as.phis
diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go
index 592787efe..777573ea3 100644
--- a/lib/streamaggr/stddev.go
+++ b/lib/streamaggr/stddev.go
@@ -54,14 +54,14 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *stddevAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go
index 585ec22e4..c6f5cfed9 100644
--- a/lib/streamaggr/stdvar.go
+++ b/lib/streamaggr/stdvar.go
@@ -53,14 +53,14 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *stdvarAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index bc3dcf71b..0c2d080a5 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -304,6 +304,7 @@ type aggregator struct {
 	// lc is used for compressing series keys before passing them to dedupAggr and aggrState.
 	lc promutils.LabelsCompressor
 
+	// pushFunc is the callback, which is called by aggrState when flushing its state.
 	pushFunc PushFunc
 
 	// suffix contains a suffix, which should be added to aggregate metric names
@@ -329,7 +330,7 @@ type aggregator struct {
 
 type aggrState interface {
 	pushSamples(samples []pushSample)
-	appendSeriesForFlush(ctx *flushCtx)
+	flushState(ctx *flushCtx)
 }
 
 // PushFunc is called by Aggregators when it needs to push its state to metrics storage
@@ -544,11 +545,7 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
 			return
 		case <-tickerFlush.C:
 			startTime := time.Now()
-
-			flushConcurrencyCh <- struct{}{}
 			a.flush()
-			<-flushConcurrencyCh
-
 			d := time.Since(startTime)
 			a.flushDuration.Update(d.Seconds())
 			if d > interval {
@@ -559,11 +556,7 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
 			}
 		case <-dedupTickerCh:
 			startTime := time.Now()
-
-			flushConcurrencyCh <- struct{}{}
 			a.dedupFlush()
-			<-flushConcurrencyCh
-
 			d := time.Since(startTime)
 			a.dedupFlushDuration.Update(d.Seconds())
 			if d > dedupInterval {
@@ -576,49 +569,32 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
 	}
 }
 
-var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
-
 func (a *aggregator) dedupFlush() {
 	a.da.flush(a.pushSamples)
 }
 
 func (a *aggregator) flush() {
-	ctx := &flushCtx{
-		a: a,
-	}
+	var wg sync.WaitGroup
 	for _, as := range a.aggrStates {
-		ctx.reset()
-		as.appendSeriesForFlush(ctx)
+		flushConcurrencyCh <- struct{}{}
+		wg.Add(1)
+		go func(as aggrState) {
+			defer func() {
+				<-flushConcurrencyCh
+				wg.Done()
+			}()
 
-		tss := ctx.tss
-
-		if a.outputRelabeling == nil {
-			// Fast path - push the output metrics.
-			a.pushFunc(tss)
-			continue
-		}
-
-		// Slower path - apply output relabeling and then push the output metrics.
-		auxLabels := promutils.GetLabels()
-		dstLabels := auxLabels.Labels[:0]
-		dst := tss[:0]
-		for _, ts := range tss {
-			dstLabelsLen := len(dstLabels)
-			dstLabels = append(dstLabels, ts.Labels...)
-			dstLabels = a.outputRelabeling.Apply(dstLabels, dstLabelsLen)
-			if len(dstLabels) == dstLabelsLen {
-				// The metric has been deleted by the relabeling
-				continue
-			}
-			ts.Labels = dstLabels[dstLabelsLen:]
-			dst = append(dst, ts)
-		}
-		a.pushFunc(dst)
-		auxLabels.Labels = dstLabels
-		promutils.PutLabels(auxLabels)
+			ctx := getFlushCtx(a)
+			as.flushState(ctx)
+			ctx.flushSeries()
+			putFlushCtx(ctx)
+		}(as)
 	}
+	wg.Wait()
 }
 
+var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
+
 // MustStop stops the aggregator.
 //
 // The aggregator stops pushing the aggregated metrics after this call.
@@ -631,12 +607,10 @@ func (a *aggregator) MustStop() {
 	}
 
 	// Flush the remaining data from the last interval if needed.
-	flushConcurrencyCh <- struct{}{}
 	if a.da != nil {
 		a.dedupFlush()
 	}
 	a.flush()
-	<-flushConcurrencyCh
 }
 
 // Push pushes tss to a.
@@ -796,6 +770,23 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
 	return dstInput, dstOutput
 }
 
+func getFlushCtx(a *aggregator) *flushCtx {
+	v := flushCtxPool.Get()
+	if v == nil {
+		v = &flushCtx{}
+	}
+	ctx := v.(*flushCtx)
+	ctx.a = a
+	return ctx
+}
+
+func putFlushCtx(ctx *flushCtx) {
+	ctx.reset()
+	flushCtxPool.Put(ctx)
+}
+
+var flushCtxPool sync.Pool
+
 type flushCtx struct {
 	a *aggregator
 
@@ -805,12 +796,55 @@ type flushCtx struct {
 }
 
 func (ctx *flushCtx) reset() {
+	ctx.a = nil
+	ctx.resetSeries()
+}
+
+func (ctx *flushCtx) resetSeries() {
 	ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss)
-	promrelabel.CleanLabels(ctx.labels)
+
+	clear(ctx.labels)
 	ctx.labels = ctx.labels[:0]
+
 	ctx.samples = ctx.samples[:0]
 }
 
+func (ctx *flushCtx) flushSeries() {
+	tss := ctx.tss
+	if len(tss) == 0 {
+		// nothing to flush
+		return
+	}
+
+	outputRelabeling := ctx.a.outputRelabeling
+	if outputRelabeling == nil {
+		// Fast path - push the output metrics.
+		ctx.a.pushFunc(tss)
+		return
+	}
+
+	// Slow path - apply output relabeling and then push the output metrics.
+	auxLabels := promutils.GetLabels()
+	dstLabels := auxLabels.Labels[:0]
+	dst := tss[:0]
+	for _, ts := range tss {
+		dstLabelsLen := len(dstLabels)
+		dstLabels = append(dstLabels, ts.Labels...)
+		dstLabels = outputRelabeling.Apply(dstLabels, dstLabelsLen)
+		if len(dstLabels) == dstLabelsLen {
+			// The metric has been deleted by the relabeling
+			continue
+		}
+		ts.Labels = dstLabels[dstLabelsLen:]
+		dst = append(dst, ts)
+	}
+	ctx.a.pushFunc(dst)
+	auxLabels.Labels = dstLabels
+	promutils.PutLabels(auxLabels)
+
+	ctx.resetSeries()
+}
+
 func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
 	labelsLen := len(ctx.labels)
 	samplesLen := len(ctx.samples)
@@ -826,6 +860,11 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
 		Labels:  ctx.labels[labelsLen:],
 		Samples: ctx.samples[samplesLen:],
 	})
+
+	// Limit the maximum length of ctx.tss in order to limit memory usage.
+	if len(ctx.tss) >= 10_000 {
+		ctx.flushSeries()
+	}
 }
 
 func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) {
diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go
index 2ea43d46d..080df9347 100644
--- a/lib/streamaggr/sum_samples.go
+++ b/lib/streamaggr/sum_samples.go
@@ -51,14 +51,14 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go
index 67adbae7f..bfce7e66f 100644
--- a/lib/streamaggr/total.go
+++ b/lib/streamaggr/total.go
@@ -90,7 +90,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
@@ -125,7 +125,7 @@ func (as *totalAggrState) removeOldEntries(currentTime uint64) {
 	})
 }
 
-func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *totalAggrState) flushState(ctx *flushCtx) {
 	currentTime := fasttime.UnixTimestamp()
 	currentTimeMsec := int64(currentTime) * 1000
 
diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go
index 4db18f750..60cd3317d 100644
--- a/lib/streamaggr/unique_samples.go
+++ b/lib/streamaggr/unique_samples.go
@@ -55,14 +55,14 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
 		}
 		sv.mu.Unlock()
 		if deleted {
-			// The entry has been deleted by the concurrent call to appendSeriesForFlush
+			// The entry has been deleted by the concurrent call to flushState
 			// Try obtaining and updating the entry again.
 			goto again
 		}
 	}
 }
 
-func (as *uniqueSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
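For context, the patch moves the flushConcurrencyCh semaphore out of runFlusher and into aggregator.flush() and dedupAggr.flush(), so each aggregation state and each dedup shard is flushed on its own goroutine while the total flush concurrency stays capped at the CPU count. A minimal standalone sketch of that pattern follows; it assumes runtime.NumCPU() in place of cgroup.AvailableCPUs(), and the states slice and doFlush callback are illustrative names, not part of the patch.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// flushConcurrencyCh acts as a semaphore that caps the number of concurrently
// running flushes. The patch sizes it with cgroup.AvailableCPUs(); runtime.NumCPU()
// is used here as a stand-in.
var flushConcurrencyCh = make(chan struct{}, runtime.NumCPU())

// flushAll starts one goroutine per state, but at most cap(flushConcurrencyCh)
// of them hold a semaphore slot (and thus do real work) at any moment.
func flushAll(states []string, doFlush func(state string)) {
	var wg sync.WaitGroup
	for _, s := range states {
		flushConcurrencyCh <- struct{}{} // acquire a slot before spawning the worker
		wg.Add(1)
		go func(state string) {
			defer func() {
				<-flushConcurrencyCh // release the slot
				wg.Done()
			}()
			doFlush(state)
		}(s)
	}
	wg.Wait()
}

func main() {
	flushAll([]string{"avg", "sum_samples", "max"}, func(state string) {
		fmt.Println("flushing", state)
	})
}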
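The new getFlushCtx/putFlushCtx and getDedupFlushCtx/putDedupFlushCtx helpers follow the standard sync.Pool get/reset/put idiom, so per-flush allocations are reused across flush cycles instead of being re-allocated on every tick. A reduced sketch, assuming a trimmed-down flushCtx that carries only a reusable buffer (the real struct also holds the owning aggregator plus the tss, labels and samples slices):

package main

import "sync"

// flushCtx is a trimmed-down stand-in for the pooled context in the patch;
// the real type also carries the owning aggregator and several slices.
type flushCtx struct {
	buf []byte
}

// reset keeps the allocated capacity so the next user can reuse it.
func (ctx *flushCtx) reset() {
	ctx.buf = ctx.buf[:0]
}

var flushCtxPool sync.Pool

// getFlushCtx returns a pooled context, allocating a fresh one when the pool is empty.
func getFlushCtx() *flushCtx {
	v := flushCtxPool.Get()
	if v == nil {
		return &flushCtx{}
	}
	return v.(*flushCtx)
}

// putFlushCtx resets the context and returns it to the pool for reuse.
func putFlushCtx(ctx *flushCtx) {
	ctx.reset()
	flushCtxPool.Put(ctx)
}

func main() {
	ctx := getFlushCtx()
	ctx.buf = append(ctx.buf, "aggregated series payload"...)
	putFlushCtx(ctx)
}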
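dedupAggrShard.flush and flushCtx.appendSeries additionally cap how much data accumulates before it is handed to the consumer (100_000 samples and 10_000 series respectively), which bounds memory usage during large flushes. A generic sketch of the same batching idea, with an illustrative maxBatch constant far smaller than the patch's thresholds:

package main

import "fmt"

// maxBatch is deliberately tiny for illustration; the patch flushes every
// 100_000 dedup samples and every 10_000 aggregated series.
const maxBatch = 3

// flushBatched forwards items to f in bounded batches so the intermediate
// buffer never grows past maxBatch entries. f must copy anything it wants to
// keep, since the batch slice is reused (the patch's dedup test clones keys
// before storing them for a similar reason).
func flushBatched(items []string, f func(batch []string)) {
	batch := make([]string, 0, maxBatch)
	for _, it := range items {
		batch = append(batch, it)
		if len(batch) >= maxBatch {
			f(batch)
			batch = batch[:0] // reuse the underlying array for the next batch
		}
	}
	f(batch) // flush the remainder, mirroring the final f(dstSamples) call
}

func main() {
	flushBatched([]string{"a", "b", "c", "d", "e"}, func(batch []string) {
		fmt.Println(batch)
	})
}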