From 93a645dcfc859842cbf10de377e510b107e2c1c7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 22 May 2024 15:29:18 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/LogsQL.md | 6 + lib/logstorage/parser.go | 2 + lib/logstorage/parser_test.go | 55 +++++++++ lib/logstorage/pattern.go | 2 +- lib/logstorage/pattern_test.go | 5 + lib/logstorage/pipe.go | 8 +- lib/logstorage/pipe_extract.go | 4 +- lib/logstorage/pipe_fields.go | 2 +- lib/logstorage/pipe_format.go | 30 +++-- lib/logstorage/pipe_format_test.go | 187 +++++++++++++++++++++++++++++ lib/logstorage/storage_search.go | 12 ++ 11 files changed, 297 insertions(+), 16 deletions(-) create mode 100644 lib/logstorage/pipe_format_test.go diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index a16b7dcfe..5f89041bd 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1220,6 +1220,12 @@ and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message- _time:5m | fields host, _msg ``` +`keep` can be used instead of `fields` for convenience. For example, the following query is equivalent to the previous one: + +```logsql +_time:5m | keep host, _msg +``` + See also: - [`copy` pipe](#copy-pipe) diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 8a71ba1de..e0804d4f2 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -327,6 +327,8 @@ func (q *Query) Optimize() { for _, f := range t.funcs { f.iff.optimizeFilterIn() } + case *pipeFormat: + t.iff.optimizeFilterIn() case *pipeExtract: t.iff.optimizeFilterIn() case *pipeUnpackJSON: diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 55874b3d3..108e9f546 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1621,6 +1621,38 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``) f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`) + f(`* | format "foo" as s1`, `*`, `s1`) + f(`* | format "foo" as s1`, `*`, `s1`) + f(`* | format "foo" as s1`, `*`, ``) + + f(`* | format "foo" if (x1:y) as s1`, `*`, `s1`) + f(`* | format "foo" if (x1:y) as s1`, `*`, `s1`) + f(`* | format "foo" if (s1:y) as s1`, `*`, ``) + f(`* | format "foo" if (x1:y) as s1`, `*`, ``) + + f(`* | format "foo" as s1 | fields f1`, `f1`, ``) + f(`* | format "foo" as s1 | fields s1`, ``, ``) + f(`* | format "foo" as s1 | fields f2`, `f2`, ``) + f(`* | format "foo" as s1 | fields f1`, `f1`, ``) + f(`* | format "foo" as s1 | fields s1`, `f1`, ``) + f(`* | format "foo" as s1 | fields f1`, `f1`, ``) + f(`* | format "foo" as s1 | fields s1`, `s1`, ``) + + f(`* | format "foo" if (f1:x) as s1 | fields s1`, `f1`, ``) + f(`* | format "foo" if (f1:x) as s1 | fields s2`, `s2`, ``) + + f(`* | format "foo" as s1 | rm f1`, `*`, `f1,s1`) + f(`* | format "foo" as s1 | rm s1`, `*`, `s1`) + f(`* | format "foo" as s1 | rm f2`, `*`, `f2,s1`) + f(`* | format "foo" as s1 | rm f1`, `*`, `s1`) + f(`* | format "foo" as s1 | rm s1`, `*`, `s1`) + f(`* | format "foo" as s1 | rm f1`, `*`, `f1`) + f(`* | format "foo" as s1 | rm s1`, `*`, `s1`) + + f(`* | format "foo" if (f1:x) as s1 | rm s1`, `*`, `s1`) + f(`* | format "foo" if (f1:x) as s1 | rm f1`, `*`, `s1`) + f(`* | format "foo" if (f1:x) as s1 | rm f2`, `*`, `f2,s1`) + f(`* | extract from s1 "x"`, `*`, `f1,f2`) f(`* | extract from s1 "x" if (f3:foo)`, `*`, `f1,f2`) f(`* | extract from s1 "x" if (f1:foo)`, `*`, `f2`) @@ -1641,6 +1673,29 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | extract from s1 "x" | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`) f(`* | extract from s1 "x" if (x:bar) | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`) + f(`* | extract from s1 "xy"`, `*`, ``) + f(`* | extract from s1 "xy" if (x:foo)`, `*`, ``) + f(`* | extract from s1 "xy" if (s1:foo)`, `*`, ``) + f(`* | extract from s1 "xy" if (s1:foo)`, `*`, `f1`) + + f(`* | extract from s1 "xy" | fields s2`, `s2`, ``) + f(`* | extract from s1 "xy" | fields s1`, `s1`, ``) + f(`* | extract from s1 "xy" if (x:foo) | fields s1`, `s1,x`, ``) + f(`* | extract from s1 "xy" if (x:foo) | fields s2`, `s2`, ``) + f(`* | extract from s1 "xy" if (s1:foo) | fields s1`, `s1`, ``) + f(`* | extract from s1 "xy" if (s1:foo) | fields s2`, `s2`, ``) + f(`* | extract from s1 "xy" if (s1:foo) | fields s1`, `s1`, ``) + f(`* | extract from s1 "xy" if (s1:foo) | fields s2`, `s2`, ``) + + f(`* | extract from s1 "xy" | rm s2`, `*`, `s2`) + f(`* | extract from s1 "xy" | rm s1`, `*`, `s1`) + f(`* | extract from s1 "xy" if (x:foo) | rm s1`, `*`, `s1`) + f(`* | extract from s1 "xy" if (x:foo) | rm s2`, `*`, `s2`) + f(`* | extract from s1 "xy" if (s1:foo) | rm s1`, `*`, `s1`) + f(`* | extract from s1 "xy" if (s1:foo) | rm s2`, `*`, `s2`) + f(`* | extract from s1 "xy" if (s1:foo) | rm s1`, `*`, `f1`) + f(`* | extract from s1 "xy" if (s1:foo) | rm s2`, `*`, `f1,s2`) + f(`* | unpack_json`, `*`, ``) f(`* | unpack_json from s1`, `*`, ``) f(`* | unpack_json from s1 | fields f1`, `f1,s1`, ``) diff --git a/lib/logstorage/pattern.go b/lib/logstorage/pattern.go index 0ae0ef286..e195a998d 100644 --- a/lib/logstorage/pattern.go +++ b/lib/logstorage/pattern.go @@ -163,7 +163,7 @@ func parsePatternSteps(s string) ([]patternStep, error) { n := strings.IndexByte(s, '<') if n < 0 { steps = append(steps, patternStep{ - prefix: s, + prefix: html.UnescapeString(s), }) return steps, nil } diff --git a/lib/logstorage/pattern_test.go b/lib/logstorage/pattern_test.go index d7fe960b1..a17ba3fa0 100644 --- a/lib/logstorage/pattern_test.go +++ b/lib/logstorage/pattern_test.go @@ -191,6 +191,11 @@ func TestParsePatternStepsSuccess(t *testing.T) { prefix: "baz", }, }) + f("<&>", []patternStep{ + { + prefix: "<&>", + }, + }) f("<&gt;", []patternStep{ { prefix: "<", diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 3e9eb9a58..ccc4dbeff 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -105,7 +105,7 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err) } return pf, nil - case lex.isKeyword("fields"): + case lex.isKeyword("fields", "keep"): pf, err := parsePipeFields(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err) @@ -117,6 +117,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err) } return pf, nil + case lex.isKeyword("format"): + pf, err := parsePipeFormat(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'format' pipe: %w", err) + } + return pf, nil case lex.isKeyword("limit", "head"): pl, err := parsePipeLimit(lex) if err != nil { diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 0ffe7bd9e..b7097c78a 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -42,10 +42,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } } if needFromField { + unneededFields.remove(pe.fromField) if pe.iff != nil { unneededFields.removeFields(pe.iff.neededFields) } - unneededFields.remove(pe.fromField) } else { unneededFields.add(pe.fromField) } @@ -59,10 +59,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } } if needFromField { + neededFields.add(pe.fromField) if pe.iff != nil { neededFields.addFields(pe.iff.neededFields) } - neededFields.add(pe.fromField) } } } diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index 07298c6dd..a391cbd0e 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -77,7 +77,7 @@ func (pfp *pipeFieldsProcessor) flush() error { } func parsePipeFields(lex *lexer) (*pipeFields, error) { - if !lex.isKeyword("fields") { + if !lex.isKeyword("fields", "keep") { return nil, fmt.Errorf("expecting 'fields'; got %q", lex.token) } diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index 6840641d3..c71aba4a3 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -21,7 +21,7 @@ type pipeFormat struct { } func (pf *pipeFormat) String() string { - s := "format" + quoteTokenIfNeeded(pf.formatStr) + s := "format " + quoteTokenIfNeeded(pf.formatStr) if pf.iff != nil { s += " " + pf.iff.String() } @@ -33,6 +33,9 @@ func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) if neededFields.contains("*") { if !unneededFields.contains(pf.resultField) { unneededFields.add(pf.resultField) + if pf.iff != nil { + unneededFields.removeFields(pf.iff.neededFields) + } for _, step := range pf.steps { if step.field != "" { unneededFields.remove(step.field) @@ -42,6 +45,9 @@ func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) } else { if neededFields.contains(pf.resultField) { neededFields.remove(pf.resultField) + if pf.iff != nil { + neededFields.addFields(pf.iff.neededFields) + } for _, step := range pf.steps { if step.field != "" { neededFields.add(step.field) @@ -154,11 +160,21 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) { return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", formatStr, err) } + // parse optional if (...) + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + // parse resultField if !lex.isKeyword("as") { return nil, fmt.Errorf("missing 'as' keyword after 'format %q'", formatStr) } lex.nextToken() - resultField, err := parseFieldName(lex) if err != nil { return nil, fmt.Errorf("cannot parse result field after 'format %q as': %w", formatStr, err) @@ -168,15 +184,7 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) { formatStr: formatStr, steps: steps, resultField: resultField, - } - - // parse optional if (...) - if lex.isKeyword("if") { - iff, err := parseIfFilter(lex) - if err != nil { - return nil, err - } - pf.iff = iff + iff: iff, } return pf, nil diff --git a/lib/logstorage/pipe_format_test.go b/lib/logstorage/pipe_format_test.go new file mode 100644 index 000000000..b8e78d666 --- /dev/null +++ b/lib/logstorage/pipe_format_test.go @@ -0,0 +1,187 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeFormatSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`format "" as x`) + f(`format "<>" as x`) + f(`format foo as x`) + f(`format "" as _msg`) + f(`format "bar" as _msg`) + f(`format "barbac" as _msg`) + f(`format "barbac" if (x:y) as _msg`) +} + +func TestParsePipeFormatFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`format`) + f(`format foo`) + f(`format foo bar`) + f(`format foo as`) + f(`format foo if`) + f(`format foo as x if (x:y)`) +} + +func TestPipeFormat(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // plain string into a single field + f(`format foo as x`, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "x"}, + }, + }, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "x"}, + {"x", `foo`}, + }, + }) + + // plain string with html escaping into a single field + f(`format "<foo>" as x`, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "x"}, + }, + }, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "x"}, + {"x", ``}, + }, + }) + + // format with empty placeholders into existing field + f(`format "<_>foo<_>" as _msg`, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "x"}, + }, + }, [][]Field{ + { + {"_msg", `foo`}, + {"a", "x"}, + }, + }) + + // format with various placeholders into new field + f(`format "aaa<_msg>xxx" as x`, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "b"}, + }, + }, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "b"}, + {"x", `aaafoobarxxbx`}, + }, + }) + + // format into existing field + f(`format "aaa<_msg>xxx" as _msg`, [][]Field{ + { + {"_msg", `foobar`}, + {"a", "b"}, + }, + }, [][]Field{ + { + {"_msg", `aaafoobarxxbx`}, + {"a", "b"}, + }, + }) + + // conditional format over multiple rows + f(`format "a: , b: , x: " if (!c:*) as c`, [][]Field{ + { + {"b", "bar"}, + {"a", "foo"}, + {"c", "keep-me"}, + }, + { + {"c", ""}, + {"a", "f"}, + }, + { + {"b", "x"}, + }, + }, [][]Field{ + { + {"b", "bar"}, + {"a", "foo"}, + {"c", "keep-me"}, + }, + { + {"a", "f"}, + {"c", "a: f, b: , x: f"}, + }, + { + {"b", "x"}, + {"c", "a: , b: x, x: "}, + }, + }) +} + +func TestPipeFormatUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`format "foo" as x`, "*", "", "*", "x") + f(`format "foo" as x`, "*", "", "*", "x") + f(`format "foo" if (f2:z) as x`, "*", "", "*", "x") + + // unneeded fields do not intersect with pattern and output field + f(`format "foo" as x`, "*", "f1,f2", "*", "f1,f2,x") + f(`format "foo" as x`, "*", "f1,f2", "*", "f1,f2,x") + f(`format "foo" if (f4:z) as x`, "*", "f1,f2", "*", "f1,f2,x") + f(`format "foo" if (f1:z) as x`, "*", "f1,f2", "*", "f2,x") + + // unneeded fields intersect with pattern + f(`format "foo" as x`, "*", "f1,f2", "*", "f2,x") + f(`format "foo" if (f4:z) as x`, "*", "f1,f2", "*", "f2,x") + f(`format "foo" if (f2:z) as x`, "*", "f1,f2", "*", "x") + + // unneeded fields intersect with output field + f(`format "foo" as x`, "*", "x,y", "*", "x,y") + f(`format "foo" if (f2:z) as x`, "*", "x,y", "*", "x,y") + f(`format "foo" if (y:z) as x`, "*", "x,y", "*", "x,y") + + // needed fields do not intersect with pattern and output field + f(`format "foo" as f2`, "x,y", "", "x,y", "") + f(`format "foo" if (f3:z) as f2`, "x,y", "", "x,y", "") + f(`format "foo" if (x:z) as f2`, "x,y", "", "x,y", "") + + // needed fields intersect with pattern field + f(`format "foo" as f2`, "f1,y", "", "f1,y", "") + f(`format "foo" if (f3:z) as f2`, "f1,y", "", "f1,y", "") + f(`format "foo" if (x:z) as f2`, "f1,y", "", "f1,y", "") + + // needed fields intersect with output field + f(`format "foo" as f2`, "f2,y", "", "f1,y", "") + f(`format "foo" if (f3:z) as f2`, "f2,y", "", "f1,f3,y", "") + f(`format "foo" if (x:z or y:w) as f2`, "f2,y", "", "f1,x,y", "") + + // needed fields intersect with pattern and output fields + f(`format "foo" as f2`, "f1,f2,y", "", "f1,y", "") + f(`format "foo" if (f3:z) as f2`, "f1,f2,y", "", "f1,f3,y", "") + f(`format "foo" if (x:z or y:w) as f2`, "f1,f2,y", "", "f1,x,y", "") +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 765444487..69be30c46 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -356,6 +356,10 @@ func hasFilterInWithQueryForPipes(pipes []pipe) bool { return true } } + case *pipeFormat: + if t.iff.hasFilterInWithQuery() { + return true + } case *pipeExtract: if t.iff.hasFilterInWithQuery() { return true @@ -441,6 +445,14 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFiel byFields: t.byFields, funcs: funcsNew, } + case *pipeFormat: + iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + pf := *t + pf.iff = iffNew + pipesNew[i] = &pf case *pipeExtract: iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) if err != nil {