diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index c568e2a36f..ee29d77ce0 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -89,8 +89,9 @@ var (
 	streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
 		"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
 		"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
-	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+
-		"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
+	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+
+		"by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html")
 	disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
 		"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+
 		"See also -remoteWrite.dropSamplesOnOverload")
@@ -739,8 +740,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
 	// Initialize sas
 	sasFile := streamAggrConfig.GetOptionalArg(argIdx)
 	if sasFile != "" {
-		dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
-		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval)
+		opts := &streamaggr.Options{
+			DedupInterval: streamAggrDedupInterval.GetOptionalArg(argIdx),
+		}
+		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
 		if err != nil {
 			logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
 		}
@@ -894,8 +897,10 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
 
 	logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
 	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
-	dedupInterval := streamAggrDedupInterval.GetOptionalArg(rwctx.idx)
-	sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval)
+	opts := &streamaggr.Options{
+		DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx),
+	}
+	sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
 	if err != nil {
 		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
 		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
@@ -937,8 +942,10 @@ func CheckStreamAggrConfigs() error {
 		if sasFile == "" {
 			continue
 		}
-		dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx)
-		sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval)
+		opts := &streamaggr.Options{
+			DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
+		}
+		sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts)
 		if err != nil {
 			return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
 		}
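
With this change, callers construct a `streamaggr.Options` value instead of passing a bare dedup interval. A minimal sketch of the new calling convention, assuming a hypothetical standalone program; the config path, flag value and log messages are illustrative, and only `Options`, `LoadFromFile` and `MustStop` come from the patch:

```go
package main

import (
	"log"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// pushFunc receives the aggregated series on every flush.
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		log.Printf("flushed %d aggregated series", len(tss))
	}

	// Options carries the process-wide defaults that used to be passed
	// as a bare dedupInterval argument; per-config YAML fields may override them.
	opts := &streamaggr.Options{
		DedupInterval: 30 * time.Second, // illustrative value
	}

	// LoadFromFile accepts *Options; passing nil selects the default options.
	sas, err := streamaggr.LoadFromFile("aggr.yaml", pushFunc, opts) // hypothetical config path
	if err != nil {
		log.Fatalf("cannot load stream aggregation config: %s", err)
	}
	defer sas.MustStop()
}
```
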
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 2b01abf3ee..2099ea007a 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -31,8 +31,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 ## tip
 
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%.
-* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
-* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `no_align_flush_to_interval` option for disabling time alignment of aggregated data flushes. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored.
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states.
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index 32f10ce9a6..909984d20a 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -62,6 +62,20 @@ In this case the [de-duplication](https://docs.victoriametrics.com/#deduplicatio
 
 De-duplicatation is performed after performing the input relabeling with `input_relabel_configs` - see [these docs](#relabeling).
 
+## Flush time alignment
+
+By default, flush times for the aggregated data are aligned to multiples of the `interval` option specified in the [aggregate config](#stream-aggregation-config).
+For example:
+- if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute
+- if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour
+
+If you do not need such an alignment, then set the `no_align_flush_to_interval: true` option in the [aggregate config](#stream-aggregation-config).
+In this case aggregated data flushes are aligned to the `vmagent` start time or to the [config reload](#configuration-update) time.
+
+The aggregated data for the first and the last intervals is dropped on `vmagent` start, restart or [config reload](#configuration-update),
+since these aggregation intervals are incomplete and usually contain confusing partial data.
+If you need to preserve the aggregated data for these intervals, then set the `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config).
+
 ## Use cases
 
 Stream aggregation can be used in the following cases:
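
A worked example of the alignment arithmetic described in the new section above: the delay before the first flush is `interval - (now % interval)`, exactly as computed in `runFlusher` below. The helper name and timestamp are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// nextAlignedFlushDelay returns how long to wait so that the first flush
// lands on a multiple of interval, mirroring aggregator.runFlusher.
func nextAlignedFlushDelay(now time.Time, interval time.Duration) time.Duration {
	current := time.Duration(now.UnixNano())
	return interval - (current % interval)
}

func main() {
	now := time.Date(2024, 3, 1, 10, 42, 17, 0, time.UTC) // illustrative timestamp
	d := nextAlignedFlushDelay(now, time.Minute)
	// 10:42:17 -> the next minute boundary is 10:43:00, i.e. 43 seconds away.
	fmt.Println(d) // 43s
}
```
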
@@ -423,10 +437,6 @@ and then sent to the storage once per `interval`. The aggregated samples are nam
 If `by` and `without` lists are specified in the [config](#stream-aggregation-config),
 then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`.
 
-On vmagent shutdown or [configuration reload](#configuration-update) unfinished aggregated states are discarded,
-as they might produce lower values than user expects. It is possible to specify `flush_on_shutdown: true` setting in 
-aggregation config to make vmagent to send unfinished states to the remote storage.
-
 Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config):
 
 * [avg](#avg)
@@ -809,7 +819,8 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
   # Samples are de-duplicated on a per-series basis. See https://docs.victoriametrics.com/keyconcepts/#time-series
   # and https://docs.victoriametrics.com/#deduplication
   # The deduplication is performed after input_relabel_configs relabeling is applied.
-  # By default the deduplication is disabled.
+  # By default the deduplication is disabled unless -remoteWrite.streamAggr.dedupInterval or -streamAggr.dedupInterval
+  # command-line flags are set.
   #
   # dedup_interval: 30s
 
@@ -824,10 +835,17 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
   #
   # staleness_interval: 2m
   
-  # flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts
-  # or config reloads. It is not recommended changing this setting, unless unfinished aggregations states
-  # are preferred to missing data points.
-  # Unfinished aggregation states aren't flushed on shutdown by default.
+  # no_align_flush_to_interval disables alignment of flush times for the aggregated data to multiples of interval.
+  # By default flush times for the aggregated data are aligned to multiples of interval.
+  # For example:
+  # - if `interval: 1m` is set, then flushes happen at the end of every minute,
+  # - if `interval: 1h` is set, then flushes happen at the end of every hour
+  #
+  # no_align_flush_to_interval: false
+
+  # flush_on_shutdown instructs to flush aggregated data for the first and the last intervals to the storage
+  # on vmagent start, restart or configuration reload.
+  # Incomplete aggregated data isn't flushed to the storage by default, since it is usually confusing.
   #
   # flush_on_shutdown: false
 
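
The per-config YAML fields shown above override the process-wide defaults carried in `streamaggr.Options` (populated from command-line flags): a non-nil `no_align_flush_to_interval`, `flush_on_shutdown` or `keep_metric_names` wins over the corresponding `Options` field. A self-contained sketch of that resolution pattern, using trimmed copies of the structs; `resolveFlushBehavior` and the local types are illustrative, while the override logic itself is taken from `newAggregator`:

```go
package main

import "fmt"

// Trimmed stand-ins for streamaggr.Options and streamaggr.Config,
// keeping only the fields relevant to flushing (for illustration only).
type options struct {
	NoAlignFlushToInterval bool
	FlushOnShutdown        bool
}

type config struct {
	NoAlignFlushToInterval *bool
	FlushOnShutdown        *bool
}

// resolveFlushBehavior shows the override pattern from newAggregator:
// options supply process-wide defaults, while non-nil per-config fields win.
func resolveFlushBehavior(opts *options, cfg *config) (alignFlushToInterval, skipIncompleteFlush bool) {
	if opts == nil {
		opts = &options{}
	}
	alignFlushToInterval = !opts.NoAlignFlushToInterval
	if v := cfg.NoAlignFlushToInterval; v != nil {
		alignFlushToInterval = !*v
	}
	skipIncompleteFlush = !opts.FlushOnShutdown
	if v := cfg.FlushOnShutdown; v != nil {
		skipIncompleteFlush = !*v
	}
	return alignFlushToInterval, skipIncompleteFlush
}

func main() {
	noAlign := true
	align, skip := resolveFlushBehavior(&options{FlushOnShutdown: true}, &config{NoAlignFlushToInterval: &noAlign})
	fmt.Println(align, skip) // false false
}
```
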
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index e3860c183e..19e0be4334 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -20,6 +20,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/metrics"
 	"gopkg.in/yaml.v2"
 )
@@ -45,11 +46,10 @@ var supportedOutputs = []string{
 
 // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
-// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
-// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+// opts can contain additional options. If opts is nil, then default options are used.
 //
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
+func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
 	data, err := fscore.ReadFileOrHTTP(path)
 	if err != nil {
 		return nil, fmt.Errorf("cannot load aggregators: %w", err)
@@ -59,7 +59,7 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (
 		return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
 	}
 
-	as, err := newAggregatorsFromData(data, pushFunc, dedupInterval)
+	as, err := newAggregatorsFromData(data, pushFunc, opts)
 	if err != nil {
 		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
 	}
@@ -67,12 +67,40 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (
 	return as, nil
 }
 
-func newAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
+func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
 	var cfgs []*Config
 	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
 		return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
 	}
-	return NewAggregators(cfgs, pushFunc, dedupInterval)
+	return NewAggregators(cfgs, pushFunc, opts)
+}
+
+// Options contains optional settings for the Aggregators.
+type Options struct {
+	// DedupInterval is the deduplication interval for samples received for the same time series.
+	//
+	// The last sample per each series is left per each DedupInterval if DedupInterval > 0.
+	//
+	// By default deduplication is disabled.
+	DedupInterval time.Duration
+
+	// NoAlignFlushToInterval disables alignment of flushes to the aggregation interval.
+	//
+	// By default flushes are aligned to aggregation interval.
+	NoAlignFlushToInterval bool
+
+	// FlushOnShutdown enables flush of incomplete state on shutdown.
+	//
+	// By default incomplete state is dropped on shutdown.
+	FlushOnShutdown bool
+
+	// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
+	//
+	// By default the following suffix is added to every output time series:
+	//
+	//     input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
+	//
+	KeepMetricNames bool
 }
 
 // Config is a configuration for a single stream aggregation.
@@ -85,6 +113,16 @@ type Config struct {
 	// Interval is the interval between aggregations.
 	Interval string `yaml:"interval"`
 
+	// NoAlignFlushToInterval disables alignment of flushes to multiples of Interval.
+	// By default flushes are aligned to Interval.
+	//
+	// See also FlushOnShutdown.
+	NoAlignFlushToInterval *bool `yaml:"no_align_flush_to_interval,omitempty"`
+
+	// FlushOnShutdown defines whether to flush the aggregation state on process termination
+	// or config reload. By default the state is dropped on these events.
+	FlushOnShutdown *bool `yaml:"flush_on_shutdown,omitempty"`
+
 	// DedupInterval is an optional interval for deduplication.
 	DedupInterval string `yaml:"dedup_interval,omitempty"`
 
@@ -121,9 +159,8 @@ type Config struct {
 	//
 	Outputs []string `yaml:"outputs"`
 
-	// KeepMetricNames instructs to leave metric names as is for the output time series
-	// without adding any suffix.
-	KeepMetricNames bool `yaml:"keep_metric_names,omitempty"`
+	// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
+	KeepMetricNames *bool `yaml:"keep_metric_names,omitempty"`
 
 	// By is an optional list of labels for grouping input series.
 	//
@@ -148,10 +185,6 @@ type Config struct {
 	// OutputRelabelConfigs is an optional relabeling rules, which are applied
 	// on the aggregated output before being sent to remote storage.
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
-
-	// FlushOnShutdown defines whether to flush the aggregation state on process termination
-	// or config reload. Is `false` by default.
-	FlushOnShutdown bool `yaml:"flush_on_shutdown,omitempty"`
 }
 
 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
@@ -169,15 +202,14 @@ type Aggregators struct {
 //
 // pushFunc is called when the aggregated data must be flushed.
 //
-// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
-// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+// opts can contain additional options. If opts is nil, then default options are used.
 //
 // MustStop must be called on the returned Aggregators when they are no longer needed.
-func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
+func NewAggregators(cfgs []*Config, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
 	ms := metrics.NewSet()
 	as := make([]*aggregator, len(cfgs))
 	for i, cfg := range cfgs {
-		a, err := newAggregator(cfg, pushFunc, ms, dedupInterval)
+		a, err := newAggregator(cfg, pushFunc, ms, opts)
 		if err != nil {
 			// Stop already initialized aggregators before returning the error.
 			for _, a := range as[:i] {
@@ -294,19 +326,15 @@ type aggregator struct {
 	without             []string
 	aggregateOnlyByTime bool
 
-	// da is set to non-nil if input samples must be de-duplicated according
-	// to the dedupInterval passed to newAggregator().
+	// da is set to non-nil if input samples must be de-duplicated
 	da *dedupAggr
 
 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState
 
-	// lc is used for compressing series keys before passing them to dedupAggr and aggrState.
+	// lc is used for compressing series keys before passing them to dedupAggr and aggrState
 	lc promutils.LabelsCompressor
 
-	// pushFunc is the callback, which is called by aggrState when flushing its state.
-	pushFunc PushFunc
-
 	// suffix contains a suffix, which should be added to aggregate metric names
 	//
 	// It contains the interval, labels in (by, without), plus output name.
@@ -314,10 +342,6 @@ type aggregator struct {
 	// for `interval: 1m`, `by: [job]`
 	suffix string
 
-	// flushOnShutdown defines whether to flush the state of aggregation
-	// on MustStop call.
-	flushOnShutdown bool
-
 	wg     sync.WaitGroup
 	stopCh chan struct{}
 
@@ -338,11 +362,14 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
 
 // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
 //
-// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
-// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+// opts can contain additional options. If opts is nil, then default options are used.
 //
 // The returned aggregator must be stopped when no longer needed by calling MustStop().
-func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterval time.Duration) (*aggregator, error) {
+func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Options) (*aggregator, error) {
+	if opts == nil {
+		opts = &Options{}
+	}
+
 	// check cfg.Interval
 	interval, err := time.ParseDuration(cfg.Interval)
 	if err != nil {
@@ -353,6 +380,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 	}
 
 	// check cfg.DedupInterval
+	dedupInterval := opts.DedupInterval
 	if cfg.DedupInterval != "" {
 		di, err := time.ParseDuration(cfg.DedupInterval)
 		if err != nil {
@@ -401,7 +429,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 	}
 
 	// check cfg.KeepMetricNames
-	if cfg.KeepMetricNames {
+	keepMetricNames := opts.KeepMetricNames
+	if v := cfg.KeepMetricNames; v != nil {
+		keepMetricNames = *v
+	}
+	if keepMetricNames {
 		if len(cfg.Outputs) != 1 {
 			return nil, fmt.Errorf("`ouputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
 		}
@@ -495,17 +527,15 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 		inputRelabeling:  inputRelabeling,
 		outputRelabeling: outputRelabeling,
 
-		keepMetricNames: cfg.KeepMetricNames,
+		keepMetricNames: keepMetricNames,
 
 		by:                  by,
 		without:             without,
 		aggregateOnlyByTime: aggregateOnlyByTime,
 
 		aggrStates: aggrStates,
-		pushFunc:   pushFunc,
 
-		suffix:          suffix,
-		flushOnShutdown: cfg.FlushOnShutdown,
+		suffix: suffix,
 
 		stopCh: make(chan struct{}),
 
@@ -519,33 +549,90 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 		a.da = newDedupAggr()
 	}
 
+	alignFlushToInterval := !opts.NoAlignFlushToInterval
+	if v := cfg.NoAlignFlushToInterval; v != nil {
+		alignFlushToInterval = !*v
+	}
+
+	skipIncompleteFlush := !opts.FlushOnShutdown
+	if v := cfg.FlushOnShutdown; v != nil {
+		skipIncompleteFlush = !*v
+	}
+
 	a.wg.Add(1)
 	go func() {
-		a.runFlusher(interval, dedupInterval)
+		a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval)
 		a.wg.Done()
 	}()
 
 	return a, nil
 }
 
-func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
-	tickerFlush := time.NewTicker(interval)
-	defer tickerFlush.Stop()
+func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) {
+	flushTickerCh := make(chan *time.Ticker, 1)
+	dedupFlushTickerCh := make(chan *time.Ticker, 1)
+	go func() {
+		if !alignFlushToInterval {
+			flushTickerCh <- time.NewTicker(interval)
+			if dedupInterval > 0 {
+				dedupFlushTickerCh <- time.NewTicker(dedupInterval)
+			}
+			return
+		}
 
-	var dedupTickerCh <-chan time.Time
-	if dedupInterval > 0 {
-		t := time.NewTicker(dedupInterval)
-		defer t.Stop()
-		dedupTickerCh = t.C
-	}
+		sleep := func(d time.Duration) {
+			timer := timerpool.Get(d)
+			defer timerpool.Put(timer)
+			select {
+			case <-a.stopCh:
+			case <-timer.C:
+			}
+		}
+		currentTime := time.Duration(time.Now().UnixNano())
+		if dedupInterval > 0 {
+			d := dedupInterval - (currentTime % dedupInterval)
+			if d < dedupInterval {
+				sleep(d)
+			}
+			dedupFlushTickerCh <- time.NewTicker(dedupInterval)
+			currentTime += d
+		}
+		d := interval - (currentTime % interval)
+		if d < interval {
+			sleep(d)
+		}
+		flushTickerCh <- time.NewTicker(interval)
+	}()
 
+	var flushTickerC <-chan time.Time
+	var dedupFlushTickerC <-chan time.Time
+	isFirstFlush := true
 	for {
 		select {
 		case <-a.stopCh:
+			if !skipIncompleteFlush {
+				if dedupInterval > 0 {
+					a.dedupFlush()
+				}
+				a.flush(pushFunc)
+			}
 			return
-		case <-tickerFlush.C:
+		case flushTicker := <-flushTickerCh:
+			flushTickerC = flushTicker.C
+			defer flushTicker.Stop()
+		case dedupFlushTicker := <-dedupFlushTickerCh:
+			dedupFlushTickerC = dedupFlushTicker.C
+			defer dedupFlushTicker.Stop()
+		case <-flushTickerC:
+			if isFirstFlush {
+				isFirstFlush = false
+				if alignFlushToInterval && skipIncompleteFlush {
+					a.flush(nil)
+					continue
+				}
+			}
 			startTime := time.Now()
-			a.flush()
+			a.flush(pushFunc)
 			d := time.Since(startTime)
 			a.flushDuration.Update(d.Seconds())
 			if d > interval {
@@ -554,7 +641,7 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
 					"possible solutions: increase interval; use match filter matching smaller number of series; "+
 					"reduce samples' ingestion rate to stream aggregation", interval, d)
 			}
-		case <-dedupTickerCh:
+		case <-dedupFlushTickerC:
 			startTime := time.Now()
 			a.dedupFlush()
 			d := time.Since(startTime)
@@ -573,7 +660,7 @@ func (a *aggregator) dedupFlush() {
 	a.da.flush(a.pushSamples)
 }
 
-func (a *aggregator) flush() {
+func (a *aggregator) flush(pushFunc PushFunc) {
 	var wg sync.WaitGroup
 	for _, as := range a.aggrStates {
 		flushConcurrencyCh <- struct{}{}
@@ -584,7 +671,7 @@ func (a *aggregator) flush() {
 				wg.Done()
 			}()
 
-			ctx := getFlushCtx(a)
+			ctx := getFlushCtx(a, pushFunc)
 			as.flushState(ctx)
 			ctx.flushSeries()
 			putFlushCtx(ctx)
@@ -601,16 +688,6 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 func (a *aggregator) MustStop() {
 	close(a.stopCh)
 	a.wg.Wait()
-
-	if !a.flushOnShutdown {
-		return
-	}
-
-	// Flush the remaining data from the last interval if needed.
-	if a.da != nil {
-		a.dedupFlush()
-	}
-	a.flush()
 }
 
 // Push pushes tss to a.
@@ -770,13 +847,14 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
 	return dstInput, dstOutput
 }
 
-func getFlushCtx(a *aggregator) *flushCtx {
+func getFlushCtx(a *aggregator, pushFunc PushFunc) *flushCtx {
 	v := flushCtxPool.Get()
 	if v == nil {
 		v = &flushCtx{}
 	}
 	ctx := v.(*flushCtx)
 	ctx.a = a
+	ctx.pushFunc = pushFunc
 	return ctx
 }
 
@@ -788,7 +866,8 @@ func putFlushCtx(ctx *flushCtx) {
 var flushCtxPool sync.Pool
 
 type flushCtx struct {
-	a *aggregator
+	a        *aggregator
+	pushFunc PushFunc
 
 	tss     []prompbmarshal.TimeSeries
 	labels  []prompbmarshal.Label
@@ -797,6 +876,7 @@ type flushCtx struct {
 
 func (ctx *flushCtx) reset() {
 	ctx.a = nil
+	ctx.pushFunc = nil
 	ctx.resetSeries()
 }
 
@@ -819,7 +899,9 @@ func (ctx *flushCtx) flushSeries() {
 	outputRelabeling := ctx.a.outputRelabeling
 	if outputRelabeling == nil {
 		// Fast path - push the output metrics.
-		ctx.a.pushFunc(tss)
+		if ctx.pushFunc != nil {
+			ctx.pushFunc(tss)
+		}
 		return
 	}
 
@@ -838,7 +920,9 @@ func (ctx *flushCtx) flushSeries() {
 		ts.Labels = dstLabels[dstLabelsLen:]
 		dst = append(dst, ts)
 	}
-	ctx.a.pushFunc(dst)
+	if ctx.pushFunc != nil {
+		ctx.pushFunc(dst)
+	}
 	auxLabels.Labels = dstLabels
 	promutils.PutLabels(auxLabels)
 
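
`pushFunc` is now threaded through `flushCtx` so that `flush(nil)` can drain the first, incomplete aligned interval without pushing anything (see the `isFirstFlush` branch in `runFlusher` and the nil checks in `flushSeries`). A stripped-down sketch of that nil-callback pattern, independent of the streamaggr types; all names here are illustrative:

```go
package main

import "fmt"

type pushFunc func(samples []float64)

// flush drains the accumulated state and pushes it only when push != nil.
// Passing nil clears the state silently, which is how the first incomplete
// aligned interval is discarded when skipIncompleteFlush is in effect.
func flush(state *[]float64, push pushFunc) {
	samples := *state
	*state = nil
	if push != nil {
		push(samples)
	}
}

func main() {
	state := []float64{1, 2, 3}
	flush(&state, nil) // first aligned flush: state dropped, nothing pushed
	state = append(state, 4, 5)
	flush(&state, func(s []float64) { fmt.Println("pushed", s) }) // pushed [4 5]
}
```
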
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index 5e10ade645..d00bc4d85e 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -20,7 +20,7 @@ func TestAggregatorsFailure(t *testing.T) {
 		pushFunc := func(tss []prompbmarshal.TimeSeries) {
 			panic(fmt.Errorf("pushFunc shouldn't be called"))
 		}
-		a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
+		a, err := newAggregatorsFromData([]byte(config), pushFunc, nil)
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
 		}
@@ -158,11 +158,11 @@ func TestAggregatorsEqual(t *testing.T) {
 		t.Helper()
 
 		pushFunc := func(tss []prompbmarshal.TimeSeries) {}
-		aa, err := newAggregatorsFromData([]byte(a), pushFunc, 0)
+		aa, err := newAggregatorsFromData([]byte(a), pushFunc, nil)
 		if err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
-		ab, err := newAggregatorsFromData([]byte(b), pushFunc, 0)
+		ab, err := newAggregatorsFromData([]byte(b), pushFunc, nil)
 		if err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
@@ -220,15 +220,14 @@ func TestAggregatorsSuccess(t *testing.T) {
 			}
 			tssOutputLock.Unlock()
 		}
-		a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
+		opts := &Options{
+			FlushOnShutdown:        true,
+			NoAlignFlushToInterval: true,
+		}
+		a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
 		if err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
-		for _, ag := range a.as {
-			// explicitly set flushOnShutdown, so aggregations results
-			// are immediately available after a.MustStop() call.
-			ag.flushOnShutdown = true
-		}
 
 		// Push the inputMetrics to Aggregators
 		tssInput := mustParsePromMetrics(inputMetrics)
@@ -862,8 +861,11 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 			}
 			tssOutputLock.Unlock()
 		}
-		const dedupInterval = 30 * time.Second
-		a, err := newAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
+		opts := &Options{
+			DedupInterval:   30 * time.Second,
+			FlushOnShutdown: true,
+		}
+		a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
 		if err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
@@ -871,12 +873,6 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 		// Push the inputMetrics to Aggregators
 		tssInput := mustParsePromMetrics(inputMetrics)
 		matchIdxs := a.Push(tssInput, nil)
-		if a != nil {
-			for _, aggr := range a.as {
-				aggr.dedupFlush()
-				aggr.flush()
-			}
-		}
 		a.MustStop()
 
 		// Verify matchIdxs equals to matchIdxsExpected
diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go
index c472803125..ad220eee67 100644
--- a/lib/streamaggr/streamaggr_timing_test.go
+++ b/lib/streamaggr/streamaggr_timing_test.go
@@ -44,7 +44,8 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
 }
 
 func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
-	a := newBenchAggregators(output)
+	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
+	a := newBenchAggregators(output, pushFunc)
 	defer a.MustStop()
 
 	var matchIdxs []byte
@@ -54,13 +55,14 @@ func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
 	for i := 0; i < b.N; i++ {
 		matchIdxs = a.Push(benchSeries, matchIdxs)
 		for _, aggr := range a.as {
-			aggr.flush()
+			aggr.flush(pushFunc)
 		}
 	}
 }
 
 func benchmarkAggregatorsPush(b *testing.B, output string) {
-	a := newBenchAggregators(output)
+	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
+	a := newBenchAggregators(output, pushFunc)
 	defer a.MustStop()
 
 	const loops = 100
@@ -77,15 +79,14 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
 	})
 }
 
-func newBenchAggregators(output string) *Aggregators {
+func newBenchAggregators(output string, pushFunc PushFunc) *Aggregators {
 	config := fmt.Sprintf(`
 - match: http_requests_total
   interval: 24h
   without: [job]
   outputs: [%q]
 `, output)
-	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
+	a, err := newAggregatorsFromData([]byte(config), pushFunc, nil)
 	if err != nil {
 		panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
 	}