lib/streamaggr: added aggregation windows

AndrewChubatiuk 2024-07-03 13:42:45 +03:00 committed by Andrii Chubatiuk
parent 0e1dbdee28
commit c8685741b3
22 changed files with 747 additions and 669 deletions
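The diffs below replace each output's single accumulator with a small ring of aggrStateSize windows: samples are routed into a window slot by timestamp, and flush drains one slot while pushes keep filling the other. A minimal standalone sketch of that idea (windowedSum and slotForTimestamp are illustrative names only, not the library code):

package main

import "fmt"

// aggrStateSize mirrors the constant introduced by this commit: the number of
// independent aggregation windows kept per series.
const aggrStateSize = 2

// windowedSum is a hypothetical, trimmed-down counterpart of the per-output
// state arrays added below (e.g. [aggrStateSize]avgState).
type windowedSum struct {
	state [aggrStateSize]float64
}

// slotForTimestamp routes a sample into a window slot the same way the diffs
// below do: (timestamp/interval + 1) % aggrStateSize.
func slotForTimestamp(tsMsec, intervalMsec int64) int {
	return int((tsMsec/intervalMsec + 1) % aggrStateSize)
}

func (w *windowedSum) push(tsMsec, intervalMsec int64, v float64) {
	w.state[slotForTimestamp(tsMsec, intervalMsec)] += v
}

// flush drains one slot only, so pushes for the next interval keep landing in
// the other slot while the flush is in progress.
func (w *windowedSum) flush(idx int) float64 {
	v := w.state[idx]
	w.state[idx] = 0
	return v
}

func main() {
	var w windowedSum
	const interval = 60_000 // 1m in milliseconds
	w.push(10_000, interval, 1) // slot (0+1)%2 = 1
	w.push(70_000, interval, 5) // slot (1+1)%2 = 0
	fmt.Println(w.flush(1), w.flush(0)) // 1 5
}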

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// avgAggrState calculates output=avg, e.g. the average value over input samples.
@ -12,18 +11,23 @@ type avgAggrState struct {
m sync.Map
}
type avgState struct {
sum float64
count float64
}
type avgStateValue struct {
mu sync.Mutex
sum float64
count int64
deleted bool
mu sync.Mutex
state [aggrStateSize]avgState
deleted bool
deleteDeadline int64
}
func newAvgAggrState() *avgAggrState {
return &avgAggrState{}
}
func (as *avgAggrState) pushSamples(samples []pushSample) {
func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -32,25 +36,21 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &avgStateValue{
sum: s.value,
count: 1,
}
v = &avgStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been successfully stored
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*avgStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += s.value
sv.count++
sv.state[idx].sum += s.value
sv.state[idx].count++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -61,26 +61,28 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
}
}
func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *avgAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*avgStateValue)
sv.mu.Lock()
avg := sv.sum / float64(sv.count)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "avg", currentTimeMsec, avg)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = avgState{}
sv.mu.Unlock()
if state.count > 0 {
key := k.(string)
ctx.appendSeries(key, "avg", flushTimestamp, state.sum/state.count)
}
return true
})
}
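The avg diff above shows the flush pattern this commit applies to every output: compare the flush timestamp against deleteDeadline to drop stale entries, drain the current window slot under the lock, and emit a series only when that slot received samples. A self-contained sketch of that pattern, with trimmed stand-in types (avgWindow/avgEntry are illustrative, not the real avgState/avgStateValue):

package main

import (
	"fmt"
	"sync"
)

const aggrStateSize = 2

// avgWindow and avgEntry are trimmed stand-ins for avgState/avgStateValue.
type avgWindow struct {
	sum, count float64
}

type avgEntry struct {
	mu             sync.Mutex
	state          [aggrStateSize]avgWindow
	deleted        bool
	deleteDeadline int64
}

// flushWindow drops the entry when it is stale, otherwise drains one window
// slot and reports whether that slot actually received samples.
func flushWindow(sv *avgEntry, flushTimestamp int64, idx int) (avg float64, ok bool) {
	sv.mu.Lock()
	if flushTimestamp > sv.deleteDeadline {
		sv.deleted = true // stale entry; the caller removes it from the map
		sv.mu.Unlock()
		return 0, false
	}
	w := sv.state[idx]
	sv.state[idx] = avgWindow{} // reset the slot for the next interval
	sv.mu.Unlock()
	if w.count == 0 {
		return 0, false
	}
	return w.sum / w.count, true
}

func main() {
	e := &avgEntry{deleteDeadline: 2_000}
	e.state[0] = avgWindow{sum: 9, count: 3}
	fmt.Println(flushWindow(e, 1_000, 0)) // 3 true
	fmt.Println(flushWindow(e, 1_000, 1)) // 0 false (empty slot)
}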

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples.
@ -13,16 +12,17 @@ type countSamplesAggrState struct {
}
type countSamplesStateValue struct {
mu sync.Mutex
n uint64
deleted bool
mu sync.Mutex
state [aggrStateSize]uint64
deleted bool
deleteDeadline int64
}
func newCountSamplesAggrState() *countSamplesAggrState {
return &countSamplesAggrState{}
}
func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -31,23 +31,20 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSamplesStateValue{
n: 1,
}
v = &countSamplesStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.n++
sv.state[idx]++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -58,26 +55,28 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *countSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
n := sv.n
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = 0
sv.mu.Unlock()
if state > 0 {
key := k.(string)
ctx.appendSeries(key, "count_samples", flushTimestamp, float64(state))
}
return true
})
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/cespare/xxhash/v2"
)
@ -14,16 +13,17 @@ type countSeriesAggrState struct {
}
type countSeriesStateValue struct {
mu sync.Mutex
m map[uint64]struct{}
deleted bool
mu sync.Mutex
state [aggrStateSize]map[uint64]struct{}
deleted bool
deleteDeadline int64
}
func newCountSeriesAggrState() *countSeriesAggrState {
return &countSeriesAggrState{}
}
func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@ -36,27 +36,26 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSeriesStateValue{
m: map[uint64]struct{}{
h: {},
},
csv := &countSeriesStateValue{}
for ic := range csv.state {
csv.state[ic] = make(map[uint64]struct{})
}
v = csv
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been added to the map.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.m[h]; !ok {
sv.m[h] = struct{}{}
if _, ok := sv.state[idx][h]; !ok {
sv.state[idx][h] = struct{}{}
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -67,26 +66,28 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *countSeriesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := len(sv.m)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := len(sv.state[idx])
sv.state[idx] = make(map[uint64]struct{})
sv.mu.Unlock()
if state > 0 {
key := k.(string)
ctx.appendSeries(key, "count_series", flushTimestamp, float64(state))
}
return true
})
}

View file

@ -14,7 +14,8 @@ import (
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
shards []dedupAggrShard
currentIdx atomic.Int32
}
type dedupAggrShard struct {
@ -25,16 +26,18 @@ type dedupAggrShard struct {
_ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte
}
type dedupAggrShardNopad struct {
mu sync.Mutex
m map[string]*dedupAggrSample
type dedupAggrState struct {
m map[string]*dedupAggrSample
samplesBuf []dedupAggrSample
sizeBytes atomic.Uint64
itemsCount atomic.Uint64
}
type dedupAggrShardNopad struct {
mu sync.RWMutex
state [aggrStateSize]*dedupAggrState
}
type dedupAggrSample struct {
value float64
timestamp int64
@ -49,21 +52,27 @@ func newDedupAggr() *dedupAggr {
func (da *dedupAggr) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*da))
currentIdx := da.currentIdx.Load()
for i := range da.shards {
n += da.shards[i].sizeBytes.Load()
if da.shards[i].state[currentIdx] != nil {
n += da.shards[i].state[currentIdx].sizeBytes.Load()
}
}
return n
}
func (da *dedupAggr) itemsCount() uint64 {
n := uint64(0)
currentIdx := da.currentIdx.Load()
for i := range da.shards {
n += da.shards[i].itemsCount.Load()
if da.shards[i].state[currentIdx] != nil {
n += da.shards[i].state[currentIdx].itemsCount.Load()
}
}
return n
}
func (da *dedupAggr) pushSamples(samples []pushSample) {
func (da *dedupAggr) pushSamples(samples []pushSample, dedupIdx int) {
pss := getPerShardSamples()
shards := pss.shards
for _, sample := range samples {
@ -75,7 +84,7 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
if len(shardSamples) == 0 {
continue
}
da.shards[i].pushSamples(shardSamples)
da.shards[i].pushSamples(shardSamples, dedupIdx)
}
putPerShardSamples(pss)
}
@ -104,7 +113,7 @@ func (ctx *dedupFlushCtx) reset() {
ctx.samples = ctx.samples[:0]
}
func (da *dedupAggr) flush(f func(samples []pushSample)) {
func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
var wg sync.WaitGroup
for i := range da.shards {
flushConcurrencyCh <- struct{}{}
@ -116,10 +125,11 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) {
}()
ctx := getDedupFlushCtx()
shard.flush(ctx, f)
shard.flush(ctx, f, deleteDeadline, dedupIdx, flushIdx)
putDedupFlushCtx(ctx)
}(&da.shards[i])
}
da.currentIdx.Store((da.currentIdx.Load() + 1) % aggrStateSize)
wg.Wait()
}
@ -154,18 +164,20 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample) {
func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) {
das.mu.Lock()
defer das.mu.Unlock()
m := das.m
if m == nil {
m = make(map[string]*dedupAggrSample, len(samples))
das.m = m
state := das.state[dedupIdx]
if state == nil {
state = &dedupAggrState{
m: make(map[string]*dedupAggrSample, len(samples)),
}
das.state[dedupIdx] = state
}
samplesBuf := das.samplesBuf
samplesBuf := state.samplesBuf
for _, sample := range samples {
s, ok := m[sample.key]
s, ok := state.m[sample.key]
if !ok {
samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
s = &samplesBuf[len(samplesBuf)-1]
@ -173,10 +185,10 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
s.timestamp = sample.timestamp
key := bytesutil.InternString(sample.key)
m[key] = s
state.m[key] = s
das.itemsCount.Add(1)
das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
das.state[dedupIdx].itemsCount.Add(1)
das.state[dedupIdx].sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
continue
}
// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
@ -185,18 +197,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
s.timestamp = sample.timestamp
}
}
das.samplesBuf = samplesBuf
das.state[dedupIdx].samplesBuf = samplesBuf
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
das.mu.Lock()
m := das.m
if len(m) > 0 {
das.m = make(map[string]*dedupAggrSample, len(m))
das.sizeBytes.Store(0)
das.itemsCount.Store(0)
das.samplesBuf = make([]dedupAggrSample, 0, len(das.samplesBuf))
var m map[string]*dedupAggrSample
state := das.state[dedupIdx]
if state != nil && len(state.m) > 0 {
m = state.m
das.state[dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m))
das.state[dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[dedupIdx].samplesBuf))
das.state[dedupIdx].sizeBytes.Store(0)
das.state[dedupIdx].itemsCount.Store(0)
}
das.mu.Unlock()
@ -215,11 +229,11 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
// Limit the number of samples per each flush in order to limit memory usage.
if len(dstSamples) >= 10_000 {
f(dstSamples)
f(dstSamples, deleteDeadline, flushIdx)
clear(dstSamples)
dstSamples = dstSamples[:0]
}
}
f(dstSamples)
f(dstSamples, deleteDeadline, flushIdx)
ctx.samples = dstSamples
}

View file

@ -5,6 +5,7 @@ import (
"reflect"
"sync"
"testing"
"time"
)
func TestDedupAggrSerial(t *testing.T) {
@ -20,7 +21,7 @@ func TestDedupAggrSerial(t *testing.T) {
sample.value = float64(i + j)
expectedSamplesMap[sample.key] = *sample
}
da.pushSamples(samples)
da.pushSamples(samples, 0)
}
if n := da.sizeBytes(); n > 5_000_000 {
@ -32,14 +33,16 @@ func TestDedupAggrSerial(t *testing.T) {
flushedSamplesMap := make(map[string]pushSample)
var mu sync.Mutex
flushSamples := func(samples []pushSample) {
flushSamples := func(samples []pushSample, _ int64, _ int) {
mu.Lock()
for _, sample := range samples {
flushedSamplesMap[sample.key] = sample
}
mu.Unlock()
}
da.flush(flushSamples)
flushTimestamp := time.Now().UnixMilli()
da.flush(flushSamples, flushTimestamp, 0, 0)
if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
@ -70,7 +73,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
sample.key = fmt.Sprintf("key_%d", j)
sample.value = float64(i + j)
}
da.pushSamples(samples)
da.pushSamples(samples, 0)
}
}()
}

View file

@ -27,7 +27,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for i := 0; i < loops; i++ {
da.pushSamples(benchSamples)
da.pushSamples(benchSamples, 0)
}
}
})

View file

@ -17,7 +17,8 @@ import (
type Deduplicator struct {
da *dedupAggr
dropLabels []string
dropLabels []string
dedupInterval int64
wg sync.WaitGroup
stopCh chan struct{}
@ -38,8 +39,9 @@ type Deduplicator struct {
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
da: newDedupAggr(),
dropLabels: dropLabels,
dedupInterval: dedupInterval.Milliseconds(),
stopCh: make(chan struct{}),
ms: metrics.NewSet(),
@ -86,6 +88,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
buf := ctx.buf
dropLabels := d.dropLabels
aggrIntervals := int64(aggrStateSize)
for _, ts := range tss {
if len(dropLabels) > 0 {
labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
@ -101,7 +104,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
buf = lc.Compress(buf, labels.Labels)
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
pss = append(pss, pushSample{
flushIntervals := s.Timestamp/d.dedupInterval + 1
idx := int(flushIntervals % aggrIntervals)
pss[idx] = append(pss[idx], pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
@ -109,7 +114,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
}
}
d.da.pushSamples(pss)
for idx, ps := range pss {
d.da.pushSamples(ps, idx)
}
ctx.pss = pss
ctx.buf = buf
@ -132,17 +139,18 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration
select {
case <-d.stopCh:
return
case <-t.C:
d.flush(pushFunc, dedupInterval)
case t := <-t.C:
flushTime := t.Truncate(dedupInterval).Add(dedupInterval)
flushTimestamp := flushTime.UnixMilli()
flushIntervals := int(flushTimestamp / int64(dedupInterval/time.Millisecond))
flushIdx := flushIntervals % aggrStateSize
d.flush(pushFunc, dedupInterval, flushTime, flushIdx)
}
}
}
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
startTime := time.Now()
timestamp := startTime.UnixMilli()
d.da.flush(func(pss []pushSample) {
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) {
d.da.flush(func(pss []pushSample, _ int64, _ int) {
ctx := getDeduplicatorFlushCtx()
tss := ctx.tss
@ -155,7 +163,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
samplesLen := len(samples)
samples = append(samples, prompbmarshal.Sample{
Value: ps.value,
Timestamp: timestamp,
Timestamp: ps.timestamp,
})
tss = append(tss, prompbmarshal.TimeSeries{
@ -169,9 +177,9 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
ctx.labels = labels
ctx.samples = samples
putDeduplicatorFlushCtx(ctx)
})
}, flushTime.UnixMilli(), flushIdx, flushIdx)
duration := time.Since(startTime)
duration := time.Since(flushTime)
d.dedupFlushDuration.Update(duration.Seconds())
if duration > dedupInterval {
d.dedupFlushTimeouts.Inc()
@ -182,14 +190,15 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
}
type deduplicatorPushCtx struct {
pss []pushSample
pss [aggrStateSize][]pushSample
labels promutils.Labels
buf []byte
}
func (ctx *deduplicatorPushCtx) reset() {
clear(ctx.pss)
ctx.pss = ctx.pss[:0]
for i, sc := range ctx.pss {
ctx.pss[i] = sc[:0]
}
ctx.labels.Reset()
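The runFlusher change above derives the window index from the tick time instead of flushing a single shared map. A standalone sketch of that index math (flushIdxFor is an illustrative helper, not part of the package; it assumes the same aggrStateSize constant):

package main

import (
	"fmt"
	"time"
)

const aggrStateSize = 2

// flushIdxFor reproduces the index math from Deduplicator.runFlusher above:
// the tick time is rounded up to the end of the current dedup interval, and
// the number of elapsed intervals selects one of the aggrStateSize slots.
func flushIdxFor(tick time.Time, dedupInterval time.Duration) (time.Time, int) {
	flushTime := tick.Truncate(dedupInterval).Add(dedupInterval)
	flushIntervals := int(flushTime.UnixMilli() / dedupInterval.Milliseconds())
	return flushTime, flushIntervals % aggrStateSize
}

func main() {
	tick := time.UnixMilli(125_000) // 2m05s after the Unix epoch
	flushTime, idx := flushIdxFor(tick, time.Minute)
	fmt.Println(flushTime.UnixMilli(), idx) // 180000 1
}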

View file

@ -21,20 +21,26 @@ func TestDeduplicator(t *testing.T) {
tss := prompbmarshal.MustParsePromMetrics(`
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123
bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54
x 8943 1000
x 8943 1
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34
x 90984 900
x 433 1000
x 90984
x 433 1
asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
`, offsetMsecs)
d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global")
dedupInterval := time.Hour
d := NewDeduplicator(pushFunc, dedupInterval, []string{"node", "instance"}, "global")
for i := 0; i < 10; i++ {
d.Push(tss)
}
d.flush(pushFunc, time.Hour)
flushTime := time.Now()
flushIntervals := flushTime.UnixMilli()/dedupInterval.Milliseconds() + 1
idx := int(flushIntervals % int64(aggrStateSize))
d.flush(pushFunc, time.Hour, time.Now(), idx)
d.MustStop()
result := timeSeriessToString(tssResult)

View file

@ -1,39 +1,30 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metrics"
)
// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrState struct {
m sync.Map
stalenessSecs uint64
}
type histogramBucketStateValue struct {
mu sync.Mutex
h metrics.Histogram
deleteDeadline uint64
state [aggrStateSize]metrics.Histogram
total metrics.Histogram
deleted bool
deleteDeadline int64
}
func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &histogramBucketAggrState{
stalenessSecs: stalenessSecs,
}
func newHistogramBucketAggrState() *histogramBucketAggrState {
return &histogramBucketAggrState{}
}
func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -54,7 +45,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
sv.state[idx].Update(s.value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@ -66,54 +57,32 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
}
}
func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
var staleOutputSamples int
m.Range(func(k, v interface{}) bool {
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
staleOutputSamples++
}
sv.mu.Unlock()
if deleted {
sv.mu.Unlock()
m.Delete(k)
return true
}
sv.total.Merge(&sv.state[idx])
total := &sv.total
sv.state[idx] = metrics.Histogram{}
sv.mu.Unlock()
key := k.(string)
total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", flushTimestamp, float64(count), "vmrange", vmrange)
})
return true
})
ctx.a.staleOutputSamples["histogram_bucket"].Add(staleOutputSamples)
}
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(ctx, currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
if !sv.deleted {
key := k.(string)
sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange)
})
}
sv.mu.Unlock()
return true
})
}
func roundDurationToSecs(d time.Duration) uint64 {
if d < 0 {
return 0
}
secs := d.Seconds()
return uint64(math.Ceil(secs))
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// lastAggrState calculates output=last, e.g. the last value over input samples.
@ -13,17 +12,22 @@ type lastAggrState struct {
}
type lastStateValue struct {
mu sync.Mutex
mu sync.Mutex
state [aggrStateSize]lastState
deleted bool
deleteDeadline int64
}
type lastState struct {
last float64
timestamp int64
deleted bool
}
func newLastAggrState() *lastAggrState {
return &lastAggrState{}
}
func (as *lastAggrState) pushSamples(samples []pushSample) {
func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -32,27 +36,23 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &lastStateValue{
last: s.value,
timestamp: s.timestamp,
}
v = &lastStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.timestamp >= sv.timestamp {
sv.last = s.value
sv.timestamp = s.timestamp
if s.timestamp >= sv.state[idx].timestamp {
sv.state[idx].last = s.value
sv.state[idx].timestamp = s.timestamp
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -63,26 +63,28 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
}
}
func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *lastAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*lastStateValue)
sv.mu.Lock()
last := sv.last
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "last", currentTimeMsec, last)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = lastState{}
sv.mu.Unlock()
if state.timestamp > 0 {
key := k.(string)
ctx.appendSeries(key, "last", flushTimestamp, state.last)
}
return true
})
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
@ -13,16 +12,22 @@ type maxAggrState struct {
}
type maxStateValue struct {
mu sync.Mutex
max float64
deleted bool
mu sync.Mutex
state [aggrStateSize]maxState
deleted bool
deleteDeadline int64
}
type maxState struct {
max float64
exists bool
}
func newMaxAggrState() *maxAggrState {
return &maxAggrState{}
}
func (as *maxAggrState) pushSamples(samples []pushSample) {
func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -31,25 +36,26 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &maxStateValue{
max: s.value,
}
v = &maxStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*maxStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.value > sv.max {
sv.max = s.value
state := &sv.state[idx]
if !state.exists {
state.max = s.value
state.exists = true
} else if s.value > state.max {
state.max = s.value
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -60,26 +66,28 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
}
}
func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *maxAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*maxStateValue)
sv.mu.Lock()
max := sv.max
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "max", currentTimeMsec, max)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = maxState{}
sv.mu.Unlock()
if state.exists {
key := k.(string)
ctx.appendSeries(key, "max", flushTimestamp, state.max)
}
return true
})
}
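The new maxState.exists flag replaces the old trick of seeding the entry with the first sample: since the window array is now pre-allocated for every slot, a zero-valued slot would otherwise win against all-negative samples. A small self-contained illustration (a sketch, not the library code):

package main

import "fmt"

// maxState mirrors the small per-window struct added above.
type maxState struct {
	max    float64
	exists bool
}

func (s *maxState) update(v float64) {
	if !s.exists {
		s.max = v
		s.exists = true
	} else if v > s.max {
		s.max = v
	}
}

func main() {
	var s maxState
	for _, v := range []float64{-3, -7, -1} {
		s.update(v)
	}
	// Without the exists flag a zero-valued slot would report max=0 here.
	fmt.Println(s.max) // -1
}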

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// minAggrState calculates output=min, e.g. the minimum value over input samples.
@ -13,16 +12,22 @@ type minAggrState struct {
}
type minStateValue struct {
mu sync.Mutex
min float64
deleted bool
mu sync.Mutex
state [aggrStateSize]minState
deleted bool
deleteDeadline int64
}
type minState struct {
min float64
exists bool
}
func newMinAggrState() *minAggrState {
return &minAggrState{}
}
func (as *minAggrState) pushSamples(samples []pushSample) {
func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -31,25 +36,26 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &minStateValue{
min: s.value,
}
v = &minStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*minStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.value < sv.min {
sv.min = s.value
state := &sv.state[idx]
if !state.exists {
state.min = s.value
state.exists = true
} else if s.value < state.min {
state.min = s.value
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -60,25 +66,28 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
}
}
func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *minAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*minStateValue)
sv.mu.Lock()
min := sv.min
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = minState{}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "min", currentTimeMsec, min)
if state.exists {
key := k.(string)
ctx.appendSeries(key, "min", flushTimestamp, state.min)
}
return true
})
}

View file

@ -5,21 +5,20 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/valyala/histogram"
)
// quantilesAggrState calculates output=quantiles, e.g. the given quantiles over the input samples.
type quantilesAggrState struct {
m sync.Map
m sync.Map
phis []float64
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
deleted bool
mu sync.Mutex
state [aggrStateSize]*histogram.Fast
deleted bool
deleteDeadline int64
}
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
@ -28,7 +27,7 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
}
}
func (as *quantilesAggrState) pushSamples(samples []pushSample) {
func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -37,15 +36,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
h := histogram.GetFast()
v = &quantilesStateValue{
h: h,
}
v = &quantilesStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
histogram.PutFast(h)
v = vNew
}
}
@ -53,7 +48,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
if sv.state[idx] == nil {
sv.state[idx] = histogram.GetFast()
}
sv.state[idx].Update(s.value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -64,33 +63,39 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *quantilesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
phis := as.phis
var quantiles []float64
var b []byte
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*quantilesStateValue)
sv.mu.Lock()
quantiles = sv.h.Quantiles(quantiles[:0], phis)
histogram.PutFast(sv.h)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
quantiles = quantiles[:0]
if state != nil {
quantiles = state.Quantiles(quantiles[:0], phis)
histogram.PutFast(state)
state.Reset()
}
sv.mu.Unlock()
key := k.(string)
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr)
if len(quantiles) > 0 {
key := k.(string)
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", flushTimestamp, quantile, "quantile", phiStr)
}
}
return true
})

View file

@ -2,53 +2,52 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// rateAggrState calculates output=rate, e.g. the counter per-second change.
type rateAggrState struct {
m sync.Map
m sync.Map
suffix string
// Time series state is dropped if no new samples are received during stalenessSecs.
stalenessSecs uint64
}
type rateStateValue struct {
mu sync.Mutex
lastValues map[string]rateLastValueState
deleteDeadline uint64
state map[string]rateState
deleted bool
deleteDeadline int64
}
type rateState struct {
lastValues [aggrStateSize]rateLastValueState
// prevTimestamp stores timestamp of the last registered value
// in the previous aggregation interval
prevTimestamp int64
// prevValue stores last registered value
// in the previous aggregation interval
prevValue float64
deleteDeadline int64
}
type rateLastValueState struct {
value float64
timestamp int64
deleteDeadline uint64
firstValue float64
value float64
timestamp int64
// total stores cumulative difference between registered values
// in the aggregation interval
total float64
// prevTimestamp stores timestamp of the last registered value
// in the previous aggregation interval
prevTimestamp int64
}
func newRateAggrState(stalenessInterval time.Duration, suffix string) *rateAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
func newRateAggrState(suffix string) *rateAggrState {
return &rateAggrState{
suffix: suffix,
stalenessSecs: stalenessSecs,
suffix: suffix,
}
}
func (as *rateAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@ -57,9 +56,10 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &rateStateValue{
lastValues: make(map[string]rateLastValueState),
rsv := &rateStateValue{
state: make(map[string]rateState),
}
v = rsv
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
@ -71,15 +71,17 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if ok {
state, ok := sv.state[inputKey]
lv := state.lastValues[idx]
if ok && lv.timestamp > 0 {
if s.timestamp < lv.timestamp {
// Skip out of order sample
sv.mu.Unlock()
continue
}
if lv.prevTimestamp == 0 {
lv.prevTimestamp = lv.timestamp
if state.prevTimestamp == 0 {
state.prevTimestamp = lv.timestamp
state.prevValue = lv.value
}
if s.value >= lv.value {
lv.total += s.value - lv.value
@ -87,13 +89,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
// counter reset
lv.total += s.value
}
} else if state.prevTimestamp > 0 {
lv.firstValue = s.value
}
lv.value = s.value
lv.timestamp = s.timestamp
lv.deleteDeadline = deleteDeadline
state.lastValues[idx] = lv
state.deleteDeadline = deleteDeadline
inputKey = bytesutil.InternString(inputKey)
sv.lastValues[inputKey] = lv
sv.state[inputKey] = state
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@ -105,18 +109,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
}
}
func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
func (as *rateAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
var staleOutputSamples, staleInputSamples int
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*rateStateValue)
sv.mu.Lock()
// check for stale entries
deleted := currentTime > sv.deleteDeadline
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
@ -126,26 +127,36 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
return true
}
// Delete outdated entries in sv.lastValues
// Delete outdated entries in state
var rate float64
lvs := sv.lastValues
for k1, v1 := range lvs {
if currentTime > v1.deleteDeadline {
delete(lvs, k1)
totalItems := len(sv.state)
for k1, state := range sv.state {
if flushTimestamp > state.deleteDeadline {
delete(sv.state, k1)
staleInputSamples++
continue
}
rateInterval := v1.timestamp - v1.prevTimestamp
if v1.prevTimestamp > 0 && rateInterval > 0 {
v1 := state.lastValues[idx]
rateInterval := v1.timestamp - state.prevTimestamp
if rateInterval > 0 && state.prevTimestamp > 0 {
if v1.firstValue >= state.prevValue {
v1.total += v1.firstValue - state.prevValue
} else {
v1.total += v1.firstValue
}
// calculate rate only if value was seen at least twice with different timestamps
rate += v1.total * 1000 / float64(rateInterval)
v1.prevTimestamp = v1.timestamp
v1.total = 0
lvs[k1] = v1
rate += (v1.total) * 1000 / float64(rateInterval)
state.prevTimestamp = v1.timestamp
state.prevValue = v1.value
} else {
totalItems--
}
totalItems -= staleInputSamples
state.lastValues[idx] = rateLastValueState{}
sv.state[k1] = state
}
// capture m length after deleted items were removed
totalItems := len(lvs)
sv.mu.Unlock()
if as.suffix == "rate_avg" && totalItems > 0 {
@ -153,7 +164,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
}
key := k.(string)
ctx.appendSeries(key, as.suffix, currentTimeMsec, rate)
ctx.appendSeries(key, as.suffix, flushTimestamp, rate)
return true
})
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
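The counter-reset branch in pushSamples above accumulates increases and treats a value that goes backwards as a fresh increase since the reset. A standalone sketch of just that rule (counterDelta is an illustrative helper, not the rateAggrState implementation):

package main

import "fmt"

// counterDelta illustrates the counter-reset handling visible above.
func counterDelta(prev, cur float64) float64 {
	if cur >= prev {
		return cur - prev
	}
	// Counter reset: the whole new value counts as increase.
	return cur
}

func main() {
	fmt.Println(counterDelta(10, 15)) // 5
	fmt.Println(counterDelta(15, 3))  // 3 (counter was reset)
}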

View file

@ -5,7 +5,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stddevAggrState calculates output=stddev, e.g. the standard deviation over input samples.
@ -14,18 +13,23 @@ type stddevAggrState struct {
}
type stddevStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
mu sync.Mutex
state [aggrStateSize]stddevState
deleted bool
deleteDeadline int64
}
type stddevState struct {
count float64
avg float64
q float64
}
func newStddevAggrState() *stddevAggrState {
return &stddevAggrState{}
}
func (as *stddevAggrState) pushSamples(samples []pushSample) {
func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -47,10 +51,12 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
state := &sv.state[idx]
state.count++
avg := state.avg + (s.value-state.avg)/state.count
state.q += (s.value - state.avg) * (s.value - avg)
state.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -61,26 +67,28 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
}
}
func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *stddevAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*stddevStateValue)
sv.mu.Lock()
stddev := math.Sqrt(sv.q / sv.count)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = stddevState{}
sv.mu.Unlock()
if state.count > 0 {
key := k.(string)
ctx.appendSeries(key, "stddev", flushTimestamp, math.Sqrt(state.q/state.count))
}
return true
})
}
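The stddev update above is Welford's running-variance method referenced in the code comment: count, avg and q (the sum of squared deviations) are updated per sample, and the standard deviation is sqrt(q/count) at flush time. A self-contained sketch:

package main

import (
	"fmt"
	"math"
)

// welford is a standalone sketch of the update used in
// stddevAggrState.pushSamples above.
type welford struct {
	count, avg, q float64
}

func (w *welford) update(v float64) {
	w.count++
	avg := w.avg + (v-w.avg)/w.count
	w.q += (v - w.avg) * (v - avg)
	w.avg = avg
}

func (w *welford) stddev() float64 {
	return math.Sqrt(w.q / w.count)
}

func main() {
	var w welford
	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		w.update(v)
	}
	fmt.Println(w.stddev()) // 2 (population standard deviation)
}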

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stdvarAggrState calculates output=stdvar, e.g. the variance over input samples.
@ -13,18 +12,23 @@ type stdvarAggrState struct {
}
type stdvarStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
mu sync.Mutex
state [aggrStateSize]stdvarState
deleted bool
deleteDeadline int64
}
type stdvarState struct {
count float64
avg float64
q float64
}
func newStdvarAggrState() *stdvarAggrState {
return &stdvarAggrState{}
}
func (as *stdvarAggrState) pushSamples(samples []pushSample) {
func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@ -46,10 +50,12 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
state := &sv.state[idx]
state.count++
avg := state.avg + (s.value-state.avg)/state.count
state.q += (s.value - state.avg) * (s.value - avg)
state.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -60,26 +66,28 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
}
}
func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *stdvarAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*stdvarStateValue)
sv.mu.Lock()
stdvar := sv.q / sv.count
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = stdvarState{}
sv.mu.Unlock()
if state.count > 0 {
key := k.(string)
ctx.appendSeries(key, "stdvar", flushTimestamp, state.q/state.count)
}
return true
})
}

View file

@ -23,9 +23,13 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/histogram"
"gopkg.in/yaml.v2"
)
// aggrStateSize is the number of aggregation intervals (windows) kept per aggregation state
const aggrStateSize = 2
var supportedOutputs = []string{
"rate_sum",
"rate_avg",
@ -370,8 +374,9 @@ type aggregator struct {
dropInputLabels []string
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
stalenessInterval time.Duration
keepMetricNames bool
ignoreOldSamples bool
@ -379,6 +384,7 @@ type aggregator struct {
by []string
without []string
aggregateOnlyByTime bool
tickInterval int64
// da is set to non-nil if input samples must be de-duplicated
da *dedupAggr
@ -389,6 +395,10 @@ type aggregator struct {
// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
minTimestamp atomic.Int64
// time to wait after interval end before flush
flushAfter *histogram.Fast
muFlushAfter sync.Mutex
// suffix contains a suffix, which should be added to aggregate metric names
//
// It contains the interval, labels in (by, without), plus output name.
@ -414,17 +424,15 @@ type aggregator struct {
}
type aggrState interface {
// pushSamples must push samples to the aggrState.
//
// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
pushSamples(samples []pushSample)
flushState(ctx *flushCtx, resetState bool)
pushSamples(samples []pushSample, deleteDeadline int64, idx int)
flushState(ctx *flushCtx, flushTimestamp int64, idx int)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
type PushFunc func(tss []prompbmarshal.TimeSeries)
type aggrPushFunc func(samples []pushSample, deleteDeadline int64, idx int)
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
//
// opts can contain additional options. If opts is nil, then default options are used.
@ -557,17 +565,17 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, true)
aggrStates[i] = newTotalAggrState(false, true)
case "total_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, false)
aggrStates[i] = newTotalAggrState(false, false)
case "increase":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
aggrStates[i] = newTotalAggrState(true, true)
case "increase_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
aggrStates[i] = newTotalAggrState(true, false)
case "rate_sum":
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_sum")
aggrStates[i] = newRateAggrState("rate_sum")
case "rate_avg":
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_avg")
aggrStates[i] = newRateAggrState("rate_avg")
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
case "count_samples":
@ -589,7 +597,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
case "stdvar":
aggrStates[i] = newStdvarAggrState()
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
aggrStates[i] = newHistogramBucketAggrState()
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
"see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
@ -626,18 +634,21 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
inputRelabeling: inputRelabeling,
outputRelabeling: outputRelabeling,
keepMetricNames: keepMetricNames,
ignoreOldSamples: ignoreOldSamples,
keepMetricNames: keepMetricNames,
ignoreOldSamples: ignoreOldSamples,
stalenessInterval: stalenessInterval,
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
tickInterval: interval.Milliseconds(),
aggrStates: aggrStates,
suffix: suffix,
stopCh: make(chan struct{}),
stopCh: make(chan struct{}),
flushAfter: histogram.NewFast(),
flushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
dedupFlushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
@ -663,6 +674,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
}
if dedupInterval > 0 {
a.da = newDedupAggr()
a.tickInterval = dedupInterval.Milliseconds()
}
alignFlushToInterval := !opts.NoAlignFlushToInterval
@ -675,6 +687,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
skipIncompleteFlush = !*v
}
minTime := time.Now()
if skipIncompleteFlush && alignFlushToInterval {
minTime = minTime.Truncate(interval).Add(interval)
}
a.minTimestamp.Store(minTime.UnixMilli())
a.wg.Add(1)
go func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
@ -709,75 +727,93 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
if dedupInterval <= 0 {
alignedSleep(interval)
t := time.NewTicker(interval)
defer t.Stop()
flushDeadline := time.Now().Truncate(interval).Add(interval)
tickInterval := time.Duration(a.tickInterval) * time.Millisecond
alignedSleep(tickInterval)
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil, interval, true)
ignoreFirstIntervals--
var dedupIdx, flushIdx int
t := time.NewTicker(tickInterval)
defer t.Stop()
isSkippedFirstFlush := false
for tickerWait(t) {
ct := time.Now()
dedupTime := ct.Truncate(tickInterval)
if a.ignoreOldSamples {
dedupIdx, flushIdx = a.getFlushIndices(dedupInterval, interval, dedupTime, flushDeadline)
}
pf := pushFunc
// Calculate delay
a.muFlushAfter.Lock()
flushAfterMsec := a.flushAfter.Quantile(0.95)
a.flushAfter.Reset()
a.muFlushAfter.Unlock()
flushAfter := time.Duration(flushAfterMsec) * time.Millisecond
if flushAfter > tickInterval {
logger.Warnf("metrics ingestion lag (%v) is more than tick interval (%v). "+
"gaps are expected in aggregations", flushAfter, tickInterval)
pf = nil
} else {
time.Sleep(flushAfter)
}
for tickerWait(t) {
if ignoreFirstIntervals > 0 {
a.flush(nil, interval, true)
a.dedupFlush(dedupInterval, dedupTime.UnixMilli(), dedupIdx, flushIdx)
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval, flushDeadline.UnixMilli(), flushIdx)
isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil, interval, flushDeadline.UnixMilli(), flushIdx)
ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
a.flush(pf, interval, flushDeadline.UnixMilli(), flushIdx)
}
if alignFlushToInterval {
select {
case <-t.C:
default:
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
}
}
} else {
alignedSleep(dedupInterval)
t := time.NewTicker(dedupInterval)
defer t.Stop()
flushDeadline := time.Now().Add(interval)
isSkippedFirstFlush := false
for tickerWait(t) {
a.dedupFlush(dedupInterval)
ct := time.Now()
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval, true)
ignoreFirstIntervals--
isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil, interval, true)
ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
}
}
if alignFlushToInterval {
select {
case <-t.C:
default:
}
if alignFlushToInterval {
select {
case <-t.C:
default:
}
}
}
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval, true)
dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
if a.ignoreOldSamples {
dedupIdx, flushIdx = a.getFlushIndices(dedupInterval, interval, dedupTime, flushDeadline)
}
a.dedupFlush(dedupInterval, flushDeadline.UnixMilli(), dedupIdx, flushIdx)
a.flush(pushFunc, interval, flushDeadline.UnixMilli(), flushIdx)
}
}
func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
func (a *aggregator) getFlushIndices(dedupInterval, interval time.Duration, dedupTime, flushTime time.Time) (int, int) {
flushTimestamp := flushTime.UnixMilli()
flushIntervals := int(flushTimestamp / int64(interval/time.Millisecond))
var dedupIndex, flushIndex int
if dedupInterval > 0 {
dedupTimestamp := dedupTime.UnixMilli()
dedupIntervals := int(dedupTimestamp / int64(dedupInterval/time.Millisecond))
intervalsRatio := int(interval / dedupInterval)
dedupIndex = dedupIntervals % aggrStateSize
flushIndex = flushIntervals % (aggrStateSize / intervalsRatio)
} else {
flushIndex = flushIntervals % aggrStateSize
dedupIndex = flushIndex
}
return dedupIndex, flushIndex
}
func (a *aggregator) dedupFlush(dedupInterval time.Duration, deleteDeadline int64, dedupIdx, flushIdx int) {
if dedupInterval <= 0 {
// The de-duplication is disabled.
return
@ -785,7 +821,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
startTime := time.Now()
a.da.flush(a.pushSamples)
a.da.flush(a.pushSamples, deleteDeadline, dedupIdx, flushIdx)
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
@ -797,14 +833,9 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
}
}
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) {
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, flushTimestamp int64, idx int) {
startTime := time.Now()
// Update minTimestamp before flushing samples to the storage,
// since the flush duration can be quite long.
// This should prevent dropping samples with old timestamps when the flush takes a long time.
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
var wg sync.WaitGroup
for _, as := range a.aggrStates {
flushConcurrencyCh <- struct{}{}
@ -816,7 +847,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
}()
ctx := getFlushCtx(a, pushFunc)
as.flushState(ctx, resetState)
as.flushState(ctx, flushTimestamp, idx)
ctx.flushSeries()
ctx.resetSeries()
putFlushCtx(ctx)
@ -826,6 +857,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
a.minTimestamp.Store(flushTimestamp)
if d > interval {
a.flushTimeouts.Inc()
logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03fs; "+
@ -855,11 +887,16 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
labels := &ctx.labels
inputLabels := &ctx.inputLabels
outputLabels := &ctx.outputLabels
currentTime := time.Now()
currentTimestamp := currentTime.UnixMilli()
deleteDeadline := currentTime.Add(a.stalenessInterval)
deleteDeadlineMilli := deleteDeadline.UnixMilli()
dropLabels := a.dropInputLabels
ignoreOldSamples := a.ignoreOldSamples
minTimestamp := a.minTimestamp.Load()
var maxLag int64
var flushIdx int
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
@ -892,20 +929,28 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
// do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, sample := range ts.Samples {
a.muFlushAfter.Lock()
a.flushAfter.Update(float64(currentTimestamp - sample.Timestamp))
a.muFlushAfter.Unlock()
if math.IsNaN(sample.Value) {
a.ignoredNanSamples.Inc()
// Skip NaN values
a.ignoredNanSamples.Inc()
continue
}
if ignoreOldSamples && sample.Timestamp < minTimestamp {
a.ignoredOldSamples.Inc()
// Skip old samples outside the current aggregation interval
a.ignoredOldSamples.Inc()
continue
}
if maxLag < currentTimestamp-sample.Timestamp {
maxLag = currentTimestamp - sample.Timestamp
}
samples = append(samples, pushSample{
if ignoreOldSamples {
flushIdx = int((sample.Timestamp)/a.tickInterval+1) % aggrStateSize
}
samples[flushIdx] = append(samples[flushIdx], pushSample{
key: key,
value: sample.Value,
timestamp: sample.Timestamp,
@@ -920,9 +965,13 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
ctx.buf = buf
if a.da != nil {
a.da.pushSamples(samples)
for idx, s := range samples {
a.da.pushSamples(s, idx)
}
} else {
a.pushSamples(samples)
for idx, s := range samples {
a.pushSamples(s, deleteDeadlineMilli, idx)
}
}
}
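
For context on the per-window bucketing introduced in Push, the following self-contained sketch shows how sample timestamps select one of the aggrStateSize buckets when ignoreOldSamples is enabled. The constant value, the bucketSamples helper, and the trimmed pushSample struct are illustrative assumptions, not code from this commit.

package main

import "fmt"

const aggrStateSize = 2 // assumed number of aggregation windows

type pushSample struct {
    key       string
    value     float64
    timestamp int64 // milliseconds
}

// bucketSamples spreads samples across aggrStateSize buckets keyed by the
// aggregation window their timestamp falls into, mirroring the flushIdx
// computation in Push when ignoreOldSamples is enabled.
func bucketSamples(samples []pushSample, tickInterval int64) [aggrStateSize][]pushSample {
    var buckets [aggrStateSize][]pushSample
    for _, s := range samples {
        idx := int(s.timestamp/tickInterval+1) % aggrStateSize
        buckets[idx] = append(buckets[idx], s)
    }
    return buckets
}

func main() {
    samples := []pushSample{
        {key: "foo", value: 1, timestamp: 10_000},
        {key: "foo", value: 2, timestamp: 70_000},
    }
    buckets := bucketSamples(samples, 60_000) // 1m tick interval
    for i, b := range buckets {
        fmt.Println("window", i, "->", len(b), "samples")
    }
}
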
@@ -963,14 +1012,14 @@ func getInputOutputKey(key string) (string, string) {
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
}
func (a *aggregator) pushSamples(samples []pushSample) {
func (a *aggregator) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for _, as := range a.aggrStates {
as.pushSamples(samples)
as.pushSamples(samples, deleteDeadline, idx)
}
}
type pushCtx struct {
samples []pushSample
samples [aggrStateSize][]pushSample
labels promutils.Labels
inputLabels promutils.Labels
outputLabels promutils.Labels
@@ -978,8 +1027,9 @@ type pushCtx struct {
}
func (ctx *pushCtx) reset() {
clear(ctx.samples)
ctx.samples = ctx.samples[:0]
for i := range ctx.samples {
ctx.samples[i] = ctx.samples[i][:0]
}
ctx.labels.Reset()
ctx.inputLabels.Reset()

View file

@@ -262,7 +262,7 @@ func TestAggregatorsSuccess(t *testing.T) {
outputs: [count_samples, sum_samples, count_series, last]
`, `
foo{abc="123"} 4
bar 5 100
bar 5 11
bar 34 10
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
@@ -507,8 +507,8 @@ foo:1m_by_abc_sum_samples{abc="456"} 8
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`, `bar:1m_total{baz="qwe"} 4.34
foo:1m_total 123
`, "11")
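
The updated expectations in this test (and in the similar total/increase cases below) appear to follow from removing the ignoreFirstSampleDeadline startup grace period from totalAggrState later in this commit: with keepFirstSample taking effect immediately, the first sample of a previously unseen series now contributes its full value, so the single input sample foo 123 produces foo:1m_total 123 instead of 0, and bar{baz="qwe"} 4.34 produces 4.34. The test file itself does not state this rationale; the reading is inferred from the totalAggrState changes shown further down.
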
// total_prometheus output for non-repeated series
@@ -529,16 +529,16 @@ foo:1m_total_prometheus 0
`, `
foo 123
bar{baz="qwe"} 1.31
bar{baz="qwe"} 4.34 1000
bar{baz="qwe"} 4.34 1
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 3.03
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
foo:1m_total{baz="qwe"} 15
`, `bar:1m_total{baz="qwe"} 4.34
bar:1m_total{baz="qwer"} 344
foo:1m_total 123
foo:1m_total{baz="qwe"} 10
`, "11111111")
// total_prometheus output for repeated series
@@ -574,8 +574,8 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`, `bar:1m_total 350.34
foo:1m_total 133
`, "11111111")
// total_prometheus output for repeated series with group by __name__
@@ -603,8 +603,8 @@ foo:1m_total_prometheus 15
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`, `bar:1m_increase{baz="qwe"} 4.34
foo:1m_increase 123
`, "11")
// increase_prometheus output for non-repeated series
@@ -631,10 +631,10 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 5.02
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`, `bar:1m_increase{baz="qwe"} 6.34
bar:1m_increase{baz="qwer"} 344
foo:1m_increase 123
foo:1m_increase{baz="qwe"} 10
`, "11111111")
// increase_prometheus output for repeated series

View file

@@ -55,7 +55,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
b.SetBytes(int64(len(benchSeries) * len(outputs)))
for i := 0; i < b.N; i++ {
for _, aggr := range a.as {
aggr.flush(pushFunc, time.Hour, false)
aggr.flush(pushFunc, time.Hour, time.Now().UnixMilli(), 0)
}
}
}

View file

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
@@ -13,16 +12,22 @@ type sumSamplesAggrState struct {
}
type sumSamplesStateValue struct {
mu sync.Mutex
sum float64
deleted bool
mu sync.Mutex
state [aggrStateSize]sumState
deleted bool
deleteDeadline int64
}
type sumState struct {
sum float64
exists bool
}
func newSumSamplesAggrState() *sumSamplesAggrState {
return &sumSamplesAggrState{}
}
func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -31,23 +36,21 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &sumSamplesStateValue{
sum: s.value,
}
v = &sumSamplesStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += s.value
sv.state[idx].sum += s.value
sv.state[idx].exists = true
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -58,26 +61,28 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = sumState{}
sv.mu.Unlock()
if state.exists {
key := k.(string)
ctx.appendSeries(key, "sum_samples", flushTimestamp, state.sum)
}
return true
})
}
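
The per-window state handling above follows a pattern that recurs in every output type: writers accumulate into state[idx] under the mutex, and the flusher swaps just that slot for a zero value before emitting it. Below is a minimal generic sketch of that lifecycle; sumState mirrors the struct from this commit, while slotValue and its push/flush methods are illustrative names only.

package main

import (
    "fmt"
    "sync"
)

const aggrStateSize = 2 // assumed window count

type sumState struct {
    sum    float64
    exists bool
}

type slotValue struct {
    mu    sync.Mutex
    state [aggrStateSize]sumState
}

// push adds a sample value into the window selected by idx.
func (sv *slotValue) push(idx int, v float64) {
    sv.mu.Lock()
    sv.state[idx].sum += v
    sv.state[idx].exists = true
    sv.mu.Unlock()
}

// flush returns the accumulated sum for window idx and resets only that
// window, leaving the other windows untouched for their own flush cycle.
func (sv *slotValue) flush(idx int) (float64, bool) {
    sv.mu.Lock()
    state := sv.state[idx]
    sv.state[idx] = sumState{}
    sv.mu.Unlock()
    return state.sum, state.exists
}

func main() {
    var sv slotValue
    sv.push(0, 1.5)
    sv.push(1, 40)
    sum, ok := sv.flush(0)
    fmt.Println(sum, ok) // 1.5 true; window 1 still holds 40
}
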

View file

@@ -3,10 +3,8 @@ package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// totalAggrState calculates output=total, e.g. the summary counter over input counters.
@@ -20,36 +18,28 @@ type totalAggrState struct {
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
// Time series state is dropped if no new samples are received during stalenessSecs.
//
// Also, the first sample for each new series is ignored during stalenessSecs even if keepFirstSample is set.
// see ignoreFirstSampleDeadline for more details.
stalenessSecs uint64
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
}
type totalStateValue struct {
mu sync.Mutex
lastValues map[string]totalLastValueState
total float64
deleteDeadline uint64
shared totalState
state [aggrStateSize]float64
deleteDeadline int64
deleted bool
}
type totalState struct {
total float64
lastValues map[string]totalLastValueState
}
type totalLastValueState struct {
value float64
timestamp int64
deleteDeadline uint64
deleteDeadline int64
}
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
func newTotalAggrState(resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
suffix := "total"
if resetTotalOnFlush {
suffix = "increase"
@@ -58,18 +48,14 @@ func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepF
suffix += "_prometheus"
}
return &totalAggrState{
suffix: suffix,
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
stalenessSecs: stalenessSecs,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
suffix: suffix,
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
}
}
func (as *totalAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
var deleted bool
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@@ -79,7 +65,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
if !ok {
// The entry is missing in the map. Try creating it.
v = &totalStateValue{
lastValues: make(map[string]totalLastValueState),
shared: totalState{
lastValues: make(map[string]totalLastValueState),
},
}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
@@ -90,10 +78,10 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
}
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := sv.deleted
deleted = sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if ok || keepFirstSample {
lv, ok := sv.shared.lastValues[inputKey]
if ok || as.keepFirstSample {
if s.timestamp < lv.timestamp {
// Skip out of order sample
sv.mu.Unlock()
@@ -101,10 +89,10 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
}
if s.value >= lv.value {
sv.total += s.value - lv.value
sv.state[idx] += s.value - lv.value
} else {
// counter reset
sv.total += s.value
sv.state[idx] += s.value
}
}
lv.value = s.value
@@ -112,7 +100,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
lv.deleteDeadline = deleteDeadline
inputKey = bytesutil.InternString(inputKey)
sv.lastValues[inputKey] = lv
sv.shared.lastValues[inputKey] = lv
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@@ -124,64 +112,44 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
}
}
func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
func (as *totalAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
var total float64
m := &as.m
var staleInputSamples, staleOutputSamples int
m.Range(func(k, v interface{}) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
staleOutputSamples++
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
staleInputSamples++
}
sv.mu.Unlock()
m.Delete(k)
return true
}
total = sv.shared.total + sv.state[idx]
for k1, v1 := range sv.shared.lastValues {
if flushTimestamp > v1.deleteDeadline {
delete(sv.shared.lastValues, k1)
}
}
sv.state[idx] = 0
if !as.resetTotalOnFlush {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.shared.total = 0
} else {
sv.shared.total = total
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
key := k.(string)
ctx.appendSeries(key, as.suffix, flushTimestamp, total)
return true
})
ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples)
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
}
func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(ctx, currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
total := sv.total
if resetState {
if as.resetTotalOnFlush {
sv.total = 0
} else if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.suffix, currentTimeMsec, total)
}
return true
})
}
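
The flush path above folds the per-window delta into the shared running total and resets the shared value once it approaches the limit of exact float64 integer arithmetic. Below is a small sketch of just that merge step, under the assumption that the behavior matches the diff as read; mergeTotal is an illustrative helper, not part of the commit.

package main

import (
    "fmt"
    "math"
)

// mergeTotal mirrors the flush-time merge in totalAggrState: the per-window
// delta is folded into the shared running total, and the shared total is
// dropped once it grows so large that float64 can no longer represent
// consecutive integers exactly (|total| >= 2^53).
func mergeTotal(shared, windowDelta float64, resetTotalOnFlush bool) (output, newShared float64) {
    output = shared + windowDelta
    if resetTotalOnFlush {
        // increase-style outputs start every interval from zero.
        return output, 0
    }
    if math.Abs(output) >= (1 << 53) {
        // Precision guard: drop the accumulated total instead of silently
        // losing increments smaller than the float64 ULP at this magnitude.
        return output, 0
    }
    return output, output
}

func main() {
    out, shared := mergeTotal(100, 7, false)
    fmt.Println(out, shared) // 107 107

    out, shared = mergeTotal(1<<53, 1, false)
    fmt.Println(out, shared) // large running total is reset to 0 to protect precision
}
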

View file

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values.
@@ -13,16 +12,17 @@ type uniqueSamplesAggrState struct {
}
type uniqueSamplesStateValue struct {
mu sync.Mutex
m map[float64]struct{}
deleted bool
mu sync.Mutex
state [aggrStateSize]map[float64]struct{}
deleted bool
deleteDeadline int64
}
func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
return &uniqueSamplesAggrState{}
}
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -31,27 +31,26 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &uniqueSamplesStateValue{
m: map[float64]struct{}{
s.value: {},
},
usv := &uniqueSamplesStateValue{}
for iu := range usv.state {
usv.state[iu] = make(map[float64]struct{})
}
v = usv
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.m[s.value]; !ok {
sv.m[s.value] = struct{}{}
if _, ok := sv.state[idx][s.value]; !ok {
sv.state[idx][s.value] = struct{}{}
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -62,26 +61,28 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock()
n := len(sv.m)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n))
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := len(sv.state[idx])
sv.state[idx] = make(map[float64]struct{})
sv.mu.Unlock()
if state > 0 {
key := k.(string)
ctx.appendSeries(key, "unique_samples", flushTimestamp, float64(state))
}
return true
})
}
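
Finally, a brief illustration of why unique_samples keeps a separate value set per window: the same value observed in two different windows is counted once in each. The windowedSample type and the uniquePerWindow helper below are illustrative only and not part of the commit.

package main

import "fmt"

const aggrStateSize = 2 // assumed window count

type windowedSample struct {
    idx   int     // aggregation window the sample belongs to
    value float64 // sample value
}

// uniquePerWindow counts distinct sample values independently per window,
// mirroring the per-window maps in uniqueSamplesStateValue.
func uniquePerWindow(samples []windowedSample) [aggrStateSize]int {
    var state [aggrStateSize]map[float64]struct{}
    for i := range state {
        state[i] = make(map[float64]struct{})
    }
    for _, s := range samples {
        state[s.idx][s.value] = struct{}{}
    }
    var counts [aggrStateSize]int
    for i, m := range state {
        counts[i] = len(m)
    }
    return counts
}

func main() {
    samples := []windowedSample{
        {0, 1}, {0, 1}, {0, 2}, // window 0 sees values {1, 2}
        {1, 1},                 // window 1 sees {1} again
    }
    fmt.Println(uniquePerWindow(samples)) // [2 1]
}
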