From 8c14d176941f4686a2bb570eb3b1537b2569815d Mon Sep 17 00:00:00 2001
From: Alexander Marshalov <_@marshalov.org>
Date: Wed, 29 Mar 2023 18:05:58 +0200
Subject: [PATCH] added hot reload support for stream aggregation configs
 (#3969) (#3970)

added hot reload support for stream aggregation configs (#3969)

Signed-off-by: Alexander Marshalov <_@marshalov.org>
---
 app/vmagent/main.go                    |   5 +-
 app/vmagent/remotewrite/remotewrite.go |  83 +++++++++----
 app/vmagent/remotewrite/streamagg.go   | 118 ++++++++++++++++++
 docs/CHANGELOG.md                      |  14 +--
 docs/stream-aggregation.md             |  17 +++
 docs/vmagent.md                        |   2 +-
 lib/streamaggr/streamaggr.go           | 159 ++++++++++++++++++++++---
 lib/streamaggr/streamaggr_test.go      | 104 +++++++++++++++-
 8 files changed, 445 insertions(+), 57 deletions(-)
 create mode 100644 app/vmagent/remotewrite/streamagg.go

diff --git a/app/vmagent/main.go b/app/vmagent/main.go
index 9c4a6adda1..7c74869355 100644
--- a/app/vmagent/main.go
+++ b/app/vmagent/main.go
@@ -68,7 +68,7 @@ var (
 		"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
 	configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
 	dryRun        = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
-		"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+
+		"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
 		"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
 )
 
@@ -109,6 +109,9 @@ func main() {
 		if err := promscrape.CheckConfig(); err != nil {
 			logger.Fatalf("error when checking -promscrape.config: %s", err)
 		}
+		if err := remotewrite.CheckStreamAggConfigs(); err != nil {
+			logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
+		}
 		logger.Infof("all the configs are ok; exiting with 0 status code")
 		return
 	}
diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index 1ffa377b07..84a662d45c 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -65,15 +65,6 @@ var (
 		"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
 	maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
 		"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
-
-	streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
-		"See https://docs.victoriametrics.com/stream-aggregation.html . "+
-		"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
-	streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
-		"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
-		"See https://docs.victoriametrics.com/stream-aggregation.html")
-	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
-		"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
 )
 
 var (
@@ -96,6 +87,9 @@ func MultitenancyEnabled() bool {
 // Contains the current relabelConfigs.
 var allRelabelConfigs atomic.Value
 
+// Contains the loader for stream aggregation configs.
+var saCfgLoader *saConfigsLoader
+
 // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
 // since it may lead to high memory usage due to big number of buffers.
 var maxQueues = cgroup.AvailableCPUs() * 16
@@ -159,8 +153,13 @@ func Init() {
 	}
 	allRelabelConfigs.Store(rcs)
 
-	configSuccess.Set(1)
-	configTimestamp.Set(fasttime.UnixTimestamp())
+	relabelConfigSuccess.Set(1)
+	relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
+
+	saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig)
+	if err != nil {
+		logger.Fatalf("cannot load stream aggregation config: %s", err)
+	}
 
 	if len(*remoteWriteURLs) > 0 {
 		rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
@@ -176,29 +175,48 @@ func Init() {
 			case <-stopCh:
 				return
 			}
-			configReloads.Inc()
+			relabelConfigReloads.Inc()
 			logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
 			rcs, err := loadRelabelConfigs()
 			if err != nil {
-				configReloadErrors.Inc()
-				configSuccess.Set(0)
+				relabelConfigReloadErrors.Inc()
+				relabelConfigSuccess.Set(0)
 				logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
 				continue
 			}
-
 			allRelabelConfigs.Store(rcs)
-			configSuccess.Set(1)
-			configTimestamp.Set(fasttime.UnixTimestamp())
+			relabelConfigSuccess.Set(1)
+			relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
 			logger.Infof("Successfully reloaded relabel configs")
+
+			logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config")
+			err = saCfgLoader.reloadConfigs()
+			if err != nil {
+				logger.Errorf("Cannot reload stream aggregation configs: %s", err)
+			}
+			if len(*remoteWriteMultitenantURLs) > 0 {
+				rwctxsMapLock.Lock()
+				for _, rwctxs := range rwctxsMap {
+					for _, rwctx := range rwctxs {
+						rwctx.reinitStreamAggr()
+					}
+				}
+				rwctxsMapLock.Unlock()
+			} else {
+				for _, rwctx := range rwctxsDefault {
+					rwctx.reinitStreamAggr()
+				}
+			}
+			logger.Infof("Successfully reloaded stream aggregation configs")
 		}
 	}()
 }
 
 var (
-	configReloads      = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
-	configReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
-	configSuccess      = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`)
-	configTimestamp    = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
+	relabelConfigReloads      = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
+	relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
+	relabelConfigSuccess      = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`)
+	relabelConfigTimestamp    = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
 )
 
 func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
