This commit is contained in:
Aliaksandr Valialkin 2024-05-19 13:47:30 +02:00
parent 5175d99d49
commit 4e5b24c53a
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 76 additions and 9 deletions

View file

@ -1333,9 +1333,12 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn {
if columnName == "" { if columnName == "" {
columnName = "_msg" columnName = "_msg"
} }
for _, c := range br.getColumns() { cs := br.getColumns()
if c.name == columnName {
return c // Search for the needed column in reverse order, since the old column may be overridden by new column in addResultColumn()
for i := len(cs) - 1; i >= 0; i-- {
if cs[i].name == columnName {
return cs[i]
} }
} }

View file

@ -995,6 +995,13 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | filter error ip:12.3.4.5 or warn`, `* | filter error ip:12.3.4.5 or warn`) f(`* | filter error ip:12.3.4.5 or warn`, `* | filter error ip:12.3.4.5 or warn`)
f(`foo | stats by (host) count() logs | filter logs:>50 | sort by (logs desc) | limit 10`, `foo | stats by (host) count(*) as logs | filter logs:>50 | sort by (logs desc) | limit 10`) f(`foo | stats by (host) count() logs | filter logs:>50 | sort by (logs desc) | limit 10`, `foo | stats by (host) count(*) as logs | filter logs:>50 | sort by (logs desc) | limit 10`)
// extract pipe
f(`* | extract "foo<bar>baz"`, `* | extract "foo<bar>baz"`)
f(`* | extract from _msg "foo<bar>baz"`, `* | extract "foo<bar>baz"`)
f(`* | extract from '' 'foo<bar>baz'`, `* | extract "foo<bar>baz"`)
f("* | extract from x `foo<bar>baz`", `* | extract from x "foo<bar>baz"`)
f("* | extract from x foo<bar>baz", `* | extract from x "foo<bar>baz"`)
// multiple different pipes // multiple different pipes
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`) f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`) f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@ -1383,6 +1390,19 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | filter | sort by (x)`) f(`foo | filter | sort by (x)`)
f(`foo | filter (`) f(`foo | filter (`)
f(`foo | filter )`) f(`foo | filter )`)
// invalid extract pipe
f(`foo | extract`)
f(`foo | extract bar`)
f(`foo | extract "xy"`)
f(`foo | extract "<>"`)
f(`foo | extract "foo<>foo"`)
f(`foo | extract "foo<>foo<_>bar<*>asdf"`)
f(`foo | extract from`)
f(`foo | extract from x`)
f(`foo | extract from x "abc"`)
f(`foo | extract from x "<abc`)
f(`foo | extract from x "<abc>" de`)
} }
func TestQueryGetNeededColumns(t *testing.T) { func TestQueryGetNeededColumns(t *testing.T) {

View file

@ -83,6 +83,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
} }
pipes = append(pipes, pd) pipes = append(pipes, pd)
case lex.isKeyword("extract"):
pe, err := parsePipeExtract(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err)
}
pipes = append(pipes, pe)
case lex.isKeyword("field_names"): case lex.isKeyword("field_names"):
pf, err := parsePipeFieldNames(lex) pf, err := parsePipeFieldNames(lex)
if err != nil { if err != nil {

View file

@ -14,18 +14,23 @@ import (
// //
// See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe // See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe
type pipeExtract struct { type pipeExtract struct {
field string fromField string
steps []extractFormatStep steps []extractFormatStep
stepsStr string format string
} }
func (pe *pipeExtract) String() string { func (pe *pipeExtract) String() string {
return fmt.Sprintf("extract(%s, %s)", quoteTokenIfNeeded(pe.field), pe.stepsStr) s := "extract"
if !isMsgFieldName(pe.fromField) {
s += " from " + quoteTokenIfNeeded(pe.fromField)
}
s += " " + quoteTokenIfNeeded(pe.format)
return s
} }
func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) { func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.add(pe.field) neededFields.add(pe.fromField)
for _, step := range pe.steps { for _, step := range pe.steps {
if step.field != "" { if step.field != "" {
@ -87,7 +92,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
} }
shard := &pep.shards[workerID] shard := &pep.shards[workerID]
c := br.getColumnByName(pep.pe.field) c := br.getColumnByName(pep.pe.fromField)
values := c.getValues(br) values := c.getValues(br)
ef := shard.ef ef := shard.ef
@ -110,6 +115,39 @@ func (pep *pipeExtractProcessor) flush() error {
return nil return nil
} }
func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
if !lex.isKeyword("extract") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "extract")
}
lex.nextToken()
fromField := "_msg"
if lex.isKeyword("from") {
lex.nextToken()
f, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
}
fromField = f
}
format, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot read 'format': %w", err)
}
steps, err := parseExtractFormatSteps(format)
if err != nil {
return nil, fmt.Errorf("cannot parse 'format' %q: %w", format, err)
}
pe := &pipeExtract{
fromField: fromField,
steps: steps,
format: format,
}
return pe, nil
}
type extractFormat struct { type extractFormat struct {
// steps contains steps for extracting fields from string // steps contains steps for extracting fields from string
steps []extractFormatStep steps []extractFormatStep