From a350be48b68330ee1a487e1fb09b002d3be45163 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 1 Oct 2024 13:29:07 +0200 Subject: [PATCH] lib/logstorage: do not count dictionary values which have no matching logs in `count_uniq` stats function Create blockResultColumn.forEachDictValue* helper functions for visiting matching dictionary values. These helper functions should prevent from counting dictionary values without matching logs in the future. This is a follow-up for 0c0f013a60c812f46df1b8fa2863d1038e0e0208 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7152 --- docs/VictoriaLogs/CHANGELOG.md | 2 ++ lib/logstorage/block_result.go | 54 ++++++++++++++++++++++++++++++ lib/logstorage/pipe_top.go | 17 +--------- lib/logstorage/pipe_uniq.go | 17 +--------- lib/logstorage/stats_count_uniq.go | 18 +++++----- 5 files changed, 68 insertions(+), 40 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 7ac0526ed..5c4703dac 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -17,6 +17,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: add interactive command-line tool for querying VictoriaLogs - [`vlogscli`](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/). +* BUGFIX: [`count_uniq` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count_uniq-stats): do not count field values, which aren't matched by the used filters. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7152). + ## [v0.32.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.32.1-victorialogs) Released at 2024-09-30 diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index d703c6c1c..319df03fb 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1728,6 +1728,60 @@ func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string { return c.valuesEncoded } +// forEachDictValue calls f for every value in the column dictionary. +func (c *blockResultColumn) forEachDictValue(br *blockResult, f func(v string)) { + if c.valueType != valueTypeDict { + logger.Panicf("BUG: unexpected column valueType=%d; want %d", c.valueType, valueTypeDict) + } + if uint64(br.rowsLen) == br.bs.bsw.bh.rowsCount { + // Fast path - there is no need in reading encoded values + for _, v := range c.dictValues { + f(v) + } + return + } + + // Slow path - need to read encoded values in order filter not referenced columns. + a := encoding.GetUint64s(len(c.dictValues)) + hits := a.A + clear(hits) + valuesEncoded := c.getValuesEncoded(br) + for _, v := range valuesEncoded { + idx := unmarshalUint8(v) + hits[idx]++ + } + for i, v := range c.dictValues { + if h := hits[i]; h > 0 { + f(v) + } + } + encoding.PutUint64s(a) +} + +// forEachDictValueWithHits calls f for every value in the column dictionary. +// +// hits is the number of rows with the given value v in the column. +func (c *blockResultColumn) forEachDictValueWithHits(br *blockResult, f func(v string, hits uint64)) { + if c.valueType != valueTypeDict { + logger.Panicf("BUG: unexpected column valueType=%d; want %d", c.valueType, valueTypeDict) + } + + a := encoding.GetUint64s(len(c.dictValues)) + hits := a.A + clear(hits) + valuesEncoded := c.getValuesEncoded(br) + for _, v := range valuesEncoded { + idx := unmarshalUint8(v) + hits[idx]++ + } + for i, v := range c.dictValues { + if h := hits[i]; h > 0 { + f(v, h) + } + } + encoding.PutUint64s(a) +} + func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (float64, bool) { if c.isConst { v := c.valuesEncoded[0] diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index fb196cacb..1b8677934 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -164,18 +164,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { return } if c.valueType == valueTypeDict { - a := encoding.GetUint64s(len(c.dictValues)) - hits := a.A - clear(hits) - valuesEncoded := c.getValuesEncoded(br) - for _, v := range valuesEncoded { - idx := unmarshalUint8(v) - hits[idx]++ - } - for i, v := range c.dictValues { - shard.updateState(v, hits[i]) - } - encoding.PutUint64s(a) + c.forEachDictValueWithHits(br, shard.updateState) return } @@ -207,10 +196,6 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { } func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) { - if hits == 0 { - return - } - m := shard.getM() pHits, ok := m[v] if !ok { diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 4803ac45a..dd199d1cf 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -166,18 +166,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { return true } if c.valueType == valueTypeDict { - a := encoding.GetUint64s(len(c.dictValues)) - hits := a.A - clear(hits) - valuesEncoded := c.getValuesEncoded(br) - for _, v := range valuesEncoded { - idx := unmarshalUint8(v) - hits[idx]++ - } - for i, v := range c.dictValues { - shard.updateState(v, hits[i]) - } - encoding.PutUint64s(a) + c.forEachDictValueWithHits(br, shard.updateState) return true } @@ -224,10 +213,6 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { } func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) { - if hits == 0 { - return - } - m := shard.getM() pHits, ok := m[v] if !ok { diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index 42a71a03f..f22075e92 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -42,6 +42,7 @@ type statsCountUniqProcessor struct { columnValues [][]string keyBuf []byte + tmpNum int } func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { @@ -133,18 +134,19 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } if c.valueType == valueTypeDict { - // count unique non-zero c.dictValues - keyBuf := sup.keyBuf[:0] - for _, v := range c.dictValues { + // count unique non-zero dict values for the selected logs + sup.tmpNum = 0 + c.forEachDictValue(br, func(v string) { if v == "" { // Do not count empty values - continue + return } - keyBuf = append(keyBuf[:0], 0) + keyBuf := append(sup.keyBuf[:0], 0) keyBuf = append(keyBuf, v...) - stateSizeIncrease += sup.updateState(keyBuf) - } - sup.keyBuf = keyBuf + sup.tmpNum += sup.updateState(keyBuf) + sup.keyBuf = keyBuf + }) + stateSizeIncrease += sup.tmpNum return stateSizeIncrease }