diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 44291699c..a9af1e1d8 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1373,6 +1373,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe): - [`max`](#max-stats) calcualtes the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`min`](#min-stats) calculates the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`sum_len`](#sum_len-stats) calculates the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`values`](#values-stats) returns all the values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1535,6 +1536,22 @@ See also: - [`max`](#max-stats) - [`min`](#min-stats) +### sum_len stats + +`sum_len(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the sum of lengths of all the values +for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + +For example, the following query returns the sum of lengths of [`_msg` fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +across all the logs for the last 5 minutes: + +```logsql +_time:5m | stats sum_len(_msg) messages_len +``` + +See also: + +- [`count`](#count-stats) + ### uniq_values stats `uniq_values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the unique non-empty values across diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index a873a1c0f..d257775f0 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1738,6 +1738,55 @@ func (c *blockResultColumn) getMinValue() float64 { } } +func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { + if c.isConst { + v := c.encodedValues[0] + return uint64(len(v)) * uint64(len(br.timestamps)) + } + if c.isTime { + return uint64(len(time.RFC3339Nano)) * uint64(len(br.timestamps)) + } + + switch c.valueType { + case valueTypeString: + return c.sumLenStringValues(br) + case valueTypeDict: + n := uint64(0) + dictValues := c.dictValues + for _, v := range c.encodedValues { + idx := v[0] + v := dictValues[idx] + n += uint64(len(v)) + } + return n + case valueTypeUint8: + return c.sumLenStringValues(br) + case valueTypeUint16: + return c.sumLenStringValues(br) + case valueTypeUint32: + return c.sumLenStringValues(br) + case valueTypeUint64: + return c.sumLenStringValues(br) + case valueTypeFloat64: + return c.sumLenStringValues(br) + case valueTypeIPv4: + return c.sumLenStringValues(br) + case valueTypeTimestampISO8601: + return uint64(len(iso8601Timestamp)) * uint64(len(br.timestamps)) + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } +} + +func (c *blockResultColumn) sumLenStringValues(br *blockResult) uint64 { + n := uint64(0) + for _, v := range c.getValues(br) { + n += uint64(len(v)) + } + return n +} + func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { if c.isConst { v := c.encodedValues[0] diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index ee9498a0b..fe4dbd9c2 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -926,6 +926,13 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats by(x) values() limit 1_000 AS y`, `* | stats by (x) values(*) limit 1000 as y`) f(`* | stats by(x) values(a,*,b) y`, `* | stats by (x) values(*) as y`) + // stats pipe sum_len + f(`* | stats Sum_len(foo) bar`, `* | stats sum_len(foo) as bar`) + f(`* | stats BY(x, y, ) SUM_Len(foo,bar,) bar`, `* | stats by (x, y) sum_len(foo, bar) as bar`) + f(`* | stats sum_len() x`, `* | stats sum_len(*) as x`) + f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`) + f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`) + // stats pipe multiple funcs f(`* | stats count() "foo.bar:baz", count_uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", count_uniq(a) as bar`) f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`) @@ -1275,6 +1282,10 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats values(a) limit 0.5`) f(`foo | stats values(a) limit -1`) + // invalid stats sum_len + f(`foo | stats sum_len`) + f(`foo | stats sum_len()`) + // invalid stats grouping fields f(`foo | stats by(foo:bar) count() baz`) f(`foo | stats by(foo:/bar) count() baz`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 3a9d8980b..fcdd43a5a 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -534,6 +534,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) { return nil, "", fmt.Errorf("cannot parse 'values' func: %w", err) } sf = svs + case lex.isKeyword("sum_len"): + sss, err := parseStatsSumLen(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'sum_len' func: %w", err) + } + sf = sss default: return nil, "", fmt.Errorf("unknown stats func %q", lex.token) } diff --git a/lib/logstorage/stats_sum_len.go b/lib/logstorage/stats_sum_len.go new file mode 100644 index 000000000..f09e48418 --- /dev/null +++ b/lib/logstorage/stats_sum_len.go @@ -0,0 +1,89 @@ +package logstorage + +import ( + "slices" + "strconv" + "unsafe" +) + +type statsSumLen struct { + fields []string + containsStar bool +} + +func (ss *statsSumLen) String() string { + return "sum_len(" + fieldNamesString(ss.fields) + ")" +} + +func (ss *statsSumLen) neededFields() []string { + return ss.fields +} + +func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) { + ssp := &statsSumLenProcessor{ + ss: ss, + sumLen: 0, + } + return ssp, int(unsafe.Sizeof(*ssp)) +} + +type statsSumLenProcessor struct { + ss *statsSumLen + + sumLen uint64 +} + +func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int { + if ssp.ss.containsStar { + // Sum all the columns + for _, c := range br.getColumns() { + ssp.sumLen += c.sumLenValues(br) + } + } else { + // Sum the requested columns + for _, field := range ssp.ss.fields { + c := br.getColumnByName(field) + ssp.sumLen += c.sumLenValues(br) + } + } + return 0 +} + +func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if ssp.ss.containsStar { + // Sum all the fields for the given row + for _, c := range br.getColumns() { + v := c.getValueAtRow(br, rowIdx) + ssp.sumLen += uint64(len(v)) + } + } else { + // Sum only the given fields for the given row + for _, field := range ssp.ss.fields { + c := br.getColumnByName(field) + v := c.getValueAtRow(br, rowIdx) + ssp.sumLen += uint64(len(v)) + } + } + return 0 +} + +func (ssp *statsSumLenProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsSumLenProcessor) + ssp.sumLen += src.sumLen +} + +func (ssp *statsSumLenProcessor) finalizeStats() string { + return strconv.FormatUint(ssp.sumLen, 10) +} + +func parseStatsSumLen(lex *lexer) (*statsSumLen, error) { + fields, err := parseFieldNamesForStatsFunc(lex, "sum_len") + if err != nil { + return nil, err + } + ss := &statsSumLen{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + } + return ss, nil +}