diff --git a/go.mod b/go.mod
index f7e130f6b..f5c2004f8 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
 	github.com/VictoriaMetrics/easyproto v0.1.4
 	github.com/VictoriaMetrics/fastcache v1.12.2
-	github.com/VictoriaMetrics/metrics v1.34.0
+	github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
 	github.com/VictoriaMetrics/metricsql v0.76.0
 	github.com/aws/aws-sdk-go-v2 v1.30.1
 	github.com/aws/aws-sdk-go-v2/config v1.27.23
diff --git a/go.sum b/go.sum
index 417ded19b..6756e5338 100644
--- a/go.sum
+++ b/go.sum
@@ -73,6 +73,10 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC
 github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
 github.com/VictoriaMetrics/metrics v1.34.0 h1:0i8k/gdOJdSoZB4Z9pikVnVQXfhcIvnG7M7h2WaQW2w=
 github.com/VictoriaMetrics/metrics v1.34.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
+github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e h1:IgNJoIYb2IhknxOLEAAG0ktj0f1609jpgmXjpPVrJ7s=
+github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
+github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0 h1:qP+3SX4eslXLPmsJpGjnMv+9UbmyrSj/Yf5CqPm6bLE=
+github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
 github.com/VictoriaMetrics/metricsql v0.76.0 h1:hl7vqJqyH2d8zKImzalkFrkFiD5q4ACF8gl3s86DqKA=
 github.com/VictoriaMetrics/metricsql v0.76.0/go.mod h1:1g4hdCwlbJZ851PU9VN65xy9Rdlzupo6fx3SNZ8Z64U=
 github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index 145b8b74f..70a77a6d8 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -72,7 +72,7 @@ func (da *dedupAggr) itemsCount() uint64 {
 	return n
 }
 
-func (da *dedupAggr) pushSamples(samples []pushSample, dedupIdx int) {
+func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) {
 	pss := getPerShardSamples()
 	shards := pss.shards
 	for _, sample := range samples {
diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go
index f0c2c3365..94daeb33a 100644
--- a/lib/streamaggr/dedup_test.go
+++ b/lib/streamaggr/dedup_test.go
@@ -21,7 +21,7 @@ func TestDedupAggrSerial(t *testing.T) {
 			sample.value = float64(i + j)
 			expectedSamplesMap[sample.key] = *sample
 		}
-		da.pushSamples(samples, 0)
+		da.pushSamples(samples, 0, 0)
 	}
 
 	if n := da.sizeBytes(); n > 5_000_000 {
@@ -73,7 +73,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
 				sample.key = fmt.Sprintf("key_%d", j)
 				sample.value = float64(i + j)
 			}
-			da.pushSamples(samples, 0)
+			da.pushSamples(samples, 0, 0)
 		}
 	}()
 }
diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go
index 4a6fb586a..6f6cc4c7a 100644
--- a/lib/streamaggr/dedup_timing_test.go
+++ b/lib/streamaggr/dedup_timing_test.go
@@ -27,7 +27,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			for i := 0; i < loops; i++ {
-				da.pushSamples(benchSamples, 0)
+				da.pushSamples(benchSamples, 0, 0)
 			}
 		}
 	})
diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go
index 8abe335de..df3ec2d5f 100644
--- a/lib/streamaggr/deduplicator.go
+++ b/lib/streamaggr/deduplicator.go
@@ -115,7 +115,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 	}
 
 	for idx, ps := range pss {
-		d.da.pushSamples(ps, idx)
+		d.da.pushSamples(ps, 0, idx)
 	}
 
 	ctx.pss = pss
diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go
index 2a9cd1e39..f71bca298 100644
--- a/lib/streamaggr/rate.go
+++ b/lib/streamaggr/rate.go
@@ -129,7 +129,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int
 
 		// Delete outdated entries in state
 		var rate float64
-		totalItems := len(sv.state)
+		var totalItems int
 		for k1, state := range sv.state {
 			if flushTimestamp > state.deleteDeadline {
 				delete(sv.state, k1)
@@ -149,10 +149,8 @@ func (as *rateAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int
 				rate += (v1.total) * 1000 / float64(rateInterval)
 				state.prevTimestamp = v1.timestamp
 				state.prevValue = v1.value
-			} else {
-				totalItems--
+				totalItems++
 			}
-			totalItems -= staleInputSamples
 			state.lastValues[idx] = rateLastValueState{}
 			sv.state[k1] = state
 		}
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index dd157ff21..4e84997a2 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -742,7 +742,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
 
 			dedupTime := ct.Truncate(tickInterval)
 			if a.ignoreOldSamples {
-				dedupIdx, flushIdx = a.getFlushIndices(dedupInterval, interval, dedupTime, flushDeadline)
+				dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
 			}
 			pf := pushFunc
@@ -789,28 +789,24 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
 	if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
 		dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
 		if a.ignoreOldSamples {
-			dedupIdx, flushIdx = a.getFlushIndices(dedupInterval, interval, dedupTime, flushDeadline)
+			dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
 		}
 		a.dedupFlush(dedupInterval, flushDeadline.UnixMilli(), dedupIdx, flushIdx)
 		a.flush(pushFunc, interval, flushDeadline.UnixMilli(), flushIdx)
 	}
 }
 
-func (a *aggregator) getFlushIndices(dedupInterval, interval time.Duration, dedupTime, flushTime time.Time) (int, int) {
-	flushTimestamp := flushTime.UnixMilli()
-	flushIntervals := int(flushTimestamp / int64(interval/time.Millisecond))
-	var dedupIndex, flushIndex int
+func getAggrIdxs(dedupInterval, interval time.Duration, dedupTime, flushTime time.Time) (int, int) {
+	flushIdx := getStateIdx(interval.Milliseconds(), flushTime.Add(-interval).UnixMilli())
+	dedupIdx := flushIdx
 	if dedupInterval > 0 {
-		dedupTimestamp := dedupTime.UnixMilli()
-		dedupIntervals := int(dedupTimestamp / int64(dedupInterval/time.Millisecond))
-		intervalsRatio := int(interval / dedupInterval)
-		dedupIndex = dedupIntervals % aggrStateSize
-		flushIndex = flushIntervals % (aggrStateSize / intervalsRatio)
-	} else {
-		flushIndex = flushIntervals % aggrStateSize
-		dedupIndex = flushIndex
+		dedupIdx = getStateIdx(dedupInterval.Milliseconds(), dedupTime.Add(-dedupInterval).UnixMilli())
 	}
-	return dedupIndex, flushIndex
+	return dedupIdx, flushIdx
+}
+
+func getStateIdx(interval int64, ts int64) int {
+	return int(ts/interval) % aggrStateSize
 }
 
 func (a *aggregator) dedupFlush(dedupInterval time.Duration, deleteDeadline int64, dedupIdx, flushIdx int) {
@@ -933,13 +929,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 			a.flushAfter.Update(float64(currentTimestamp - sample.Timestamp))
 			a.muFlushAfter.Unlock()
 			if math.IsNaN(sample.Value) {
-				a.ignoredNanSamples.Inc()
 				// Skip NaN values
 				a.ignoredNanSamples.Inc()
 				continue
 			}
 			if ignoreOldSamples && sample.Timestamp < minTimestamp {
-				a.ignoredOldSamples.Inc()
 				// Skip old samples outside the current aggregation interval
 				a.ignoredOldSamples.Inc()
 				continue
@@ -948,7 +942,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 				maxLag = now - sample.Timestamp
 			}
 			if ignoreOldSamples {
-				flushIdx = int((sample.Timestamp)/a.tickInterval+1) % aggrStateSize
+				flushIdx = getStateIdx(a.tickInterval, sample.Timestamp)
 			}
 			samples[flushIdx] = append(samples[flushIdx], pushSample{
 				key:       key,
@@ -957,20 +951,20 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 				value:     sample.Value,
 				timestamp: sample.Timestamp,
 			})
 		}
 	}
-	if len(samples) > 0 {
-		a.matchedSamples.Add(len(samples))
-		a.samplesLag.Update(float64(maxLag) / 1_000)
-	}
+
 	ctx.samples = samples
 	ctx.buf = buf
+	pushSamples := a.pushSamples
 	if a.da != nil {
-		for idx, s := range samples {
-			a.da.pushSamples(s, idx)
-		}
-	} else {
-		for idx, s := range samples {
-			a.pushSamples(s, deleteDeadlineMilli, idx)
+		pushSamples = a.da.pushSamples
+	}
+
+	for idx, s := range samples {
+		if len(s) > 0 {
+			a.samplesLag.Update(float64(maxLag) / 1_000)
+			a.matchedSamples.Add(len(s))
+			pushSamples(s, deleteDeadlineMilli, idx)
 		}
 	}
 }
diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go
index 49103c513..9dc69359e 100644
--- a/lib/streamaggr/unique_samples.go
+++ b/lib/streamaggr/unique_samples.go
@@ -79,10 +79,8 @@ func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64
 		state := len(sv.state[idx])
 		sv.state[idx] = make(map[float64]struct{})
 		sv.mu.Unlock()
-		if state > 0 {
-			key := k.(string)
-			ctx.appendSeries(key, "unique_series", flushTimestamp, float64(state))
-		}
+		key := k.(string)
+		ctx.appendSeries(key, "unique_samples", flushTimestamp, float64(state))
 		return true
 	})
 }
diff --git a/vendor/github.com/VictoriaMetrics/metrics/histogram.go b/vendor/github.com/VictoriaMetrics/metrics/histogram.go
index 0d953d59a..ccb63d99e 100644
--- a/vendor/github.com/VictoriaMetrics/metrics/histogram.go
+++ b/vendor/github.com/VictoriaMetrics/metrics/histogram.go
@@ -130,7 +130,7 @@ func (h *Histogram) Merge(b *Histogram) {
 			h.decimalBuckets[i] = &b
 		}
 		for j := range db {
-			h.decimalBuckets[i][j] = db[j]
+			h.decimalBuckets[i][j] += db[j]
 		}
 	}
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 3629c3a10..3a5c337a8 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -115,7 +115,7 @@ github.com/VictoriaMetrics/easyproto
 # github.com/VictoriaMetrics/fastcache v1.12.2
 ## explicit; go 1.13
 github.com/VictoriaMetrics/fastcache
-# github.com/VictoriaMetrics/metrics v1.34.0
+# github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
 ## explicit; go 1.17
 github.com/VictoriaMetrics/metrics
 # github.com/VictoriaMetrics/metricsql v0.76.0
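
Note: the sketch below is illustrative and not part of the patch. It shows the slot arithmetic behind getStateIdx/getAggrIdxs added in lib/streamaggr/streamaggr.go above. The aggrStateSize value is assumed here for the example; the real constant is defined elsewhere in lib/streamaggr and does not appear in this diff.

package main

import (
	"fmt"
	"time"
)

// Assumed ring size for illustration only; the real constant lives in lib/streamaggr.
const aggrStateSize = 8

// getStateIdx is copied from the patch: it maps a millisecond timestamp to one
// of aggrStateSize rotating aggregation-state slots for the given interval
// (also in milliseconds).
func getStateIdx(interval int64, ts int64) int {
	return int(ts/interval) % aggrStateSize
}

func main() {
	interval := time.Minute.Milliseconds()
	ts := time.Date(2024, 7, 9, 8, 0, 0, 0, time.UTC).UnixMilli()

	// Consecutive intervals map to consecutive slots.
	fmt.Println(getStateIdx(interval, ts))          // some slot s
	fmt.Println(getStateIdx(interval, ts+interval)) // (s+1) % aggrStateSize

	// The ring wraps after aggrStateSize intervals, which is presumably why
	// getAggrIdxs derives the flush slot from flushTime.Add(-interval): it
	// flushes the slot of the interval that has just ended rather than the
	// slot currently being filled.
	fmt.Println(getStateIdx(interval, ts+aggrStateSize*interval)) // slot s again
}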