mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/logstorage: do not count dictionary values which have no matching logs in count_uniq
stats function
Create blockResultColumn.forEachDictValue* helper functions for visiting matching
dictionary values. These helper functions should prevent from counting dictionary values
without matching logs in the future.
This is a follow-up for 0c0f013a60
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7152
This commit is contained in:
parent
630211cfed
commit
a350be48b6
5 changed files with 68 additions and 40 deletions
|
@ -17,6 +17,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
|
|
||||||
* FEATURE: add interactive command-line tool for querying VictoriaLogs - [`vlogscli`](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/).
|
* FEATURE: add interactive command-line tool for querying VictoriaLogs - [`vlogscli`](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/).
|
||||||
|
|
||||||
|
* BUGFIX: [`count_uniq` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count_uniq-stats): do not count field values, which aren't matched by the used filters. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7152).
|
||||||
|
|
||||||
## [v0.32.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.32.1-victorialogs)
|
## [v0.32.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.32.1-victorialogs)
|
||||||
|
|
||||||
Released at 2024-09-30
|
Released at 2024-09-30
|
||||||
|
|
|
@ -1728,6 +1728,60 @@ func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string {
|
||||||
return c.valuesEncoded
|
return c.valuesEncoded
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// forEachDictValue calls f for every value in the column dictionary.
|
||||||
|
func (c *blockResultColumn) forEachDictValue(br *blockResult, f func(v string)) {
|
||||||
|
if c.valueType != valueTypeDict {
|
||||||
|
logger.Panicf("BUG: unexpected column valueType=%d; want %d", c.valueType, valueTypeDict)
|
||||||
|
}
|
||||||
|
if uint64(br.rowsLen) == br.bs.bsw.bh.rowsCount {
|
||||||
|
// Fast path - there is no need in reading encoded values
|
||||||
|
for _, v := range c.dictValues {
|
||||||
|
f(v)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path - need to read encoded values in order filter not referenced columns.
|
||||||
|
a := encoding.GetUint64s(len(c.dictValues))
|
||||||
|
hits := a.A
|
||||||
|
clear(hits)
|
||||||
|
valuesEncoded := c.getValuesEncoded(br)
|
||||||
|
for _, v := range valuesEncoded {
|
||||||
|
idx := unmarshalUint8(v)
|
||||||
|
hits[idx]++
|
||||||
|
}
|
||||||
|
for i, v := range c.dictValues {
|
||||||
|
if h := hits[i]; h > 0 {
|
||||||
|
f(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
encoding.PutUint64s(a)
|
||||||
|
}
|
||||||
|
|
||||||
|
// forEachDictValueWithHits calls f for every value in the column dictionary.
|
||||||
|
//
|
||||||
|
// hits is the number of rows with the given value v in the column.
|
||||||
|
func (c *blockResultColumn) forEachDictValueWithHits(br *blockResult, f func(v string, hits uint64)) {
|
||||||
|
if c.valueType != valueTypeDict {
|
||||||
|
logger.Panicf("BUG: unexpected column valueType=%d; want %d", c.valueType, valueTypeDict)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := encoding.GetUint64s(len(c.dictValues))
|
||||||
|
hits := a.A
|
||||||
|
clear(hits)
|
||||||
|
valuesEncoded := c.getValuesEncoded(br)
|
||||||
|
for _, v := range valuesEncoded {
|
||||||
|
idx := unmarshalUint8(v)
|
||||||
|
hits[idx]++
|
||||||
|
}
|
||||||
|
for i, v := range c.dictValues {
|
||||||
|
if h := hits[i]; h > 0 {
|
||||||
|
f(v, h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
encoding.PutUint64s(a)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (float64, bool) {
|
func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (float64, bool) {
|
||||||
if c.isConst {
|
if c.isConst {
|
||||||
v := c.valuesEncoded[0]
|
v := c.valuesEncoded[0]
|
||||||
|
|
|
@ -164,18 +164,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if c.valueType == valueTypeDict {
|
if c.valueType == valueTypeDict {
|
||||||
a := encoding.GetUint64s(len(c.dictValues))
|
c.forEachDictValueWithHits(br, shard.updateState)
|
||||||
hits := a.A
|
|
||||||
clear(hits)
|
|
||||||
valuesEncoded := c.getValuesEncoded(br)
|
|
||||||
for _, v := range valuesEncoded {
|
|
||||||
idx := unmarshalUint8(v)
|
|
||||||
hits[idx]++
|
|
||||||
}
|
|
||||||
for i, v := range c.dictValues {
|
|
||||||
shard.updateState(v, hits[i])
|
|
||||||
}
|
|
||||||
encoding.PutUint64s(a)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,10 +196,6 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
|
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
|
||||||
if hits == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
m := shard.getM()
|
m := shard.getM()
|
||||||
pHits, ok := m[v]
|
pHits, ok := m[v]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -166,18 +166,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if c.valueType == valueTypeDict {
|
if c.valueType == valueTypeDict {
|
||||||
a := encoding.GetUint64s(len(c.dictValues))
|
c.forEachDictValueWithHits(br, shard.updateState)
|
||||||
hits := a.A
|
|
||||||
clear(hits)
|
|
||||||
valuesEncoded := c.getValuesEncoded(br)
|
|
||||||
for _, v := range valuesEncoded {
|
|
||||||
idx := unmarshalUint8(v)
|
|
||||||
hits[idx]++
|
|
||||||
}
|
|
||||||
for i, v := range c.dictValues {
|
|
||||||
shard.updateState(v, hits[i])
|
|
||||||
}
|
|
||||||
encoding.PutUint64s(a)
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,10 +213,6 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) {
|
func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) {
|
||||||
if hits == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
m := shard.getM()
|
m := shard.getM()
|
||||||
pHits, ok := m[v]
|
pHits, ok := m[v]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -42,6 +42,7 @@ type statsCountUniqProcessor struct {
|
||||||
|
|
||||||
columnValues [][]string
|
columnValues [][]string
|
||||||
keyBuf []byte
|
keyBuf []byte
|
||||||
|
tmpNum int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
|
func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
|
||||||
|
@ -133,18 +134,19 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
|
||||||
return stateSizeIncrease
|
return stateSizeIncrease
|
||||||
}
|
}
|
||||||
if c.valueType == valueTypeDict {
|
if c.valueType == valueTypeDict {
|
||||||
// count unique non-zero c.dictValues
|
// count unique non-zero dict values for the selected logs
|
||||||
keyBuf := sup.keyBuf[:0]
|
sup.tmpNum = 0
|
||||||
for _, v := range c.dictValues {
|
c.forEachDictValue(br, func(v string) {
|
||||||
if v == "" {
|
if v == "" {
|
||||||
// Do not count empty values
|
// Do not count empty values
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
keyBuf = append(keyBuf[:0], 0)
|
keyBuf := append(sup.keyBuf[:0], 0)
|
||||||
keyBuf = append(keyBuf, v...)
|
keyBuf = append(keyBuf, v...)
|
||||||
stateSizeIncrease += sup.updateState(keyBuf)
|
sup.tmpNum += sup.updateState(keyBuf)
|
||||||
}
|
sup.keyBuf = keyBuf
|
||||||
sup.keyBuf = keyBuf
|
})
|
||||||
|
stateSizeIncrease += sup.tmpNum
|
||||||
return stateSizeIncrease
|
return stateSizeIncrease
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue