This commit is contained in:
Aliaksandr Valialkin 2024-05-22 15:29:18 +02:00
parent 6458b5c138
commit 93a645dcfc
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
11 changed files with 297 additions and 16 deletions

View file

@ -1220,6 +1220,12 @@ and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-
_time:5m | fields host, _msg
```
`keep` can be used instead of `fields` for convenience. For example, the following query is equivalent to the previous one:
```logsql
_time:5m | keep host, _msg
```
See also:
- [`copy` pipe](#copy-pipe)

View file

@ -327,6 +327,8 @@ func (q *Query) Optimize() {
for _, f := range t.funcs {
f.iff.optimizeFilterIn()
}
case *pipeFormat:
t.iff.optimizeFilterIn()
case *pipeExtract:
t.iff.optimizeFilterIn()
case *pipeUnpackJSON:

View file

@ -1621,6 +1621,38 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
f(`* | format "foo" as s1`, `*`, `s1`)
f(`* | format "foo<f1>" as s1`, `*`, `s1`)
f(`* | format "foo<s1>" as s1`, `*`, ``)
f(`* | format "foo" if (x1:y) as s1`, `*`, `s1`)
f(`* | format "foo<f1>" if (x1:y) as s1`, `*`, `s1`)
f(`* | format "foo<f1>" if (s1:y) as s1`, `*`, ``)
f(`* | format "foo<s1>" if (x1:y) as s1`, `*`, ``)
f(`* | format "foo" as s1 | fields f1`, `f1`, ``)
f(`* | format "foo" as s1 | fields s1`, ``, ``)
f(`* | format "foo<f1>" as s1 | fields f2`, `f2`, ``)
f(`* | format "foo<f1>" as s1 | fields f1`, `f1`, ``)
f(`* | format "foo<f1>" as s1 | fields s1`, `f1`, ``)
f(`* | format "foo<s1>" as s1 | fields f1`, `f1`, ``)
f(`* | format "foo<s1>" as s1 | fields s1`, `s1`, ``)
f(`* | format "foo" if (f1:x) as s1 | fields s1`, `f1`, ``)
f(`* | format "foo" if (f1:x) as s1 | fields s2`, `s2`, ``)
f(`* | format "foo" as s1 | rm f1`, `*`, `f1,s1`)
f(`* | format "foo" as s1 | rm s1`, `*`, `s1`)
f(`* | format "foo<f1>" as s1 | rm f2`, `*`, `f2,s1`)
f(`* | format "foo<f1>" as s1 | rm f1`, `*`, `s1`)
f(`* | format "foo<f1>" as s1 | rm s1`, `*`, `s1`)
f(`* | format "foo<s1>" as s1 | rm f1`, `*`, `f1`)
f(`* | format "foo<s1>" as s1 | rm s1`, `*`, `s1`)
f(`* | format "foo" if (f1:x) as s1 | rm s1`, `*`, `s1`)
f(`* | format "foo" if (f1:x) as s1 | rm f1`, `*`, `s1`)
f(`* | format "foo" if (f1:x) as s1 | rm f2`, `*`, `f2,s1`)
f(`* | extract from s1 "<f1>x<f2>"`, `*`, `f1,f2`)
f(`* | extract from s1 "<f1>x<f2>" if (f3:foo)`, `*`, `f1,f2`)
f(`* | extract from s1 "<f1>x<f2>" if (f1:foo)`, `*`, `f2`)
@ -1641,6 +1673,29 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`)
f(`* | extract from s1 "x<s1>y"`, `*`, ``)
f(`* | extract from s1 "x<s1>y" if (x:foo)`, `*`, ``)
f(`* | extract from s1 "x<s1>y" if (s1:foo)`, `*`, ``)
f(`* | extract from s1 "x<f1>y" if (s1:foo)`, `*`, `f1`)
f(`* | extract from s1 "x<s1>y" | fields s2`, `s2`, ``)
f(`* | extract from s1 "x<s1>y" | fields s1`, `s1`, ``)
f(`* | extract from s1 "x<s1>y" if (x:foo) | fields s1`, `s1,x`, ``)
f(`* | extract from s1 "x<s1>y" if (x:foo) | fields s2`, `s2`, ``)
f(`* | extract from s1 "x<s1>y" if (s1:foo) | fields s1`, `s1`, ``)
f(`* | extract from s1 "x<s1>y" if (s1:foo) | fields s2`, `s2`, ``)
f(`* | extract from s1 "x<f1>y" if (s1:foo) | fields s1`, `s1`, ``)
f(`* | extract from s1 "x<f1>y" if (s1:foo) | fields s2`, `s2`, ``)
f(`* | extract from s1 "x<s1>y" | rm s2`, `*`, `s2`)
f(`* | extract from s1 "x<s1>y" | rm s1`, `*`, `s1`)
f(`* | extract from s1 "x<s1>y" if (x:foo) | rm s1`, `*`, `s1`)
f(`* | extract from s1 "x<s1>y" if (x:foo) | rm s2`, `*`, `s2`)
f(`* | extract from s1 "x<s1>y" if (s1:foo) | rm s1`, `*`, `s1`)
f(`* | extract from s1 "x<s1>y" if (s1:foo) | rm s2`, `*`, `s2`)
f(`* | extract from s1 "x<f1>y" if (s1:foo) | rm s1`, `*`, `f1`)
f(`* | extract from s1 "x<f1>y" if (s1:foo) | rm s2`, `*`, `f1,s2`)
f(`* | unpack_json`, `*`, ``)
f(`* | unpack_json from s1`, `*`, ``)
f(`* | unpack_json from s1 | fields f1`, `f1,s1`, ``)