@@ -489,6 +507,7 @@ type remoteWriteCtx struct {
 	c   *client
 
 	sas                 *streamaggr.Aggregators
+	saHash              uint64
 	streamAggrKeepInput bool
 
 	pss        []*pendingSeries
@@ -548,14 +567,16 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
 	}
 
 	// Initialize sas
-	sasFile := streamAggrConfig.GetOptionalArg(argIdx)
-	if sasFile != "" {
+	saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx)
+	if len(saCfg) > 0 {
+		sasFile := streamAggrConfig.GetOptionalArg(argIdx)
 		dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
-		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
+		sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval)
 		if err != nil {
 			logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
 		}
 		rwctx.sas = sas
+		rwctx.saHash = saHash
 		rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
 	}
 
@@ -623,6 +644,20 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
 	pss[idx].Push(tss)
 }
 
+func (rwctx *remoteWriteCtx) reinitStreamAggr() {
+	if rwctx.sas == nil {
+		return
+	}
+	saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx)
+	if rwctx.saHash == saHash {
+		return
+	}
+	if err := rwctx.sas.ReInitConfigs(saCfg); err != nil {
+		logger.Errorf("Cannot apply stream aggregation configs %d: %s", rwctx.idx, err)
+	}
+	rwctx.saHash = saHash
+}
+
 var tssRelabelPool = &sync.Pool{
 	New: func() interface{} {
 		a := []prompbmarshal.TimeSeries{}
diff --git a/app/vmagent/remotewrite/streamagg.go b/app/vmagent/remotewrite/streamagg.go
new file mode 100644
index 0000000000..b56091f531
--- /dev/null
+++ b/app/vmagent/remotewrite/streamagg.go
@@ -0,0 +1,118 @@
+package remotewrite
+
+import (
+	"fmt"
+	"sync/atomic"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+var (
+	streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html . "+
+		"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
+	streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
+		"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html")
+	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
+		"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
+)
+
+var (
+	saCfgReloads   = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`)
+	saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`)
+	saCfgSuccess   = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`)
+	saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`)
+)
+
+// saConfigRules is a type alias for the unmarshalled stream aggregation config.
+type saConfigRules = []*streamaggr.Config
+
+// saConfigsLoader loads stream aggregation configs from the given files.
+type saConfigsLoader struct {
+	files   []string
+	configs atomic.Pointer[[]saConfig]
+}
+
+// newSaConfigsLoader creates new saConfigsLoader for the given config files.
+func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) {
+	result := &saConfigsLoader{
+		files: configFiles,
+	}
+	// Initial load of configs.
+	if err := result.reloadConfigs(); err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// reloadConfigs reloads stream aggregation configs from the files given in the constructor.
+func (r *saConfigsLoader) reloadConfigs() error {
+	// Increment reloads counter if it is not the initial load.
+	if r.configs.Load() != nil {
+		saCfgReloads.Inc()
+	}
+
+	// Load all configs from files.
+	var configs = make([]saConfig, len(r.files))
+	for i, path := range r.files {
+		if len(path) == 0 {
+			// Skip empty stream aggregation config.
+			continue
+		}
+		rules, hash, err := streamaggr.LoadConfigsFromFile(path)
+		if err != nil {
+			saCfgSuccess.Set(0)
+			saCfgReloadErr.Inc()
+			return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
+		}
+		configs[i] = saConfig{
+			path:  path,
+			hash:  hash,
+			rules: rules,
+		}
+	}
+
+	// Update configs.
+	r.configs.Store(&configs)
+
+	saCfgSuccess.Set(1)
+	saCfgTimestamp.Set(fasttime.UnixTimestamp())
+	return nil
+}
+
+// getCurrentConfig returns the current stream aggregation config for the given idx.
+func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) {
+	all := r.configs.Load()
+	if all == nil {
+		return nil, 0
+	}
+	cfgs := *all
+	if len(cfgs) == 0 {
+		return nil, 0
+	}
+	if idx >= len(cfgs) {
+		if len(cfgs) == 1 {
+			cfg := cfgs[0]
+			return cfg.rules, cfg.hash
+		}
+		return nil, 0
+	}
+	cfg := cfgs[idx]
+	return cfg.rules, cfg.hash
+}
+
+type saConfig struct {
+	path  string
+	hash  uint64
+	rules saConfigRules
+}
+
+// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config.
+func CheckStreamAggConfigs() error {
+	_, err := newSaConfigsLoader(*streamAggrConfig)
+	return err
+}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 9f60f91003..301a6dc521 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -26,6 +26,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
@@ -135,19 +136,6 @@ Released at 2023-02-24
 * BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
 
-## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4)
-
-Released at 2023-03-25
-
-**v1.87.x is a line of LTS releases (e.g. long-time support). It contains important up-to-date bugfixes.
-The v1.87.x line will be supported for at least 12 months since [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release**
-
-* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
-* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html):  suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
-* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
-* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
-* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
-
 ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
 
 Released at 2023-03-12
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index 4eb6c86a49..df983c6ba9 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -545,3 +545,20 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
 
 The file can contain multiple aggregation configs. The aggregation is performed independently
 per each specified config entry.
+
+### Configuration update
+
+[vmagent](https://docs.victoriametrics.com/vmagent.html) and
+[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two
+approaches for reloading stream aggregation configs from the updated config files specified via
+`-remoteWrite.streamAggr.config` and `-streamAggr.config` without restart.
+
+* Sending `SIGHUP` signal to `vmagent` process:
+
+  ```console
+  kill -SIGHUP `pidof vmagent`
+  ```
+
+* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`).
+
+The reload resets the aggregation state only for the rules that were changed in the configuration files.
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 5f425b3fa2..36ca3b697c 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -108,7 +108,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
 
 `vmagent` should be restarted in order to update config options set via command-line args.
 `vmagent` supports multiple approaches for reloading configs from updated config files such as
-`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`:
+`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
 
 * Sending `SIGHUP` signal to `vmagent` process:
 
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 3e6199350b..c177544f30 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -1,12 +1,14 @@
 package streamaggr
 
 import (
+	"encoding/json"
 	"fmt"
 	"math"
 	"sort"
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -17,6 +19,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/cespare/xxhash/v2"
 	"gopkg.in/yaml.v2"
 )
 
@@ -36,22 +39,40 @@ var supportedOutputs = []string{
 	"quantiles(phi1, ..., phiN)",
 }
 
-// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
+// ParseConfig parses an array of stream aggregation configs from the given data.
+func ParseConfig(data []byte) ([]*Config, uint64, error) {
+	var cfgs []*Config
+	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
+		return nil, 0, fmt.Errorf("cannot parse stream aggregation config %w", err)
+	}
+	return cfgs, xxhash.Sum64(data), nil
+}
+
+// LoadConfigsFromFile loads array of stream aggregation configs from the given path.
+func LoadConfigsFromFile(path string) ([]*Config, uint64, error) {
+	data, err := fs.ReadFileOrHTTP(path)
+	if err != nil {
+		return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
+	}
+	return ParseConfig(data)
+}
+
+// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
 // If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
 // e.g. only the last sample per each time series per each dedupInterval is aggregated.
 //
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
-	data, err := fs.ReadFileOrHTTP(path)
+func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) {
+	cfgs, configHash, err := LoadConfigsFromFile(path)
 	if err != nil {
-		return nil, fmt.Errorf("cannot load aggregators: %w", err)
+		return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err)
 	}
-	as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
+	as, err := NewAggregators(cfgs, pushFunc, dedupInterval)
 	if err != nil {
-		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
+		return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
 	}
-	return as, nil
+	return as, configHash, nil
 }
 
 // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
@@ -127,9 +148,22 @@ type Config struct {
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
 }
 
+func (cfg *Config) hash() (uint64, error) {
+	if cfg == nil {
+		return 0, nil
+	}
+	data, err := json.Marshal(cfg)
+	if err != nil {
+		return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err)
+	}
+	return xxhash.Sum64(data), nil
+}
+
 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
 type Aggregators struct {
-	as []*aggregator
+	as            atomic.Pointer[[]*aggregator]
+	pushFunc      PushFunc
+	dedupInterval time.Duration
 }
 
 // NewAggregators creates Aggregators from the given cfgs.
@@ -152,9 +186,13 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
 		}
 		as[i] = a
 	}
