minor fix

This commit is contained in:
Andrii Chubatiuk 2024-10-10 20:34:13 +03:00
parent 3a14689567
commit d18a599d85
No known key found for this signature in database
GPG key ID: 96D776CC99880667
2 changed files with 31 additions and 17 deletions

View file

@ -227,6 +227,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
var m map[string]*dedupAggrSample
if len(das.state) == 0 {
das.mu.Unlock()
return
}
state := das.state[ctx.dedupIdx]

View file

@ -556,7 +556,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
if name == "" {
name = "none"
}
metricLabels := fmt.Sprintf(`name=%q,path=%q,url=%q,position="%d"`, name, path, alias, aggrID)
// initialize aggrOutputs
if len(cfg.Outputs) == 0 {
@ -564,9 +563,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
}
aggrOutputs := &aggrOutputs{
initFns: make([]aggrValuesFn, len(cfg.Outputs)),
outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{outputs=%q,%s}`, "test", metricLabels)),
stateSize: stateSize,
initFns: make([]aggrValuesFn, len(cfg.Outputs)),
stateSize: stateSize,
}
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
@ -576,6 +574,12 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
aggrOutputs.initFns[i] = oc
}
outputsLabels := make([]string, 0, len(outputsSeen))
for o := range outputsSeen {
outputsLabels = append(outputsLabels, o)
}
metricLabels := fmt.Sprintf(`outputs=%q,name=%q,path=%q,url=%q,position="%d"`, strings.Join(outputsLabels, ","), name, path, alias, aggrID)
aggrOutputs.outputSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{%s}`, metricLabels))
// initialize suffix to add to metric names after aggregation
suffix := ":" + cfg.Interval
@ -740,11 +744,14 @@ func newOutputInitFns(output string, outputsSeen map[string]struct{}, stateSize
}
func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, ignoreFirstIntervals int) {
flushDeadline := time.Now().Add(a.interval)
alignedSleep := func(d time.Duration) {
if !alignFlushToInterval {
return
}
if flushDeadline != flushDeadline.Truncate(d) {
flushDeadline = flushDeadline.Truncate(d).Add(d)
}
ct := time.Duration(time.Now().UnixNano())
dSleep := d - (ct % d)
timer := timerpool.Get(dSleep)
@ -755,18 +762,26 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
var dedupTime time.Time
truncateIfNeeded := func(t time.Time) time.Time {
if alignFlushToInterval {
return t.Truncate(a.interval)
}
return t
}
tickerWait := func(t *time.Ticker) bool {
select {
case <-a.stopCh:
dedupTime = time.Now()
return false
case <-t.C:
case ct := <-t.C:
dedupTime = ct
return true
}
}
flushDeadline := time.Now().Truncate(a.interval).Add(a.interval)
tickInterval := time.Duration(a.tickInterval) * time.Millisecond
alignedSleep(tickInterval)
alignedSleep(a.interval)
var dedupIdx, flushIdx int
@ -775,11 +790,9 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
isSkippedFirstFlush := false
for tickerWait(t) {
ct := time.Now()
dedupTime := ct.Truncate(tickInterval)
dedupDeadline := truncateIfNeeded(dedupTime)
if a.ignoreOldSamples {
dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline)
dedupIdx, flushIdx = a.getAggrIdxs(dedupDeadline, flushDeadline)
}
pf := pushFunc
@ -798,10 +811,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
time.Sleep(flushAfter)
}
deleteDeadline := dedupTime.Add(a.stalenessInterval)
deleteDeadline := dedupDeadline.Add(a.stalenessInterval)
a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx)
if ct.After(flushDeadline) {
if dedupTime.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, 0, flushIdx)
@ -812,7 +825,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
} else {
a.flush(pf, flushDeadline.UnixMilli(), flushIdx)
}
for ct.After(flushDeadline) {
for dedupTime.After(flushDeadline) {
flushDeadline = flushDeadline.Add(a.interval)
}
}
@ -825,10 +838,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
dedupDeadline := truncateIfNeeded(dedupTime)
deleteDeadline := flushDeadline.Add(a.stalenessInterval)
if a.ignoreOldSamples {
dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline)
dedupIdx, flushIdx = a.getAggrIdxs(dedupDeadline, flushDeadline)
}
a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx)
a.flush(pushFunc, flushDeadline.UnixMilli(), flushIdx)