removed some context structs

Andrii Chubatiuk 2025-01-23 10:27:32 +02:00
parent daaafa5718
commit 87634e578d
18 changed files with 291 additions and 280 deletions

View file

@ -1,10 +1,7 @@
package streamaggr
func avgInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(avgAggrValue))
if enableWindows {
v.green = append(v.green, new(avgAggrValue))
}
func avgInitFn(_ any) aggrValue {
return &avgAggrValue{}
}
type avgAggrValue struct {
@ -12,16 +9,20 @@ type avgAggrValue struct {
count float64
}
func (sv *avgAggrValue) pushSample(_ string, sample *pushSample, _ int64) {
sv.sum += sample.value
sv.count++
func (av *avgAggrValue) pushSample(_ string, sample *pushSample, _ int64) {
av.sum += sample.value
av.count++
}
func (sv *avgAggrValue) flush(ctx *flushCtx, key string) {
if sv.count > 0 {
avg := sv.sum / sv.count
func (av *avgAggrValue) flush(ctx *flushCtx, key string) {
if av.count > 0 {
avg := av.sum / av.count
ctx.appendSeries(key, "avg", avg)
sv.sum = 0
sv.count = 0
av.sum = 0
av.count = 0
}
}
func (*avgAggrValue) state() any {
return nil
}
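This avg hunk establishes the pattern repeated in every file of the commit: the removed init functions were callbacks that appended blue and green values to an *aggrValues pair themselves, while the new ones are plain constructors of a single aggrValue, and the added state() method exposes whatever must be shared between the two windows (nil for stateless outputs such as avg). The caller builds the blue value first and feeds its state() into the green constructor. A minimal sketch of that handshake, with hypothetical demoValue/demoShared names standing in for the real outputs:

package main

import "fmt"

// aggrValue mirrors the trimmed-down interface from outputs.go.
type aggrValue interface {
	pushSample(v float64)
	flush() float64
	state() any
}

// demoShared stands in for cross-window state such as rateAggrValueShared.
type demoShared struct{ seen int }

type demoValue struct {
	sum    float64
	shared *demoShared
}

func (d *demoValue) pushSample(v float64) { d.sum += v; d.shared.seen++ }
func (d *demoValue) flush() float64       { s := d.sum; d.sum = 0; return s }
func (d *demoValue) state() any           { return d.shared }

// demoInitFn has the new constructor shape: a nil argument builds the blue
// value and allocates the shared state; green is built from blue.state().
func demoInitFn(s any) aggrValue {
	if s == nil {
		return &demoValue{shared: &demoShared{}}
	}
	return &demoValue{shared: s.(*demoShared)}
}

func main() {
	blue := demoInitFn(nil)
	green := demoInitFn(blue.state())
	blue.pushSample(1)
	green.pushSample(2)
	fmt.Println(blue.flush(), green.flush()) // 1 2; both windows bumped the same shared.seen
}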

View file

@ -1,10 +1,7 @@
package streamaggr
func countSamplesInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(countSamplesAggrValue))
if enableWindows {
v.green = append(v.green, new(countSamplesAggrValue))
}
func countSamplesInitFn(_ any) aggrValue {
return &countSamplesAggrValue{}
}
type countSamplesAggrValue struct {
@ -19,3 +16,7 @@ func (av *countSamplesAggrValue) flush(ctx *flushCtx, key string) {
ctx.appendSeries(key, "count_samples", float64(av.count))
av.count = 0
}
func (*countSamplesAggrValue) state() any {
return nil
}

View file

@ -5,14 +5,9 @@ import (
"github.com/cespare/xxhash/v2"
)
func countSeriesInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, &countSeriesAggrValue{
func countSeriesInitFn(_ any) aggrValue {
return &countSeriesAggrValue{
samples: make(map[uint64]struct{}),
})
if enableWindows {
v.green = append(v.green, &countSeriesAggrValue{
samples: make(map[uint64]struct{}),
})
}
}
@ -20,10 +15,10 @@ type countSeriesAggrValue struct {
samples map[uint64]struct{}
}
func (av *countSeriesAggrValue) pushSample(inputKey string, _ *pushSample, _ int64) {
// Count unique hashes over the inputKeys instead of unique inputKey values.
// This reduces memory usage at the cost of possible hash collisions for distinct inputKey values.
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(inputKey))
func (av *countSeriesAggrValue) pushSample(key string, _ *pushSample, _ int64) {
// Count unique hashes over the keys instead of unique key values.
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
if _, ok := av.samples[h]; !ok {
av.samples[h] = struct{}{}
}
@ -31,5 +26,9 @@ func (av *countSeriesAggrValue) pushSample(inputKey string, _ *pushSample, _ int
func (av *countSeriesAggrValue) flush(ctx *flushCtx, key string) {
ctx.appendSeries(key, "count_series", float64(len(av.samples)))
clear(av.samples)
av.samples = make(map[uint64]struct{}, len(av.samples))
}
func (*countSeriesAggrValue) state() any {
return nil
}
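The comment in this hunk explains why count_series stores 8-byte xxhash values rather than the keys themselves: memory stays proportional to the number of unique series instead of total key bytes, at the price of a small collision probability. A self-contained sketch of the same technique, assuming github.com/cespare/xxhash/v2:

package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

func main() {
	seen := make(map[uint64]struct{})
	for _, key := range []string{"a", "b", "a", "c"} {
		// Hash collisions would undercount distinct keys, which the
		// real code accepts as a memory/accuracy trade-off.
		seen[xxhash.Sum64String(key)] = struct{}{}
	}
	fmt.Println(len(seen)) // 3
}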

View file

@ -5,6 +5,7 @@ import (
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -15,7 +16,9 @@ import (
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
}
type dedupAggrShard struct {
@ -28,15 +31,15 @@ type dedupAggrShard struct {
type dedupAggrState struct {
m map[string]*dedupAggrSample
mu sync.Mutex
samplesBuf []dedupAggrSample
sizeBytes atomic.Uint64
itemsCount atomic.Uint64
}
type dedupAggrShardNopad struct {
mu sync.Mutex
blue *dedupAggrState
green *dedupAggrState
blue dedupAggrState
green dedupAggrState
}
type dedupAggrSample struct {
@ -52,32 +55,22 @@ func newDedupAggr() *dedupAggr {
func (da *dedupAggr) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*da))
var state *dedupAggrState
var shard *dedupAggrShard
for i := range da.shards {
state = da.shards[i].green
if state != nil {
n += state.sizeBytes.Load()
}
state = da.shards[i].blue
if state != nil {
n += state.sizeBytes.Load()
}
shard = &da.shards[i]
n += shard.blue.sizeBytes.Load()
n += shard.green.sizeBytes.Load()
}
return n
}
func (da *dedupAggr) itemsCount() uint64 {
n := uint64(0)
var state *dedupAggrState
var shard *dedupAggrShard
for i := range da.shards {
state = da.shards[i].green
if state != nil {
n += state.itemsCount.Load()
}
state = da.shards[i].blue
if state != nil {
n += state.itemsCount.Load()
}
shard = &da.shards[i]
n += shard.blue.itemsCount.Load()
n += shard.green.itemsCount.Load()
}
return n
}
@ -179,23 +172,16 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
das.mu.Lock()
defer das.mu.Unlock()
var state *dedupAggrState
if isGreen {
if das.green == nil {
das.green = new(dedupAggrState)
}
state = das.green
state = &das.green
} else {
if das.blue == nil {
das.blue = new(dedupAggrState)
}
state = das.blue
state = &das.blue
}
state.mu.Lock()
defer state.mu.Unlock()
if state.m == nil {
state.m = make(map[string]*dedupAggrSample, len(samples))
}
@ -241,19 +227,15 @@ func isDuplicate(a *dedupAggrSample, b pushSample) bool {
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
das.mu.Lock()
var m map[string]*dedupAggrSample
var state *dedupAggrState
if ctx.isGreen {
state = das.green
state = &das.green
} else {
state = das.blue
}
if state == nil {
das.mu.Unlock()
return
state = &das.blue
}
state.mu.Lock()
if len(state.m) > 0 {
m = state.m
state.m = make(map[string]*dedupAggrSample, len(state.m))
@ -261,8 +243,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
state.sizeBytes.Store(0)
state.itemsCount.Store(0)
}
das.mu.Unlock()
state.mu.Unlock()
if len(m) == 0 {
return

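The dedup hunks replace the lazily allocated *dedupAggrState pointers with states embedded by value, each carrying its own mutex, so the nil checks and the shard-wide lock on the push and flush paths disappear. A simplified sketch of the resulting layout (an illustration of the locking change, not the full dedupAggrShard):

package main

import (
	"fmt"
	"sync"
)

type dedupState struct {
	mu sync.Mutex
	m  map[string]float64
}

type dedupShard struct {
	blue  dedupState // embedded by value: always present, no nil checks
	green dedupState
}

func (s *dedupShard) push(key string, v float64, isGreen bool) {
	st := &s.blue
	if isGreen {
		st = &s.green
	}
	st.mu.Lock() // per-window lock: a blue flush no longer blocks green pushes
	defer st.mu.Unlock()
	if st.m == nil {
		st.m = make(map[string]float64)
	}
	st.m[key] = v
}

func main() {
	var sh dedupShard
	sh.push("a", 1, false)
	sh.push("a", 2, true)
	fmt.Println(len(sh.blue.m), len(sh.green.m)) // 1 1
}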
View file

@ -32,9 +32,6 @@ type Deduplicator struct {
// time to wait after interval end before flush
flushAfter atomic.Pointer[histogram.Fast]
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
}
// NewDeduplicator returns a new deduplicator, which deduplicates samples for each time series.
@ -77,8 +74,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
return float64(d.da.itemsCount())
})
d.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
metrics.RegisterSet(ms)
@ -233,9 +230,9 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
d.current.Store(current)
duration := time.Since(startTime)
d.flushDuration.Update(duration.Seconds())
d.da.flushDuration.Update(duration.Seconds())
if duration > d.interval {
d.flushTimeouts.Inc()
d.da.flushTimeouts.Inc()
logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
"possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds())
}

View file

@ -4,28 +4,45 @@ import (
"github.com/VictoriaMetrics/metrics"
)
func histogramBucketInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(histogramBucketAggrValue))
if enableWindows {
v.green = append(v.green, new(histogramBucketAggrValue))
func histogramBucketInitFn(enableWindows bool) aggrValueFn {
return func(s any) aggrValue {
var total *metrics.Histogram
if enableWindows {
if s == nil {
total = &metrics.Histogram{}
} else {
total = s.(*metrics.Histogram)
}
}
return &histogramBucketAggrValue{
total: total,
}
}
}
// histogramBucketAggrValue calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrValue struct {
h metrics.Histogram
state metrics.Histogram
total *metrics.Histogram
}
func (sv *histogramBucketAggrValue) pushSample(_ string, sample *pushSample, _ int64) {
sv.h.Update(sample.value)
func (av *histogramBucketAggrValue) pushSample(_ string, sample *pushSample, _ int64) {
av.h.Update(sample.value)
}
func (sv *histogramBucketAggrValue) flush(ctx *flushCtx, key string) {
total := &sv.state
total.Merge(&sv.h)
func (av *histogramBucketAggrValue) flush(ctx *flushCtx, key string) {
total := av.total
if total == nil {
total = &av.h
} else {
total.Merge(&av.h)
av.h.Reset()
}
total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
})
total.Reset()
}
func (av *histogramBucketAggrValue) state() any {
return av.total
}
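With windows enabled, the blue value allocates the shared total histogram and the green value adopts it through state(), so each window's flush merges its own h into one accumulated total before reporting buckets. A sketch of the Merge/VisitNonZeroBuckets flow using github.com/VictoriaMetrics/metrics (the same calls the hunk itself relies on):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/metrics"
)

func main() {
	total := &metrics.Histogram{} // shared between the blue and green windows
	var blue, green metrics.Histogram

	blue.Update(0.35)
	green.Update(1.5)

	// Each window's flush folds its buckets into the shared total and
	// resets only its own histogram, mirroring the new flush above.
	total.Merge(&blue)
	blue.Reset()
	total.Merge(&green)
	green.Reset()

	total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
		fmt.Printf("vmrange=%s count=%d\n", vmrange, count)
	})
}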

View file

@ -1,10 +1,7 @@
package streamaggr
func lastInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(lastAggrValue))
if enableWindows {
v.green = append(v.green, new(lastAggrValue))
}
func lastInitFn(_ any) aggrValue {
return &lastAggrValue{}
}
type lastAggrValue struct {
@ -25,3 +22,7 @@ func (av *lastAggrValue) flush(ctx *flushCtx, key string) {
av.timestamp = 0
}
}
func (*lastAggrValue) state() any {
return nil
}

View file

@ -1,10 +1,7 @@
package streamaggr
func maxInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(maxAggrValue))
if enableWindows {
v.green = append(v.green, new(maxAggrValue))
}
func maxInitFn(_ any) aggrValue {
return &maxAggrValue{}
}
type maxAggrValue struct {
@ -28,3 +25,7 @@ func (av *maxAggrValue) flush(ctx *flushCtx, key string) {
av.defined = false
}
}
func (*maxAggrValue) state() any {
return nil
}

View file

@ -1,11 +1,7 @@
package streamaggr
func minInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(minAggrValue))
if enableWindows {
v.green = append(v.green, new(minAggrValue))
}
func minInitFn(_ any) aggrValue {
return &minAggrValue{}
}
type minAggrValue struct {
@ -29,3 +25,7 @@ func (av *minAggrValue) flush(ctx *flushCtx, key string) {
av.min = 0
}
}
func (*minAggrValue) state() any {
return nil
}

View file

@ -2,17 +2,35 @@ package streamaggr
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
"sync"
)
type aggrValuesFn func(*aggrValues, bool)
type aggrValueFn func(any) aggrValue
type aggrOutputs struct {
m sync.Map
enableWindows bool
initFns []aggrValuesFn
initFns []aggrValueFn
outputSamples *metrics.Counter
dedupEnabled bool
}
func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
src := bytesutil.ToUnsafeBytes(key)
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
}
src = src[nSize:]
outputKey := src[:outputKeyLen]
if ao.dedupEnabled {
return key, bytesutil.ToUnsafeString(outputKey)
}
inputKey := src[outputKeyLen:]
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
}
func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, isGreen bool) {
@ -20,20 +38,18 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
var sample *pushSample
for i := range samples {
sample = &samples[i]
inputKey, outputKey = getInputOutputKey(sample.key)
inputKey, outputKey = ao.getInputOutputKey(sample.key)
again:
v, ok := ao.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
nv := &aggrValues{
blue: make([]aggrValue, 0, len(ao.initFns)),
}
if ao.enableWindows {
nv.green = make([]aggrValue, 0, len(ao.initFns))
}
for _, initFn := range ao.initFns {
initFn(nv, ao.enableWindows)
nv := &aggrValues{}
for idx, initFn := range ao.initFns {
nv.blue = append(nv.blue, initFn(nil))
if ao.enableWindows {
nv.green = append(nv.green, initFn(nv.blue[idx].state()))
}
}
v = nv
outputKey = bytesutil.InternString(outputKey)
@ -84,16 +100,19 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
m.Delete(k)
return true
}
key := k.(string)
outputKey := k.(string)
if ctx.isGreen {
outputs = av.green
} else {
outputs = av.blue
}
for _, state := range outputs {
state.flush(ctx, key)
state.flush(ctx, outputKey)
}
av.mu.Unlock()
if ctx.isLast {
m.Delete(k)
}
return true
})
}
@ -109,4 +128,5 @@ type aggrValues struct {
type aggrValue interface {
pushSample(string, *pushSample, int64)
flush(*flushCtx, string)
state() any
}
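getInputOutputKey above fixes the key layout that compressLabels (changed later in this commit) now produces: a uvarint length of the compressed output key, then the output key, then the input key; with dedup enabled, the full key itself doubles as the input key. A roundtrip sketch of that layout, using stdlib encoding/binary varints as stand-ins for the repo's lib/encoding helpers:

package main

import (
	"encoding/binary"
	"fmt"
)

// buildKey mimics the new layout: uvarint(len(outputKey)) | outputKey | inputKey.
func buildKey(outputKey, inputKey []byte) []byte {
	dst := binary.AppendUvarint(nil, uint64(len(outputKey)))
	dst = append(dst, outputKey...)
	return append(dst, inputKey...)
}

// splitKey mirrors getInputOutputKey for the non-dedup case.
func splitKey(key []byte) (inputKey, outputKey []byte) {
	n, size := binary.Uvarint(key)
	if size <= 0 {
		panic("BUG: cannot unmarshal outputKeyLen from uvarint")
	}
	key = key[size:]
	return key[n:], key[:n]
}

func main() {
	key := buildKey([]byte("out"), []byte("in"))
	in, out := splitKey(key)
	fmt.Printf("input=%q output=%q\n", in, out) // input="in" output="out"
}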

View file

@ -6,21 +6,13 @@ import (
"strconv"
)
func quantilesInitFn(phis []float64) aggrValuesFn {
blue := &quantilesAggrState{
func quantilesInitFn(phis []float64) aggrValueFn {
state := &quantilesAggrState{
phis: phis,
}
green := &quantilesAggrState{
phis: phis,
}
return func(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, &quantilesAggrValue{
state: blue,
})
if enableWindows {
v.green = append(v.green, &quantilesAggrValue{
state: green,
})
return func(_ any) aggrValue {
return &quantilesAggrValue{
buf: state,
}
}
}
@ -33,8 +25,8 @@ type quantilesAggrState struct {
// quantilesAggrValue calculates output=quantiles, e.g. the given quantiles over the input samples.
type quantilesAggrValue struct {
h *histogram.Fast
state *quantilesAggrState
h *histogram.Fast
buf *quantilesAggrState
}
func (av *quantilesAggrValue) pushSample(_ string, sample *pushSample, _ int64) {
@ -46,14 +38,18 @@ func (av *quantilesAggrValue) pushSample(_ string, sample *pushSample, _ int64)
func (av *quantilesAggrValue) flush(ctx *flushCtx, key string) {
if av.h != nil {
av.state.quantiles = av.h.Quantiles(av.state.quantiles[:0], av.state.phis)
av.buf.quantiles = av.h.Quantiles(av.buf.quantiles[:0], av.buf.phis)
}
histogram.PutFast(av.h)
if len(av.state.quantiles) > 0 {
for i, quantile := range av.state.quantiles {
av.state.b = strconv.AppendFloat(av.state.b[:0], av.state.phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(av.state.b)
if len(av.buf.quantiles) > 0 {
for i, quantile := range av.buf.quantiles {
av.buf.b = strconv.AppendFloat(av.buf.b[:0], av.buf.phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(av.buf.b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
}
}
}
func (*quantilesAggrValue) state() any {
return nil
}
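Each quantilesAggrValue keeps its own histogram.Fast, while the renamed buf field (formerly state, a name now taken by the interface method) is one quantilesAggrState shared by both windows as a scratch buffer for phis, computed quantiles, and label bytes. A sketch of the underlying calls from the repo's lib/histogram, as the flush above uses them:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/histogram"
)

func main() {
	phis := []float64{0.5, 0.99}
	h := histogram.GetFast()
	for _, v := range []float64{1, 2, 3, 4, 5} {
		h.Update(v)
	}
	// Quantiles appends one value per phi into a reusable buffer, which is
	// what the shared quantilesAggrState provides across flushes.
	var buf []float64
	buf = h.Quantiles(buf[:0], phis)
	histogram.PutFast(h)
	fmt.Println(buf) // approximately [3 5]; Fast is an approximate histogram
}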

View file

@ -4,22 +4,20 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
func rateInitFn(isAvg bool) aggrValuesFn {
return func(v *aggrValues, enableWindows bool) {
shared := &rateAggrValueShared{
lastValues: make(map[string]rateLastValue),
func rateInitFn(isAvg bool) aggrValueFn {
return func(s any) aggrValue {
var shared *rateAggrValueShared
if s == nil {
shared = &rateAggrValueShared{
isAvg: isAvg,
lastValues: make(map[string]rateLastValue),
}
} else {
shared = s.(*rateAggrValueShared)
}
v.blue = append(v.blue, &rateAggrValue{
isAvg: isAvg,
return &rateAggrValue{
shared: shared,
state: make(map[string]rateAggrValueState),
})
if enableWindows {
v.green = append(v.green, &rateAggrValue{
isAvg: isAvg,
shared: shared,
state: make(map[string]rateAggrValueState),
})
states: make(map[string]rateAggrValueState),
}
}
}
@ -35,6 +33,7 @@ type rateLastValue struct {
type rateAggrValueShared struct {
lastValues map[string]rateLastValue
isAvg bool
}
type rateAggrValueState struct {
@ -45,13 +44,12 @@ type rateAggrValueState struct {
type rateAggrValue struct {
shared *rateAggrValueShared
state map[string]rateAggrValueState
isAvg bool
states map[string]rateAggrValueState
}
func (av *rateAggrValue) pushSample(inputKey string, sample *pushSample, deleteDeadline int64) {
sv := av.state[inputKey]
lv, ok := av.shared.lastValues[inputKey]
func (av *rateAggrValue) pushSample(key string, sample *pushSample, deleteDeadline int64) {
sv := av.states[key]
lv, ok := av.shared.lastValues[key]
if ok {
if sample.timestamp < sv.timestamp {
// Skip out-of-order sample
@ -69,9 +67,9 @@ func (av *rateAggrValue) pushSample(inputKey string, sample *pushSample, deleteD
lv.value = sample.value
lv.deleteDeadline = deleteDeadline
sv.timestamp = sample.timestamp
inputKey = bytesutil.InternString(inputKey)
av.state[inputKey] = sv
av.shared.lastValues[inputKey] = lv
key = bytesutil.InternString(key)
av.states[key] = sv
av.shared.lastValues[key] = lv
}
func (av *rateAggrValue) flush(ctx *flushCtx, key string) {
@ -85,7 +83,7 @@ func (av *rateAggrValue) flush(ctx *flushCtx, key string) {
continue
}
}
for sk, sv := range av.state {
for sk, sv := range av.states {
lv := lvs[sk]
if lv.prevTimestamp == 0 {
continue
@ -97,12 +95,14 @@ func (av *rateAggrValue) flush(ctx *flushCtx, key string) {
}
lv.prevTimestamp = sv.timestamp
lvs[sk] = lv
delete(av.state, sk)
}
av.states = make(map[string]rateAggrValueState, len(lvs))
if countSeries == 0 {
return
}
if av.isAvg {
if av.shared.isAvg {
rate /= float64(countSeries)
}
if rate > 0 {
@ -111,8 +111,12 @@ func (av *rateAggrValue) flush(ctx *flushCtx, key string) {
}
func (av *rateAggrValue) getSuffix() string {
if av.isAvg {
if av.shared.isAvg {
return "rate_avg"
}
return "rate_sum"
}
func (av *rateAggrValue) state() any {
return av.shared
}

View file

@ -4,11 +4,8 @@ import (
"math"
)
func stddevInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(stddevAggrValue))
if enableWindows {
v.green = append(v.green, new(stddevAggrValue))
}
func stddevInitFn(_ any) aggrValue {
return &stddevAggrValue{}
}
// stddevAggrValue calculates output=stddev, e.g. the standard deviation over input samples.
@ -33,3 +30,7 @@ func (av *stddevAggrValue) flush(ctx *flushCtx, key string) {
av.q = 0
}
}
func (*stddevAggrValue) state() any {
return nil
}

View file

@ -1,10 +1,7 @@
package streamaggr
func stdvarInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(stdvarAggrValue))
if enableWindows {
v.green = append(v.green, new(stdvarAggrValue))
}
func stdvarInitFn(_ any) aggrValue {
return &stdvarAggrValue{}
}
// stdvarAggrValue calculates output=stdvar, e.g. the variance over input samples.
@ -29,3 +26,7 @@ func (av *stdvarAggrValue) flush(ctx *flushCtx, key string) {
av.q = 0
}
}
func (*stdvarAggrValue) state() any {
return nil
}

View file

@ -427,15 +427,13 @@ type aggregator struct {
wg sync.WaitGroup
stopCh chan struct{}
flushDuration *metrics.Histogram
dedupFlushDuration *metrics.Histogram
samplesLag *metrics.Histogram
flushDuration *metrics.Histogram
samplesLag *metrics.Histogram
flushTimeouts *metrics.Counter
dedupFlushTimeouts *metrics.Counter
ignoredOldSamples *metrics.Counter
ignoredNaNSamples *metrics.Counter
matchedSamples *metrics.Counter
flushTimeouts *metrics.Counter
ignoredOldSamples *metrics.Counter
ignoredNaNSamples *metrics.Counter
matchedSamples *metrics.Counter
}
// PushFunc is called by Aggregators when they need to push their state to metrics storage
@ -580,12 +578,12 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
}
aggrOutputs := &aggrOutputs{
initFns: make([]aggrValuesFn, len(cfg.Outputs)),
initFns: make([]aggrValueFn, len(cfg.Outputs)),
enableWindows: enableWindows,
}
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
oc, err := newOutputInitFns(output, outputsSeen, ignoreFirstSampleInterval)
oc, err := newOutputInitFns(output, outputsSeen, enableWindows, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@ -634,21 +632,22 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
stopCh: make(chan struct{}),
flushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{%s}`, metricLabels)),
dedupFlushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels)),
samplesLag: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_samples_lag_seconds{%s}`, metricLabels)),
flushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{%s}`, metricLabels)),
samplesLag: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_samples_lag_seconds{%s}`, metricLabels)),
matchedSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_matched_samples_total{%s}`, metricLabels)),
flushTimeouts: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_flush_timeouts_total{%s}`, metricLabels)),
dedupFlushTimeouts: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels)),
ignoredNaNSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="nan",%s}`, metricLabels)),
ignoredOldSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="too_old",%s}`, metricLabels)),
matchedSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_matched_samples_total{%s}`, metricLabels)),
flushTimeouts: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_flush_timeouts_total{%s}`, metricLabels)),
ignoredNaNSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="nan",%s}`, metricLabels)),
ignoredOldSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="too_old",%s}`, metricLabels)),
}
a.flushAfter.Store(histogram.GetFast())
if dedupInterval > 0 {
a.aggrOutputs.dedupEnabled = true
a.da = newDedupAggr()
a.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
a.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := a.da.sizeBytes()
@ -693,7 +692,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newOutputInitFns(output string, outputsSeen map[string]struct{}, ignoreFirstSampleInterval time.Duration) (aggrValuesFn, error) {
func newOutputInitFns(output string, outputsSeen map[string]struct{}, enableWindows bool, ignoreFirstSampleInterval time.Duration) (aggrValueFn, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@ -737,7 +736,7 @@ func newOutputInitFns(output string, outputsSeen map[string]struct{}, ignoreFirs
case "count_series":
return countSeriesInitFn, nil
case "histogram_bucket":
return histogramBucketInitFn, nil
return histogramBucketInitFn(enableWindows), nil
case "increase":
return totalInitFn(ignoreFirstSampleIntervalSecs, true, true), nil
case "increase_prometheus":
@ -804,7 +803,6 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
t := time.NewTicker(interval)
defer t.Stop()
isSkippedFirstFlush := false
for tickerWait(t) {
pf := pushFunc
if a.enableWindows {
@ -822,14 +820,11 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if !flushTime.After(dedupTime) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, flushTime)
isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil, flushTime)
if ignoreFirstIntervals > 0 {
a.flush(nil, flushTime, false)
ignoreFirstIntervals--
} else {
a.flush(pf, flushTime)
a.flush(pf, flushTime, false)
}
for time.Now().After(flushTime) {
flushTime = flushTime.Add(a.interval)
@ -848,22 +843,25 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
current := a.current.Load()
var dedupTime time.Time
if alignFlushToInterval {
if a.dedupInterval > 0 {
dedupTime = time.UnixMilli(current.deadline)
}
} else {
flushTime = time.Now()
if a.dedupInterval > 0 {
dedupTime = flushTime
}
current := a.current.Load()
var dedupTime time.Time
if alignFlushToInterval {
if a.dedupInterval > 0 {
dedupTime = time.UnixMilli(current.deadline)
}
} else {
flushTime = time.Now()
if a.dedupInterval > 0 {
dedupTime = flushTime
}
a.dedupFlush(dedupTime)
a.flush(pushFunc, flushTime)
}
a.dedupFlush(dedupTime)
pf := pushFunc
if skipIncompleteFlush || ignoreFirstIntervals > 0 {
pf = nil
}
a.flush(pf, flushTime, true)
}
func (a *aggregator) dedupFlush(dedupTime time.Time) {
@ -879,9 +877,9 @@ func (a *aggregator) dedupFlush(dedupTime time.Time) {
a.da.flush(a.aggrOutputs.pushSamples, deleteDeadline.UnixMilli(), current.isGreen)
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
a.da.flushDuration.Update(d.Seconds())
if d > a.dedupInterval {
a.dedupFlushTimeouts.Inc()
a.da.flushTimeouts.Inc()
logger.Warnf("deduplication couldn't be finished in the configured dedup_interval=%s; it took %.03fs; "+
"possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", a.dedupInterval, d.Seconds())
@ -897,12 +895,12 @@ func (a *aggregator) dedupFlush(dedupTime time.Time) {
// flush flushes aggregator state to pushFunc.
//
// If pushFunc is nil, then the aggregator state is just reset.
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time) {
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, isLast bool) {
startTime := time.Now()
ao := a.aggrOutputs
ctx := getFlushCtx(a, ao, pushFunc, flushTime.UnixMilli())
ctx := getFlushCtx(a, ao, pushFunc, flushTime.UnixMilli(), isLast)
if a.dedupInterval <= 0 {
current := a.current.Load()
ctx.isGreen = current.isGreen
@ -1039,11 +1037,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
bb := bbPool.Get()
bb.B = lc.Compress(bb.B, inputLabels)
bb.B = lc.Compress(bb.B, outputLabels)
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
dst = append(dst, bb.B...)
bbPool.Put(bb)
dst = lc.Compress(dst, outputLabels)
dst = lc.Compress(dst, inputLabels)
return dst
}
@ -1051,18 +1049,6 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
}
func getInputOutputKey(key string) (string, string) {
src := bytesutil.ToUnsafeBytes(key)
inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
}
src = src[nSize:]
inputKey := src[:inputKeyLen]
outputKey := src[inputKeyLen:]
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
}
type pushCtx struct {
green []pushSample
blue []pushSample
@ -1125,7 +1111,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
return dstInput, dstOutput
}
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64) *flushCtx {
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
v := flushCtxPool.Get()
if v == nil {
v = &flushCtx{}
@ -1135,6 +1121,7 @@ func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimesta
ctx.ao = ao
ctx.pushFunc = pushFunc
ctx.flushTimestamp = flushTimestamp
ctx.isLast = isLast
return ctx
}
@ -1151,6 +1138,7 @@ type flushCtx struct {
pushFunc PushFunc
flushTimestamp int64
isGreen bool
isLast bool
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
@ -1161,6 +1149,8 @@ func (ctx *flushCtx) reset() {
ctx.a = nil
ctx.ao = nil
ctx.pushFunc = nil
ctx.isGreen = false
ctx.isLast = false
ctx.flushTimestamp = 0
ctx.resetSeries()
}
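The new isLast flag threads from the final flush in runFlusher through getFlushCtx into flushState (outputs.go above), which deletes every flushed entry from the sync.Map so a shutdown flush leaves no aggregation state behind. A minimal sketch of that drain pattern:

package main

import (
	"fmt"
	"sync"
)

func flushState(m *sync.Map, isLast bool) {
	m.Range(func(k, v any) bool {
		fmt.Println("flush", k, v)
		if isLast {
			m.Delete(k) // the last flush drains the map
		}
		return true
	})
}

func main() {
	var m sync.Map
	m.Store(`requests_total{job="app"}`, 42.0)
	flushState(&m, false) // periodic flush keeps entries
	flushState(&m, true)  // shutdown flush removes them
}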

View file

@ -1,10 +1,7 @@
package streamaggr
func sumSamplesInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, new(sumSamplesAggrValue))
if enableWindows {
v.green = append(v.green, new(sumSamplesAggrValue))
}
func sumSamplesInitFn(_ any) aggrValue {
return &sumSamplesAggrValue{}
}
type sumSamplesAggrValue struct {
@ -19,3 +16,7 @@ func (av *sumSamplesAggrValue) flush(ctx *flushCtx, key string) {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
}
func (*sumSamplesAggrValue) state() any {
return nil
}

View file

@ -7,25 +7,22 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
func totalInitFn(ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrValuesFn {
func totalInitFn(ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrValueFn {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
return func(v *aggrValues, enableWindows bool) {
shared := &totalAggrValueShared{
lastValues: make(map[string]totalLastValue),
}
v.blue = append(v.blue, &totalAggrValue{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
shared: shared,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
})
if enableWindows {
v.green = append(v.green, &totalAggrValue{
return func(s any) aggrValue {
var shared *totalAggrValueShared
if s == nil {
shared = &totalAggrValueShared{
lastValues: make(map[string]totalLastValue),
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
shared: shared,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
})
}
} else {
shared = s.(*totalAggrValueShared)
}
return &totalAggrValue{
shared: shared,
}
}
}
@ -37,14 +34,9 @@ type totalLastValue struct {
}
type totalAggrValueShared struct {
lastValues map[string]totalLastValue
total float64
}
type totalAggrValue struct {
lastValues map[string]totalLastValue
total float64
resetTotalOnFlush bool
shared *totalAggrValueShared
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@ -55,11 +47,16 @@ type totalAggrValue struct {
ignoreFirstSampleDeadline uint64
}
func (av *totalAggrValue) pushSample(inputKey string, sample *pushSample, deleteDeadline int64) {
type totalAggrValue struct {
total float64
shared *totalAggrValueShared
}
func (av *totalAggrValue) pushSample(key string, sample *pushSample, deleteDeadline int64) {
shared := av.shared
currentTime := fasttime.UnixTimestamp()
keepFirstSample := av.keepFirstSample && currentTime >= av.ignoreFirstSampleDeadline
lv, ok := shared.lastValues[inputKey]
keepFirstSample := shared.keepFirstSample && currentTime >= shared.ignoreFirstSampleDeadline
lv, ok := shared.lastValues[key]
if ok || keepFirstSample {
if sample.timestamp < lv.timestamp {
// Skip out-of-order sample
@ -76,8 +73,8 @@ func (av *totalAggrValue) pushSample(inputKey string, sample *pushSample, delete
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
inputKey = bytesutil.InternString(inputKey)
shared.lastValues[inputKey] = lv
key = bytesutil.InternString(key)
shared.lastValues[key] = lv
}
func (av *totalAggrValue) flush(ctx *flushCtx, key string) {
@ -91,7 +88,7 @@ func (av *totalAggrValue) flush(ctx *flushCtx, key string) {
delete(lvs, lk)
}
}
if av.resetTotalOnFlush {
if av.shared.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
@ -103,15 +100,19 @@ func (av *totalAggrValue) flush(ctx *flushCtx, key string) {
}
func (av *totalAggrValue) getSuffix() string {
// Note: this function is on the hot path, so it shouldn't allocate.
if av.resetTotalOnFlush {
if av.keepFirstSample {
shared := av.shared
if shared.resetTotalOnFlush {
if shared.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if av.keepFirstSample {
if shared.keepFirstSample {
return "total"
}
return "total_prometheus"
}
func (av *totalAggrValue) state() any {
return av.shared
}

View file

@ -1,13 +1,8 @@
package streamaggr
func uniqueSamplesInitFn(v *aggrValues, enableWindows bool) {
v.blue = append(v.blue, &uniqueSamplesAggrValue{
func uniqueSamplesInitFn(_ any) aggrValue {
return &uniqueSamplesAggrValue{
samples: make(map[float64]struct{}),
})
if enableWindows {
v.green = append(v.green, &uniqueSamplesAggrValue{
samples: make(map[float64]struct{}),
})
}
}
@ -23,5 +18,9 @@ func (av *uniqueSamplesAggrValue) pushSample(_ string, sample *pushSample, _ int
func (av *uniqueSamplesAggrValue) flush(ctx *flushCtx, key string) {
ctx.appendSeries(key, "unique_samples", float64(len(av.samples)))
clear(av.samples)
av.samples = make(map[float64]struct{}, len(av.samples))
}
func (*uniqueSamplesAggrValue) state() any {
return nil
}