use a single sync.Map for all output states, move common logic out of the per-output implementations

Andrii Chubatiuk 2024-09-24 23:03:04 +03:00
parent 2f7e0637de
commit 3a14689567
26 changed files with 710 additions and 1426 deletions


@@ -53,7 +53,7 @@ func TestGetLabelsHash_Distribution(t *testing.T) {
 }
 
 func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
-	f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) {
+	f := func(streamAggrConfig, relabelConfig string, stateSize int, dedupInterval time.Duration, keepInput, dropInput bool, input string) {
 		t.Helper()
 		perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig))
 		if err != nil {
@@ -77,12 +77,15 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
 			rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`),
 		}
 		if dedupInterval > 0 {
-			rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "dedup-global")
+			rwctx.deduplicator = streamaggr.NewDeduplicator(nil, stateSize, dedupInterval, nil, "dedup-global")
 		}
 		if streamAggrConfig != "" {
 			pushNoop := func(_ []prompbmarshal.TimeSeries) {}
-			sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), pushNoop, nil, "global")
+			opts := streamaggr.Options{
+				StateSize: stateSize,
+			}
+			sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), pushNoop, &opts, "global")
 			if err != nil {
 				t.Fatalf("cannot load streamaggr configs: %s", err)
 			}
@@ -114,13 +117,13 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
 - action: keep
   source_labels: [env]
   regex: "dev"
-`, 0, false, false, `
+`, 1, 0, false, false, `
 metric{env="dev"} 10
 metric{env="bar"} 20
 metric{env="dev"} 15
 metric{env="bar"} 25
 `)
-	f(``, ``, time.Hour, false, false, `
+	f(``, ``, 2, time.Hour, false, false, `
 metric{env="dev"} 10
 metric{env="foo"} 20
 metric{env="dev"} 15
@@ -130,7 +133,7 @@ metric{env="foo"} 25
 - action: keep
   source_labels: [env]
   regex: "dev"
-`, time.Hour, false, false, `
+`, 3, time.Hour, false, false, `
 metric{env="dev"} 10
 metric{env="bar"} 20
 metric{env="dev"} 15
@@ -140,7 +143,7 @@ metric{env="bar"} 25
 - action: keep
   source_labels: [env]
   regex: "dev"
-`, time.Hour, true, false, `
+`, 6, time.Hour, true, false, `
 metric{env="test"} 10
 metric{env="dev"} 20
 metric{env="foo"} 15
@@ -150,7 +153,7 @@ metric{env="dev"} 25
 - action: keep
   source_labels: [env]
   regex: "dev"
-`, time.Hour, false, true, `
+`, 10, time.Hour, false, true, `
 metric{env="foo"} 10
 metric{env="dev"} 20
 metric{env="foo"} 15
@@ -160,7 +163,7 @@ metric{env="dev"} 25
 - action: keep
   source_labels: [env]
   regex: "dev"
-`, time.Hour, true, true, `
+`, 11, time.Hour, true, true, `
 metric{env="dev"} 10
 metric{env="test"} 20
 metric{env="dev"} 15


@@ -35,6 +35,7 @@ var (
 		"clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
 	streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples for aggregator "+
 		"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
+	streamAggrGlobalStateSize = flag.Int("streamAggr.stateSize", 1, "Amount of aggregation intervals")
 
 	// Per URL config
 	streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config for the corresponding -remoteWrite.url. "+
@@ -59,6 +60,7 @@ var (
 		"before stream de-duplication and aggregation with -remoteWrite.streamAggr.config and -remoteWrite.streamAggr.dedupInterval at the corresponding -remoteWrite.url. "+
 		"Multiple labels per remoteWrite.url must be delimited by '^^': -remoteWrite.streamAggr.dropInputLabels='replica^^az,replica'. "+
 		"See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
+	streamAggrStateSize = flagutil.NewArrayInt("remoteWrite.streamAggr.stateSize", 1, "Amount of aggregation intervals")
 )
 
 // CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
@@ -134,8 +136,11 @@ func initStreamAggrConfigGlobal() {
 		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
 	}
 	dedupInterval := streamAggrGlobalDedupInterval.Duration()
+	if *streamAggrGlobalStateSize < 1 {
+		logger.Fatalf("--streamAggr.stateSize should be greater than 0")
+	}
 	if dedupInterval > 0 {
-		deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global")
+		deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, *streamAggrGlobalStateSize, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global")
 	}
 }
@@ -161,7 +166,7 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
 		if streamAggrDropInputLabels.GetOptionalArg(idx) != "" {
 			dropLabels = strings.Split(streamAggrDropInputLabels.GetOptionalArg(idx), "^^")
 		}
-		rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, dropLabels, alias)
+		rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, *streamAggrGlobalStateSize, dedupInterval, dropLabels, alias)
 	}
 }
@@ -207,6 +212,7 @@ func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) {
 		IgnoreOldSamples:     *streamAggrGlobalIgnoreOldSamples,
 		IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals,
 		KeepInput:            *streamAggrGlobalKeepInput,
+		StateSize:            *streamAggrGlobalStateSize,
 	}
 
 	sas, err := streamaggr.LoadFromFile(path, pushToRemoteStoragesTrackDropped, opts, "global")
@@ -221,6 +227,9 @@ func (rwctx *remoteWriteCtx) newStreamAggrConfig() (*streamaggr.Aggregators, error) {
 }
 
 func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
+	if streamAggrStateSize.GetOptionalArg(idx) < 1 {
+		return nil, fmt.Errorf("--remoteWrite.streamAggr.stateSize should be greater than 0")
+	}
 	path := streamAggrConfig.GetOptionalArg(idx)
 	if path == "" {
 		return nil, nil
@@ -240,6 +249,7 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
 		IgnoreOldSamples:     streamAggrIgnoreOldSamples.GetOptionalArg(idx),
 		IgnoreFirstIntervals: streamAggrIgnoreFirstIntervals.GetOptionalArg(idx),
 		KeepInput:            streamAggrKeepInput.GetOptionalArg(idx),
+		StateSize:            streamAggrStateSize.GetOptionalArg(idx),
 	}
 	sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias)
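The per-URL wiring above boils down to a small calling pattern. A hedged sketch assembled only from calls visible in this commit (`configData` is an illustrative placeholder, and the no-op push function plus `&opts` mirror the test diff above):

	// Sketch, not code from the tree: configData holds a stream aggregation
	// config in YAML form; StateSize must be >= 1, as validated above.
	opts := streamaggr.Options{
		StateSize: 2,
	}
	pushNoop := func(_ []prompbmarshal.TimeSeries) {}
	sas, err := streamaggr.LoadFromData(configData, pushNoop, &opts, "global")
	if err != nil {
		logger.Fatalf("cannot load stream aggregation config: %s", err)
	}
	_ = sas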


@@ -36,6 +36,7 @@ var (
 		"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
 	streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. "+
 		"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
+	streamAggrStateSize = flag.Int("streamAggr.stateSize", 1, "Number of aggregation intervals")
 )
 
 var (
@@ -62,6 +63,7 @@ func CheckStreamAggrConfig() error {
 		DropInputLabels:      *streamAggrDropInputLabels,
 		IgnoreOldSamples:     *streamAggrIgnoreOldSamples,
 		IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
+		StateSize:            *streamAggrStateSize,
 	}
 	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts, "global")
 	if err != nil {
@@ -78,7 +80,7 @@ func InitStreamAggr() {
 	saCfgReloaderStopCh = make(chan struct{})
 	if *streamAggrConfig == "" {
 		if *streamAggrDedupInterval > 0 {
-			deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels, "global")
+			deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrStateSize, *streamAggrDedupInterval, *streamAggrDropInputLabels, "global")
 		}
 		return
 	}


@@ -1,90 +1,20 @@
 package streamaggr
 
-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// avgAggrState calculates output=avg, e.g. the average value over input samples.
-type avgAggrState struct {
-	m sync.Map
-}
-
-type avgState struct {
+type avgAggrValue struct {
 	sum   float64
 	count float64
 }
 
-type avgStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]avgState
-	deleted        bool
-	deleteDeadline int64
-}
-
-func newAvgAggrState() *avgAggrState {
-	return &avgAggrState{}
-}
-
-func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &avgStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*avgStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			sv.state[idx].sum += s.value
-			sv.state[idx].count++
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
+func (sv *avgAggrValue) pushSample(ctx *pushSampleCtx) {
+	sv.sum += ctx.sample.value
+	sv.count++
 }
 
-func (as *avgAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		sv := v.(*avgStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = avgState{}
-		sv.mu.Unlock()
-		if state.count > 0 {
-			key := k.(string)
-			avg := state.sum / state.count
-			ctx.appendSeries(key, "avg", avg)
-		}
-		return true
-	})
+func (sv *avgAggrValue) flush(ctx *flushCtx, key string) {
+	if sv.count > 0 {
+		avg := sv.sum / sv.count
+		ctx.appendSeries(key, "avg", avg)
+		sv.sum = 0
+		sv.count = 0
+	}
 }


@@ -1,82 +1,14 @@
 package streamaggr
 
-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples.
-type countSamplesAggrState struct {
-	m sync.Map
+type countSamplesAggrValue struct {
+	count uint64
 }
 
-type countSamplesStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]uint64
-	deleted        bool
-	deleteDeadline int64
+func (av *countSamplesAggrValue) pushSample(_ *pushSampleCtx) {
+	av.count++
 }
 
-func newCountSamplesAggrState() *countSamplesAggrState {
-	return &countSamplesAggrState{}
-}
-
-func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &countSamplesStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*countSamplesStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			sv.state[idx]++
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
-
-func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*countSamplesStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = 0
-		sv.mu.Unlock()
-		if state > 0 {
-			key := k.(string)
-			ctx.appendSeries(key, "count_samples", float64(state))
-		}
-		return true
-	})
+func (av *countSamplesAggrValue) flush(ctx *flushCtx, key string) {
+	ctx.appendSeries(key, "count_samples", float64(av.count))
+	av.count = 0
 }


@@ -1,93 +1,33 @@
 package streamaggr
 
 import (
-	"sync"
-
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/cespare/xxhash/v2"
 )
 
-// countSeriesAggrState calculates output=count_series, e.g. the number of unique series.
-type countSeriesAggrState struct {
-	m sync.Map
+func countSeriesInitFn(values []aggrValue) []aggrValue {
+	for i := range values {
+		values[i] = &countSeriesAggrValue{
+			samples: make(map[uint64]struct{}),
+		}
+	}
+	return values
 }
 
-type countSeriesStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]map[uint64]struct{}
-	deleted        bool
-	deleteDeadline int64
+type countSeriesAggrValue struct {
+	samples map[uint64]struct{}
 }
 
-func newCountSeriesAggrState() *countSeriesAggrState {
-	return &countSeriesAggrState{}
-}
-
-func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		inputKey, outputKey := getInputOutputKey(s.key)
-
-		// Count unique hashes over the inputKeys instead of unique inputKey values.
-		// This reduces memory usage at the cost of possible hash collisions for distinct inputKey values.
-		h := xxhash.Sum64(bytesutil.ToUnsafeBytes(inputKey))
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			csv := &countSeriesStateValue{}
-			for ic := range csv.state {
-				csv.state[ic] = make(map[uint64]struct{})
-			}
-			v = csv
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Update the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*countSeriesStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			if _, ok := sv.state[idx][h]; !ok {
-				sv.state[idx][h] = struct{}{}
-			}
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
+func (av *countSeriesAggrValue) pushSample(ctx *pushSampleCtx) {
+	// Count unique hashes over the inputKeys instead of unique inputKey values.
+	// This reduces memory usage at the cost of possible hash collisions for distinct inputKey values.
+	h := xxhash.Sum64(bytesutil.ToUnsafeBytes(ctx.inputKey))
+	if _, ok := av.samples[h]; !ok {
+		av.samples[h] = struct{}{}
 	}
 }
 
-func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*countSeriesStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := len(sv.state[ctx.idx])
-		sv.state[ctx.idx] = make(map[uint64]struct{})
-		sv.mu.Unlock()
-		if state > 0 {
-			key := k.(string)
-			ctx.appendSeries(key, "count_series", float64(state))
-		}
-		return true
-	})
+func (av *countSeriesAggrValue) flush(ctx *flushCtx, key string) {
+	ctx.appendSeries(key, "count_series", float64(len(av.samples)))
+	clear(av.samples)
 }


@@ -14,8 +14,8 @@ import (
 const dedupAggrShardsCount = 128
 
 type dedupAggr struct {
-	shards     []dedupAggrShard
-	currentIdx atomic.Int32
+	shards    []dedupAggrShard
+	stateSize int
 }
 
 type dedupAggrShard struct {
@@ -34,8 +34,8 @@ type dedupAggrState struct {
 }
 
 type dedupAggrShardNopad struct {
-	mu    sync.RWMutex
-	state [aggrStateSize]*dedupAggrState
+	mu    sync.Mutex
+	state []*dedupAggrState
 }
 
 type dedupAggrSample struct {
@@ -43,19 +43,21 @@ type dedupAggrSample struct {
 	timestamp int64
 }
 
-func newDedupAggr() *dedupAggr {
+func newDedupAggr(stateSize int) *dedupAggr {
 	shards := make([]dedupAggrShard, dedupAggrShardsCount)
 	return &dedupAggr{
-		shards: shards,
+		shards:    shards,
+		stateSize: stateSize,
 	}
 }
 
 func (da *dedupAggr) sizeBytes() uint64 {
 	n := uint64(unsafe.Sizeof(*da))
-	currentIdx := da.currentIdx.Load()
 	for i := range da.shards {
-		if da.shards[i].state[currentIdx] != nil {
-			n += da.shards[i].state[currentIdx].sizeBytes.Load()
+		for _, state := range da.shards[i].state {
+			if state != nil {
+				n += state.sizeBytes.Load()
+			}
 		}
 	}
 	return n
@@ -63,19 +65,20 @@ func (da *dedupAggr) sizeBytes() uint64 {
 func (da *dedupAggr) itemsCount() uint64 {
 	n := uint64(0)
-	currentIdx := da.currentIdx.Load()
 	for i := range da.shards {
-		if da.shards[i].state[currentIdx] != nil {
-			n += da.shards[i].state[currentIdx].itemsCount.Load()
+		for _, state := range da.shards[i].state {
+			if state != nil {
+				n += state.itemsCount.Load()
+			}
 		}
 	}
 	return n
 }
 
-func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) {
+func (da *dedupAggr) pushSamples(data *pushCtxData) {
 	pss := getPerShardSamples()
 	shards := pss.shards
-	for _, sample := range samples {
+	for _, sample := range data.samples {
 		h := xxhash.Sum64(bytesutil.ToUnsafeBytes(sample.key))
 		idx := h % uint64(len(shards))
 		shards[idx] = append(shards[idx], sample)
@@ -84,17 +87,21 @@ func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) {
 		if len(shardSamples) == 0 {
 			continue
 		}
-		da.shards[i].pushSamples(shardSamples, dedupIdx)
+		da.shards[i].pushSamples(shardSamples, da.stateSize, data.idx)
 	}
 	putPerShardSamples(pss)
 }
 
-func getDedupFlushCtx() *dedupFlushCtx {
+func getDedupFlushCtx(deleteDeadline int64, dedupIdx, flushIdx int) *dedupFlushCtx {
 	v := dedupFlushCtxPool.Get()
 	if v == nil {
-		return &dedupFlushCtx{}
+		v = &dedupFlushCtx{}
 	}
-	return v.(*dedupFlushCtx)
+	ctx := v.(*dedupFlushCtx)
+	ctx.deleteDeadline = deleteDeadline
+	ctx.dedupIdx = dedupIdx
+	ctx.flushIdx = flushIdx
+	return ctx
 }
 
 func putDedupFlushCtx(ctx *dedupFlushCtx) {
@@ -105,12 +112,26 @@ func putDedupFlushCtx(ctx *dedupFlushCtx) {
 var dedupFlushCtxPool sync.Pool
 
 type dedupFlushCtx struct {
-	samples []pushSample
+	samples        []pushSample
+	deleteDeadline int64
+	dedupIdx       int
+	flushIdx       int
+}
+
+func (ctx *dedupFlushCtx) getPushCtxData(samples []pushSample) *pushCtxData {
+	return &pushCtxData{
+		samples:        samples,
+		deleteDeadline: ctx.deleteDeadline,
+		idx:            ctx.flushIdx,
+	}
 }
 
 func (ctx *dedupFlushCtx) reset() {
 	clear(ctx.samples)
 	ctx.samples = ctx.samples[:0]
+	ctx.deleteDeadline = 0
+	ctx.dedupIdx = 0
+	ctx.flushIdx = 0
 }
 
 func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
@@ -123,13 +144,11 @@ func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
 			<-flushConcurrencyCh
 			wg.Done()
 		}()
-
-		ctx := getDedupFlushCtx()
-		shard.flush(ctx, f, deleteDeadline, dedupIdx, flushIdx)
+		ctx := getDedupFlushCtx(deleteDeadline, dedupIdx, flushIdx)
+		shard.flush(ctx, f)
 		putDedupFlushCtx(ctx)
 		}(&da.shards[i])
 	}
-	da.currentIdx.Store((da.currentIdx.Load() + 1) % aggrStateSize)
 	wg.Wait()
 }
@@ -164,10 +183,13 @@
 var perShardSamplesPool sync.Pool
 
-func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) {
+func (das *dedupAggrShard) pushSamples(samples []pushSample, stateSize, dedupIdx int) {
 	das.mu.Lock()
 	defer das.mu.Unlock()
 
+	if len(das.state) == 0 {
+		das.state = make([]*dedupAggrState, stateSize)
+	}
 	state := das.state[dedupIdx]
 	if state == nil {
 		state = &dedupAggrState{
@@ -200,17 +222,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) {
 	das.state[dedupIdx].samplesBuf = samplesBuf
 }
 
-func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
+func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
 	das.mu.Lock()
 
 	var m map[string]*dedupAggrSample
-	state := das.state[dedupIdx]
+	if len(das.state) == 0 {
+		return
+	}
+	state := das.state[ctx.dedupIdx]
 	if state != nil && len(state.m) > 0 {
 		m = state.m
-		das.state[dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m))
-		das.state[dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[dedupIdx].samplesBuf))
-		das.state[dedupIdx].sizeBytes.Store(0)
-		das.state[dedupIdx].itemsCount.Store(0)
+		das.state[ctx.dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m))
+		das.state[ctx.dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[ctx.dedupIdx].samplesBuf))
+		das.state[ctx.dedupIdx].sizeBytes.Store(0)
+		das.state[ctx.dedupIdx].itemsCount.Store(0)
 	}
 
 	das.mu.Unlock()
@@ -229,11 +254,13 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
 		// Limit the number of samples per each flush in order to limit memory usage.
 		if len(dstSamples) >= 10_000 {
-			f(dstSamples, deleteDeadline, flushIdx)
+			data := ctx.getPushCtxData(dstSamples)
+			f(data)
 			clear(dstSamples)
 			dstSamples = dstSamples[:0]
 		}
 	}
-	f(dstSamples, deleteDeadline, flushIdx)
+	data := ctx.getPushCtxData(dstSamples)
+	f(data)
 	ctx.samples = dstSamples
 }


@@ -9,7 +9,7 @@ import (
 )
 
 func TestDedupAggrSerial(t *testing.T) {
-	da := newDedupAggr()
+	da := newDedupAggr(2)
 
 	const seriesCount = 100_000
 	expectedSamplesMap := make(map[string]pushSample)
@@ -21,7 +21,10 @@ func TestDedupAggrSerial(t *testing.T) {
 			sample.value = float64(i + j)
 			expectedSamplesMap[sample.key] = *sample
 		}
-		da.pushSamples(samples, 0, 0)
+		data := &pushCtxData{
+			samples: samples,
+		}
+		da.pushSamples(data)
 	}
 
 	if n := da.sizeBytes(); n > 5_000_000 {
@@ -33,9 +36,9 @@ func TestDedupAggrSerial(t *testing.T) {
 	flushedSamplesMap := make(map[string]pushSample)
 	var mu sync.Mutex
-	flushSamples := func(samples []pushSample, _ int64, _ int) {
+	flushSamples := func(ctx *pushCtxData) {
 		mu.Lock()
-		for _, sample := range samples {
+		for _, sample := range ctx.samples {
 			flushedSamplesMap[sample.key] = sample
 		}
 		mu.Unlock()
@@ -59,7 +62,7 @@ func TestDedupAggrSerial(t *testing.T) {
 func TestDedupAggrConcurrent(_ *testing.T) {
 	const concurrency = 5
 	const seriesCount = 10_000
-	da := newDedupAggr()
+	da := newDedupAggr(2)
 
 	var wg sync.WaitGroup
 	for i := 0; i < concurrency; i++ {
@@ -67,13 +70,15 @@ func TestDedupAggrConcurrent(_ *testing.T) {
 		go func() {
 			defer wg.Done()
 			for i := 0; i < 10; i++ {
-				samples := make([]pushSample, seriesCount)
-				for j := range samples {
-					sample := &samples[j]
+				data := &pushCtxData{
+					samples: make([]pushSample, seriesCount),
+				}
+				for j := range data.samples {
+					sample := &data.samples[j]
 					sample.key = fmt.Sprintf("key_%d", j)
 					sample.value = float64(i + j)
 				}
-				da.pushSamples(samples, 0, 0)
+				da.pushSamples(data)
 			}
 		}()
 	}


@@ -19,7 +19,7 @@ func BenchmarkDedupAggr(b *testing.B) {
 func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 	const loops = 2
 	benchSamples := newBenchSamples(samplesPerPush)
-	da := newDedupAggr()
+	da := newDedupAggr(2)
 
 	b.ResetTimer()
 	b.ReportAllocs()
@@ -27,7 +27,9 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			for i := 0; i < loops; i++ {
-				da.pushSamples(benchSamples, 0, 0)
+				da.pushSamples(&pushCtxData{
+					samples: benchSamples,
+				})
 			}
 		}
 	})


@@ -10,6 +10,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -17,6 +18,7 @@ import (
 type Deduplicator struct {
 	da *dedupAggr
+	stateSize int
 
 	dropLabels    []string
 	dedupInterval int64
@@ -39,11 +41,12 @@
 // alias is url label used in metrics exposed by the returned Deduplicator.
 //
 // MustStop must be called on the returned deduplicator in order to free up occupied resources.
-func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
+func NewDeduplicator(pushFunc PushFunc, stateSize int, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
 	d := &Deduplicator{
-		da:            newDedupAggr(),
+		da:            newDedupAggr(stateSize),
 		dropLabels:    dropLabels,
 		dedupInterval: dedupInterval.Milliseconds(),
+		stateSize:     stateSize,
 
 		stopCh: make(chan struct{}),
 		ms:     metrics.NewSet(),
@@ -85,13 +88,13 @@ func (d *Deduplicator) MustStop() {
 
 // Push pushes tss to d.
 func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
-	ctx := getDeduplicatorPushCtx()
+	ctx := getDeduplicatorPushCtx(d.stateSize)
 	pss := ctx.pss
 	labels := &ctx.labels
 	buf := ctx.buf
 
 	dropLabels := d.dropLabels
-	aggrIntervals := int64(aggrStateSize)
+	aggrIntervals := int64(d.stateSize)
 	for _, ts := range tss {
 		if len(dropLabels) > 0 {
 			labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
@@ -117,8 +120,11 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		}
 	}
 
+	data := &pushCtxData{}
 	for idx, ps := range pss {
-		d.da.pushSamples(ps, 0, idx)
+		data.idx = idx
+		data.samples = ps
+		d.da.pushSamples(data)
 	}
 
 	ctx.pss = pss
@@ -146,20 +152,21 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration) {
 			flushTime := t.Truncate(dedupInterval).Add(dedupInterval)
 			flushTimestamp := flushTime.UnixMilli()
 			flushIntervals := int(flushTimestamp / int64(dedupInterval/time.Millisecond))
-			flushIdx := flushIntervals % aggrStateSize
-			d.flush(pushFunc, dedupInterval, flushTime, flushIdx)
+			flushIdx := flushIntervals % d.stateSize
+			d.flush(pushFunc, dedupInterval, flushTimestamp, flushIdx)
 		}
 	}
 }
 
-func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) {
-	d.da.flush(func(pss []pushSample, _ int64, _ int) {
+func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTimestamp int64, idx int) {
+	startTime := time.Now()
+	d.da.flush(func(data *pushCtxData) {
 		ctx := getDeduplicatorFlushCtx()
 
 		tss := ctx.tss
 		labels := ctx.labels
 		samples := ctx.samples
-		for _, ps := range pss {
+		for _, ps := range data.samples {
 			labelsLen := len(labels)
 			labels = decompressLabels(labels, ps.key)
@@ -180,9 +187,9 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) {
 		ctx.labels = labels
 		ctx.samples = samples
 		putDeduplicatorFlushCtx(ctx)
-	}, flushTime.UnixMilli(), flushIdx, flushIdx)
+	}, flushTimestamp, idx, idx)
 
-	duration := time.Since(flushTime)
+	duration := time.Since(startTime)
 	d.dedupFlushDuration.Update(duration.Seconds())
 	if duration > dedupInterval {
 		d.dedupFlushTimeouts.Inc()
@@ -193,7 +200,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) {
 }
 
 type deduplicatorPushCtx struct {
-	pss    [aggrStateSize][]pushSample
+	pss    [][]pushSample
 	labels promutils.Labels
 	buf    []byte
 }
@@ -208,12 +215,18 @@ func (ctx *deduplicatorPushCtx) reset() {
 	ctx.buf = ctx.buf[:0]
 }
 
-func getDeduplicatorPushCtx() *deduplicatorPushCtx {
+func getDeduplicatorPushCtx(stateSize int) *deduplicatorPushCtx {
 	v := deduplicatorPushCtxPool.Get()
 	if v == nil {
-		return &deduplicatorPushCtx{}
+		return &deduplicatorPushCtx{
+			pss: make([][]pushSample, stateSize),
+		}
 	}
-	return v.(*deduplicatorPushCtx)
+	ctx := v.(*deduplicatorPushCtx)
+	if len(ctx.pss) < stateSize {
+		ctx.pss = slicesutil.SetLength(ctx.pss, stateSize)
+	}
+	return ctx
 }
 
 func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) {
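In isolation, the new Deduplicator call shape looks as follows — a hedged sketch; the concrete values are illustrative and `tss` stands for any []prompbmarshal.TimeSeries batch (the test and benchmark diffs below exercise the same signature):

	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
	// stateSize=2 keeps two dedup interval states, matching the tests below.
	d := streamaggr.NewDeduplicator(pushFunc, 2, time.Hour, []string{"replica"}, "global")
	d.Push(tss)
	d.MustStop()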


@@ -31,16 +31,17 @@ baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",n
 `, offsetMsecs)
 
 	dedupInterval := time.Hour
-	d := NewDeduplicator(pushFunc, dedupInterval, []string{"node", "instance"}, "global")
+	d := NewDeduplicator(pushFunc, 2, dedupInterval, []string{"node", "instance"}, "global")
 	for i := 0; i < 10; i++ {
 		d.Push(tss)
 	}
 
 	flushTime := time.Now()
 	flushIntervals := flushTime.UnixMilli()/dedupInterval.Milliseconds() + 1
+	aggrStateSize := 2
 	idx := int(flushIntervals % int64(aggrStateSize))
-	d.flush(pushFunc, time.Hour, time.Now(), idx)
+	d.flush(pushFunc, time.Hour, time.Now().UnixMilli(), idx)
 	d.MustStop()
 
 	result := timeSeriessToString(tssResult)


@@ -9,7 +9,7 @@ import (
 func BenchmarkDeduplicatorPush(b *testing.B) {
 	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
-	d := NewDeduplicator(pushFunc, time.Hour, nil, "global")
+	d := NewDeduplicator(pushFunc, 2, time.Hour, nil, "global")
 
 	b.ReportAllocs()
 	b.SetBytes(int64(len(benchSeries)))


@@ -1,85 +1,24 @@
 package streamaggr
 
 import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/metrics"
 )
 
-// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
-type histogramBucketAggrState struct {
-	m sync.Map
+// histogramBucketAggrValue calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
+type histogramBucketAggrValue struct {
+	h     metrics.Histogram
+	state metrics.Histogram
 }
 
-type histogramBucketStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]metrics.Histogram
-	total          metrics.Histogram
-	deleted        bool
-	deleteDeadline int64
+func (sv *histogramBucketAggrValue) pushSample(ctx *pushSampleCtx) {
+	sv.h.Update(ctx.sample.value)
 }
 
-func newHistogramBucketAggrState() *histogramBucketAggrState {
-	return &histogramBucketAggrState{}
-}
-
-func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &histogramBucketStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*histogramBucketStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			sv.state[idx].Update(s.value)
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
-
-func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*histogramBucketStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		sv.total.Merge(&sv.state[ctx.idx])
-		total := &sv.total
-		sv.state[ctx.idx] = metrics.Histogram{}
-		sv.mu.Unlock()
-		key := k.(string)
-		total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
-			ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
-		})
-		return true
-	})
+func (sv *histogramBucketAggrValue) flush(ctx *flushCtx, key string) {
+	total := &sv.state
+	total.Merge(&sv.h)
+	total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
+		ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
+	})
+	total.Reset()
 }
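For readers unfamiliar with the metrics.Histogram calls used here, a hedged sketch of the same API in isolation (the methods are the ones this diff itself calls; the sample value is illustrative): each flush folds the per-interval histogram h into the running state total and walks only the non-empty buckets:

	var h, total metrics.Histogram
	h.Update(0.35) // record one sample into the interval histogram
	total.Merge(&h) // fold the interval state into the running total
	total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
		// one output series per non-empty bucket, labeled with vmrange
		_, _ = vmrange, count
	})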


@@ -1,90 +1,20 @@
 package streamaggr
 
-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// lastAggrState calculates output=last, e.g. the last value over input samples.
-type lastAggrState struct {
-	m sync.Map
-}
-
-type lastStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]lastState
-	deleted        bool
-	deleteDeadline int64
-}
-
-type lastState struct {
+type lastAggrValue struct {
 	last      float64
 	timestamp int64
 }
 
-func newLastAggrState() *lastAggrState {
-	return &lastAggrState{}
-}
-
-func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &lastStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Update the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*lastStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			if s.timestamp >= sv.state[idx].timestamp {
-				sv.state[idx].last = s.value
-				sv.state[idx].timestamp = s.timestamp
-			}
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
+func (av *lastAggrValue) pushSample(ctx *pushSampleCtx) {
+	if ctx.sample.timestamp >= av.timestamp {
+		av.last = ctx.sample.value
+		av.timestamp = ctx.sample.timestamp
 	}
 }
 
-func (as *lastAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*lastStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = lastState{}
-		sv.mu.Unlock()
-		if state.timestamp > 0 {
-			key := k.(string)
-			ctx.appendSeries(key, "last", state.last)
-		}
-		return true
-	})
+func (av *lastAggrValue) flush(ctx *flushCtx, key string) {
+	if av.timestamp > 0 {
+		ctx.appendSeries(key, "last", av.last)
+		av.timestamp = 0
+	}
 }


@@ -1,93 +1,23 @@
 package streamaggr
 
-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// maxAggrState calculates output=max, e.g. the maximum value over input samples.
-type maxAggrState struct {
-	m sync.Map
-}
-
-type maxStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]maxState
-	deleted        bool
-	deleteDeadline int64
-}
-
-type maxState struct {
-	max    float64
-	exists bool
-}
-
-func newMaxAggrState() *maxAggrState {
-	return &maxAggrState{}
-}
-
-func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &maxStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*maxStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			state := &sv.state[idx]
-			if !state.exists {
-				state.max = s.value
-				state.exists = true
-			} else if s.value > state.max {
-				state.max = s.value
-			}
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
+type maxAggrValue struct {
+	max     float64
+	defined bool
+}
+
+func (av *maxAggrValue) pushSample(ctx *pushSampleCtx) {
+	if ctx.sample.value > av.max || !av.defined {
+		av.max = ctx.sample.value
+	}
+	if !av.defined {
+		av.defined = true
 	}
 }
 
-func (as *maxAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*maxStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = maxState{}
-		sv.mu.Unlock()
-		if state.exists {
-			key := k.(string)
-			ctx.appendSeries(key, "max", state.max)
-		}
-		return true
-	})
+func (av *maxAggrValue) flush(ctx *flushCtx, key string) {
+	if av.defined {
+		ctx.appendSeries(key, "max", av.max)
+		av.max = 0
+		av.defined = false
+	}
 }


@@ -1,93 +1,23 @@
 package streamaggr
 
-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// minAggrState calculates output=min, e.g. the minimum value over input samples.
-type minAggrState struct {
-	m sync.Map
-}
-
-type minStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]minState
-	deleted        bool
-	deleteDeadline int64
-}
-
-type minState struct {
-	min    float64
-	exists bool
-}
-
-func newMinAggrState() *minAggrState {
-	return &minAggrState{}
-}
-
-func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &minStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*minStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			state := &sv.state[idx]
-			if !state.exists {
-				state.min = s.value
-				state.exists = true
-			} else if s.value < state.min {
-				state.min = s.value
-			}
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
+type minAggrValue struct {
+	min     float64
+	defined bool
+}
+
+func (av *minAggrValue) pushSample(ctx *pushSampleCtx) {
+	if ctx.sample.value < av.min || !av.defined {
+		av.min = ctx.sample.value
+	}
+	if !av.defined {
+		av.defined = true
 	}
 }
 
-func (as *minAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*minStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = minState{}
-		sv.mu.Unlock()
-		if state.exists {
-			key := k.(string)
-			ctx.appendSeries(key, "min", state.min)
-		}
-		return true
-	})
+func (av *minAggrValue) flush(ctx *flushCtx, key string) {
+	if av.defined {
+		ctx.appendSeries(key, "min", av.min)
+		av.defined = false
+		av.min = 0
+	}
 }

lib/streamaggr/output.go — new file, 129 lines

@@ -0,0 +1,129 @@
+package streamaggr
+
+import (
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/metrics"
+	"sync"
+)
+
+type pushSampleCtx struct {
+	stateSize      int
+	deleteDeadline int64
+	sample         *pushSample
+	idx            int
+	inputKey       string
+}
+
+type aggrValuesFn func(*pushSampleCtx) []aggrValue
+
+type aggrValuesInitFn func([]aggrValue) []aggrValue
+
+func newAggrValues[V any, VP aggrValuePtr[V]](initFn aggrValuesInitFn) aggrValuesFn {
+	return func(ctx *pushSampleCtx) []aggrValue {
+		output := make([]aggrValue, ctx.stateSize)
+		if initFn != nil {
+			return initFn(output)
+		}
+		for i := range output {
+			var v VP = new(V)
+			output[i] = v
+		}
+		return output
+	}
+}
+
+type aggrOutputs struct {
+	m             sync.Map
+	stateSize     int
+	initFns       []aggrValuesFn
+	outputSamples *metrics.Counter
+}
+
+func (ao *aggrOutputs) pushSamples(data *pushCtxData) {
+	ctx := &pushSampleCtx{
+		stateSize:      ao.stateSize,
+		deleteDeadline: data.deleteDeadline,
+		idx:            data.idx,
+	}
+	var outputKey string
+	for i := range data.samples {
+		ctx.sample = &data.samples[i]
+		ctx.inputKey, outputKey = getInputOutputKey(ctx.sample.key)
+
+	again:
+		v, ok := ao.m.Load(outputKey)
+		if !ok {
+			// The entry is missing in the map. Try creating it.
+			nv := &aggrValues{
+				values: make([][]aggrValue, len(ao.initFns)),
+			}
+			for i, initFn := range ao.initFns {
+				nv.values[i] = initFn(ctx)
+			}
+			v = nv
+			outputKey = bytesutil.InternString(outputKey)
+			vNew, loaded := ao.m.LoadOrStore(outputKey, v)
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
+			}
+		}
+		av := v.(*aggrValues)
+		av.mu.Lock()
+		deleted := av.deleted
+		if !deleted {
+			for i := range av.values {
+				av.values[i][data.idx].pushSample(ctx)
+			}
+			av.deleteDeadline = data.deleteDeadline
+		}
+		av.mu.Unlock()
+		if deleted {
+			// The entry has been deleted by the concurrent call to flush
+			// Try obtaining and updating the entry again.
+			goto again
+		}
+	}
+}
+
+func (ao *aggrOutputs) flushState(ctx *flushCtx) {
+	m := &ao.m
+	m.Range(func(k, v any) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		av := v.(*aggrValues)
+		av.mu.Lock()
+		// check for stale entries
+		deleted := ctx.flushTimestamp > av.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			av.deleted = deleted
+			av.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		key := k.(string)
+		for _, ov := range av.values {
+			ov[ctx.idx].flush(ctx, key)
+		}
+		av.mu.Unlock()
+		return true
+	})
+}
+
+type aggrValues struct {
+	mu             sync.Mutex
+	values         [][]aggrValue
+	deleteDeadline int64
+	deleted        bool
+}
+
+type aggrValue interface {
+	pushSample(*pushSampleCtx)
+	flush(*flushCtx, string)
+}
+
+type aggrValuePtr[V any] interface {
+	*V
+	aggrValue
+}
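To see what this extension point buys, here is a hypothetical output written against the aggrValue interface above — a sketch only, sumAggrValue does not exist in this commit; it follows the same shape as the avg and count_samples outputs:

	// Hypothetical example, not part of this commit.
	type sumAggrValue struct {
		sum float64
	}

	func (av *sumAggrValue) pushSample(ctx *pushSampleCtx) {
		av.sum += ctx.sample.value
	}

	func (av *sumAggrValue) flush(ctx *flushCtx, key string) {
		ctx.appendSeries(key, "sum", av.sum)
		av.sum = 0
	}

	// The generic constructor supplies its aggrValuesFn for free: a nil initFn
	// means each of the stateSize slots is filled with a usable zero value.
	var newSumValues = newAggrValues[sumAggrValue](nil)

Outputs whose zero value is not usable pass an initFn instead, as count_series (allocated map), quantiles and rate (shared state) show elsewhere in this diff.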


@@ -1,102 +1,59 @@
 package streamaggr
 
 import (
-	"strconv"
-	"sync"
-
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/valyala/histogram"
+	"strconv"
 )
 
-// quantilesAggrState calculates output=quantiles, e.g. the given quantiles over the input samples.
-type quantilesAggrState struct {
-	m sync.Map
-
-	phis []float64
+func quantilesInitFn(stateSize int, phis []float64) aggrValuesInitFn {
+	states := make([]*quantilesAggrState, stateSize)
+	return func(values []aggrValue) []aggrValue {
+		for i := range values {
+			state := states[i]
+			if state == nil {
+				state = &quantilesAggrState{
+					phis: phis,
+				}
+				states[i] = state
+			}
+			values[i] = &quantilesAggrValue{
+				state: state,
+			}
+		}
+		return values
+	}
 }
 
-type quantilesStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]*histogram.Fast
-	deleted        bool
-	deleteDeadline int64
+type quantilesAggrState struct {
+	phis      []float64
+	quantiles []float64
+	b         []byte
 }
 
-func newQuantilesAggrState(phis []float64) *quantilesAggrState {
-	return &quantilesAggrState{
-		phis: phis,
-	}
+// quantilesAggrValue calculates output=quantiles, e.g. the given quantiles over the input samples.
+type quantilesAggrValue struct {
+	h     *histogram.Fast
+	state *quantilesAggrState
 }
 
-func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &quantilesStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*quantilesStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			if sv.state[idx] == nil {
-				sv.state[idx] = histogram.GetFast()
-			}
-			sv.state[idx].Update(s.value)
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
+func (av *quantilesAggrValue) pushSample(ctx *pushSampleCtx) {
+	if av.h == nil {
+		av.h = histogram.GetFast()
 	}
+	av.h.Update(ctx.sample.value)
 }
 
-func (as *quantilesAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	phis := as.phis
-	var quantiles []float64
-	var b []byte
-	m.Range(func(k, v any) bool {
-		sv := v.(*quantilesStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		quantiles = quantiles[:0]
-		if state != nil {
-			quantiles = state.Quantiles(quantiles[:0], phis)
-			histogram.PutFast(state)
-			state.Reset()
-		}
-		sv.mu.Unlock()
-		if len(quantiles) > 0 {
-			key := k.(string)
-			for i, quantile := range quantiles {
-				b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
-				phiStr := bytesutil.InternBytes(b)
-				ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
-			}
-		}
-		return true
-	})
+func (av *quantilesAggrValue) flush(ctx *flushCtx, key string) {
+	if av.h != nil {
+		av.state.quantiles = av.h.Quantiles(av.state.quantiles[:0], av.state.phis)
+	}
+	histogram.PutFast(av.h)
+	if len(av.state.quantiles) > 0 {
+		for i, quantile := range av.state.quantiles {
+			av.state.b = strconv.AppendFloat(av.state.b[:0], av.state.phis[i], 'g', -1, 64)
+			phiStr := bytesutil.InternBytes(av.state.b)
+			ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
+		}
+	}
 }


@ -1,176 +1,115 @@
package streamaggr package streamaggr
import ( import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
) )
// rateAggrState calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics. func rateInitFn(isAvg bool) aggrValuesInitFn {
type rateAggrState struct { return func(values []aggrValue) []aggrValue {
m sync.Map shared := &rateAggrValueShared{
lastValues: make(map[string]rateLastValue),
// isAvg is set to true if rate_avg() must be calculated instead of rate_sum(). }
isAvg bool for i := range values {
values[i] = &rateAggrValue{
isAvg: isAvg,
shared: shared,
state: make(map[string]rateAggrValueState),
}
}
return values
}
} }
type rateStateValue struct { // rateLastValue calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics.
mu sync.Mutex type rateLastValue struct {
state map[string]rateState value float64
deleted bool
deleteDeadline int64 deleteDeadline int64
}
-type rateState struct {
-	lastValues [aggrStateSize]rateLastValueState
-	// prevTimestamp stores timestamp of the last registered value
-	// in the previous aggregation interval
-	prevTimestamp int64
-
-	// prevValue stores last registered value
-	// in the previous aggregation interval
-	prevValue      float64
-	deleteDeadline int64
-}
+	// prevTimestamp is the timestamp of the last registered sample in the previous aggregation interval
+	prevTimestamp int64
+}

-type rateLastValueState struct {
-	firstValue float64
-	value      float64
-	timestamp  int64
-
-	// total stores cumulative difference between registered values
-	// in the aggregation interval
-	total float64
-}
+type rateAggrValueShared struct {
+	lastValues map[string]rateLastValue
+}

-func newRateAggrState(isAvg bool) *rateAggrState {
-	return &rateAggrState{
-		isAvg: isAvg,
-	}
-}
+type rateAggrValueState struct {
+	// increase stores cumulative increase for the current time series on the current aggregation interval
+	increase  float64
+	timestamp int64
+}

-func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		inputKey, outputKey := getInputOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			rsv := &rateStateValue{
-				state: make(map[string]rateState),
-			}
-			v = rsv
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*rateStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			state, ok := sv.state[inputKey]
-			lv := state.lastValues[idx]
-			if ok && lv.timestamp > 0 {
-				if s.timestamp < lv.timestamp {
-					// Skip out of order sample
-					sv.mu.Unlock()
-					continue
-				}
-				if state.prevTimestamp == 0 {
-					state.prevTimestamp = lv.timestamp
-					state.prevValue = lv.value
-				}
-				if s.value >= lv.value {
-					lv.total += s.value - lv.value
-				} else {
-					// counter reset
-					lv.total += s.value
-				}
-			} else if state.prevTimestamp > 0 {
-				lv.firstValue = s.value
-			}
-			lv.value = s.value
-			lv.timestamp = s.timestamp
-			state.lastValues[idx] = lv
-			state.deleteDeadline = deleteDeadline
-			inputKey = bytesutil.InternString(inputKey)
-			sv.state[inputKey] = state
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
+type rateAggrValue struct {
+	shared *rateAggrValueShared
+	state  map[string]rateAggrValueState
+	isAvg  bool
+}
+
+func (av *rateAggrValue) pushSample(ctx *pushSampleCtx) {
+	sv := av.state[ctx.inputKey]
+	inputKey := ctx.inputKey
+	lv, ok := av.shared.lastValues[ctx.inputKey]
+	if ok {
+		if ctx.sample.timestamp < sv.timestamp {
+			// Skip out of order sample
+			return
+		}
+		if ctx.sample.value >= lv.value {
+			sv.increase += ctx.sample.value - lv.value
+		} else {
+			// counter reset
+			sv.increase += ctx.sample.value
+		}
+	} else {
+		lv.prevTimestamp = ctx.sample.timestamp
+	}
+	lv.value = ctx.sample.value
+	lv.deleteDeadline = ctx.deleteDeadline
+	sv.timestamp = ctx.sample.timestamp
+	inputKey = bytesutil.InternString(inputKey)
+	av.state[inputKey] = sv
+	av.shared.lastValues[inputKey] = lv
+}
+
+func (av *rateAggrValue) flush(ctx *flushCtx, key string) {
+	suffix := av.getSuffix()
+	rate := 0.0
+	countSeries := 0
+	lvs := av.shared.lastValues
+	for lk, lv := range lvs {
+		if ctx.flushTimestamp > lv.deleteDeadline {
+			delete(lvs, lk)
+			continue
+		}
+	}
+	for sk, sv := range av.state {
+		lv := lvs[sk]
+		if lv.prevTimestamp == 0 {
+			continue
+		}
+		d := float64(sv.timestamp-lv.prevTimestamp) / 1000
+		if d > 0 {
+			rate += sv.increase / d
+			countSeries++
+		}
+		lv.prevTimestamp = sv.timestamp
+		lvs[sk] = lv
+		delete(av.state, sk)
+	}
+	if countSeries == 0 {
+		return
+	}
+	if av.isAvg {
+		rate /= float64(countSeries)
+	}
+	if rate > 0 {
+		ctx.appendSeries(key, suffix, rate)
+	}
+}

-func (as *rateAggrState) getSuffix() string {
-	if as.isAvg {
-		return "rate_avg"
-	}
-	return "rate_sum"
-}
-
-func (as *rateAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	suffix := as.getSuffix()
-	m.Range(func(k, v any) bool {
-		sv := v.(*rateStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = true
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		// Delete outdated entries in state
-		rate := 0.0
-		countSeries := 0
-		for k1, state := range sv.state {
-			if ctx.flushTimestamp > state.deleteDeadline {
-				delete(sv.state, k1)
-				continue
-			}
-			v1 := state.lastValues[ctx.idx]
-			rateInterval := v1.timestamp - state.prevTimestamp
-			if rateInterval > 0 && state.prevTimestamp > 0 {
-				if v1.firstValue >= state.prevValue {
-					v1.total += v1.firstValue - state.prevValue
-				} else {
-					v1.total += v1.firstValue
-				}
-				// calculate rate only if value was seen at least twice with different timestamps
-				rate += (v1.total) * 1000 / float64(rateInterval)
-				state.prevTimestamp = v1.timestamp
-				state.prevValue = v1.value
-				countSeries++
-			}
-			state.lastValues[ctx.idx] = rateLastValueState{}
-			sv.state[k1] = state
-		}
-		sv.mu.Unlock()
-		if countSeries > 0 {
-			if as.isAvg {
-				rate /= float64(countSeries)
-			}
-			key := k.(string)
-			ctx.appendSeries(key, suffix, rate)
-		}
-		return true
-	})
-}
+func (av *rateAggrValue) getSuffix() string {
+	if av.isAvg {
+		return "rate_avg"
+	}
+	return "rate_sum"
+}
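A quick way to see what the new state amounts to: per input series, flush divides the increase accumulated over the interval by the seconds between the previous interval's last sample and the current one, and counter resets contribute their raw value. A standalone sketch of that arithmetic (illustrative, not part of the diff):

package main

import "fmt"

func main() {
	// last sample of the previous interval, as kept in rateLastValue
	prevTimestamp := int64(10_000) // ms
	last := 90.0                   // counter value at prevTimestamp

	// samples seen in the current interval; 100 -> 40 is a counter reset
	samples := []struct {
		ts int64
		v  float64
	}{{20_000, 100}, {30_000, 40}, {40_000, 70}}

	increase := 0.0
	var lastTs int64
	for _, s := range samples {
		if s.v >= last {
			increase += s.v - last
		} else {
			// counter reset: the raw value is the increase since the restart
			increase += s.v
		}
		last, lastTs = s.v, s.ts
	}
	d := float64(lastTs-prevTimestamp) / 1000 // seconds, as in flush above
	fmt.Println(increase / d)                 // (10 + 40 + 30) / 30 = 2.666...
}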

View file

@@ -2,93 +2,27 @@ package streamaggr
 import (
 	"math"
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )

-// stddevAggrState calculates output=stddev, e.g. the standard deviation over input samples.
-type stddevAggrState struct {
-	m sync.Map
-}
-
-type stddevStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]stddevState
-	deleted        bool
-	deleteDeadline int64
-}
-
-type stddevState struct {
+// stddevAggrValue calculates output=stddev, e.g. the standard deviation over input samples.
+type stddevAggrValue struct {
 	count float64
 	avg   float64
 	q     float64
 }

-func newStddevAggrState() *stddevAggrState {
-	return &stddevAggrState{}
-}
-
-func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &stddevStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*stddevStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
-			state := &sv.state[idx]
-			state.count++
-			avg := state.avg + (s.value-state.avg)/state.count
-			state.q += (s.value - state.avg) * (s.value - avg)
-			state.avg = avg
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
-
-func (as *stddevAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*stddevStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = stddevState{}
-		sv.mu.Unlock()
-		if state.count > 0 {
-			key := k.(string)
-			ctx.appendSeries(key, "stddev", math.Sqrt(state.q/state.count))
-		}
-		return true
-	})
-}
+func (av *stddevAggrValue) pushSample(ctx *pushSampleCtx) {
+	av.count++
+	avg := av.avg + (ctx.sample.value-av.avg)/av.count
+	av.q += (ctx.sample.value - av.avg) * (ctx.sample.value - avg)
+	av.avg = avg
+}
+
+func (av *stddevAggrValue) flush(ctx *flushCtx, key string) {
+	if av.count > 0 {
+		ctx.appendSeries(key, "stddev", math.Sqrt(av.q/av.count))
+		av.count = 0
+		av.avg = 0
+		av.q = 0
+	}
+}
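The pushSample body above is Welford's streaming algorithm (the `Rapid calculation methods` reference in the removed code): q accumulates the running sum of squared deviations, so flush can emit sqrt(q/count) without a second pass over the samples. A standalone check against a set whose population stddev is exactly 2:

package main

import (
	"fmt"
	"math"
)

func main() {
	var count, avg, q float64
	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		count++
		newAvg := avg + (v-avg)/count
		// deviation from the old avg times deviation from the new avg, as in pushSample
		q += (v - avg) * (v - newAvg)
		avg = newAvg
	}
	fmt.Println(math.Sqrt(q / count)) // 2
}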

View file

@@ -1,93 +1,24 @@
 package streamaggr

-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// stdvarAggrState calculates output=stdvar, e.g. the variance over input samples.
-type stdvarAggrState struct {
-	m sync.Map
-}
-
-type stdvarStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]stdvarState
-	deleted        bool
-	deleteDeadline int64
-}
-
-type stdvarState struct {
+// stdvarAggrValue calculates output=stdvar, e.g. the variance over input samples.
+type stdvarAggrValue struct {
 	count float64
 	avg   float64
 	q     float64
 }

-func newStdvarAggrState() *stdvarAggrState {
-	return &stdvarAggrState{}
-}
-
-func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &stdvarStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*stdvarStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
-			state := &sv.state[idx]
-			state.count++
-			avg := state.avg + (s.value-state.avg)/state.count
-			state.q += (s.value - state.avg) * (s.value - avg)
-			state.avg = avg
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
-
-func (as *stdvarAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*stdvarStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = stdvarState{}
-		sv.mu.Unlock()
-		if state.count > 0 {
-			key := k.(string)
-			ctx.appendSeries(key, "stdvar", state.q/state.count)
-		}
-		return true
-	})
-}
+func (av *stdvarAggrValue) pushSample(ctx *pushSampleCtx) {
+	av.count++
+	avg := av.avg + (ctx.sample.value-av.avg)/av.count
+	av.q += (ctx.sample.value - av.avg) * (ctx.sample.value - avg)
+	av.avg = avg
+}
+
+func (av *stdvarAggrValue) flush(ctx *flushCtx, key string) {
+	if av.count > 0 {
+		ctx.appendSeries(key, "stdvar", av.q/av.count)
+		av.count = 0
+		av.avg = 0
+		av.q = 0
+	}
+}
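stdvar keeps the same Welford accumulator and simply skips the square root: q/count is the population variance, equal to E[x^2] - (E[x])^2. A quick cross-check of the two formulas (illustrative, not part of the diff):

package main

import "fmt"

func main() {
	vals := []float64{2, 4, 4, 4, 5, 5, 7, 9}

	// streaming accumulator, as in pushSample above
	var count, avg, q float64
	for _, v := range vals {
		count++
		newAvg := avg + (v-avg)/count
		q += (v - avg) * (v - newAvg)
		avg = newAvg
	}

	// direct two-pass formula
	var sum, sumSq float64
	for _, v := range vals {
		sum += v
		sumSq += v * v
	}
	n := float64(len(vals))
	fmt.Println(q/count, sumSq/n-(sum/n)*(sum/n)) // both print 4
}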

View file

@@ -27,9 +27,6 @@ import (
 	"gopkg.in/yaml.v2"
 )

-// count of aggregation intervals for states
-const aggrStateSize = 2
-
 var supportedOutputs = []string{
 	"avg",
 	"count_samples",
@@ -144,6 +141,9 @@ type Options struct {
 	//
 	// By default, aggregate samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url.
 	KeepInput bool
+
+	// StateSize defines the number of aggregation intervals to keep state for
+	StateSize int
 }
@@ -246,6 +246,9 @@ type Config struct {
 	// OutputRelabelConfigs is an optional relabeling rules, which are applied
 	// on the aggregated output before being sent to remote storage.
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
+
+	// StateSize overrides Options.StateSize for this aggregation config
+	StateSize *int `yaml:"state_size,omitempty"`
 }

 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
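Based on the test usage elsewhere in this commit, Options.StateSize acts as the process-wide default and state_size inside a single config entry overrides it. A minimal sketch of wiring the two together (the config content and alias are illustrative):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// state_size in the config overrides Options.StateSize for this entry
	config := `
- interval: 1m
  outputs: [total]
  state_size: 3
`
	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
	opts := &streamaggr.Options{
		StateSize: 2, // default for entries without state_size
	}
	a, err := streamaggr.LoadFromData([]byte(config), pushFunc, opts, "example")
	if err != nil {
		fmt.Println(err)
		return
	}
	a.MustStop()
}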
@@ -394,14 +397,14 @@ type aggregator struct {
 	da *dedupAggr

 	// aggrOutputs contains aggregate states for the given outputs
-	aggrOutputs []aggrOutput
+	aggrOutputs *aggrOutputs

 	// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
 	minTimestamp atomic.Int64

 	// time to wait after interval end before flush
 	flushAfter   *histogram.Fast
-	muFlushAfter sync.Mutex
+	flushAfterMu sync.Mutex

 	// suffix contains a suffix, which should be added to aggregate metric names
 	//
@@ -424,26 +427,10 @@ type aggregator struct {
 	matchedSamples *metrics.Counter
 }

-type aggrOutput struct {
-	as aggrState
-
-	outputSamples *metrics.Counter
-}
-
-type aggrState interface {
-	// pushSamples must push samples to the aggrState.
-	//
-	// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
-	pushSamples(samples []pushSample, deleteDeadline int64, idx int)
-
-	// flushState must flush aggrState data to ctx.
-	flushState(ctx *flushCtx)
-}
-
 // PushFunc is called by Aggregators when it needs to push its state to metrics storage
 type PushFunc func(tss []prompbmarshal.TimeSeries)

-type aggrPushFunc func(samples []pushSample, deleteDeadline int64, idx int)
+type aggrPushFunc func(data *pushCtxData)

 // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
 //
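The deleted aggrState interface is what every output used to implement, each with its own sync.Map, mutex and deleted/goto-again retry dance. After this commit an output only implements per-key pushSample/flush methods, and the shared map lives once in aggrOutputs. As a sketch of the new contract, a hypothetical "first sample wins" output (not part of this commit; pushSampleCtx and flushCtx fields as used in the rewritten outputs) reduces to:

type firstAggrValue struct {
	value float64
	set   bool
}

func (av *firstAggrValue) pushSample(ctx *pushSampleCtx) {
	if !av.set {
		av.value = ctx.sample.value
		av.set = true
	}
}

func (av *firstAggrValue) flush(ctx *flushCtx, key string) {
	if av.set {
		ctx.appendSeries(key, "first", av.value)
		av.set = false
	}
}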
@@ -554,6 +541,16 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 		ignoreFirstIntervals = *v
 	}

+	// check cfg.StateSize
+	stateSize := opts.StateSize
+	if v := cfg.StateSize; v != nil {
+		stateSize = *v
+	}
+
+	if stateSize < 1 {
+		return nil, fmt.Errorf("`state_size` must be greater or equal to 1")
+	}
+
 	// Initialize common metric labels
 	name := cfg.Name
 	if name == "" {
@@ -566,18 +563,18 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
 			"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
 	}

-	aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
+	aggrOutputs := &aggrOutputs{
+		initFns:       make([]aggrValuesFn, len(cfg.Outputs)),
+		outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{outputs=%q,%s}`, "test", metricLabels)),
+		stateSize:     stateSize,
+	}
 	outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
 	for i, output := range cfg.Outputs {
-		as, err := newAggrState(output, outputsSeen, stalenessInterval)
+		oc, err := newOutputInitFns(output, outputsSeen, stateSize)
 		if err != nil {
 			return nil, err
 		}
-		aggrOutputs[i] = aggrOutput{
-			as: as,
-			outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{output=%q,%s}`, output, metricLabels)),
-		}
+		aggrOutputs.initFns[i] = oc
 	}

 	// initialize suffix to add to metric names after aggregation
@@ -629,7 +626,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	}

 	if dedupInterval > 0 {
-		a.da = newDedupAggr()
+		a.da = newDedupAggr(stateSize)

 		_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
 			n := a.da.sizeBytes()
@@ -667,7 +664,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	return a, nil
 }

-func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) {
+func newOutputInitFns(output string, outputsSeen map[string]struct{}, stateSize int) (aggrValuesFn, error) {
 	// check for duplicated output
 	if _, ok := outputsSeen[output]; ok {
 		return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -699,44 +696,44 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
 			return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
 		}
 		outputsSeen["quantiles"] = struct{}{}
-		return newQuantilesAggrState(phis), nil
+		return newAggrValues[quantilesAggrValue](quantilesInitFn(stateSize, phis)), nil
 	}
 	switch output {
 	case "avg":
-		return newAvgAggrState(), nil
+		return newAggrValues[avgAggrValue](nil), nil
 	case "count_samples":
-		return newCountSamplesAggrState(), nil
+		return newAggrValues[countSamplesAggrValue](nil), nil
 	case "count_series":
-		return newCountSeriesAggrState(), nil
+		return newAggrValues[countSeriesAggrValue](countSeriesInitFn), nil
 	case "histogram_bucket":
-		return newHistogramBucketAggrState(), nil
+		return newAggrValues[histogramBucketAggrValue](nil), nil
 	case "increase":
-		return newTotalAggrState(true, true), nil
+		return newAggrValues[totalAggrValue](totalInitFn(true, true)), nil
 	case "increase_prometheus":
-		return newTotalAggrState(true, false), nil
+		return newAggrValues[totalAggrValue](totalInitFn(true, false)), nil
 	case "last":
-		return newLastAggrState(), nil
+		return newAggrValues[lastAggrValue](nil), nil
 	case "max":
-		return newMaxAggrState(), nil
+		return newAggrValues[maxAggrValue](nil), nil
 	case "min":
-		return newMinAggrState(), nil
+		return newAggrValues[minAggrValue](nil), nil
 	case "rate_avg":
-		return newRateAggrState(true), nil
+		return newAggrValues[rateAggrValue](rateInitFn(true)), nil
 	case "rate_sum":
-		return newRateAggrState(false), nil
+		return newAggrValues[rateAggrValue](rateInitFn(false)), nil
 	case "stddev":
-		return newStddevAggrState(), nil
+		return newAggrValues[stddevAggrValue](nil), nil
 	case "stdvar":
-		return newStdvarAggrState(), nil
+		return newAggrValues[stdvarAggrValue](nil), nil
 	case "sum_samples":
-		return newSumSamplesAggrState(), nil
+		return newAggrValues[sumSamplesAggrValue](nil), nil
 	case "total":
-		return newTotalAggrState(false, true), nil
+		return newAggrValues[totalAggrValue](totalInitFn(false, true)), nil
 	case "total_prometheus":
-		return newTotalAggrState(false, false), nil
+		return newAggrValues[totalAggrValue](totalInitFn(false, false)), nil
 	case "unique_samples":
-		return newUniqueSamplesAggrState(), nil
+		return newAggrValues[uniqueSamplesAggrValue](uniqueSamplesInitFn), nil
 	default:
 		return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
 	}
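The newAggrValues[...] calls above lean on Go generics to stamp out one zero-value aggrValue per aggregation interval, unless an init function wires in shared state (rateInitFn, totalInitFn, uniqueSamplesInitFn). The PR's exact signatures are defined elsewhere; a self-contained sketch of the pattern, with illustrative names:

package main

import "fmt"

type aggrValue interface {
	pushSample(v float64)
}

type counterValue struct{ n int }

func (c *counterValue) pushSample(float64) { c.n++ }

// newValues allocates one zero-value V per aggregation interval and returns
// them behind the aggrValue interface.
func newValues[V any, PV interface {
	*V
	aggrValue
}](stateSize int) []aggrValue {
	values := make([]aggrValue, stateSize)
	for i := range values {
		var v V
		values[i] = PV(&v)
	}
	return values
}

func main() {
	values := newValues[counterValue](2)
	values[0].pushSample(1)
	fmt.Println(len(values)) // 2
}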
@@ -758,15 +755,11 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
 		}
 	}

-	var flushTimeMsec int64
 	tickerWait := func(t *time.Ticker) bool {
 		select {
 		case <-a.stopCh:
-			flushTimeMsec = time.Now().UnixMilli()
 			return false
-		case ct := <-t.C:
-			flushTimeMsec = ct.UnixMilli()
-			fmt.Println(flushTimeMsec)
+		case <-t.C:
 			return true
 		}
 	}
@@ -791,10 +784,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
 		pf := pushFunc

 		// Calculate delay
-		a.muFlushAfter.Lock()
+		a.flushAfterMu.Lock()
 		flushAfterMsec := a.flushAfter.Quantile(0.95)
 		a.flushAfter.Reset()
-		a.muFlushAfter.Unlock()
+		a.flushAfterMu.Unlock()

 		flushAfter := time.Duration(flushAfterMsec) * time.Millisecond
 		if flushAfter > tickInterval {
@@ -805,7 +798,8 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
 			time.Sleep(flushAfter)
 		}

-		a.dedupFlush(dedupTime.UnixMilli(), dedupIdx, flushIdx)
+		deleteDeadline := dedupTime.Add(a.stalenessInterval)
+		a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx)

 		if ct.After(flushDeadline) {
 			// It is time to flush the aggregated state
@@ -832,25 +826,26 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
 	if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
 		dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
+		deleteDeadline := flushDeadline.Add(a.stalenessInterval)
 		if a.ignoreOldSamples {
 			dedupIdx, flushIdx = a.getAggrIdxs(dedupTime, flushDeadline)
 		}
-		a.dedupFlush(flushDeadline.UnixMilli(), dedupIdx, flushIdx)
+		a.dedupFlush(deleteDeadline.UnixMilli(), dedupIdx, flushIdx)
 		a.flush(pushFunc, flushDeadline.UnixMilli(), flushIdx)
 	}
 }

 func (a *aggregator) getAggrIdxs(dedupTime, flushTime time.Time) (int, int) {
-	flushIdx := getStateIdx(a.interval.Milliseconds(), flushTime.Add(-a.interval).UnixMilli())
+	flushIdx := a.getStateIdx(a.interval.Milliseconds(), flushTime.Add(-a.interval).UnixMilli())
 	dedupIdx := flushIdx
 	if a.dedupInterval > 0 {
-		dedupIdx = getStateIdx(a.dedupInterval.Milliseconds(), dedupTime.Add(-a.dedupInterval).UnixMilli())
+		dedupIdx = a.getStateIdx(a.dedupInterval.Milliseconds(), dedupTime.Add(-a.dedupInterval).UnixMilli())
 	}
 	return dedupIdx, flushIdx
 }

-func getStateIdx(interval int64, ts int64) int {
-	return int(ts/interval) % aggrStateSize
+func (a *aggregator) getStateIdx(interval int64, ts int64) int {
+	return int(ts/interval) % a.aggrOutputs.stateSize
 }

 func (a *aggregator) dedupFlush(deleteDeadline int64, dedupIdx, flushIdx int) {
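getStateIdx turns the per-output state into a ring over aggregation intervals: adjacent intervals map to different slots, so the slot being flushed is never the one receiving fresh samples. A worked example with a 1m interval and stateSize=2:

package main

import "fmt"

func getStateIdx(interval, ts int64, stateSize int) int {
	return int(ts/interval) % stateSize
}

func main() {
	const interval = 60_000 // 1m in milliseconds
	for _, ts := range []int64{0, 59_999, 60_000, 119_999, 120_000} {
		fmt.Println(ts, getStateIdx(interval, ts, 2))
	}
	// 0..59_999 land in slot 0, 60_000..119_999 in slot 1,
	// and 120_000 wraps back to slot 0.
}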
@@ -861,7 +856,7 @@ func (a *aggregator) dedupFlush(deleteDeadline int64, dedupIdx, flushIdx int) {
 	startTime := time.Now()

-	a.da.flush(a.pushSamples, deleteDeadline, dedupIdx, flushIdx)
+	a.da.flush(a.aggrOutputs.pushSamples, deleteDeadline, dedupIdx, flushIdx)

 	d := time.Since(startTime)
 	a.dedupFlushDuration.Update(d.Seconds())
@@ -879,24 +874,12 @@ func (a *aggregator) dedupFlush(deleteDeadline int64, dedupIdx, flushIdx int) {
 func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64, idx int) {
 	startTime := time.Now()

-	var wg sync.WaitGroup
-	for i := range a.aggrOutputs {
-		ao := &a.aggrOutputs[i]
-		flushConcurrencyCh <- struct{}{}
-		wg.Add(1)
-		go func(ao *aggrOutput) {
-			defer func() {
-				<-flushConcurrencyCh
-				wg.Done()
-			}()
-
-			ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec, idx)
-			ao.as.flushState(ctx)
-			ctx.flushSeries()
-			putFlushCtx(ctx)
-		}(ao)
-	}
-	wg.Wait()
+	ao := a.aggrOutputs
+	ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec, idx)
+	ao.flushState(ctx)
+	ctx.flushSeries()
+	putFlushCtx(ctx)

 	d := time.Since(startTime)
 	a.flushDuration.Update(d.Seconds())
@@ -921,7 +904,7 @@ func (a *aggregator) MustStop() {

 // Push pushes tss to a.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
-	ctx := getPushCtx()
+	ctx := getPushCtx(a.aggrOutputs.stateSize)
 	defer putPushCtx(ctx)

 	samples := ctx.samples
@@ -972,9 +955,6 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 		// do not intern key because number of unique keys could be too high
 		key := bytesutil.ToUnsafeString(buf[bufLen:])
 		for _, s := range ts.Samples {
-			a.muFlushAfter.Lock()
-			a.flushAfter.Update(float64(nowMsec - s.Timestamp))
-			a.muFlushAfter.Unlock()
 			if math.IsNaN(s.Value) {
 				// Skip NaN values
 				a.ignoredNaNSamples.Inc()
@@ -990,7 +970,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 				maxLagMsec = lagMsec
 			}
 			if ignoreOldSamples {
-				flushIdx = getStateIdx(a.tickInterval, s.Timestamp)
+				flushIdx = a.getStateIdx(a.tickInterval, s.Timestamp)
 			}
 			samples[flushIdx] = append(samples[flushIdx], pushSample{
 				key: key,
@@ -999,11 +979,14 @@
 			})
 		}
 	}

+	a.flushAfterMu.Lock()
+	a.flushAfter.Update(float64(maxLagMsec))
+	a.flushAfterMu.Unlock()
+
 	ctx.samples = samples
 	ctx.buf = buf

-	pushSamples := a.pushSamples
+	pushSamples := a.aggrOutputs.pushSamples
 	if a.da != nil {
 		pushSamples = a.da.pushSamples
 	}
@@ -1012,7 +995,8 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 	if len(s) > 0 {
 		a.samplesLag.Update(float64(maxLagMsec) / 1_000)
 		a.matchedSamples.Add(len(s))
-		pushSamples(s, deleteDeadlineMsec, idx)
+		data := ctx.getPushCtxData(s, nowMsec, deleteDeadlineMsec, idx)
+		pushSamples(data)
 	}
 	}
 }
@@ -1031,17 +1015,6 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab
 	return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
 }

-func getOutputKey(key string) string {
-	src := bytesutil.ToUnsafeBytes(key)
-	inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
-	if nSize <= 0 {
-		logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
-	}
-	src = src[nSize:]
-	outputKey := src[inputKeyLen:]
-	return bytesutil.ToUnsafeString(outputKey)
-}
-
 func getInputOutputKey(key string) (string, string) {
 	src := bytesutil.ToUnsafeBytes(key)
 	inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
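With getOutputKey gone, getInputOutputKey is the only reader of the compressed sample key, and it implies the layout: varuint(len(inputKey)) | inputKey | outputKey. A toy round-trip with the same encoding helpers (illustrative, not part of the diff):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)

func main() {
	inputKey, outputKey := "in", "out"

	key := encoding.MarshalVarUint64(nil, uint64(len(inputKey)))
	key = append(key, inputKey...)
	key = append(key, outputKey...)

	n, nSize := encoding.UnmarshalVarUint64(key)
	src := key[nSize:]
	fmt.Println(string(src[:n]), string(src[n:])) // in out
}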
@@ -1054,20 +1027,30 @@ func getInputOutputKey(key string) (string, string) {
 	return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
 }

-func (a *aggregator) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for _, ao := range a.aggrOutputs {
-		ao.as.pushSamples(samples, deleteDeadline, idx)
-	}
+type pushCtxData struct {
+	samples        []pushSample
+	deleteDeadline int64
+	idx            int
+	now            int64
 }

 type pushCtx struct {
-	samples [aggrStateSize][]pushSample
+	samples      [][]pushSample
 	labels       promutils.Labels
 	inputLabels  promutils.Labels
 	outputLabels promutils.Labels
 	buf          []byte
 }

+func (ctx *pushCtx) getPushCtxData(samples []pushSample, now, deleteDeadline int64, idx int) *pushCtxData {
+	return &pushCtxData{
+		samples:        samples,
+		deleteDeadline: deleteDeadline,
+		idx:            idx,
+		now:            now,
+	}
+}
+
 func (ctx *pushCtx) reset() {
 	for i := range ctx.samples {
 		ctx.samples[i] = ctx.samples[i][:0]
@@ -1087,10 +1070,12 @@ type pushSample struct {
 	timestamp int64
 }

-func getPushCtx() *pushCtx {
+func getPushCtx(stateSize int) *pushCtx {
 	v := pushCtxPool.Get()
 	if v == nil {
-		return &pushCtx{}
+		return &pushCtx{
+			samples: make([][]pushSample, stateSize),
+		}
 	}
 	return v.(*pushCtx)
 }
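One consequence of sizing the samples slice only on a pool miss: pushCtxPool is package-global, so this appears to rely on every aggregator in the process using the same state size. If configs with different state_size values ever share the pool, Get would need to re-check the length; a defensive sketch (not part of this commit):

func getPushCtxSized(stateSize int) *pushCtx {
	v := pushCtxPool.Get()
	if v == nil {
		return &pushCtx{
			samples: make([][]pushSample, stateSize),
		}
	}
	ctx := v.(*pushCtx)
	if len(ctx.samples) != stateSize {
		// pooled ctx came from an aggregator with a different state size
		ctx.samples = make([][]pushSample, stateSize)
	}
	return ctx
}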
@@ -1123,7 +1108,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
 	return dstInput, dstOutput
 }

-func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc, flushTimestamp int64, idx int) *flushCtx {
+func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, idx int) *flushCtx {
 	v := flushCtxPool.Get()
 	if v == nil {
 		v = &flushCtx{}
@@ -1146,7 +1131,7 @@ var flushCtxPool sync.Pool

 type flushCtx struct {
 	a *aggregator
-	ao *aggrOutput
+	ao *aggrOutputs
 	pushFunc       PushFunc
 	flushTimestamp int64
 	idx            int

View file

@@ -200,11 +200,14 @@ func TestAggregatorsEqual(t *testing.T) {
 		t.Helper()

 		pushFunc := func(_ []prompbmarshal.TimeSeries) {}
-		aa, err := LoadFromData([]byte(a), pushFunc, nil, "some_alias")
+		opts := Options{
+			StateSize: 2,
+		}
+		aa, err := LoadFromData([]byte(a), pushFunc, &opts, "some_alias")
 		if err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
-		ab, err := LoadFromData([]byte(b), pushFunc, nil, "some_alias")
+		ab, err := LoadFromData([]byte(b), pushFunc, &opts, "some_alias")
 		if err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}

@@ -266,6 +269,7 @@ func TestAggregatorsSuccess(t *testing.T) {
 		opts := &Options{
 			FlushOnShutdown:        true,
 			NoAlignFlushToInterval: true,
+			StateSize:              2,
 		}
 		a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias")
 		if err != nil {

@@ -985,6 +989,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 		opts := &Options{
 			DedupInterval:   30 * time.Second,
 			FlushOnShutdown: true,
+			StateSize:       2,
 		}
 		a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias")
 		if err != nil {

View file

@@ -1,88 +1,14 @@
 package streamaggr

-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
-type sumSamplesAggrState struct {
-	m sync.Map
-}
-
-type sumSamplesStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]sumState
-	deleted        bool
-	deleteDeadline int64
-}
-
-type sumState struct {
-	sum    float64
-	exists bool
-}
-
-func newSumSamplesAggrState() *sumSamplesAggrState {
-	return &sumSamplesAggrState{}
-}
-
-func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &sumSamplesStateValue{}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Update the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*sumSamplesStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			sv.state[idx].sum += s.value
-			sv.state[idx].exists = true
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
-
-func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*sumSamplesStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := sv.state[ctx.idx]
-		sv.state[ctx.idx] = sumState{}
-		sv.mu.Unlock()
-		if state.exists {
-			key := k.(string)
-			ctx.appendSeries(key, "sum_samples", state.sum)
-		}
-		return true
-	})
-}
+type sumSamplesAggrValue struct {
+	sum float64
+}
+
+func (av *sumSamplesAggrValue) pushSample(ctx *pushSampleCtx) {
+	av.sum += ctx.sample.value
+}
+
+func (av *sumSamplesAggrValue) flush(ctx *flushCtx, key string) {
+	ctx.appendSeries(key, "sum_samples", av.sum)
+	av.sum = 0
+}
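Note the contract every rewritten output follows: flush both emits and resets, so a sample pushed in one interval is never double-counted in the next. The same shape in a runnable toy (illustrative names):

package main

import "fmt"

type sumValue struct{ sum float64 }

func (v *sumValue) push(x float64) { v.sum += x }

func (v *sumValue) flush() float64 {
	s := v.sum
	v.sum = 0
	return s
}

func main() {
	var v sumValue
	v.push(1)
	v.push(2)
	fmt.Println(v.flush()) // 3
	fmt.Println(v.flush()) // 0: state was reset by the previous flush
}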

View file

@@ -2,155 +2,100 @@ package streamaggr
 import (
 	"math"
-	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )

-// totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus.
-type totalAggrState struct {
-	m sync.Map
-
-	// Whether to reset the output value on every flushState call.
-	resetTotalOnFlush bool
-
-	// Whether to take into account the first sample in new time series when calculating the output value.
-	keepFirstSample bool
-}
+func totalInitFn(resetTotalOnFlush, keepFirstSample bool) aggrValuesInitFn {
+	return func(values []aggrValue) []aggrValue {
+		shared := &totalAggrValueShared{
+			lastValues: make(map[string]totalLastValue),
+		}
+		for i := range values {
+			values[i] = &totalAggrValue{
+				keepFirstSample:   keepFirstSample,
+				resetTotalOnFlush: resetTotalOnFlush,
+				shared:            shared,
+			}
+		}
+		return values
+	}
+}

-type totalStateValue struct {
-	mu             sync.Mutex
-	shared         totalState
-	state          [aggrStateSize]float64
-	deleteDeadline int64
-	deleted        bool
-}
-
-type totalState struct {
-	total      float64
-	lastValues map[string]totalLastValueState
-}
-
-type totalLastValueState struct {
+type totalLastValue struct {
 	value          float64
 	timestamp      int64
 	deleteDeadline int64
 }

-func newTotalAggrState(resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
-	return &totalAggrState{
-		resetTotalOnFlush: resetTotalOnFlush,
-		keepFirstSample:   keepFirstSample,
-	}
-}
+type totalAggrValueShared struct {
+	lastValues map[string]totalLastValue
+	total      float64
+}
+
+type totalAggrValue struct {
+	total             float64
+	keepFirstSample   bool
+	resetTotalOnFlush bool
+	shared            *totalAggrValueShared
+}

-func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	var deleted bool
-	for i := range samples {
-		s := &samples[i]
-		inputKey, outputKey := getInputOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			v = &totalStateValue{
-				shared: totalState{
-					lastValues: make(map[string]totalLastValueState),
-				},
-			}
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Use the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*totalStateValue)
-		sv.mu.Lock()
-		deleted = sv.deleted
-		if !deleted {
-			lv, ok := sv.shared.lastValues[inputKey]
-			if ok || as.keepFirstSample {
-				if s.timestamp < lv.timestamp {
-					// Skip out of order sample
-					sv.mu.Unlock()
-					continue
-				}
-				if s.value >= lv.value {
-					sv.state[idx] += s.value - lv.value
-				} else {
-					// counter reset
-					sv.state[idx] += s.value
-				}
-			}
-			lv.value = s.value
-			lv.timestamp = s.timestamp
-			lv.deleteDeadline = deleteDeadline
-
-			inputKey = bytesutil.InternString(inputKey)
-			sv.shared.lastValues[inputKey] = lv
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
+func (av *totalAggrValue) pushSample(ctx *pushSampleCtx) {
+	shared := av.shared
+	inputKey := ctx.inputKey
+	lv, ok := shared.lastValues[inputKey]
+	if ok || av.keepFirstSample {
+		if ctx.sample.timestamp < lv.timestamp {
+			// Skip out of order sample
+			return
+		}
+		if ctx.sample.value >= lv.value {
+			av.total += ctx.sample.value - lv.value
+		} else {
+			// counter reset
+			av.total += ctx.sample.value
+		}
+	}
+	lv.value = ctx.sample.value
+	lv.timestamp = ctx.sample.timestamp
+	lv.deleteDeadline = ctx.deleteDeadline
+	inputKey = bytesutil.InternString(inputKey)
+	shared.lastValues[inputKey] = lv
+}

-func (as *totalAggrState) getSuffix() string {
-	// Note: this function is at hot path, so it shouldn't allocate.
-	if as.resetTotalOnFlush {
-		if as.keepFirstSample {
-			return "increase"
-		}
-		return "increase_prometheus"
-	}
-	if as.keepFirstSample {
-		return "total"
-	}
-	return "total_prometheus"
-}
+func (av *totalAggrValue) flush(ctx *flushCtx, key string) {
+	suffix := av.getSuffix()
+	// check for stale entries
+	total := av.shared.total + av.total
+	av.total = 0
+	lvs := av.shared.lastValues
+	for lk, lv := range lvs {
+		if ctx.flushTimestamp > lv.deleteDeadline {
+			delete(lvs, lk)
+		}
+	}
+	if av.resetTotalOnFlush {
+		av.shared.total = 0
+	} else if math.Abs(total) >= (1 << 53) {
+		// It is time to reset the entry, since it starts losing float64 precision
+		av.shared.total = 0
+	} else {
+		av.shared.total = total
+	}
+	ctx.appendSeries(key, suffix, total)
+}

-func (as *totalAggrState) flushState(ctx *flushCtx) {
-	var total float64
-	m := &as.m
-	suffix := as.getSuffix()
-	m.Range(func(k, v interface{}) bool {
-		sv := v.(*totalStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		total = sv.shared.total + sv.state[ctx.idx]
-		for k1, v1 := range sv.shared.lastValues {
-			if ctx.flushTimestamp > v1.deleteDeadline {
-				delete(sv.shared.lastValues, k1)
-			}
-		}
-		sv.state[ctx.idx] = 0
-		if !as.resetTotalOnFlush {
-			if math.Abs(total) >= (1 << 53) {
-				// It is time to reset the entry, since it starts losing float64 precision
-				sv.shared.total = 0
-			} else {
-				sv.shared.total = total
-			}
-		}
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, suffix, total)
-		return true
-	})
-}
+func (av *totalAggrValue) getSuffix() string {
+	// Note: this function is at hot path, so it shouldn't allocate.
+	if av.resetTotalOnFlush {
+		if av.keepFirstSample {
+			return "increase"
+		}
+		return "increase_prometheus"
+	}
+	if av.keepFirstSample {
+		return "total"
+	}
+	return "total_prometheus"
+}
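Two behaviors in flush and pushSample above are worth spelling out. The shared total resets once |total| reaches 2^53, where float64 stops representing consecutive integers and small deltas silently vanish; and a sample smaller than its predecessor is treated as a counter restart, contributing its raw value as the increase. Both in miniature:

package main

import "fmt"

func main() {
	// float64 precision: at 2^53, adding 1 is lost entirely
	total := float64(1 << 53)
	fmt.Println(total+1 == total) // true

	// counter reset handling, as in pushSample
	prev, cur := 105.0, 5.0
	increase := cur // after a restart, the raw value is the increase
	if cur >= prev {
		increase = cur - prev
	}
	fmt.Println(increase) // 5
}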

View file

@@ -1,86 +1,25 @@
 package streamaggr

-import (
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-)
-
-// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values.
-type uniqueSamplesAggrState struct {
-	m sync.Map
-}
-
-type uniqueSamplesStateValue struct {
-	mu             sync.Mutex
-	state          [aggrStateSize]map[float64]struct{}
-	deleted        bool
-	deleteDeadline int64
-}
-
-func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
-	return &uniqueSamplesAggrState{}
-}
-
-func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
-	for i := range samples {
-		s := &samples[i]
-		outputKey := getOutputKey(s.key)
-
-	again:
-		v, ok := as.m.Load(outputKey)
-		if !ok {
-			// The entry is missing in the map. Try creating it.
-			usv := &uniqueSamplesStateValue{}
-			for iu := range usv.state {
-				usv.state[iu] = make(map[float64]struct{})
-			}
-			v = usv
-			outputKey = bytesutil.InternString(outputKey)
-			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if loaded {
-				// Update the entry created by a concurrent goroutine.
-				v = vNew
-			}
-		}
-		sv := v.(*uniqueSamplesStateValue)
-		sv.mu.Lock()
-		deleted := sv.deleted
-		if !deleted {
-			if _, ok := sv.state[idx][s.value]; !ok {
-				sv.state[idx][s.value] = struct{}{}
-			}
-			sv.deleteDeadline = deleteDeadline
-		}
-		sv.mu.Unlock()
-		if deleted {
-			// The entry has been deleted by the concurrent call to flushState
-			// Try obtaining and updating the entry again.
-			goto again
-		}
-	}
-}
-
-func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*uniqueSamplesStateValue)
-		sv.mu.Lock()
-		// check for stale entries
-		deleted := ctx.flushTimestamp > sv.deleteDeadline
-		if deleted {
-			// Mark the current entry as deleted
-			sv.deleted = deleted
-			sv.mu.Unlock()
-			m.Delete(k)
-			return true
-		}
-		state := len(sv.state[ctx.idx])
-		sv.state[ctx.idx] = make(map[float64]struct{})
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "unique_samples", float64(state))
-		return true
-	})
-}
+func uniqueSamplesInitFn(values []aggrValue) []aggrValue {
+	for i := range values {
+		values[i] = &uniqueSamplesAggrValue{
+			samples: make(map[float64]struct{}),
+		}
+	}
+	return values
+}
+
+type uniqueSamplesAggrValue struct {
+	samples map[float64]struct{}
+}
+
+func (av *uniqueSamplesAggrValue) pushSample(ctx *pushSampleCtx) {
+	if _, ok := av.samples[ctx.sample.value]; !ok {
+		av.samples[ctx.sample.value] = struct{}{}
+	}
+}
+
+func (av *uniqueSamplesAggrValue) flush(ctx *flushCtx, key string) {
+	ctx.appendSeries(key, "unique_samples", float64(len(av.samples)))
+	clear(av.samples)
+}
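unique_samples now keeps one plain map[float64]struct{} per output key. NaN values never reach it because Push drops them earlier (the ignoredNaNSamples branch), which matters here: NaN map keys never compare equal, so each NaN would occupy a fresh slot. The counting itself in miniature:

package main

import "fmt"

func main() {
	samples := map[float64]struct{}{}
	for _, v := range []float64{1, 2, 2, 3, 1} {
		samples[v] = struct{}{}
	}
	fmt.Println(len(samples)) // 3 unique values
}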