From 664f337c70c401a7e3341c8d7974c1184b910306 Mon Sep 17 00:00:00 2001
From: Hui Wang
Date: Mon, 30 Sep 2024 20:24:59 +0800
Subject: [PATCH] stream aggregation: fix possible duplicated aggregation results (#7118)

When ingesting samples with the same labels (duplicated samples, or samples that end up with the same labels after the `by` or `without` options are applied), concurrent pushes could register different entries for the same labelset in LabelsCompressor. For example, both indexes 99 and 100 could be assigned to the label `foo=1` by two concurrent pushes. Because the encoded keys then contain different label indexes, such samples appear as distinct entries in aggrState, which results in duplicated aggregation results after the label indexes are decompressed.

https://github.com/VictoriaMetrics/VictoriaMetrics/blob/fbde238cdcdf4e2c892d85a3e9e2be6e54e69cef/lib/streamaggr/streamaggr.go#L933

In this pull request, the `lc.idxToLabel` entry still has to be stored before the `lc.labelToIdx` entry, so that an index published via `lc.labelToIdx` can always be resolved back to its label by concurrent goroutines. As a result, `lc.idxToLabel` may still contain a duplicated entry such as [100]="foo=1". Given the low likelihood of this happening and the small size of the extra idxToLabel entries, this should be fine.
---
 docs/changelog/CHANGELOG.md            |  1 +
 lib/promutils/labelscompressor.go      | 13 ++++++++++++-
 lib/promutils/labelscompressor_test.go |  8 +++++++-
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index 66b8ba573..3e332c86f 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): properly consume messages [from kafka](https://docs.victoriametrics.com/vmagent/#kafka-integration). Previously vmagent could skip some messages during start-up.
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): perform deduplication for all received data when specifying `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flags are set. Previously, if the `-remoteWrite.streamAggr.config` or `-streamAggr.config` is set, only series that matched aggregation config were deduplicated. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6711#issuecomment-2288361213) for details.
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): fix `-remoteWrite.streamAggr.dropInputLabels` labels parsing. Now, this flag allows specifying a list of labels to drop (by using '^^' separator, i.e. `dropInputLabels='replica^^az,replica'`) per each corresponding `remoteWrite.url`. Before, `-remoteWrite.streamAggr.dropInputLabels` labels were incorrectly applied to all configured `remoteWrite.url`s. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6780) for the details.
+* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): fix possible duplicated aggregation results when samples with the same labels are ingested by multiple concurrent writes. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118).
 * BUGFIX: [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json): fix legend captions for stream aggregation related panels. Before they were displaying wrong label names.
 * BUGFIX: [vmgateway](https://docs.victoriametrics.com/vmgateway/): add missing `datadog`, `newrelic`, `opentelemetry` and `pushgateway` routes to the `JWT` authorization routes. Allows prefixed (`promtheus/graphite`) routes for query requests.
 * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly cache empty list of matching time series for the given [labels filter](https://docs.victoriametrics.com/keyconcepts/#filtering). This type of caching was broken since [v1.97.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.97.0), which could result in the increased CPU usage when performing queries, which match zero time series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7009).
diff --git a/lib/promutils/labelscompressor.go b/lib/promutils/labelscompressor.go
index 391b8f3c5..24c9d468d 100644
--- a/lib/promutils/labelscompressor.go
+++ b/lib/promutils/labelscompressor.go
@@ -59,8 +59,19 @@ func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label)
 		idx := lc.nextIdx.Add(1)
 		v = idx
 		labelCopy := cloneLabel(label)
+
+		// Must store idxToLabel entry before labelToIdx,
+		// so it can be found by possible concurrent goroutines.
+		//
+		// We might store duplicated entries for single label with different indexes,
+		// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
 		lc.idxToLabel.Store(idx, labelCopy)
-		lc.labelToIdx.Store(labelCopy, v)
+		vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
+		if loaded {
+			// This label has been stored by a concurrent goroutine with different index,
+			// use it for key consistency in aggrState.
+			v = vNew
+		}
 
 		// Update lc.totalSizeBytes
 		labelSizeBytes := uint64(len(label.Name) + len(label.Value))
diff --git a/lib/promutils/labelscompressor_test.go b/lib/promutils/labelscompressor_test.go
index b5d787be7..9b520dba8 100644
--- a/lib/promutils/labelscompressor_test.go
+++ b/lib/promutils/labelscompressor_test.go
@@ -68,6 +68,7 @@ func TestLabelsCompressorSerial(t *testing.T) {
 func TestLabelsCompressorConcurrent(t *testing.T) {
 	const concurrency = 5
 	var lc LabelsCompressor
+	var expectCompressedKeys sync.Map
 	var wg sync.WaitGroup
 	for i := 0; i < concurrency; i++ {
@@ -75,9 +76,14 @@ func TestLabelsCompressorConcurrent(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			series := newTestSeries(100, 20)
-			for i, labels := range series {
+			for n, labels := range series {
 				sExpected := labelsToString(labels)
 				data := lc.Compress(nil, labels)
+				if expectData, ok := expectCompressedKeys.LoadOrStore(n, data); ok {
+					if string(data) != string(expectData.([]byte)) {
+						panic(fmt.Errorf("unexpected compress result at series/%d in iteration %d ", n, i))
+					}
+				}
 				labelsResult := lc.Decompress(nil, data)
 				sResult := labelsToString(labelsResult)
 				if sExpected != sResult {
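For readers who want to see the race in isolation, here is a minimal, self-contained Go sketch of the interning pattern the fix relies on. It is not the actual `LabelsCompressor` code: the `interner` type, its `idxOf` method and the label literal are made up for illustration. The important points mirror the patch: the index-to-label entry is published before the label-to-index entry, and `LoadOrStore` makes concurrent callers converge on a single canonical index for the same label, so equal labelsets always compress to equal keys.

```go
// Minimal sketch (not the actual LabelsCompressor code): concurrent goroutines
// intern the same label and must all observe a single canonical index.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// interner is a toy stand-in for LabelsCompressor: it maps a label string
// to a small integer index and back.
type interner struct {
	nextIdx    atomic.Uint64
	labelToIdx sync.Map // label string -> uint64 index
	idxToLabel sync.Map // uint64 index -> label string
}

// idxOf returns a canonical index for label.
func (in *interner) idxOf(label string) uint64 {
	if v, ok := in.labelToIdx.Load(label); ok {
		return v.(uint64)
	}
	idx := in.nextIdx.Add(1)
	// Publish idx -> label first, so any index visible via labelToIdx can
	// always be resolved back to its label by concurrent readers.
	in.idxToLabel.Store(idx, label)
	if v, loaded := in.labelToIdx.LoadOrStore(label, idx); loaded {
		// A concurrent goroutine registered this label first: reuse its
		// index so equal labels always produce equal compressed keys.
		// Our own idxToLabel entry stays behind as a harmless duplicate.
		return v.(uint64)
	}
	return idx
}

func main() {
	var in interner
	var wg sync.WaitGroup
	indexes := make([]uint64, 8)
	for i := range indexes {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			indexes[i] = in.idxOf(`foo="1"`)
		}(i)
	}
	wg.Wait()
	fmt.Println(indexes) // every slot must hold the same index
}
```

With a plain `Store` call instead of `LoadOrStore`, two racing goroutines could each publish their own index for the same label, and later compressions could return either one, which is exactly how the same labelset ended up as two distinct keys in aggrState.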