diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 399e9a3dc..7bb44262e 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -4,8 +4,9 @@ import ( "sync" "unsafe" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/cespare/xxhash/v2" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) const dedupAggrShardsCount = 128 @@ -113,7 +114,7 @@ func (ctx *dedupFlushCtx) reset() { ctx.samples = ctx.samples[:0] } -func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) { +func (da *dedupAggr) flush(f func(samples []pushSample)) { var wg sync.WaitGroup for i := range da.shards { flushConcurrencyCh <- struct{}{} @@ -125,7 +126,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) { }() ctx := getDedupFlushCtx() - shard.flush(ctx, f, resetState) + shard.flush(ctx, f) putDedupFlushCtx(ctx) }(&da.shards[i]) } @@ -193,11 +194,11 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { } } -func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample), resetState bool) { +func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) { das.mu.Lock() m := das.m - if resetState && len(m) > 0 { + if len(m) > 0 { das.m = make(map[string]dedupAggrSample, len(m)) } diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go index f0b4cc951..38e03678b 100644 --- a/lib/streamaggr/dedup_test.go +++ b/lib/streamaggr/dedup_test.go @@ -39,7 +39,7 @@ func TestDedupAggrSerial(t *testing.T) { } mu.Unlock() } - da.flush(flushSamples, true) + da.flush(flushSamples) if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) { t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap) diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go index abb1d0451..d7a05aa64 100644 --- a/lib/streamaggr/dedup_timing_test.go +++ b/lib/streamaggr/dedup_timing_test.go @@ -4,7 +4,6 @@ import ( "fmt" "sync/atomic" "testing" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) @@ -17,20 +16,6 @@ func BenchmarkDedupAggr(b *testing.B) { } } -func BenchmarkDedupAggrFlushSerial(b *testing.B) { - as := newTotalAggrState(time.Hour, true, true) - benchSamples := newBenchSamples(100_000) - da := newDedupAggr() - da.pushSamples(benchSamples) - - b.ResetTimer() - b.ReportAllocs() - b.SetBytes(int64(len(benchSamples))) - for i := 0; i < b.N; i++ { - da.flush(as.pushSamples, false) - } -} - func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { const loops = 100 benchSamples := newBenchSamples(samplesPerPush) diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index 3ec25f5fa..0212bfd9e 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -166,7 +166,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) { ctx.labels = labels ctx.samples = samples putDeduplicatorFlushCtx(ctx) - }, true) + }) duration := time.Since(startTime) d.dedupFlushDuration.Update(duration.Seconds()) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 92fdba57a..34ad118cf 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -747,7 +747,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) { startTime := time.Now() - a.da.flush(a.pushSamples, true) + a.da.flush(a.pushSamples) d := time.Since(startTime) a.dedupFlushDuration.Update(d.Seconds())