From db557b86ee483a6dea726f6021afe1755737a865 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 15 Jul 2024 18:01:37 +0200 Subject: [PATCH] app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691 - Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go . This improves code maintainability a bit. - Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents from potential resource leaks. - Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions. - Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics. - Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics. - Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions. - Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` . This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory. - Pass Options arg to LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough. - Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have consistent set of labels comparing to the rest of streaming aggregation metrics. - Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604 - Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . - Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268 --- app/vmagent/main.go | 2 +- app/vmagent/remotewrite/remotewrite.go | 40 +- app/vmagent/remotewrite/remotewrite_test.go | 8 +- app/vmagent/remotewrite/streamaggr.go | 200 ++++++--- app/vminsert/common/streamaggr.go | 19 +- dashboards/vm/vmagent.json | 99 +---- dashboards/vmagent.json | 99 +---- docs/CHANGELOG.md | 17 +- docs/stream-aggregation.md | 7 + lib/streamaggr/deduplicator.go | 9 +- lib/streamaggr/histogram_bucket.go | 7 +- lib/streamaggr/rate.go | 34 +- lib/streamaggr/streamaggr.go | 447 +++++++++++--------- lib/streamaggr/streamaggr_test.go | 16 +- lib/streamaggr/streamaggr_timing_test.go | 6 +- lib/streamaggr/total.go | 40 +- 16 files changed, 484 insertions(+), 566 deletions(-) diff --git a/app/vmagent/main.go b/app/vmagent/main.go index f67cdffd6..46c3a84c3 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -114,7 +114,7 @@ func main() { logger.Fatalf("error when checking relabel configs: %s", err) } if err := remotewrite.CheckStreamAggrConfigs(); err != nil { - logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err) + logger.Fatalf("error when checking -streamAggr.config and -remoteWrite.streamAggr.config: %s", err) } logger.Infof("all the configs are ok; exiting with 0 status code") return diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 0de158b03..bc6159b43 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -207,16 +207,7 @@ func Init() { relabelConfigSuccess.Set(1) relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) - sasFile, sasOpts := getStreamAggrOpts(-1) - if sasFile != "" { - sas, err := newStreamAggrConfig(-1, pushToRemoteStoragesDropFailed) - if err != nil { - logger.Fatalf("cannot initialize stream aggregators from -streamAggr.config=%q: %s", sasFile, err) - } - sasGlobal.Store(sas) - } else if sasOpts.DedupInterval > 0 { - deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias) - } + initStreamAggrConfigGlobal() rwctxsGlobal = newRemoteWriteCtxs(nil, *remoteWriteURLs) @@ -237,9 +228,9 @@ func Init() { defer configReloaderWG.Done() for { select { - case <-sighupCh: case <-configReloaderStopCh: return + case <-sighupCh: } reloadRelabelConfigs() reloadStreamAggrConfigs() @@ -536,7 +527,7 @@ func getEligibleRemoteWriteCtxs(tss []prompbmarshal.TimeSeries, forceDropSamples return rwctxs, true } -func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) { +func pushToRemoteStoragesTrackDropped(tss []prompbmarshal.TimeSeries) { rwctxs, _ := getEligibleRemoteWriteCtxs(tss, true) if len(rwctxs) == 0 { return @@ -844,28 +835,13 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in c: c, pss: pss, - rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)), - rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), + rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)), + rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)), - pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q, url=%q}`, queuePath, sanitizedURL)), - rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), - } - - // Initialize sas - sasFile, sasOpts := getStreamAggrOpts(argIdx) - if sasFile != "" { - sas, err := newStreamAggrConfig(argIdx, rwctx.pushInternalTrackDropped) - if err != nil { - logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err) - } - rwctx.sas.Store(sas) - rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx) - rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx) - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) - } else if sasOpts.DedupInterval > 0 { - rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias) + pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q,url=%q}`, queuePath, sanitizedURL)), + rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)), } + rwctx.initStreamAggrConfig() return rwctx } diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go index 5461ed243..01a9fc1f2 100644 --- a/app/vmagent/remotewrite/remotewrite_test.go +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -77,14 +77,16 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`), } if dedupInterval > 0 { - rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "global") + rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "dedup-global") } - if len(streamAggrConfig) > 0 { - sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), nil, streamaggr.Options{}) + if streamAggrConfig != "" { + pushNoop := func(_ []prompbmarshal.TimeSeries) {} + sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), pushNoop, nil, "global") if err != nil { t.Fatalf("cannot load streamaggr configs: %s", err) } + defer sas.MustStop() rwctx.sas.Store(sas) } diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go index c07fc46d8..0ebc2e624 100644 --- a/app/vmagent/remotewrite/streamaggr.go +++ b/app/vmagent/remotewrite/streamaggr.go @@ -61,104 +61,180 @@ var ( // CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config. func CheckStreamAggrConfigs() error { - pushNoop := func(_ []prompbmarshal.TimeSeries) {} + // Check global config + sas, err := newStreamAggrConfigGlobal() + if err != nil { + return err + } + sas.MustStop() - if _, err := newStreamAggrConfig(-1, pushNoop); err != nil { - return fmt.Errorf("could not load -streamAggr.config stream aggregation config: %w", err) - } if len(*streamAggrConfig) > len(*remoteWriteURLs) { - return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", - len(*streamAggrConfig), len(*remoteWriteURLs)) + return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", len(*streamAggrConfig), len(*remoteWriteURLs)) } + + pushNoop := func(_ []prompbmarshal.TimeSeries) {} for idx := range *streamAggrConfig { - if _, err := newStreamAggrConfig(idx, pushNoop); err != nil { + sas, err := newStreamAggrConfigPerURL(idx, pushNoop) + if err != nil { return err } + sas.MustStop() } return nil } func reloadStreamAggrConfigs() { - reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed) - for idx, rwctx := range rwctxsGlobal { - reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped) + reloadStreamAggrConfigGlobal() + for _, rwctx := range rwctxsGlobal { + rwctx.reloadStreamAggrConfig() } } -func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) { - path, opts := getStreamAggrOpts(idx) - logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path) - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc() - - sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts) - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc() - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0) - logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err) +func reloadStreamAggrConfigGlobal() { + path := *streamAggrGlobalConfig + if path == "" { return } - var sas *streamaggr.Aggregators - if idx < 0 { - sas = sasGlobal.Load() - } else { - sas = rwctxsGlobal[idx].sas.Load() + logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc() + + sasNew, err := newStreamAggrConfigGlobal() + if err != nil { + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc() + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0) + logger.Errorf("cannot reload -streamAggr.config=%q; continue using the previously loaded config; error: %s", path, err) + return } + sas := sasGlobal.Load() if !sasNew.Equal(sas) { - var sasOld *streamaggr.Aggregators - if idx < 0 { - sasOld = sasGlobal.Swap(sasNew) - } else { - sasOld = rwctxsGlobal[idx].sas.Swap(sasNew) - } + sasOld := sasGlobal.Swap(sasNew) sasOld.MustStop() - logger.Infof("successfully reloaded stream aggregation configs at %q", path) + logger.Infof("successfully reloaded -streamAggr.config=%q", path) } else { sasNew.MustStop() - logger.Infof("successfully reloaded stream aggregation configs at %q", path) + logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path) } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp()) } -func getStreamAggrOpts(idx int) (string, streamaggr.Options) { - if idx < 0 { - return *streamAggrGlobalConfig, streamaggr.Options{ - DedupInterval: streamAggrGlobalDedupInterval.Duration(), - DropInputLabels: *streamAggrGlobalDropInputLabels, - IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples, - IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals, - Alias: "global", +func initStreamAggrConfigGlobal() { + sas, err := newStreamAggrConfigGlobal() + if err != nil { + logger.Fatalf("cannot initialize gloabl stream aggregators: %s", err) + } + if sas != nil { + filePath := sas.FilePath() + sasGlobal.Store(sas) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) + } else { + dedupInterval := streamAggrGlobalDedupInterval.Duration() + if dedupInterval > 0 { + deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrDropInputLabels, "dedup-global") } } - url := fmt.Sprintf("%d:secret-url", idx+1) - if *showRemoteWriteURL { - url = fmt.Sprintf("%d:%s", idx+1, remoteWriteURLs.GetOptionalArg(idx)) +} + +func (rwctx *remoteWriteCtx) initStreamAggrConfig() { + idx := rwctx.idx + + sas, err := rwctx.newStreamAggrConfig() + if err != nil { + logger.Fatalf("cannot initialize stream aggregators: %s", err) } - opts := streamaggr.Options{ + if sas != nil { + filePath := sas.FilePath() + rwctx.sas.Store(sas) + rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(idx) + rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) + } else { + dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx) + if dedupInterval > 0 { + alias := fmt.Sprintf("dedup-%d", idx+1) + rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels, alias) + } + } +} + +func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() { + path := streamAggrConfig.GetOptionalArg(rwctx.idx) + if path == "" { + return + } + + logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc() + + sasNew, err := rwctx.newStreamAggrConfig() + if err != nil { + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc() + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0) + logger.Errorf("cannot reload -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", path, err) + return + } + + sas := rwctx.sas.Load() + if !sasNew.Equal(sas) { + sasOld := rwctx.sas.Swap(sasNew) + sasOld.MustStop() + logger.Infof("successfully reloaded -remoteWrite.streamAggr.config=%q", path) + } else { + sasNew.MustStop() + logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path) + } + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp()) +} + +func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) { + path := *streamAggrGlobalConfig + if path == "" { + return nil, nil + } + + opts := &streamaggr.Options{ + DedupInterval: streamAggrGlobalDedupInterval.Duration(), + DropInputLabels: *streamAggrGlobalDropInputLabels, + IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples, + IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals, + } + + sas, err := streamaggr.LoadFromFile(path, pushToRemoteStoragesTrackDropped, opts, "global") + if err != nil { + return nil, fmt.Errorf("cannot load -streamAggr.config=%q: %w", *streamAggrGlobalConfig, err) + } + return sas, nil +} + +func (rwctx *remoteWriteCtx) newStreamAggrConfig() (*streamaggr.Aggregators, error) { + return newStreamAggrConfigPerURL(rwctx.idx, rwctx.pushInternalTrackDropped) +} + +func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) { + path := streamAggrConfig.GetOptionalArg(idx) + if path == "" { + return nil, nil + } + + alias := fmt.Sprintf("%d:secret-url", idx+1) + if *showRemoteWriteURL { + alias = fmt.Sprintf("%d:%s", idx+1, remoteWriteURLs.GetOptionalArg(idx)) + } + opts := &streamaggr.Options{ DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, - Alias: url, } - if len(*streamAggrConfig) == 0 { - return "", opts + sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias) + if err != nil { + return nil, fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", path, err) } - return streamAggrConfig.GetOptionalArg(idx), opts -} - -func newStreamAggrConfigWithOpts(pushFunc streamaggr.PushFunc, path string, opts streamaggr.Options) (*streamaggr.Aggregators, error) { - if len(path) == 0 { - // Skip empty stream aggregation config. - return nil, nil - } - return streamaggr.LoadFromFile(path, pushFunc, opts) -} - -func newStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) { - path, opts := getStreamAggrOpts(idx) - return newStreamAggrConfigWithOpts(pushFunc, path, opts) + return sas, nil } diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 851656d08..3cc649c52 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -57,14 +57,13 @@ func CheckStreamAggrConfig() error { return nil } pushNoop := func(_ []prompbmarshal.TimeSeries) {} - opts := streamaggr.Options{ + opts := &streamaggr.Options{ DedupInterval: *streamAggrDedupInterval, DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, - Alias: "global", } - sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts) + sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts, "global") if err != nil { return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err) } @@ -77,25 +76,22 @@ func CheckStreamAggrConfig() error { // MustStopStreamAggr must be called when stream aggr is no longer needed. func InitStreamAggr() { saCfgReloaderStopCh = make(chan struct{}) - rwctx := "global" - if *streamAggrConfig == "" { if *streamAggrDedupInterval > 0 { - deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels, rwctx) + deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels, "global") } return } sighupCh := procutil.NewSighupChan() - opts := streamaggr.Options{ + opts := &streamaggr.Options{ DedupInterval: *streamAggrDedupInterval, DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, - Alias: rwctx, } - sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) + sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global") if err != nil { logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) } @@ -123,14 +119,13 @@ func reloadStreamAggrConfig() { logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig) saCfgReloads.Inc() - opts := streamaggr.Options{ + opts := &streamaggr.Options{ DedupInterval: *streamAggrDedupInterval, DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, - Alias: "global", } - sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) + sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global") if err != nil { saCfgSuccess.Set(0) saCfgReloadErr.Inc() diff --git a/dashboards/vm/vmagent.json b/dashboards/vm/vmagent.json index 27ad605a1..9afcc9109 100644 --- a/dashboards/vm/vmagent.json +++ b/dashboards/vm/vmagent.json @@ -5178,7 +5178,7 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "sum(rate(vm_streamaggr_flushed_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0", + "expr": "sum(rate(vm_streamaggr_output_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0", "instant": false, "legendFormat": "{{url}} ({{job}}): match={{match}}; output={{output}}", "range": true, @@ -5496,103 +5496,6 @@ "title": "Dedup flush duration 0.99 quantile ($instance)", "type": "timeseries" }, - { - "datasource": { - "type": "victoriametrics-datasource", - "uid": "$ds" - }, - "description": "Shows the eviction rate of time series because of staleness.\n\nThere are two stages where series can be marked as stale.\n1. Input. Aggregator keeps in memory each received unique time series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for this series. \n\n2. Output. The output key is a resulting time series produced by aggregating many input series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for any of input series for this aggregation.\n\nIncrease in `input` keys shows that series previously matched by the aggregation rule now became stale.\n\nIncrease in `output` keys shows that series previously produced by the aggregation rule now became stale.", - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "axisSoftMin": 0, - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 31 - }, - "id": 144, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "victoriametrics-datasource", - "uid": "$ds" - }, - "editorMode": "code", - "expr": "increase(vm_streamaggr_stale_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", - "instant": false, - "legendFormat": "{{url}} ({{job}}): match={{match}}; key={{key}}", - "range": true, - "refId": "A" - } - ], - "title": "Staleness rate ($instance)", - "type": "timeseries" - }, { "datasource": { "type": "victoriametrics-datasource", diff --git a/dashboards/vmagent.json b/dashboards/vmagent.json index 3cf857e91..557cdc55b 100644 --- a/dashboards/vmagent.json +++ b/dashboards/vmagent.json @@ -5177,7 +5177,7 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "sum(rate(vm_streamaggr_flushed_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0", + "expr": "sum(rate(vm_streamaggr_output_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0", "instant": false, "legendFormat": "{{url}} ({{job}}): match={{match}}; output={{output}}", "range": true, @@ -5495,103 +5495,6 @@ "title": "Dedup flush duration 0.99 quantile ($instance)", "type": "timeseries" }, - { - "datasource": { - "type": "prometheus", - "uid": "$ds" - }, - "description": "Shows the eviction rate of time series because of staleness.\n\nThere are two stages where series can be marked as stale.\n1. Input. Aggregator keeps in memory each received unique time series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for this series. \n\n2. Output. The output key is a resulting time series produced by aggregating many input series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for any of input series for this aggregation.\n\nIncrease in `input` keys shows that series previously matched by the aggregation rule now became stale.\n\nIncrease in `output` keys shows that series previously produced by the aggregation rule now became stale.", - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "axisSoftMin": 0, - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 31 - }, - "id": 144, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "$ds" - }, - "editorMode": "code", - "expr": "increase(vm_streamaggr_stale_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", - "instant": false, - "legendFormat": "{{url}} ({{job}}): match={{match}}; key={{key}}", - "range": true, - "refId": "A" - } - ], - "title": "Staleness rate ($instance)", - "type": "timeseries" - }, { "datasource": { "type": "prometheus", diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a4dea70bb..a48c326d5 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -42,14 +42,17 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details. * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): added `yandexcloud_sd` AWS API IMDSv2 support. -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): expose metrics related to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): - * `vm_streamaggr_matched_samples_total` - shows the number of samples matched by the aggregation rule; - * `vm_streamaggr_flushed_samples_total` - shows the number of samples produced by the aggregation rule; - * `vm_streamaggr_samples_lag_seconds` - shows the max lag between samples timestamps within one batch received by the aggregation; - * `vm_streamaggr_stale_samples_total` - shows the number of time series that became [stale](https://docs.victoriametrics.com/stream-aggregation/#staleness) during aggregation; - * metrics related to stream aggregation got additional labels `match` (matching param), `group` (`by` or `without` param), `url` (address of `remoteWrite.url` where aggregation is applied), `position` (the position of the aggregation rule in config file). - * These and other metrics were reflected on the [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json) in `stream aggregation` section. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add `-graphite.sanitizeMetricName` cmd-line flag for sanitizing metrics ingested via [Graphite protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6077). +* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose the following metrics at `/metrics` page of [vmagent](https://docs.victoriametrics.com/vmagent/) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/): + * `vm_streamaggr_matched_samples_total` - the number of input samples matched by the corresponding aggregation rule + * `vm_streamaggr_output_samples_total` - the number of output samples produced by the corresponding aggregation rule + * `vm_streamaggr_samples_lag_seconds` - [histogram](https://docs.victoriametrics.com/keyconcepts/#histogram) with the lag between the current time and the timestamp seen in the aggregated input samples +* FEATURE: [steaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): add new labels to `vl_streamaggr_*` metrics: + * `name` - the name of the streaming aggregation rule, which can be configured via `name` option - see [these docs](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). + * `url` - `-remoteWrite.url` for the corresponding `-remoteWrite.streamAggr.config` + * `path` - path to the corresponding streaming aggregation config file + * `position` - the position of the aggregation rule in the corresponding streaming aggregation config file +* FEATURE: [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json): `stream aggregation` section: add graphs based on newly exposed streaming aggregation metrics. * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): do not retry RPC calls to vmstorage nodes if [complexity limits](https://docs.victoriametrics.com/#resource-usage-limits) were exceeded. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): make `-replay.timeTo` optional in [replay mode](https://docs.victoriametrics.com/vmalert/#rules-backfilling). When omitted, the current timestamp will be used. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6492). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show compacted result in the JSON tab for query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6559). diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index a61c9a202..2a8cb1cca 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -1018,6 +1018,13 @@ At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr specified individually per each `-remoteWrite.url`: ```yaml + # name is an optional name of the given streaming aggregation config. + # + # If it is set, then it is used as `name` label in the exposed metrics + # for the given aggregation config at /metrics page. + # See https://docs.victoriametrics.com/vmagent/#monitoring and https://docs.victoriametrics.com/#monitoring +- name: 'foobar' + # match is an optional filter for incoming samples to aggregate. # It can contain arbitrary Prometheus series selector # according to https://docs.victoriametrics.com/keyconcepts/#filtering . diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index ff7d3ddaa..273fd6eef 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -35,6 +35,8 @@ type Deduplicator struct { // An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples. // Common case is to drop `replica`-like labels from samples received from HA datasources. // +// alias is url label used in metrics exposed by the returned Deduplicator. +// // MustStop must be called on the returned deduplicator in order to free up occupied resources. func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator { d := &Deduplicator{ @@ -47,7 +49,8 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels ms := d.ms - metricLabels := fmt.Sprintf(`url=%q`, alias) + metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias) + _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { return float64(d.da.sizeBytes()) }) @@ -55,8 +58,8 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels return float64(d.da.itemsCount()) }) - d.dedupFlushDuration = ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels)) - d.dedupFlushTimeouts = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels)) + d.dedupFlushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels)) + d.dedupFlushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels)) metrics.RegisterSet(ms) diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index c5fe06630..92982dcec 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -66,9 +66,8 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) { } } -func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { +func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { m := &as.m - var staleOutputSamples int m.Range(func(k, v any) bool { sv := v.(*histogramBucketStateValue) @@ -77,7 +76,6 @@ func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime if deleted { // Mark the current entry as deleted sv.deleted = deleted - staleOutputSamples++ } sv.mu.Unlock() @@ -86,14 +84,13 @@ func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime } return true }) - ctx.a.staleOutputSamples["histogram_bucket"].Add(staleOutputSamples) } func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 - as.removeOldEntries(ctx, currentTime) + as.removeOldEntries(currentTime) m := &as.m m.Range(func(k, v any) bool { diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 24101e4fe..55bd393db 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -105,16 +105,13 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { } } -func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { +func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 - suffix := "rate_sum" - if as.isAvg { - suffix = "rate_avg" - } + suffix := as.getSuffix() - as.removeOldEntries(ctx, suffix, currentTime) + as.removeOldEntries(currentTime) m := &as.m m.Range(func(k, v any) bool { @@ -130,13 +127,16 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { sumRate += lv.increase / d countSeries++ } - lv.prevTimestamp = lv.timestamp - lv.increase = 0 - lvs[k1] = lv + if resetState { + lv.prevTimestamp = lv.timestamp + lv.increase = 0 + lvs[k1] = lv + } } + deleted := sv.deleted sv.mu.Unlock() - if countSeries == 0 { + if countSeries == 0 || deleted { // Nothing to update return true } @@ -152,9 +152,15 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { }) } -func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentTime uint64) { +func (as *rateAggrState) getSuffix() string { + if as.isAvg { + return "rate_avg" + } + return "rate_sum" +} + +func (as *rateAggrState) removeOldEntries(currentTime uint64) { m := &as.m - var staleOutputSamples, staleInputSamples int m.Range(func(k, v any) bool { sv := v.(*rateStateValue) @@ -162,7 +168,6 @@ func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentT if currentTime > sv.deleteDeadline { // Mark the current entry as deleted sv.deleted = true - staleOutputSamples++ sv.mu.Unlock() m.Delete(k) return true @@ -173,12 +178,9 @@ func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentT for k1, lv := range lvs { if currentTime > lv.deleteDeadline { delete(lvs, k1) - staleInputSamples++ } } sv.mu.Unlock() return true }) - ctx.a.staleInputSamples[suffix].Add(staleInputSamples) - ctx.a.staleOutputSamples[suffix].Add(staleOutputSamples) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index b28914b3d..8e6068b7f 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -47,9 +47,6 @@ var supportedOutputs = []string{ "unique_samples", } -// maxLabelValueLen is maximum match expression label value length in stream aggregation metrics -const maxLabelValueLen = 64 - var ( // lc contains information about all compressed labels for streaming aggregation lc promutils.LabelsCompressor @@ -67,8 +64,10 @@ var ( // // opts can contain additional options. If opts is nil, then default options are used. // +// alias is used as url label in metrics exposed for the returned Aggregators. +// // The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) { +func LoadFromFile(path string, pushFunc PushFunc, opts *Options, alias string) (*Aggregators, error) { data, err := fscore.ReadFileOrHTTP(path) if err != nil { return nil, fmt.Errorf("cannot load aggregators: %w", err) @@ -78,7 +77,7 @@ func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, e return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err) } - as, err := LoadFromData(data, pushFunc, opts) + as, err := loadFromData(data, path, pushFunc, opts, alias) if err != nil { return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err) } @@ -136,16 +135,15 @@ type Options struct { // // This option can be overridden individually per each aggregation via ignore_first_intervals option. IgnoreFirstIntervals int - - // Alias is name or url of remote write context - Alias string - - // aggrID is aggregators id number starting from 1, which is used in metrics labels - aggrID int } // Config is a configuration for a single stream aggregation. type Config struct { + // Name is an optional name of the Config. + // + // It is used as `name` label in the exposed metrics for the given Config. + Name string `yaml:"name,omitempty"` + // Match is a label selector for filtering time series for the given selector. // // If the match isn't set, then all the input time series are processed. @@ -249,11 +247,26 @@ type Aggregators struct { // It is used in Equal() for comparing Aggregators. configData []byte + // filePath is the path to config file used for creating the Aggregators. + filePath string + + // ms contains metrics associated with the Aggregators. ms *metrics.Set } +// FilePath returns path to file with the configuration used for creating the given Aggregators. +func (a *Aggregators) FilePath() string { + return a.filePath +} + // LoadFromData loads aggregators from data. -func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) { +// +// opts can contain additional options. If opts is nil, then default options are used. +func LoadFromData(data []byte, pushFunc PushFunc, opts *Options, alias string) (*Aggregators, error) { + return loadFromData(data, "inmemory", pushFunc, opts, alias) +} + +func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options, alias string) (*Aggregators, error) { var cfgs []*Config if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) @@ -262,8 +275,7 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e ms := metrics.NewSet() as := make([]*aggregator, len(cfgs)) for i, cfg := range cfgs { - opts.aggrID = i + 1 - a, err := newAggregator(cfg, pushFunc, ms, opts) + a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1) if err != nil { // Stop already initialized aggregators before returning the error. for _, a := range as[:i] { @@ -278,30 +290,11 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e logger.Panicf("BUG: cannot marshal the provided configs: %s", err) } - metricLabels := fmt.Sprintf("url=%q", opts.Alias) - _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { - n := uint64(0) - for _, aggr := range as { - if aggr.da != nil { - n += aggr.da.sizeBytes() - } - } - return float64(n) - }) - _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 { - n := uint64(0) - for _, aggr := range as { - if aggr.da != nil { - n += aggr.da.itemsCount() - } - } - return float64(n) - }) - metrics.RegisterSet(ms) return &Aggregators{ as: as, configData: configData, + filePath: filePath, ms: ms, }, nil } @@ -380,11 +373,17 @@ type aggregator struct { without []string aggregateOnlyByTime bool + // interval is the interval between flushes + interval time.Duration + + // dedupInterval is optional deduplication interval for incoming samples + dedupInterval time.Duration + // da is set to non-nil if input samples must be de-duplicated da *dedupAggr - // aggrStates contains aggregate states for the given outputs - aggrStates map[string]aggrState + // aggrOutputs contains aggregate states for the given outputs + aggrOutputs []aggrOutput // minTimestamp is used for ignoring old samples when ignoreOldSamples is set minTimestamp atomic.Int64 @@ -406,11 +405,14 @@ type aggregator struct { flushTimeouts *metrics.Counter dedupFlushTimeouts *metrics.Counter ignoredOldSamples *metrics.Counter - ignoredNanSamples *metrics.Counter + IgnoredNaNSamples *metrics.Counter matchedSamples *metrics.Counter - staleInputSamples map[string]*metrics.Counter - staleOutputSamples map[string]*metrics.Counter - flushedSamples map[string]*metrics.Counter +} + +type aggrOutput struct { + as aggrState + + outputSamples *metrics.Counter } type aggrState interface { @@ -419,6 +421,13 @@ type aggrState interface { // samples[].key must be cloned by aggrState, since it may change after returning from pushSamples. pushSamples(samples []pushSample) + // flushState must flush aggrState data to ctx. + // + // if resetState is true, then aggrState must be reset after flushing the data to ctx, + // otherwise the aggrState data must be kept unchanged. + // + // The resetState is set to false only in the benchmark, which measures flushState() performance + // over the same aggrState. flushState(ctx *flushCtx, resetState bool) } @@ -430,7 +439,7 @@ type PushFunc func(tss []prompbmarshal.TimeSeries) // opts can contain additional options. If opts is nil, then default options are used. // // The returned aggregator must be stopped when no longer needed by calling MustStop(). -func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options) (*aggregator, error) { +func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, opts *Options, alias string, aggrID int) (*aggregator, error) { // check cfg.Interval if cfg.Interval == "" { return nil, fmt.Errorf("missing `interval` option") @@ -443,6 +452,10 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options return nil, fmt.Errorf("aggregation interval cannot be smaller than 1s; got %s", interval) } + if opts == nil { + opts = &Options{} + } + // check cfg.DedupInterval dedupInterval := opts.DedupInterval if cfg.DedupInterval != "" { @@ -491,7 +504,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options by := sortAndRemoveDuplicates(cfg.By) without := sortAndRemoveDuplicates(cfg.Without) if len(by) > 0 && len(without) > 0 { - return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously", by, without) + return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously; see https://docs.victoriametrics.com/stream-aggregation/", by, without) } aggregateOnlyByTime := (len(by) == 0 && len(without) == 0) if !aggregateOnlyByTime && len(without) == 0 { @@ -505,10 +518,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options } if keepMetricNames { if len(cfg.Outputs) != 1 { - return nil, fmt.Errorf("`outputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs) + return nil, fmt.Errorf("`outputs` list must contain only a single entry if `keep_metric_names` is set; got %q; "+ + "see https://docs.victoriametrics.com/stream-aggregation/#output-metric-names", cfg.Outputs) } if cfg.Outputs[0] == "histogram_bucket" || strings.HasPrefix(cfg.Outputs[0], "quantiles(") && strings.Contains(cfg.Outputs[0], ",") { - return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series", cfg.Outputs) + return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series; "+ + "see https://docs.victoriametrics.com/stream-aggregation/#output-metric-names", cfg.Outputs) } } @@ -524,105 +539,42 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options ignoreFirstIntervals = *v } - // initialize outputs list - if len(cfg.Outputs) == 0 { - return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s", supportedOutputs) + // Initialize common metric labels + name := cfg.Name + if name == "" { + name = "none" } - aggrStates := make(map[string]aggrState, len(cfg.Outputs)) - for _, output := range cfg.Outputs { - // check for duplicated output - if _, ok := aggrStates[output]; ok { - return nil, fmt.Errorf("`outputs` list contains duplicated aggregation function: %s", output) + metricLabels := fmt.Sprintf(`name=%q,path=%q,url=%q,position="%d"`, name, path, alias, aggrID) + + // initialize aggrOutputs + if len(cfg.Outputs) == 0 { + return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ + "see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs) + } + aggrOutputs := make([]aggrOutput, len(cfg.Outputs)) + outputsSeen := make(map[string]struct{}, len(cfg.Outputs)) + for i, output := range cfg.Outputs { + as, err := newAggrState(output, outputsSeen, stalenessInterval) + if err != nil { + return nil, err } - if strings.HasPrefix(output, "quantiles(") { - if !strings.HasSuffix(output, ")") { - return nil, fmt.Errorf("missing closing brace for `quantiles()` output") - } - argsStr := output[len("quantiles(") : len(output)-1] - if len(argsStr) == 0 { - return nil, fmt.Errorf("`quantiles()` must contain at least one phi") - } - args := strings.Split(argsStr, ",") - phis := make([]float64, len(args)) - for j, arg := range args { - arg = strings.TrimSpace(arg) - phi, err := strconv.ParseFloat(arg, 64) - if err != nil { - return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err) - } - if phi < 0 || phi > 1 { - return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi) - } - phis[j] = phi - } - if _, ok := aggrStates["quantiles"]; ok { - return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`") - } - aggrStates["quantiles"] = newQuantilesAggrState(phis) - continue - } - switch output { - case "avg": - aggrStates[output] = newAvgAggrState() - case "count_samples": - aggrStates[output] = newCountSamplesAggrState() - case "count_series": - aggrStates[output] = newCountSeriesAggrState() - case "histogram_bucket": - aggrStates[output] = newHistogramBucketAggrState(stalenessInterval) - case "increase": - aggrStates[output] = newTotalAggrState(stalenessInterval, true, true) - case "increase_prometheus": - aggrStates[output] = newTotalAggrState(stalenessInterval, true, false) - case "last": - aggrStates[output] = newLastAggrState() - case "max": - aggrStates[output] = newMaxAggrState() - case "min": - aggrStates[output] = newMinAggrState() - case "rate_avg": - aggrStates[output] = newRateAggrState(stalenessInterval, true) - case "rate_sum": - aggrStates[output] = newRateAggrState(stalenessInterval, false) - case "stddev": - aggrStates[output] = newStddevAggrState() - case "stdvar": - aggrStates[output] = newStdvarAggrState() - case "sum_samples": - aggrStates[output] = newSumSamplesAggrState() - case "total": - aggrStates[output] = newTotalAggrState(stalenessInterval, false, true) - case "total_prometheus": - aggrStates[output] = newTotalAggrState(stalenessInterval, false, false) - case "unique_samples": - aggrStates[output] = newUniqueSamplesAggrState() - default: - return nil, fmt.Errorf("unsupported output=%q; supported values: %s;", output, supportedOutputs) + aggrOutputs[i] = aggrOutput{ + as: as, + + outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{output=%q,%s}`, output, metricLabels)), } } // initialize suffix to add to metric names after aggregation suffix := ":" + cfg.Interval - group := "none" if labels := removeUnderscoreName(by); len(labels) > 0 { - group = fmt.Sprintf("by: %s", strings.Join(labels, ",")) suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_")) } if labels := removeUnderscoreName(without); len(labels) > 0 { - group = fmt.Sprintf("without: %s", strings.Join(labels, ",")) suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_")) } suffix += "_" - outputs := strings.Join(cfg.Outputs, ",") - - matchExpr := cfg.Match.String() - if len(matchExpr) > maxLabelValueLen { - matchExpr = matchExpr[:maxLabelValueLen-3] + "..." - } - - metricLabels := fmt.Sprintf(`match=%q, group=%q, url=%q, position="%d"`, matchExpr, group, opts.Alias, opts.aggrID) - // initialize the aggregator a := &aggregator{ match: cfg.Match, @@ -638,36 +590,37 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options without: without, aggregateOnlyByTime: aggregateOnlyByTime, - aggrStates: aggrStates, + interval: interval, + dedupInterval: dedupInterval, + + aggrOutputs: aggrOutputs, suffix: suffix, stopCh: make(chan struct{}), - flushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)), - dedupFlushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)), - samplesLag: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_samples_lag_seconds{outputs=%q, %s}`, outputs, metricLabels)), + flushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{%s}`, metricLabels)), + dedupFlushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels)), + samplesLag: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_samples_lag_seconds{%s}`, metricLabels)), - matchedSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_matched_samples_total{outputs=%q, %s}`, outputs, metricLabels)), - flushTimeouts: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_flush_timeouts_total{outputs=%q, %s}`, outputs, metricLabels)), - dedupFlushTimeouts: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{outputs=%q, %s}`, outputs, metricLabels)), - ignoredNanSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="nan", outputs=%q, %s}`, outputs, metricLabels)), - ignoredOldSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="too_old", outputs=%q, %s}`, outputs, metricLabels)), - staleInputSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)), - staleOutputSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)), - flushedSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)), - } - for _, output := range cfg.Outputs { - // Removing output args for metric label value in outputs like quantile(arg1, arg2) - if ri := strings.IndexRune(output, '('); ri >= 0 { - output = output[:ri] - } - a.staleInputSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_stale_samples_total{key="input", output=%q, %s}`, output, metricLabels)) - a.staleOutputSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_stale_samples_total{key="output", output=%q, %s}`, output, metricLabels)) - a.flushedSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_flushed_samples_total{output=%q, %s}`, output, metricLabels)) + matchedSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_matched_samples_total{%s}`, metricLabels)), + flushTimeouts: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_flush_timeouts_total{%s}`, metricLabels)), + dedupFlushTimeouts: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels)), + IgnoredNaNSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="nan",%s}`, metricLabels)), + ignoredOldSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="too_old",%s}`, metricLabels)), } + if dedupInterval > 0 { a.da = newDedupAggr() + + _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { + n := a.da.sizeBytes() + return float64(n) + }) + _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 { + n := a.da.itemsCount() + return float64(n) + }) } alignFlushToInterval := !opts.NoAlignFlushToInterval @@ -682,14 +635,89 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options a.wg.Add(1) go func() { - a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals) + a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, ignoreFirstIntervals) a.wg.Done() }() return a, nil } -func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration, ignoreFirstIntervals int) { +func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) { + // check for duplicated output + if _, ok := outputsSeen[output]; ok { + return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output) + } + outputsSeen[output] = struct{}{} + + if strings.HasPrefix(output, "quantiles(") { + if !strings.HasSuffix(output, ")") { + return nil, fmt.Errorf("missing closing brace for `quantiles()` output") + } + argsStr := output[len("quantiles(") : len(output)-1] + if len(argsStr) == 0 { + return nil, fmt.Errorf("`quantiles()` must contain at least one phi") + } + args := strings.Split(argsStr, ",") + phis := make([]float64, len(args)) + for i, arg := range args { + arg = strings.TrimSpace(arg) + phi, err := strconv.ParseFloat(arg, 64) + if err != nil { + return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err) + } + if phi < 0 || phi > 1 { + return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi) + } + phis[i] = phi + } + if _, ok := outputsSeen["quantiles"]; ok { + return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`") + } + outputsSeen["quantiles"] = struct{}{} + return newQuantilesAggrState(phis), nil + } + + switch output { + case "avg": + return newAvgAggrState(), nil + case "count_samples": + return newCountSamplesAggrState(), nil + case "count_series": + return newCountSeriesAggrState(), nil + case "histogram_bucket": + return newHistogramBucketAggrState(stalenessInterval), nil + case "increase": + return newTotalAggrState(stalenessInterval, true, true), nil + case "increase_prometheus": + return newTotalAggrState(stalenessInterval, true, false), nil + case "last": + return newLastAggrState(), nil + case "max": + return newMaxAggrState(), nil + case "min": + return newMinAggrState(), nil + case "rate_avg": + return newRateAggrState(stalenessInterval, true), nil + case "rate_sum": + return newRateAggrState(stalenessInterval, false), nil + case "stddev": + return newStddevAggrState(), nil + case "stdvar": + return newStdvarAggrState(), nil + case "sum_samples": + return newSumSamplesAggrState(), nil + case "total": + return newTotalAggrState(stalenessInterval, false, true), nil + case "total_prometheus": + return newTotalAggrState(stalenessInterval, false, false), nil + case "unique_samples": + return newUniqueSamplesAggrState(), nil + default: + return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs) + } +} + +func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, ignoreFirstIntervals int) { alignedSleep := func(d time.Duration) { if !alignFlushToInterval { return @@ -714,22 +742,22 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } - if dedupInterval <= 0 { - alignedSleep(interval) - t := time.NewTicker(interval) + if a.dedupInterval <= 0 { + alignedSleep(a.interval) + t := time.NewTicker(a.interval) defer t.Stop() if alignFlushToInterval && skipIncompleteFlush { - a.flush(nil, interval, true) + a.flush(nil) ignoreFirstIntervals-- } for tickerWait(t) { if ignoreFirstIntervals > 0 { - a.flush(nil, interval, true) + a.flush(nil) ignoreFirstIntervals-- } else { - a.flush(pushFunc, interval, true) + a.flush(pushFunc) } if alignFlushToInterval { @@ -740,30 +768,30 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } } else { - alignedSleep(dedupInterval) - t := time.NewTicker(dedupInterval) + alignedSleep(a.dedupInterval) + t := time.NewTicker(a.dedupInterval) defer t.Stop() - flushDeadline := time.Now().Add(interval) + flushDeadline := time.Now().Add(a.interval) isSkippedFirstFlush := false for tickerWait(t) { - a.dedupFlush(dedupInterval) + a.dedupFlush() ct := time.Now() if ct.After(flushDeadline) { // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - a.flush(nil, interval, true) + a.flush(nil) ignoreFirstIntervals-- isSkippedFirstFlush = true } else if ignoreFirstIntervals > 0 { - a.flush(nil, interval, true) + a.flush(nil) ignoreFirstIntervals-- } else { - a.flush(pushFunc, interval, true) + a.flush(pushFunc) } for ct.After(flushDeadline) { - flushDeadline = flushDeadline.Add(interval) + flushDeadline = flushDeadline.Add(a.interval) } } @@ -777,13 +805,13 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { - a.dedupFlush(dedupInterval) - a.flush(pushFunc, interval, true) + a.dedupFlush() + a.flush(pushFunc) } } -func (a *aggregator) dedupFlush(dedupInterval time.Duration) { - if dedupInterval <= 0 { +func (a *aggregator) dedupFlush() { + if a.dedupInterval <= 0 { // The de-duplication is disabled. return } @@ -794,15 +822,22 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) { d := time.Since(startTime) a.dedupFlushDuration.Update(d.Seconds()) - if d > dedupInterval { + if d > a.dedupInterval { a.dedupFlushTimeouts.Inc() logger.Warnf("deduplication couldn't be finished in the configured dedup_interval=%s; it took %.03fs; "+ "possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+ - "reduce samples' ingestion rate to stream aggregation", dedupInterval, d.Seconds()) + "reduce samples' ingestion rate to stream aggregation", a.dedupInterval, d.Seconds()) } } -func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) { +// flush flushes aggregator state to pushFunc. +// +// If pushFunc is nil, then the aggregator state is just reset. +func (a *aggregator) flush(pushFunc PushFunc) { + a.flushInternal(pushFunc, true) +} + +func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) { startTime := time.Now() // Update minTimestamp before flushing samples to the storage, @@ -811,31 +846,31 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState a.minTimestamp.Store(startTime.UnixMilli() - 5_000) var wg sync.WaitGroup - for output, as := range a.aggrStates { + for i := range a.aggrOutputs { + ao := &a.aggrOutputs[i] flushConcurrencyCh <- struct{}{} wg.Add(1) - go func(as aggrState) { + go func(ao *aggrOutput) { defer func() { <-flushConcurrencyCh wg.Done() }() - ctx := getFlushCtx(a, pushFunc) - as.flushState(ctx, resetState) - ctx.flushSeries(output) - ctx.resetSeries() + ctx := getFlushCtx(a, ao, pushFunc) + ao.as.flushState(ctx, resetState) + ctx.flushSeries() putFlushCtx(ctx) - }(as) + }(ao) } wg.Wait() d := time.Since(startTime) a.flushDuration.Update(d.Seconds()) - if d > interval { + if d > a.interval { a.flushTimeouts.Inc() logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03fs; "+ "possible solutions: increase interval; use match filter matching smaller number of series; "+ - "reduce samples' ingestion rate to stream aggregation", interval, d.Seconds()) + "reduce samples' ingestion rate to stream aggregation", a.interval, d.Seconds()) } } @@ -851,7 +886,6 @@ func (a *aggregator) MustStop() { // Push pushes tss to a. func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { - now := time.Now().UnixMilli() ctx := getPushCtx() defer putPushCtx(ctx) @@ -864,7 +898,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { dropLabels := a.dropInputLabels ignoreOldSamples := a.ignoreOldSamples minTimestamp := a.minTimestamp.Load() - var maxLag int64 + + nowMsec := time.Now().UnixMilli() + var maxLagMsec int64 for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue @@ -896,30 +932,31 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { // key remains valid only by the end of this function and can't be reused after // do not intern key because number of unique keys could be too high key := bytesutil.ToUnsafeString(buf[bufLen:]) - for _, sample := range ts.Samples { - if math.IsNaN(sample.Value) { - a.ignoredNanSamples.Inc() + for _, s := range ts.Samples { + if math.IsNaN(s.Value) { + a.IgnoredNaNSamples.Inc() // Skip NaN values continue } - if ignoreOldSamples && sample.Timestamp < minTimestamp { + if ignoreOldSamples && s.Timestamp < minTimestamp { a.ignoredOldSamples.Inc() // Skip old samples outside the current aggregation interval continue } - if maxLag < now-sample.Timestamp { - maxLag = now - sample.Timestamp + lagMsec := nowMsec - s.Timestamp + if lagMsec > maxLagMsec { + maxLagMsec = lagMsec } samples = append(samples, pushSample{ key: key, - value: sample.Value, - timestamp: sample.Timestamp, + value: s.Value, + timestamp: s.Timestamp, }) } } if len(samples) > 0 { a.matchedSamples.Add(len(samples)) - a.samplesLag.Update(float64(maxLag) / 1_000) + a.samplesLag.Update(float64(maxLagMsec) / 1_000) } ctx.samples = samples ctx.buf = buf @@ -969,8 +1006,8 @@ func getInputOutputKey(key string) (string, string) { } func (a *aggregator) pushSamples(samples []pushSample) { - for _, as := range a.aggrStates { - as.pushSamples(samples) + for _, ao := range a.aggrOutputs { + ao.as.pushSamples(samples) } } @@ -1036,13 +1073,14 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, return dstInput, dstOutput } -func getFlushCtx(a *aggregator, pushFunc PushFunc) *flushCtx { +func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx { v := flushCtxPool.Get() if v == nil { v = &flushCtx{} } ctx := v.(*flushCtx) ctx.a = a + ctx.ao = ao ctx.pushFunc = pushFunc return ctx } @@ -1056,6 +1094,7 @@ var flushCtxPool sync.Pool type flushCtx struct { a *aggregator + ao *aggrOutput pushFunc PushFunc tss []prompbmarshal.TimeSeries @@ -1065,6 +1104,7 @@ type flushCtx struct { func (ctx *flushCtx) reset() { ctx.a = nil + ctx.ao = nil ctx.pushFunc = nil ctx.resetSeries() } @@ -1079,7 +1119,9 @@ func (ctx *flushCtx) resetSeries() { ctx.samples = ctx.samples[:0] } -func (ctx *flushCtx) flushSeries(aggrStateSuffix string) { +func (ctx *flushCtx) flushSeries() { + defer ctx.resetSeries() + tss := ctx.tss if len(tss) == 0 { // nothing to flush @@ -1091,7 +1133,7 @@ func (ctx *flushCtx) flushSeries(aggrStateSuffix string) { // Fast path - push the output metrics. if ctx.pushFunc != nil { ctx.pushFunc(tss) - ctx.a.flushedSamples[aggrStateSuffix].Add(len(tss)) + ctx.ao.outputSamples.Add(len(tss)) } return } @@ -1113,7 +1155,7 @@ func (ctx *flushCtx) flushSeries(aggrStateSuffix string) { } if ctx.pushFunc != nil { ctx.pushFunc(dst) - ctx.a.flushedSamples[aggrStateSuffix].Add(len(dst)) + ctx.ao.outputSamples.Add(len(dst)) } auxLabels.Labels = dstLabels promutils.PutLabels(auxLabels) @@ -1137,8 +1179,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo // Limit the maximum length of ctx.tss in order to limit memory usage. if len(ctx.tss) >= 10_000 { - ctx.flushSeries(suffix) - ctx.resetSeries() + ctx.flushSeries() } } @@ -1161,7 +1202,11 @@ func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp in Labels: ctx.labels[labelsLen:], Samples: ctx.samples[samplesLen:], }) - ctx.a.flushedSamples[suffix].Add(len(ctx.tss)) + + // Limit the maximum length of ctx.tss in order to limit memory usage. + if len(ctx.tss) >= 10_000 { + ctx.flushSeries() + } } func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label { diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 868b7bdbb..ae64a6ac7 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -19,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) { pushFunc := func(_ []prompbmarshal.TimeSeries) { panic(fmt.Errorf("pushFunc shouldn't be called")) } - a, err := LoadFromData([]byte(config), pushFunc, Options{}) + a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias") if err == nil { t.Fatalf("expecting non-nil error") } @@ -200,11 +200,11 @@ func TestAggregatorsEqual(t *testing.T) { t.Helper() pushFunc := func(_ []prompbmarshal.TimeSeries) {} - aa, err := LoadFromData([]byte(a), pushFunc, Options{}) + aa, err := LoadFromData([]byte(a), pushFunc, nil, "some_alias") if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - ab, err := LoadFromData([]byte(b), pushFunc, Options{}) + ab, err := LoadFromData([]byte(b), pushFunc, nil, "some_alias") if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -263,11 +263,11 @@ func TestAggregatorsSuccess(t *testing.T) { tssOutput = appendClonedTimeseries(tssOutput, tss) tssOutputLock.Unlock() } - opts := Options{ + opts := &Options{ FlushOnShutdown: true, NoAlignFlushToInterval: true, } - a, err := LoadFromData([]byte(config), pushFunc, opts) + a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias") if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -515,6 +515,7 @@ foo-1m-without-abc-sum-samples 12.5 without: [abc] outputs: [count_samples, sum_samples, count_series] match: '{non_existing_label!=""}' + name: foobar `, ` foo{abc="123"} 4 bar 5 @@ -527,6 +528,7 @@ foo{abc="456",de="fg"} 8 - interval: 1m by: [abc] outputs: [count_samples, sum_samples, count_series] + name: abcdef match: - foo{abc=~".+"} - '{non_existing_label!=""}' @@ -980,11 +982,11 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { } tssOutputLock.Unlock() } - opts := Options{ + opts := &Options{ DedupInterval: 30 * time.Second, FlushOnShutdown: true, } - a, err := LoadFromData([]byte(config), pushFunc, opts) + a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias") if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 4a787458f..530ac9d6b 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -39,7 +39,7 @@ func BenchmarkAggregatorsPush(b *testing.B) { } } -func BenchmarkAggregatorsFlushSerial(b *testing.B) { +func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) { pushFunc := func(_ []prompbmarshal.TimeSeries) {} a := newBenchAggregators(benchOutputs, pushFunc) defer a.MustStop() @@ -50,7 +50,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) { b.SetBytes(int64(len(benchSeries) * len(benchOutputs))) for i := 0; i < b.N; i++ { for _, aggr := range a.as { - aggr.flush(pushFunc, time.Hour, false) + aggr.flushInternal(pushFunc, false) } } } @@ -87,7 +87,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { outputs: [%s] `, strings.Join(outputsQuoted, ",")) - a, err := LoadFromData([]byte(config), pushFunc, Options{}) + a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias") if err != nil { panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) } diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index c0df106f5..fda700b4c 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -13,8 +13,6 @@ import ( type totalAggrState struct { m sync.Map - suffix string - // Whether to reset the output value on every flushState call. resetTotalOnFlush bool @@ -50,15 +48,8 @@ type totalLastValueState struct { func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState { stalenessSecs := roundDurationToSecs(stalenessInterval) ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs - suffix := "total" - if resetTotalOnFlush { - suffix = "increase" - } - if !keepFirstSample { - suffix += "_prometheus" - } + return &totalAggrState{ - suffix: suffix, resetTotalOnFlush: resetTotalOnFlush, keepFirstSample: keepFirstSample, stalenessSecs: stalenessSecs, @@ -128,11 +119,14 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 - as.removeOldEntries(ctx, currentTime) + suffix := as.getSuffix() + + as.removeOldEntries(currentTime) m := &as.m m.Range(func(k, v any) bool { sv := v.(*totalStateValue) + sv.mu.Lock() total := sv.total if resetState { @@ -145,17 +139,31 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { } deleted := sv.deleted sv.mu.Unlock() + if !deleted { key := k.(string) - ctx.appendSeries(key, as.suffix, currentTimeMsec, total) + ctx.appendSeries(key, suffix, currentTimeMsec, total) } return true }) } -func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { +func (as *totalAggrState) getSuffix() string { + // Note: this function is at hot path, so it shouldn't allocate. + if as.resetTotalOnFlush { + if as.keepFirstSample { + return "increase" + } + return "increase_prometheus" + } + if as.keepFirstSample { + return "total" + } + return "total_prometheus" +} + +func (as *totalAggrState) removeOldEntries(currentTime uint64) { m := &as.m - var staleInputSamples, staleOutputSamples int m.Range(func(k, v any) bool { sv := v.(*totalStateValue) @@ -163,7 +171,6 @@ func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { if currentTime > sv.deleteDeadline { // Mark the current entry as deleted sv.deleted = true - staleOutputSamples++ sv.mu.Unlock() m.Delete(k) return true @@ -174,12 +181,9 @@ func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { for k1, lv := range lvs { if currentTime > lv.deleteDeadline { delete(lvs, k1) - staleInputSamples++ } } sv.mu.Unlock() return true }) - ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples) - ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples) }