diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index d975f9406..730c9897b 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -82,16 +82,22 @@ type Options struct { // The last sample per each series is left per each DedupInterval if DedupInterval > 0. // // By default deduplication is disabled. + // + // The deduplication can be set up individually per each aggregation via dedup_interval option. DedupInterval time.Duration // NoAlignFlushToInterval disables alignment of flushes to the aggregation interval. // // By default flushes are aligned to aggregation interval. + // + // The alignment of flushes can be disabled individually per each aggregation via no_align_flush_to_interval option. NoAlignFlushToInterval bool - // FlushOnShutdown enables flush of incomplete state on shutdown. + // FlushOnShutdown enables flush of incomplete state on start and shutdown. // // By default incomplete state is dropped on shutdown. + // + // The flush of incomplete state can be enabled individually per each aggregation via flush_on_shutdown option. FlushOnShutdown bool // KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix. @@ -100,6 +106,7 @@ type Options struct { // // input_name:[_by_][_without_]_ // + // This option can be overriden individually per each aggregation via keep_metric_names option. KeepMetricNames bool } @@ -601,20 +608,22 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if d < interval { sleep(d) } - flushTickerCh <- time.NewTicker(interval) + t := time.NewTicker(interval) + if skipIncompleteFlush { + a.dedupFlush(dedupInterval) + a.flush(nil, interval) + } + flushTickerCh <- t }() var flushTickerC <-chan time.Time var dedupFlushTickerC <-chan time.Time - isFirstFlush := true for { select { case <-a.stopCh: if !skipIncompleteFlush { - if dedupInterval > 0 { - a.dedupFlush() - } - a.flush(pushFunc) + a.dedupFlush(dedupInterval) + a.flush(pushFunc, interval) } return case flushTicker := <-flushTickerCh: @@ -624,43 +633,42 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc dedupFlushTickerC = dedupFlushTicker.C defer dedupFlushTicker.Stop() case <-flushTickerC: - if isFirstFlush { - isFirstFlush = false - if alignFlushToInterval && skipIncompleteFlush { - a.flush(nil) - continue - } - } - startTime := time.Now() - a.flush(pushFunc) - d := time.Since(startTime) - a.flushDuration.Update(d.Seconds()) - if d > interval { - a.flushTimeouts.Inc() - logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %s; "+ - "possible solutions: increase interval; use match filter matching smaller number of series; "+ - "reduce samples' ingestion rate to stream aggregation", interval, d) + select { + case <-dedupFlushTickerC: + // flush deduplicated samples if needed before flushing the aggregated samples + a.dedupFlush(dedupInterval) + default: } + a.flush(pushFunc, interval) case <-dedupFlushTickerC: - startTime := time.Now() - a.dedupFlush() - d := time.Since(startTime) - a.dedupFlushDuration.Update(d.Seconds()) - if d > dedupInterval { - a.dedupFlushTimeouts.Inc() - logger.Warnf("stream aggregation deduplication couldn't be finished in the configured dedup_interval=%s; it took %s; "+ - "possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+ - "reduce samples' ingestion rate to stream aggregation", dedupInterval, d) - } + a.dedupFlush(dedupInterval) } } } -func (a *aggregator) dedupFlush() { +func (a *aggregator) dedupFlush(dedupInterval time.Duration) { + if dedupInterval <= 0 { + // The de-duplication is disabled. + return + } + + startTime := time.Now() + a.da.flush(a.pushSamples) + + d := time.Since(startTime) + a.dedupFlushDuration.Update(d.Seconds()) + if d > dedupInterval { + a.dedupFlushTimeouts.Inc() + logger.Warnf("deduplication couldn't be finished in the configured dedup_interval=%s; it took %.03f; "+ + "possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+ + "reduce samples' ingestion rate to stream aggregation", dedupInterval, d.Seconds()) + } } -func (a *aggregator) flush(pushFunc PushFunc) { +func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration) { + startTime := time.Now() + var wg sync.WaitGroup for _, as := range a.aggrStates { flushConcurrencyCh <- struct{}{} @@ -679,6 +687,15 @@ func (a *aggregator) flush(pushFunc PushFunc) { }(as) } wg.Wait() + + d := time.Since(startTime) + a.flushDuration.Update(d.Seconds()) + if d > interval { + a.flushTimeouts.Inc() + logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03f; "+ + "possible solutions: increase interval; use match filter matching smaller number of series; "+ + "reduce samples' ingestion rate to stream aggregation", interval, d.Seconds()) + } } var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index ad220eee6..ed6c8f032 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) @@ -55,7 +56,7 @@ func benchmarkAggregatorsFlushSerial(b *testing.B, output string) { for i := 0; i < b.N; i++ { matchIdxs = a.Push(benchSeries, matchIdxs) for _, aggr := range a.as { - aggr.flush(pushFunc) + aggr.flush(pushFunc, time.Hour) } } }