diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go index 01a9fc1f2..6430ff016 100644 --- a/app/vmagent/remotewrite/remotewrite_test.go +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -53,7 +53,7 @@ func TestGetLabelsHash_Distribution(t *testing.T) { } func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { - f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) { + f := func(streamAggrConfig, relabelConfig string, stateSize int, dedupInterval time.Duration, keepInput, dropInput bool, input string) { t.Helper() perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig)) if err != nil { @@ -77,12 +77,15 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`), } if dedupInterval > 0 { - rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "dedup-global") + rwctx.deduplicator = streamaggr.NewDeduplicator(nil, stateSize, dedupInterval, nil, "dedup-global") } if streamAggrConfig != "" { pushNoop := func(_ []prompbmarshal.TimeSeries) {} - sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), pushNoop, nil, "global") + opts := streamaggr.Options{ + StateSize: stateSize, + } + sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), pushNoop, &opts, "global") if err != nil { t.Fatalf("cannot load streamaggr configs: %s", err) } @@ -114,13 +117,13 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { - action: keep source_labels: [env] regex: "dev" -`, 0, false, false, ` +`, 1, 0, false, false, ` metric{env="dev"} 10 metric{env="bar"} 20 metric{env="dev"} 15 metric{env="bar"} 25 `) - f(``, ``, time.Hour, false, false, ` + f(``, ``, 2, time.Hour, false, false, ` metric{env="dev"} 10 metric{env="foo"} 20 metric{env="dev"} 15 @@ -130,7 +133,7 @@ metric{env="foo"} 25 - action: keep source_labels: [env] regex: "dev" -`, time.Hour, false, false, ` +`, 3, time.Hour, false, false, ` metric{env="dev"} 10 metric{env="bar"} 20 metric{env="dev"} 15 @@ -140,7 +143,7 @@ metric{env="bar"} 25 - action: keep source_labels: [env] regex: "dev" -`, time.Hour, true, false, ` +`, 6, time.Hour, true, false, ` metric{env="test"} 10 metric{env="dev"} 20 metric{env="foo"} 15 @@ -150,7 +153,7 @@ metric{env="dev"} 25 - action: keep source_labels: [env] regex: "dev" -`, time.Hour, false, true, ` +`, 10, time.Hour, false, true, ` metric{env="foo"} 10 metric{env="dev"} 20 metric{env="foo"} 15 @@ -160,7 +163,7 @@ metric{env="dev"} 25 - action: keep source_labels: [env] regex: "dev" -`, time.Hour, true, true, ` +`, 11, time.Hour, true, true, ` metric{env="dev"} 10 metric{env="test"} 20 metric{env="dev"} 15 diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go index 6391cfa3f..9a5d9adeb 100644 --- a/app/vmagent/remotewrite/streamaggr.go +++ b/app/vmagent/remotewrite/streamaggr.go @@ -35,6 +35,7 @@ var ( "clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples for aggregator "+ "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") + streamAggrGlobalStateSize = flag.Int("streamAggr.stateSize", 1, "Amount of aggregation intervals") // Per URL config streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config for the corresponding -remoteWrite.url. "+ @@ -59,6 +60,7 @@ var ( "before stream de-duplication and aggregation with -remoteWrite.streamAggr.config and -remoteWrite.streamAggr.dedupInterval at the corresponding -remoteWrite.url. "+ "Multiple labels per remoteWrite.url must be delimited by '^^': -remoteWrite.streamAggr.dropInputLabels='replica^^az,replica'. "+ "See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") + streamAggrStateSize = flagutil.NewArrayInt("remoteWrite.streamAggr.stateSize", 1, "Amount of aggregation intervals") ) // CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config. @@ -134,8 +136,11 @@ func initStreamAggrConfigGlobal() { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) } dedupInterval := streamAggrGlobalDedupInterval.Duration() + if *streamAggrGlobalStateSize < 1 { + logger.Fatalf("--streamAggr.stateSize should be greater than 0") + } if dedupInterval > 0 { - deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global") + deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, *streamAggrGlobalStateSize, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global") } } @@ -161,7 +166,7 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() { if streamAggrDropInputLabels.GetOptionalArg(idx) != "" { dropLabels = strings.Split(streamAggrDropInputLabels.GetOptionalArg(idx), "^^") } - rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, dropLabels, alias) + rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, *streamAggrGlobalStateSize, dedupInterval, dropLabels, alias) } } @@ -207,6 +212,7 @@ func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) { IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals, KeepInput: *streamAggrGlobalKeepInput, + StateSize: *streamAggrGlobalStateSize, } sas, err := streamaggr.LoadFromFile(path, pushToRemoteStoragesTrackDropped, opts, "global") @@ -221,6 +227,9 @@ func (rwctx *remoteWriteCtx) newStreamAggrConfig() (*streamaggr.Aggregators, err } func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) { + if streamAggrStateSize.GetOptionalArg(idx) < 1 { + return nil, fmt.Errorf("--remoteWrite.streamAggr.stateSize should be greater than 0") + } path := streamAggrConfig.GetOptionalArg(idx) if path == "" { return nil, nil @@ -240,6 +249,7 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamag IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), IgnoreFirstIntervals: streamAggrIgnoreFirstIntervals.GetOptionalArg(idx), KeepInput: streamAggrKeepInput.GetOptionalArg(idx), + StateSize: streamAggrStateSize.GetOptionalArg(idx), } sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias) diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 3cc649c52..855a8ae75 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -36,6 +36,7 @@ var ( "See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. "+ "See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") + streamAggrStateSize = flag.Int("streamAggr.stateSize", 1, "Number of aggregation intervals") ) var ( @@ -62,6 +63,7 @@ func CheckStreamAggrConfig() error { DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, + StateSize: *streamAggrStateSize, } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts, "global") if err != nil { @@ -78,7 +80,7 @@ func InitStreamAggr() { saCfgReloaderStopCh = make(chan struct{}) if *streamAggrConfig == "" { if *streamAggrDedupInterval > 0 { - deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels, "global") + deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrStateSize, *streamAggrDedupInterval, *streamAggrDropInputLabels, "global") } return } diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 0f7803220..83b4d5f8e 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -1,90 +1,20 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// avgAggrState calculates output=avg, e.g. the average value over input samples. -type avgAggrState struct { - m sync.Map -} - -type avgState struct { +type avgAggrValue struct { sum float64 count float64 } -type avgStateValue struct { - mu sync.Mutex - state [aggrStateSize]avgState - deleted bool - deleteDeadline int64 +func (sv *avgAggrValue) pushSample(ctx *pushSampleCtx) { + sv.sum += ctx.sample.value + sv.count++ } -func newAvgAggrState() *avgAggrState { - return &avgAggrState{} -} - -func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &avgStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*avgStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.state[idx].sum += s.value - sv.state[idx].count++ - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (sv *avgAggrValue) flush(ctx *flushCtx, key string) { + if sv.count > 0 { + avg := sv.sum / sv.count + ctx.appendSeries(key, "avg", avg) + sv.sum = 0 + sv.count = 0 } } - -func (as *avgAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - sv := v.(*avgStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = avgState{} - sv.mu.Unlock() - if state.count > 0 { - key := k.(string) - avg := state.sum/state.count - ctx.appendSeries(key, "avg", avg) - } - return true - }) -} diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index df6d22bef..c0320c224 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -1,82 +1,14 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples. -type countSamplesAggrState struct { - m sync.Map +type countSamplesAggrValue struct { + count uint64 } -type countSamplesStateValue struct { - mu sync.Mutex - state [aggrStateSize]uint64 - deleted bool - deleteDeadline int64 +func (av *countSamplesAggrValue) pushSample(_ *pushSampleCtx) { + av.count++ } -func newCountSamplesAggrState() *countSamplesAggrState { - return &countSamplesAggrState{} -} - -func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &countSamplesStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*countSamplesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.state[idx]++ - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } - } -} - -func (as *countSamplesAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*countSamplesStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = 0 - sv.mu.Unlock() - if state > 0 { - key := k.(string) - ctx.appendSeries(key, "count_samples", float64(state)) - } - return true - }) +func (av *countSamplesAggrValue) flush(ctx *flushCtx, key string) { + ctx.appendSeries(key, "count_samples", float64(av.count)) + av.count = 0 } diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index e0ab85885..ed92c46c1 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -1,93 +1,33 @@ package streamaggr import ( - "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/cespare/xxhash/v2" ) -// countSeriesAggrState calculates output=count_series, e.g. the number of unique series. -type countSeriesAggrState struct { - m sync.Map +func countSeriesInitFn(values []aggrValue) []aggrValue { + for i := range values { + values[i] = &countSeriesAggrValue{ + samples: make(map[uint64]struct{}), + } + } + return values } -type countSeriesStateValue struct { - mu sync.Mutex - state [aggrStateSize]map[uint64]struct{} - deleted bool - deleteDeadline int64 +type countSeriesAggrValue struct { + samples map[uint64]struct{} } -func newCountSeriesAggrState() *countSeriesAggrState { - return &countSeriesAggrState{} -} - -func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - inputKey, outputKey := getInputOutputKey(s.key) - - // Count unique hashes over the inputKeys instead of unique inputKey values. - // This reduces memory usage at the cost of possible hash collisions for distinct inputKey values. - h := xxhash.Sum64(bytesutil.ToUnsafeBytes(inputKey)) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - csv := &countSeriesStateValue{} - for ic := range csv.state { - csv.state[ic] = make(map[uint64]struct{}) - } - v = csv - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Update the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*countSeriesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - if _, ok := sv.state[idx][h]; !ok { - sv.state[idx][h] = struct{}{} - } - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *countSeriesAggrValue) pushSample(ctx *pushSampleCtx) { + // Count unique hashes over the inputKeys instead of unique inputKey values. + // This reduces memory usage at the cost of possible hash collisions for distinct inputKey values. + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(ctx.inputKey)) + if _, ok := av.samples[h]; !ok { + av.samples[h] = struct{}{} } } -func (as *countSeriesAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*countSeriesStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := len(sv.state[ctx.idx]) - sv.state[ctx.idx] = make(map[uint64]struct{}) - sv.mu.Unlock() - if state > 0 { - key := k.(string) - ctx.appendSeries(key, "count_series", float64(state)) - } - return true - }) +func (av *countSeriesAggrValue) flush(ctx *flushCtx, key string) { + ctx.appendSeries(key, "count_series", float64(len(av.samples))) + clear(av.samples) } diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 70a77a6d8..24dcca554 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -14,8 +14,8 @@ import ( const dedupAggrShardsCount = 128 type dedupAggr struct { - shards []dedupAggrShard - currentIdx atomic.Int32 + shards []dedupAggrShard + stateSize int } type dedupAggrShard struct { @@ -34,8 +34,8 @@ type dedupAggrState struct { } type dedupAggrShardNopad struct { - mu sync.RWMutex - state [aggrStateSize]*dedupAggrState + mu sync.Mutex + state []*dedupAggrState } type dedupAggrSample struct { @@ -43,19 +43,21 @@ type dedupAggrSample struct { timestamp int64 } -func newDedupAggr() *dedupAggr { +func newDedupAggr(stateSize int) *dedupAggr { shards := make([]dedupAggrShard, dedupAggrShardsCount) return &dedupAggr{ - shards: shards, + shards: shards, + stateSize: stateSize, } } func (da *dedupAggr) sizeBytes() uint64 { n := uint64(unsafe.Sizeof(*da)) - currentIdx := da.currentIdx.Load() for i := range da.shards { - if da.shards[i].state[currentIdx] != nil { - n += da.shards[i].state[currentIdx].sizeBytes.Load() + for _, state := range da.shards[i].state { + if state != nil { + n += state.sizeBytes.Load() + } } } return n @@ -63,19 +65,20 @@ func (da *dedupAggr) sizeBytes() uint64 { func (da *dedupAggr) itemsCount() uint64 { n := uint64(0) - currentIdx := da.currentIdx.Load() for i := range da.shards { - if da.shards[i].state[currentIdx] != nil { - n += da.shards[i].state[currentIdx].itemsCount.Load() + for _, state := range da.shards[i].state { + if state != nil { + n += state.itemsCount.Load() + } } } return n } -func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) { +func (da *dedupAggr) pushSamples(data *pushCtxData) { pss := getPerShardSamples() shards := pss.shards - for _, sample := range samples { + for _, sample := range data.samples { h := xxhash.Sum64(bytesutil.ToUnsafeBytes(sample.key)) idx := h % uint64(len(shards)) shards[idx] = append(shards[idx], sample) @@ -84,17 +87,21 @@ func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) { if len(shardSamples) == 0 { continue } - da.shards[i].pushSamples(shardSamples, dedupIdx) + da.shards[i].pushSamples(shardSamples, da.stateSize, data.idx) } putPerShardSamples(pss) } -func getDedupFlushCtx() *dedupFlushCtx { +func getDedupFlushCtx(deleteDeadline int64, dedupIdx, flushIdx int) *dedupFlushCtx { v := dedupFlushCtxPool.Get() if v == nil { - return &dedupFlushCtx{} + v = &dedupFlushCtx{} } - return v.(*dedupFlushCtx) + ctx := v.(*dedupFlushCtx) + ctx.deleteDeadline = deleteDeadline + ctx.dedupIdx = dedupIdx + ctx.flushIdx = flushIdx + return ctx } func putDedupFlushCtx(ctx *dedupFlushCtx) { @@ -105,12 +112,26 @@ func putDedupFlushCtx(ctx *dedupFlushCtx) { var dedupFlushCtxPool sync.Pool type dedupFlushCtx struct { - samples []pushSample + samples []pushSample + deleteDeadline int64 + dedupIdx int + flushIdx int +} + +func (ctx *dedupFlushCtx) getPushCtxData(samples []pushSample) *pushCtxData { + return &pushCtxData{ + samples: samples, + deleteDeadline: ctx.deleteDeadline, + idx: ctx.flushIdx, + } } func (ctx *dedupFlushCtx) reset() { clear(ctx.samples) ctx.samples = ctx.samples[:0] + ctx.deleteDeadline = 0 + ctx.dedupIdx = 0 + ctx.flushIdx = 0 } func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) { @@ -123,13 +144,11 @@ func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flush <-flushConcurrencyCh wg.Done() }() - - ctx := getDedupFlushCtx() - shard.flush(ctx, f, deleteDeadline, dedupIdx, flushIdx) + ctx := getDedupFlushCtx(deleteDeadline, dedupIdx, flushIdx) + shard.flush(ctx, f) putDedupFlushCtx(ctx) }(&da.shards[i]) } - da.currentIdx.Store((da.currentIdx.Load() + 1) % aggrStateSize) wg.Wait() } @@ -164,10 +183,13 @@ func putPerShardSamples(pss *perShardSamples) { var perShardSamplesPool sync.Pool -func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) { +func (das *dedupAggrShard) pushSamples(samples []pushSample, stateSize, dedupIdx int) { das.mu.Lock() defer das.mu.Unlock() + if len(das.state) == 0 { + das.state = make([]*dedupAggrState, stateSize) + } state := das.state[dedupIdx] if state == nil { state = &dedupAggrState{ @@ -200,17 +222,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) { das.state[dedupIdx].samplesBuf = samplesBuf } -func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) { +func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) { das.mu.Lock() var m map[string]*dedupAggrSample - state := das.state[dedupIdx] + if len(das.state) == 0 { + return + } + state := das.state[ctx.dedupIdx] if state != nil && len(state.m) > 0 { m = state.m - das.state[dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m)) - das.state[dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[dedupIdx].samplesBuf)) - das.state[dedupIdx].sizeBytes.Store(0) - das.state[dedupIdx].itemsCount.Store(0) + das.state[ctx.dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m)) + das.state[ctx.dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[ctx.dedupIdx].samplesBuf)) + das.state[ctx.dedupIdx].sizeBytes.Store(0) + das.state[ctx.dedupIdx].itemsCount.Store(0) } das.mu.Unlock() @@ -229,11 +254,13 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadl // Limit the number of samples per each flush in order to limit memory usage. if len(dstSamples) >= 10_000 { - f(dstSamples, deleteDeadline, flushIdx) + data := ctx.getPushCtxData(dstSamples) + f(data) clear(dstSamples) dstSamples = dstSamples[:0] } } - f(dstSamples, deleteDeadline, flushIdx) + data := ctx.getPushCtxData(dstSamples) + f(data) ctx.samples = dstSamples } diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go index 94daeb33a..f27b9af1a 100644 --- a/lib/streamaggr/dedup_test.go +++ b/lib/streamaggr/dedup_test.go @@ -9,7 +9,7 @@ import ( ) func TestDedupAggrSerial(t *testing.T) { - da := newDedupAggr() + da := newDedupAggr(2) const seriesCount = 100_000 expectedSamplesMap := make(map[string]pushSample) @@ -21,7 +21,10 @@ func TestDedupAggrSerial(t *testing.T) { sample.value = float64(i + j) expectedSamplesMap[sample.key] = *sample } - da.pushSamples(samples, 0, 0) + data := &pushCtxData{ + samples: samples, + } + da.pushSamples(data) } if n := da.sizeBytes(); n > 5_000_000 { @@ -33,9 +36,9 @@ func TestDedupAggrSerial(t *testing.T) { flushedSamplesMap := make(map[string]pushSample) var mu sync.Mutex - flushSamples := func(samples []pushSample, _ int64, _ int) { + flushSamples := func(ctx *pushCtxData) { mu.Lock() - for _, sample := range samples { + for _, sample := range ctx.samples { flushedSamplesMap[sample.key] = sample } mu.Unlock() @@ -59,7 +62,7 @@ func TestDedupAggrSerial(t *testing.T) { func TestDedupAggrConcurrent(_ *testing.T) { const concurrency = 5 const seriesCount = 10_000 - da := newDedupAggr() + da := newDedupAggr(2) var wg sync.WaitGroup for i := 0; i < concurrency; i++ { @@ -67,13 +70,15 @@ func TestDedupAggrConcurrent(_ *testing.T) { go func() { defer wg.Done() for i := 0; i < 10; i++ { - samples := make([]pushSample, seriesCount) - for j := range samples { - sample := &samples[j] + data := &pushCtxData{ + samples: make([]pushSample, seriesCount), + } + for j := range data.samples { + sample := &data.samples[j] sample.key = fmt.Sprintf("key_%d", j) sample.value = float64(i + j) } - da.pushSamples(samples, 0, 0) + da.pushSamples(data) } }() } diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go index 6f6cc4c7a..ad09ab1c2 100644 --- a/lib/streamaggr/dedup_timing_test.go +++ b/lib/streamaggr/dedup_timing_test.go @@ -19,7 +19,7 @@ func BenchmarkDedupAggr(b *testing.B) { func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { const loops = 2 benchSamples := newBenchSamples(samplesPerPush) - da := newDedupAggr() + da := newDedupAggr(2) b.ResetTimer() b.ReportAllocs() @@ -27,7 +27,9 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { for i := 0; i < loops; i++ { - da.pushSamples(benchSamples, 0, 0) + da.pushSamples(&pushCtxData{ + samples: benchSamples, + }) } } }) diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index e24b2d6cb..42391b5e6 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/metrics" ) @@ -17,6 +18,7 @@ import ( type Deduplicator struct { da *dedupAggr + stateSize int dropLabels []string dedupInterval int64 @@ -39,11 +41,12 @@ type Deduplicator struct { // alias is url label used in metrics exposed by the returned Deduplicator. // // MustStop must be called on the returned deduplicator in order to free up occupied resources. -func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator { +func NewDeduplicator(pushFunc PushFunc, stateSize int, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator { d := &Deduplicator{ - da: newDedupAggr(), + da: newDedupAggr(stateSize), dropLabels: dropLabels, dedupInterval: dedupInterval.Milliseconds(), + stateSize: stateSize, stopCh: make(chan struct{}), ms: metrics.NewSet(), @@ -85,13 +88,13 @@ func (d *Deduplicator) MustStop() { // Push pushes tss to d. func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { - ctx := getDeduplicatorPushCtx() + ctx := getDeduplicatorPushCtx(d.stateSize) pss := ctx.pss labels := &ctx.labels buf := ctx.buf dropLabels := d.dropLabels - aggrIntervals := int64(aggrStateSize) + aggrIntervals := int64(d.stateSize) for _, ts := range tss { if len(dropLabels) > 0 { labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels) @@ -117,8 +120,11 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { } } + data := &pushCtxData{} for idx, ps := range pss { - d.da.pushSamples(ps, 0, idx) + data.idx = idx + data.samples = ps + d.da.pushSamples(data) } ctx.pss = pss @@ -146,20 +152,21 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration flushTime := t.Truncate(dedupInterval).Add(dedupInterval) flushTimestamp := flushTime.UnixMilli() flushIntervals := int(flushTimestamp / int64(dedupInterval/time.Millisecond)) - flushIdx := flushIntervals % aggrStateSize - d.flush(pushFunc, dedupInterval, flushTime, flushIdx) + flushIdx := flushIntervals % d.stateSize + d.flush(pushFunc, dedupInterval, flushTimestamp, flushIdx) } } } -func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) { - d.da.flush(func(pss []pushSample, _ int64, _ int) { +func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTimestamp int64, idx int) { + startTime := time.Now() + d.da.flush(func(data *pushCtxData) { ctx := getDeduplicatorFlushCtx() tss := ctx.tss labels := ctx.labels samples := ctx.samples - for _, ps := range pss { + for _, ps := range data.samples { labelsLen := len(labels) labels = decompressLabels(labels, ps.key) @@ -180,9 +187,9 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flu ctx.labels = labels ctx.samples = samples putDeduplicatorFlushCtx(ctx) - }, flushTime.UnixMilli(), flushIdx, flushIdx) + }, flushTimestamp, idx, idx) - duration := time.Since(flushTime) + duration := time.Since(startTime) d.dedupFlushDuration.Update(duration.Seconds()) if duration > dedupInterval { d.dedupFlushTimeouts.Inc() @@ -193,7 +200,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flu } type deduplicatorPushCtx struct { - pss [aggrStateSize][]pushSample + pss [][]pushSample labels promutils.Labels buf []byte } @@ -208,12 +215,18 @@ func (ctx *deduplicatorPushCtx) reset() { ctx.buf = ctx.buf[:0] } -func getDeduplicatorPushCtx() *deduplicatorPushCtx { +func getDeduplicatorPushCtx(stateSize int) *deduplicatorPushCtx { v := deduplicatorPushCtxPool.Get() if v == nil { - return &deduplicatorPushCtx{} + return &deduplicatorPushCtx{ + pss: make([][]pushSample, stateSize), + } } - return v.(*deduplicatorPushCtx) + ctx := v.(*deduplicatorPushCtx) + if len(ctx.pss) < stateSize { + ctx.pss = slicesutil.SetLength(ctx.pss, stateSize) + } + return ctx } func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) { diff --git a/lib/streamaggr/deduplicator_test.go b/lib/streamaggr/deduplicator_test.go index 48b5b4c83..3da2b5cf7 100644 --- a/lib/streamaggr/deduplicator_test.go +++ b/lib/streamaggr/deduplicator_test.go @@ -31,16 +31,17 @@ baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",n `, offsetMsecs) dedupInterval := time.Hour - d := NewDeduplicator(pushFunc, dedupInterval, []string{"node", "instance"}, "global") + d := NewDeduplicator(pushFunc, 2, dedupInterval, []string{"node", "instance"}, "global") for i := 0; i < 10; i++ { d.Push(tss) } flushTime := time.Now() flushIntervals := flushTime.UnixMilli()/dedupInterval.Milliseconds() + 1 + aggrStateSize := 2 idx := int(flushIntervals % int64(aggrStateSize)) - d.flush(pushFunc, time.Hour, time.Now(), idx) + d.flush(pushFunc, time.Hour, time.Now().UnixMilli(), idx) d.MustStop() result := timeSeriessToString(tssResult) diff --git a/lib/streamaggr/deduplicator_timing_test.go b/lib/streamaggr/deduplicator_timing_test.go index ed98b638c..b9b184037 100644 --- a/lib/streamaggr/deduplicator_timing_test.go +++ b/lib/streamaggr/deduplicator_timing_test.go @@ -9,7 +9,7 @@ import ( func BenchmarkDeduplicatorPush(b *testing.B) { pushFunc := func(_ []prompbmarshal.TimeSeries) {} - d := NewDeduplicator(pushFunc, time.Hour, nil, "global") + d := NewDeduplicator(pushFunc, 2, time.Hour, nil, "global") b.ReportAllocs() b.SetBytes(int64(len(benchSeries))) diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 7a72f30f7..a11dad755 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -1,85 +1,24 @@ package streamaggr import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/metrics" ) -// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples. -type histogramBucketAggrState struct { - m sync.Map +// histogramBucketAggrValue calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples. +type histogramBucketAggrValue struct { + h metrics.Histogram + state metrics.Histogram } -type histogramBucketStateValue struct { - mu sync.Mutex - state [aggrStateSize]metrics.Histogram - total metrics.Histogram - deleted bool - deleteDeadline int64 +func (sv *histogramBucketAggrValue) pushSample(ctx *pushSampleCtx) { + sv.h.Update(ctx.sample.value) } -func newHistogramBucketAggrState() *histogramBucketAggrState { - return &histogramBucketAggrState{} -} - -func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &histogramBucketStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*histogramBucketStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.state[idx].Update(s.value) - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } - } -} - -func (as *histogramBucketAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*histogramBucketStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - sv.total.Merge(&sv.state[ctx.idx]) - total := &sv.total - sv.state[ctx.idx] = metrics.Histogram{} - sv.mu.Unlock() - key := k.(string) - total.VisitNonZeroBuckets(func(vmrange string, count uint64) { - ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange) - }) - return true +func (sv *histogramBucketAggrValue) flush(ctx *flushCtx, key string) { + total := &sv.state + total.Merge(&sv.h) + total.VisitNonZeroBuckets(func(vmrange string, count uint64) { + ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange) }) + total.Reset() } diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index 4c39049de..4d6781afc 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -1,90 +1,20 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// lastAggrState calculates output=last, e.g. the last value over input samples. -type lastAggrState struct { - m sync.Map -} - -type lastStateValue struct { - mu sync.Mutex - state [aggrStateSize]lastState - deleted bool - deleteDeadline int64 -} - -type lastState struct { +type lastAggrValue struct { last float64 timestamp int64 } -func newLastAggrState() *lastAggrState { - return &lastAggrState{} -} - -func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &lastStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Update the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*lastStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - if s.timestamp >= sv.state[idx].timestamp { - sv.state[idx].last = s.value - sv.state[idx].timestamp = s.timestamp - } - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *lastAggrValue) pushSample(ctx *pushSampleCtx) { + if ctx.sample.timestamp >= av.timestamp { + av.last = ctx.sample.value + av.timestamp = ctx.sample.timestamp } } -func (as *lastAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*lastStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = lastState{} - sv.mu.Unlock() - if state.timestamp > 0 { - key := k.(string) - ctx.appendSeries(key, "last", state.last) - } - return true - }) +func (av *lastAggrValue) flush(ctx *flushCtx, key string) { + if av.timestamp > 0 { + ctx.appendSeries(key, "last", av.last) + av.timestamp = 0 + } } diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index 54224929b..aa18b0340 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -1,93 +1,23 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// maxAggrState calculates output=max, e.g. the maximum value over input samples. -type maxAggrState struct { - m sync.Map +type maxAggrValue struct { + max float64 + defined bool } -type maxStateValue struct { - mu sync.Mutex - state [aggrStateSize]maxState - deleted bool - deleteDeadline int64 -} - -type maxState struct { - max float64 - exists bool -} - -func newMaxAggrState() *maxAggrState { - return &maxAggrState{} -} - -func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &maxStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*maxStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - state := &sv.state[idx] - if !state.exists { - state.max = s.value - state.exists = true - } else if s.value > state.max { - state.max = s.value - } - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *maxAggrValue) pushSample(ctx *pushSampleCtx) { + if ctx.sample.value > av.max || !av.defined { + av.max = ctx.sample.value + } + if !av.defined { + av.defined = true } } -func (as *maxAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*maxStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = maxState{} - sv.mu.Unlock() - if state.exists { - key := k.(string) - ctx.appendSeries(key, "max", state.max) - } - return true - }) +func (av *maxAggrValue) flush(ctx *flushCtx, key string) { + if av.defined { + ctx.appendSeries(key, "max", av.max) + av.max = 0 + av.defined = false + } } diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index de6620c3f..1b74e5f93 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -1,93 +1,23 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// minAggrState calculates output=min, e.g. the minimum value over input samples. -type minAggrState struct { - m sync.Map +type minAggrValue struct { + min float64 + defined bool } -type minStateValue struct { - mu sync.Mutex - state [aggrStateSize]minState - deleted bool - deleteDeadline int64 -} - -type minState struct { - min float64 - exists bool -} - -func newMinAggrState() *minAggrState { - return &minAggrState{} -} - -func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &minStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*minStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - state := &sv.state[idx] - if !state.exists { - state.min = s.value - state.exists = true - } else if s.value < state.min { - state.min = s.value - } - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *minAggrValue) pushSample(ctx *pushSampleCtx) { + if ctx.sample.value < av.min || !av.defined { + av.min = ctx.sample.value + } + if !av.defined { + av.defined = true } } -func (as *minAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*minStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = minState{} - sv.mu.Unlock() - if state.exists { - key := k.(string) - ctx.appendSeries(key, "min", state.min) - } - return true - }) +func (av *minAggrValue) flush(ctx *flushCtx, key string) { + if av.defined { + ctx.appendSeries(key, "min", av.min) + av.defined = false + av.min = 0 + } } diff --git a/lib/streamaggr/output.go b/lib/streamaggr/output.go new file mode 100644 index 000000000..66b91cef5 --- /dev/null +++ b/lib/streamaggr/output.go @@ -0,0 +1,129 @@ +package streamaggr + +import ( + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/metrics" + "sync" +) + +type pushSampleCtx struct { + stateSize int + deleteDeadline int64 + sample *pushSample + idx int + inputKey string +} + +type aggrValuesFn func(*pushSampleCtx) []aggrValue + +type aggrValuesInitFn func([]aggrValue) []aggrValue + +func newAggrValues[V any, VP aggrValuePtr[V]](initFn aggrValuesInitFn) aggrValuesFn { + return func(ctx *pushSampleCtx) []aggrValue { + output := make([]aggrValue, ctx.stateSize) + if initFn != nil { + return initFn(output) + } + for i := range output { + var v VP = new(V) + output[i] = v + } + return output + } +} + +type aggrOutputs struct { + m sync.Map + stateSize int + initFns []aggrValuesFn + outputSamples *metrics.Counter +} + +func (ao *aggrOutputs) pushSamples(data *pushCtxData) { + ctx := &pushSampleCtx{ + stateSize: ao.stateSize, + deleteDeadline: data.deleteDeadline, + idx: data.idx, + } + var outputKey string + for i := range data.samples { + ctx.sample = &data.samples[i] + ctx.inputKey, outputKey = getInputOutputKey(ctx.sample.key) + + again: + v, ok := ao.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + nv := &aggrValues{ + values: make([][]aggrValue, len(ao.initFns)), + } + for i, initFn := range ao.initFns { + nv.values[i] = initFn(ctx) + } + v = nv + outputKey = bytesutil.InternString(outputKey) + vNew, loaded := ao.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + v = vNew + } + } + av := v.(*aggrValues) + av.mu.Lock() + deleted := av.deleted + if !deleted { + for i := range av.values { + av.values[i][data.idx].pushSample(ctx) + } + av.deleteDeadline = data.deleteDeadline + } + av.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to flush + // Try obtaining and updating the entry again. + goto again + } + } +} + +func (ao *aggrOutputs) flushState(ctx *flushCtx) { + m := &ao.m + m.Range(func(k, v any) bool { + // Atomically delete the entry from the map, so new entry is created for the next flush. + av := v.(*aggrValues) + av.mu.Lock() + + // check for stale entries + deleted := ctx.flushTimestamp > av.deleteDeadline + if deleted { + // Mark the current entry as deleted + av.deleted = deleted + av.mu.Unlock() + m.Delete(k) + return true + } + key := k.(string) + for _, ov := range av.values { + ov[ctx.idx].flush(ctx, key) + } + av.mu.Unlock() + return true + }) +} + +type aggrValues struct { + mu sync.Mutex + values [][]aggrValue + deleteDeadline int64 + deleted bool +} + +type aggrValue interface { + pushSample(*pushSampleCtx) + flush(*flushCtx, string) +} + +type aggrValuePtr[V any] interface { + *V + aggrValue +} diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index c866ae17e..615eb7c21 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -1,102 +1,59 @@ package streamaggr import ( - "strconv" - "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/valyala/histogram" + "strconv" ) -// quantilesAggrState calculates output=quantiles, e.g. the given quantiles over the input samples. +func quantilesInitFn(stateSize int, phis []float64) aggrValuesInitFn { + states := make([]*quantilesAggrState, stateSize) + return func(values []aggrValue) []aggrValue { + for i := range values { + state := states[i] + if state == nil { + state = &quantilesAggrState{ + phis: phis, + } + states[i] = state + } + values[i] = &quantilesAggrValue{ + state: state, + } + } + return values + } +} + type quantilesAggrState struct { - m sync.Map - phis []float64 + phis []float64 + quantiles []float64 + b []byte } -type quantilesStateValue struct { - mu sync.Mutex - state [aggrStateSize]*histogram.Fast - deleted bool - deleteDeadline int64 +// quantilesAggrValue calculates output=quantiles, e.g. the given quantiles over the input samples. +type quantilesAggrValue struct { + h *histogram.Fast + state *quantilesAggrState } -func newQuantilesAggrState(phis []float64) *quantilesAggrState { - return &quantilesAggrState{ - phis: phis, +func (av *quantilesAggrValue) pushSample(ctx *pushSampleCtx) { + if av.h == nil { + av.h = histogram.GetFast() } + av.h.Update(ctx.sample.value) } -func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &quantilesStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*quantilesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - if sv.state[idx] == nil { - sv.state[idx] = histogram.GetFast() - } - sv.state[idx].Update(s.value) - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again +func (av *quantilesAggrValue) flush(ctx *flushCtx, key string) { + if av.h != nil { + av.state.quantiles = av.h.Quantiles(av.state.quantiles[:0], av.state.phis) + } + histogram.PutFast(av.h) + if len(av.state.quantiles) > 0 { + for i, quantile := range av.state.quantiles { + av.state.b = strconv.AppendFloat(av.state.b[:0], av.state.phis[i], 'g', -1, 64) + phiStr := bytesutil.InternBytes(av.state.b) + ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr) } } } - -func (as *quantilesAggrState) flushState(ctx *flushCtx) { - m := &as.m - phis := as.phis - var quantiles []float64 - var b []byte - m.Range(func(k, v any) bool { - sv := v.(*quantilesStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - quantiles = quantiles[:0] - if state != nil { - quantiles = state.Quantiles(quantiles[:0], phis) - histogram.PutFast(state) - state.Reset() - } - sv.mu.Unlock() - if len(quantiles) > 0 { - key := k.(string) - for i, quantile := range quantiles { - b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64) - phiStr := bytesutil.InternBytes(b) - ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr) - } - } - return true - }) -} diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 4e091af3b..78defedd0 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -1,176 +1,115 @@ package streamaggr import ( - "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) -// rateAggrState calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics. -type rateAggrState struct { - m sync.Map - - // isAvg is set to true if rate_avg() must be calculated instead of rate_sum(). - isAvg bool +func rateInitFn(isAvg bool) aggrValuesInitFn { + return func(values []aggrValue) []aggrValue { + shared := &rateAggrValueShared{ + lastValues: make(map[string]rateLastValue), + } + for i := range values { + values[i] = &rateAggrValue{ + isAvg: isAvg, + shared: shared, + state: make(map[string]rateAggrValueState), + } + } + return values + } } -type rateStateValue struct { - mu sync.Mutex - state map[string]rateState - deleted bool +// rateLastValue calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics. +type rateLastValue struct { + value float64 deleteDeadline int64 -} -type rateState struct { - lastValues [aggrStateSize]rateLastValueState - // prevTimestamp stores timestamp of the last registered value - // in the previous aggregation interval + // prevTimestamp is the timestamp of the last registered sample in the previous aggregation interval prevTimestamp int64 - - // prevValue stores last registered value - // in the previous aggregation interval - prevValue float64 - deleteDeadline int64 } -type rateLastValueState struct { - firstValue float64 - value float64 - timestamp int64 - - // total stores cumulative difference between registered values - // in the aggregation interval - total float64 +type rateAggrValueShared struct { + lastValues map[string]rateLastValue } -func newRateAggrState(isAvg bool) *rateAggrState { - return &rateAggrState{ - isAvg: isAvg, +type rateAggrValueState struct { + // increase stores cumulative increase for the current time series on the current aggregation interval + increase float64 + timestamp int64 +} + +type rateAggrValue struct { + shared *rateAggrValueShared + state map[string]rateAggrValueState + isAvg bool +} + +func (av *rateAggrValue) pushSample(ctx *pushSampleCtx) { + sv := av.state[ctx.inputKey] + inputKey := ctx.inputKey + lv, ok := av.shared.lastValues[ctx.inputKey] + if ok { + if ctx.sample.timestamp < sv.timestamp { + // Skip out of order sample + return + } + if ctx.sample.value >= lv.value { + sv.increase += ctx.sample.value - lv.value + } else { + // counter reset + sv.increase += ctx.sample.value + } + } else { + lv.prevTimestamp = ctx.sample.timestamp + } + lv.value = ctx.sample.value + lv.deleteDeadline = ctx.deleteDeadline + sv.timestamp = ctx.sample.timestamp + inputKey = bytesutil.InternString(inputKey) + av.state[inputKey] = sv + av.shared.lastValues[inputKey] = lv +} + +func (av *rateAggrValue) flush(ctx *flushCtx, key string) { + suffix := av.getSuffix() + rate := 0.0 + countSeries := 0 + lvs := av.shared.lastValues + for lk, lv := range lvs { + if ctx.flushTimestamp > lv.deleteDeadline { + delete(lvs, lk) + continue + } + } + for sk, sv := range av.state { + lv := lvs[sk] + if lv.prevTimestamp == 0 { + continue + } + d := float64(sv.timestamp-lv.prevTimestamp) / 1000 + if d > 0 { + rate += sv.increase / d + countSeries++ + } + lv.prevTimestamp = sv.timestamp + lvs[sk] = lv + delete(av.state, sk) + } + if countSeries == 0 { + return + } + if av.isAvg { + rate /= float64(countSeries) + } + if rate > 0 { + ctx.appendSeries(key, suffix, rate) } } -func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - inputKey, outputKey := getInputOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - rsv := &rateStateValue{ - state: make(map[string]rateState), - } - v = rsv - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*rateStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - state, ok := sv.state[inputKey] - lv := state.lastValues[idx] - if ok && lv.timestamp > 0 { - if s.timestamp < lv.timestamp { - // Skip out of order sample - sv.mu.Unlock() - continue - } - if state.prevTimestamp == 0 { - state.prevTimestamp = lv.timestamp - state.prevValue = lv.value - } - if s.value >= lv.value { - lv.total += s.value - lv.value - } else { - // counter reset - lv.total += s.value - } - } else if state.prevTimestamp > 0 { - lv.firstValue = s.value - } - lv.value = s.value - lv.timestamp = s.timestamp - state.lastValues[idx] = lv - state.deleteDeadline = deleteDeadline - inputKey = bytesutil.InternString(inputKey) - sv.state[inputKey] = state - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *rateAggrValue) getSuffix() string { + if av.isAvg { + return "rate_avg" } -} - -func (as *rateAggrState) getSuffix() string { - if as.isAvg { - return "rate_avg" - } - return "rate_sum" -} - -func (as *rateAggrState) flushState(ctx *flushCtx) { - m := &as.m - suffix := as.getSuffix() - m.Range(func(k, v any) bool { - sv := v.(*rateStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = true - sv.mu.Unlock() - m.Delete(k) - return true - } - - // Delete outdated entries in state - rate := 0.0 - countSeries := 0 - for k1, state := range sv.state { - if ctx.flushTimestamp > state.deleteDeadline { - delete(sv.state, k1) - continue - } - v1 := state.lastValues[ctx.idx] - rateInterval := v1.timestamp - state.prevTimestamp - if rateInterval > 0 && state.prevTimestamp > 0 { - if v1.firstValue >= state.prevValue { - v1.total += v1.firstValue - state.prevValue - } else { - v1.total += v1.firstValue - } - - // calculate rate only if value was seen at least twice with different timestamps - rate += (v1.total) * 1000 / float64(rateInterval) - state.prevTimestamp = v1.timestamp - state.prevValue = v1.value - countSeries++ - } - state.lastValues[ctx.idx] = rateLastValueState{} - sv.state[k1] = state - } - - sv.mu.Unlock() - - if countSeries > 0 { - if as.isAvg { - rate /= float64(countSeries) - } - key := k.(string) - ctx.appendSeries(key, suffix, rate) - } - return true - }) + return "rate_sum" } diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index e8c95cf1f..b89ecbd7b 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -2,93 +2,27 @@ package streamaggr import ( "math" - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) -// stddevAggrState calculates output=stddev, e.g. the average value over input samples. -type stddevAggrState struct { - m sync.Map -} - -type stddevStateValue struct { - mu sync.Mutex - state [aggrStateSize]stddevState - deleted bool - deleteDeadline int64 -} - -type stddevState struct { +// stddevAggrValue calculates output=stddev, e.g. the average value over input samples. +type stddevAggrValue struct { count float64 avg float64 q float64 } -func newStddevAggrState() *stddevAggrState { - return &stddevAggrState{} +func (av *stddevAggrValue) pushSample(ctx *pushSampleCtx) { + av.count++ + avg := av.avg + (ctx.sample.value-av.avg)/av.count + av.q += (ctx.sample.value - av.avg) * (ctx.sample.value - avg) + av.avg = avg } -func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &stddevStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*stddevStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation - state := &sv.state[idx] - state.count++ - avg := state.avg + (s.value-state.avg)/state.count - state.q += (s.value - state.avg) * (s.value - avg) - state.avg = avg - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *stddevAggrValue) flush(ctx *flushCtx, key string) { + if av.count > 0 { + ctx.appendSeries(key, "stddev", math.Sqrt(av.q/av.count)) + av.count = 0 + av.avg = 0 + av.q = 0 } } - -func (as *stddevAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*stddevStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = stddevState{} - sv.mu.Unlock() - if state.count > 0 { - key := k.(string) - ctx.appendSeries(key, "stddev", math.Sqrt(state.q/state.count)) - } - return true - }) -} diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index f066ae30d..1c9cd9fb2 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -1,93 +1,24 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// stdvarAggrState calculates output=stdvar, e.g. the average value over input samples. -type stdvarAggrState struct { - m sync.Map -} - -type stdvarStateValue struct { - mu sync.Mutex - state [aggrStateSize]stdvarState - deleted bool - deleteDeadline int64 -} - -type stdvarState struct { +// stdvarAggrValue calculates output=stdvar, e.g. the average value over input samples. +type stdvarAggrValue struct { count float64 avg float64 q float64 } -func newStdvarAggrState() *stdvarAggrState { - return &stdvarAggrState{} +func (av *stdvarAggrValue) pushSample(ctx *pushSampleCtx) { + av.count++ + avg := av.avg + (ctx.sample.value-av.avg)/av.count + av.q += (ctx.sample.value - av.avg) * (ctx.sample.value - avg) + av.avg = avg } -func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &stdvarStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*stdvarStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation - state := &sv.state[idx] - state.count++ - avg := state.avg + (s.value-state.avg)/state.count - state.q += (s.value - state.avg) * (s.value - avg) - state.avg = avg - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *stdvarAggrValue) flush(ctx *flushCtx, key string) { + if av.count > 0 { + ctx.appendSeries(key, "stdvar", av.q/av.count) + av.count = 0 + av.avg = 0 + av.q = 0 } } - -func (as *stdvarAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*stdvarStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = stdvarState{} - sv.mu.Unlock() - if state.count > 0 { - key := k.(string) - ctx.appendSeries(key, "stdvar", state.q/state.count) - } - return true - }) -} diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index c5bac0cfa..eac5630d1 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -27,9 +27,6 @@ import ( "gopkg.in/yaml.v2" ) -// count of aggregation intervals for states -const aggrStateSize = 2 - var supportedOutputs = []string{ "avg", "count_samples", @@ -144,6 +141,9 @@ type Options struct { // // By default, aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url. KeepInput bool + + // StateSize defines a number of intervals to aggregate for + StateSize int } // Config is a configuration for a single stream aggregation. @@ -246,6 +246,9 @@ type Config struct { // OutputRelabelConfigs is an optional relabeling rules, which are applied // on the aggregated output before being sent to remote storage. OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"` + + // StateSize + StateSize *int `yaml:"state_size,omitempty"` } // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data. @@ -394,14 +397,14 @@ type aggregator struct { da *dedupAggr // aggrOutputs contains aggregate states for the given outputs - aggrOutputs []aggrOutput + aggrOutputs *aggrOutputs // minTimestamp is used for ignoring old samples when ignoreOldSamples is set minTimestamp atomic.Int64 // time to wait after interval end before flush flushAfter *histogram.Fast - muFlushAfter sync.Mutex + flushAfterMu sync.Mutex // suffix contains a suffix, which should be added to aggregate metric names // @@ -424,26 +427,10 @@ type aggregator struct { matchedSamples *metrics.Counter } -type aggrOutput struct { - as aggrState - - outputSamples *metrics.Counter -} - -type aggrState interface { - // pushSamples must push samples to the aggrState. - // - // samples[].key must be cloned by aggrState, since it may change after returning from pushSamples. - pushSamples(samples []pushSample, deleteDeadline int64, idx int) - - // flushState must flush aggrState data to ctx. - flushState(ctx *flushCtx) -} - // PushFunc is called by Aggregators when it needs to push its state to metrics storage type PushFunc func(tss []prompbmarshal.TimeSeries) -type aggrPushFunc func(samples []pushSample, deleteDeadline int64, idx int) +type aggrPushFunc func(data *pushCtxData) // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc. // @@ -554,6 +541,16 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, ignoreFirstIntervals = *v } + // check cfg.StateSize + stateSize := opts.StateSize + if v := cfg.StateSize; v != nil { + stateSize = *v + } + + if stateSize < 1 { + return nil, fmt.Errorf("`state_size` must be greater or equal to 1") + } + // Initialize common metric labels name := cfg.Name if name == "" { @@ -566,18 +563,18 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ "see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs) } - aggrOutputs := make([]aggrOutput, len(cfg.Outputs)) + aggrOutputs := &aggrOutputs{ + initFns: make([]aggrValuesFn, len(cfg.Outputs)), + outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{outputs=%q,%s}`, "test", metricLabels)), + stateSize: stateSize, + } outputsSeen := make(map[string]struct{}, len(cfg.Outputs)) for i, output := range cfg.Outputs { - as, err := newAggrState(output, outputsSeen, stalenessInterval) + oc, err := newOutputInitFns(output, outputsSeen, stateSize) if err != nil { return nil, err } - aggrOutputs[i] = aggrOutput{ - as: as, - - outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{output=%q,%s}`, output, metricLabels)), - } + aggrOutputs.initFns[i] = oc } // initialize suffix to add to metric names after aggregation @@ -629,7 +626,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, } if dedupInterval > 0 { - a.da = newDedupAggr() + a.da = newDedupAggr(stateSize) _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { n := a.da.sizeBytes() @@ -667,7 +664,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, return a, nil } -func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) { +func newOutputInitFns(output string, outputsSeen map[string]struct{}, stateSize int) (aggrValuesFn, error) { // check for duplicated output if _, ok := outputsSeen[output]; ok { return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output) @@ -699,44 +696,44 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`") } outputsSeen["quantiles"] = struct{}{} - return newQuantilesAggrState(phis), nil + return newAggrValues[quantilesAggrValue](quantilesInitFn(stateSize, phis)), nil } switch output { case "avg": - return newAvgAggrState(), nil + return newAggrValues[avgAggrValue](nil), nil case "count_samples": - return newCountSamplesAggrState(), nil + return newAggrValues[countSamplesAggrValue](nil), nil case "count_series": - return newCountSeriesAggrState(), nil + return newAggrValues[countSeriesAggrValue](countSeriesInitFn), nil case "histogram_bucket": - return newHistogramBucketAggrState(), nil + return newAggrValues[histogramBucketAggrValue](nil), nil case "increase": - return newTotalAggrState(true, true), nil + return newAggrValues[totalAggrValue](totalInitFn(true, true)), nil case "increase_prometheus": - return newTotalAggrState(true, false), nil + return newAggrValues[totalAggrValue](totalInitFn(true, false)), nil case "last": - return newLastAggrState(), nil + return newAggrValues[lastAggrValue](nil), nil case "max": - return newMaxAggrState(), nil + return newAggrValues[maxAggrValue](nil), nil case "min": - return newMinAggrState(), nil + return newAggrValues[minAggrValue](nil), nil case "rate_avg": - return newRateAggrState(true), nil + return newAggrValues[rateAggrValue](rateInitFn(true)), nil case "rate_sum": - return newRateAggrState(false), nil + return newAggrValues[rateAggrValue](rateInitFn(false)), nil case "stddev": - return newStddevAggrState(), nil + return newAggrValues[stddevAggrValue](nil), nil case "stdvar": - return newStdvarAggrState(), nil + return newAggrValues[stdvarAggrValue](nil), nil case "sum_samples": - return newSumSamplesAggrState(), nil + return newAggrValues[sumSamplesAggrValue](nil), nil case "total": - return newTotalAggrState(false, true), nil + return newAggrValues[totalAggrValue](totalInitFn(false, true)), nil case "total_prometheus": - return newTotalAggrState(false, false), nil + return newAggrValues[totalAggrValue](totalInitFn(false, false)), nil case "unique_samples": - return newUniqueSamplesAggrState(), nil + return newAggrValues[uniqueSamplesAggrValue](uniqueSamplesInitFn), nil default: return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs) } @@ -758,15 +755,11 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } - var flushTimeMsec int64 tickerWait := func(t *time.Ticker) bool { select { case <-a.stopCh: - flushTimeMsec = time.Now().UnixMilli() return false - case ct := <-t.C: - flushTimeMsec = ct.UnixMilli() - fmt.Println(flushTimeMsec) + case <-t.C: return true } } @@ -791,10 +784,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc pf := pushFunc // Calculate delay - a.muFlushAfter.Lock() + a.flushAfterMu.Lock() flushAfterMsec := a.flushAfter.Quantile(0.95) a.flushAfter.Reset() - a.muFlushAfter.Unlock() + a.flushAfterMu.Unlock() flushAfter := time.Duration(flushAfterMsec) * time.Millisecond if flushAfter > tickInterval { @@ -805,7 +798,8 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc time.Sleep(flushAfter) } - a.dedupFlush(dedupTime.UnixMilli(), dedupIdx, flushIdx) + deleteDeadline := dedupTime.Add(a.stalenessInterval) + a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx) if ct.After(flushDeadline) { // It is time to flush the aggregated state @@ -832,25 +826,26 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval) + deleteDeadline := flushDeadline.Add(a.stalenessInterval) if a.ignoreOldSamples { dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline) } - a.dedupFlush(flushDeadline.UnixMilli(), dedupIdx, flushIdx) + a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx) a.flush(pushFunc, flushDeadline.UnixMilli(), flushIdx) } } func (a *aggregator) getAggrIdxs(dedupTime, flushTime time.Time) (int, int) { - flushIdx := getStateIdx(a.interval.Milliseconds(), flushTime.Add(-a.interval).UnixMilli()) + flushIdx := a.getStateIdx(a.interval.Milliseconds(), flushTime.Add(-a.interval).UnixMilli()) dedupIdx := flushIdx if a.dedupInterval > 0 { - dedupIdx = getStateIdx(a.dedupInterval.Milliseconds(), dedupTime.Add(-a.dedupInterval).UnixMilli()) + dedupIdx = a.getStateIdx(a.dedupInterval.Milliseconds(), dedupTime.Add(-a.dedupInterval).UnixMilli()) } return dedupIdx, flushIdx } -func getStateIdx(interval int64, ts int64) int { - return int(ts/interval) % aggrStateSize +func (a *aggregator) getStateIdx(interval int64, ts int64) int { + return int(ts/interval) % a.aggrOutputs.stateSize } func (a *aggregator) dedupFlush(deleteDeadline int64, dedupIdx, flushIdx int) { @@ -861,7 +856,7 @@ func (a *aggregator) dedupFlush(deleteDeadline int64, dedupIdx, flushIdx int) { startTime := time.Now() - a.da.flush(a.pushSamples, deleteDeadline, dedupIdx, flushIdx) + a.da.flush(a.aggrOutputs.pushSamples, deleteDeadline, dedupIdx, flushIdx) d := time.Since(startTime) a.dedupFlushDuration.Update(d.Seconds()) @@ -879,24 +874,12 @@ func (a *aggregator) dedupFlush(deleteDeadline int64, dedupIdx, flushIdx int) { func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64, idx int) { startTime := time.Now() - var wg sync.WaitGroup - for i := range a.aggrOutputs { - ao := &a.aggrOutputs[i] - flushConcurrencyCh <- struct{}{} - wg.Add(1) - go func(ao *aggrOutput) { - defer func() { - <-flushConcurrencyCh - wg.Done() - }() + ao := a.aggrOutputs - ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec, idx) - ao.as.flushState(ctx) - ctx.flushSeries() - putFlushCtx(ctx) - }(ao) - } - wg.Wait() + ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec, idx) + ao.flushState(ctx) + ctx.flushSeries() + putFlushCtx(ctx) d := time.Since(startTime) a.flushDuration.Update(d.Seconds()) @@ -921,7 +904,7 @@ func (a *aggregator) MustStop() { // Push pushes tss to a. func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { - ctx := getPushCtx() + ctx := getPushCtx(a.aggrOutputs.stateSize) defer putPushCtx(ctx) samples := ctx.samples @@ -972,9 +955,6 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { // do not intern key because number of unique keys could be too high key := bytesutil.ToUnsafeString(buf[bufLen:]) for _, s := range ts.Samples { - a.muFlushAfter.Lock() - a.flushAfter.Update(float64(nowMsec - s.Timestamp)) - a.muFlushAfter.Unlock() if math.IsNaN(s.Value) { // Skip NaN values a.ignoredNaNSamples.Inc() @@ -990,7 +970,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { maxLagMsec = lagMsec } if ignoreOldSamples { - flushIdx = getStateIdx(a.tickInterval, s.Timestamp) + flushIdx = a.getStateIdx(a.tickInterval, s.Timestamp) } samples[flushIdx] = append(samples[flushIdx], pushSample{ key: key, @@ -999,11 +979,14 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { }) } } + a.flushAfterMu.Lock() + a.flushAfter.Update(float64(maxLagMsec)) + a.flushAfterMu.Unlock() ctx.samples = samples ctx.buf = buf - pushSamples := a.pushSamples + pushSamples := a.aggrOutputs.pushSamples if a.da != nil { pushSamples = a.da.pushSamples } @@ -1012,7 +995,8 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { if len(s) > 0 { a.samplesLag.Update(float64(maxLagMsec) / 1_000) a.matchedSamples.Add(len(s)) - pushSamples(s, deleteDeadlineMsec, idx) + data := ctx.getPushCtxData(s, nowMsec, deleteDeadlineMsec, idx) + pushSamples(data) } } } @@ -1031,17 +1015,6 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key)) } -func getOutputKey(key string) string { - src := bytesutil.ToUnsafeBytes(key) - inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) - if nSize <= 0 { - logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") - } - src = src[nSize:] - outputKey := src[inputKeyLen:] - return bytesutil.ToUnsafeString(outputKey) -} - func getInputOutputKey(key string) (string, string) { src := bytesutil.ToUnsafeBytes(key) inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) @@ -1054,20 +1027,30 @@ func getInputOutputKey(key string) (string, string) { return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey) } -func (a *aggregator) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for _, ao := range a.aggrOutputs { - ao.as.pushSamples(samples, deleteDeadline, idx) - } +type pushCtxData struct { + samples []pushSample + deleteDeadline int64 + idx int + now int64 } type pushCtx struct { - samples [aggrStateSize][]pushSample + samples [][]pushSample labels promutils.Labels inputLabels promutils.Labels outputLabels promutils.Labels buf []byte } +func (ctx *pushCtx) getPushCtxData(samples []pushSample, now, deleteDeadline int64, idx int) *pushCtxData { + return &pushCtxData{ + samples: samples, + deleteDeadline: deleteDeadline, + idx: idx, + now: now, + } +} + func (ctx *pushCtx) reset() { for i := range ctx.samples { ctx.samples[i] = ctx.samples[i][:0] @@ -1087,10 +1070,12 @@ type pushSample struct { timestamp int64 } -func getPushCtx() *pushCtx { +func getPushCtx(stateSize int) *pushCtx { v := pushCtxPool.Get() if v == nil { - return &pushCtx{} + return &pushCtx{ + samples: make([][]pushSample, stateSize), + } } return v.(*pushCtx) } @@ -1123,7 +1108,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, return dstInput, dstOutput } -func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc, flushTimestamp int64, idx int) *flushCtx { +func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, idx int) *flushCtx { v := flushCtxPool.Get() if v == nil { v = &flushCtx{} @@ -1146,7 +1131,7 @@ var flushCtxPool sync.Pool type flushCtx struct { a *aggregator - ao *aggrOutput + ao *aggrOutputs pushFunc PushFunc flushTimestamp int64 idx int diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 9166a633e..405989d42 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -200,11 +200,14 @@ func TestAggregatorsEqual(t *testing.T) { t.Helper() pushFunc := func(_ []prompbmarshal.TimeSeries) {} - aa, err := LoadFromData([]byte(a), pushFunc, nil, "some_alias") + opts := Options{ + StateSize: 2, + } + aa, err := LoadFromData([]byte(a), pushFunc, &opts, "some_alias") if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - ab, err := LoadFromData([]byte(b), pushFunc, nil, "some_alias") + ab, err := LoadFromData([]byte(b), pushFunc, &opts, "some_alias") if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -266,6 +269,7 @@ func TestAggregatorsSuccess(t *testing.T) { opts := &Options{ FlushOnShutdown: true, NoAlignFlushToInterval: true, + StateSize: 2, } a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias") if err != nil { @@ -985,6 +989,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { opts := &Options{ DedupInterval: 30 * time.Second, FlushOnShutdown: true, + StateSize: 2, } a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias") if err != nil { diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index 0af8c07d0..ed57d08ed 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -1,88 +1,14 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples. -type sumSamplesAggrState struct { - m sync.Map +type sumSamplesAggrValue struct { + sum float64 } -type sumSamplesStateValue struct { - mu sync.Mutex - state [aggrStateSize]sumState - deleted bool - deleteDeadline int64 +func (av *sumSamplesAggrValue) pushSample(ctx *pushSampleCtx) { + av.sum += ctx.sample.value } -type sumState struct { - sum float64 - exists bool -} - -func newSumSamplesAggrState() *sumSamplesAggrState { - return &sumSamplesAggrState{} -} - -func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &sumSamplesStateValue{} - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Update the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*sumSamplesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.state[idx].sum += s.value - sv.state[idx].exists = true - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } - } -} - -func (as *sumSamplesAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*sumSamplesStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := sv.state[ctx.idx] - sv.state[ctx.idx] = sumState{} - sv.mu.Unlock() - if state.exists { - key := k.(string) - ctx.appendSeries(key, "sum_samples", state.sum) - } - return true - }) +func (av *sumSamplesAggrValue) flush(ctx *flushCtx, key string) { + ctx.appendSeries(key, "sum_samples", av.sum) + av.sum = 0 } diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index d8975f8b4..91cf3a698 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -2,155 +2,100 @@ package streamaggr import ( "math" - "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) -// totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus. -type totalAggrState struct { - m sync.Map - - // Whether to reset the output value on every flushState call. - resetTotalOnFlush bool - - // Whether to take into account the first sample in new time series when calculating the output value. - keepFirstSample bool +func totalInitFn(resetTotalOnFlush, keepFirstSample bool) aggrValuesInitFn { + return func(values []aggrValue) []aggrValue { + shared := &totalAggrValueShared{ + lastValues: make(map[string]totalLastValue), + } + for i := range values { + values[i] = &totalAggrValue{ + keepFirstSample: keepFirstSample, + resetTotalOnFlush: resetTotalOnFlush, + shared: shared, + } + } + return values + } } -type totalStateValue struct { - mu sync.Mutex - shared totalState - state [aggrStateSize]float64 - deleteDeadline int64 - deleted bool -} - -type totalState struct { - total float64 - lastValues map[string]totalLastValueState -} - -type totalLastValueState struct { +type totalLastValue struct { value float64 timestamp int64 deleteDeadline int64 } -func newTotalAggrState(resetTotalOnFlush, keepFirstSample bool) *totalAggrState { - return &totalAggrState{ - resetTotalOnFlush: resetTotalOnFlush, - keepFirstSample: keepFirstSample, - } +type totalAggrValueShared struct { + lastValues map[string]totalLastValue + total float64 } -func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - var deleted bool - for i := range samples { - s := &samples[i] - inputKey, outputKey := getInputOutputKey(s.key) +type totalAggrValue struct { + total float64 + keepFirstSample bool + resetTotalOnFlush bool + shared *totalAggrValueShared +} - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &totalStateValue{ - shared: totalState{ - lastValues: make(map[string]totalLastValueState), - }, - } - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } +func (av *totalAggrValue) pushSample(ctx *pushSampleCtx) { + shared := av.shared + inputKey := ctx.inputKey + lv, ok := shared.lastValues[inputKey] + if ok || av.keepFirstSample { + if ctx.sample.timestamp < lv.timestamp { + // Skip out of order sample + return } - sv := v.(*totalStateValue) - sv.mu.Lock() - deleted = sv.deleted - if !deleted { - lv, ok := sv.shared.lastValues[inputKey] - if ok || as.keepFirstSample { - if s.timestamp < lv.timestamp { - // Skip out of order sample - sv.mu.Unlock() - continue - } - - if s.value >= lv.value { - sv.state[idx] += s.value - lv.value - } else { - // counter reset - sv.state[idx] += s.value - } - } - lv.value = s.value - lv.timestamp = s.timestamp - lv.deleteDeadline = deleteDeadline - - inputKey = bytesutil.InternString(inputKey) - sv.shared.lastValues[inputKey] = lv - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again + if ctx.sample.value >= lv.value { + av.total += ctx.sample.value - lv.value + } else { + // counter reset + av.total += ctx.sample.value } } + lv.value = ctx.sample.value + lv.timestamp = ctx.sample.timestamp + lv.deleteDeadline = ctx.deleteDeadline + + inputKey = bytesutil.InternString(inputKey) + shared.lastValues[inputKey] = lv } -func (as *totalAggrState) getSuffix() string { - // Note: this function is at hot path, so it shouldn't allocate. - if as.resetTotalOnFlush { - if as.keepFirstSample { - return "increase" - } - return "increase_prometheus" - } - if as.keepFirstSample { - return "total" - } - return "total_prometheus" +func (av *totalAggrValue) flush(ctx *flushCtx, key string) { + suffix := av.getSuffix() + // check for stale entries + total := av.shared.total + av.total + av.total = 0 + lvs := av.shared.lastValues + for lk, lv := range lvs { + if ctx.flushTimestamp > lv.deleteDeadline { + delete(lvs, lk) + } + } + if av.resetTotalOnFlush { + av.shared.total = 0 + } else if math.Abs(total) >= (1 << 53) { + // It is time to reset the entry, since it starts losing float64 precision + av.shared.total = 0 + } else { + av.shared.total = total + } + ctx.appendSeries(key, suffix, total) } -func (as *totalAggrState) flushState(ctx *flushCtx) { - var total float64 - m := &as.m - suffix := as.getSuffix() - m.Range(func(k, v interface{}) bool { - sv := v.(*totalStateValue) - - sv.mu.Lock() - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true +func (av *totalAggrValue) getSuffix() string { + // Note: this function is at hot path, so it shouldn't allocate. + if av.resetTotalOnFlush { + if av.keepFirstSample { + return "increase" } - total = sv.shared.total + sv.state[ctx.idx] - for k1, v1 := range sv.shared.lastValues { - if ctx.flushTimestamp > v1.deleteDeadline { - delete(sv.shared.lastValues, k1) - } - } - sv.state[ctx.idx] = 0 - if !as.resetTotalOnFlush { - if math.Abs(total) >= (1 << 53) { - // It is time to reset the entry, since it starts losing float64 precision - sv.shared.total = 0 - } else { - sv.shared.total = total - } - } - sv.mu.Unlock() - key := k.(string) - ctx.appendSeries(key, suffix, total) - return true - }) + return "increase_prometheus" + } + if av.keepFirstSample { + return "total" + } + return "total_prometheus" } diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index ad99d6c6c..16fe49df1 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -1,86 +1,25 @@ package streamaggr -import ( - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values. -type uniqueSamplesAggrState struct { - m sync.Map +func uniqueSamplesInitFn(values []aggrValue) []aggrValue { + for i := range values { + values[i] = &uniqueSamplesAggrValue{ + samples: make(map[float64]struct{}), + } + } + return values } -type uniqueSamplesStateValue struct { - mu sync.Mutex - state [aggrStateSize]map[float64]struct{} - deleted bool - deleteDeadline int64 +type uniqueSamplesAggrValue struct { + samples map[float64]struct{} } -func newUniqueSamplesAggrState() *uniqueSamplesAggrState { - return &uniqueSamplesAggrState{} -} - -func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) { - for i := range samples { - s := &samples[i] - outputKey := getOutputKey(s.key) - - again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - usv := &uniqueSamplesStateValue{} - for iu := range usv.state { - usv.state[iu] = make(map[float64]struct{}) - } - v = usv - outputKey = bytesutil.InternString(outputKey) - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Update the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*uniqueSamplesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - if _, ok := sv.state[idx][s.value]; !ok { - sv.state[idx][s.value] = struct{}{} - } - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to flushState - // Try obtaining and updating the entry again. - goto again - } +func (av *uniqueSamplesAggrValue) pushSample(ctx *pushSampleCtx) { + if _, ok := av.samples[ctx.sample.value]; !ok { + av.samples[ctx.sample.value] = struct{}{} } } -func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) { - m := &as.m - m.Range(func(k, v any) bool { - sv := v.(*uniqueSamplesStateValue) - sv.mu.Lock() - - // check for stale entries - deleted := ctx.flushTimestamp > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() - m.Delete(k) - return true - } - state := len(sv.state[ctx.idx]) - sv.state[ctx.idx] = make(map[float64]struct{}) - sv.mu.Unlock() - key := k.(string) - ctx.appendSeries(key, "unique_samples", float64(state)) - return true - }) +func (av *uniqueSamplesAggrValue) flush(ctx *flushCtx, key string) { + ctx.appendSeries(key, "unique_samples", float64(len(av.samples))) + clear(av.samples) }