lib/logstorage: skip values with zero hits for 'uniq', 'top' and 'field_values' pipes

See https://github.com/VictoriaMetrics/victorialogs-datasource/issues/72#issuecomment-2352078483
This commit is contained in:
Aliaksandr Valialkin 2024-09-30 14:06:20 +02:00
parent ba037a9777
commit dbcf06cd85
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 21 additions and 17 deletions

View file

@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* BUGFIX: do not return field values with zero matching logs from [`field_values`](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) and [`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe) pipes. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/72#issuecomment-2352078483).
## [v0.32.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.32.0-victorialogs) ## [v0.32.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.32.0-victorialogs)
Released at 2024-09-29 Released at 2024-09-29

View file

@ -207,6 +207,10 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
} }
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) { func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
if hits == 0 {
return
}
m := shard.getM() m := shard.getM()
pHits, ok := m[v] pHits, ok := m[v]
if !ok { if !ok {

View file

@ -166,24 +166,18 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
return true return true
} }
if c.valueType == valueTypeDict { if c.valueType == valueTypeDict {
if needHits { a := encoding.GetUint64s(len(c.dictValues))
a := encoding.GetUint64s(len(c.dictValues)) hits := a.A
hits := a.A clear(hits)
clear(hits) valuesEncoded := c.getValuesEncoded(br)
valuesEncoded := c.getValuesEncoded(br) for _, v := range valuesEncoded {
for _, v := range valuesEncoded { idx := unmarshalUint8(v)
idx := unmarshalUint8(v) hits[idx]++
hits[idx]++
}
for i, v := range c.dictValues {
shard.updateState(v, hits[i])
}
encoding.PutUint64s(a)
} else {
for _, v := range c.dictValues {
shard.updateState(v, 0)
}
} }
for i, v := range c.dictValues {
shard.updateState(v, hits[i])
}
encoding.PutUint64s(a)
return true return true
} }
@ -230,6 +224,10 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
} }
func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) { func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) {
if hits == 0 {
return
}
m := shard.getM() m := shard.getM()
pHits, ok := m[v] pHits, ok := m[v]
if !ok { if !ok {