lib/logstorage: properly skip filtered out dict values when calculating uniq_values, min, max, row_min and row_max stats functions

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7458
This commit is contained in:
Aliaksandr Valialkin 2024-11-08 23:17:38 +01:00
parent 2f1ce74d97
commit 546bf7d579
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 24 additions and 23 deletions

View file

@ -26,6 +26,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: [Loki data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api): show the original request body on parse errors. This should simplify debugging. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7490).
* BUGFIX: [`values` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#values-stats): fix a bug, which could lead to corrupted results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7458).
* BUGFIX: [`uniq_values`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq_values-stats), [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats), [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats), [`row_min`](https://docs.victoriametrics.com/victorialogs/logsql/#row_min-stats) and [`row_max`](https://docs.victoriametrics.com/victorialogs/logsql/#row_max-stats) stats functions: fix a bug, which could return non-matching field values for these functions. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7458).
* BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): properly take into account the `end` query arg when calculating time range for [`_time:duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter). Previously the `_time:duration` filter was treated as `_time:[now-duration, now)`, while it should be treated as `_time:[end-duration, end)`.
## [v0.41.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.41.0-victorialogs)

View file

@ -1726,7 +1726,9 @@ func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string {
return c.valuesEncoded
}
// forEachDictValue calls f for every value in the column dictionary.
// forEachDictValue calls f for every matching value in the column dictionary.
//
// It properly skips non-matching dict values.
func (c *blockResultColumn) forEachDictValue(br *blockResult, f func(v string)) {
if c.valueType != valueTypeDict {
logger.Panicf("BUG: unexpected column valueType=%d; want %d", c.valueType, valueTypeDict)
@ -1756,7 +1758,9 @@ func (c *blockResultColumn) forEachDictValue(br *blockResult, f func(v string))
encoding.PutUint64s(a)
}
// forEachDictValueWithHits calls f for every value in the column dictionary.
// forEachDictValueWithHits calls f for every matching value in the column dictionary.
//
// It properly skips non-matching dict values.
//
// hits is the number of rows with the given value v in the column.
func (c *blockResultColumn) forEachDictValueWithHits(br *blockResult, f func(v string, hits uint64)) {

View file

@ -114,9 +114,9 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu
smp.updateStateString(v)
}
case valueTypeDict:
for _, v := range c.dictValues {
c.forEachDictValue(br, func(v string) {
smp.updateStateString(v)
}
})
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.maxValue)

View file

@ -116,9 +116,9 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu
smp.updateStateString(v)
}
case valueTypeDict:
for _, v := range c.dictValues {
c.forEachDictValue(br, func(v string) {
smp.updateStateString(v)
}
})
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.minValue)

View file

@ -82,12 +82,11 @@ func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int {
case valueTypeString:
needUpdateState = true
case valueTypeDict:
for _, v := range c.dictValues {
if smp.needUpdateStateString(v) {
c.forEachDictValue(br, func(v string) {
if !needUpdateState && smp.needUpdateStateString(v) {
needUpdateState = true
break
}
}
})
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.maxValue)

View file

@ -82,12 +82,11 @@ func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int {
case valueTypeString:
needUpdateState = true
case valueTypeDict:
for _, v := range c.dictValues {
if smp.needUpdateStateString(v) {
c.forEachDictValue(br, func(v string) {
if !needUpdateState && smp.needUpdateStateString(v) {
needUpdateState = true
break
}
}
})
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.minValue)

View file

@ -74,9 +74,9 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC
stateSizeIncrease := 0
if c.valueType == valueTypeDict {
// collect unique non-zero c.dictValues
for _, v := range c.dictValues {
c.forEachDictValue(br, func(v string) {
stateSizeIncrease += sup.updateState(v)
}
})
return stateSizeIncrease
}
@ -181,14 +181,12 @@ func (sup *statsUniqValuesProcessor) updateState(v string) int {
// Skip empty values
return 0
}
stateSizeIncrease := 0
if _, ok := sup.m[v]; !ok {
if _, ok := sup.m[v]; ok {
return 0
}
vCopy := strings.Clone(v)
sup.m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
return stateSizeIncrease
return len(vCopy) + int(unsafe.Sizeof(vCopy))
}
func (sup *statsUniqValuesProcessor) limitReached() bool {