diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 24dcca554..5e8babc5a 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -227,6 +227,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) { var m map[string]*dedupAggrSample if len(das.state) == 0 { + das.mu.Unlock() return } state := das.state[ctx.dedupIdx] diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 422c678bf..dbd44b8c5 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -571,7 +571,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, if name == "" { name = "none" } - metricLabels := fmt.Sprintf(`name=%q,path=%q,url=%q,position="%d"`, name, path, alias, aggrID) // initialize aggrOutputs if len(cfg.Outputs) == 0 { @@ -579,9 +578,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, "see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs) } aggrOutputs := &aggrOutputs{ - initFns: make([]aggrValuesFn, len(cfg.Outputs)), - outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{outputs=%q,%s}`, "test", metricLabels)), - stateSize: stateSize, + initFns: make([]aggrValuesFn, len(cfg.Outputs)), + stateSize: stateSize, } outputsSeen := make(map[string]struct{}, len(cfg.Outputs)) for i, output := range cfg.Outputs { @@ -591,6 +589,12 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, } aggrOutputs.initFns[i] = oc } + outputsLabels := make([]string, 0, len(outputsSeen)) + for o := range outputsSeen { + outputsLabels = append(outputsLabels, o) + } + metricLabels := fmt.Sprintf(`outputs=%q,name=%q,path=%q,url=%q,position="%d"`, strings.Join(outputsLabels, ","), name, path, alias, aggrID) + aggrOutputs.outputSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{%s}`, metricLabels)) // initialize suffix to add to metric names after aggregation suffix := ":" + cfg.Interval @@ -755,11 +759,14 @@ func newOutputInitFns(output string, outputsSeen map[string]struct{}, ignoreFirs } func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, ignoreFirstIntervals int) { + flushDeadline := time.Now().Add(a.interval) alignedSleep := func(d time.Duration) { if !alignFlushToInterval { return } - + if flushDeadline != flushDeadline.Truncate(d) { + flushDeadline = flushDeadline.Truncate(d).Add(d) + } ct := time.Duration(time.Now().UnixNano()) dSleep := d - (ct % d) timer := timerpool.Get(dSleep) @@ -770,18 +777,26 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } + var dedupTime time.Time + truncateIfNeeded := func(t time.Time) time.Time { + if alignFlushToInterval { + return t.Truncate(a.interval) + } + return t + } tickerWait := func(t *time.Ticker) bool { select { case <-a.stopCh: + dedupTime = time.Now() return false - case <-t.C: + case ct := <-t.C: + dedupTime = ct return true } } - flushDeadline := time.Now().Truncate(a.interval).Add(a.interval) tickInterval := time.Duration(a.tickInterval) * time.Millisecond - alignedSleep(tickInterval) + alignedSleep(a.interval) var dedupIdx, flushIdx int @@ -790,11 +805,9 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc isSkippedFirstFlush := false for tickerWait(t) { - ct := time.Now() - - dedupTime := ct.Truncate(tickInterval) + dedupDeadline := truncateIfNeeded(dedupTime) if a.ignoreOldSamples { - dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline) + dedupIdx, flushIdx = a.getAggrIdxs(dedupDeadline, flushDeadline) } pf := pushFunc @@ -813,10 +826,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc time.Sleep(flushAfter) } - deleteDeadline := dedupTime.Add(a.stalenessInterval) + deleteDeadline := dedupDeadline.Add(a.stalenessInterval) a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx) - if ct.After(flushDeadline) { + if dedupTime.After(flushDeadline) { // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { a.flush(nil, 0, flushIdx) @@ -827,7 +840,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } else { a.flush(pf, flushDeadline.UnixMilli(), flushIdx) } - for ct.After(flushDeadline) { + for dedupTime.After(flushDeadline) { flushDeadline = flushDeadline.Add(a.interval) } } @@ -840,10 +853,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { - dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval) + dedupDeadline := truncateIfNeeded(dedupTime) deleteDeadline := flushDeadline.Add(a.stalenessInterval) if a.ignoreOldSamples { - dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline) + dedupIdx, flushIdx = a.getAggrIdxs(dedupDeadline, flushDeadline) } a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx) a.flush(pushFunc, flushDeadline.UnixMilli(), flushIdx)