From 364f084b43694be52eea81db7ff07261342e8b5c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 3 Oct 2024 16:26:03 +0200 Subject: [PATCH] lib/logstorage: add `len` pipe for calculating byte length of log field values --- docs/VictoriaLogs/CHANGELOG.md | 3 + docs/VictoriaLogs/LogsQL.md | 22 ++- lib/logstorage/parser_test.go | 20 +++ lib/logstorage/pipe.go | 7 + lib/logstorage/pipe_field_values.go | 2 +- lib/logstorage/pipe_field_values_test.go | 2 +- lib/logstorage/pipe_len.go | 174 +++++++++++++++++++++++ lib/logstorage/pipe_len_test.go | 92 ++++++++++++ 8 files changed, 318 insertions(+), 4 deletions(-) create mode 100644 lib/logstorage/pipe_len.go create mode 100644 lib/logstorage/pipe_len_test.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 3dd6b6e1e..e1a8b80df 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): preserve `less` output after the exit from scrolling mode. This should help re-using previous query results in subsequent queries. +* FEATURE: add [`len` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#len-pipe) for calculating the length for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value in bytes. + ## [v0.33.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.33.0-victorialogs) Released at 2024-10-01 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 7d26a7f52..78779a60b 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1304,6 +1304,7 @@ LogsQL supports the following pipes: - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). 
- [`filter`](#filter-pipe) applies additional [filters](#filters) to results. - [`format`](#format-pipe) formats output field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`len`](#len-pipe) calculates byte length of the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. - [`limit`](#limit-pipe) limits the number selected logs. - [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`offset`](#offset-pipe) skips the given number of selected logs. @@ -1758,6 +1759,22 @@ only if `ip` and `host` [fields](https://docs.victoriametrics.com/victorialogs/k _time:5m | format if (ip:* and host:*) "request from :" as message ``` +### len pipe + +The `| len(field) as result` pipe stores byte length of the given `field` value into the `result` field. +For example, the following query shows top 5 log entries with the maximum byte length of `_msg` field across +logs for the last 5 minutes: + +```logsql +_time:5m | len(_msg) as msg_len | sort by (msg_len desc) | limit 5 +``` + +See also: + +- [`sum_len` stats function](#sum-len-stats) +- [`sort` pipe](#sort-pipe) +- [`limit` pipe](#limit-pipe) + ### limit pipe If only a subset of selected logs must be processed, then `| limit N` [pipe](#pipes) can be used, where `N` can contain any [supported integer numeric value](#numeric-values). @@ -3027,10 +3044,10 @@ See also: ### sum_len stats -`sum_len(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) calculates the sum of lengths of all the values +`sum_len(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) calculates the sum of byte lengths of all the values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-For example, the following query returns the sum of lengths of [`_msg` fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +For example, the following query returns the sum of byte lengths of [`_msg` fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) across all the logs for the last 5 minutes: ```logsql @@ -3040,6 +3057,7 @@ _time:5m | stats sum_len(_msg) messages_len See also: - [`count`](#count-stats) +- [`len` pipe](#len-pipe) ### uniq_values stats diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 853f380c7..77e46ca46 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -978,6 +978,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`foo | field_names y`, `foo | field_names as y`) f(`foo | field_names`, `foo | field_names`) + // field_values pipe + f(`* | field_values x`, `* | field_values x`) + f(`* | field_values (x)`, `* | field_values x`) + // blocks_count pipe f(`foo | blocks_count as x`, `foo | blocks_count as x`) f(`foo | blocks_count y`, `foo | blocks_count as y`) @@ -999,6 +1003,14 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | rm foo`, `* | delete foo`) f(`* | DELETE foo, bar`, `* | delete foo, bar`) + // len pipe + f(`* | len(x)`, `* | len(x)`) + f(`* | len(x) as _msg`, `* | len(x)`) + f(`* | len(x) y`, `* | len(x) as y`) + f(`* | len ( x ) as y`, `* | len(x) as y`) + f(`* | len x y`, `* | len(x) as y`) + f(`* | len x as y`, `* | len(x) as y`) + // limit and head pipe f(`foo | limit`, `foo | limit 10`) f(`foo | head`, `foo | limit 10`) @@ -1519,6 +1531,12 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | delete foo,`) f(`foo | delete foo,,`) + // invalid len pipe + f(`foo | len`) + f(`foo | len(`) + f(`foo | len()`) + f(`foo | len (x) y z`) + // invalid limit pipe value f(`foo | limit bar`) f(`foo | limit -123`) @@ -2078,6 +2096,7 @@ func TestQueryCanReturnLastNResults(t *testing.T) { f("* | rm x", true) f("* | stats 
count() rows", false) f("* | sort by (x)", false) + f("* | len(x)", true) f("* | limit 10", false) f("* | offset 10", false) f("* | uniq (x)", false) @@ -2114,6 +2133,7 @@ func TestQueryCanLiveTail(t *testing.T) { f("* | field_values a", false) f("* | filter foo", true) f("* | format 'ac'", true) + f("* | len(x)", true) f("* | limit 10", false) f("* | math a/b as c", true) f("* | offset 10", false) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 5b0bc8b46..a8dcff2a6 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -165,6 +165,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'format' pipe: %w", err) } return pf, nil + case lex.isKeyword("len"): + pl, err := parsePipeLen(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'len' pipe: %w", err) + } + return pl, nil case lex.isKeyword("limit", "head"): pl, err := parsePipeLimit(lex) if err != nil { @@ -301,6 +307,7 @@ var pipeNames = func() map[string]struct{} { "fields", "keep", "filter", "where", "format", + "len", "limit", "head", "math", "eval", "offset", "skip", diff --git a/lib/logstorage/pipe_field_values.go b/lib/logstorage/pipe_field_values.go index ba804a878..4270f9e31 100644 --- a/lib/logstorage/pipe_field_values.go +++ b/lib/logstorage/pipe_field_values.go @@ -77,7 +77,7 @@ func parsePipeFieldValues(lex *lexer) (*pipeFieldValues, error) { } lex.nextToken() - field, err := parseFieldName(lex) + field, err := parseFieldNameWithOptionalParens(lex) if err != nil { return nil, fmt.Errorf("cannot parse field name for 'field_values': %w", err) } diff --git a/lib/logstorage/pipe_field_values_test.go b/lib/logstorage/pipe_field_values_test.go index 26b0419b6..a27494405 100644 --- a/lib/logstorage/pipe_field_values_test.go +++ b/lib/logstorage/pipe_field_values_test.go @@ -53,7 +53,7 @@ func TestPipeFieldValues(t *testing.T) { }, }) - f("field_values b", [][]Field{ + f("field_values (b)", [][]Field{ { {"a", `2`}, {"b", `3`}, diff 
--git a/lib/logstorage/pipe_len.go b/lib/logstorage/pipe_len.go new file mode 100644 index 000000000..907257e17 --- /dev/null +++ b/lib/logstorage/pipe_len.go @@ -0,0 +1,174 @@ +package logstorage + +import ( + "fmt" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +// pipeLen processes '| len ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#len-pipe +type pipeLen struct { + fieldName string + resultField string +} + +func (pl *pipeLen) String() string { + s := "len(" + quoteTokenIfNeeded(pl.fieldName) + ")" + if !isMsgFieldName(pl.resultField) { + s += " as " + quoteTokenIfNeeded(pl.resultField) + } + return s +} + +func (pl *pipeLen) canLiveTail() bool { + return true +} + +func (pl *pipeLen) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + if !unneededFields.contains(pl.resultField) { + unneededFields.add(pl.resultField) + unneededFields.remove(pl.fieldName) + } + } else { + if neededFields.contains(pl.resultField) { + neededFields.remove(pl.resultField) + neededFields.add(pl.fieldName) + } + } +} + +func (pl *pipeLen) optimize() { + // Nothing to do +} + +func (pl *pipeLen) hasFilterInWithQuery() bool { + return false +} + +func (pl *pipeLen) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pl, nil +} + +func (pl *pipeLen) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return &pipeLenProcessor{ + pl: pl, + ppNext: ppNext, + + shards: make([]pipeLenProcessorShard, workersCount), + } +} + +type pipeLenProcessor struct { + pl *pipeLen + ppNext pipeProcessor + + shards []pipeLenProcessorShard +} + +type pipeLenProcessorShard struct { + pipeLenProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . 
+ _ [128 - unsafe.Sizeof(pipeLenProcessorShardNopad{})%128]byte +} + +type pipeLenProcessorShardNopad struct { + a arena + rc resultColumn +} + +func (plp *pipeLenProcessor) writeBlock(workerID uint, br *blockResult) { + if br.rowsLen == 0 { + return + } + + shard := &plp.shards[workerID] + shard.rc.name = plp.pl.resultField + + c := br.getColumnByName(plp.pl.fieldName) + if c.isConst { + // Fast path for const column + vLen := len(c.valuesEncoded[0]) + shard.a.b = marshalUint64String(shard.a.b[:0], uint64(vLen)) + vLenStr := bytesutil.ToUnsafeString(shard.a.b) + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { + shard.rc.addValue(vLenStr) + } + } else { + // Slow path for other columns + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { + v := c.getValueAtRow(br, rowIdx) + vLen := len(v) + aLen := len(shard.a.b) + shard.a.b = marshalUint64String(shard.a.b, uint64(vLen)) + vLenStr := bytesutil.ToUnsafeString(shard.a.b[aLen:]) + shard.rc.addValue(vLenStr) + } + } + + // Write the result to ppNext + br.addResultColumn(&shard.rc) + plp.ppNext.writeBlock(workerID, br) + + shard.a.reset() + shard.rc.reset() +} + +func (plp *pipeLenProcessor) flush() error { + return nil +} + +func parsePipeLen(lex *lexer) (*pipeLen, error) { + if !lex.isKeyword("len") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "len") + } + lex.nextToken() + + fieldName, err := parseFieldNameWithOptionalParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse field name for 'len' pipe: %w", err) + } + + // parse optional 'as ...` part + resultField := "_msg" + if lex.isKeyword("as") { + lex.nextToken() + } + if !lex.isKeyword("|", ")", "") { + field, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(fieldName), err) + } + resultField = field + } + + pl := &pipeLen{ + fieldName: fieldName, + resultField: resultField, + } + + return pl, nil +} + +func 
parseFieldNameWithOptionalParens(lex *lexer) (string, error) { + hasParens := false + if lex.isKeyword("(") { + lex.nextToken() + hasParens = true + } + fieldName, err := parseFieldName(lex) + if err != nil { + return "", err + } + if hasParens { + if !lex.isKeyword(")") { + return "", fmt.Errorf("missing ')' after '%s'", quoteTokenIfNeeded(fieldName)) + } + lex.nextToken() + } + return fieldName, nil +} diff --git a/lib/logstorage/pipe_len_test.go b/lib/logstorage/pipe_len_test.go new file mode 100644 index 000000000..68b0dcd41 --- /dev/null +++ b/lib/logstorage/pipe_len_test.go @@ -0,0 +1,92 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeLenSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`len(foo)`) + f(`len(foo) as bar`) +} + +func TestParsePipeLenFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`len`) + f(`len(`) + f(`len()`) + f(`len(x) y z`) +} + +func TestPipeLen(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f(`len(foo) x`, [][]Field{ + { + {"foo", `abcde`}, + {"baz", "1234567890"}, + }, + { + {"foo", `abc`}, + {"bar", `de`}, + }, + { + {"baz", "xyz"}, + }, + }, [][]Field{ + { + {"foo", `abcde`}, + {"baz", "1234567890"}, + {"x", "5"}, + }, + { + {"foo", `abc`}, + {"bar", `de`}, + {"x", "3"}, + }, + { + {"baz", "xyz"}, + {"x", "0"}, + }, + }) +} + +func TestPipeLenUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`len(y) x`, "*", "", "*", "x") + f(`len(x) x`, "*", "", "*", "") + + // unneeded fields do not intersect with output field + f(`len(y) as x`, "*", 
"f1,f2", "*", "f1,f2,x") + f(`len(x) as x`, "*", "f1,f2", "*", "f1,f2") + + // unneeded fields intersect with output field + f(`len(z) as x`, "*", "x,y", "*", "x,y") + f(`len(y) as x`, "*", "x,y", "*", "x,y") + f(`len(x) as x`, "*", "x,y", "*", "x,y") + + // needed fields do not intersect with output field + f(`len(y) as z`, "x,y", "", "x,y", "") + f(`len(z) as z`, "x,y", "", "x,y", "") + + // needed fields intersect with output field + f(`len (z) as f2`, "f2,y", "", "y,z", "") + f(`len (y) as f2`, "f2,y", "", "y", "") + f(`len (y) as y`, "f2,y", "", "f2,y", "") +}