added compressor cleanup

Andrii Chubatiuk 2024-11-18 09:29:53 +02:00
parent 7e72848ab3
commit 60941a311f
No known key found for this signature in database
GPG key ID: 96D776CC99880667
22 changed files with 652 additions and 370 deletions


@ -11,16 +11,77 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
type queue struct {
in chan uint64
out chan uint64
storage []uint64
}
func newQueue() *queue {
q := &queue{
out: make(chan uint64),
in: make(chan uint64),
}
go func() {
defer close(q.out)
for {
if len(q.storage) == 0 {
item, ok := <-q.in
if !ok {
return
}
q.storage = append(q.storage, item)
continue
}
select {
case item, ok := <-q.in:
if ok {
q.storage = append(q.storage, item)
} else {
// unwind storage
for _, item := range q.storage {
q.out <- item
}
return
}
case q.out <- q.storage[0]:
if len(q.storage) == 1 {
q.storage = nil
} else {
q.storage = q.storage[1:]
}
}
}
}()
return q
}
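A minimal sketch of how this queue behaves (illustrative only, not part of the commit; assumes "fmt" is imported and the function name is hypothetical): items sent to q.in are buffered in q.storage and handed out on q.out in FIFO order, and closing q.in drains the remaining items before q.out is closed.
// exampleQueueFIFO is a hypothetical demonstration of the queue above.
func exampleQueueFIFO() {
    q := newQueue()
    go func() {
        for i := uint64(1); i <= 3; i++ {
            q.in <- i // accepted by the internal goroutine and buffered in q.storage
        }
        close(q.in) // remaining items are drained to q.out, then q.out is closed
    }()
    for idx := range q.out {
        fmt.Println(idx) // prints 1, 2, 3 in FIFO order
    }
}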
type compressedLabel struct {
mu sync.Mutex
code uint64
deleted bool
deleteDeadline int64
}
// LabelsCompressor compresses []prompbmarshal.Label into short binary strings
type LabelsCompressor struct {
labelToIdx sync.Map
idxToLabel labelsMap
freeIdxs *queue
nextIdx atomic.Uint64
totalSizeBytes atomic.Uint64
}
func NewLabelsCompressor() *LabelsCompressor {
return &LabelsCompressor{
freeIdxs: newQueue(),
}
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
@ -28,13 +89,60 @@ func (lc *LabelsCompressor) SizeBytes() uint64 {
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
return lc.nextIdx.Load() - uint64(len(lc.freeIdxs.storage))
}
// Delete adds stale labels idx to lc.freeIdxs list
func (lc *LabelsCompressor) Delete(src []byte, ts int64) {
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
}
tail := src[nSize:]
if labelsLen == 0 {
// fast path - nothing to decode
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(tail), tail)
}
return
}
a := encoding.GetUint64s(int(labelsLen))
var err error
tail, err = encoding.UnmarshalVarUint64s(a.A, tail)
if err != nil {
logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
}
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
for _, idx := range a.A {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
logger.Panicf("BUG: missing label for idx=%d", idx)
}
v, ok := lc.labelToIdx.Load(label)
if !ok {
continue
}
cl := v.(compressedLabel)
cl.mu.Lock()
if cl.deleteDeadline < ts {
cl.deleted = true
cl.mu.Unlock()
lc.freeIdxs.in <- idx
} else {
cl.mu.Unlock()
}
}
encoding.PutUint64s(a)
}
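For reference, the compressed key parsed by Delete is a uvarint label count followed by one uvarint index per label, which is exactly the layout produced by Compress below. A small decoding sketch under that assumption (the helper name is hypothetical):
// decodeIdxs returns the label indexes stored in a compressed key.
// Sketch only; it mirrors the layout walked by Delete above.
func decodeIdxs(src []byte) []uint64 {
    labelsLen, nSize := encoding.UnmarshalVarUint64(src)
    if nSize <= 0 || labelsLen == 0 {
        return nil
    }
    idxs := make([]uint64, labelsLen)
    if _, err := encoding.UnmarshalVarUint64s(idxs, src[nSize:]); err != nil {
        return nil
    }
    return idxs
}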
// Compress compresses labels, appends the compressed labels to dst and returns the result.
//
// It is safe calling Compress from concurrent goroutines.
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label, deleteDeadline int64) []byte {
if len(labels) == 0 {
// Fast path
return append(dst, 0)
@ -42,22 +150,27 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label) [
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels, deleteDeadline)
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label, deleteDeadline int64) {
if len(labels) == 0 {
return
}
_ = dst[len(labels)-1]
for i, label := range labels {
again:
v, ok := lc.labelToIdx.Load(label)
if !ok {
var idx uint64
select {
case idx = <-lc.freeIdxs.out:
default:
idx = lc.nextIdx.Add(1)
}
labelCopy := cloneLabel(label)
// Must store idxToLabel entry before labelToIdx,
@ -66,6 +179,10 @@ func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label)
// We might store duplicated entries for single label with different indexes,
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
lc.idxToLabel.Store(idx, labelCopy)
v = &compressedLabel{
deleteDeadline: deleteDeadline,
code: idx,
}
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
if loaded {
// This label has been stored by a concurrent goroutine with different index,
@ -78,7 +195,17 @@ func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label)
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
lc.totalSizeBytes.Add(entrySizeBytes)
}
cl := v.(*compressedLabel)
dst[i] = cl.code
cl.mu.Lock()
deleted := cl.deleted
if !deleted {
cl.deleteDeadline = deleteDeadline
}
cl.mu.Unlock()
if deleted {
goto again
}
}
}
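Putting the pieces together, the intended lifecycle of a compressed key looks roughly like this (a sketch, not part of the commit; assumes the time and prompbmarshal packages are imported, millisecond timestamps, and a hypothetical function name):
// exampleCompressorLifecycle is a hypothetical end-to-end sketch.
func exampleCompressorLifecycle() {
    lc := NewLabelsCompressor()
    labels := []prompbmarshal.Label{{Name: "job", Value: "app"}}
    // Compress registers the labels and stamps them with a delete deadline.
    key := lc.Compress(nil, labels, time.Now().UnixMilli()+60_000)
    // Decompress restores the original labels from the compressed key.
    _ = lc.Decompress(nil, key)
    // Once the deadline has passed, Delete releases the label indexes into freeIdxs,
    // so a later Compress call can reuse them instead of growing nextIdx.
    lc.Delete(key, time.Now().UnixMilli()+120_000)
}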


@ -9,14 +9,14 @@ import (
)
func TestLabelsCompressorSerial(t *testing.T) {
lc := NewLabelsCompressor()
f := func(labels []prompbmarshal.Label) {
t.Helper()
sExpected := labelsToString(labels)
data := lc.Compress(nil, labels, 0)
labelsResult := lc.Decompress(nil, data)
sResult := labelsToString(labelsResult)
@ -67,7 +67,7 @@ func TestLabelsCompressorSerial(t *testing.T) {
func TestLabelsCompressorConcurrent(t *testing.T) {
const concurrency = 5
lc := NewLabelsCompressor()
var expectCompressedKeys sync.Map
var wg sync.WaitGroup
@ -78,7 +78,7 @@ func TestLabelsCompressorConcurrent(t *testing.T) {
series := newTestSeries(100, 20)
for n, labels := range series {
sExpected := labelsToString(labels)
data := lc.Compress(nil, labels, 0)
if expectData, ok := expectCompressedKeys.LoadOrStore(n, data); ok {
if string(data) != string(expectData.([]byte)) {
panic(fmt.Errorf("unexpected compress result at series/%d in iteration %d ", n, i))


@ -8,7 +8,7 @@ import (
)
func BenchmarkLabelsCompressorCompress(b *testing.B) {
lc := NewLabelsCompressor()
series := newTestSeries(100, 10)
b.ReportAllocs()
@ -19,7 +19,7 @@ func BenchmarkLabelsCompressorCompress(b *testing.B) {
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels, 0)
}
Sink.Add(uint64(len(dst)))
}
@ -27,13 +27,13 @@ func BenchmarkLabelsCompressorCompress(b *testing.B) {
}
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
lc := NewLabelsCompressor()
series := newTestSeries(100, 10)
datas := make([][]byte, len(series))
var dst []byte
for i, labels := range series {
dstLen := len(dst)
dst = lc.Compress(dst, labels, 0)
datas[i] = dst[dstLen:]
}


@ -12,37 +12,33 @@ type avgAggrState struct {
}
type avgStateValue struct {
mu sync.Mutex
sum float64
count int64
deleted bool
deleteDeadline int64
}
func newAvgAggrState() *avgAggrState {
return &avgAggrState{}
}
func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &avgStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*avgStateValue)
sv.mu.Lock()
@ -50,6 +46,7 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
if !deleted {
sv.sum += s.value
sv.count++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -68,9 +65,22 @@ func (as *avgAggrState) flushState(ctx *flushCtx) {
sv := v.(*avgStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
if sv.count == 0 {
sv.mu.Unlock()
return true
}
avg := sv.sum / float64(sv.count)
sv.sum = 0
sv.count = 0
sv.mu.Unlock()
key := k.(string)
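The same lifecycle recurs in every output below: pushSamples stamps the entry with deleteDeadline, and flushState either emits and resets the accumulator or, once flushTimestamp passes the deadline, drops the entry and releases its compressed key. A condensed sketch of that decision, reusing avgStateValue from above (the helper name is hypothetical; the real code does this inline inside m.Range):
func flushAvgEntry(sv *avgStateValue, flushTimestamp int64) (avg float64, emit, drop bool) {
    sv.mu.Lock()
    defer sv.mu.Unlock()
    if flushTimestamp > sv.deleteDeadline {
        sv.deleted = true // stale: caller deletes the map entry and calls lc.Delete on its key
        return 0, false, true
    }
    if sv.count == 0 {
        return 0, false, false // nothing new since the previous flush; keep the entry
    }
    avg = sv.sum / float64(sv.count)
    sv.sum, sv.count = 0, 0 // keep the entry, reset the accumulator
    return avg, true, false
}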


@ -12,41 +12,39 @@ type countSamplesAggrState struct {
}
type countSamplesStateValue struct {
mu sync.Mutex
n uint64
deleted bool
deleteDeadline int64
}
func newCountSamplesAggrState() *countSamplesAggrState {
return &countSamplesAggrState{}
}
func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSamplesStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.n++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -65,9 +63,21 @@ func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
n := sv.n
if n == 0 {
sv.mu.Unlock()
return true
}
sv.n = 0
sv.mu.Unlock()
key := k.(string)


@ -13,16 +13,17 @@ type countSeriesAggrState struct {
}
type countSeriesStateValue struct {
mu sync.Mutex
m map[uint64]struct{}
deleted bool
deleteDeadline int64
}
func newCountSeriesAggrState() *countSeriesAggrState {
return &countSeriesAggrState{}
}
func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, _ bool) {
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@ -36,18 +37,14 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSeriesStateValue{
m: make(map[uint64]struct{}),
}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
@ -56,6 +53,7 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
if _, ok := sv.m[h]; !ok {
sv.m[h] = struct{}{}
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -74,9 +72,21 @@ func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
n := len(sv.m)
if n == 0 {
sv.mu.Unlock()
return true
}
sv.m = make(map[uint64]struct{})
sv.mu.Unlock()
key := k.(string)


@ -8,12 +8,14 @@ import (
"github.com/cespare/xxhash/v2" "github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
) )
const dedupAggrShardsCount = 128 const dedupAggrShardsCount = 128
type dedupAggr struct { type dedupAggr struct {
lc *promutils.LabelsCompressor
shards []dedupAggrShard shards []dedupAggrShard
} }
@ -26,7 +28,7 @@ type dedupAggrShard struct {
} }
type dedupAggrShardNopad struct { type dedupAggrShardNopad struct {
mu sync.Mutex mu sync.RWMutex
m map[string]*dedupAggrSample m map[string]*dedupAggrSample
samplesBuf []dedupAggrSample samplesBuf []dedupAggrSample
@ -36,14 +38,16 @@ type dedupAggrShardNopad struct {
} }
type dedupAggrSample struct { type dedupAggrSample struct {
value float64 value float64
timestamp int64 timestamp int64
deleteDeadline int64
} }
func newDedupAggr() *dedupAggr { func newDedupAggr(lc *promutils.LabelsCompressor) *dedupAggr {
shards := make([]dedupAggrShard, dedupAggrShardsCount) shards := make([]dedupAggrShard, dedupAggrShardsCount)
return &dedupAggr{ return &dedupAggr{
shards: shards, shards: shards,
lc: lc,
} }
} }
@ -63,7 +67,7 @@ func (da *dedupAggr) itemsCount() uint64 {
return n return n
} }
func (da *dedupAggr) pushSamples(samples []pushSample) { func (da *dedupAggr) pushSamples(samples []pushSample, deleteDeadline int64) {
pss := getPerShardSamples() pss := getPerShardSamples()
shards := pss.shards shards := pss.shards
for _, sample := range samples { for _, sample := range samples {
@ -75,17 +79,21 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
if len(shardSamples) == 0 { if len(shardSamples) == 0 {
continue continue
} }
da.shards[i].pushSamples(shardSamples) da.shards[i].pushSamples(shardSamples, deleteDeadline)
} }
putPerShardSamples(pss) putPerShardSamples(pss)
} }
func getDedupFlushCtx() *dedupFlushCtx { func getDedupFlushCtx(flushTimestamp, deleteDeadline int64, lc *promutils.LabelsCompressor) *dedupFlushCtx {
v := dedupFlushCtxPool.Get() v := dedupFlushCtxPool.Get()
if v == nil { if v == nil {
return &dedupFlushCtx{} v = &dedupFlushCtx{}
} }
return v.(*dedupFlushCtx) ctx := v.(*dedupFlushCtx)
ctx.lc = lc
ctx.deleteDeadline = deleteDeadline
ctx.flushTimestamp = flushTimestamp
return ctx
} }
func putDedupFlushCtx(ctx *dedupFlushCtx) { func putDedupFlushCtx(ctx *dedupFlushCtx) {
@ -96,15 +104,24 @@ func putDedupFlushCtx(ctx *dedupFlushCtx) {
var dedupFlushCtxPool sync.Pool var dedupFlushCtxPool sync.Pool
type dedupFlushCtx struct { type dedupFlushCtx struct {
samples []pushSample keysToDelete []string
samples []pushSample
deleteDeadline int64
flushTimestamp int64
lc *promutils.LabelsCompressor
} }
func (ctx *dedupFlushCtx) reset() { func (ctx *dedupFlushCtx) reset() {
ctx.deleteDeadline = 0
ctx.flushTimestamp = 0
ctx.lc = nil
clear(ctx.keysToDelete)
ctx.keysToDelete = ctx.keysToDelete[:0]
clear(ctx.samples) clear(ctx.samples)
ctx.samples = ctx.samples[:0] ctx.samples = ctx.samples[:0]
} }
func (da *dedupAggr) flush(f func(samples []pushSample)) { func (da *dedupAggr) flush(f func(samples []pushSample, deleteDeadline int64), flushTimestamp, deleteDeadline int64) {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := range da.shards { for i := range da.shards {
flushConcurrencyCh <- struct{}{} flushConcurrencyCh <- struct{}{}
@ -115,7 +132,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) {
wg.Done() wg.Done()
}() }()
ctx := getDedupFlushCtx() ctx := getDedupFlushCtx(flushTimestamp, deleteDeadline, da.lc)
shard.flush(ctx, f) shard.flush(ctx, f)
putDedupFlushCtx(ctx) putDedupFlushCtx(ctx)
}(&da.shards[i]) }(&da.shards[i])
@ -154,7 +171,7 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample) { func (das *dedupAggrShard) pushSamples(samples []pushSample, deleteDeadline int64) {
das.mu.Lock() das.mu.Lock()
defer das.mu.Unlock() defer das.mu.Unlock()
@ -171,6 +188,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
s = &samplesBuf[len(samplesBuf)-1] s = &samplesBuf[len(samplesBuf)-1]
s.value = sample.value s.value = sample.value
s.timestamp = sample.timestamp s.timestamp = sample.timestamp
s.deleteDeadline = deleteDeadline
key := bytesutil.InternString(sample.key) key := bytesutil.InternString(sample.key)
m[key] = s m[key] = s
@ -183,30 +201,28 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
s.value = sample.value s.value = sample.value
s.timestamp = sample.timestamp s.timestamp = sample.timestamp
s.deleteDeadline = deleteDeadline
} }
} }
das.samplesBuf = samplesBuf das.samplesBuf = samplesBuf
} }
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) { func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample, deleteDeadline int64)) {
das.mu.Lock() if len(das.m) == 0 {
m := das.m
if len(m) > 0 {
das.m = make(map[string]*dedupAggrSample, len(m))
das.sizeBytes.Store(0)
das.itemsCount.Store(0)
das.samplesBuf = make([]dedupAggrSample, 0, len(das.samplesBuf))
}
das.mu.Unlock()
if len(m) == 0 {
return return
} }
keysToDelete := ctx.keysToDelete
dstSamples := ctx.samples dstSamples := ctx.samples
for key, s := range m { das.mu.RLock()
for key, s := range das.m {
if ctx.flushTimestamp > s.deleteDeadline {
das.itemsCount.Add(^uint64(0))
//ctx.lc.Delete(key)
das.sizeBytes.Add(^(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)) - 1))
keysToDelete = append(keysToDelete, key)
continue
}
dstSamples = append(dstSamples, pushSample{ dstSamples = append(dstSamples, pushSample{
key: key, key: key,
value: s.value, value: s.value,
@ -215,11 +231,17 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
// Limit the number of samples per each flush in order to limit memory usage. // Limit the number of samples per each flush in order to limit memory usage.
if len(dstSamples) >= 10_000 { if len(dstSamples) >= 10_000 {
f(dstSamples) f(dstSamples, ctx.deleteDeadline)
clear(dstSamples) clear(dstSamples)
dstSamples = dstSamples[:0] dstSamples = dstSamples[:0]
} }
} }
f(dstSamples) das.mu.RUnlock()
das.mu.Lock()
for _, key := range keysToDelete {
delete(das.m, key)
}
das.mu.Unlock()
f(dstSamples, ctx.deleteDeadline)
ctx.samples = dstSamples ctx.samples = dstSamples
} }
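A usage sketch for the API above (illustrative; assumes "time" is imported, and the helper name, its sample slice and staleness value are chosen by the caller, as the Deduplicator below does):
// pushAndFlush is a hypothetical helper showing how a caller drives dedupAggr.
func pushAndFlush(da *dedupAggr, samples []pushSample, stalenessIntervalMs int64) {
    now := time.Now().UnixMilli()
    deleteDeadline := now + stalenessIntervalMs
    // Stamp the entries so that idle series expire after the staleness interval.
    da.pushSamples(samples, deleteDeadline)
    // Drain the freshest sample per series; entries whose deadline has passed are dropped.
    da.flush(func(pss []pushSample, deleteDeadline int64) {
        for _, ps := range pss {
            _ = ps // e.g. forward to the downstream aggregation states
        }
    }, now, deleteDeadline)
}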


@ -5,10 +5,13 @@ import (
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
) )
func TestDedupAggrSerial(t *testing.T) { func TestDedupAggrSerial(t *testing.T) {
da := newDedupAggr() var lc promutils.LabelsCompressor
da := newDedupAggr(&lc)
const seriesCount = 100_000 const seriesCount = 100_000
expectedSamplesMap := make(map[string]pushSample) expectedSamplesMap := make(map[string]pushSample)
@ -20,11 +23,11 @@ func TestDedupAggrSerial(t *testing.T) {
sample.value = float64(i + j) sample.value = float64(i + j)
expectedSamplesMap[sample.key] = *sample expectedSamplesMap[sample.key] = *sample
} }
da.pushSamples(samples) da.pushSamples(samples, 0)
} }
if n := da.sizeBytes(); n > 5_000_000 { if n := da.sizeBytes(); n > 6_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n) t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
} }
if n := da.itemsCount(); n != seriesCount { if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount) t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@ -32,19 +35,21 @@ func TestDedupAggrSerial(t *testing.T) {
flushedSamplesMap := make(map[string]pushSample) flushedSamplesMap := make(map[string]pushSample)
var mu sync.Mutex var mu sync.Mutex
flushSamples := func(samples []pushSample) { flushSamples := func(samples []pushSample, _ int64) {
mu.Lock() mu.Lock()
for _, sample := range samples { for _, sample := range samples {
flushedSamplesMap[sample.key] = sample flushedSamplesMap[sample.key] = sample
} }
mu.Unlock() mu.Unlock()
} }
da.flush(flushSamples) da.flush(flushSamples, 0, 0)
if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) { if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap) t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
} }
da.flush(flushSamples, 1, 0)
if n := da.sizeBytes(); n > 17_000 { if n := da.sizeBytes(); n > 17_000 {
t.Fatalf("too big dedupAggr state after flush; %d bytes; it shouldn't exceed 17_000 bytes", n) t.Fatalf("too big dedupAggr state after flush; %d bytes; it shouldn't exceed 17_000 bytes", n)
} }
@ -54,9 +59,10 @@ func TestDedupAggrSerial(t *testing.T) {
} }
func TestDedupAggrConcurrent(_ *testing.T) { func TestDedupAggrConcurrent(_ *testing.T) {
var lc promutils.LabelsCompressor
const concurrency = 5 const concurrency = 5
const seriesCount = 10_000 const seriesCount = 10_000
da := newDedupAggr() da := newDedupAggr(&lc)
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < concurrency; i++ { for i := 0; i < concurrency; i++ {
@ -70,7 +76,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
sample.key = fmt.Sprintf("key_%d", j) sample.key = fmt.Sprintf("key_%d", j)
sample.value = float64(i + j) sample.value = float64(i + j)
} }
da.pushSamples(samples) da.pushSamples(samples, 0)
} }
}() }()
} }


@ -6,6 +6,7 @@ import (
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
) )
func BenchmarkDedupAggr(b *testing.B) { func BenchmarkDedupAggr(b *testing.B) {
@ -17,9 +18,11 @@ func BenchmarkDedupAggr(b *testing.B) {
} }
func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
var lc promutils.LabelsCompressor
const loops = 2 const loops = 2
benchSamples := newBenchSamples(samplesPerPush) benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr()
da := newDedupAggr(&lc)
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
@ -27,13 +30,14 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
for i := 0; i < loops; i++ { for i := 0; i < loops; i++ {
da.pushSamples(benchSamples) da.pushSamples(benchSamples, 0)
} }
} }
}) })
} }
func newBenchSamples(count int) []pushSample { func newBenchSamples(count int) []pushSample {
var lc promutils.LabelsCompressor
labels := []prompbmarshal.Label{ labels := []prompbmarshal.Label{
{ {
Name: "app", Name: "app",
@ -65,7 +69,7 @@ func newBenchSamples(count int) []pushSample {
Name: "app", Name: "app",
Value: fmt.Sprintf("instance-%d", i), Value: fmt.Sprintf("instance-%d", i),
}) })
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:]) keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:], false, 0)
sample.key = string(keyBuf) sample.key = string(keyBuf)
sample.value = float64(i) sample.value = float64(i)
} }


@ -16,8 +16,10 @@ import (
// Deduplicator deduplicates samples per each time series.
type Deduplicator struct {
da *dedupAggr
lc *promutils.LabelsCompressor
dropLabels []string
stalenessInterval int64
wg sync.WaitGroup
stopCh chan struct{}
@ -40,13 +42,16 @@ type Deduplicator struct {
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
dropLabels: dropLabels,
stalenessInterval: 2 * dedupInterval.Milliseconds(),
lc: promutils.NewLabelsCompressor(),
stopCh: make(chan struct{}),
ms: metrics.NewSet(),
}
d.da = newDedupAggr(d.lc)
ms := d.ms
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
@ -57,6 +62,12 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.da.itemsCount())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.lc.SizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_items_count{%s}`, metricLabels), func() float64 {
return float64(d.lc.ItemsCount())
})
d.dedupFlushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.dedupFlushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
@ -87,6 +98,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
pss := ctx.pss
labels := &ctx.labels
buf := ctx.buf
deleteDeadline := time.Now().UnixMilli() + d.stalenessInterval
dropLabels := d.dropLabels
for _, ts := range tss {
@ -101,7 +113,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
labels.Sort()
bufLen := len(buf)
buf = d.lc.Compress(buf, labels.Labels, deleteDeadline)
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
pss = append(pss, pushSample{
@ -112,7 +124,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
}
}
d.da.pushSamples(pss, deleteDeadline)
ctx.pss = pss
ctx.buf = buf
@ -145,7 +157,8 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
startTime := time.Now()
timestamp := startTime.UnixMilli()
deleteDeadline := timestamp + d.stalenessInterval
d.da.flush(func(pss []pushSample, deleteDeadline int64) {
ctx := getDeduplicatorFlushCtx()
tss := ctx.tss
@ -153,7 +166,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
samples := ctx.samples
for _, ps := range pss {
labelsLen := len(labels)
labels = decompressLabels(labels, d.lc, ps.key)
samplesLen := len(samples)
samples = append(samples, prompbmarshal.Sample{
@ -172,7 +185,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
ctx.labels = labels
ctx.samples = samples
putDeduplicatorFlushCtx(ctx)
}, timestamp, deleteDeadline)
duration := time.Since(startTime)
d.dedupFlushDuration.Update(duration.Seconds())
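A minimal usage sketch for the Deduplicator above (illustrative; tss is assumed to be a []prompbmarshal.TimeSeries built by the caller, and the function name is hypothetical):
// runDeduplicator shows the intended call sequence.
func runDeduplicator(tss []prompbmarshal.TimeSeries) {
    push := func(tss []prompbmarshal.TimeSeries) {
        // receives the deduplicated series once per dedup interval
    }
    d := NewDeduplicator(push, 30*time.Second, nil, "remote-write-alias")
    d.Push(tss)
    d.MustStop() // required, as stated in the constructor's doc comment above
}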


@ -1,42 +1,32 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/metrics"
)
// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrState struct {
m sync.Map
}
type histogramBucketStateValue struct {
mu sync.Mutex
h metrics.Histogram
deleteDeadline int64
deleted bool
}
func newHistogramBucketAggrState() *histogramBucketAggrState {
return &histogramBucketAggrState{}
}
func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
@ -66,13 +56,13 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
}
}
func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx) {
m := &as.m
m.Range(func(k, v any) bool {
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
deleted := ctx.flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
@ -80,6 +70,8 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
sv.mu.Unlock()
if deleted {
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
}
return true
@ -87,9 +79,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
}
func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
as.removeOldEntries(ctx)
m := &as.m
m.Range(func(k, v any) bool {
@ -105,11 +95,3 @@ func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
return true
})
}
func roundDurationToSecs(d time.Duration) uint64 {
if d < 0 {
return 0
}
secs := d.Seconds()
return uint64(math.Ceil(secs))
}
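The roundDurationToSecs helper above is dropped because staleness is no longer tracked per output in whole seconds; the caller now computes a single millisecond deadline and passes it down, as the Deduplicator and aggregator do elsewhere in this commit. A sketch of the replacement arithmetic (illustrative; stalenessInterval is a time.Duration and the function name is hypothetical):
// millisecondDeadline shows the new deadline computation.
func millisecondDeadline(stalenessInterval time.Duration) int64 {
    // Old (per output): stalenessSecs := roundDurationToSecs(stalenessInterval);
    // deleteDeadline := fasttime.UnixTimestamp() + stalenessSecs
    // New (computed once by the caller, in milliseconds):
    return time.Now().UnixMilli() + stalenessInterval.Milliseconds()
}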


@ -12,46 +12,45 @@ type lastAggrState struct {
}
type lastStateValue struct {
mu sync.Mutex
last float64
timestamp int64
deleted bool
defined bool
deleteDeadline int64
}
func newLastAggrState() *lastAggrState {
return &lastAggrState{}
}
func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &lastStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if !sv.defined || s.timestamp >= sv.timestamp {
sv.last = s.value
sv.timestamp = s.timestamp
sv.deleteDeadline = deleteDeadline
}
sv.defined = true
}
sv.mu.Unlock()
if deleted {
@ -70,6 +69,14 @@ func (as *lastAggrState) flushState(ctx *flushCtx) {
sv := v.(*lastStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
last := sv.last
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true


@ -12,35 +12,33 @@ type maxAggrState struct {
}
type maxStateValue struct {
mu sync.Mutex
max float64
deleted bool
defined bool
deleteDeadline int64
}
func newMaxAggrState() *maxAggrState {
return &maxAggrState{}
}
func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &maxStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*maxStateValue)
sv.mu.Lock()
@ -49,6 +47,10 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
if s.value > sv.max {
sv.max = s.value
}
if !sv.defined {
sv.defined = true
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -67,9 +69,21 @@ func (as *maxAggrState) flushState(ctx *flushCtx) {
sv := v.(*maxStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
if !sv.defined {
sv.mu.Unlock()
return true
}
max := sv.max
sv.max = 0
sv.defined = false
sv.mu.Unlock()
key := k.(string)


@ -12,43 +12,45 @@ type minAggrState struct {
}
type minStateValue struct {
mu sync.Mutex
min float64
deleted bool
defined bool
deleteDeadline int64
}
func newMinAggrState() *minAggrState {
return &minAggrState{}
}
func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &minStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*minStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if !sv.defined || s.value < sv.min {
sv.min = s.value
}
sv.deleteDeadline = deleteDeadline
if !sv.defined {
sv.defined = true
}
}
sv.mu.Unlock()
if deleted {
@ -67,10 +69,23 @@ func (as *minAggrState) flushState(ctx *flushCtx) {
sv := v.(*minStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
if !sv.defined {
sv.mu.Unlock()
return true
}
min := sv.min
sv.min = 0
sv.defined = false
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "min", min)
return true


@ -16,9 +16,10 @@ type quantilesAggrState struct {
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
deleted bool
deleteDeadline int64
}
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
@ -27,10 +28,10 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
}
}
func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
@ -53,6 +54,7 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -74,10 +76,16 @@ func (as *quantilesAggrState) flushState(ctx *flushCtx) {
sv := v.(*quantilesStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
quantiles = sv.h.Quantiles(quantiles[:0], phis)
sv.h.Reset()
sv.mu.Unlock()
key := k.(string)


@ -2,10 +2,8 @@ package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// rateAggrState calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics.
@ -14,22 +12,19 @@ type rateAggrState struct {
// isAvg is set to true if rate_avg() must be calculated instead of rate_sum().
isAvg bool
}
type rateStateValue struct {
mu sync.Mutex
lastValues map[string]rateLastValueState
deleteDeadline int64
deleted bool
}
type rateLastValueState struct {
value float64
timestamp int64
deleteDeadline int64
// increase stores cumulative increase for the current time series on the current aggregation interval
increase float64
@ -38,17 +33,13 @@ type rateLastValueState struct {
prevTimestamp int64
}
func newRateAggrState(isAvg bool) *rateAggrState {
return &rateAggrState{
isAvg: isAvg,
}
}
func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, _ bool) {
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@ -106,11 +97,9 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
}
func (as *rateAggrState) flushState(ctx *flushCtx) {
suffix := as.getSuffix()
as.removeOldEntries(ctx)
m := &as.m
m.Range(func(k, v any) bool {
@ -156,16 +145,18 @@ func (as *rateAggrState) getSuffix() string {
return "rate_sum"
}
func (as *rateAggrState) removeOldEntries(ctx *flushCtx) {
m := &as.m
m.Range(func(k, v any) bool {
sv := v.(*rateStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
// Mark the current entry as deleted
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
@ -173,7 +164,7 @@ func (as *rateAggrState) removeOldEntries(currentTime uint64) {
// Delete outdated entries in sv.lastValues
lvs := sv.lastValues
for k1, lv := range lvs {
if ctx.flushTimestamp > lv.deleteDeadline {
delete(lvs, k1)
}
}


@ -13,21 +13,22 @@ type stddevAggrState struct {
}
type stddevStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
deleteDeadline int64
}
func newStddevAggrState() *stddevAggrState {
return &stddevAggrState{}
}
func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
@ -50,6 +51,7 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -68,9 +70,23 @@ func (as *stddevAggrState) flushState(ctx *flushCtx) {
sv := v.(*stddevStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
if sv.count == 0 {
sv.mu.Unlock()
return true
}
stddev := math.Sqrt(sv.q / sv.count)
sv.q = 0
sv.count = 0
sv.avg = 0
sv.mu.Unlock()
key := k.(string)


@ -12,21 +12,22 @@ type stdvarAggrState struct {
}
type stdvarStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
deleteDeadline int64
}
func newStdvarAggrState() *stdvarAggrState {
return &stdvarAggrState{}
}
func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key, includeInputKey)
again:
v, ok := as.m.Load(outputKey)
@ -49,6 +50,7 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -67,9 +69,22 @@ func (as *stdvarAggrState) flushState(ctx *flushCtx) {
sv := v.(*stdvarStateValue)
sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
if sv.count == 0 {
sv.mu.Unlock()
return true
}
stdvar := sv.q / sv.count
sv.q = 0
sv.count = 0
sv.avg = 0
sv.mu.Unlock()
key := k.(string)


@ -47,19 +47,6 @@ var supportedOutputs = []string{
"unique_samples", "unique_samples",
} }
var (
// lc contains information about all compressed labels for streaming aggregation
lc promutils.LabelsCompressor
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
return float64(lc.SizeBytes())
})
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
return float64(lc.ItemsCount())
})
)
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
//
// opts can contain additional options. If opts is nil, then default options are used.
@ -373,13 +360,15 @@ type aggregator struct {
keepMetricNames bool
ignoreOldSamples bool
includeInputKey bool
by []string
without []string
aggregateOnlyByTime bool
// interval is the interval between flushes
interval time.Duration
stalenessInterval int64
// dedupInterval is optional deduplication interval for incoming samples
dedupInterval time.Duration
@ -390,6 +379,9 @@ type aggregator struct {
// aggrOutputs contains aggregate states for the given outputs
aggrOutputs []aggrOutput
// lc is used for compressing series keys before passing them to dedupAggr and aggrState
lc *promutils.LabelsCompressor
// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
minTimestamp atomic.Int64
@ -424,7 +416,7 @@ type aggrState interface {
// pushSamples must push samples to the aggrState.
//
// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool)
// flushState must flush aggrState data to ctx.
flushState(ctx *flushCtx)
@ -556,11 +548,15 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
includeInputKey := false
for i, output := range cfg.Outputs {
as, ik, err := newAggrState(output, outputsSeen)
if err != nil {
return nil, err
}
if ik {
includeInputKey = true
}
aggrOutputs[i] = aggrOutput{
as: as,
@ -592,9 +588,11 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
lc: promutils.NewLabelsCompressor(),
interval: interval,
dedupInterval: dedupInterval,
stalenessInterval: stalenessInterval.Milliseconds(),
aggrOutputs: aggrOutputs,
@ -614,18 +612,25 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
} }
if dedupInterval > 0 { if dedupInterval > 0 {
a.da = newDedupAggr() includeInputKey = true
a.da = newDedupAggr(a.lc)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := a.da.sizeBytes() return float64(a.da.sizeBytes())
return float64(n)
}) })
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 { _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := a.da.itemsCount() return float64(a.da.itemsCount())
return float64(n) })
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_size_bytes{%s}`, metricLabels), func() float64 {
return float64(a.lc.SizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_items_count{%s}`, metricLabels), func() float64 {
return float64(a.lc.ItemsCount())
}) })
} }
a.includeInputKey = includeInputKey
alignFlushToInterval := !opts.NoAlignFlushToInterval alignFlushToInterval := !opts.NoAlignFlushToInterval
if v := cfg.NoAlignFlushToInterval; v != nil { if v := cfg.NoAlignFlushToInterval; v != nil {
alignFlushToInterval = !*v alignFlushToInterval = !*v
@ -645,20 +650,20 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil return a, nil
} }
func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) { func newAggrState(output string, outputsSeen map[string]struct{}) (aggrState, bool, error) {
// check for duplicated output // check for duplicated output
if _, ok := outputsSeen[output]; ok { if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output) return nil, true, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
} }
outputsSeen[output] = struct{}{} outputsSeen[output] = struct{}{}
if strings.HasPrefix(output, "quantiles(") { if strings.HasPrefix(output, "quantiles(") {
if !strings.HasSuffix(output, ")") { if !strings.HasSuffix(output, ")") {
return nil, fmt.Errorf("missing closing brace for `quantiles()` output") return nil, false, fmt.Errorf("missing closing brace for `quantiles()` output")
} }
argsStr := output[len("quantiles(") : len(output)-1] argsStr := output[len("quantiles(") : len(output)-1]
if len(argsStr) == 0 { if len(argsStr) == 0 {
return nil, fmt.Errorf("`quantiles()` must contain at least one phi") return nil, false, fmt.Errorf("`quantiles()` must contain at least one phi")
} }
args := strings.Split(argsStr, ",") args := strings.Split(argsStr, ",")
phis := make([]float64, len(args)) phis := make([]float64, len(args))
@ -666,57 +671,57 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
arg = strings.TrimSpace(arg) arg = strings.TrimSpace(arg)
phi, err := strconv.ParseFloat(arg, 64) phi, err := strconv.ParseFloat(arg, 64)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err) return nil, false, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
} }
if phi < 0 || phi > 1 { if phi < 0 || phi > 1 {
return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi) return nil, false, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
} }
phis[i] = phi phis[i] = phi
} }
if _, ok := outputsSeen["quantiles"]; ok { if _, ok := outputsSeen["quantiles"]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`") return nil, false, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
} }
outputsSeen["quantiles"] = struct{}{} outputsSeen["quantiles"] = struct{}{}
return newQuantilesAggrState(phis), nil return newQuantilesAggrState(phis), false, nil
} }
switch output { switch output {
case "avg": case "avg":
return newAvgAggrState(), nil return newAvgAggrState(), false, nil
case "count_samples": case "count_samples":
return newCountSamplesAggrState(), nil return newCountSamplesAggrState(), false, nil
case "count_series": case "count_series":
return newCountSeriesAggrState(), nil return newCountSeriesAggrState(), true, nil
case "histogram_bucket": case "histogram_bucket":
return newHistogramBucketAggrState(stalenessInterval), nil return newHistogramBucketAggrState(), false, nil
case "increase": case "increase":
return newTotalAggrState(stalenessInterval, true, true), nil return newTotalAggrState(true, true), true, nil
case "increase_prometheus": case "increase_prometheus":
return newTotalAggrState(stalenessInterval, true, false), nil return newTotalAggrState(true, false), true, nil
case "last": case "last":
return newLastAggrState(), nil return newLastAggrState(), false, nil
case "max": case "max":
return newMaxAggrState(), nil return newMaxAggrState(), false, nil
case "min": case "min":
return newMinAggrState(), nil return newMinAggrState(), false, nil
case "rate_avg": case "rate_avg":
return newRateAggrState(stalenessInterval, true), nil return newRateAggrState(true), true, nil
case "rate_sum": case "rate_sum":
return newRateAggrState(stalenessInterval, false), nil return newRateAggrState(false), true, nil
case "stddev": case "stddev":
return newStddevAggrState(), nil return newStddevAggrState(), false, nil
case "stdvar": case "stdvar":
return newStdvarAggrState(), nil return newStdvarAggrState(), false, nil
case "sum_samples": case "sum_samples":
return newSumSamplesAggrState(), nil return newSumSamplesAggrState(), false, nil
case "total": case "total":
return newTotalAggrState(stalenessInterval, false, true), nil return newTotalAggrState(false, true), true, nil
case "total_prometheus": case "total_prometheus":
return newTotalAggrState(stalenessInterval, false, false), nil return newTotalAggrState(false, false), true, nil
case "unique_samples": case "unique_samples":
return newUniqueSamplesAggrState(), nil return newUniqueSamplesAggrState(), false, nil
default: default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs) return nil, false, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
} }
} }
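
A compact way to read the new second return value of newAggrState: it reports whether the output needs the raw input-series key in addition to the output key. Summarizing the cases above (an illustrative snippet, not code from this commit):

    // Outputs returning true keep per-input-series state (last seen value or
    // distinct-series tracking) and therefore need the input key inside the series key:
    needsInputKey := map[string]bool{
        "count_series":        true,
        "increase":            true,
        "increase_prometheus": true,
        "rate_avg":            true,
        "rate_sum":            true,
        "total":               true,
        "total_prometheus":    true,
        // avg, count_samples, histogram_bucket, last, max, min, quantiles(...),
        // stddev, stdvar, sum_samples and unique_samples return false.
    }
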
@ -823,8 +828,10 @@ func (a *aggregator) dedupFlush() {
} }
startTime := time.Now() startTime := time.Now()
timestamp := startTime.UnixMilli()
deleteDeadline := timestamp + a.stalenessInterval
a.da.flush(a.pushSamples) a.da.flush(a.pushSamples, timestamp, deleteDeadline)
d := time.Since(startTime) d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds()) a.dedupFlushDuration.Update(d.Seconds())
@ -902,6 +909,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
minTimestamp := a.minTimestamp.Load() minTimestamp := a.minTimestamp.Load()
nowMsec := time.Now().UnixMilli() nowMsec := time.Now().UnixMilli()
deleteDeadline := nowMsec + a.stalenessInterval
var maxLagMsec int64 var maxLagMsec int64
for idx, ts := range tss { for idx, ts := range tss {
if !a.match.Match(ts.Labels) { if !a.match.Match(ts.Labels) {
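
Reviewer note: staleness tracking switches from second-resolution wall clock (fasttime.UnixTimestamp plus stalenessSecs) to millisecond Unix timestamps, which is why the deadline fields become int64 further down. The derivation on the ingestion path is simply (sketch of the lines added here):

    nowMsec := time.Now().UnixMilli()
    deleteDeadline := nowMsec + a.stalenessInterval // a.stalenessInterval holds the configured staleness interval in milliseconds
    // every entry touched by pushSamples stores this deadline; flushState drops the
    // entry once ctx.flushTimestamp exceeds it and releases its labels via a.lc.Delete
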
@ -930,7 +938,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
} }
bufLen := len(buf) bufLen := len(buf)
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels) buf = compressLabels(buf, a.lc, inputLabels.Labels, outputLabels.Labels, a.includeInputKey, deleteDeadline)
// key remains valid only by the end of this function and can't be reused after // key remains valid only by the end of this function and can't be reused after
// do not intern key because number of unique keys could be too high // do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:]) key := bytesutil.ToUnsafeString(buf[bufLen:])
@ -964,52 +972,60 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
ctx.buf = buf ctx.buf = buf
if a.da != nil { if a.da != nil {
a.da.pushSamples(samples) a.da.pushSamples(samples, deleteDeadline)
} else { } else {
a.pushSamples(samples) a.pushSamples(samples, deleteDeadline)
} }
} }
func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte { func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, outputLabels []prompbmarshal.Label, includeInputKey bool, deleteDeadline int64) []byte {
bb := bbPool.Get() bb := bbPool.Get()
bb.B = lc.Compress(bb.B, inputLabels) bb.B = lc.Compress(bb.B, outputLabels, deleteDeadline)
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B))) if includeInputKey {
dst = append(dst, bb.B...) dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
bbPool.Put(bb) dst = append(dst, bb.B...)
dst = lc.Compress(dst, outputLabels) bbPool.Put(bb)
dst = lc.Compress(dst, inputLabels, deleteDeadline)
} else {
dst = append(dst, bb.B...)
bbPool.Put(bb)
}
return dst return dst
} }
func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label { func decompressLabels(dst []prompbmarshal.Label, lc *promutils.LabelsCompressor, key string) []prompbmarshal.Label {
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key)) return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
} }
func getOutputKey(key string) string { func getOutputKey(key string, includeInputKey bool) string {
src := bytesutil.ToUnsafeBytes(key) src := bytesutil.ToUnsafeBytes(key)
inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) outputKey := src
if nSize <= 0 { if includeInputKey {
logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
}
src = src[nSize:]
outputKey = src[:outputKeyLen]
} }
src = src[nSize:]
outputKey := src[inputKeyLen:]
return bytesutil.ToUnsafeString(outputKey) return bytesutil.ToUnsafeString(outputKey)
} }
func getInputOutputKey(key string) (string, string) { func getInputOutputKey(key string) (string, string) {
src := bytesutil.ToUnsafeBytes(key) src := bytesutil.ToUnsafeBytes(key)
inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 { if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
} }
src = src[nSize:] src = src[nSize:]
inputKey := src[:inputKeyLen] outputKey := src[:outputKeyLen]
outputKey := src[inputKeyLen:] inputKey := src[outputKeyLen:]
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey) return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
} }
func (a *aggregator) pushSamples(samples []pushSample) { func (a *aggregator) pushSamples(samples []pushSample, deleteDeadline int64) {
for _, ao := range a.aggrOutputs { for _, ao := range a.aggrOutputs {
ao.as.pushSamples(samples) ao.as.pushSamples(samples, deleteDeadline, a.includeInputKey)
} }
} }
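
The hunk above also flips the layout of the in-memory series key: the compressed output key now comes first, prefixed with its varint length, and the compressed input key is appended only when includeInputKey is set; otherwise the key is the bare output key. A standalone illustration of that layout (not part of the commit; plain byte strings stand in for the compressed label indexes):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        outputKey := []byte("out-key")
        inputKey := []byte("input-key")

        // compressLabels with includeInputKey=true:
        // varint(len(outputKey)) | outputKey | inputKey
        key := binary.AppendUvarint(nil, uint64(len(outputKey)))
        key = append(key, outputKey...)
        key = append(key, inputKey...)

        // getOutputKey / getInputOutputKey: read the length prefix and split.
        outLen, n := binary.Uvarint(key)
        rest := key[n:]
        fmt.Printf("outputKey=%q inputKey=%q\n", rest[:outLen], rest[outLen:])

        // compressLabels with includeInputKey=false: the key is just the output key,
        // with no length prefix, so getOutputKey returns it as-is.
    }
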
@ -1169,7 +1185,7 @@ func (ctx *flushCtx) flushSeries() {
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) { func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
labelsLen := len(ctx.labels) labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples) samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key) ctx.labels = decompressLabels(ctx.labels, ctx.a.lc, key)
if !ctx.a.keepMetricNames { if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
} }
@ -1191,7 +1207,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) { func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
labelsLen := len(ctx.labels) labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples) samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key) ctx.labels = decompressLabels(ctx.labels, ctx.a.lc, key)
if !ctx.a.keepMetricNames { if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
} }

View file

@ -12,41 +12,46 @@ type sumSamplesAggrState struct {
} }
type sumSamplesStateValue struct { type sumSamplesStateValue struct {
mu sync.Mutex mu sync.Mutex
sum float64 sum float64
deleted bool deleted bool
defined bool
deleteDeadline int64
} }
func newSumSamplesAggrState() *sumSamplesAggrState { func newSumSamplesAggrState() *sumSamplesAggrState {
return &sumSamplesAggrState{} return &sumSamplesAggrState{}
} }
func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples { for i := range samples {
s := &samples[i] s := &samples[i]
outputKey := getOutputKey(s.key) outputKey := getOutputKey(s.key, includeInputKey)
again: again:
v, ok := as.m.Load(outputKey) v, ok := as.m.Load(outputKey)
if !ok { if !ok {
// The entry is missing in the map. Try creating it. // The entry is missing in the map. Try creating it.
v = &sumSamplesStateValue{ v = &sumSamplesStateValue{}
sum: s.value,
}
outputKey = bytesutil.InternString(outputKey) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v) vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if loaded {
// The new entry has been successfully created. // Use the entry created by a concurrent goroutine.
continue v = vNew
} }
// Use the entry created by a concurrent goroutine.
v = vNew
} }
sv := v.(*sumSamplesStateValue) sv := v.(*sumSamplesStateValue)
sv.mu.Lock() sv.mu.Lock()
if !sv.defined {
sv.defined = true
}
deleted := sv.deleted deleted := sv.deleted
if !deleted { if !deleted {
sv.sum += s.value sv.sum += s.value
sv.deleteDeadline = deleteDeadline
if !sv.defined {
sv.defined = true
}
} }
sv.mu.Unlock() sv.mu.Unlock()
if deleted { if deleted {
@ -65,9 +70,22 @@ func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
sv := v.(*sumSamplesStateValue) sv := v.(*sumSamplesStateValue)
sv.mu.Lock() sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
if !sv.defined {
sv.mu.Unlock()
return true
}
sum := sv.sum sum := sv.sum
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. sv.defined = false
sv.deleted = true sv.sum = 0
sv.mu.Unlock() sv.mu.Unlock()
key := k.(string) key := k.(string)

View file

@ -3,10 +3,9 @@ package streamaggr
import ( import (
"math" "math"
"sync" "sync"
"time" "sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
// totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus. // totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus.
@ -19,48 +18,37 @@ type totalAggrState struct {
// Whether to take into account the first sample in new time series when calculating the output value. // Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool keepFirstSample bool
// Time series state is dropped if no new samples are received during stalenessSecs. // The first sample per each new series is ignored during the first two intervals
//
// Also, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set.
// see ignoreFirstSampleDeadline for more details.
stalenessSecs uint64
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series // This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline. // cannot be distinguished from already existing series. This is tracked with ignoreFirstSamples.
ignoreFirstSampleDeadline uint64 ignoreFirstSamples atomic.Int32
} }
type totalStateValue struct { type totalStateValue struct {
mu sync.Mutex mu sync.Mutex
lastValues map[string]totalLastValueState lastValues map[string]totalLastValueState
total float64 total float64
deleteDeadline uint64 deleteDeadline int64
deleted bool deleted bool
} }
type totalLastValueState struct { type totalLastValueState struct {
value float64 value float64
timestamp int64 timestamp int64
deleteDeadline uint64 deleteDeadline int64
} }
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState { func newTotalAggrState(resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval) as := &totalAggrState{
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
return &totalAggrState{
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
stalenessSecs: stalenessSecs,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
} }
as.ignoreFirstSamples.Store(2)
return as
} }
func (as *totalAggrState) pushSamples(samples []pushSample) { func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, _ bool) {
currentTime := fasttime.UnixTimestamp() keepFirstSample := as.keepFirstSample && as.ignoreFirstSamples.Load() <= 0
deleteDeadline := currentTime + as.stalenessSecs
keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
for i := range samples { for i := range samples {
s := &samples[i] s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key) inputKey, outputKey := getInputOutputKey(s.key)
@ -116,11 +104,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
} }
func (as *totalAggrState) flushState(ctx *flushCtx) { func (as *totalAggrState) flushState(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
suffix := as.getSuffix() suffix := as.getSuffix()
as.removeOldEntries(currentTime) as.removeOldEntries(ctx)
m := &as.m m := &as.m
m.Range(func(k, v any) bool { m.Range(func(k, v any) bool {
@ -143,6 +129,10 @@ func (as *totalAggrState) flushState(ctx *flushCtx) {
} }
return true return true
}) })
ignoreFirstSamples := as.ignoreFirstSamples.Load()
if ignoreFirstSamples > 0 {
as.ignoreFirstSamples.Add(-1)
}
} }
func (as *totalAggrState) getSuffix() string { func (as *totalAggrState) getSuffix() string {
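
The warm-up logic of the total/increase outputs is now tied to flush cycles rather than wall-clock staleness: the counter starts at 2 and is decremented once per flush, so keepFirstSample takes effect only from the third flush interval onward. The two relevant checks from the hunks above, side by side (sketch):

    // in pushSamples: honour the first sample of a new series only once the
    // warm-up counter has reached zero
    keepFirstSample := as.keepFirstSample && as.ignoreFirstSamples.Load() <= 0

    // at the end of flushState: burn one warm-up interval
    if as.ignoreFirstSamples.Load() > 0 {
        as.ignoreFirstSamples.Add(-1)
    }
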
@ -159,16 +149,18 @@ func (as *totalAggrState) getSuffix() string {
return "total_prometheus" return "total_prometheus"
} }
func (as *totalAggrState) removeOldEntries(currentTime uint64) { func (as *totalAggrState) removeOldEntries(ctx *flushCtx) {
m := &as.m m := &as.m
m.Range(func(k, v any) bool { m.Range(func(k, v any) bool {
sv := v.(*totalStateValue) sv := v.(*totalStateValue)
sv.mu.Lock() sv.mu.Lock()
if currentTime > sv.deleteDeadline { if ctx.flushTimestamp > sv.deleteDeadline {
// Mark the current entry as deleted // Mark the current entry as deleted
sv.deleted = true sv.deleted = true
sv.mu.Unlock() sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k) m.Delete(k)
return true return true
} }
@ -176,7 +168,7 @@ func (as *totalAggrState) removeOldEntries(currentTime uint64) {
// Delete outdated entries in sv.lastValues // Delete outdated entries in sv.lastValues
lvs := sv.lastValues lvs := sv.lastValues
for k1, lv := range lvs { for k1, lv := range lvs {
if currentTime > lv.deleteDeadline { if ctx.flushTimestamp > lv.deleteDeadline {
delete(lvs, k1) delete(lvs, k1)
} }
} }

View file

@ -12,37 +12,34 @@ type uniqueSamplesAggrState struct {
} }
type uniqueSamplesStateValue struct { type uniqueSamplesStateValue struct {
mu sync.Mutex mu sync.Mutex
m map[float64]struct{} m map[float64]struct{}
deleted bool deleted bool
deleteDeadline int64
} }
func newUniqueSamplesAggrState() *uniqueSamplesAggrState { func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
return &uniqueSamplesAggrState{} return &uniqueSamplesAggrState{}
} }
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, includeInputKey bool) {
for i := range samples { for i := range samples {
s := &samples[i] s := &samples[i]
outputKey := getOutputKey(s.key) outputKey := getOutputKey(s.key, includeInputKey)
again: again:
v, ok := as.m.Load(outputKey) v, ok := as.m.Load(outputKey)
if !ok { if !ok {
// The entry is missing in the map. Try creating it. // The entry is missing in the map. Try creating it.
v = &uniqueSamplesStateValue{ v = &uniqueSamplesStateValue{
m: map[float64]struct{}{ m: make(map[float64]struct{}),
s.value: {},
},
} }
outputKey = bytesutil.InternString(outputKey) outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v) vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded { if loaded {
// The new entry has been successfully created. // Use the entry created by a concurrent goroutine.
continue v = vNew
} }
// Use the entry created by a concurrent goroutine.
v = vNew
} }
sv := v.(*uniqueSamplesStateValue) sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock() sv.mu.Lock()
@ -51,6 +48,7 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
if _, ok := sv.m[s.value]; !ok { if _, ok := sv.m[s.value]; !ok {
sv.m[s.value] = struct{}{} sv.m[s.value] = struct{}{}
} }
sv.deleteDeadline = deleteDeadline
} }
sv.mu.Unlock() sv.mu.Unlock()
if deleted { if deleted {
@ -69,13 +67,21 @@ func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
sv := v.(*uniqueSamplesStateValue) sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock() sv.mu.Lock()
if ctx.flushTimestamp > sv.deleteDeadline {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
m.Delete(k)
return true
}
n := len(sv.m) n := len(sv.m)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. sv.m = make(map[float64]struct{})
sv.deleted = true
sv.mu.Unlock() sv.mu.Unlock()
if n > 0 {
key := k.(string) key := k.(string)
ctx.appendSeries(key, "unique_samples", float64(n)) ctx.appendSeries(key, "unique_samples", float64(n))
}
return true return true
}) })
} }
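
Taken together, the per-output flushState methods now share one cleanup pattern: instead of marking every entry deleted after each flush, an entry is reset in place and is only removed (with its compressed label indexes returned to the aggregator's LabelsCompressor) once it has gone stale. A composite sketch using the sum_samples fields; the other outputs differ only in how they reset and emit their value:

    m.Range(func(k, v any) bool {
        sv := v.(*sumSamplesStateValue)
        sv.mu.Lock()
        if ctx.flushTimestamp > sv.deleteDeadline {
            // stale series: stop accepting updates, release its compressed label
            // indexes and drop the entry from the map
            sv.deleted = true
            sv.mu.Unlock()
            key := k.(string)
            ctx.a.lc.Delete(bytesutil.ToUnsafeBytes(key), ctx.flushTimestamp)
            m.Delete(k)
            return true
        }
        if !sv.defined {
            // nothing was pushed since the previous flush; emit nothing
            sv.mu.Unlock()
            return true
        }
        sum := sv.sum
        sv.defined = false // reset in place instead of deleting the entry
        sv.sum = 0
        sv.mu.Unlock()
        ctx.appendSeries(k.(string), "sum_samples", sum)
        return true
    })
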