View file

@ -163,7 +163,7 @@ func parsePatternSteps(s string) ([]patternStep, error) {
n := strings.IndexByte(s, '<')
if n < 0 {
steps = append(steps, patternStep{
prefix: s,
prefix: html.UnescapeString(s),
})
return steps, nil
}

View file

@ -191,6 +191,11 @@ func TestParsePatternStepsSuccess(t *testing.T) {
prefix: "baz",
},
})
f("&lt;&amp;&gt;", []patternStep{
{
prefix: "<&>",
},
})
f("&lt;<foo>&amp;gt;", []patternStep{
{
prefix: "<",

View file

@ -105,7 +105,7 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("fields"):
case lex.isKeyword("fields", "keep"):
pf, err := parsePipeFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
@ -117,6 +117,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("format"):
pf, err := parsePipeFormat(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'format' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("limit", "head"):
pl, err := parsePipeLimit(lex)
if err != nil {

View file

@ -42,10 +42,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
}
}
if needFromField {
unneededFields.remove(pe.fromField)
if pe.iff != nil {
unneededFields.removeFields(pe.iff.neededFields)
}
unneededFields.remove(pe.fromField)
} else {
unneededFields.add(pe.fromField)
}
@ -59,10 +59,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
}
}
if needFromField {
neededFields.add(pe.fromField)
if pe.iff != nil {
neededFields.addFields(pe.iff.neededFields)
}
neededFields.add(pe.fromField)
}
}
}

View file

