minor fix

This commit is contained in:
Andrii Chubatiuk 2024-10-10 20:34:13 +03:00
parent b1a5ed7604
commit 87d512f09f
No known key found for this signature in database
GPG key ID: 96D776CC99880667
2 changed files with 31 additions and 17 deletions

View file

@ -227,6 +227,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
var m map[string]*dedupAggrSample var m map[string]*dedupAggrSample
if len(das.state) == 0 { if len(das.state) == 0 {
das.mu.Unlock()
return return
} }
state := das.state[ctx.dedupIdx] state := das.state[ctx.dedupIdx]

View file

@ -571,7 +571,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
if name == "" { if name == "" {
name = "none" name = "none"
} }
metricLabels := fmt.Sprintf(`name=%q,path=%q,url=%q,position="%d"`, name, path, alias, aggrID)
// initialize aggrOutputs // initialize aggrOutputs
if len(cfg.Outputs) == 0 { if len(cfg.Outputs) == 0 {
@ -579,9 +578,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs) "see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
} }
aggrOutputs := &aggrOutputs{ aggrOutputs := &aggrOutputs{
initFns: make([]aggrValuesFn, len(cfg.Outputs)), initFns: make([]aggrValuesFn, len(cfg.Outputs)),
outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{outputs=%q,%s}`, "test", metricLabels)), stateSize: stateSize,
stateSize: stateSize,
} }
outputsSeen := make(map[string]struct{}, len(cfg.Outputs)) outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs { for i, output := range cfg.Outputs {
@ -591,6 +589,12 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
} }
aggrOutputs.initFns[i] = oc aggrOutputs.initFns[i] = oc
} }
outputsLabels := make([]string, 0, len(outputsSeen))
for o := range outputsSeen {
outputsLabels = append(outputsLabels, o)
}
metricLabels := fmt.Sprintf(`outputs=%q,name=%q,path=%q,url=%q,position="%d"`, strings.Join(outputsLabels, ","), name, path, alias, aggrID)
aggrOutputs.outputSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{%s}`, metricLabels))
// initialize suffix to add to metric names after aggregation // initialize suffix to add to metric names after aggregation
suffix := ":" + cfg.Interval suffix := ":" + cfg.Interval
@ -755,11 +759,14 @@ func newOutputInitFns(output string, outputsSeen map[string]struct{}, ignoreFirs
} }
func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, ignoreFirstIntervals int) { func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, ignoreFirstIntervals int) {
flushDeadline := time.Now().Add(a.interval)
alignedSleep := func(d time.Duration) { alignedSleep := func(d time.Duration) {
if !alignFlushToInterval { if !alignFlushToInterval {
return return
} }
if flushDeadline != flushDeadline.Truncate(d) {
flushDeadline = flushDeadline.Truncate(d).Add(d)
}
ct := time.Duration(time.Now().UnixNano()) ct := time.Duration(time.Now().UnixNano())
dSleep := d - (ct % d) dSleep := d - (ct % d)
timer := timerpool.Get(dSleep) timer := timerpool.Get(dSleep)
@ -770,18 +777,26 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
} }
} }
var dedupTime time.Time
truncateIfNeeded := func(t time.Time) time.Time {
if alignFlushToInterval {
return t.Truncate(a.interval)
}
return t
}
tickerWait := func(t *time.Ticker) bool { tickerWait := func(t *time.Ticker) bool {
select { select {
case <-a.stopCh: case <-a.stopCh:
dedupTime = time.Now()
return false return false
case <-t.C: case ct := <-t.C:
dedupTime = ct
return true return true
} }
} }
flushDeadline := time.Now().Truncate(a.interval).Add(a.interval)
tickInterval := time.Duration(a.tickInterval) * time.Millisecond tickInterval := time.Duration(a.tickInterval) * time.Millisecond
alignedSleep(tickInterval) alignedSleep(a.interval)
var dedupIdx, flushIdx int var dedupIdx, flushIdx int
@ -790,11 +805,9 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
isSkippedFirstFlush := false isSkippedFirstFlush := false
for tickerWait(t) { for tickerWait(t) {
ct := time.Now() dedupDeadline := truncateIfNeeded(dedupTime)
dedupTime := ct.Truncate(tickInterval)
if a.ignoreOldSamples { if a.ignoreOldSamples {
dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline) dedupIdx, flushIdx = a.getAggrIdxs(dedupDeadline, flushDeadline)
} }
pf := pushFunc pf := pushFunc
@ -813,10 +826,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
time.Sleep(flushAfter) time.Sleep(flushAfter)
} }
deleteDeadline := dedupTime.Add(a.stalenessInterval) deleteDeadline := dedupDeadline.Add(a.stalenessInterval)
a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx) a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx)
if ct.After(flushDeadline) { if dedupTime.After(flushDeadline) {
// It is time to flush the aggregated state // It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, 0, flushIdx) a.flush(nil, 0, flushIdx)
@ -827,7 +840,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
} else { } else {
a.flush(pf, flushDeadline.UnixMilli(), flushIdx) a.flush(pf, flushDeadline.UnixMilli(), flushIdx)
} }
for ct.After(flushDeadline) { for dedupTime.After(flushDeadline) {
flushDeadline = flushDeadline.Add(a.interval) flushDeadline = flushDeadline.Add(a.interval)
} }
} }
@ -840,10 +853,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
} }
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval) dedupDeadline := truncateIfNeeded(dedupTime)
deleteDeadline := flushDeadline.Add(a.stalenessInterval) deleteDeadline := flushDeadline.Add(a.stalenessInterval)
if a.ignoreOldSamples { if a.ignoreOldSamples {
dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline) dedupIdx, flushIdx = a.getAggrIdxs(dedupDeadline, flushDeadline)
} }
a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx) a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx)
a.flush(pushFunc, flushDeadline.UnixMilli(), flushIdx) a.flush(pushFunc, flushDeadline.UnixMilli(), flushIdx)