stream aggregation: fix possible duplicated aggregation results (#7118)

When ingesting samples with the same labels (duplicated samples, or
samples that share the same labels after the `by` or `without` options
are applied), concurrent pushes could register different entries for
the same labelset in LabelsCompressor. For example, indexes 99 and 100
could both be assigned to the label `foo=1` by two concurrent pushes.
Due to the differing label indexes in the encoded keys, the samples
would then appear as distinct in aggrState, resulting in duplicated
results after the label indexes are decompressed.

See fbde238cdc/lib/streamaggr/streamaggr.go (L933)

In this pull request, since the `idxToLabel` entry must be stored
before the `labelToIdx` entry so that the index can always be resolved
by concurrent goroutines, `lc.idxToLabel` may still end up with
duplicated entries such as [100]="foo=1". But given the low likelihood
of this happening and the small size of such entries, this is
acceptable.
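To make the fix concrete, here is a minimal, self-contained Go sketch of the `LoadOrStore` pattern described above. The `compressor` type, the string-valued labels, and the omitted fast-path lookup are simplifications for illustration, not the actual LabelsCompressor code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// compressor is a simplified stand-in for LabelsCompressor: labels are plain
// strings and the fast-path Load before index allocation is omitted.
type compressor struct {
	labelToIdx sync.Map // string -> uint64
	idxToLabel sync.Map // uint64 -> string
	nextIdx    atomic.Uint64
}

func (c *compressor) compress(label string) uint64 {
	idx := c.nextIdx.Add(1)
	// Store the idx->label entry first so that whichever index wins the
	// LoadOrStore below is always resolvable by concurrent decompressors.
	c.idxToLabel.Store(idx, label)
	v, loaded := c.labelToIdx.LoadOrStore(label, idx)
	if loaded {
		// A concurrent goroutine registered this label first: reuse its
		// index. Our own idxToLabel entry remains as a harmless orphan,
		// since no encoded key ever references it.
		idx = v.(uint64)
	}
	return idx
}

func main() {
	var c compressor
	var wg sync.WaitGroup
	idxs := make([]uint64, 4)
	for i := range idxs {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			idxs[i] = c.compress("foo=1")
		}(i)
	}
	wg.Wait()
	// With a plain Store instead of LoadOrStore, these indexes could
	// differ, producing distinct encoded keys for identical labelsets.
	fmt.Println(idxs) // all four values are identical, e.g. [2 2 2 2]
}
```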
Parent: 0c0f013a60
Commit: 664f337c70
3 changed files with 20 additions and 2 deletions
@@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): properly consume messages [from kafka](https://docs.victoriametrics.com/vmagent/#kafka-integration). Previously vmagent could skip some messages during start-up.
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): perform deduplication for all received data when the `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flags are set. Previously, if `-remoteWrite.streamAggr.config` or `-streamAggr.config` was set, only series that matched the aggregation config were deduplicated. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6711#issuecomment-2288361213) for details.
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): fix `-remoteWrite.streamAggr.dropInputLabels` labels parsing. Now this flag allows specifying a list of labels to drop (using the '^^' separator, e.g. `dropInputLabels='replica^^az,replica'`) per each corresponding `remoteWrite.url`. Before, `-remoteWrite.streamAggr.dropInputLabels` labels were incorrectly applied to all configured `remoteWrite.url`s. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6780) for details.
+* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): fix possible duplicated aggregation results when multiple samples with the same labels are written concurrently. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118).
 * BUGFIX: [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json): fix legend captions for stream aggregation related panels. Before, they displayed wrong label names.
 * BUGFIX: [vmgateway](https://docs.victoriametrics.com/vmgateway/): add missing `datadog`, `newrelic`, `opentelemetry` and `pushgateway` routes to the `JWT` authorization routes. Allows prefixed (`prometheus/graphite`) routes for query requests.
 * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly cache an empty list of matching time series for the given [labels filter](https://docs.victoriametrics.com/keyconcepts/#filtering). This type of caching was broken since [v1.97.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.97.0), which could result in increased CPU usage when performing queries that match zero time series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7009).
@@ -59,8 +59,19 @@ func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label)
 			idx := lc.nextIdx.Add(1)
 			v = idx
 			labelCopy := cloneLabel(label)
+
+			// Must store the idxToLabel entry before labelToIdx,
+			// so it can be found by possible concurrent goroutines.
+			//
+			// We might store duplicated entries for a single label with different indexes,
+			// and that's fine - see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
 			lc.idxToLabel.Store(idx, labelCopy)
-			lc.labelToIdx.Store(labelCopy, v)
+			vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
+			if loaded {
+				// This label has been stored by a concurrent goroutine with a different index;
+				// use that index for key consistency in aggrState.
+				v = vNew
+			}
 
 			// Update lc.totalSizeBytes
 			labelSizeBytes := uint64(len(label.Name) + len(label.Value))
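Continuing the sketch from the commit-message section above (it reuses the hypothetical `compressor` type and the `fmt` import), a decompress side would only ever resolve indexes that compress actually returned, which is why an entry orphaned by a lost race is unreachable garbage rather than a correctness problem:

```go
// decompress resolves a single index back to its label. Only indexes that
// compress returned appear in encoded keys, so idxToLabel entries orphaned
// by lost LoadOrStore races are never read.
func (c *compressor) decompress(idx uint64) string {
	v, ok := c.idxToLabel.Load(idx)
	if !ok {
		// This would indicate broken store ordering in compress:
		// labelToIdx published an index before idxToLabel could resolve it.
		panic(fmt.Sprintf("BUG: missing label for index %d", idx))
	}
	return v.(string)
}
```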
@@ -68,6 +68,7 @@ func TestLabelsCompressorSerial(t *testing.T) {
 func TestLabelsCompressorConcurrent(t *testing.T) {
 	const concurrency = 5
 	var lc LabelsCompressor
+	var expectCompressedKeys sync.Map
 
 	var wg sync.WaitGroup
 	for i := 0; i < concurrency; i++ {
@@ -75,9 +76,14 @@ func TestLabelsCompressorConcurrent(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			series := newTestSeries(100, 20)
-			for i, labels := range series {
+			for n, labels := range series {
 				sExpected := labelsToString(labels)
 				data := lc.Compress(nil, labels)
+				if expectData, ok := expectCompressedKeys.LoadOrStore(n, data); ok {
+					if string(data) != string(expectData.([]byte)) {
+						panic(fmt.Errorf("unexpected compress result at series/%d in iteration %d", n, i))
+					}
+				}
 				labelsResult := lc.Decompress(nil, data)
 				sResult := labelsToString(labelsResult)
 				if sExpected != sResult {
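The new assertion only fires when goroutines actually interleave on the slow path, so the original bug is easiest to reproduce under Go's race detector. A typical invocation might look like this (the `./lib/...` pattern is used to avoid assuming the exact package directory):

```
go test -race -run TestLabelsCompressorConcurrent ./lib/...
```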