This commit is contained in:
Aliaksandr Valialkin 2024-05-19 15:11:17 +02:00
parent 38b9213a0c
commit fde91c8f97
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 96 additions and 18 deletions

View file

@ -1544,6 +1544,16 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
f(`* | extract from s1 "<f1>x<f2>"`, `*`, ``)
f(`* | extract from s1 "<f1>x<f2>" | fields foo`, `foo`, ``)
f(`* | extract from s1 "<f1>x<f2>" | fields foo,s1`, `foo,s1`, ``)
f(`* | extract from s1 "<f1>x<f2>" | fields foo,f1`, `foo,s1`, ``)
f(`* | extract from s1 "<f1>x<f2>" | fields foo,f1,f2`, `foo,s1`, ``)
f(`* | extract from s1 "<f1>x<f2>" | rm foo`, `*`, `foo`)
f(`* | extract from s1 "<f1>x<f2>" | rm foo,s1`, `*`, `foo`)
f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1`, `*`, `foo`)
f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1,f2`, `*`, `foo,s1`)
f(`* | rm f1, f2`, `*`, `f1,f2`)
f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)

View file

@ -10,14 +10,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// pipeExtract processes '| extract (field, format)' pipe.
// pipeExtract processes '| extract from <field> <pattern>' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe
type pipeExtract struct {
fromField string
steps []extractFormatStep
format string
pattern string
}
func (pe *pipeExtract) String() string {
@ -25,18 +25,41 @@ func (pe *pipeExtract) String() string {
if !isMsgFieldName(pe.fromField) {
s += " from " + quoteTokenIfNeeded(pe.fromField)
}
s += " " + quoteTokenIfNeeded(pe.format)
s += " " + quoteTokenIfNeeded(pe.pattern)
return s
}
func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.add(pe.fromField)
if neededFields.contains("*") {
needFromField := false
for _, step := range pe.steps {
if step.field != "" {
if !unneededFields.contains(step.field) {
needFromField = true
} else {
unneededFields.remove(step.field)
}
}
}
if needFromField {
unneededFields.remove(pe.fromField)
} else {
unneededFields.add(pe.fromField)
}
} else {
needFromField := false
for _, step := range pe.steps {
if step.field != "" {
if neededFields.contains(step.field) {
needFromField = true
neededFields.remove(step.field)
}
}
}
if needFromField {
neededFields.add(pe.fromField)
}
}
}
func (pe *pipeExtract) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
@ -146,19 +169,19 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
fromField = f
}
format, err := getCompoundToken(lex)
pattern, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot read 'format': %w", err)
return nil, fmt.Errorf("cannot read 'pattern': %w", err)
}
steps, err := parseExtractFormatSteps(format)
steps, err := parseExtractFormatSteps(pattern)
if err != nil {
return nil, fmt.Errorf("cannot parse 'format' %q: %w", format, err)
return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", pattern, err)
}
pe := &pipeExtract{
fromField: fromField,
steps: steps,
format: format,
pattern: pattern,
}
return pe, nil
}

View file

@ -6,10 +6,10 @@ import (
)
func TestExtractFormatApply(t *testing.T) {
f := func(format, s string, resultsExpected []string) {
f := func(pattern, s string, resultsExpected []string) {
t.Helper()
steps, err := parseExtractFormatSteps(format)
steps, err := parseExtractFormatSteps(pattern)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -45,7 +45,7 @@ func TestExtractFormatApply(t *testing.T) {
f("ip=<ip> <> path=<path> ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "/foo/bar"})
// escaped format
// escaped pattern
f("ip=&lt;<ip>&gt;", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4"})
f("ip=&lt;<ip>&gt;", "foo ip=<foo&amp;bar> bar", []string{"foo&amp;bar"})
@ -177,3 +177,48 @@ func TestParseExtractFormatStepFailure(t *testing.T) {
f("<foo")
f("foo<bar")
}
func TestPipeExtractUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
p, err := parsePipeExtract(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("extract from x '<foo>'", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with fromField and output fields
f("extract from x '<foo>'", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with fromField
f("extract from x '<foo>'", "*", "f2,x", "*", "f2")
// all the needed fields, unneeded fields intersect with output fields
f("extract from x '<foo>x<bar>'", "*", "f2,foo", "*", "f2")
// all the needed fields, unneeded fields intersect with all the output fields
f("extract from x '<foo>x<bar>'", "*", "f2,foo,bar", "*", "f2,x")
// needed fields do not intersect with fromField and output fields
f("extract from x '<foo>x<bar>'", "f1,f2", "", "f1,f2", "")
// needed fields intersect with fromField
f("extract from x '<foo>x<bar>'", "f2,x", "", "f2,x", "")
// needed fields intersect with output fields
f("extract from x '<foo>x<bar>'", "f2,foo", "", "f2,x", "")
// needed fields intersect with fromField and output fields
f("extract from x '<foo>x<bar>'", "f2,foo,x,y", "", "f2,x,y", "")
}

View file

@ -51,8 +51,8 @@ func BenchmarkExtractFormatApply(b *testing.B) {
})
}
func benchmarkExtractFormatApply(b *testing.B, format string, a []string) {
steps, err := parseExtractFormatSteps(format)
func benchmarkExtractFormatApply(b *testing.B, pattern string, a []string) {
steps, err := parseExtractFormatSteps(pattern)
if err != nil {
b.Fatalf("unexpected error: %s", err)
}