diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go
index 32ab569acc..0f78032203 100644
--- a/lib/streamaggr/avg.go
+++ b/lib/streamaggr/avg.go
@@ -11,18 +11,23 @@ type avgAggrState struct {
 	m sync.Map
 }
 
+type avgState struct {
+	sum   float64
+	count float64
+}
+
 type avgStateValue struct {
-	mu      sync.Mutex
-	sum     float64
-	count   int64
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]avgState
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newAvgAggrState() *avgAggrState {
 	return &avgAggrState{}
 }
 
-func (as *avgAggrState) pushSamples(samples []pushSample) {
+func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -31,25 +36,21 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &avgStateValue{
-				sum:   s.value,
-				count: 1,
-			}
+			v = &avgStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The entry has been successfully stored
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Update the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*avgStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			sv.sum += s.value
-			sv.count++
+			sv.state[idx].sum += s.value
+			sv.state[idx].count++
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -64,17 +65,26 @@ func (as *avgAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*avgStateValue)
 		sv.mu.Lock()
-		avg := sv.sum / float64(sv.count)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "avg", avg)
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = avgState{}
+		sv.mu.Unlock()
+		if state.count > 0 {
+			key := k.(string)
+			avg := state.sum / state.count
+			ctx.appendSeries(key, "avg", avg)
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go
index ca2bc709e0..df6d22beff 100644
--- a/lib/streamaggr/count_samples.go
+++ b/lib/streamaggr/count_samples.go
@@ -12,16 +12,17 @@ type countSamplesAggrState struct {
 }
 
 type countSamplesStateValue struct {
-	mu      sync.Mutex
-	n       uint64
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]uint64
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newCountSamplesAggrState() *countSamplesAggrState {
 	return &countSamplesAggrState{}
 }
 
-func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
+func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -30,23 +31,20 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &countSamplesStateValue{
-				n: 1,
-			}
+			v = &countSamplesStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*countSamplesStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			sv.n++
+			sv.state[idx]++
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -60,18 +58,25 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
 func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*countSamplesStateValue)
 		sv.mu.Lock()
-		n := sv.n
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "count_samples", float64(n))
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = 0
+		sv.mu.Unlock()
+		if state > 0 {
+			key := k.(string)
+			ctx.appendSeries(key, "count_samples", float64(state))
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go
index 816581464b..e0ab85885a 100644
--- a/lib/streamaggr/count_series.go
+++ b/lib/streamaggr/count_series.go
@@ -13,16 +13,17 @@ type countSeriesAggrState struct {
 }
 
 type countSeriesStateValue struct {
-	mu      sync.Mutex
-	m       map[uint64]struct{}
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]map[uint64]struct{}
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newCountSeriesAggrState() *countSeriesAggrState {
 	return &countSeriesAggrState{}
 }
 
-func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
+func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		inputKey, outputKey := getInputOutputKey(s.key)
@@ -35,27 +36,26 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &countSeriesStateValue{
-				m: map[uint64]struct{}{
-					h: {},
-				},
+			csv := &countSeriesStateValue{}
+			for ic := range csv.state {
+				csv.state[ic] = make(map[uint64]struct{})
 			}
+			v = csv
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The entry has been added to the map.
-				continue
+			if loaded {
+				// Update the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Update the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*countSeriesStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			if _, ok := sv.m[h]; !ok {
-				sv.m[h] = struct{}{}
+			if _, ok := sv.state[idx][h]; !ok {
+				sv.state[idx][h] = struct{}{}
 			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -69,18 +69,25 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*countSeriesStateValue)
 		sv.mu.Lock()
-		n := len(sv.m)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "count_series", float64(n))
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := len(sv.state[ctx.idx])
+		sv.state[ctx.idx] = make(map[uint64]struct{})
+		sv.mu.Unlock()
+		if state > 0 {
+			key := k.(string)
+			ctx.appendSeries(key, "count_series", float64(state))
+		}
 		return true
	})
 }
diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index 1374a0e741..145b8b74f4 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -14,7 +14,8 @@ import (
 const dedupAggrShardsCount = 128
 
 type dedupAggr struct {
-	shards []dedupAggrShard
+	shards     []dedupAggrShard
+	currentIdx atomic.Int32
 }
 
 type dedupAggrShard struct {
@@ -25,16 +26,18 @@ type dedupAggrShard struct {
 	_ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte
 }
 
-type dedupAggrShardNopad struct {
-	mu sync.Mutex
-	m  map[string]*dedupAggrSample
-
+type dedupAggrState struct {
+	m          map[string]*dedupAggrSample
 	samplesBuf []dedupAggrSample
-
 	sizeBytes  atomic.Uint64
 	itemsCount atomic.Uint64
 }
 
+type dedupAggrShardNopad struct {
+	mu    sync.RWMutex
+	state [aggrStateSize]*dedupAggrState
+}
+
 type dedupAggrSample struct {
 	value     float64
 	timestamp int64
@@ -49,21 +52,27 @@ func newDedupAggr() *dedupAggr {
 func (da *dedupAggr) sizeBytes() uint64 {
 	n := uint64(unsafe.Sizeof(*da))
+	currentIdx := da.currentIdx.Load()
 	for i := range da.shards {
-		n += da.shards[i].sizeBytes.Load()
+		if da.shards[i].state[currentIdx] != nil {
+			n += da.shards[i].state[currentIdx].sizeBytes.Load()
+		}
 	}
 	return n
 }
 
 func (da *dedupAggr) itemsCount() uint64 {
 	n := uint64(0)
+	currentIdx := da.currentIdx.Load()
 	for i := range da.shards {
-		n += da.shards[i].itemsCount.Load()
+		if da.shards[i].state[currentIdx] != nil {
+			n += da.shards[i].state[currentIdx].itemsCount.Load()
+		}
 	}
 	return n
 }
 
-func (da *dedupAggr) pushSamples(samples []pushSample) {
+func (da *dedupAggr) pushSamples(samples []pushSample, dedupIdx int) {
 	pss := getPerShardSamples()
 	shards := pss.shards
 	for _, sample := range samples {
@@ -75,7 +84,7 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
 		if len(shardSamples) == 0 {
 			continue
 		}
-		da.shards[i].pushSamples(shardSamples)
+		da.shards[i].pushSamples(shardSamples, dedupIdx)
 	}
 	putPerShardSamples(pss)
 }
@@ -104,7 +113,7 @@ func (ctx *dedupFlushCtx) reset() {
 	ctx.samples = ctx.samples[:0]
 }
 
-func (da *dedupAggr) flush(f func(samples []pushSample)) {
+func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
 	var wg sync.WaitGroup
 	for i := range da.shards {
 		flushConcurrencyCh <- struct{}{}
@@ -116,10 +125,11 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) {
 			}()
 
 			ctx := getDedupFlushCtx()
-			shard.flush(ctx, f)
+			shard.flush(ctx, f, deleteDeadline, dedupIdx, flushIdx)
 			putDedupFlushCtx(ctx)
 		}(&da.shards[i])
 	}
+	da.currentIdx.Store((da.currentIdx.Load() + 1) % aggrStateSize)
 	wg.Wait()
 }
@@ -154,18 +164,20 @@ func putPerShardSamples(pss *perShardSamples) {
 
 var perShardSamplesPool sync.Pool
 
-func (das *dedupAggrShard) pushSamples(samples []pushSample) {
+func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) {
 	das.mu.Lock()
 	defer das.mu.Unlock()
 
-	m := das.m
-	if m == nil {
-		m = make(map[string]*dedupAggrSample, len(samples))
-		das.m = m
+	state := das.state[dedupIdx]
+	if state == nil {
+		state = &dedupAggrState{
+			m: make(map[string]*dedupAggrSample, len(samples)),
+		}
+		das.state[dedupIdx] = state
 	}
-	samplesBuf := das.samplesBuf
+	samplesBuf := state.samplesBuf
 	for _, sample := range samples {
-		s, ok := m[sample.key]
+		s, ok := state.m[sample.key]
 		if !ok {
 			samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
 			s = &samplesBuf[len(samplesBuf)-1]
@@ -173,10 +185,10 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 			s.timestamp = sample.timestamp
 
 			key := bytesutil.InternString(sample.key)
-			m[key] = s
+			state.m[key] = s
 
-			das.itemsCount.Add(1)
-			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
+			das.state[dedupIdx].itemsCount.Add(1)
+			das.state[dedupIdx].sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
 			continue
 		}
 		// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
@@ -185,18 +197,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 			s.timestamp = sample.timestamp
 		}
 	}
-	das.samplesBuf = samplesBuf
+	das.state[dedupIdx].samplesBuf = samplesBuf
 }
 
-func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
+func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
 	das.mu.Lock()
 
-	m := das.m
-	if len(m) > 0 {
-		das.m = make(map[string]*dedupAggrSample, len(m))
-		das.sizeBytes.Store(0)
-		das.itemsCount.Store(0)
-		das.samplesBuf = make([]dedupAggrSample, 0, len(das.samplesBuf))
+	var m map[string]*dedupAggrSample
+	state := das.state[dedupIdx]
+	if state != nil && len(state.m) > 0 {
+		m = state.m
+		das.state[dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m))
+		das.state[dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[dedupIdx].samplesBuf))
+		das.state[dedupIdx].sizeBytes.Store(0)
+		das.state[dedupIdx].itemsCount.Store(0)
 	}
 	das.mu.Unlock()
@@ -215,11 +229,11 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 		// Limit the number of samples per each flush in order to limit memory usage.
 		if len(dstSamples) >= 10_000 {
-			f(dstSamples)
+			f(dstSamples, deleteDeadline, flushIdx)
 			clear(dstSamples)
 			dstSamples = dstSamples[:0]
 		}
 	}
-	f(dstSamples)
+	f(dstSamples, deleteDeadline, flushIdx)
 	ctx.samples = dstSamples
 }
diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go
index 91ce09e136..f0c2c33653 100644
--- a/lib/streamaggr/dedup_test.go
+++ b/lib/streamaggr/dedup_test.go
@@ -5,6 +5,7 @@ import (
 	"reflect"
 	"sync"
 	"testing"
+	"time"
 )
 
 func TestDedupAggrSerial(t *testing.T) {
@@ -20,7 +21,7 @@ func TestDedupAggrSerial(t *testing.T) {
 			sample.value = float64(i + j)
 			expectedSamplesMap[sample.key] = *sample
 		}
-		da.pushSamples(samples)
+		da.pushSamples(samples, 0)
 	}
 
 	if n := da.sizeBytes(); n > 5_000_000 {
@@ -32,14 +33,16 @@ func TestDedupAggrSerial(t *testing.T) {
 
 	flushedSamplesMap := make(map[string]pushSample)
 	var mu sync.Mutex
-	flushSamples := func(samples []pushSample) {
+	flushSamples := func(samples []pushSample, _ int64, _ int) {
 		mu.Lock()
 		for _, sample := range samples {
 			flushedSamplesMap[sample.key] = sample
 		}
 		mu.Unlock()
 	}
-	da.flush(flushSamples)
+
+	flushTimestamp := time.Now().UnixMilli()
+	da.flush(flushSamples, flushTimestamp, 0, 0)
 
 	if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
 		t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
@@ -70,7 +73,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
 				sample.key = fmt.Sprintf("key_%d", j)
 				sample.value = float64(i + j)
 			}
-			da.pushSamples(samples)
+			da.pushSamples(samples, 0)
 		}
 	}()
 }
diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go
index 2b6bab25c9..4a6fb586ad 100644
--- a/lib/streamaggr/dedup_timing_test.go
+++ b/lib/streamaggr/dedup_timing_test.go
@@ -27,7 +27,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			for i := 0; i < loops; i++ {
-				da.pushSamples(benchSamples)
+				da.pushSamples(benchSamples, 0)
 			}
 		}
 	})
diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go
index 273fd6eefc..a39b42863f 100644
--- a/lib/streamaggr/deduplicator.go
+++ b/lib/streamaggr/deduplicator.go
@@ -17,7 +17,8 @@ import (
 type Deduplicator struct {
 	da *dedupAggr
 
-	dropLabels []string
+	dropLabels    []string
+	dedupInterval int64
 
 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -40,8 +41,9 @@ type Deduplicator struct {
 // MustStop must be called on the returned deduplicator in order to free up occupied resources.
 func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
 	d := &Deduplicator{
-		da:         newDedupAggr(),
-		dropLabels: dropLabels,
+		da:            newDedupAggr(),
+		dropLabels:    dropLabels,
+		dedupInterval: dedupInterval.Milliseconds(),
 
 		stopCh: make(chan struct{}),
 		ms:     metrics.NewSet(),
@@ -89,6 +91,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 	buf := ctx.buf
 
 	dropLabels := d.dropLabels
+	aggrIntervals := int64(aggrStateSize)
 	for _, ts := range tss {
 		if len(dropLabels) > 0 {
 			labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
@@ -104,7 +107,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		buf = lc.Compress(buf, labels.Labels)
 		key := bytesutil.ToUnsafeString(buf[bufLen:])
 		for _, s := range ts.Samples {
-			pss = append(pss, pushSample{
+			flushIntervals := s.Timestamp/d.dedupInterval + 1
+			idx := int(flushIntervals % aggrIntervals)
+			pss[idx] = append(pss[idx], pushSample{
 				key:       key,
 				value:     s.Value,
 				timestamp: s.Timestamp,
@@ -112,7 +117,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		}
 	}
 
-	d.da.pushSamples(pss)
+	for idx, ps := range pss {
+		d.da.pushSamples(ps, idx)
+	}
 
 	ctx.pss = pss
 	ctx.buf = buf
@@ -135,17 +142,18 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration
 		select {
 		case <-d.stopCh:
 			return
-		case <-t.C:
-			d.flush(pushFunc, dedupInterval)
+		case t := <-t.C:
+			flushTime := t.Truncate(dedupInterval).Add(dedupInterval)
+			flushTimestamp := flushTime.UnixMilli()
+			flushIntervals := int(flushTimestamp / int64(dedupInterval/time.Millisecond))
+			flushIdx := flushIntervals % aggrStateSize
+			d.flush(pushFunc, dedupInterval, flushTime, flushIdx)
 		}
 	}
 }
 
-func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
-	startTime := time.Now()
-
-	timestamp := startTime.UnixMilli()
-	d.da.flush(func(pss []pushSample) {
+func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) {
+	d.da.flush(func(pss []pushSample, _ int64, _ int) {
 		ctx := getDeduplicatorFlushCtx()
 
 		tss := ctx.tss
@@ -158,7 +166,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 			samplesLen := len(samples)
 			samples = append(samples, prompbmarshal.Sample{
 				Value:     ps.value,
-				Timestamp: timestamp,
+				Timestamp: ps.timestamp,
 			})
 
 			tss = append(tss, prompbmarshal.TimeSeries{
@@ -172,9 +180,9 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 		ctx.labels = labels
 		ctx.samples = samples
 		putDeduplicatorFlushCtx(ctx)
-	})
+	}, flushTime.UnixMilli(), flushIdx, flushIdx)
 
-	duration := time.Since(startTime)
+	duration := time.Since(flushTime)
 	d.dedupFlushDuration.Update(duration.Seconds())
 	if duration > dedupInterval {
 		d.dedupFlushTimeouts.Inc()
@@ -185,14 +193,15 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 }
 
 type deduplicatorPushCtx struct {
-	pss    []pushSample
+	pss    [aggrStateSize][]pushSample
 	labels promutils.Labels
 	buf    []byte
 }
 
 func (ctx *deduplicatorPushCtx) reset() {
-	clear(ctx.pss)
-	ctx.pss = ctx.pss[:0]
+	for i, sc := range ctx.pss {
+		ctx.pss[i] = sc[:0]
+	}
 
 	ctx.labels.Reset()
diff --git a/lib/streamaggr/deduplicator_test.go b/lib/streamaggr/deduplicator_test.go
index 1d304f6dd0..48b5b4c839 100644
--- a/lib/streamaggr/deduplicator_test.go
+++ b/lib/streamaggr/deduplicator_test.go
@@ -21,20 +21,26 @@ func TestDeduplicator(t *testing.T) {
 	tss := prompbmarshal.MustParsePromMetrics(`
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123 bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54 -x 8943 1000 +x 8943 1 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34 -x 90984 900 -x 433 1000 +x 90984 +x 433 1 asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322 foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 `, offsetMsecs) - d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global") + dedupInterval := time.Hour + d := NewDeduplicator(pushFunc, dedupInterval, []string{"node", "instance"}, "global") for i := 0; i < 10; i++ { d.Push(tss) } - d.flush(pushFunc, time.Hour) + + flushTime := time.Now() + flushIntervals := flushTime.UnixMilli()/dedupInterval.Milliseconds() + 1 + idx := int(flushIntervals % int64(aggrStateSize)) + + d.flush(pushFunc, time.Hour, time.Now(), idx) d.MustStop() result := timeSeriessToString(tssResult) diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index fdf7b3fad9..7a72f30f7a 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -1,39 +1,30 @@ package streamaggr import ( - "math" "sync" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/metrics" ) // histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples. 
 type histogramBucketAggrState struct {
 	m sync.Map
-
-	stalenessSecs uint64
 }
 
 type histogramBucketStateValue struct {
 	mu             sync.Mutex
-	h              metrics.Histogram
-	deleteDeadline uint64
+	state          [aggrStateSize]metrics.Histogram
+	total          metrics.Histogram
 	deleted        bool
+	deleteDeadline int64
 }
 
-func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
-	stalenessSecs := roundDurationToSecs(stalenessInterval)
-	return &histogramBucketAggrState{
-		stalenessSecs: stalenessSecs,
-	}
+func newHistogramBucketAggrState() *histogramBucketAggrState {
+	return &histogramBucketAggrState{}
 }
 
-func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
-	currentTime := fasttime.UnixTimestamp()
-	deleteDeadline := currentTime + as.stalenessSecs
+func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -54,7 +45,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			sv.h.Update(s.value)
+			sv.state[idx].Update(s.value)
 			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
@@ -66,50 +57,29 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
+func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
 		sv := v.(*histogramBucketStateValue)
-		sv.mu.Lock()
-		deleted := currentTime > sv.deleteDeadline
+
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
 		if deleted {
 			// Mark the current entry as deleted
 			sv.deleted = deleted
-		}
-		sv.mu.Unlock()
-
-		if deleted {
+			sv.mu.Unlock()
 			m.Delete(k)
+			return true
 		}
-		return true
-	})
-}
-
-func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
-	currentTime := fasttime.UnixTimestamp()
-
-	as.removeOldEntries(currentTime)
-
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*histogramBucketStateValue)
-		sv.mu.Lock()
-		if !sv.deleted {
-			key := k.(string)
-			sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
-				ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
-			})
-		}
+		sv.total.Merge(&sv.state[ctx.idx])
+		total := &sv.total
+		sv.state[ctx.idx] = metrics.Histogram{}
 		sv.mu.Unlock()
+		key := k.(string)
+		total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
+			ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
+		})
 		return true
 	})
 }
-
-func roundDurationToSecs(d time.Duration) uint64 {
-	if d < 0 {
-		return 0
-	}
-	secs := d.Seconds()
-	return uint64(math.Ceil(secs))
-}
diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go
index 0ae4b9b8c2..4c39049deb 100644
--- a/lib/streamaggr/last.go
+++ b/lib/streamaggr/last.go
@@ -12,17 +12,22 @@ type lastAggrState struct {
 }
 
 type lastStateValue struct {
-	mu sync.Mutex
+	mu             sync.Mutex
+	state          [aggrStateSize]lastState
+	deleted        bool
+	deleteDeadline int64
+}
+
+type lastState struct {
 	last      float64
 	timestamp int64
-	deleted   bool
 }
 
 func newLastAggrState() *lastAggrState {
 	return &lastAggrState{}
 }
 
-func (as *lastAggrState) pushSamples(samples []pushSample) {
+func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -31,27 +36,23 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &lastStateValue{
-				last:      s.value,
-				timestamp: s.timestamp,
-			}
+			v = &lastStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Update the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*lastStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			if s.timestamp >= sv.timestamp {
-				sv.last = s.value
-				sv.timestamp = s.timestamp
+			if s.timestamp >= sv.state[idx].timestamp {
+				sv.state[idx].last = s.value
+				sv.state[idx].timestamp = s.timestamp
 			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -65,18 +66,25 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
 func (as *lastAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*lastStateValue)
 		sv.mu.Lock()
-		last := sv.last
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "last", last)
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = lastState{}
+		sv.mu.Unlock()
+		if state.timestamp > 0 {
+			key := k.(string)
+			ctx.appendSeries(key, "last", state.last)
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go
index 9197d3add7..54224929b0 100644
--- a/lib/streamaggr/max.go
+++ b/lib/streamaggr/max.go
@@ -12,16 +12,22 @@ type maxAggrState struct {
 }
 
 type maxStateValue struct {
-	mu      sync.Mutex
-	max     float64
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]maxState
+	deleted        bool
+	deleteDeadline int64
+}
+
+type maxState struct {
+	max    float64
+	exists bool
 }
 
 func newMaxAggrState() *maxAggrState {
 	return &maxAggrState{}
 }
 
-func (as *maxAggrState) pushSamples(samples []pushSample) {
+func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -30,25 +36,26 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &maxStateValue{
-				max: s.value,
-			}
+			v = &maxStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*maxStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			if s.value > sv.max {
-				sv.max = s.value
+			state := &sv.state[idx]
+			if !state.exists {
+				state.max = s.value
+				state.exists = true
+			} else if s.value > state.max {
+				state.max = s.value
 			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -62,18 +69,25 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
 func (as *maxAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*maxStateValue)
 		sv.mu.Lock()
-		max := sv.max
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "max", max)
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = maxState{}
+		sv.mu.Unlock()
+		if state.exists {
+			key := k.(string)
+			ctx.appendSeries(key, "max", state.max)
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go
index 308f259c72..de6620c3fc 100644
--- a/lib/streamaggr/min.go
+++ b/lib/streamaggr/min.go
@@ -12,16 +12,22 @@ type minAggrState struct {
 }
 
 type minStateValue struct {
-	mu      sync.Mutex
-	min     float64
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]minState
+	deleted        bool
+	deleteDeadline int64
+}
+
+type minState struct {
+	min    float64
+	exists bool
 }
 
 func newMinAggrState() *minAggrState {
 	return &minAggrState{}
 }
 
-func (as *minAggrState) pushSamples(samples []pushSample) {
+func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -30,25 +36,26 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &minStateValue{
-				min: s.value,
-			}
+			v = &minStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Use the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*minStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			if s.value < sv.min {
-				sv.min = s.value
+			state := &sv.state[idx]
+			if !state.exists {
+				state.min = s.value
+				state.exists = true
+			} else if s.value < state.min {
+				state.min = s.value
 			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -62,17 +69,25 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
 func (as *minAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*minStateValue)
 		sv.mu.Lock()
-		min := sv.min
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
+
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = minState{}
 		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "min", min)
+		if state.exists {
+			key := k.(string)
+			ctx.appendSeries(key, "min", state.min)
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go
index 00fefe8778..c866ae17e1 100644
--- a/lib/streamaggr/quantiles.go
+++ b/lib/streamaggr/quantiles.go
@@ -10,15 +10,15 @@ import (
 
 // quantilesAggrState calculates output=quantiles, e.g. the given quantiles over the input samples.
 type quantilesAggrState struct {
-	m sync.Map
-
+	m    sync.Map
 	phis []float64
 }
 
 type quantilesStateValue struct {
-	mu      sync.Mutex
-	h       *histogram.Fast
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]*histogram.Fast
+	deleted        bool
+	deleteDeadline int64
 }
 
 func newQuantilesAggrState(phis []float64) *quantilesAggrState {
@@ -27,7 +27,7 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
 	}
 }
 
-func (as *quantilesAggrState) pushSamples(samples []pushSample) {
+func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -36,15 +36,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			h := histogram.GetFast()
-			v = &quantilesStateValue{
-				h: h,
-			}
+			v = &quantilesStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if loaded {
 				// Use the entry created by a concurrent goroutine.
-				histogram.PutFast(h)
 				v = vNew
 			}
 		}
@@ -52,7 +48,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			sv.h.Update(s.value)
+			if sv.state[idx] == nil {
+				sv.state[idx] = histogram.GetFast()
+			}
+			sv.state[idx].Update(s.value)
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -69,22 +69,33 @@ func (as *quantilesAggrState) flushState(ctx *flushCtx) {
 	var quantiles []float64
 	var b []byte
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*quantilesStateValue)
 		sv.mu.Lock()
-		quantiles = sv.h.Quantiles(quantiles[:0], phis)
-		histogram.PutFast(sv.h)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		for i, quantile := range quantiles {
-			b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
-			phiStr := bytesutil.InternBytes(b)
-			ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		quantiles = quantiles[:0]
+		if state != nil {
+			quantiles = state.Quantiles(quantiles[:0], phis)
+			histogram.PutFast(state)
+			state.Reset()
+		}
+		sv.mu.Unlock()
+		if len(quantiles) > 0 {
+			key := k.(string)
+			for i, quantile := range quantiles {
+				b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
+				phiStr := bytesutil.InternBytes(b)
+				ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
+			}
 		}
 		return true
 	})
diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go
index 1c3f2fb5ce..029fc1fb7a 100644
--- a/lib/streamaggr/rate.go
+++ b/lib/streamaggr/rate.go
@@ -2,10 +2,8 @@ package streamaggr
 
 import (
 	"sync"
-	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 )
 
 // rateAggrState calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics.
@@ -14,41 +12,44 @@ type rateAggrState struct {
 
 	// isAvg is set to true if rate_avg() must be calculated instead of rate_sum().
 	isAvg bool
-
-	// Time series state is dropped if no new samples are received during stalenessSecs.
-	stalenessSecs uint64
 }
 
 type rateStateValue struct {
 	mu             sync.Mutex
-	lastValues     map[string]rateLastValueState
-	deleteDeadline uint64
+	state          map[string]rateState
 	deleted        bool
+	deleteDeadline int64
+}
+
+type rateState struct {
+	lastValues [aggrStateSize]rateLastValueState
+	// prevTimestamp stores timestamp of the last registered value
+	// in the previous aggregation interval
+	prevTimestamp int64
+
+	// prevValue stores last registered value
+	// in the previous aggregation interval
+	prevValue      float64
+	deleteDeadline int64
 }
 
 type rateLastValueState struct {
-	value          float64
-	timestamp      int64
-	deleteDeadline uint64
+	firstValue float64
+	value      float64
+	timestamp  int64
 
-	// increase stores cumulative increase for the current time series on the current aggregation interval
-	increase float64
-
-	// prevTimestamp is the timestamp of the last registered sample in the previous aggregation interval
-	prevTimestamp int64
+	// total stores cumulative difference between registered values
+	// in the aggregation interval
+	total float64
 }
 
-func newRateAggrState(stalenessInterval time.Duration, isAvg bool) *rateAggrState {
-	stalenessSecs := roundDurationToSecs(stalenessInterval)
+func newRateAggrState(isAvg bool) *rateAggrState {
 	return &rateAggrState{
 		isAvg: isAvg,
-		stalenessSecs: stalenessSecs,
 	}
 }
 
-func (as *rateAggrState) pushSamples(samples []pushSample) {
-	currentTime := fasttime.UnixTimestamp()
-	deleteDeadline := currentTime + as.stalenessSecs
+func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		inputKey, outputKey := getInputOutputKey(s.key)
@@ -57,9 +58,10 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &rateStateValue{
-				lastValues: make(map[string]rateLastValueState),
+			rsv := &rateStateValue{
+				state: make(map[string]rateState),
 			}
+			v = rsv
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if loaded {
@@ -71,29 +73,33 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			lv, ok := sv.lastValues[inputKey]
-			if ok {
+			state, ok := sv.state[inputKey]
+			lv := state.lastValues[idx]
+			if ok && lv.timestamp > 0 {
 				if s.timestamp < lv.timestamp {
 					// Skip out of order sample
 					sv.mu.Unlock()
 					continue
 				}
-
+				if state.prevTimestamp == 0 {
+					state.prevTimestamp = lv.timestamp
+					state.prevValue = lv.value
+				}
 				if s.value >= lv.value {
-					lv.increase += s.value - lv.value
+					lv.total += s.value - lv.value
 				} else {
 					// counter reset
-					lv.increase += s.value
+					lv.total += s.value
 				}
-			} else {
-				lv.prevTimestamp = s.timestamp
+			} else if state.prevTimestamp > 0 {
+				lv.firstValue = s.value
 			}
 			lv.value = s.value
 			lv.timestamp = s.timestamp
-			lv.deleteDeadline = deleteDeadline
-
+			state.lastValues[idx] = lv
+			state.deleteDeadline = deleteDeadline
 			inputKey = bytesutil.InternString(inputKey)
-			sv.lastValues[inputKey] = lv
+			sv.state[inputKey] = state
 			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
@@ -105,64 +111,23 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *rateAggrState) flushState(ctx *flushCtx) {
-	currentTime := fasttime.UnixTimestamp()
-
-	suffix := as.getSuffix()
-
-	as.removeOldEntries(currentTime)
-
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*rateStateValue)
-
-		sv.mu.Lock()
-		lvs := sv.lastValues
-		sumRate := 0.0
-		countSeries := 0
-		for k1, lv := range lvs {
-			d := float64(lv.timestamp-lv.prevTimestamp) / 1000
-			if d > 0 {
-				sumRate += lv.increase / d
-				countSeries++
-			}
-			lv.prevTimestamp = lv.timestamp
-			lv.increase = 0
-			lvs[k1] = lv
-		}
-		deleted := sv.deleted
-		sv.mu.Unlock()
-
-		if countSeries == 0 || deleted {
-			// Nothing to update
-			return true
-		}
-
-		result := sumRate
-		if as.isAvg {
-			result /= float64(countSeries)
-		}
-
-		key := k.(string)
-		ctx.appendSeries(key, suffix, result)
-		return true
-	})
-}
-
 func (as *rateAggrState) getSuffix() string {
 	if as.isAvg {
 		return "rate_avg"
 	}
 	return "rate_sum"
 }
 
-func (as *rateAggrState) removeOldEntries(currentTime uint64) {
+func (as *rateAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
+	suffix := as.getSuffix()
 	m.Range(func(k, v any) bool {
 		sv := v.(*rateStateValue)
-
 		sv.mu.Lock()
-		if currentTime > sv.deleteDeadline {
+
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
 			// Mark the current entry as deleted
 			sv.deleted = true
 			sv.mu.Unlock()
@@ -170,14 +135,42 @@ func (as *rateAggrState) removeOldEntries(currentTime uint64) {
 			return true
 		}
 
-		// Delete outdated entries in sv.lastValues
-		lvs := sv.lastValues
-		for k1, lv := range lvs {
-			if currentTime > lv.deleteDeadline {
-				delete(lvs, k1)
+		// Delete outdated entries in state
+		var rate float64
+		var totalItems int
+		for k1, state := range sv.state {
+			if ctx.flushTimestamp > state.deleteDeadline {
+				delete(sv.state, k1)
+				continue
 			}
+			v1 := state.lastValues[ctx.idx]
+			rateInterval := v1.timestamp - state.prevTimestamp
+			if rateInterval > 0 && state.prevTimestamp > 0 {
+				if v1.firstValue >= state.prevValue {
+					v1.total += v1.firstValue - state.prevValue
+				} else {
+					v1.total += v1.firstValue
+				}
+
+				// calculate rate only if value was seen at least twice with different timestamps
+				rate += (v1.total) * 1000 / float64(rateInterval)
+				state.prevTimestamp = v1.timestamp
+				state.prevValue = v1.value
+				totalItems++
+			}
+			state.lastValues[ctx.idx] = rateLastValueState{}
+			sv.state[k1] = state
 		}
+		sv.mu.Unlock()
+
+		if totalItems > 0 {
+			if as.isAvg {
+				rate /= float64(totalItems)
+			}
+			key := k.(string)
+			ctx.appendSeries(key, suffix, rate)
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go
index 053e0f2090..e8c95cf1fc 100644
--- a/lib/streamaggr/stddev.go
+++ b/lib/streamaggr/stddev.go
@@ -13,18 +13,23 @@ type stddevAggrState struct {
 }
 
 type stddevStateValue struct {
-	mu      sync.Mutex
-	count   float64
-	avg     float64
-	q       float64
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]stddevState
+	deleted        bool
+	deleteDeadline int64
+}
+
+type stddevState struct {
+	count float64
+	avg   float64
+	q     float64
 }
 
 func newStddevAggrState() *stddevAggrState {
 	return &stddevAggrState{}
 }
 
-func (as *stddevAggrState) pushSamples(samples []pushSample) {
+func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -46,10 +51,12 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
 		deleted := sv.deleted
 		if !deleted {
 			// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
-			sv.count++
-			avg := sv.avg + (s.value-sv.avg)/sv.count
-			sv.q += (s.value - sv.avg) * (s.value - avg)
-			sv.avg = avg
+			state := &sv.state[idx]
+			state.count++
+			avg := state.avg + (s.value-state.avg)/state.count
+			state.q += (s.value - state.avg) * (s.value - avg)
+			state.avg = avg
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -63,18 +70,25 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
 func (as *stddevAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*stddevStateValue)
 		sv.mu.Lock()
-		stddev := math.Sqrt(sv.q / sv.count)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "stddev", stddev)
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = stddevState{}
+		sv.mu.Unlock()
+		if state.count > 0 {
+			key := k.(string)
+			ctx.appendSeries(key, "stddev", math.Sqrt(state.q/state.count))
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go
index 8170f227ad..f066ae30de 100644
--- a/lib/streamaggr/stdvar.go
+++ b/lib/streamaggr/stdvar.go
@@ -12,18 +12,23 @@ type stdvarAggrState struct {
 }
 
 type stdvarStateValue struct {
-	mu      sync.Mutex
-	count   float64
-	avg     float64
-	q       float64
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]stdvarState
+	deleted        bool
+	deleteDeadline int64
+}
+
+type stdvarState struct {
+	count float64
+	avg   float64
+	q     float64
 }
 
 func newStdvarAggrState() *stdvarAggrState {
 	return &stdvarAggrState{}
 }
 
-func (as *stdvarAggrState) pushSamples(samples []pushSample) {
+func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -45,10 +50,12 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
 		deleted := sv.deleted
 		if !deleted {
 			// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
-			sv.count++
-			avg := sv.avg + (s.value-sv.avg)/sv.count
-			sv.q += (s.value - sv.avg) * (s.value - avg)
-			sv.avg = avg
+			state := &sv.state[idx]
+			state.count++
+			avg := state.avg + (s.value-state.avg)/state.count
+			state.q += (s.value - state.avg) * (s.value - avg)
+			state.avg = avg
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -62,18 +69,25 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
 func (as *stdvarAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*stdvarStateValue)
 		sv.mu.Lock()
-		stdvar := sv.q / sv.count
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "stdvar", stdvar)
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = stdvarState{}
+		sv.mu.Unlock()
+		if state.count > 0 {
+			key := k.(string)
+			ctx.appendSeries(key, "stdvar", state.q/state.count)
+		}
 		return true
 	})
 }
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index f16f9a10ad..be60bc3475 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -23,9 +23,13 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/metrics"
+	"github.com/valyala/histogram"
 	"gopkg.in/yaml.v2"
 )
 
+// count of aggregation intervals for states
+const aggrStateSize = 2
+
 var supportedOutputs = []string{
 	"avg",
 	"count_samples",
@@ -373,8 +377,9 @@ type aggregator struct {
 
 	dropInputLabels []string
 
-	inputRelabeling  *promrelabel.ParsedConfigs
-	outputRelabeling *promrelabel.ParsedConfigs
+	inputRelabeling   *promrelabel.ParsedConfigs
+	outputRelabeling  *promrelabel.ParsedConfigs
+	stalenessInterval time.Duration
 
 	keepMetricNames  bool
 	ignoreOldSamples bool
@@ -382,6 +387,7 @@ type aggregator struct {
 	by                  []string
 	without             []string
 	aggregateOnlyByTime bool
+	tickInterval        int64
 
 	// interval is the interval between flushes
 	interval time.Duration
@@ -398,6 +404,10 @@ type aggregator struct {
 	// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
 	minTimestamp atomic.Int64
 
+	// time to wait after interval end before flush
+	flushAfter   *histogram.Fast
+	muFlushAfter sync.Mutex
+
 	// suffix contains a suffix, which should be added to aggregate metric names
 	//
 	// It contains the interval, labels in (by, without), plus output name.
@@ -429,7 +439,7 @@ type aggrState interface {
 	// pushSamples must push samples to the aggrState.
 	//
 	// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
-	pushSamples(samples []pushSample)
+	pushSamples(samples []pushSample, deleteDeadline int64, idx int)
 
 	// flushState must flush aggrState data to ctx.
 	flushState(ctx *flushCtx)
@@ -438,6 +448,8 @@ type aggrState interface {
 // PushFunc is called by Aggregators when it needs to push its state to metrics storage
 type PushFunc func(tss []prompbmarshal.TimeSeries)
 
+type aggrPushFunc func(samples []pushSample, deleteDeadline int64, idx int)
+
 // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
 //
 // opts can contain additional options. If opts is nil, then default options are used.
@@ -601,12 +613,14 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 		inputRelabeling:  inputRelabeling,
 		outputRelabeling: outputRelabeling,
 
-		keepMetricNames:  keepMetricNames,
-		ignoreOldSamples: ignoreOldSamples,
+		keepMetricNames:   keepMetricNames,
+		ignoreOldSamples:  ignoreOldSamples,
+		stalenessInterval: stalenessInterval,
 
 		by:                  by,
 		without:             without,
 		aggregateOnlyByTime: aggregateOnlyByTime,
+		tickInterval:        interval.Milliseconds(),
 
 		interval:      interval,
 		dedupInterval: dedupInterval,
@@ -615,7 +629,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 
 		suffix: suffix,
 
-		stopCh: make(chan struct{}),
+		stopCh:     make(chan struct{}),
+		flushAfter: histogram.NewFast(),
 
 		flushDuration:      ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{%s}`, metricLabels)),
 		dedupFlushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels)),
@@ -639,6 +654,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 			n := a.da.itemsCount()
 			return float64(n)
 		})
+		a.tickInterval = dedupInterval.Milliseconds()
 	}
 
 	alignFlushToInterval := !opts.NoAlignFlushToInterval
@@ -651,6 +667,12 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 		skipIncompleteFlush = !*v
 	}
 
+	minTime := time.Now()
+	if skipIncompleteFlush && alignFlushToInterval {
+		minTime = minTime.Truncate(interval).Add(interval)
+	}
+	a.minTimestamp.Store(minTime.UnixMilli())
+
 	a.wg.Add(1)
 	go func() {
 		a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, ignoreFirstIntervals)
@@ -703,11 +725,11 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
 	case "count_series":
 		return newCountSeriesAggrState(), nil
 	case "histogram_bucket":
-		return newHistogramBucketAggrState(stalenessInterval), nil
+		return newHistogramBucketAggrState(), nil
 	case "increase":
-		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, true, true), nil
+		return newTotalAggrState(ignoreFirstSampleInterval, true, true), nil
 	case "increase_prometheus":
-		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, true, false), nil
+		return newTotalAggrState(ignoreFirstSampleInterval, true, false), nil
 	case "last":
 		return newLastAggrState(), nil
 	case "max":
@@ -715,9 +737,9 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
 	case "min":
 		return newMinAggrState(), nil
 	case "rate_avg":
-		return newRateAggrState(stalenessInterval, true), nil
+		return newRateAggrState(true), nil
 	case "rate_sum":
-		return newRateAggrState(stalenessInterval, false), nil
+		return newRateAggrState(false), nil
 	case "stddev":
 		return newStddevAggrState(), nil
 	case "stdvar":
@@ -725,9 +747,9 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
 	case "sum_samples":
 		return newSumSamplesAggrState(), nil
 	case "total":
-		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, false, true), nil
+		return newTotalAggrState(ignoreFirstSampleInterval, false, true), nil
 	case "total_prometheus":
-		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, false, false), nil
+		return newTotalAggrState(ignoreFirstSampleInterval, false, false), nil
 	case "unique_samples":
 		return newUniqueSamplesAggrState(), nil
 	default:
@@ -759,79 +781,98 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
 			return false
 		case ct := <-t.C:
 			flushTimeMsec = ct.UnixMilli()
return true } } - if a.dedupInterval <= 0 { - alignedSleep(a.interval) - t := time.NewTicker(a.interval) - defer t.Stop() + flushDeadline := time.Now().Truncate(a.interval).Add(a.interval) + tickInterval := time.Duration(a.tickInterval) * time.Millisecond + alignedSleep(tickInterval) - if alignFlushToInterval && skipIncompleteFlush { - a.flush(nil, 0) - ignoreFirstIntervals-- + var dedupIdx, flushIdx int + + t := time.NewTicker(tickInterval) + defer t.Stop() + + isSkippedFirstFlush := false + for tickerWait(t) { + ct := time.Now() + + dedupTime := ct.Truncate(tickInterval) + if a.ignoreOldSamples { + dedupIdx, flushIdx = a.getFlushIndices(dedupTime, flushDeadline) + } + pf := pushFunc + + // Calculate delay + a.muFlushAfter.Lock() + flushAfterMsec := a.flushAfter.Quantile(0.95) + a.flushAfter.Reset() + a.muFlushAfter.Unlock() + flushAfter := time.Duration(flushAfterMsec) * time.Millisecond + + if flushAfter > tickInterval { + logger.Warnf("metrics ingestion lag (%v) is more than tick interval (%v). "+ + "gaps are expected in aggregations", flushAfter, tickInterval) + pf = nil + } else { + time.Sleep(flushAfter) } - for tickerWait(t) { - if ignoreFirstIntervals > 0 { - a.flush(nil, 0) + a.dedupFlush(dedupTime.UnixMilli(), dedupIdx, flushIdx) + + if ct.After(flushDeadline) { + // It is time to flush the aggregated state + if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { + a.flush(nil, 0, flushIdx) + isSkippedFirstFlush = true + } else if ignoreFirstIntervals > 0 { + a.flush(nil, 0, flushIdx) ignoreFirstIntervals-- } else { - a.flush(pushFunc, flushTimeMsec) + a.flush(pf, flushDeadline.UnixMilli(), flushIdx) } - - if alignFlushToInterval { - select { - case <-t.C: - default: - } + for ct.After(flushDeadline) { + flushDeadline = flushDeadline.Add(a.interval) } } - } else { - alignedSleep(a.dedupInterval) - t := time.NewTicker(a.dedupInterval) - defer t.Stop() - - flushDeadline := time.Now().Add(a.interval) - isSkippedFirstFlush := false - for tickerWait(t) { - a.dedupFlush() - - ct := time.Now() - if ct.After(flushDeadline) { - // It is time to flush the aggregated state - if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - a.flush(nil, 0) - ignoreFirstIntervals-- - isSkippedFirstFlush = true - } else if ignoreFirstIntervals > 0 { - a.flush(nil, 0) - ignoreFirstIntervals-- - } else { - a.flush(pushFunc, flushTimeMsec) - } - for ct.After(flushDeadline) { - flushDeadline = flushDeadline.Add(a.interval) - } - } - - if alignFlushToInterval { - select { - case <-t.C: - default: - } + if alignFlushToInterval { + select { + case <-t.C: + default: } } } if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { - a.dedupFlush() - a.flush(pushFunc, flushTimeMsec) + dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval) + if a.ignoreOldSamples { + dedupIdx, flushIdx = a.getFlushIndices(dedupTime, flushDeadline) + } + a.dedupFlush(flushDeadline.UnixMilli(), dedupIdx, flushIdx) + a.flush(pushFunc, flushDeadline.UnixMilli(), flushIdx) } } -func (a *aggregator) dedupFlush() { +func (a *aggregator) getFlushIndices(dedupTime, flushTime time.Time) (int, int) { + flushTimestamp := flushTime.UnixMilli() + flushIntervals := int(flushTimestamp / int64(a.interval/time.Millisecond)) + var dedupIndex, flushIndex int + if a.dedupInterval > 0 { + dedupTimestamp := dedupTime.UnixMilli() + dedupIntervals := int(dedupTimestamp / int64(a.dedupInterval/time.Millisecond)) + intervalsRatio := int(a.interval / a.dedupInterval) + dedupIndex = dedupIntervals % aggrStateSize + 
flushIndex = flushIntervals % (aggrStateSize / intervalsRatio) + } else { + flushIndex = flushIntervals % aggrStateSize + dedupIndex = flushIndex + } + return dedupIndex, flushIndex +} + +func (a *aggregator) dedupFlush(deleteDeadline int64, dedupIdx, flushIdx int) { if a.dedupInterval <= 0 { // The de-duplication is disabled. return @@ -839,7 +880,7 @@ func (a *aggregator) dedupFlush() { startTime := time.Now() - a.da.flush(a.pushSamples) + a.da.flush(a.pushSamples, deleteDeadline, dedupIdx, flushIdx) d := time.Since(startTime) a.dedupFlushDuration.Update(d.Seconds()) @@ -854,14 +895,9 @@ func (a *aggregator) dedupFlush() { // flush flushes aggregator state to pushFunc. // // If pushFunc is nil, then the aggregator state is just reset. -func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) { +func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64, idx int) { startTime := time.Now() - // Update minTimestamp before flushing samples to the storage, - // since the flush durtion can be quite long. - // This should prevent from dropping samples with old timestamps when the flush takes long time. - a.minTimestamp.Store(flushTimeMsec - 5_000) - var wg sync.WaitGroup for i := range a.aggrOutputs { ao := &a.aggrOutputs[i] @@ -873,7 +909,7 @@ func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) { wg.Done() }() - ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec) + ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec, idx) ao.as.flushState(ctx) ctx.flushSeries() putFlushCtx(ctx) @@ -883,6 +919,7 @@ func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) { d := time.Since(startTime) a.flushDuration.Update(d.Seconds()) + a.minTimestamp.Store(flushTimeMsec) if d > a.interval { a.flushTimeouts.Inc() logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03fs; "+ @@ -911,13 +948,17 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { labels := &ctx.labels inputLabels := &ctx.inputLabels outputLabels := &ctx.outputLabels + now := time.Now() + nowMsec := now.UnixMilli() + deleteDeadline := now.Add(a.stalenessInterval) + deleteDeadlineMsec := deleteDeadline.UnixMilli() dropLabels := a.dropInputLabels ignoreOldSamples := a.ignoreOldSamples minTimestamp := a.minTimestamp.Load() - nowMsec := time.Now().UnixMilli() var maxLagMsec int64 + var flushIdx int for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue @@ -950,21 +991,28 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { // do not intern key because number of unique keys could be too high key := bytesutil.ToUnsafeString(buf[bufLen:]) for _, s := range ts.Samples { + a.muFlushAfter.Lock() + a.flushAfter.Update(float64(nowMsec - s.Timestamp)) + a.muFlushAfter.Unlock() if math.IsNaN(s.Value) { - a.ignoredNaNSamples.Inc() // Skip NaN values + a.ignoredNaNSamples.Inc() continue } if ignoreOldSamples && s.Timestamp < minTimestamp { a.ignoredOldSamples.Inc() // Skip old samples outside the current aggregation interval + a.ignoredOldSamples.Inc() continue } lagMsec := nowMsec - s.Timestamp if lagMsec > maxLagMsec { maxLagMsec = lagMsec } - samples = append(samples, pushSample{ + if ignoreOldSamples { + flushIdx = int((s.Timestamp)/a.tickInterval+1) % aggrStateSize + } + samples[flushIdx] = append(samples[flushIdx], pushSample{ key: key, value: s.Value, timestamp: s.Timestamp, @@ -979,9 +1027,13 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { ctx.buf = buf if a.da != nil { - 
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index 8338be12cf..b34a816b42 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -305,7 +305,7 @@ func TestAggregatorsSuccess(t *testing.T) {
   outputs: [count_samples, sum_samples, count_series, last]
 `, `
 foo{abc="123"} 4
-bar 5 100
+bar 5 11
 bar 34 10
 foo{abc="123"} 8.5
 foo{abc="456",de="fg"} 8
@@ -586,7 +586,7 @@ foo:1m_total_prometheus 0
 `, `
 foo 123
 bar{baz="qwe"} 1.31
-bar{baz="qwe"} 4.34 1000
+bar{baz="qwe"} 4.34 1
 bar{baz="qwe"} 2
 foo{baz="qwe"} -5
 bar{baz="qwer"} 343
diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go
index 947239bb3e..0af8c07d02 100644
--- a/lib/streamaggr/sum_samples.go
+++ b/lib/streamaggr/sum_samples.go
@@ -12,16 +12,22 @@ type sumSamplesAggrState struct {
 }

 type sumSamplesStateValue struct {
-	mu      sync.Mutex
-	sum     float64
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]sumState
+	deleted        bool
+	deleteDeadline int64
+}
+
+type sumState struct {
+	sum    float64
+	exists bool
 }

 func newSumSamplesAggrState() *sumSamplesAggrState {
 	return &sumSamplesAggrState{}
 }

-func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
+func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -30,23 +36,21 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &sumSamplesStateValue{
-				sum: s.value,
-			}
+			v = &sumSamplesStateValue{}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Update the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*sumSamplesStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			sv.sum += s.value
+			sv.state[idx].sum += s.value
+			sv.state[idx].exists = true
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -60,18 +64,25 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {

 func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*sumSamplesStateValue)
 		sv.mu.Lock()
-		sum := sv.sum
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "sum_samples", sum)
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := sv.state[ctx.idx]
+		sv.state[ctx.idx] = sumState{}
+		sv.mu.Unlock()
+		if state.exists {
+			key := k.(string)
+			ctx.appendSeries(key, "sum_samples", state.sum)
+		}
 		return true
 	})
 }
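The reworked sum_samples output above follows the per-entry lifecycle shared by all outputs in this change: on flush, an entry whose deleteDeadline lies before the flush timestamp is marked deleted and dropped from the map; otherwise the slot addressed by the flush index is consumed and reset, and a series is emitted only if that slot actually saw samples (the exists flag). A condensed sketch of this lifecycle with simplified, hypothetical names:

package main

import (
	"fmt"
	"sync"
)

const aggrStateSize = 2 // assumed ring size, as in the previous sketches

type sumState struct {
	sum    float64
	exists bool
}

type entry struct {
	mu             sync.Mutex
	state          [aggrStateSize]sumState
	deleted        bool
	deleteDeadline int64
}

// flushSlot consumes one ring slot and reports whether a series should be emitted.
func flushSlot(e *entry, flushTimestamp int64, idx int) (float64, bool) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if flushTimestamp > e.deleteDeadline {
		// Stale entry: mark it deleted so concurrent pushes stop touching it.
		e.deleted = true
		return 0, false
	}
	st := e.state[idx]
	e.state[idx] = sumState{} // reset the slot for the next aggregation cycle
	return st.sum, st.exists
}

func main() {
	e := &entry{deleteDeadline: 200}
	e.state[0] = sumState{sum: 42, exists: true}
	if sum, ok := flushSlot(e, 100, 0); ok {
		fmt.Println("sum_samples:", sum)
	}
}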
diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go
index 53dbcdc478..51964e66d7 100644
--- a/lib/streamaggr/total.go
+++ b/lib/streamaggr/total.go
@@ -5,8 +5,8 @@ import (
 	"sync"
 	"time"

-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )

 // totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus.
@@ -18,13 +18,7 @@ type totalAggrState struct {
 	// Whether to take into account the first sample in new time series when calculating the output value.
 	keepFirstSample bool
-
-	// Time series state is dropped if no new samples are received during stalenessSecs.
-	//
-	// Aslo, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set.
-	// see ignoreFirstSampleDeadline for more details.
-	stalenessSecs uint64
-
+	// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
 	// This allows avoiding an initial spike of the output values at startup when new time series
 	// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
 	ignoreFirstSampleDeadline uint64
 }
@@ -33,33 +27,36 @@ type totalAggrState struct {
 }

 type totalStateValue struct {
 	mu sync.Mutex
-	lastValues     map[string]totalLastValueState
-	total          float64
-	deleteDeadline uint64
+	shared         totalState
+	state          [aggrStateSize]float64
+	deleteDeadline int64
 	deleted        bool
 }

+type totalState struct {
+	total      float64
+	lastValues map[string]totalLastValueState
+}
+
 type totalLastValueState struct {
 	value     float64
 	timestamp int64
-	deleteDeadline uint64
+	deleteDeadline int64
 }

-func newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
-	stalenessSecs := roundDurationToSecs(stalenessInterval)
-	ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + roundDurationToSecs(ignoreFirstSampleInterval)
+func newTotalAggrState(ignoreFirstSampleInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
+	ignoreFirstSampleDeadline := time.Now().Add(ignoreFirstSampleInterval)
 	return &totalAggrState{
 		resetTotalOnFlush: resetTotalOnFlush,
 		keepFirstSample:   keepFirstSample,
-		stalenessSecs:             stalenessSecs,
-		ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
+		ignoreFirstSampleDeadline: uint64(ignoreFirstSampleDeadline.Unix()),
 	}
 }

-func (as *totalAggrState) pushSamples(samples []pushSample) {
+func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
+	var deleted bool
 	currentTime := fasttime.UnixTimestamp()
-	deleteDeadline := currentTime + as.stalenessSecs
 	keepFirstSample := as.keepFirstSample && currentTime >= as.ignoreFirstSampleDeadline
 	for i := range samples {
 		s := &samples[i]
@@ -70,7 +67,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
 		if !ok {
 			// The entry is missing in the map. Try creating it.
 			v = &totalStateValue{
-				lastValues: make(map[string]totalLastValueState),
+				shared: totalState{
+					lastValues: make(map[string]totalLastValueState),
+				},
 			}
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
@@ -81,9 +80,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
 		}
 		sv := v.(*totalStateValue)
 		sv.mu.Lock()
-		deleted := sv.deleted
+		deleted = sv.deleted
 		if !deleted {
-			lv, ok := sv.lastValues[inputKey]
+			lv, ok := sv.shared.lastValues[inputKey]
 			if ok || keepFirstSample {
 				if s.timestamp < lv.timestamp {
 					// Skip out of order sample
@@ -92,10 +91,10 @@
 				}

 				if s.value >= lv.value {
-					sv.total += s.value - lv.value
+					sv.state[idx] += s.value - lv.value
 				} else {
 					// counter reset
-					sv.total += s.value
+					sv.state[idx] += s.value
 				}
 			}
 			lv.value = s.value
@@ -103,7 +102,7 @@
 			lv.deleteDeadline = deleteDeadline

 			inputKey = bytesutil.InternString(inputKey)
-			sv.lastValues[inputKey] = lv
+			sv.shared.lastValues[inputKey] = lv
 			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
@@ -115,72 +114,55 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
 	}
 }

-func (as *totalAggrState) flushState(ctx *flushCtx) {
-	currentTime := fasttime.UnixTimestamp()
-
-	suffix := as.getSuffix()
-
-	as.removeOldEntries(currentTime)
-
-	m := &as.m
-	m.Range(func(k, v any) bool {
-		sv := v.(*totalStateValue)
-
-		sv.mu.Lock()
-		total := sv.total
-		if as.resetTotalOnFlush {
-			sv.total = 0
-		} else if math.Abs(sv.total) >= (1 << 53) {
-			// It is time to reset the entry, since it starts losing float64 precision
-			sv.total = 0
-		}
-		deleted := sv.deleted
-		sv.mu.Unlock()
-
-		if !deleted {
-			key := k.(string)
-			ctx.appendSeries(key, suffix, total)
-		}
-		return true
-	})
-}
-
 func (as *totalAggrState) getSuffix() string {
-	// Note: this function is at hot path, so it shouldn't allocate.
-	if as.resetTotalOnFlush {
-		if as.keepFirstSample {
-			return "increase"
-		}
-		return "increase_prometheus"
-	}
-	if as.keepFirstSample {
-		return "total"
-	}
-	return "total_prometheus"
+	// Note: this function is at hot path, so it shouldn't allocate.
+	if as.resetTotalOnFlush {
+		if as.keepFirstSample {
+			return "increase"
+		}
+		return "increase_prometheus"
+	}
+	if as.keepFirstSample {
+		return "total"
+	}
+	return "total_prometheus"
 }

-func (as *totalAggrState) removeOldEntries(currentTime uint64) {
+func (as *totalAggrState) flushState(ctx *flushCtx) {
+	var total float64
 	m := &as.m
-	m.Range(func(k, v any) bool {
+	suffix := as.getSuffix()
+	m.Range(func(k, v interface{}) bool {
 		sv := v.(*totalStateValue)

 		sv.mu.Lock()
-		if currentTime > sv.deleteDeadline {
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
 			// Mark the current entry as deleted
-			sv.deleted = true
+			sv.deleted = deleted
 			sv.mu.Unlock()
 			m.Delete(k)
 			return true
 		}
-
-		// Delete outdated entries in sv.lastValues
-		lvs := sv.lastValues
-		for k1, lv := range lvs {
-			if currentTime > lv.deleteDeadline {
-				delete(lvs, k1)
+		total = sv.shared.total + sv.state[ctx.idx]
+		for k1, v1 := range sv.shared.lastValues {
+			if ctx.flushTimestamp > v1.deleteDeadline {
+				delete(sv.shared.lastValues, k1)
+			}
+		}
+		sv.state[ctx.idx] = 0
+		if !as.resetTotalOnFlush {
+			if math.Abs(total) >= (1 << 53) {
+				// It is time to reset the entry, since it starts losing float64 precision
+				sv.shared.total = 0
+			} else {
+				sv.shared.total = total
 			}
 		}
 		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, suffix, total)
 		return true
 	})
 }
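For the total and increase outputs above, pushSamples accumulates per-slot deltas while the shared state carries the running total and the per-series last values. On flush, the current slot is folded into the shared total; unless resetTotalOnFlush is set, the shared value is carried over and reset only once its magnitude reaches 2^53, where float64 stops representing integers exactly. A rough sketch of that folding step, using the same assumed slot layout as the earlier examples:

package main

import (
	"fmt"
	"math"
)

const aggrStateSize = 2 // assumed ring size

type totalEntry struct {
	shared float64                // running total carried across flushes
	state  [aggrStateSize]float64 // per-slot increments accumulated by pushes
}

// flushTotal folds one slot into the shared total and returns the value to emit.
func flushTotal(e *totalEntry, idx int, resetTotalOnFlush bool) float64 {
	total := e.shared + e.state[idx]
	e.state[idx] = 0
	if !resetTotalOnFlush {
		if math.Abs(total) >= (1 << 53) {
			// The running total starts losing float64 precision; start over.
			e.shared = 0
		} else {
			e.shared = total
		}
	}
	return total
}

func main() {
	e := &totalEntry{shared: 10}
	e.state[1] = 5
	fmt.Println("total:", flushTotal(e, 1, false)) // prints 15 and carries it forward
}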
diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go
index 93145397cf..5baceac900 100644
--- a/lib/streamaggr/unique_samples.go
+++ b/lib/streamaggr/unique_samples.go
@@ -12,16 +12,17 @@ type uniqueSamplesAggrState struct {
 }

 type uniqueSamplesStateValue struct {
-	mu      sync.Mutex
-	m       map[float64]struct{}
-	deleted bool
+	mu             sync.Mutex
+	state          [aggrStateSize]map[float64]struct{}
+	deleted        bool
+	deleteDeadline int64
 }

 func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
 	return &uniqueSamplesAggrState{}
 }

-func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
+func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
 	for i := range samples {
 		s := &samples[i]
 		outputKey := getOutputKey(s.key)
@@ -30,27 +31,26 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
 		v, ok := as.m.Load(outputKey)
 		if !ok {
 			// The entry is missing in the map. Try creating it.
-			v = &uniqueSamplesStateValue{
-				m: map[float64]struct{}{
-					s.value: {},
-				},
+			usv := &uniqueSamplesStateValue{}
+			for iu := range usv.state {
+				usv.state[iu] = make(map[float64]struct{})
 			}
+			v = usv
 			outputKey = bytesutil.InternString(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
-			if !loaded {
-				// The new entry has been successfully created.
-				continue
+			if loaded {
+				// Update the entry created by a concurrent goroutine.
+				v = vNew
 			}
-			// Use the entry created by a concurrent goroutine.
-			v = vNew
 		}
 		sv := v.(*uniqueSamplesStateValue)
 		sv.mu.Lock()
 		deleted := sv.deleted
 		if !deleted {
-			if _, ok := sv.m[s.value]; !ok {
-				sv.m[s.value] = struct{}{}
+			if _, ok := sv.state[idx][s.value]; !ok {
+				sv.state[idx][s.value] = struct{}{}
 			}
+			sv.deleteDeadline = deleteDeadline
 		}
 		sv.mu.Unlock()
 		if deleted {
@@ -64,18 +64,25 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {

 func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		// Atomically delete the entry from the map, so new entry is created for the next flush.
-		m.Delete(k)
-
 		sv := v.(*uniqueSamplesStateValue)
 		sv.mu.Lock()
-		n := len(sv.m)
-		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-		sv.deleted = true
-		sv.mu.Unlock()
-		key := k.(string)
-		ctx.appendSeries(key, "unique_samples", float64(n))
+		// check for stale entries
+		deleted := ctx.flushTimestamp > sv.deleteDeadline
+		if deleted {
+			// Mark the current entry as deleted
+			sv.deleted = deleted
+			sv.mu.Unlock()
+			m.Delete(k)
+			return true
+		}
+		state := len(sv.state[ctx.idx])
+		sv.state[ctx.idx] = make(map[float64]struct{})
+		sv.mu.Unlock()
+		if state > 0 {
+			key := k.(string)
+			ctx.appendSeries(key, "unique_samples", float64(state))
+		}
 		return true
 	})
 }
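Finally, the set-valued outputs (count_series and unique_samples) keep one map per ring slot: pushes insert into the slot chosen by the caller, and a flush reports the slot map's cardinality and swaps in a fresh map. A minimal sketch with assumed, simplified names:

package main

import "fmt"

const aggrStateSize = 2 // assumed ring size

type uniqueEntry struct {
	state [aggrStateSize]map[float64]struct{}
}

func newUniqueEntry() *uniqueEntry {
	e := &uniqueEntry{}
	for i := range e.state {
		e.state[i] = make(map[float64]struct{})
	}
	return e
}

// observe records a sample value in the given slot.
func (e *uniqueEntry) observe(idx int, v float64) {
	e.state[idx][v] = struct{}{}
}

// flushSlot returns the number of unique values seen in the slot and resets it.
func (e *uniqueEntry) flushSlot(idx int) int {
	n := len(e.state[idx])
	e.state[idx] = make(map[float64]struct{})
	return n
}

func main() {
	e := newUniqueEntry()
	e.observe(0, 1.5)
	e.observe(0, 1.5)
	e.observe(0, 2.5)
	fmt.Println("unique_samples:", e.flushSlot(0)) // prints 2
}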