-	return &Aggregators{
-		as: as,
-	}, nil
+	result := &Aggregators{
+		pushFunc:      pushFunc,
+		dedupInterval: dedupInterval,
+	}
+	result.as.Store(&as)
+
+	return result, nil
 }
 
 // MustStop stops a.
@@ -162,7 +200,7 @@ func (a *Aggregators) MustStop() {
 	if a == nil {
 		return
 	}
-	for _, aggr := range a.as {
+	for _, aggr := range *a.as.Load() {
 		aggr.MustStop()
 	}
 }
@@ -172,11 +210,74 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
 	if a == nil {
 		return
 	}
-	for _, aggr := range a.as {
+	for _, aggr := range *a.as.Load() {
 		aggr.Push(tss)
 	}
 }
 
+// ReInitConfigs re-initializes the state of Aggregators a with the given new stream aggregation configs.
+func (a *Aggregators) ReInitConfigs(cfgs []*Config) error {
+	if a == nil {
+		return nil
+	}
+
+	keys := make(map[uint64]struct{})        // set of all keys (configs and aggregators)
+	cfgsMap := make(map[uint64]*Config)      // maps config hash to the config itself
+	aggrsMap := make(map[uint64]*aggregator) // maps aggregator config hash to the aggregator itself
+
+	for _, cfg := range cfgs {
+		key, err := cfg.hash()
+		if err != nil {
+			return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err)
+		}
+		keys[key] = struct{}{}
+		cfgsMap[key] = cfg
+	}
+	for _, aggr := range *a.as.Load() {
+		keys[aggr.cfgHash] = struct{}{}
+		aggrsMap[aggr.cfgHash] = aggr
+	}
+
+	asNew := make([]*aggregator, 0, len(aggrsMap))
+	asDel := make([]*aggregator, 0, len(aggrsMap))
+	for key := range keys {
+		cfg, hasCfg := cfgsMap[key]
+		agg, hasAggr := aggrsMap[key]
+
+		// if config for aggregator was changed or removed
+		// then we need to stop aggregator and remove it
+		if !hasCfg && hasAggr {
+			asDel = append(asDel, agg)
+			continue
+		}
+
+		// if there is no aggregator for config (new config),
+		// then we need to create it
+		if hasCfg && !hasAggr {
+			newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval)
+			if err != nil {
+				return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err)
+			}
+			asNew = append(asNew, newAgg)
+			continue
+		}
+
+		// if aggregator config was not changed, then we can just keep it
+		if hasCfg && hasAggr {
+			asNew = append(asNew, agg)
+		}
+	}
+
+	// Atomically replace aggregators array.
+	a.as.Store(&asNew)
+	// and stop old aggregators
+	for _, aggr := range asDel {
+		aggr.MustStop()
+	}
+
+	return nil
+}
+
 // aggregator aggregates input series according to the config passed to NewAggregator
 type aggregator struct {
 	match *promrelabel.IfExpression
@@ -194,6 +295,7 @@ type aggregator struct {
 
 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState
+	hasState   atomic.Bool
 
 	pushFunc PushFunc
 
@@ -202,7 +304,8 @@ type aggregator struct {
 	// It contains the interval, labels in (by, without), plus output name.
 	// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
 	// for `interval: 1m`, `by: [job]`
-	suffix string
+	suffix  string
+	cfgHash uint64
 
 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -330,6 +433,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		dedupAggr = newLastAggrState()
 	}
 
+	cfgHash, err := cfg.hash()
+	if err != nil {
+		return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err)
+	}
+
 	// initialize the aggregator
 	a := &aggregator{
 		match: cfg.Match,
@@ -345,7 +453,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		aggrStates: aggrStates,
 		pushFunc:   pushFunc,
 
-		suffix: suffix,
+		suffix:  suffix,
+		cfgHash: cfgHash,
 
 		stopCh: make(chan struct{}),
 	}
@@ -411,8 +520,9 @@ func (a *aggregator) dedupFlush() {
 		skipAggrSuffix: true,
 	}
 	a.dedupAggr.appendSeriesForFlush(ctx)
-	logger.Errorf("series after dedup: %v", ctx.tss)
 	a.push(ctx.tss)
+
+	a.hasState.Store(false)
 }
 
 func (a *aggregator) flush() {
@@ -442,6 +552,8 @@ func (a *aggregator) flush() {
 		// Push the output metrics.
 		a.pushFunc(tss)
 	}
+
+	a.hasState.Store(false)
 }
 
 // MustStop stops the aggregator.