@ -77,7 +77,7 @@ func (pfp *pipeFieldsProcessor) flush() error {
}
func parsePipeFields(lex *lexer) (*pipeFields, error) {
if !lex.isKeyword("fields") {
if !lex.isKeyword("fields", "keep") {
return nil, fmt.Errorf("expecting 'fields'; got %q", lex.token)
}

View file

@ -33,6 +33,9 @@ func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet)
if neededFields.contains("*") {
if !unneededFields.contains(pf.resultField) {
unneededFields.add(pf.resultField)
if pf.iff != nil {
unneededFields.removeFields(pf.iff.neededFields)
}
for _, step := range pf.steps {
if step.field != "" {
unneededFields.remove(step.field)
@ -42,6 +45,9 @@ func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet)
} else {
if neededFields.contains(pf.resultField) {
neededFields.remove(pf.resultField)
if pf.iff != nil {
neededFields.addFields(pf.iff.neededFields)
}
for _, step := range pf.steps {
if step.field != "" {
neededFields.add(step.field)
@ -154,11 +160,21 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", formatStr, err)
}
// parse optional if (...)
var iff *ifFilter
if lex.isKeyword("if") {
f, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
iff = f
}
// parse resultField
if !lex.isKeyword("as") {
return nil, fmt.Errorf("missing 'as' keyword after 'format %q'", formatStr)
}
lex.nextToken()
resultField, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field after 'format %q as': %w", formatStr, err)
@ -168,15 +184,7 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
formatStr: formatStr,
steps: steps,
resultField: resultField,
}
// parse optional if (...)
if lex.isKeyword("if") {
iff, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
pf.iff = iff
iff: iff,
}
return pf, nil

View file

@ -0,0 +1,187 @@
package logstorage
import (
"testing"
)
func TestParsePipeFormatSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`format "" as x`)
f(`format "<>" as x`)
f(`format foo as x`)
f(`format "<foo>" as _msg`)
f(`format "<foo>bar<baz>" as _msg`)
f(`format "bar<baz><xyz>bac" as _msg`)
f(`format "bar<baz><xyz>bac" if (x:y) as _msg`)
}
func TestParsePipeFormatFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`format`)
f(`format foo`)
f(`format foo bar`)
f(`format foo as`)
f(`format foo if`)
f(`format foo as x if (x:y)`)
}
func TestPipeFormat(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// plain string into a single field
f(`format foo as x`, [][]Field{
{
{"_msg", `foobar`},
{"a", "x"},
},
}, [][]Field{
{
{"_msg", `foobar`},
{"a", "x"},
{"x", `foo`},
},
})
// plain string with html escaping into a single field
f(`format "&lt;foo&gt;" as x`, [][]Field{
{
{"_msg", `foobar`},
{"a", "x"},
},
}, [][]Field{
{
{"_msg", `foobar`},
{"a", "x"},
{"x", `<foo>`},
},
})
// format with empty placeholders into existing field
f(`format "<_>foo<_>" as _msg`, [][]Field{
{
{"_msg", `foobar`},
{"a", "x"},
},
}, [][]Field{
{
{"_msg", `foo`},
{"a", "x"},
},
})
// format with various placeholders into new field
f(`format "a<foo>aa<_msg>xx<a>x" as x`, [][]Field{
{
{"_msg", `foobar`},
{"a", "b"},
},
}, [][]Field{
{
{"_msg", `foobar`},
{"a", "b"},
{"x", `aaafoobarxxbx`},
},
})
// format into existing field
f(`format "a<foo>aa<_msg>xx<a>x" as _msg`, [][]Field{
{
{"_msg", `foobar`},
{"a", "b"},
},
}, [][]Field{
{
{"_msg", `aaafoobarxxbx`},
{"a", "b"},
},
})
// conditional format over multiple rows
f(`format "a: <a>, b: <b>, x: <a>" if (!c:*) as c`, [][]Field{
{
{"b", "bar"},
{"a", "foo"},
{"c", "keep-me"},
},
{
{"c", ""},
{"a", "f"},
},
{
{"b", "x"},
},
}, [][]Field{
{
{"b", "bar"},
{"a", "foo"},
{"c", "keep-me"},
},
{
{"a", "f"},
{"c", "a: f, b: , x: f"},
},
{
{"b", "x"},
{"c", "a: , b: x, x: "},
},
})
}
func TestPipeFormatUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f(`format "foo" as x`, "*", "", "*", "x")
f(`format "<f1>foo" as x`, "*", "", "*", "x")
f(`format "<f1>foo" if (f2:z) as x`, "*", "", "*", "x")
// unneeded fields do not intersect with pattern and output field
f(`format "foo" as x`, "*", "f1,f2", "*", "f1,f2,x")
f(`format "<f3>foo" as x`, "*", "f1,f2", "*", "f1,f2,x")
f(`format "<f3>foo" if (f4:z) as x`, "*", "f1,f2", "*", "f1,f2,x")
f(`format "<f3>foo" if (f1:z) as x`, "*", "f1,f2", "*", "f2,x")
// unneeded fields intersect with pattern
f(`format "<f1>foo" as x`, "*", "f1,f2", "*", "f2,x")
f(`format "<f1>foo" if (f4:z) as x`, "*", "f1,f2", "*", "f2,x")
f(`format "<f1>foo" if (f2:z) as x`, "*", "f1,f2", "*", "x")
// unneeded fields intersect with output field
f(`format "<f1>foo" as x`, "*", "x,y", "*", "x,y")
f(`format "<f1>foo" if (f2:z) as x`, "*", "x,y", "*", "x,y")
f(`format "<f1>foo" if (y:z) as x`, "*", "x,y", "*", "x,y")
// needed fields do not intersect with pattern and output field
f(`format "<f1>foo" as f2`, "x,y", "", "x,y", "")
f(`format "<f1>foo" if (f3:z) as f2`, "x,y", "", "x,y", "")
f(`format "<f1>foo" if (x:z) as f2`, "x,y", "", "x,y", "")
// needed fields intersect with pattern field
f(`format "<f1>foo" as f2`, "f1,y", "", "f1,y", "")
f(`format "<f1>foo" if (f3:z) as f2`, "f1,y", "", "f1,y", "")
f(`format "<f1>foo" if (x:z) as f2`, "f1,y", "", "f1,y", "")
// needed fields intersect with output field
f(`format "<f1>foo" as f2`, "f2,y", "", "f1,y", "")
f(`format "<f1>foo" if (f3:z) as f2`, "f2,y", "", "f1,f3,y", "")
f(`format "<f1>foo" if (x:z or y:w) as f2`, "f2,y", "", "f1,x,y", "")
// needed fields intersect with pattern and output fields
f(`format "<f1>foo" as f2`, "f1,f2,y", "", "f1,y", "")
f(`format "<f1>foo" if (f3:z) as f2`, "f1,f2,y", "", "f1,f3,y", "")
f(`format "<f1>foo" if (x:z or y:w) as f2`, "f1,f2,y", "", "f1,x,y", "")
}

View file

@ -356,6 +356,10 @@ func hasFilterInWithQueryForPipes(pipes []pipe) bool {
return true
}
}
case *pipeFormat:
if t.iff.hasFilterInWithQuery() {
return true
}
case *pipeExtract:
if t.iff.hasFilterInWithQuery() {
return true
@ -441,6 +445,14 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFiel
byFields: t.byFields,
funcs: funcsNew,
}
case *pipeFormat:
iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc)
if err != nil {
return nil, err
}
pf := *t
pf.iff = iffNew
pipesNew[i] = &pf
case *pipeExtract:
iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc)
if err != nil {