From 5efe4eeadded69070fa848f8da765010e2d14f39 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 1 May 2024 10:31:46 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 4 +- lib/logstorage/block_result.go | 67 ++++++++++++++++------------------ lib/logstorage/stats_max.go | 12 +++--- lib/logstorage/stats_min.go | 12 +++--- lib/logstorage/stats_sum.go | 33 ++++++++++++++--- 5 files changed, 74 insertions(+), 54 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 3c60273ed..f24aa9376 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -21,9 +21,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default. * FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields). -* FEATURE: add support for calculating the number of matching logs and the number of logs with non-empty [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details. -* FEATURE: add support for counting the number of unique values for [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details. 
-* FEATURE: add support for summing [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details. +* FEATURE: add support for calculating `count()`, `uniq()`, `sum()`, `min()` and `max()` over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by an arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details. * FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters). * FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed. * FEATURE: allow using `_` inside numbers. For example, `score:range[1_000, 5_000_000]` for [`range` filter](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter). 
diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 6c92cc13b..637873e31 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -750,7 +750,7 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 { switch c.valueType { case valueTypeString: - max := math.Inf(-1) + max := nan f := float64(0) ok := false values := c.encodedValues @@ -758,13 +758,10 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 { if i == 0 || values[i-1] != values[i] { f, ok = tryParseFloat64(values[i]) } - if ok && f > max { + if ok && (f > max || math.IsNaN(max)) { max = f } } - if math.IsInf(max, -1) { - return nan - } return max case valueTypeDict: a := encoding.GetFloat64s(len(c.dictValues)) @@ -776,18 +773,15 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 { } dictValuesFloat[i] = f } - max := math.Inf(-1) + max := nan for _, v := range c.encodedValues { dictIdx := v[0] f := dictValuesFloat[dictIdx] - if f > max { + if f > max || math.IsNaN(max) { max = f } } encoding.PutFloat64s(a) - if math.IsInf(max, -1) { - return nan - } return max case valueTypeUint8: max := math.Inf(-1) @@ -853,7 +847,7 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 { switch c.valueType { case valueTypeString: - min := math.Inf(1) + min := nan f := float64(0) ok := false values := c.encodedValues @@ -861,13 +855,10 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 { if i == 0 || values[i-1] != values[i] { f, ok = tryParseFloat64(values[i]) } - if ok && f < min { + if ok && (f < min || math.IsNaN(min)) { min = f } } - if math.IsInf(min, 1) { - return nan - } return min case valueTypeDict: a := encoding.GetFloat64s(len(c.dictValues)) @@ -879,18 +870,15 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 { } dictValuesFloat[i] = f } - min := math.Inf(1) + min := nan for _, v := range c.encodedValues { dictIdx := v[0] f := dictValuesFloat[dictIdx] - if f < 
min { + if f < min || math.IsNaN(min) { min = f } } encoding.PutFloat64s(a) - if math.IsInf(min, 1) { - return nan - } return min case valueTypeUint8: min := math.Inf(1) @@ -941,22 +929,23 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 { } } -func (c *blockResultColumn) sumValues(br *blockResult) float64 { +func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { if c.isConst { v := c.encodedValues[0] f, ok := tryParseFloat64(v) if !ok { - return 0 + return 0, 0 } - return f * float64(len(br.timestamps)) + return f * float64(len(br.timestamps)), len(br.timestamps) } if c.isTime { - return 0 + return 0, 0 } switch c.valueType { case valueTypeString: sum := float64(0) + count := 0 f := float64(0) ok := false values := c.encodedValues @@ -966,53 +955,59 @@ func (c *blockResultColumn) sumValues(br *blockResult) float64 { } if ok { sum += f + count++ } } - return sum + return sum, count case valueTypeDict: a := encoding.GetFloat64s(len(c.dictValues)) dictValuesFloat := a.A for i, v := range c.dictValues { f, ok := tryParseFloat64(v) if !ok { - f = 0 + f = nan } dictValuesFloat[i] = f } sum := float64(0) + count := 0 for _, v := range c.encodedValues { dictIdx := v[0] - sum += dictValuesFloat[dictIdx] + f := dictValuesFloat[dictIdx] + if !math.IsNaN(f) { + sum += f + count++ + } } encoding.PutFloat64s(a) - return sum + return sum, count case valueTypeUint8: sum := uint64(0) for _, v := range c.encodedValues { sum += uint64(v[0]) } - return float64(sum) + return float64(sum), len(br.timestamps) case valueTypeUint16: sum := uint64(0) for _, v := range c.encodedValues { b := bytesutil.ToUnsafeBytes(v) sum += uint64(encoding.UnmarshalUint16(b)) } - return float64(sum) + return float64(sum), len(br.timestamps) case valueTypeUint32: sum := uint64(0) for _, v := range c.encodedValues { b := bytesutil.ToUnsafeBytes(v) sum += uint64(encoding.UnmarshalUint32(b)) } - return float64(sum) + return float64(sum), len(br.timestamps) case 
valueTypeUint64: sum := float64(0) for _, v := range c.encodedValues { b := bytesutil.ToUnsafeBytes(v) sum += float64(encoding.UnmarshalUint64(b)) } - return sum + return sum, len(br.timestamps) case valueTypeFloat64: sum := float64(0) for _, v := range c.encodedValues { @@ -1023,14 +1018,14 @@ func (c *blockResultColumn) sumValues(br *blockResult) float64 { sum += f } } - return sum + return sum, len(br.timestamps) case valueTypeIPv4: - return 0 + return 0, 0 case valueTypeTimestampISO8601: - return 0 + return 0, 0 default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) - return 0 + return 0, 0 } } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index ce57f9577..34ada87a1 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "math" "slices" "strconv" "unsafe" @@ -22,7 +23,8 @@ func (sm *statsMax) neededFields() []string { func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { smp := &statsMaxProcessor{ - sm: sm, + sm: sm, + max: nan, } return smp, int(unsafe.Sizeof(*smp)) } @@ -38,7 +40,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { // Find the maximum value across all the columns for _, c := range br.getColumns() { f := c.getMaxValue(br) - if f > smp.max { + if f > smp.max || math.IsNaN(smp.max) { smp.max = f } } @@ -49,7 +51,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { for _, field := range smp.sm.fields { c := br.getColumnByName(field) f := c.getMaxValue(br) - if f > smp.max { + if f > smp.max || math.IsNaN(smp.max) { smp.max = f } } @@ -61,7 +63,7 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int // Find the maximum value across all the fields for the given row for _, c := range br.getColumns() { f := c.getFloatValueAtRow(rowIdx) - if f > smp.max { + if f > smp.max || math.IsNaN(smp.max) { smp.max = f } } @@ -72,7 +74,7 @@ func (smp 
*statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int for _, field := range smp.sm.fields { c := br.getColumnByName(field) f := c.getFloatValueAtRow(rowIdx) - if f > smp.max { + if f > smp.max || math.IsNaN(smp.max) { smp.max = f } } diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 67852de7d..50e47e1fd 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "math" "slices" "strconv" "unsafe" @@ -22,7 +23,8 @@ func (sm *statsMin) neededFields() []string { func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { smp := &statsMinProcessor{ - sm: sm, + sm: sm, + min: nan, } return smp, int(unsafe.Sizeof(*smp)) } @@ -38,7 +40,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { // Find the minimum value across all the columns for _, c := range br.getColumns() { f := c.getMinValue(br) - if f < smp.min { + if f < smp.min || math.IsNaN(smp.min) { smp.min = f } } @@ -49,7 +51,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { for _, field := range smp.sm.fields { c := br.getColumnByName(field) f := c.getMinValue(br) - if f < smp.min { + if f < smp.min || math.IsNaN(smp.min) { smp.min = f } } @@ -61,7 +63,7 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int // Find the minimum value across all the fields for the given row for _, c := range br.getColumns() { f := c.getFloatValueAtRow(rowIdx) - if f < smp.min { + if f < smp.min || math.IsNaN(smp.min) { smp.min = f } } @@ -72,7 +74,7 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int for _, field := range smp.sm.fields { c := br.getColumnByName(field) f := c.getFloatValueAtRow(rowIdx) - if f < smp.min { + if f < smp.min || math.IsNaN(smp.min) { smp.min = f } } diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index 1a328f0ef..7515abf15 100644 --- 
a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -23,7 +23,8 @@ func (ss *statsSum) neededFields() []string { func (ss *statsSum) newStatsProcessor() (statsProcessor, int) { ssp := &statsSumProcessor{ - ss: ss, + ss: ss, + sum: nan, } return ssp, int(unsafe.Sizeof(*ssp)) } @@ -38,7 +39,14 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { if ssp.ss.containsStar { // Sum all the columns for _, c := range br.getColumns() { - ssp.sum += c.sumValues(br) + f, count := c.sumValues(br) + if count > 0 { + if math.IsNaN(ssp.sum) { + ssp.sum = f + } else { + ssp.sum += f + } + } } return 0 } @@ -46,7 +54,14 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { // Sum the requested columns for _, field := range ssp.ss.fields { c := br.getColumnByName(field) - ssp.sum += c.sumValues(br) + f, count := c.sumValues(br) + if count > 0 { + if math.IsNaN(ssp.sum) { + ssp.sum = f + } else { + ssp.sum += f + } + } } return 0 } @@ -57,7 +72,11 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int for _, c := range br.getColumns() { f := c.getFloatValueAtRow(rowIdx) if !math.IsNaN(f) { - ssp.sum += f + if math.IsNaN(ssp.sum) { + ssp.sum = f + } else { + ssp.sum += f + } } } return 0 @@ -68,7 +87,11 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int c := br.getColumnByName(field) f := c.getFloatValueAtRow(rowIdx) if !math.IsNaN(f) { - ssp.sum += f + if math.IsNaN(ssp.sum) { + ssp.sum = f + } else { + ssp.sum += f + } } } return 0