@@ -449,11 +561,26 @@ func (a *aggregator) flush() {
 // The aggregator stops pushing the aggregated metrics after this call.
 func (a *aggregator) MustStop() {
 	close(a.stopCh)
+
+	if a.hasState.Load() {
+		if a.dedupAggr != nil {
+			flushConcurrencyCh <- struct{}{}
+			a.dedupFlush()
+			<-flushConcurrencyCh
+		}
+
+		flushConcurrencyCh <- struct{}{}
+		a.flush()
+		<-flushConcurrencyCh
+	}
+
 	a.wg.Wait()
 }
 
 // Push pushes tss to a.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
+	a.hasState.Store(true)
+
 	if a.dedupAggr == nil {
 		a.push(tss)
 		return
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index a3c002c8d0..0968306962 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -146,7 +146,7 @@ func TestAggregatorsSuccess(t *testing.T) {
 		tssInput := mustParsePromMetrics(inputMetrics)
 		a.Push(tssInput)
 		if a != nil {
-			for _, aggr := range a.as {
+			for _, aggr := range *a.as.Load() {
 				aggr.flush()
 			}
 		}
@@ -671,7 +671,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 		tssInput := mustParsePromMetrics(inputMetrics)
 		a.Push(tssInput)
 		if a != nil {
-			for _, aggr := range a.as {
+			for _, aggr := range *a.as.Load() {
 				aggr.dedupFlush()
 				aggr.flush()
 			}
@@ -719,6 +719,106 @@ foo:1m_sum_samples{baz="qwe"} 10
 `)
 }
 
+func TestAggregatorsReinit(t *testing.T) {
+	f := func(config, newConfig, inputMetrics, outputMetricsExpected string) {
+		t.Helper()
+
+		// Initialize Aggregators
+		var tssOutput []prompbmarshal.TimeSeries
+		var tssOutputLock sync.Mutex
+		pushFunc := func(tss []prompbmarshal.TimeSeries) {
+			tssOutputLock.Lock()
+			for _, ts := range tss {
+				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
+				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
+				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
+					Labels:  labelsCopy,
+					Samples: samplesCopy,
+				})
+			}
+			tssOutputLock.Unlock()
+		}
+
+		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+
+		// Push the inputMetrics to Aggregators
+		tssInput := mustParsePromMetrics(inputMetrics)
+		a.Push(tssInput)
+
+		// Reinitialize Aggregators
+		nc, _, err := ParseConfig([]byte(newConfig))
+		if err != nil {
+			t.Fatalf("cannot parse new config: %s", err)
+		}
+		err = a.ReInitConfigs(nc)
+		if err != nil {
+			t.Fatalf("cannot reinit aggregators: %s", err)
+		}
+
+		// Push the inputMetrics to Aggregators
+		a.Push(tssInput)
+		if a != nil {
+			for _, aggr := range *a.as.Load() {
+				aggr.flush()
+			}
+		}
+
+		a.MustStop()
+
+		// Verify the tssOutput contains the expected metrics
+		tsStrings := make([]string, len(tssOutput))
+		for i, ts := range tssOutput {
+			tsStrings[i] = timeSeriesToString(ts)
+		}
+		sort.Strings(tsStrings)
+		outputMetrics := strings.Join(tsStrings, "")
+		if outputMetrics != outputMetricsExpected {
+			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
+		}
+	}
+
+	f(`
+- interval: 1m
+  outputs: [count_samples]
+`, `
+- interval: 1m
+  outputs: [sum_samples]
+`, `
+foo 123
+bar 567
+foo 234
+`, `bar:1m_count_samples 1
+bar:1m_sum_samples 567
+foo:1m_count_samples 2
+foo:1m_sum_samples 357
+`)
+
+	f(`
+- interval: 1m
+  outputs: [total]
+- interval: 2m
+  outputs: [count_samples]
+`, `
+- interval: 1m
+  outputs: [sum_samples]
+- interval: 2m
+  outputs: [count_samples]
+`, `
+foo 123
+bar 567
+foo 234
+`, `bar:1m_sum_samples 567
+bar:1m_total 0
+bar:2m_count_samples 2
+foo:1m_sum_samples 357
+foo:1m_total 111
+foo:2m_count_samples 4
+`)
+}
+
 func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
 	labelsString := promrelabel.LabelsToString(ts.Labels)
 	if len(ts.Samples) != 1 {