From 60941a311fc4033876bdb9267cad78a8e22b62de Mon Sep 17 00:00:00 2001
From: Andrii Chubatiuk
Date: Mon, 18 Nov 2024 09:29:53 +0200
Subject: [PATCH] added compressor cleanup
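
LabelsCompressor now tracks a delete deadline per compressed label and recycles
freed label indexes through an internal queue, so long-running aggregators no
longer accumulate compressor entries for stale series. Aggregation states reset
their values on flush instead of marking entries deleted, and staleness is
driven by deleteDeadline values passed in by the caller instead of per-state
timers.

Rough usage sketch of the new API (the wiring below is illustrative only;
stalenessInterval and flushTimestamp are placeholder variables, not part of
this diff):

	lc := promutils.NewLabelsCompressor()
	deleteDeadline := time.Now().UnixMilli() + stalenessInterval.Milliseconds()
	key := lc.Compress(nil, labels, deleteDeadline) // refreshes per-label deadlines
	labelsResult := lc.Decompress(nil, key)
	// during flush, stale keys release their indexes for reuse:
	lc.Delete(key, flushTimestamp)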
---
 lib/promutils/labelscompressor.go             | 141 ++++++++++++++-
 lib/promutils/labelscompressor_test.go        |   8 +-
 lib/promutils/labelscompressor_timing_test.go |   8 +-
 lib/streamaggr/avg.go                         |  44 +++--
 lib/streamaggr/count_samples.go               |  40 +++--
 lib/streamaggr/count_series.go                |  38 ++--
 lib/streamaggr/dedup.go                       |  82 +++++----
 lib/streamaggr/dedup_test.go                  |  22 ++-
 lib/streamaggr/dedup_timing_test.go           |  10 +-
 lib/streamaggr/deduplicator.go                |  29 +++-
 lib/streamaggr/histogram_bucket.go            |  38 ++--
 lib/streamaggr/last.go                        |  39 +++--
 lib/streamaggr/max.go                         |  44 +++--
 lib/streamaggr/min.go                         |  47 +++--
 lib/streamaggr/quantiles.go                   |  24 ++-
 lib/streamaggr/rate.go                        |  31 ++--
 lib/streamaggr/stddev.go                      |  34 +++-
 lib/streamaggr/stdvar.go                      |  33 +++-
 lib/streamaggr/streamaggr.go                  | 164 ++++++++++--------
 lib/streamaggr/sum_samples.go                 |  48 +++--
 lib/streamaggr/total.go                       |  56 +++---
 lib/streamaggr/unique_samples.go              |  42 +++--
 22 files changed, 652 insertions(+), 370 deletions(-)

diff --git a/lib/promutils/labelscompressor.go b/lib/promutils/labelscompressor.go
index 24c9d468d..b362cf493 100644
--- a/lib/promutils/labelscompressor.go
+++ b/lib/promutils/labelscompressor.go
@@ -11,16 +11,77 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
 
+type queue struct {
+	in      chan uint64
+	out     chan uint64
+	storage []uint64
+}
+
+func newQueue() *queue {
+	q := &queue{
+		out: make(chan uint64),
+		in:  make(chan uint64),
+	}
+	go func() {
+		defer close(q.out)
+		for {
+			if len(q.storage) == 0 {
+				item, ok := <-q.in
+				if !ok {
+					return
+				}
+				q.storage = append(q.storage, item)
+				continue
+			}
+
+			select {
+			case item, ok := <-q.in:
+				if ok {
+					q.storage = append(q.storage, item)
+				} else {
+					// unwind storage
+					for _, item := range q.storage {
+						q.out <- item
+					}
+					return
+				}
+			case q.out <- q.storage[0]:
+				if len(q.storage) == 1 {
+					q.storage = nil
+				} else {
+					q.storage = q.storage[1:]
+				}
+			}
+		}
+	}()
+	return q
+}
+
+type compressedLabel struct {
+	mu             sync.Mutex
+	code           uint64
+	deleted        bool
+	deleteDeadline int64
+}
+
 // LabelsCompressor compresses []prompbmarshal.Label into short binary strings
 type LabelsCompressor struct {
 	labelToIdx sync.Map
 	idxToLabel labelsMap
 
+	freeIdxs *queue
 	nextIdx  atomic.Uint64
 
 	totalSizeBytes atomic.Uint64
 }
 
+func NewLabelsCompressor() *LabelsCompressor {
+	return &LabelsCompressor{
+		freeIdxs: newQueue(),
+	}
+}
+
 // SizeBytes returns the size of lc data in bytes
 func (lc *LabelsCompressor) SizeBytes() uint64 {
 	return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
@@ -28,13 +89,60 @@ func (lc *LabelsCompressor) SizeBytes() uint64 {
 
 // ItemsCount returns the number of items in lc
 func (lc *LabelsCompressor) ItemsCount() uint64 {
-	return lc.nextIdx.Load()
+	return lc.nextIdx.Load() - uint64(len(lc.freeIdxs.storage))
+}
+
+// Delete adds stale label indexes to the lc.freeIdxs list
+func (lc *LabelsCompressor) Delete(src []byte, ts int64) {
+	labelsLen, nSize := encoding.UnmarshalVarUint64(src)
+	if nSize <= 0 {
+		logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
+	}
+	tail := src[nSize:]
+	if labelsLen == 0 {
+		// fast path - nothing to decode
+		if len(tail) > 0 {
+			logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(tail), tail)
+		}
+		return
+	}
+
+	a := encoding.GetUint64s(int(labelsLen))
+	var err error
+	tail, err = encoding.UnmarshalVarUint64s(a.A, tail)
+	if err != nil {
+		logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
+	}
+	if len(tail) > 0 {
+		logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
+	}
+	for _, idx := range a.A {
+		label, ok := lc.idxToLabel.Load(idx)
+		if !ok {
+			logger.Panicf("BUG: missing label for idx=%d", idx)
+		}
+		v, ok := lc.labelToIdx.Load(label)
+		if !ok {
+			continue
+		}
+		// The stored value is a *compressedLabel (see compress() below),
+		// so the type assertion must use the pointer type.
+		cl := v.(*compressedLabel)
+		cl.mu.Lock()
+		if cl.deleteDeadline < ts {
+			cl.deleted = true
+			cl.mu.Unlock()
+			lc.freeIdxs.in <- idx
+		} else {
+			cl.mu.Unlock()
+		}
+	}
+	encoding.PutUint64s(a)
 }
 
 // Compress compresses labels, appends the compressed labels to dst and returns the result.
 //
 // It is safe calling Compress from concurrent goroutines.
-func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label) []byte {
+func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label, deleteDeadline int64) []byte {
 	if len(labels) == 0 {
 		// Fast path
 		return append(dst, 0)
@@ -42,22 +150,27 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label) [
 	a := encoding.GetUint64s(len(labels) + 1)
 	a.A[0] = uint64(len(labels))
-	lc.compress(a.A[1:], labels)
+	lc.compress(a.A[1:], labels, deleteDeadline)
 	dst = encoding.MarshalVarUint64s(dst, a.A)
 	encoding.PutUint64s(a)
 	return dst
 }
 
-func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label) {
+func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label, deleteDeadline int64) {
 	if len(labels) == 0 {
 		return
 	}
 	_ = dst[len(labels)-1]
 	for i, label := range labels {
+	again:
 		v, ok := lc.labelToIdx.Load(label)
 		if !ok {
-			idx := lc.nextIdx.Add(1)
-			v = idx
+			var idx uint64
+			select {
+			case idx = <-lc.freeIdxs.out:
+			default:
+				idx = lc.nextIdx.Add(1)
+			}
 			labelCopy := cloneLabel(label)
 
 			// Must store idxToLabel entry before labelToIdx,
@@ -66,6 +179,10 @@ func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label)
 			// We might store duplicated entries for single label with different indexes,
 			// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
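+			// NOTE: idx may be reused from lc.freeIdxs; idxToLabel still holds the
+			// previous label for it until the Store below overwrites the mapping.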
 			lc.idxToLabel.Store(idx, labelCopy)
+			v = &compressedLabel{
+				deleteDeadline: deleteDeadline,
+				code:           idx,
+			}
 			vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
 			if loaded {
 				// This label has been stored by a concurrent goroutine with different index,
@@ -78,7 +195,17 @@ func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label)
 			entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
 			lc.totalSizeBytes.Add(entrySizeBytes)
 		}
-		dst[i] = v.(uint64)
+		cl := v.(*compressedLabel)
+		dst[i] = cl.code
+		cl.mu.Lock()
+		deleted := cl.deleted
+		if !deleted {
+			cl.deleteDeadline = deleteDeadline
+		}
+		cl.mu.Unlock()
+		if deleted {
+			goto again
+		}
 	}
 }
diff --git a/lib/promutils/labelscompressor_test.go b/lib/promutils/labelscompressor_test.go
index 9b520dba8..3a4256174 100644
--- a/lib/promutils/labelscompressor_test.go
+++ b/lib/promutils/labelscompressor_test.go
@@ -9,14 +9,14 @@ import (
 )
 
 func TestLabelsCompressorSerial(t *testing.T) {
-	var lc LabelsCompressor
+	lc := NewLabelsCompressor()
 
 	f := func(labels []prompbmarshal.Label) {
 		t.Helper()
 
 		sExpected := labelsToString(labels)
 
-		data := lc.Compress(nil, labels)
+		data := lc.Compress(nil, labels, 0)
 		labelsResult := lc.Decompress(nil, data)
 
 		sResult := labelsToString(labelsResult)
@@ -67,7 +67,7 @@ func TestLabelsCompressorSerial(t *testing.T) {
 
 func TestLabelsCompressorConcurrent(t *testing.T) {
 	const concurrency = 5
-	var lc LabelsCompressor
+	lc := NewLabelsCompressor()
 	var expectCompressedKeys sync.Map
 
 	var wg sync.WaitGroup
@@ -78,7 +78,7 @@ func TestLabelsCompressorConcurrent(t *testing.T) {
 			series := newTestSeries(100, 20)
 			for n, labels := range series {
 				sExpected := labelsToString(labels)
-				data := lc.Compress(nil, labels)
+				data := lc.Compress(nil, labels, 0)
 				if expectData, ok := expectCompressedKeys.LoadOrStore(n, data); ok {
 					if string(data) != string(expectData.([]byte)) {
 						panic(fmt.Errorf("unexpected compress result at series/%d in iteration %d ", n, i))
diff --git a/lib/promutils/labelscompressor_timing_test.go b/lib/promutils/labelscompressor_timing_test.go
index 11f6c4f1c..723851b9c 100644
--- a/lib/promutils/labelscompressor_timing_test.go
+++ b/lib/promutils/labelscompressor_timing_test.go
@@ -8,7 +8,7 @@ import (
 )
 
 func BenchmarkLabelsCompressorCompress(b *testing.B) {
-	var lc LabelsCompressor
+	lc := NewLabelsCompressor()
 	series := newTestSeries(100, 10)
 
 	b.ReportAllocs()
@@ -19,7 +19,7 @@ func BenchmarkLabelsCompressorCompress(b *testing.B) {
 		for pb.Next() {
 			dst = dst[:0]
 			for _, labels := range series {
-				dst = lc.Compress(dst, labels)
+				dst = lc.Compress(dst, labels, 0)
 			}
 			Sink.Add(uint64(len(dst)))
 		}
 	})
 }
 
@@ -27,13 +27,13 @@ func BenchmarkLabelsCompressorCompress(b *testing.B) {
 func BenchmarkLabelsCompressorDecompress(b *testing.B) {
-	var lc LabelsCompressor
+	lc := NewLabelsCompressor()
 	series := newTestSeries(100, 10)
 	datas := make([][]byte, len(series))
 	var dst []byte
 	for i, labels := range series {
 		dstLen := len(dst)
-		dst = lc.Compress(dst, labels)
+		dst = lc.Compress(dst, labels, 0)
 		datas[i] = dst[dstLen:]
 	}
diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go
index 32ab569ac..799fdfbd1 100644
--- a/lib/streamaggr/avg.go
+++ b/lib/streamaggr/avg.go
@@ -12,37 +12,33 @@ type avgAggrState struct {
 }
 
 type avgStateValue struct {
-	mu      sync.Mutex
-	sum     float64
-	count   int64
-	deleted bool
+	mu             sync.Mutex
+	sum            float64
+	count          int64
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newAvgAggrState() *avgAggrState {
 	return &avgAggrState{}
 }
 
-func (as *avgAggrState) pushSamples(samples []pushSample) {
+func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &avgStateValue{
-				sum:   s.value,
-				count: 1,
-			}
+			v = &avgStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The entry has been successfully stored
-				continue
+			if loaded {
+				// Update the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Update the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*avgStateValue)
 		sv.mu.Lock()
@@ -50,6 +46,7 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
 		if !deleted {
 			sv.sum += s.value
 			sv.count++
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -68,9 +65,22 @@ func (as *avgAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*avgStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
+		if sv.count == 0 {
+			sv.mu.Unlock()
+			return true
+		}
+
 		avg := sv.sum / float64(sv.count)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.sum = 0
+		sv.count = 0
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go
index ca2bc709e..9afaf6bcb 100644
--- a/lib/streamaggr/count_samples.go
+++ b/lib/streamaggr/count_samples.go
@@ -12,41 +12,39 @@ type countSamplesAggrState struct {
 }
 
 type countSamplesStateValue struct {
-	mu      sync.Mutex
-	n       uint64
-	deleted bool
+	mu             sync.Mutex
+	n              uint64
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newCountSamplesAggrState() *countSamplesAggrState {
 	return &countSamplesAggrState{}
 }
 
-func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
+func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &countSamplesStateValue{
-				n: 1,
-			}
+			v = &countSamplesStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*countSamplesStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
 			sv.n++
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -65,9 +63,21 @@ func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*countSamplesStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
 		n := sv.n
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		if n == 0 {
+			sv.mu.Unlock()
+			return true
+		}
+
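+		// Reset the counter in place instead of deleting the entry, so the next
+		// interval reuses it without re-creating the map entry.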
+		sv.n = 0
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go
index 816581464..9e6ded1e4 100644
--- a/lib/streamaggr/count_series.go
+++ b/lib/streamaggr/count_series.go
@@ -13,16 +13,17 @@ type countSeriesAggrState struct {
 }
 
 type countSeriesStateValue struct {
-	mu      sync.Mutex
-	m       map[uint64]struct{}
-	deleted bool
+	mu             sync.Mutex
+	m              map[uint64]struct{}
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newCountSeriesAggrState() *countSeriesAggrState {
 	return &countSeriesAggrState{}
 }
 
-func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
+func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, _ bool) {
 	for i := range samples {
 		s := &samples[i]
 		inputKey, outputKey := getInputOutputKey(s.key)
@@ -36,18 +37,14 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 		if !ok {
 			// The entry is missing in the map. Try creating it.
 			v = &countSeriesStateValue{
-				m: map[uint64]struct{}{
-					h: {},
-				},
+				m: make(map[uint64]struct{}),
 			}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The entry has been added to the map.
-				continue
+			if loaded {
+				// Update the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Update the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*countSeriesStateValue)
 		sv.mu.Lock()
@@ -56,6 +53,7 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 			if _, ok := sv.m[h]; !ok {
 				sv.m[h] = struct{}{}
 			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -74,9 +72,21 @@ func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*countSeriesStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
 		n := len(sv.m)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		if n == 0 {
+			sv.mu.Unlock()
+			return true
+		}
+
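+		// Start a fresh set for the next interval; the old map is dropped so its
+		// memory can be reclaimed.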
+		sv.m = make(map[uint64]struct{})
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index 1374a0e74..7461a870e 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -8,12 +8,14 @@ import (
 	"github.com/cespare/xxhash/v2"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
 )
 
 const dedupAggrShardsCount = 128
 
 type dedupAggr struct {
+	lc     *promutils.LabelsCompressor
 	shards []dedupAggrShard
 }
@@ -26,7 +28,7 @@ type dedupAggrShard struct {
 }
 
 type dedupAggrShardNopad struct {
-	mu sync.Mutex
+	mu sync.RWMutex
 	m  map[string]*dedupAggrSample
 
 	samplesBuf []dedupAggrSample
@@ -36,14 +38,16 @@ type dedupAggrShardNopad struct {
 }
 
 type dedupAggrSample struct {
-	value     float64
-	timestamp int64
+	value          float64
+	timestamp      int64
+	deleteDeadline int64
 }
 
-func newDedupAggr() *dedupAggr {
+func newDedupAggr(lc *promutils.LabelsCompressor) *dedupAggr {
 	shards := make([]dedupAggrShard, dedupAggrShardsCount)
 	return &dedupAggr{
 		shards: shards,
+		lc:     lc,
 	}
 }
@@ -63,7 +67,7 @@ func (da *dedupAggr) itemsCount() uint64 {
 	return n
 }
 
-func (da *dedupAggr) pushSamples(samples []pushSample) {
+func (da *dedupAggr) pushSamples(samples []pushSample, deleteDeadline int64) {
 	pss := getPerShardSamples()
 	shards := pss.shards
 	for _, sample := range samples {
@@ -75,17 +79,21 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
 		if len(shardSamples) == 0 {
 			continue
 		}
-		da.shards[i].pushSamples(shardSamples)
+		da.shards[i].pushSamples(shardSamples, deleteDeadline)
 	}
 	putPerShardSamples(pss)
 }
 
-func getDedupFlushCtx() *dedupFlushCtx {
+func getDedupFlushCtx(flushTimestamp, deleteDeadline int64, lc *promutils.LabelsCompressor) *dedupFlushCtx {
 	v := dedupFlushCtxPool.Get()
 	if v == nil {
-		return &dedupFlushCtx{}
+		v = &dedupFlushCtx{}
 	}
-	return v.(*dedupFlushCtx)
+	ctx := v.(*dedupFlushCtx)
+	ctx.lc = lc
+	ctx.deleteDeadline = deleteDeadline
+	ctx.flushTimestamp = flushTimestamp
+	return ctx
 }
 
 func putDedupFlushCtx(ctx *dedupFlushCtx) {
@@ -96,15 +104,24 @@ var dedupFlushCtxPool sync.Pool
 
 type dedupFlushCtx struct {
-	samples []pushSample
+	keysToDelete   []string
+	samples        []pushSample
+	deleteDeadline int64
+	flushTimestamp int64
+	lc             *promutils.LabelsCompressor
 }
 
 func (ctx *dedupFlushCtx) reset() {
+	ctx.deleteDeadline = 0
+	ctx.flushTimestamp = 0
+	ctx.lc = nil
+	clear(ctx.keysToDelete)
+	ctx.keysToDelete = ctx.keysToDelete[:0]
 	clear(ctx.samples)
 	ctx.samples = ctx.samples[:0]
 }
 
-func (da *dedupAggr) flush(f func(samples []pushSample)) {
+func (da *dedupAggr) flush(f func(samples []pushSample, deleteDeadline int64), flushTimestamp, deleteDeadline int64) {
 	var wg sync.WaitGroup
 	for i := range da.shards {
 		flushConcurrencyCh <- struct{}{}
@@ -115,7 +132,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) {
 				wg.Done()
 			}()
 
-			ctx := getDedupFlushCtx()
+			ctx := getDedupFlushCtx(flushTimestamp, deleteDeadline, da.lc)
 			shard.flush(ctx, f)
 			putDedupFlushCtx(ctx)
 		}(&da.shards[i])
@@ -154,7 +171,7 @@ func putPerShardSamples(pss *perShardSamples) {
 
 var perShardSamplesPool sync.Pool
 
-func (das *dedupAggrShard) pushSamples(samples []pushSample) {
+func (das *dedupAggrShard) pushSamples(samples []pushSample, deleteDeadline int64) {
 	das.mu.Lock()
 	defer das.mu.Unlock()
 
@@ -171,6 +188,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 			s = &samplesBuf[len(samplesBuf)-1]
 			s.value = sample.value
 			s.timestamp = sample.timestamp
+			s.deleteDeadline = deleteDeadline
 
 			key := bytesutil.InternString(sample.key)
 			m[key] = s
@@ -183,30 +201,28 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 		if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
 			s.value = sample.value
 			s.timestamp = sample.timestamp
+			s.deleteDeadline = deleteDeadline
 		}
 	}
 
 	das.samplesBuf = samplesBuf
 }
 
-func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
-	das.mu.Lock()
-
-	m := das.m
-	if len(m) > 0 {
-		das.m = make(map[string]*dedupAggrSample, len(m))
-		das.sizeBytes.Store(0)
-		das.itemsCount.Store(0)
-		das.samplesBuf = make([]dedupAggrSample, 0, len(das.samplesBuf))
-	}
-
-	das.mu.Unlock()
-
-	if len(m) == 0 {
+func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample, deleteDeadline int64)) {
+	if len(das.m) == 0 {
 		return
 	}
 
+	keysToDelete := ctx.keysToDelete
 	dstSamples := ctx.samples
-	for key, s := range m {
+	das.mu.RLock()
+	for key, s := range das.m {
+		if ctx.flushTimestamp > s.deleteDeadline {
+			das.itemsCount.Add(^uint64(0))
+			//ctx.lc.Delete(key)
+			das.sizeBytes.Add(^(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)) - 1))
+			keysToDelete = append(keysToDelete, key)
+			continue
+		}
 		dstSamples = append(dstSamples, pushSample{
 			key:       key,
 			value:     s.value,
@@ -215,11 +231,17 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 
 		// Limit the number of samples per each flush in order to limit memory usage.
 		if len(dstSamples) >= 10_000 {
-			f(dstSamples)
+			f(dstSamples, ctx.deleteDeadline)
 			clear(dstSamples)
 			dstSamples = dstSamples[:0]
 		}
 	}
-	f(dstSamples)
+	das.mu.RUnlock()
+	das.mu.Lock()
+	for _, key := range keysToDelete {
+		delete(das.m, key)
+	}
+	das.mu.Unlock()
+	f(dstSamples, ctx.deleteDeadline)
 	ctx.samples = dstSamples
 }
diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go
index 91ce09e13..1d540d690 100644
--- a/lib/streamaggr/dedup_test.go
+++ b/lib/streamaggr/dedup_test.go
@@ -5,10 +5,13 @@ import (
 	"reflect"
 	"sync"
 	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 )
 
 func TestDedupAggrSerial(t *testing.T) {
-	da := newDedupAggr()
+	var lc promutils.LabelsCompressor
+	da := newDedupAggr(&lc)
 
 	const seriesCount = 100_000
 	expectedSamplesMap := make(map[string]pushSample)
@@ -20,11 +23,11 @@ func TestDedupAggrSerial(t *testing.T) {
 			sample.value = float64(i + j)
 			expectedSamplesMap[sample.key] = *sample
 		}
-		da.pushSamples(samples)
+		da.pushSamples(samples, 0)
 	}
 
-	if n := da.sizeBytes(); n > 5_000_000 {
-		t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
+	if n := da.sizeBytes(); n > 6_000_000 {
+		t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
 	}
 	if n := da.itemsCount(); n != seriesCount {
 		t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@@ -32,19 +35,21 @@ func TestDedupAggrSerial(t *testing.T) {
 
 	flushedSamplesMap := make(map[string]pushSample)
 	var mu sync.Mutex
-	flushSamples := func(samples []pushSample) {
+	flushSamples := func(samples []pushSample, _ int64) {
 		mu.Lock()
 		for _, sample := range samples {
 			flushedSamplesMap[sample.key] = sample
 		}
 		mu.Unlock()
 	}
-	da.flush(flushSamples)
+	da.flush(flushSamples, 0, 0)
 
 	if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
 		t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
 	}
 
+	da.flush(flushSamples, 1, 0)
+
 	if n := da.sizeBytes(); n > 17_000 {
 		t.Fatalf("too big dedupAggr state after flush; %d bytes; it shouldn't exceed 17_000 bytes", n)
 	}
@@ -54,9 +59,10 @@ func TestDedupAggrSerial(t *testing.T) {
 }
 
 func TestDedupAggrConcurrent(_ *testing.T) {
+	var lc promutils.LabelsCompressor
 	const concurrency = 5
 	const seriesCount = 10_000
-	da := newDedupAggr()
+	da := newDedupAggr(&lc)
 
 	var wg sync.WaitGroup
 	for i := 0; i < concurrency; i++ {
@@ -70,7 +76,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
 				sample.key = fmt.Sprintf("key_%d", j)
 				sample.value = float64(i + j)
 			}
-			da.pushSamples(samples)
+			da.pushSamples(samples, 0)
 		}
 		}()
 	}
diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go
index 2b6bab25c..8e9e2c6fb 100644
--- a/lib/streamaggr/dedup_timing_test.go
+++ b/lib/streamaggr/dedup_timing_test.go
@@ -6,6 +6,7 @@ import (
 	"testing"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 )
 
 func BenchmarkDedupAggr(b *testing.B) {
@@ -17,9 +18,11 @@ func BenchmarkDedupAggr(b *testing.B) {
 }
 
 func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
+	var lc promutils.LabelsCompressor
 	const loops = 2
 	benchSamples := newBenchSamples(samplesPerPush)
-	da := newDedupAggr()
+
+	da := newDedupAggr(&lc)
 
 	b.ResetTimer()
 	b.ReportAllocs()
@@ -27,13 +30,14 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			for i := 0; i < loops; i++ {
-				da.pushSamples(benchSamples)
+				da.pushSamples(benchSamples, 0)
 			}
 		}
 	})
 }
 
 func newBenchSamples(count int) []pushSample {
+	var lc promutils.LabelsCompressor
 	labels := []prompbmarshal.Label{
 		{
 			Name:  "app",
@@ -65,7 +69,7 @@ func newBenchSamples(count int) []pushSample {
 			Name:  "app",
 			Value: fmt.Sprintf("instance-%d", i),
 		})
-		keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
+		keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:], false, 0)
 		sample.key = string(keyBuf)
 		sample.value = float64(i)
 	}
diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go
index 273fd6eef..60f091f65 100644
--- a/lib/streamaggr/deduplicator.go
+++ b/lib/streamaggr/deduplicator.go
@@ -16,8 +16,10 @@ import (
 // Deduplicator deduplicates samples per each time series.
 type Deduplicator struct {
 	da *dedupAggr
+	lc *promutils.LabelsCompressor
 
-	dropLabels []string
+	dropLabels        []string
+	stalenessInterval int64
 
 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -40,13 +42,16 @@ type Deduplicator struct {
 // MustStop must be called on the returned deduplicator in order to free up occupied resources.
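+// Series state is dropped if no new samples arrive for 2*dedupInterval.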
 func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
 	d := &Deduplicator{
-		da:         newDedupAggr(),
-		dropLabels: dropLabels,
+		dropLabels:        dropLabels,
+		stalenessInterval: 2 * dedupInterval.Milliseconds(),
+		lc:                promutils.NewLabelsCompressor(),
 
 		stopCh: make(chan struct{}),
 		ms:     metrics.NewSet(),
 	}
 
+	d.da = newDedupAggr(d.lc)
+
 	ms := d.ms
 
 	metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
@@ -57,6 +62,12 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
 	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
 		return float64(d.da.itemsCount())
 	})
+	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_size_bytes{%s}`, metricLabels), func() float64 {
+		return float64(d.lc.SizeBytes())
+	})
+	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_items_count{%s}`, metricLabels), func() float64 {
+		return float64(d.lc.ItemsCount())
+	})
 
 	d.dedupFlushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
 	d.dedupFlushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
@@ -87,6 +98,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 	pss := ctx.pss
 	labels := &ctx.labels
 	buf := ctx.buf
+	deleteDeadline := time.Now().UnixMilli() + d.stalenessInterval
 
 	dropLabels := d.dropLabels
 	for _, ts := range tss {
@@ -101,7 +113,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		labels.Sort()
 
 		bufLen := len(buf)
-		buf = lc.Compress(buf, labels.Labels)
+		buf = d.lc.Compress(buf, labels.Labels, deleteDeadline)
 		key := bytesutil.ToUnsafeString(buf[bufLen:])
 		for _, s := range ts.Samples {
 			pss = append(pss, pushSample{
@@ -112,7 +124,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		}
 	}
 
-	d.da.pushSamples(pss)
+	d.da.pushSamples(pss, deleteDeadline)
 
 	ctx.pss = pss
 	ctx.buf = buf
@@ -145,7 +157,8 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 	startTime := time.Now()
 
 	timestamp := startTime.UnixMilli()
-	d.da.flush(func(pss []pushSample) {
+	deleteDeadline := timestamp + d.stalenessInterval
+	d.da.flush(func(pss []pushSample, deleteDeadline int64) {
 		ctx := getDeduplicatorFlushCtx()
 
 		tss := ctx.tss
 		labels := ctx.labels
 		samples := ctx.samples
 		for _, ps := range pss {
 			labelsLen := len(labels)
-			labels = decompressLabels(labels, ps.key)
+			labels = decompressLabels(labels, d.lc, ps.key)
 
 			samplesLen := len(samples)
 			samples = append(samples, prompbmarshal.Sample{
@@ -172,7 +185,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 		ctx.labels = labels
 		ctx.samples = samples
 		putDeduplicatorFlushCtx(ctx)
-	})
+	}, timestamp, deleteDeadline)
 
 	duration := time.Since(startTime)
 	d.dedupFlushDuration.Update(duration.Seconds())
diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go
index fdf7b3fad..db43bb6a5 100644
--- a/lib/streamaggr/histogram_bucket.go
+++ b/lib/streamaggr/histogram_bucket.go
@@ -1,42 +1,32 @@
 package streamaggr
 
 import (
-	"math"
 	"sync"
-	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/metrics"
 )
 
 // histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
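+// Stale entries are dropped by removeOldEntries() during flush once
+// ctx.flushTimestamp exceeds their deleteDeadline.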
 type histogramBucketAggrState struct {
 	m sync.Map
-
-	stalenessSecs uint64
 }
 
 type histogramBucketStateValue struct {
 	mu             sync.Mutex
 	h              metrics.Histogram
-	deleteDeadline uint64
+	deleteDeadline int64
 	deleted        bool
 }
 
-func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
-	stalenessSecs := roundDurationToSecs(stalenessInterval)
-	return &histogramBucketAggrState{
-		stalenessSecs: stalenessSecs,
-	}
+func newHistogramBucketAggrState() *histogramBucketAggrState {
+	return &histogramBucketAggrState{}
 }
 
-func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
-	currentTime := fasttime.UnixTimestamp()
-	deleteDeadline := currentTime + as.stalenessSecs
+func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
@@ -66,13 +56,13 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
+func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
 		sv := v.(*histogramBucketStateValue)
 
 		sv.mu.Lock()
-		deleted := currentTime > sv.deleteDeadline
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
 		if deleted {
 			// Mark the current entry as deleted
 			sv.deleted = deleted
@@ -80,6 +70,8 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
 		sv.mu.Unlock()
 
 		if deleted {
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
 			m.Delete(k)
 		}
 		return true
@@ -87,9 +79,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
 }
 
 func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
-	currentTime := fasttime.UnixTimestamp()
-
-	as.removeOldEntries(currentTime)
+	as.removeOldEntries(ctx)
 
 	m := &as.m
 	m.Range(func(k, v any) bool {
@@ -105,11 +95,3 @@ func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
 		return true
 	})
 }
-
-func roundDurationToSecs(d time.Duration) uint64 {
-	if d < 0 {
-		return 0
-	}
-	secs := d.Seconds()
-	return uint64(math.Ceil(secs))
-}
diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go
index 0ae4b9b8c..8710f67bb 100644
--- a/lib/streamaggr/last.go
+++ b/lib/streamaggr/last.go
@@ -12,46 +12,45 @@ type lastAggrState struct {
 }
 
 type lastStateValue struct {
-	mu        sync.Mutex
-	last      float64
-	timestamp int64
-	deleted   bool
+	mu             sync.Mutex
+	last           float64
+	timestamp      int64
+	deleted        bool
+	defined        bool
+	deleteDeadline int64
 }
 
 func newLastAggrState() *lastAggrState {
 	return &lastAggrState{}
 }
 
-func (as *lastAggrState) pushSamples(samples []pushSample) {
+func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &lastStateValue{
-				last:      s.value,
-				timestamp: s.timestamp,
-			}
+			v = &lastStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*lastStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			if s.timestamp >= sv.timestamp {
+			if !sv.defined || s.timestamp >= sv.timestamp {
 				sv.last = s.value
 				sv.timestamp = s.timestamp
+				sv.deleteDeadline = deleteDeadline
 			}
+			sv.defined = true
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -70,6 +69,14 @@ func (as *lastAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*lastStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
 		last := sv.last
 		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
 		sv.deleted = true
diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go
index 9197d3add..76bee0b7d 100644
--- a/lib/streamaggr/max.go
+++ b/lib/streamaggr/max.go
@@ -12,35 +12,33 @@ type maxAggrState struct {
 }
 
 type maxStateValue struct {
-	mu      sync.Mutex
-	max     float64
-	deleted bool
+	mu             sync.Mutex
+	max            float64
+	deleted        bool
+	defined        bool
+	deleteDeadline int64
 }
 
 func newMaxAggrState() *maxAggrState {
 	return &maxAggrState{}
 }
 
-func (as *maxAggrState) pushSamples(samples []pushSample) {
+func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &maxStateValue{
-				max: s.value,
-			}
+			v = &maxStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*maxStateValue)
 		sv.mu.Lock()
@@ -49,6 +47,10 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
-			if s.value > sv.max {
+			if !sv.defined || s.value > sv.max {
 				sv.max = s.value
 			}
+			if !sv.defined {
+				sv.defined = true
+			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -67,9 +69,21 @@ func (as *maxAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*maxStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
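+		// sv.defined distinguishes "no samples seen during this interval" from a
+		// real maximum of 0, so empty entries are skipped instead of flushed.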
+		if !sv.defined {
+			sv.mu.Unlock()
+			return true
+		}
 		max := sv.max
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.max = 0
+		sv.defined = false
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go
index 308f259c7..1b0730a0b 100644
--- a/lib/streamaggr/min.go
+++ b/lib/streamaggr/min.go
@@ -12,43 +12,45 @@ type minAggrState struct {
 }
 
 type minStateValue struct {
-	mu      sync.Mutex
-	min     float64
-	deleted bool
+	mu             sync.Mutex
+	min            float64
+	deleted        bool
+	defined        bool
+	deleteDeadline int64
 }
 
 func newMinAggrState() *minAggrState {
 	return &minAggrState{}
 }
 
-func (as *minAggrState) pushSamples(samples []pushSample) {
+func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &minStateValue{
-				min: s.value,
-			}
+			v = &minStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*minStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			if s.value < sv.min {
+			if !sv.defined || s.value < sv.min {
 				sv.min = s.value
 			}
+			sv.deleteDeadline = deleteDeadline
+			if !sv.defined {
+				sv.defined = true
+			}
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -67,10 +69,23 @@ func (as *minAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*minStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
+		if !sv.defined {
+			sv.mu.Unlock()
+			return true
+		}
 		min := sv.min
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.min = 0
+		sv.defined = false
 		sv.mu.Unlock()
+
 		key := k.(string)
 		ctx.appendSeries(key, "min", min)
 		return true
diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go
index 00fefe877..96a7fb8c4 100644
--- a/lib/streamaggr/quantiles.go
+++ b/lib/streamaggr/quantiles.go
@@ -16,9 +16,10 @@ type quantilesAggrState struct {
 }
 
 type quantilesStateValue struct {
-	mu      sync.Mutex
-	h       *histogram.Fast
-	deleted bool
+	mu             sync.Mutex
+	h              *histogram.Fast
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newQuantilesAggrState(phis []float64) *quantilesAggrState {
@@ -27,10 +28,10 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
 	}
 }
 
-func (as *quantilesAggrState) pushSamples(samples []pushSample) {
+func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
@@ -53,6 +54,7 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
 		deleted := sv.deleted
 		if !deleted {
 			sv.h.Update(s.value)
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -74,10 +76,16 @@ func (as *quantilesAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*quantilesStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
 		quantiles = sv.h.Quantiles(quantiles[:0], phis)
-		histogram.PutFast(sv.h)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.h.Reset()
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go
index 1c3f2fb5c..39d541cd1 100644
--- a/lib/streamaggr/rate.go
+++ b/lib/streamaggr/rate.go
@@ -2,10 +2,8 @@ package streamaggr
 
 import (
 	"sync"
-	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 )
 
 // rateAggrState calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics.
 type rateAggrState struct {
 	m sync.Map
 
 	// isAvg is set to true if rate_avg() must be calculated instead of rate_sum().
 	isAvg bool
-
-	// Time series state is dropped if no new samples are received during stalenessSecs.
-	stalenessSecs uint64
 }
 
 type rateStateValue struct {
 	mu             sync.Mutex
 	lastValues     map[string]rateLastValueState
-	deleteDeadline uint64
+	deleteDeadline int64
 	deleted        bool
 }
 
 type rateLastValueState struct {
 	value          float64
 	timestamp      int64
-	deleteDeadline uint64
+	deleteDeadline int64
 
 	// increase stores cumulative increase for the current time series on the current aggregation interval
 	increase float64
 
 	prevTimestamp int64
 }
 
-func newRateAggrState(stalenessInterval time.Duration, isAvg bool) *rateAggrState {
-	stalenessSecs := roundDurationToSecs(stalenessInterval)
+func newRateAggrState(isAvg bool) *rateAggrState {
 	return &rateAggrState{
-		isAvg:         isAvg,
-		stalenessSecs: stalenessSecs,
+		isAvg: isAvg,
 	}
 }
 
-func (as *rateAggrState) pushSamples(samples []pushSample) {
-	currentTime := fasttime.UnixTimestamp()
-	deleteDeadline := currentTime + as.stalenessSecs
+func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, _ bool) {
 	for i := range samples {
 		s := &samples[i]
 		inputKey, outputKey := getInputOutputKey(s.key)
@@ -106,11 +97,9 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
 }
 
 func (as *rateAggrState) flushState(ctx *flushCtx) {
-	currentTime := fasttime.UnixTimestamp()
-
 	suffix := as.getSuffix()
 
-	as.removeOldEntries(currentTime)
+	as.removeOldEntries(ctx)
 
 	m := &as.m
 	m.Range(func(k, v any) bool {
@@ -156,16 +145,18 @@ func (as *rateAggrState) getSuffix() string {
 	return "rate_sum"
 }
 
-func (as *rateAggrState) removeOldEntries(currentTime uint64) {
+func (as *rateAggrState) removeOldEntries(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
 		sv := v.(*rateStateValue)
 
 		sv.mu.Lock()
-		if currentTime > sv.deleteDeadline {
+		if ctx.flushTimestamp > sv.deleteDeadline {
 			// Mark the current entry as deleted
 			sv.deleted = true
 			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
 			m.Delete(k)
 			return true
 		}
 
 		// Delete outdated entries in sv.lastValues
 		lvs := sv.lastValues
 		for k1, lv := range lvs {
-			if currentTime > lv.deleteDeadline {
+			if ctx.flushTimestamp > lv.deleteDeadline {
 				delete(lvs, k1)
 			}
 		}
diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go
index 053e0f209..38fd95693 100644
--- a/lib/streamaggr/stddev.go
+++ b/lib/streamaggr/stddev.go
@@ -13,21 +13,22 @@ type stddevAggrState struct {
 }
 
 type stddevStateValue struct {
-	mu      sync.Mutex
-	count   float64
-	avg     float64
-	q       float64
-	deleted bool
+	mu             sync.Mutex
+	count          float64
+	avg            float64
+	q              float64
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newStddevAggrState() *stddevAggrState {
 	return &stddevAggrState{}
 }
 
-func (as *stddevAggrState) pushSamples(samples []pushSample) {
+func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
@@ -50,6 +51,7 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
 			avg := sv.avg + (s.value-sv.avg)/sv.count
 			sv.q += (s.value - sv.avg) * (s.value - avg)
 			sv.avg = avg
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -68,9 +70,23 @@ func (as *stddevAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*stddevStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
+		if sv.count == 0 {
+			sv.mu.Unlock()
+			return true
+		}
+
 		stddev := math.Sqrt(sv.q / sv.count)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.q = 0
+		sv.count = 0
+		sv.avg = 0
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go
index 8170f227a..8b4f57c84 100644
--- a/lib/streamaggr/stdvar.go
+++ b/lib/streamaggr/stdvar.go
@@ -12,21 +12,22 @@ type stdvarAggrState struct {
 }
 
 type stdvarStateValue struct {
-	mu      sync.Mutex
-	count   float64
-	avg     float64
-	q       float64
-	deleted bool
+	mu             sync.Mutex
+	count          float64
+	avg            float64
+	q              float64
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newStdvarAggrState() *stdvarAggrState {
 	return &stdvarAggrState{}
 }
 
-func (as *stdvarAggrState) pushSamples(samples []pushSample) {
+func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
@@ -49,6 +50,7 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
 			avg := sv.avg + (s.value-sv.avg)/sv.count
 			sv.q += (s.value - sv.avg) * (s.value - avg)
 			sv.avg = avg
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -67,9 +69,22 @@ func (as *stdvarAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*stdvarStateValue)
 
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
+		if sv.count == 0 {
+			sv.mu.Unlock()
+			return true
+		}
 		stdvar := sv.q / sv.count
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.q = 0
+		sv.count = 0
+		sv.avg = 0
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 6188f8b80..4b27f4c7a 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -47,19 +47,6 @@ var supportedOutputs = []string{
 	"unique_samples",
 }
 
-var (
-	// lc contains information about all compressed labels for streaming aggregation
-	lc promutils.LabelsCompressor
-
-	_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
-		return float64(lc.SizeBytes())
-	})
-
-	_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
-		return float64(lc.ItemsCount())
-	})
-)
-
 // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
 // opts can contain additional options. If opts is nil, then default options are used.
@@ -373,13 +360,15 @@ type aggregator struct {
 	keepMetricNames  bool
 	ignoreOldSamples bool
 
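+	// includeInputKey is set when at least one configured output (or the
+	// optional dedup stage) needs the input series key in addition to the
+	// output key inside compressed keys.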
+	includeInputKey bool
 
 	by                  []string
 	without             []string
 	aggregateOnlyByTime bool
 
 	// interval is the interval between flushes
-	interval time.Duration
+	interval          time.Duration
+	stalenessInterval int64
 
 	// dedupInterval is optional deduplication interval for incoming samples
 	dedupInterval time.Duration
@@ -390,6 +379,9 @@ type aggregator struct {
 	// aggrOutputs contains aggregate states for the given outputs
 	aggrOutputs []aggrOutput
 
+	// lc is used for compressing series keys before passing them to dedupAggr and aggrState
+	lc *promutils.LabelsCompressor
+
 	// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
 	minTimestamp atomic.Int64
 
@@ -424,7 +416,7 @@ type aggrState interface {
 	// pushSamples must push samples to the aggrState.
 	//
 	// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
-	pushSamples(samples []pushSample)
+	pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool)
 
 	// flushState must flush aggrState data to ctx.
 	flushState(ctx *flushCtx)
@@ -556,11 +548,15 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	}
 	aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
 	outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
+	includeInputKey := false
 	for i, output := range cfg.Outputs {
-		as, err := newAggrState(output, outputsSeen, stalenessInterval)
+		as, ik, err := newAggrState(output, outputsSeen)
 		if err != nil {
 			return nil, err
 		}
+		if ik {
+			includeInputKey = true
+		}
 
 		aggrOutputs[i] = aggrOutput{
 			as: as,
@@ -592,9 +588,11 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 		by:                  by,
 		without:             without,
 		aggregateOnlyByTime: aggregateOnlyByTime,
+		lc:                  promutils.NewLabelsCompressor(),
 
-		interval:      interval,
-		dedupInterval: dedupInterval,
+		interval:          interval,
+		dedupInterval:     dedupInterval,
+		stalenessInterval: stalenessInterval.Milliseconds(),
 
 		aggrOutputs: aggrOutputs,
 
@@ -614,18 +612,25 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	}
 
 	if dedupInterval > 0 {
-		a.da = newDedupAggr()
+		includeInputKey = true
+		a.da = newDedupAggr(a.lc)
 
 		_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
-			n := a.da.sizeBytes()
-			return float64(n)
+			return float64(a.da.sizeBytes())
 		})
 		_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
-			n := a.da.itemsCount()
-			return float64(n)
+			return float64(a.da.itemsCount())
 		})
+		_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_size_bytes{%s}`, metricLabels), func() float64 {
+			return float64(a.lc.SizeBytes())
+		})
+		_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_items_count{%s}`, metricLabels), func() float64 {
+			return float64(a.lc.ItemsCount())
+		})
 	}
 
+	a.includeInputKey = includeInputKey
+
 	alignFlushToInterval := !opts.NoAlignFlushToInterval
 	if v := cfg.NoAlignFlushToInterval; v != nil {
 		alignFlushToInterval = !*v
@@ -645,20 +650,20 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	return a, nil
 }
 
-func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) {
+func newAggrState(output string, outputsSeen map[string]struct{}) (aggrState, bool, error) {
 	// check for duplicated output
 	if _, ok := outputsSeen[output]; ok {
-		return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
+		return nil, false, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
 	}
 	outputsSeen[output] = struct{}{}
 
 	if strings.HasPrefix(output, "quantiles(") {
 		if !strings.HasSuffix(output, ")") {
-			return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
+			return nil, false, fmt.Errorf("missing closing brace for `quantiles()` output")
 		}
 		argsStr := output[len("quantiles(") : len(output)-1]
 		if len(argsStr) == 0 {
-			return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
+			return nil, false, fmt.Errorf("`quantiles()` must contain at least one phi")
 		}
 		args := strings.Split(argsStr, ",")
 		phis := make([]float64, len(args))
@@ -666,57 +671,57 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
 			arg = strings.TrimSpace(arg)
 			phi, err := strconv.ParseFloat(arg, 64)
 			if err != nil {
-				return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
+				return nil, false, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
 			}
 			if phi < 0 || phi > 1 {
-				return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
+				return nil, false, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
 			}
 			phis[i] = phi
 		}
 		if _, ok := outputsSeen["quantiles"]; ok {
-			return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
+			return nil, false, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
 		}
 		outputsSeen["quantiles"] = struct{}{}
-		return newQuantilesAggrState(phis), nil
+		return newQuantilesAggrState(phis), false, nil
 	}
 	switch output {
 	case "avg":
-		return newAvgAggrState(), nil
+		return newAvgAggrState(), false, nil
 	case "count_samples":
-		return newCountSamplesAggrState(), nil
+		return newCountSamplesAggrState(), false, nil
 	case "count_series":
-		return newCountSeriesAggrState(), nil
+		return newCountSeriesAggrState(), true, nil
 	case "histogram_bucket":
-		return newHistogramBucketAggrState(stalenessInterval), nil
+		return newHistogramBucketAggrState(), false, nil
 	case "increase":
-		return newTotalAggrState(stalenessInterval, true, true), nil
+		return newTotalAggrState(true, true), true, nil
 	case "increase_prometheus":
-		return newTotalAggrState(stalenessInterval, true, false), nil
+		return newTotalAggrState(true, false), true, nil
 	case "last":
-		return newLastAggrState(), nil
+		return newLastAggrState(), false, nil
 	case "max":
-		return newMaxAggrState(), nil
+		return newMaxAggrState(), false, nil
 	case "min":
-		return newMinAggrState(), nil
+		return newMinAggrState(), false, nil
 	case "rate_avg":
-		return newRateAggrState(stalenessInterval, true), nil
+		return newRateAggrState(true), true, nil
 	case "rate_sum":
-		return newRateAggrState(stalenessInterval, false), nil
+		return newRateAggrState(false), true, nil
 	case "stddev":
-		return newStddevAggrState(), nil
+		return newStddevAggrState(), false, nil
 	case "stdvar":
-		return newStdvarAggrState(), nil
+		return newStdvarAggrState(), false, nil
 	case "sum_samples":
-		return newSumSamplesAggrState(), nil
+		return newSumSamplesAggrState(), false, nil
 	case "total":
-		return newTotalAggrState(stalenessInterval, false, true), nil
+		return newTotalAggrState(false, true), true, nil
 	case "total_prometheus":
-		return newTotalAggrState(stalenessInterval, false, false), nil
+		return newTotalAggrState(false, false), true, nil
 	case "unique_samples":
-		return newUniqueSamplesAggrState(), nil
+		return newUniqueSamplesAggrState(), false, nil
 	default:
-		return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
+		return nil, false, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
 	}
 }
@@ -823,8 +828,10 @@ func (a *aggregator) dedupFlush() {
 	}
 
 	startTime := time.Now()
+	timestamp := startTime.UnixMilli()
+	deleteDeadline := timestamp + a.stalenessInterval
 
-	a.da.flush(a.pushSamples)
+	a.da.flush(a.pushSamples, timestamp, deleteDeadline)
 
 	d := time.Since(startTime)
 	a.dedupFlushDuration.Update(d.Seconds())
@@ -902,6 +909,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 
 	minTimestamp := a.minTimestamp.Load()
 	nowMsec := time.Now().UnixMilli()
+	deleteDeadline := nowMsec + a.stalenessInterval
 	var maxLagMsec int64
 	for idx, ts := range tss {
 		if !a.match.Match(ts.Labels) {
@@ -930,7 +938,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 		}
 
 		bufLen := len(buf)
-		buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
+		buf = compressLabels(buf, a.lc, inputLabels.Labels, outputLabels.Labels, a.includeInputKey, deleteDeadline)
 		// key remains valid only by the end of this function and can't be reused after
 		// do not intern key because number of unique keys could be too high
 		key := bytesutil.ToUnsafeString(buf[bufLen:])
@@ -964,52 +972,60 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 	ctx.buf = buf
 
 	if a.da != nil {
-		a.da.pushSamples(samples)
+		a.da.pushSamples(samples, deleteDeadline)
 	} else {
-		a.pushSamples(samples)
+		a.pushSamples(samples, deleteDeadline)
 	}
 }
 
-func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
+func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, outputLabels []prompbmarshal.Label, includeInputKey bool, deleteDeadline int64) []byte {
 	bb := bbPool.Get()
-	bb.B = lc.Compress(bb.B, inputLabels)
-	dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
-	dst = append(dst, bb.B...)
-	bbPool.Put(bb)
-	dst = lc.Compress(dst, outputLabels)
+	bb.B = lc.Compress(bb.B, outputLabels, deleteDeadline)
+	if includeInputKey {
+		dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
+		dst = append(dst, bb.B...)
+		bbPool.Put(bb)
+		dst = lc.Compress(dst, inputLabels, deleteDeadline)
+	} else {
+		dst = append(dst, bb.B...)
+		bbPool.Put(bb)
+	}
 	return dst
 }
 
-func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label {
+func decompressLabels(dst []prompbmarshal.Label, lc *promutils.LabelsCompressor, key string) []prompbmarshal.Label {
 	return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
 }
 
-func getOutputKey(key string) string {
+func getOutputKey(key string, includeInputKey bool) string {
 	src := bytesutil.ToUnsafeBytes(key)
-	inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
-	if nSize <= 0 {
-		logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
+	outputKey := src
+	if includeInputKey {
+		outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
+		if nSize <= 0 {
+			logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
+		}
+		src = src[nSize:]
+		outputKey = src[:outputKeyLen]
 	}
-	src = src[nSize:]
-	outputKey := src[inputKeyLen:]
 	return bytesutil.ToUnsafeString(outputKey)
 }
 
 func getInputOutputKey(key string) (string, string) {
 	src := bytesutil.ToUnsafeBytes(key)
-	inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
+	outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
 	if nSize <= 0 {
-		logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
+		logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
 	}
 	src = src[nSize:]
-	inputKey := src[:inputKeyLen]
-	outputKey := src[inputKeyLen:]
+	outputKey := src[:outputKeyLen]
+	inputKey := src[outputKeyLen:]
 	return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
 }
 
-func (a *aggregator) pushSamples(samples []pushSample) {
+func (a *aggregator) pushSamples(samples []pushSample, deleteDeadline int64) {
 	for _, ao := range a.aggrOutputs {
-		ao.as.pushSamples(samples)
+		ao.as.pushSamples(samples, deleteDeadline, a.includeInputKey)
 	}
 }
@@ -1169,7 +1185,7 @@ func (ctx *flushCtx) flushSeries() {
 func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
 	labelsLen := len(ctx.labels)
 	samplesLen := len(ctx.samples)
-	ctx.labels = decompressLabels(ctx.labels, key)
+	ctx.labels = decompressLabels(ctx.labels, ctx.a.lc, key)
 	if !ctx.a.keepMetricNames {
 		ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
 	}
@@ -1191,7 +1207,7 @@ func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float6
 	labelsLen := len(ctx.labels)
 	samplesLen := len(ctx.samples)
-	ctx.labels = decompressLabels(ctx.labels, key)
+	ctx.labels = decompressLabels(ctx.labels, ctx.a.lc, key)
 	if !ctx.a.keepMetricNames {
 		ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
 	}
diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go
index 947239bb3..d7928a020 100644
--- a/lib/streamaggr/sum_samples.go
+++ b/lib/streamaggr/sum_samples.go
@@ -12,41 +12,46 @@ type sumSamplesAggrState struct {
 }
 
 type sumSamplesStateValue struct {
-	mu      sync.Mutex
-	sum     float64
-	deleted bool
+	mu             sync.Mutex
+	sum            float64
+	deleted        bool
+	defined        bool
+	deleteDeadline int64
 }
 
 func newSumSamplesAggrState() *sumSamplesAggrState {
 	return &sumSamplesAggrState{}
 }
 
-func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
+func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &sumSamplesStateValue{
-				sum: s.value,
-			}
+			v = &sumSamplesStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*sumSamplesStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
 			sv.sum += s.value
+			sv.deleteDeadline = deleteDeadline
+			sv.defined = true
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -65,9 +65,22 @@ func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*sumSamplesStateValue)
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
+		if !sv.defined {
+			sv.mu.Unlock()
+			return true
+		}
+
 		sum := sv.sum
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.defined = false
+		sv.sum = 0
 		sv.mu.Unlock()
 
 		key := k.(string)
diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go
index e905e4531..8ee9e0eab 100644
--- a/lib/streamaggr/total.go
+++ b/lib/streamaggr/total.go
@@ -3,10 +3,9 @@ package streamaggr
 import (
 	"math"
 	"sync"
-	"time"
+	"sync/atomic"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 )
 
 // totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus.
@@ -19,48 +18,37 @@ type totalAggrState struct {
 	// Whether to take into account the first sample in new time series when calculating the output value.
 	keepFirstSample bool
 
-	// Time series state is dropped if no new samples are received during stalenessSecs.
-	//
-	// Aslo, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set.
-	// see ignoreFirstSampleDeadline for more details.
-	stalenessSecs uint64
-
-	// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
+	// The first sample per each new series is ignored during the first two aggregation intervals even if keepFirstSample is set.
 	// This allows avoiding an initial spike of the output values at startup when new time series
-	// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
+	// cannot be distinguished from already existing series. This is tracked with ignoreFirstSamples.
+	ignoreFirstSamples atomic.Int32
 }
 
 type totalStateValue struct {
 	mu             sync.Mutex
 	lastValues     map[string]totalLastValueState
 	total          float64
-	deleteDeadline uint64
+	deleteDeadline int64
 	deleted        bool
 }
 
 type totalLastValueState struct {
 	value          float64
 	timestamp      int64
-	deleteDeadline uint64
+	deleteDeadline int64
 }
 
-func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
-	stalenessSecs := roundDurationToSecs(stalenessInterval)
-	ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
-
-	return &totalAggrState{
-		resetTotalOnFlush:         resetTotalOnFlush,
-		keepFirstSample:           keepFirstSample,
-		stalenessSecs:             stalenessSecs,
-		ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
+func newTotalAggrState(resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
+	as := &totalAggrState{
+		resetTotalOnFlush: resetTotalOnFlush,
+		keepFirstSample:   keepFirstSample,
 	}
+	as.ignoreFirstSamples.Store(2)
+	return as
 }
 
-func (as *totalAggrState) pushSamples(samples []pushSample) {
-	currentTime := fasttime.UnixTimestamp()
-	deleteDeadline := currentTime + as.stalenessSecs
-	keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
+func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, _ bool) {
+	keepFirstSample := as.keepFirstSample && as.ignoreFirstSamples.Load() <= 0
 	for i := range samples {
 		s := &samples[i]
 		inputKey, outputKey := getInputOutputKey(s.key)
@@ -116,11 +104,9 @@ }
 }
 
 func (as *totalAggrState) flushState(ctx *flushCtx) {
-	currentTime := fasttime.UnixTimestamp()
-
 	suffix := as.getSuffix()
 
-	as.removeOldEntries(currentTime)
+	as.removeOldEntries(ctx)
 
 	m := &as.m
 	m.Range(func(k, v any) bool {
@@ -143,6 +129,10 @@ func (as *totalAggrState) flushState(ctx *flushCtx) {
 		}
 		return true
 	})
+	ignoreFirstSamples := as.ignoreFirstSamples.Load()
+	if ignoreFirstSamples > 0 {
+		as.ignoreFirstSamples.Add(-1)
+	}
 }
 
 func (as *totalAggrState) getSuffix() string {
@@ -159,16 +149,18 @@ func (as *totalAggrState) getSuffix() string {
 	return "total_prometheus"
 }
 
-func (as *totalAggrState) removeOldEntries(currentTime uint64) {
+func (as *totalAggrState) removeOldEntries(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
 		sv := v.(*totalStateValue)
 
 		sv.mu.Lock()
-		if currentTime > sv.deleteDeadline {
+		if ctx.flushTimestamp > sv.deleteDeadline {
 			// Mark the current entry as deleted
 			sv.deleted = true
 			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
 			m.Delete(k)
 			return true
 		}
 
 		// Delete outdated entries in sv.lastValues
 		lvs := sv.lastValues
 		for k1, lv := range lvs {
-			if currentTime > lv.deleteDeadline {
+			if ctx.flushTimestamp > lv.deleteDeadline {
 				delete(lvs, k1)
 			}
 		}
diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go
index 93145397c..822ff5652 100644
--- a/lib/streamaggr/unique_samples.go
+++ b/lib/streamaggr/unique_samples.go
@@ -12,37 +12,34 @@ type uniqueSamplesAggrState struct {
 }
 
 type uniqueSamplesStateValue struct {
-	mu      sync.Mutex
-	m       map[float64]struct{}
-	deleted bool
+	mu             sync.Mutex
+	m              map[float64]struct{}
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
 	return &uniqueSamplesAggrState{}
 }
 
-func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
+func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
 	for i := range samples {
 		s := &samples[i]
-		outputKey := getOutputKey(s.key)
+		outputKey := getOutputKey(s.key, includeInputKey)
 
 	again:
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
 			v = &uniqueSamplesStateValue{
-				m: map[float64]struct{}{
-					s.value: {},
-				},
+				m: make(map[float64]struct{}),
 			}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*uniqueSamplesStateValue)
 		sv.mu.Lock()
@@ -51,6 +48,7 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
 			if _, ok := sv.m[s.value]; !ok {
 				sv.m[s.value] = struct{}{}
 			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -69,13 +67,21 @@ func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
 		sv := v.(*uniqueSamplesStateValue)
 		sv.mu.Lock()
+		if ctx.flushTimestamp > sv.deleteDeadline {
+			sv.deleted = true
+			sv.mu.Unlock()
+			key := k.(string)
+			ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
+			m.Delete(k)
+			return true
+		}
 		n := len(sv.m)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+		sv.m = make(map[float64]struct{})
 		sv.mu.Unlock()
-
-		key := k.(string)
-		ctx.appendSeries(key, "unique_samples", float64(n))
+		if n > 0 {
+			key := k.(string)
+			ctx.appendSeries(key, "unique_samples", float64(n))
+		}
 		return true
 	})
 }
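
Reviewer note: with this change the aggregation key layout becomes varint(len(outputKey)) + outputKey + inputKey when includeInputKey is set, and the bare outputKey otherwise (previously the input key came first). Below is a minimal, self-contained sketch of that round-trip; it uses plain byte slices in place of the real LabelsCompressor output, and packKey/unpackKey are hypothetical names, not part of the patch:

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// packKey mirrors the layout produced by compressLabels:
	// varint(len(outputKey)) | outputKey | inputKey when includeInputKey
	// is true, and the bare outputKey otherwise.
	func packKey(outputKey, inputKey []byte, includeInputKey bool) []byte {
		if !includeInputKey {
			return append([]byte{}, outputKey...)
		}
		dst := binary.AppendUvarint(nil, uint64(len(outputKey)))
		dst = append(dst, outputKey...)
		return append(dst, inputKey...)
	}

	// unpackKey mirrors getInputOutputKey: the output key comes first,
	// prefixed with its length; whatever remains is the input key.
	func unpackKey(key []byte) (inputKey, outputKey []byte) {
		outputKeyLen, n := binary.Uvarint(key)
		if n <= 0 {
			panic("BUG: cannot unmarshal outputKeyLen from uvarint")
		}
		key = key[n:]
		return key[outputKeyLen:], key[:outputKeyLen]
	}

	func main() {
		key := packKey([]byte("out"), []byte("in"), true)
		in, out := unpackKey(key)
		fmt.Printf("input=%q output=%q\n", in, out) // input="in" output="out"
	}

Putting the output key first lets getOutputKey return a prefix slice without scanning past the input key, which is the common path for outputs that do not need per-input state.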
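Reviewer note: every aggregation state now shares the same staleness lifecycle: pushSamples stamps each entry with deleteDeadline = now + stalenessInterval (unix milliseconds), and flushState drops entries whose deadline has passed before emitting anything, also releasing their compressed labels via lc.Delete. A simplified single-goroutine sketch of that pattern follows, with hypothetical names and a plain map standing in for sync.Map:

	package main

	import "fmt"

	type entry struct {
		sum            float64
		deleteDeadline int64 // unix milliseconds
	}

	type state struct{ m map[string]*entry }

	// push mirrors pushSamples: update the value and extend the deadline.
	func (s *state) push(key string, v float64, deleteDeadline int64) {
		e, ok := s.m[key]
		if !ok {
			e = &entry{}
			s.m[key] = e
		}
		e.sum += v
		e.deleteDeadline = deleteDeadline
	}

	// flush mirrors flushState: drop stale entries, emit the rest.
	func (s *state) flush(flushTimestamp int64, emit func(string, float64)) {
		for k, e := range s.m {
			if flushTimestamp > e.deleteDeadline {
				delete(s.m, k) // the real code also calls lc.Delete here
				continue
			}
			emit(k, e.sum)
			e.sum = 0
		}
	}

	func main() {
		s := &state{m: map[string]*entry{}}
		s.push("a", 1, 2000)
		s.flush(1000, func(k string, v float64) { fmt.Println(k, v) }) // prints: a 1
		s.flush(3000, func(k string, v float64) { fmt.Println(k, v) }) // "a" dropped as stale
	}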
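Reviewer note: totalAggrState swaps the wall-clock ignoreFirstSampleDeadline for an atomic counter that starts at 2 and is decremented once per flush, so with keepFirstSample set, first samples of previously unseen series start being counted only from the third flush interval. A sketch of the countdown, assuming a single flusher goroutine:

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	func main() {
		var ignoreFirstSamples atomic.Int32
		ignoreFirstSamples.Store(2)

		for flush := 1; flush <= 4; flush++ {
			// Mirrors pushSamples: honor first samples only once the counter drains.
			keepFirstSample := ignoreFirstSamples.Load() <= 0
			fmt.Printf("flush %d: keepFirstSample=%v\n", flush, keepFirstSample)

			// Mirrors flushState: decrement once per flush until zero.
			if ignoreFirstSamples.Load() > 0 {
				ignoreFirstSamples.Add(-1)
			}
		}
	}

This prints false for the first two flushes and true afterwards, matching the "first two aggregation intervals" comment in the struct.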