From d3e464a68bf10451dba5d1d47fa6d203d976824e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 15 May 2024 13:07:15 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 2 + docs/VictoriaLogs/LogsQL.md | 6 +- lib/logstorage/block_result.go | 257 ++----------------------------- lib/logstorage/pipe_topk.go | 24 ++- lib/logstorage/stats_avg.go | 9 +- lib/logstorage/stats_max.go | 66 ++++---- lib/logstorage/stats_min.go | 58 ++++--- lib/logstorage/stats_quantile.go | 9 +- lib/logstorage/stats_sum.go | 8 +- 9 files changed, 126 insertions(+), 313 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 3a4846b2e..12e6731eb 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them. + * BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`. ## [v0.7.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.7.0-victorialogs) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 62b6a4979..4418248ed 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1505,9 +1505,8 @@ See also: ### max stats -`max(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the maximum value across +`max(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the maximum value across all the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). -Non-numeric values are ignored. For example, the following query returns the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) over logs for the last 5 minutes: @@ -1543,9 +1542,8 @@ See also: ### min stats -`min(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the minimum value across +`min(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the minimum value across all the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). -Non-numeric values are ignored. For example, the following query returns the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) over logs for the last 5 minutes: diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index d257775f0..90c88ce56 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1470,271 +1470,46 @@ func (c *blockResultColumn) getValues(br *blockResult) []string { return c.values } -func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { +func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) (float64, bool) { if c.isConst { v := c.encodedValues[0] - f, ok := tryParseFloat64(v) - if !ok { - return nan - } - return f + return tryParseFloat64(v) } if c.isTime { - return nan + return 0, false } switch c.valueType { case valueTypeString: - f, ok := tryParseFloat64(c.encodedValues[rowIdx]) - if !ok { - return nan - } - return f + v := c.encodedValues[rowIdx] + return tryParseFloat64(v) case valueTypeDict: dictIdx := c.encodedValues[rowIdx][0] - f, ok := tryParseFloat64(c.dictValues[dictIdx]) - if !ok { - return nan - } - return f + v := c.dictValues[dictIdx] + return tryParseFloat64(v) case valueTypeUint8: - return float64(c.encodedValues[rowIdx][0]) + return float64(c.encodedValues[rowIdx][0]), true case valueTypeUint16: b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) - return float64(encoding.UnmarshalUint16(b)) + return float64(encoding.UnmarshalUint16(b)), true case valueTypeUint32: b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) - return float64(encoding.UnmarshalUint32(b)) + return float64(encoding.UnmarshalUint32(b)), true case valueTypeUint64: b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) - return float64(encoding.UnmarshalUint64(b)) + return float64(encoding.UnmarshalUint64(b)), true case valueTypeFloat64: b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) n := encoding.UnmarshalUint64(b) - return math.Float64frombits(n) + f := math.Float64frombits(n) + return f, !math.IsNaN(f) case valueTypeIPv4: - return nan + return 0, false case valueTypeTimestampISO8601: - return nan + return 0, false default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) - return nan - } -} - -func (c *blockResultColumn) getMaxValue() float64 { - if c.isConst { - v := c.encodedValues[0] - f, ok := tryParseFloat64(v) - if !ok { - return nan - } - return f - } - if c.isTime { - return nan - } - - switch c.valueType { - case valueTypeString: - max := nan - f := float64(0) - ok := false - values := c.encodedValues - for i := range values { - if i == 0 || values[i-1] != values[i] { - f, ok = tryParseFloat64(values[i]) - } - if ok && (f > max || math.IsNaN(max)) { - max = f - } - } - return max - case valueTypeDict: - a := encoding.GetFloat64s(len(c.dictValues)) - dictValuesFloat := a.A - for i, v := range c.dictValues { - f, ok := tryParseFloat64(v) - if !ok { - f = nan - } - dictValuesFloat[i] = f - } - max := nan - for _, v := range c.encodedValues { - dictIdx := v[0] - f := dictValuesFloat[dictIdx] - if f > max || math.IsNaN(max) { - max = f - } - } - encoding.PutFloat64s(a) - return max - case valueTypeUint8: - max := -inf - for _, v := range c.encodedValues { - f := float64(v[0]) - if f > max { - max = f - } - } - return max - case valueTypeUint16: - max := -inf - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - f := float64(encoding.UnmarshalUint16(b)) - if f > max { - max = f - } - } - return max - case valueTypeUint32: - max := -inf - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - f := float64(encoding.UnmarshalUint32(b)) - if f > max { - max = f - } - } - return max - case valueTypeUint64: - max := -inf - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - f := float64(encoding.UnmarshalUint64(b)) - if f > max { - max = f - } - } - return max - case valueTypeFloat64: - max := nan - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) - if math.IsNaN(max) || f > max { - max = f - } - } - return max - case valueTypeIPv4: - return nan - case valueTypeTimestampISO8601: - return nan - default: - logger.Panicf("BUG: unknown valueType=%d", c.valueType) - return nan - } -} - -func (c *blockResultColumn) getMinValue() float64 { - if c.isConst { - v := c.encodedValues[0] - f, ok := tryParseFloat64(v) - if !ok { - return nan - } - return f - } - if c.isTime { - return nan - } - - switch c.valueType { - case valueTypeString: - min := nan - f := float64(0) - ok := false - values := c.encodedValues - for i := range values { - if i == 0 || values[i-1] != values[i] { - f, ok = tryParseFloat64(values[i]) - } - if ok && (f < min || math.IsNaN(min)) { - min = f - } - } - return min - case valueTypeDict: - a := encoding.GetFloat64s(len(c.dictValues)) - dictValuesFloat := a.A - for i, v := range c.dictValues { - f, ok := tryParseFloat64(v) - if !ok { - f = nan - } - dictValuesFloat[i] = f - } - min := nan - for _, v := range c.encodedValues { - dictIdx := v[0] - f := dictValuesFloat[dictIdx] - if f < min || math.IsNaN(min) { - min = f - } - } - encoding.PutFloat64s(a) - return min - case valueTypeUint8: - min := inf - for _, v := range c.encodedValues { - f := float64(v[0]) - if f < min { - min = f - } - } - return min - case valueTypeUint16: - min := inf - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - f := float64(encoding.UnmarshalUint16(b)) - if f < min { - min = f - } - } - return min - case valueTypeUint32: - min := inf - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - f := float64(encoding.UnmarshalUint32(b)) - if f < min { - min = f - } - } - return min - case valueTypeUint64: - min := inf - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - f := float64(encoding.UnmarshalUint64(b)) - if f < min { - min = f - } - } - return min - case valueTypeFloat64: - min := nan - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) - if math.IsNaN(min) || f < min { - min = f - } - } - return min - case valueTypeIPv4: - return nan - case valueTypeTimestampISO8601: - return nan - default: - logger.Panicf("BUG: unknown valueType=%d", c.valueType) - return nan + return 0, false } } diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index d18e3039b..5a6a7f8db 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -545,9 +545,29 @@ func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool { } if isDesc { - return stringsutil.LessNatural(vB, vA) + return lessString(vB, vA) } - return stringsutil.LessNatural(vA, vB) + return lessString(vA, vB) } return false } + +func lessString(a, b string) bool { + if a == b { + return false + } + + nA, okA := tryParseUint64(a) + nB, okB := tryParseUint64(b) + if okA && okB { + return nA < nB + } + + fA, okA := tryParseFloat64(a) + fB, okB := tryParseFloat64(b) + if okA && okB { + return fA < fB + } + + return stringsutil.LessNatural(a, b) +} diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go index 517fb6837..5cd0ec364 100644 --- a/lib/logstorage/stats_avg.go +++ b/lib/logstorage/stats_avg.go @@ -1,7 +1,6 @@ package logstorage import ( - "math" "slices" "strconv" "unsafe" @@ -58,8 +57,8 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int if sap.sa.containsStar { // Scan all the fields for the given row for _, c := range br.getColumns() { - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { + f, ok := c.getFloatValueAtRow(rowIdx) + if ok { sap.sum += f sap.count++ } @@ -68,8 +67,8 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int // Scan only the given fields for the given row for _, field := range sap.sa.fields { c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { + f, ok := c.getFloatValueAtRow(rowIdx) + if ok { sap.sum += f sap.count++ } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 1c1f7dbf5..b2c35aa94 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -1,9 +1,8 @@ package logstorage import ( - "math" "slices" - "strconv" + "strings" "unsafe" ) @@ -22,8 +21,7 @@ func (sm *statsMax) neededFields() []string { func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { smp := &statsMaxProcessor{ - sm: sm, - max: nan, + sm: sm, } return smp, int(unsafe.Sizeof(*smp)) } @@ -31,62 +29,74 @@ func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { type statsMaxProcessor struct { sm *statsMax - max float64 + max string + hasMax bool } func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { + maxLen := len(smp.max) + if smp.sm.containsStar { - // Find the maximum value across all the columns + // Find the minimum value across all the columns for _, c := range br.getColumns() { - f := c.getMaxValue() - if f > smp.max || math.IsNaN(smp.max) { - smp.max = f + for _, v := range c.getValues(br) { + smp.updateState(v) } } } else { - // Find the maximum value across the requested columns + // Find the minimum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMaxValue() - if f > smp.max || math.IsNaN(smp.max) { - smp.max = f + for _, v := range c.getValues(br) { + smp.updateState(v) } } } - return 0 + + return len(smp.max) - maxLen } func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + maxLen := len(smp.max) + if smp.sm.containsStar { - // Find the maximum value across all the fields for the given row + // Find the minimum value across all the fields for the given row for _, c := range br.getColumns() { - f := c.getFloatValueAtRow(rowIdx) - if f > smp.max || math.IsNaN(smp.max) { - smp.max = f - } + v := c.getValueAtRow(br, rowIdx) + smp.updateState(v) } } else { - // Find the maximum value across the requested fields for the given row + // Find the minimum value across the requested fields for the given row for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if f > smp.max || math.IsNaN(smp.max) { - smp.max = f - } + v := c.getValueAtRow(br, rowIdx) + smp.updateState(v) } } - return 0 + + return maxLen - len(smp.max) } func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsMaxProcessor) - if src.max > smp.max { - smp.max = src.max + if src.hasMax { + smp.updateState(src.max) } } +func (smp *statsMaxProcessor) updateState(v string) { + if smp.hasMax && !lessString(smp.max, v) { + return + } + smp.max = strings.Clone(v) + smp.hasMax = true +} + func (smp *statsMaxProcessor) finalizeStats() string { - return strconv.FormatFloat(smp.max, 'f', -1, 64) + if !smp.hasMax { + return "NaN" + } + return smp.max } func parseStatsMax(lex *lexer) (*statsMax, error) { diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 5dcf1dd97..8e451a0e1 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -1,9 +1,8 @@ package logstorage import ( - "math" "slices" - "strconv" + "strings" "unsafe" ) @@ -22,8 +21,7 @@ func (sm *statsMin) neededFields() []string { func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { smp := &statsMinProcessor{ - sm: sm, - min: nan, + sm: sm, } return smp, int(unsafe.Sizeof(*smp)) } @@ -31,62 +29,74 @@ func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { type statsMinProcessor struct { sm *statsMin - min float64 + min string + hasMin bool } func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { + minLen := len(smp.min) + if smp.sm.containsStar { // Find the minimum value across all the columns for _, c := range br.getColumns() { - f := c.getMinValue() - if f < smp.min || math.IsNaN(smp.min) { - smp.min = f + for _, v := range c.getValues(br) { + smp.updateState(v) } } } else { // Find the minimum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMinValue() - if f < smp.min || math.IsNaN(smp.min) { - smp.min = f + for _, v := range c.getValues(br) { + smp.updateState(v) } } } - return 0 + + return len(smp.min) - minLen } func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + minLen := len(smp.min) + if smp.sm.containsStar { // Find the minimum value across all the fields for the given row for _, c := range br.getColumns() { - f := c.getFloatValueAtRow(rowIdx) - if f < smp.min || math.IsNaN(smp.min) { - smp.min = f - } + v := c.getValueAtRow(br, rowIdx) + smp.updateState(v) } } else { // Find the minimum value across the requested fields for the given row for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if f < smp.min || math.IsNaN(smp.min) { - smp.min = f - } + v := c.getValueAtRow(br, rowIdx) + smp.updateState(v) } } - return 0 + + return minLen - len(smp.min) } func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsMinProcessor) - if src.min < smp.min { - smp.min = src.min + if src.hasMin { + smp.updateState(src.min) } } +func (smp *statsMinProcessor) updateState(v string) { + if smp.hasMin && !lessString(v, smp.min) { + return + } + smp.min = strings.Clone(v) + smp.hasMin = true +} + func (smp *statsMinProcessor) finalizeStats() string { - return strconv.FormatFloat(smp.min, 'f', -1, 64) + if !smp.hasMin { + return "NaN" + } + return smp.min } func parseStatsMin(lex *lexer) (*statsMin, error) { diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go index cbdcfc07c..2d6552d5d 100644 --- a/lib/logstorage/stats_quantile.go +++ b/lib/logstorage/stats_quantile.go @@ -2,7 +2,6 @@ package logstorage import ( "fmt" - "math" "slices" "strconv" "unsafe" @@ -72,16 +71,16 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int if sqp.sq.containsStar { for _, c := range br.getColumns() { - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { + f, ok := c.getFloatValueAtRow(rowIdx) + if ok { stateSizeIncrease += h.update(f) } } } else { for _, field := range sqp.sq.fields { c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { + f, ok := c.getFloatValueAtRow(rowIdx) + if ok { stateSizeIncrease += h.update(f) } } diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index bdb97cbd5..4480214df 100644 --- a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -68,8 +68,8 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int if ssp.ss.containsStar { // Sum all the fields for the given row for _, c := range br.getColumns() { - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { + f, ok := c.getFloatValueAtRow(rowIdx) + if ok { if math.IsNaN(ssp.sum) { ssp.sum = f } else { @@ -81,8 +81,8 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int // Sum only the given fields for the given row for _, field := range ssp.ss.fields { c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { + f, ok := c.getFloatValueAtRow(rowIdx) + if ok { if math.IsNaN(ssp.sum) { ssp.sum = f } else {