Fix for PR

This commit is contained in:
Alexander Marshalov 2023-12-12 16:34:07 +01:00
parent 8f946a927c
commit 92cadfff65
No known key found for this signature in database
13 changed files with 32 additions and 29 deletions

View file

@@ -18,7 +18,7 @@ type avgAggrState struct {
type avgStateValue struct {
mu sync.Mutex
sum float64
count uint64
count int64
deleted bool
}
@@ -71,10 +71,7 @@ func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv := v.(*avgStateValue)
sv.mu.Lock()
var avg float64
if sv.count > 0 {
avg = sv.sum / float64(sv.count)
}
avg := sv.sum / float64(sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
@@ -101,7 +98,7 @@ func (as *avgAggrState) getStateRepresentation(suffix string) aggrStateRepresent
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum / float64(value.count),
samplesCount: value.count,
samplesCount: uint64(value.count),
})
return true
})

View file

@@ -12,7 +12,6 @@ import (
type countSamplesAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp atomic.Uint64
}
@@ -66,6 +65,7 @@ func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
n := sv.n

View file

@@ -74,10 +74,10 @@ func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := sv.n
sv.n = 0
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()

View file

@@ -10,10 +10,11 @@ import (
// increaseAggrState calculates output=increase, e.g. the increase over input counters.
type increaseAggrState struct {
m sync.Map
intervalSecs uint64
m sync.Map
ignoreInputDeadline uint64
stalenessSecs uint64
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
@@ -31,9 +32,9 @@ func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duratio
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &increaseAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
stalenessSecs: stalenessSecs,
intervalSecs: intervalSecs,
}
}

View file

@@ -67,6 +67,7 @@ func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*lastStateValue)
sv.mu.Lock()
last := sv.last

View file

@@ -69,6 +69,7 @@ func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*maxStateValue)
sv.mu.Lock()
value := sv.max

View file

@@ -69,6 +69,7 @@ func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*minStateValue)
sv.mu.Lock()
value := sv.min

View file

@@ -1,7 +1,6 @@
package streamaggr
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"strconv"
"sync"
"sync/atomic"
@@ -9,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/valyala/histogram"
)

View file

@@ -68,6 +68,7 @@ func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stddevStateValue)
sv.mu.Lock()
stddev := math.Sqrt(sv.q / sv.count)

View file

@@ -67,6 +67,7 @@ func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stdvarStateValue)
sv.mu.Lock()
stdvar := sv.q / sv.count

View file

@@ -3,7 +3,6 @@ package streamaggr
import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"math"
"sort"
"strconv"
@@ -14,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -240,9 +240,10 @@ type aggregator struct {
// for `interval: 1m`, `by: [job]`
suffix string
wg sync.WaitGroup
stopCh chan struct{}
initialTime uint64
wg sync.WaitGroup
stopCh chan struct{}
}
type aggrState interface {
@@ -279,9 +280,9 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
if err != nil {
return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err)
}
//if interval <= time.Second {
// return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
//}
if interval <= time.Second {
return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
}
// check cfg.StalenessInterval
stalenessInterval := interval * 2
@@ -410,8 +411,9 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
suffix: suffix,
stopCh: make(chan struct{}),
initialTime: fasttime.UnixTimestamp(),
stopCh: make(chan struct{}),
}
if dedupAggr != nil {

View file

@@ -16,11 +16,10 @@ type sumSamplesAggrState struct {
}
type sumSamplesStateValue struct {
mu sync.Mutex
sum float64
samplesCount uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
sum float64
samplesCount uint64
deleted bool
}
func newSumSamplesAggrState(interval time.Duration) *sumSamplesAggrState {
@@ -68,6 +67,7 @@ func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum

View file

@@ -29,8 +29,6 @@ type totalStateValue struct {
type lastValueState struct {
value float64
firstValue float64
correction float64
deleteDeadline uint64
}
@@ -39,9 +37,9 @@ func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration)
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &totalAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
stalenessSecs: stalenessSecs,
intervalSecs: intervalSecs,
}
}