From d577657fb705ac3a6f3a79ab9ac8c33d9712b808 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 31 Mar 2023 21:27:45 -0700
Subject: [PATCH] lib/streamaggr: follow-up for ff72ca14b9d51323e8014f07f424eb9f72d50eb2

- Make sure that the last successfully loaded config is used on hot-reload failure
- Properly clean up resources occupied by already initialized aggregators
  when the current aggregator fails to be initialized
- Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config.
  This should simplify monitoring and debugging failed reloads.
- Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop()
  while sa could be in use at reloadSaConfig()
- Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact
  scalability on systems with a big number of CPU cores at the hasState.Store(true) call
  inside aggregator.Push().
- Remove fine-grained aggregator reload - reload all the aggregators on config change instead.
  This simplifies the code a bit. The fine-grained aggregator reload may be returned back
  if there is demand from real users for it.
- Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs
  with the -dryRun flag
- Return back the accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639
---
 README.md                              |   2 +-
 app/victoria-metrics/main.go           |  13 +-
 app/vmagent/README.md                  |   4 +-
 app/vmagent/main.go                    |  10 +-
 app/vmagent/remotewrite/remotewrite.go | 166 +++++++++++++---------
 app/vmagent/remotewrite/streamagg.go   | 118 ----------------
 app/vminsert/common/insert_ctx.go      |   3 +-
 app/vminsert/common/streamaggr.go      | 108 +++++++-------
 app/vminsert/relabel/relabel.go        |   6 +
 docs/CHANGELOG.md                      |  16 ++-
 docs/README.md                         |   2 +-
 docs/Single-server-VictoriaMetrics.md  |   2 +-
 docs/stream-aggregation.md             |  14 +-
 docs/vmagent.md                        |   2 +-
 lib/promscrape/scraper.go              |   2 +-
 lib/streamaggr/streamaggr.go           | 186 ++++++-------------------
 lib/streamaggr/streamaggr_test.go      | 146 ++++++-------------
 17 files changed, 291 insertions(+), 509 deletions(-)
 delete mode 100644 app/vmagent/remotewrite/streamagg.go

diff --git a/README.md b/README.md
index ff78de46e..20c3b3a64 100644
--- a/README.md
+++ b/README.md
@@ -2193,7 +2193,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
     Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
     Supports an array of values separated by comma or specified via multiple flags.
   -dryRun
-    Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
+    Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
  -enableTCP6
    Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
  -envflag.enable
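All of the changes below apply one pattern: the active aggregators live behind an atomic pointer; a reload builds a replacement from the config file, keeps the previous instance when the build fails, and stops the previous instance only after swapping in the new one. A minimal generic sketch of that pattern (the `component` and `build` names are illustrative, not the patch's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// component stands in for streamaggr.Aggregators: something that is
// built from a config and must be stopped exactly once.
type component struct{ cfg string }

func (c *component) MustStop() { /* release resources */ }

var current atomic.Pointer[component]

// reload builds a replacement from cfg and swaps it in. On build failure
// the previously loaded component stays active - the "last successfully
// loaded config wins" behaviour described in the commit message.
func reload(cfg string) error {
	next, err := build(cfg)
	if err != nil {
		return fmt.Errorf("cannot load config; keeping the previous one: %w", err)
	}
	old := current.Swap(next)
	if old != nil {
		old.MustStop() // stop the old component only after the swap
	}
	return nil
}

func build(cfg string) (*component, error) {
	if cfg == "" {
		return nil, fmt.Errorf("empty config")
	}
	return &component{cfg: cfg}, nil
}

func main() {
	_ = reload("a")                 // initial load
	_ = reload("")                  // fails; "a" stays active
	fmt.Println(current.Load().cfg) // prints "a"
}
```

The real code layers nil-tolerant methods on the swapped type on top of this, so concurrent pushers holding the old pointer stay safe; see the vmagent and vminsert hunks below.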
diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go
index 848d592b5..a37f43b2e 100644
--- a/app/victoria-metrics/main.go
+++ b/app/victoria-metrics/main.go
@@ -8,6 +8,8 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
+	vminsertcommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
+	vminsertrelabel "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
@@ -30,8 +32,9 @@ var (
 		"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
 	minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
 		"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
-	dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
-		"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag")
+	dryRun = flag.Bool("dryRun", false, "Whether to check config files without running VictoriaMetrics. The following config files are checked: "+
+		"-promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. "+
+		"This can be changed with -promscrape.config.strictParse=false command-line flag")
 	inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+
 		"The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. "+
 		"Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+
@@ -54,6 +57,12 @@ func main() {
 		if err := promscrape.CheckConfig(); err != nil {
 			logger.Fatalf("error when checking -promscrape.config: %s", err)
 		}
+		if err := vminsertrelabel.CheckRelabelConfig(); err != nil {
+			logger.Fatalf("error when checking -relabelConfig: %s", err)
+		}
+		if err := vminsertcommon.CheckStreamAggrConfig(); err != nil {
+			logger.Fatalf("error when checking -streamAggr.config: %s", err)
+		}
 		logger.Infof("-promscrape.config is ok; exiting with 0 status code")
 		return
 	}

diff --git a/app/vmagent/README.md b/app/vmagent/README.md
index 3fa07a66f..5a65edec7 100644
--- a/app/vmagent/README.md
+++ b/app/vmagent/README.md
@@ -104,7 +104,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
 `vmagent` should be restarted in order to update config options set via command-line args.
 `vmagent` supports multiple approaches for reloading configs from updated config files such as
-`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`:
+`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
 
 * Sending `SIGHUP` signal to `vmagent` process:
 
@@ -1186,7 +1186,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
  -denyQueryTracing
    Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
  -dryRun
-    Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
+    Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
  -enableTCP6
    Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
  -envflag.enable

diff --git a/app/vmagent/main.go b/app/vmagent/main.go
index 7c7486935..a0db615b8 100644
--- a/app/vmagent/main.go
+++ b/app/vmagent/main.go
@@ -67,7 +67,7 @@ var (
 	opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
 		"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
 	configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
-	dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
+	dryRun = flag.Bool("dryRun", false, "Whether to check config files without running vmagent. The following files are checked: "+
 		"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
 		"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
 )
@@ -103,13 +103,13 @@ func main() {
 		return
 	}
 	if *dryRun {
-		if err := remotewrite.CheckRelabelConfigs(); err != nil {
-			logger.Fatalf("error when checking relabel configs: %s", err)
-		}
 		if err := promscrape.CheckConfig(); err != nil {
 			logger.Fatalf("error when checking -promscrape.config: %s", err)
 		}
-		if err := remotewrite.CheckStreamAggConfigs(); err != nil {
+		if err := remotewrite.CheckRelabelConfigs(); err != nil {
+			logger.Fatalf("error when checking relabel configs: %s", err)
+		}
+		if err := remotewrite.CheckStreamAggrConfigs(); err != nil {
 			logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
 		}
 		logger.Infof("all the configs are ok; exiting with 0 status code")
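A note on the flag plumbing in the next file: the `-remoteWrite.streamAggr.*` flags are vmagent array flags, so a single configured value applies to every `-remoteWrite.url`, while several values are matched to URLs by position via `GetOptionalArg`/`GetOptionalArgOrDefault`. Roughly (an illustrative re-implementation, not the actual `flagutil` code):

```go
package main

import "fmt"

// getOptionalArg mimics the per-index lookup used by vmagent's array flags:
// one configured value is shared by every -remoteWrite.url, while multiple
// values are matched to URLs by position.
func getOptionalArg(values []string, idx int) string {
	if len(values) == 0 {
		return ""
	}
	if len(values) == 1 {
		return values[0] // a single value applies to all URLs
	}
	if idx < len(values) {
		return values[idx]
	}
	return ""
}

func main() {
	cfgs := []string{"aggr.yaml"} // hypothetical single -remoteWrite.streamAggr.config value
	fmt.Println(getOptionalArg(cfgs, 0)) // aggr.yaml
	fmt.Println(getOptionalArg(cfgs, 3)) // aggr.yaml: the single value is shared
}
```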
"+ + "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") + streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+ + "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ + "See https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ + "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) var ( @@ -87,9 +96,6 @@ func MultitenancyEnabled() bool { // Contains the current relabelConfigs. var allRelabelConfigs atomic.Value -// Contains the loader for stream aggregation configs. -var saCfgLoader *saConfigsLoader - // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. var maxQueues = cgroup.AvailableCPUs() * 16 @@ -152,15 +158,9 @@ func Init() { logger.Fatalf("cannot load relabel configs: %s", err) } allRelabelConfigs.Store(rcs) - relabelConfigSuccess.Set(1) relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) - saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig) - if err != nil { - logger.Fatalf("cannot load stream aggregation config: %s", err) - } - if len(*remoteWriteURLs) > 0 { rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs) } @@ -172,46 +172,31 @@ func Init() { for { select { case <-sighupCh: - case <-stopCh: + case <-configReloaderStopCh: return } - relabelConfigReloads.Inc() - logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") - rcs, err := loadRelabelConfigs() - if err != nil { - relabelConfigReloadErrors.Inc() - relabelConfigSuccess.Set(0) - logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err) - continue - } - allRelabelConfigs.Store(rcs) - relabelConfigSuccess.Set(1) - relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) - logger.Infof("Successfully reloaded relabel configs") - - logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config") - err = saCfgLoader.reloadConfigs() - if err != nil { - logger.Errorf("Cannot reload stream aggregation configs: %s", err) - } - if len(*remoteWriteMultitenantURLs) > 0 { - rwctxsMapLock.Lock() - for _, rwctxs := range rwctxsMap { - for _, rwctx := range rwctxs { - rwctx.reinitStreamAggr() - } - } - rwctxsMapLock.Unlock() - } else { - for _, rwctx := range rwctxsDefault { - rwctx.reinitStreamAggr() - } - } - logger.Infof("Successfully reloaded stream aggregation configs") + reloadRelabelConfigs() + reloadStreamAggrConfigs() } }() } +func reloadRelabelConfigs() { + relabelConfigReloads.Inc() + logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") + rcs, err := loadRelabelConfigs() + if err != nil { + relabelConfigReloadErrors.Inc() + relabelConfigSuccess.Set(0) + logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err) + return + } + allRelabelConfigs.Store(rcs) + relabelConfigSuccess.Set(1) + relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) + logger.Infof("successfully reloaded relabel configs") +} + var ( 
 var (
 	relabelConfigReloads      = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
 	relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
@@ -219,6 +204,24 @@ var (
 	relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
 )
 
+func reloadStreamAggrConfigs() {
+	if len(*remoteWriteMultitenantURLs) > 0 {
+		rwctxsMapLock.Lock()
+		for _, rwctxs := range rwctxsMap {
+			reinitStreamAggr(rwctxs)
+		}
+		rwctxsMapLock.Unlock()
+	} else {
+		reinitStreamAggr(rwctxsDefault)
+	}
+}
+
+func reinitStreamAggr(rwctxs []*remoteWriteCtx) {
+	for _, rwctx := range rwctxs {
+		rwctx.reinitStreamAggr()
+	}
+}
+
 func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
 	if len(urls) == 0 {
 		logger.Panicf("BUG: urls must be non-empty")
@@ -284,14 +287,14 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
 	return rwctxs
 }
 
-var stopCh = make(chan struct{})
+var configReloaderStopCh = make(chan struct{})
 var configReloaderWG sync.WaitGroup
 
 // Stop stops remotewrite.
 //
 // It is expected that nobody calls Push during and after the call to this func.
 func Stop() {
-	close(stopCh)
+	close(configReloaderStopCh)
 	configReloaderWG.Wait()
 
 	for _, rwctx := range rwctxsDefault {
@@ -506,8 +509,7 @@ type remoteWriteCtx struct {
 	fq  *persistentqueue.FastQueue
 	c   *client
 
-	sas                 *streamaggr.Aggregators
-	saHash              uint64
+	sas                 atomic.Pointer[streamaggr.Aggregators]
 	streamAggrKeepInput bool
 
 	pss []*pendingSeries
@@ -567,17 +569,17 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
 	}
 
 	// Initialize sas
-	saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx)
-	if len(saCfg) > 0 {
-		sasFile := streamAggrConfig.GetOptionalArg(argIdx)
+	sasFile := streamAggrConfig.GetOptionalArg(argIdx)
+	if sasFile != "" {
 		dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
-		sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval)
+		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
 		if err != nil {
-			logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
+			logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
 		}
-		rwctx.sas = sas
-		rwctx.saHash = saHash
+		rwctx.sas.Store(sas)
 		rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
 	}
 
 	return rwctx
@@ -592,8 +594,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
 	rwctx.fq.UnblockAllReaders()
 	rwctx.c.MustStop()
 	rwctx.c = nil
-	rwctx.sas.MustStop()
-	rwctx.sas = nil
+
+	sas := rwctx.sas.Swap(nil)
+	sas.MustStop()
+
 	rwctx.fq.MustClose()
 	rwctx.fq = nil
 
@@ -624,8 +628,9 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
 	rwctx.rowsPushedAfterRelabel.Add(rowsCount)
 
 	// Apply stream aggregation if any
-	rwctx.sas.Push(tss)
-	if rwctx.sas == nil || rwctx.streamAggrKeepInput {
+	sas := rwctx.sas.Load()
+	sas.Push(tss)
+	if sas == nil || rwctx.streamAggrKeepInput {
 		// Push samples to the remote storage
 		rwctx.pushInternal(tss)
 	}
@@ -645,17 +650,33 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
 }
 
 func (rwctx *remoteWriteCtx) reinitStreamAggr() {
-	if rwctx.sas == nil {
+	sas := rwctx.sas.Load()
+	if sas == nil {
+		// There is no stream aggregation for rwctx
 		return
 	}
-	saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx)
-	if rwctx.saHash == saHash {
+
+	sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx)
+	logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
+	dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(rwctx.idx, 0)
+	sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
+	if err != nil {
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
+		logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
 		return
 	}
-	if err := rwctx.sas.ReInitConfigs(saCfg); err != nil {
-		logger.Errorf("Cannot apply stream aggregation configs %d: %s", rwctx.idx, err)
+	if !sasNew.Equal(sas) {
+		sasOld := rwctx.sas.Swap(sasNew)
+		sasOld.MustStop()
+		logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
+	} else {
+		sasNew.MustStop()
+		logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
 	}
-	rwctx.saHash = saHash
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
 }
 
 var tssRelabelPool = &sync.Pool{
@@ -672,3 +693,20 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int {
 	}
 	return rowsCount
 }
+
+// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
+func CheckStreamAggrConfigs() error {
+	pushNoop := func(tss []prompbmarshal.TimeSeries) {}
+	for idx, sasFile := range *streamAggrConfig {
+		if sasFile == "" {
+			continue
+		}
+		dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(idx, 0)
+		sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval)
+		if err != nil {
+			return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
+		}
+		sas.MustStop()
+	}
+	return nil
+}
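Why `MustStop()` above can simply `Swap(nil)`: the aggregator methods tolerate a nil receiver, so a pusher that loads the pointer after shutdown gets a cheap no-op instead of a crash. Sketched with hypothetical names:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// aggrs mimics streamaggr.Aggregators: its methods tolerate a nil receiver,
// so callers may Load() the pointer once and use it without further checks.
type aggrs struct{ name string }

func (a *aggrs) push(v float64) {
	if a == nil {
		return // aggregation disabled or already stopped
	}
	fmt.Println(a.name, v)
}

func (a *aggrs) mustStop() {
	if a == nil {
		return
	}
	// flush final state here
}

func main() {
	var ptr atomic.Pointer[aggrs]
	ptr.Store(&aggrs{name: "aggr"})

	// Shutdown: swap to nil first so new pushers see "no aggregation",
	// then stop the instance we owned.
	old := ptr.Swap(nil)
	old.mustStop()

	ptr.Load().push(1) // safe no-op: Load() returns nil after shutdown
}
```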
"+ - "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ - "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") -) - -var ( - saCfgReloads = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`) - saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`) - saCfgSuccess = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`) - saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`) -) - -// saConfigRules - type alias for unmarshalled stream aggregation config -type saConfigRules = []*streamaggr.Config - -// saConfigsLoader loads stream aggregation configs from the given files. -type saConfigsLoader struct { - files []string - configs atomic.Pointer[[]saConfig] -} - -// newSaConfigsLoader creates new saConfigsLoader for the given config files. -func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) { - result := &saConfigsLoader{ - files: configFiles, - } - // Initial load of configs. - if err := result.reloadConfigs(); err != nil { - return nil, err - } - return result, nil -} - -// reloadConfigs reloads stream aggregation configs from the files given in constructor. -func (r *saConfigsLoader) reloadConfigs() error { - // Increment reloads counter if it is not the initial load. - if r.configs.Load() != nil { - saCfgReloads.Inc() - } - - // Load all configs from files. - var configs = make([]saConfig, len(r.files)) - for i, path := range r.files { - if len(path) == 0 { - // Skip empty stream aggregation config. - continue - } - rules, hash, err := streamaggr.LoadConfigsFromFile(path) - if err != nil { - saCfgSuccess.Set(0) - saCfgReloadErr.Inc() - return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err) - } - configs[i] = saConfig{ - path: path, - hash: hash, - rules: rules, - } - } - - // Update configs. - r.configs.Store(&configs) - - saCfgSuccess.Set(1) - saCfgTimestamp.Set(fasttime.UnixTimestamp()) - return nil -} - -// getCurrentConfig returns the current stream aggregation config with the given idx. -func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) { - all := r.configs.Load() - if all == nil { - return nil, 0 - } - cfgs := *all - if len(cfgs) == 0 { - return nil, 0 - } - if idx >= len(cfgs) { - if len(cfgs) == 1 { - cfg := cfgs[0] - return cfg.rules, cfg.hash - } - return nil, 0 - } - cfg := cfgs[idx] - return cfg.rules, cfg.hash -} - -type saConfig struct { - path string - hash uint64 - rules saConfigRules -} - -// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config. -func CheckStreamAggConfigs() error { - _, err := newSaConfigsLoader(*streamAggrConfig) - return err -} diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 67ed92d90..b2acad938 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -137,7 +137,8 @@ func (ctx *InsertCtx) ApplyRelabeling() { // FlushBufs flushes buffered rows to the underlying storage. 
 func (ctx *InsertCtx) FlushBufs() error {
-	if sa != nil && !ctx.skipStreamAggr {
+	sas := sasGlobal.Load()
+	if sas != nil && !ctx.skipStreamAggr {
 		ctx.streamAggrCtx.push(ctx.mrs)
 		if !*streamAggrKeepInput {
 			ctx.Reset(0)

diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go
index b01132791..6a512c81c 100644
--- a/app/vminsert/common/streamaggr.go
+++ b/app/vminsert/common/streamaggr.go
@@ -4,6 +4,7 @@ import (
 	"flag"
 	"fmt"
 	"sync"
+	"sync/atomic"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -29,18 +30,31 @@ var (
 )
 
 var (
-	stopCh           = make(chan struct{})
-	configReloaderWG sync.WaitGroup
+	saCfgReloaderStopCh = make(chan struct{})
+	saCfgReloaderWG     sync.WaitGroup
 
 	saCfgReloads   = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
 	saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
 	saCfgSuccess   = metrics.NewCounter(`vminsert_streamagg_config_last_reload_successful`)
 	saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
 
-	sa     *streamaggr.Aggregators
-	saHash uint64
+	sasGlobal atomic.Pointer[streamaggr.Aggregators]
 )
 
+// CheckStreamAggrConfig checks config pointed by -streamAggr.config
+func CheckStreamAggrConfig() error {
+	if *streamAggrConfig == "" {
+		return nil
+	}
+	pushNoop := func(tss []prompbmarshal.TimeSeries) {}
+	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, *streamAggrDedupInterval)
+	if err != nil {
+		return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
+	}
+	sas.MustStop()
+	return nil
+}
+
 // InitStreamAggr must be called after flag.Parse and before using the common package.
 //
 // MustStopStreamAggr must be called when stream aggr is no longer needed.
@@ -51,45 +65,60 @@ func InitStreamAggr() {
 	sighupCh := procutil.NewSighupChan()
 
-	configs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig)
+	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
 	if err != nil {
 		logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
 	}
-	a, err := streamaggr.NewAggregators(configs, pushAggregateSeries, *streamAggrDedupInterval)
-	if err != nil {
-		logger.Fatalf("cannot init -streamAggr.config=%q: %s", *streamAggrConfig, err)
-	}
-	sa = a
-	saHash = hash
+	sasGlobal.Store(sas)
 	saCfgSuccess.Set(1)
 	saCfgTimestamp.Set(fasttime.UnixTimestamp())
 
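All the new `Check*Config` helpers use the same validate-and-discard idiom: build real aggregators with a no-op push callback, then stop them immediately, so parsing and semantic validation run without emitting anything. A usage sketch of that idiom against the API introduced in this diff (the `stream-aggr.yaml` path is only an example):

```go
package main

import (
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func checkConfig(path string) error {
	if path == "" {
		return nil // a missing config is a valid configuration
	}
	pushNoop := func(tss []prompbmarshal.TimeSeries) {} // discard aggregate output
	sas, err := streamaggr.LoadFromFile(path, pushNoop, 0)
	if err != nil {
		return err
	}
	sas.MustStop() // release the aggregators created purely for validation
	return nil
}

func main() {
	if err := checkConfig("stream-aggr.yaml"); err != nil {
		log.Fatalf("invalid config: %s", err)
	}
}
```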
 	// Start config reloader.
-	configReloaderWG.Add(1)
+	saCfgReloaderWG.Add(1)
 	go func() {
-		defer configReloaderWG.Done()
+		defer saCfgReloaderWG.Done()
 		for {
 			select {
 			case <-sighupCh:
-			case <-stopCh:
+			case <-saCfgReloaderStopCh:
 				return
 			}
-			if err := reloadSaConfig(); err != nil {
-				logger.Errorf("cannot reload -streamAggr.config=%q: %s", *streamAggrConfig, err)
-				continue
-			}
+			reloadStreamAggrConfig()
 		}
 	}()
 }
 
+func reloadStreamAggrConfig() {
+	logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
+	saCfgReloads.Inc()
+
+	sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
+	if err != nil {
+		saCfgSuccess.Set(0)
+		saCfgReloadErr.Inc()
+		logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
+		return
+	}
+	sas := sasGlobal.Load()
+	if !sasNew.Equal(sas) {
+		sasOld := sasGlobal.Swap(sasNew)
+		sasOld.MustStop()
+		logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
+	} else {
+		logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
+		sasNew.MustStop()
+	}
+	saCfgSuccess.Set(1)
+	saCfgTimestamp.Set(fasttime.UnixTimestamp())
+}
+
 // MustStopStreamAggr stops stream aggregators.
 func MustStopStreamAggr() {
-	close(stopCh)
+	close(saCfgReloaderStopCh)
+	saCfgReloaderWG.Wait()
 
-	sa.MustStop()
-	sa = nil
-
-	configReloaderWG.Wait()
+	sas := sasGlobal.Swap(nil)
+	sas.MustStop()
 }
 
 type streamAggrCtx struct {
@@ -109,6 +138,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
 	ts := &tss[0]
 	labels := ts.Labels
 	samples := ts.Samples
+	sas := sasGlobal.Load()
 	for _, mr := range mrs {
 		if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
 			logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err)
@@ -133,7 +163,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
 		ts.Labels = labels
 		ts.Samples = samples
 
-		sa.Push(tss)
+		sas.Push(tss)
 	}
 }
 
@@ -164,33 +194,3 @@ func pushAggregateSeries(tss []prompbmarshal.TimeSeries) {
 		logger.Errorf("cannot flush aggregate series: %s", err)
 	}
 }
-
-func reloadSaConfig() error {
-	saCfgReloads.Inc()
-
-	cfgs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig)
-	if err != nil {
-		saCfgSuccess.Set(0)
-		saCfgReloadErr.Inc()
-		return fmt.Errorf("cannot reload -streamAggr.config=%q: %w", *streamAggrConfig, err)
-	}
-
-	if saHash == hash {
-		return nil
-	}
-
-	if err = sa.ReInitConfigs(cfgs); err != nil {
-		saCfgSuccess.Set(0)
-		saCfgReloadErr.Inc()
-		return fmt.Errorf("cannot apply new -streamAggr.config=%q: %w", *streamAggrConfig, err)
-	}
-
-	saHash = hash
-
-	saCfgSuccess.Set(1)
-	saCfgTimestamp.Set(fasttime.UnixTimestamp())
-
-	logger.Infof("Successfully reloaded stream aggregation config")
-
-	return nil
-}

diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go
index 601eb4968..054b8b67f 100644
--- a/app/vminsert/relabel/relabel.go
+++ b/app/vminsert/relabel/relabel.go
@@ -71,6 +71,12 @@ var (
 
 var pcsGlobal atomic.Value
 
+// CheckRelabelConfig checks config pointed by -relabelConfig
+func CheckRelabelConfig() error {
+	_, err := loadRelabelConfig()
+	return err
+}
+
 func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {
 	if len(*relabelConfig) == 0 {
 		return nil, nil

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index ede415cbb..452ed3364 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -26,7 +26,8 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
-* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add the ability to hot reload [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) configs. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
+* FEATURE: check the contents of `-relabelConfig` and `-streamAggr.config` files additionally to `-promscrape.config` when single-node VictoriaMetrics runs with the `-dryRun` command-line flag. This aligns single-node VictoriaMetrics behaviour with [vmagent](https://docs.victoriametrics.com/vmagent.html) behaviour for the `-dryRun` command-line flag.
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
@@ -138,6 +139,19 @@ Released at 2023-02-24
 
 * BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
 
+## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4)
+
+Released at 2023-03-25
+
+**v1.87.x is a line of LTS releases (e.g. long-time support). It contains important up-to-date bugfixes.
+The v1.87.x line will be supported for at least 12 months since [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release**
+
+* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
+* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
+* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
+* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
+* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
+
 ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
 
 Released at 2023-03-12

diff --git a/docs/README.md b/docs/README.md
index 52ff491d1..667a87195 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -2194,7 +2194,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
     Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
     Supports an array of values separated by comma or specified via multiple flags.
   -dryRun
-    Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
+    Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
  -enableTCP6
    Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
  -envflag.enable

diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md
index a9f13a957..c3b3aef06 100644
--- a/docs/Single-server-VictoriaMetrics.md
+++ b/docs/Single-server-VictoriaMetrics.md
@@ -2197,7 +2197,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
     Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
     Supports an array of values separated by comma or specified via multiple flags.
   -dryRun
-    Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
+    Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
  -enableTCP6
    Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
  -envflag.enable

diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index df983c6ba..1ad21511e 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -509,7 +509,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
   # match is an optional filter for incoming samples to aggregate.
   # It can contain arbitrary Prometheus series selector
   # according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
-  # If match is missing, then all the incoming samples are aggregated.
+  # If match isn't set, then all the incoming samples are aggregated.
 - match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
 
   # interval is the interval for the aggregation.
@@ -548,17 +548,13 @@ per each specified config entry.
 
 ### Configuration update
 
-[vmagent](https://docs.victoriametrics.com/vmagent.html) and
-[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two
-approaches for reloading stream aggregation configs from updated config files such as
-`-remoteWrite.streamAggr.config` and `-streamAggr.config` without restart.
+[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
+support the following approaches for hot reloading stream aggregation configs from `-remoteWrite.streamAggr.config` and `-streamAggr.config`:
 
-* Sending `SIGHUP` signal to `vmagent` process:
+* By sending `SIGHUP` signal to `vmagent` or `victoria-metrics` process:
 
 ```console
 kill -SIGHUP `pidof vmagent`
 ```
 
-* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`).
-
-It will reset the aggregation state only for changed rules in the configuration files.
+* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload`).

diff --git a/docs/vmagent.md b/docs/vmagent.md
index 36ca3b697..4b640355e 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -1190,7 +1190,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
  -denyQueryTracing
    Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
  -dryRun
-    Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
+    Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
  -enableTCP6
    Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
  -envflag.enable

diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index e191b1a97..661d9ebe5 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -50,7 +50,7 @@ var (
 // CheckConfig checks -promscrape.config for errors and unsupported options.
 func CheckConfig() error {
 	if *promscrapeConfigFile == "" {
-		return fmt.Errorf("missing -promscrape.config option")
+		return nil
 	}
 	_, _, err := loadConfig(*promscrapeConfigFile)
 	return err
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index c177544f3..03a0e0210 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -8,7 +8,6 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -19,7 +18,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
-	"github.com/cespare/xxhash/v2"
 	"gopkg.in/yaml.v2"
 )
 
@@ -39,40 +37,22 @@ var supportedOutputs = []string{
 	"quantiles(phi1, ..., phiN)",
 }
 
-// ParseConfig loads array of stream aggregation configs from the given path.
-func ParseConfig(data []byte) ([]*Config, uint64, error) {
-	var cfgs []*Config
-	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
-		return nil, 0, fmt.Errorf("cannot parse stream aggregation config %w", err)
-	}
-	return cfgs, xxhash.Sum64(data), nil
-}
-
-// LoadConfigsFromFile loads array of stream aggregation configs from the given path.
-func LoadConfigsFromFile(path string) ([]*Config, uint64, error) {
-	data, err := fs.ReadFileOrHTTP(path)
-	if err != nil {
-		return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
-	}
-	return ParseConfig(data)
-}
-
-// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
+// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
 // If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
 // e.g. only the last sample per each time series per each dedupInterval is aggregated.
 //
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) {
-	cfgs, configHash, err := LoadConfigsFromFile(path)
+func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
+	data, err := fs.ReadFileOrHTTP(path)
 	if err != nil {
-		return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err)
+		return nil, fmt.Errorf("cannot load aggregators: %w", err)
 	}
-	as, err := NewAggregators(cfgs, pushFunc, dedupInterval)
+	as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
 	if err != nil {
-		return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
+		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
 	}
-	return as, configHash, nil
+	return as, nil
 }
 
 // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
@@ -84,7 +64,7 @@ func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.
 func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
 	var cfgs []*Config
 	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
-		return nil, err
+		return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
 	}
 	return NewAggregators(cfgs, pushFunc, dedupInterval)
 }
@@ -148,22 +128,13 @@ type Config struct {
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
 }
 
-func (cfg *Config) hash() (uint64, error) {
-	if cfg == nil {
-		return 0, nil
-	}
-	data, err := json.Marshal(cfg)
-	if err != nil {
-		return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err)
-	}
-	return xxhash.Sum64(data), nil
-}
-
 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
 type Aggregators struct {
-	as            atomic.Pointer[[]*aggregator]
-	pushFunc      PushFunc
-	dedupInterval time.Duration
+	as []*aggregator
+
+	// configData contains marshaled configs passed to NewAggregators().
+	// It is used in Equal() for comparing Aggregators.
+	configData []byte
 }
 
 // NewAggregators creates Aggregators from the given cfgs.
@@ -182,17 +153,22 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
 	for i, cfg := range cfgs {
 		a, err := newAggregator(cfg, pushFunc, dedupInterval)
 		if err != nil {
+			// Stop already initialized aggregators before returning the error.
+			for _, a := range as[:i] {
+				a.MustStop()
+			}
 			return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
 		}
 		as[i] = a
 	}
-	result := &Aggregators{
-		pushFunc:      pushFunc,
-		dedupInterval: dedupInterval,
+	configData, err := json.Marshal(cfgs)
+	if err != nil {
+		logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
 	}
-	result.as.Store(&as)
-
-	return result, nil
+	return &Aggregators{
+		as:         as,
+		configData: configData,
+	}, nil
 }
 
 // MustStop stops a.
@@ -200,84 +176,29 @@ func (a *Aggregators) MustStop() {
 	if a == nil {
 		return
 	}
-	for _, aggr := range *a.as.Load() {
+	for _, aggr := range a.as {
 		aggr.MustStop()
 	}
 }
 
+// Equal returns true if a and b are initialized from identical configs.
+func (a *Aggregators) Equal(b *Aggregators) bool {
+	if a == nil || b == nil {
+		return a == nil && b == nil
+	}
+	return string(a.configData) == string(b.configData)
+}
+
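`Equal()` compares the re-marshaled config arrays byte-for-byte, so cosmetic YAML differences (comments, spacing) don't count as changes, while any semantic difference — including rule order — forces a full swap and a reset of all aggregation state. A quick illustration against this package's API as changed in this diff:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	pushNoop := func(tss []prompbmarshal.TimeSeries) {}
	mustLoad := func(cfg string) *streamaggr.Aggregators {
		sas, err := streamaggr.NewAggregatorsFromData([]byte(cfg), pushNoop, 0)
		if err != nil {
			panic(err)
		}
		return sas
	}
	a := mustLoad("- interval: 1m\n  outputs: [total]  # comment\n")
	b := mustLoad("- interval: 1m\n  outputs: [total]\n")
	defer a.MustStop()
	defer b.MustStop()

	// true: comments and spacing disappear during YAML parsing, and Equal()
	// compares the re-marshaled configs, not the raw file bytes.
	fmt.Println(a.Equal(b))
}
```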
 // Push pushes tss to a.
 func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
 	if a == nil {
 		return
 	}
-	for _, aggr := range *a.as.Load() {
+	for _, aggr := range a.as {
 		aggr.Push(tss)
 	}
 }
 
-// ReInitConfigs reinits state of Aggregators a with the given new stream aggregation config
-func (a *Aggregators) ReInitConfigs(cfgs []*Config) error {
-	if a == nil {
-		return nil
-	}
-
-	keys := make(map[uint64]struct{})        // set of all keys (configs and aggregators)
-	cfgsMap := make(map[uint64]*Config)      // map of config keys to their indices in cfgs
-	aggrsMap := make(map[uint64]*aggregator) // map of aggregator keys to their indices in a.as
-
-	for _, cfg := range cfgs {
-		key, err := cfg.hash()
-		if err != nil {
-			return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err)
-		}
-		keys[key] = struct{}{}
-		cfgsMap[key] = cfg
-	}
-	for _, aggr := range *a.as.Load() {
-		keys[aggr.cfgHash] = struct{}{}
-		aggrsMap[aggr.cfgHash] = aggr
-	}
-
-	asNew := make([]*aggregator, 0, len(aggrsMap))
-	asDel := make([]*aggregator, 0, len(aggrsMap))
-	for key := range keys {
-		cfg, hasCfg := cfgsMap[key]
-		agg, hasAggr := aggrsMap[key]
-
-		// if config for aggregator was changed or removed
-		// then we need to stop aggregator and remove it
-		if !hasCfg && hasAggr {
-			asDel = append(asDel, agg)
-			continue
-		}
-
-		// if there is no aggregator for config (new config),
-		// then we need to create it
-		if hasCfg && !hasAggr {
-			newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval)
-			if err != nil {
-				return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err)
-			}
-			asNew = append(asNew, newAgg)
-			continue
-		}
-
-		// if aggregator config was not changed, then we can just keep it
-		if hasCfg && hasAggr {
-			asNew = append(asNew, agg)
-		}
-	}
-
-	// Atomically replace aggregators array.
-	a.as.Store(&asNew)
-	// and stop old aggregators
-	for _, aggr := range asDel {
-		aggr.MustStop()
-	}
-
-	return nil
-}
-
 // aggregator aggregates input series according to the config passed to NewAggregator
 type aggregator struct {
 	match *promrelabel.IfExpression
@@ -295,7 +216,6 @@ type aggregator struct {
 
 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState
-	hasState   atomic.Bool
 
 	pushFunc PushFunc
 
@@ -304,8 +224,7 @@ type aggregator struct {
 	// It contains the interval, labels in (by, without), plus output name.
 	// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
 	// for `interval: 1m`, `by: [job]`
-	suffix  string
-	cfgHash uint64
+	suffix string
 
 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -433,11 +352,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		dedupAggr = newLastAggrState()
 	}
 
-	cfgHash, err := cfg.hash()
-	if err != nil {
-		return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err)
-	}
-
 	// initialize the aggregator
 	a := &aggregator{
 		match: cfg.Match,
@@ -453,8 +367,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		aggrStates: aggrStates,
 		pushFunc:   pushFunc,
 
-		suffix:  suffix,
-		cfgHash: cfgHash,
+		suffix: suffix,
 
 		stopCh: make(chan struct{}),
 	}
@@ -521,8 +434,6 @@ func (a *aggregator) dedupFlush() {
 	}
 	a.dedupAggr.appendSeriesForFlush(ctx)
 	a.push(ctx.tss)
-
-	a.hasState.Store(false)
 }
 
 func (a *aggregator) flush() {
@@ -552,8 +463,6 @@ func (a *aggregator) flush() {
 		// Push the output metrics.
 		a.pushFunc(tss)
 	}
-
-	a.hasState.Store(false)
 }
 
 // MustStop stops the aggregator.
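On the scalability claim in the commit message: `hasState.Store(true)` on every `Push()` makes all cores write the same cache line. A hedged micro-benchmark shape for observing that effect (not from the repo; numbers vary by machine):

```go
package bench

import (
	"sync/atomic"
	"testing"
)

// Illustrative benchmark for the contention the patch removes: a shared
// atomic flag written on every Push is a single cache line that all cores
// fight over, even though its value is almost always already true.
var hasState atomic.Bool

func BenchmarkSharedFlagStore(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			hasState.Store(true) // cross-core cache-line ping-pong
		}
	})
}

func BenchmarkNoFlag(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		var local int
		for pb.Next() {
			local++ // per-goroutine work only; no shared writes
		}
		_ = local
	})
}
```

Running these with increasing `-cpu` values is the simplest way to see the shared-write penalty grow with core count.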
@@ -561,26 +470,19 @@ func (a *aggregator) flush() {
 // The aggregator stops pushing the aggregated metrics after this call.
 func (a *aggregator) MustStop() {
 	close(a.stopCh)
-
-	if a.hasState.Load() {
-		if a.dedupAggr != nil {
-			flushConcurrencyCh <- struct{}{}
-			a.dedupFlush()
-			<-flushConcurrencyCh
-		}
-
-		flushConcurrencyCh <- struct{}{}
-		a.flush()
-		<-flushConcurrencyCh
-	}
-
 	a.wg.Wait()
+
+	// Flush the remaining data from the last interval if needed.
+	flushConcurrencyCh <- struct{}{}
+	if a.dedupAggr != nil {
+		a.dedupFlush()
+	}
+	a.flush()
+	<-flushConcurrencyCh
 }
 
 // Push pushes tss to a.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
-	a.hasState.Store(true)
-
 	if a.dedupAggr == nil {
 		a.push(tss)
 		return

diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index 096830696..57c74aa8e 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -118,6 +118,45 @@ func TestAggregatorsFailure(t *testing.T) {
 `)
 }
 
+func TestAggregatorsEqual(t *testing.T) {
+	f := func(a, b string, expectedResult bool) {
+		t.Helper()
+
+		pushFunc := func(tss []prompbmarshal.TimeSeries) {}
+		aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+		ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+		result := aa.Equal(ab)
+		if result != expectedResult {
+			t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
+		}
+	}
+	f("", "", true)
+	f(`
+- outputs: [total]
+  interval: 5m
+`, ``, false)
+	f(`
+- outputs: [total]
+  interval: 5m
+`, `
+- outputs: [total]
+  interval: 5m
+`, true)
+	f(`
+- outputs: [total]
+  interval: 3m
+`, `
+- outputs: [total]
+  interval: 5m
+`, false)
+}
+
 func TestAggregatorsSuccess(t *testing.T) {
 	f := func(config, inputMetrics, outputMetricsExpected string) {
 		t.Helper()
@@ -145,11 +184,6 @@ func TestAggregatorsSuccess(t *testing.T) {
 		// Push the inputMetrics to Aggregators
 		tssInput := mustParsePromMetrics(inputMetrics)
 		a.Push(tssInput)
-		if a != nil {
-			for _, aggr := range *a.as.Load() {
-				aggr.flush()
-			}
-		}
 		a.MustStop()
 
 		// Verify the tssOutput contains the expected metrics
@@ -671,7 +705,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 		tssInput := mustParsePromMetrics(inputMetrics)
 		a.Push(tssInput)
 		if a != nil {
-			for _, aggr := range *a.as.Load() {
+			for _, aggr := range a.as {
 				aggr.dedupFlush()
 				aggr.flush()
 			}
@@ -719,106 +753,6 @@ foo:1m_sum_samples{baz="qwe"} 10
 `)
 }
 
-func TestAggregatorsReinit(t *testing.T) {
-	f := func(config, newConfig, inputMetrics, outputMetricsExpected string) {
-		t.Helper()
-
-		// Initialize Aggregators
-		var tssOutput []prompbmarshal.TimeSeries
-		var tssOutputLock sync.Mutex
-		pushFunc := func(tss []prompbmarshal.TimeSeries) {
-			tssOutputLock.Lock()
-			for _, ts := range tss {
-				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
-				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
-				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
-					Labels:  labelsCopy,
-					Samples: samplesCopy,
-				})
-			}
-			tssOutputLock.Unlock()
-		}
-
-		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
-		if err != nil {
-			t.Fatalf("cannot initialize aggregators: %s", err)
-		}
-
-		// Push the inputMetrics to Aggregators
-		tssInput := mustParsePromMetrics(inputMetrics)
-		a.Push(tssInput)
-
-		// Reinitialize Aggregators
-		nc, _, err := ParseConfig([]byte(newConfig))
-		if err != nil {
-			t.Fatalf("cannot parse new config: %s", err)
-		}
-		err = a.ReInitConfigs(nc)
-		if err != nil {
-			t.Fatalf("cannot reinit aggregators: %s", err)
-		}
-
-		// Push the inputMetrics to Aggregators
-		a.Push(tssInput)
-		if a != nil {
-			for _, aggr := range *a.as.Load() {
-				aggr.flush()
-			}
-		}
-
-		a.MustStop()
-
-		// Verify the tssOutput contains the expected metrics
-		tsStrings := make([]string, len(tssOutput))
-		for i, ts := range tssOutput {
-			tsStrings[i] = timeSeriesToString(ts)
-		}
-		sort.Strings(tsStrings)
-		outputMetrics := strings.Join(tsStrings, "")
-		if outputMetrics != outputMetricsExpected {
-			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
-		}
-	}
-
-	f(`
-- interval: 1m
-  outputs: [count_samples]
-`, `
-- interval: 1m
-  outputs: [sum_samples]
-`, `
-foo 123
-bar 567
-foo 234
-`, `bar:1m_count_samples 1
-bar:1m_sum_samples 567
-foo:1m_count_samples 2
-foo:1m_sum_samples 357
-`)
-
-	f(`
-- interval: 1m
-  outputs: [total]
-- interval: 2m
-  outputs: [count_samples]
-`, `
-- interval: 1m
-  outputs: [sum_samples]
-- interval: 2m
-  outputs: [count_samples]
-`, `
-foo 123
-bar 567
-foo 234
-`, `bar:1m_sum_samples 567
-bar:1m_total 0
-bar:2m_count_samples 2
-foo:1m_sum_samples 357
-foo:1m_total 111
-foo:2m_count_samples 4
-`)
-}
-
 func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
 	labelsString := promrelabel.LabelsToString(ts.Labels)
 	if len(ts.Samples) != 1 {