lib/logstorage: skip values with zero hits for 'uniq', 'top' and 'field_values' pipes

See https://github.com/VictoriaMetrics/victorialogs-datasource/issues/72#issuecomment-2352078483
This commit is contained in:
Aliaksandr Valialkin 2024-09-30 14:06:20 +02:00
parent 85ea0f80fc
commit 0c0f013a60
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 21 additions and 17 deletions

View file

@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* BUGFIX: do not return field values with zero matching logs from [`field_values`](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) and [`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe) pipes. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/72#issuecomment-2352078483).
## [v0.32.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.32.0-victorialogs)
Released at 2024-09-29

View file

@ -207,6 +207,10 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
}
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
if hits == 0 {
return
}
m := shard.getM()
pHits, ok := m[v]
if !ok {

View file

@ -166,24 +166,18 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
return true
}
if c.valueType == valueTypeDict {
if needHits {
a := encoding.GetUint64s(len(c.dictValues))
hits := a.A
clear(hits)
valuesEncoded := c.getValuesEncoded(br)
for _, v := range valuesEncoded {
idx := unmarshalUint8(v)
hits[idx]++
}
for i, v := range c.dictValues {
shard.updateState(v, hits[i])
}
encoding.PutUint64s(a)
} else {
for _, v := range c.dictValues {
shard.updateState(v, 0)
}
a := encoding.GetUint64s(len(c.dictValues))
hits := a.A
clear(hits)
valuesEncoded := c.getValuesEncoded(br)
for _, v := range valuesEncoded {
idx := unmarshalUint8(v)
hits[idx]++
}
for i, v := range c.dictValues {
shard.updateState(v, hits[i])
}
encoding.PutUint64s(a)
return true
}
@ -230,6 +224,10 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
}
func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) {
if hits == 0 {
return
}
m := shard.getM()
pHits, ok := m[v]
if !ok {