From 4e5b24c53a03be95bbaf20323dd64c37ee5f41ca Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 19 May 2024 13:47:30 +0200 Subject: [PATCH] wip --- lib/logstorage/block_result.go | 9 ++++-- lib/logstorage/parser_test.go | 20 ++++++++++++++ lib/logstorage/pipe.go | 6 ++++ lib/logstorage/pipe_extract.go | 50 ++++++++++++++++++++++++++++++---- 4 files changed, 76 insertions(+), 9 deletions(-) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 06ac2ee64..b2b34816e 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1333,9 +1333,12 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn { if columnName == "" { columnName = "_msg" } - for _, c := range br.getColumns() { - if c.name == columnName { - return c + cs := br.getColumns() + + // Search for the needed column in reverse order, since the old column may be overridden by new column in addResultColumn() + for i := len(cs) - 1; i >= 0; i-- { + if cs[i].name == columnName { + return cs[i] } } diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index e896e026d..a4a2a170b 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -995,6 +995,13 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | filter error ip:12.3.4.5 or warn`, `* | filter error ip:12.3.4.5 or warn`) f(`foo | stats by (host) count() logs | filter logs:>50 | sort by (logs desc) | limit 10`, `foo | stats by (host) count(*) as logs | filter logs:>50 | sort by (logs desc) | limit 10`) + // extract pipe + f(`* | extract "foobaz"`, `* | extract "foobaz"`) + f(`* | extract from _msg "foobaz"`, `* | extract "foobaz"`) + f(`* | extract from '' 'foobaz'`, `* | extract "foobaz"`) + f("* | extract from x `foobaz`", `* | extract from x "foobaz"`) + f("* | extract from x foobaz", `* | extract from x "foobaz"`) + // multiple different pipes f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`) f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`) @@ -1383,6 +1390,19 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | filter | sort by (x)`) f(`foo | filter (`) f(`foo | filter )`) + + // invalid extract pipe + f(`foo | extract`) + f(`foo | extract bar`) + f(`foo | extract "xy"`) + f(`foo | extract "<>"`) + f(`foo | extract "foo<>foo"`) + f(`foo | extract "foo<>foo<_>bar<*>asdf"`) + f(`foo | extract from`) + f(`foo | extract from x`) + f(`foo | extract from x "abc"`) + f(`foo | extract from x "" de`) } func TestQueryGetNeededColumns(t *testing.T) { diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 36adb7b1e..5bd12a99d 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -83,6 +83,12 @@ func parsePipes(lex *lexer) ([]pipe, error) { return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) } pipes = append(pipes, pd) + case lex.isKeyword("extract"): + pe, err := parsePipeExtract(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err) + } + pipes = append(pipes, pe) case lex.isKeyword("field_names"): pf, err := parsePipeFieldNames(lex) if err != nil { diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 991003b9c..7a8631e8d 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -14,18 +14,23 @@ import ( // // See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe type pipeExtract struct { - field string - steps []extractFormatStep + fromField string + steps []extractFormatStep - stepsStr string + format string } func (pe *pipeExtract) String() string { - return fmt.Sprintf("extract(%s, %s)", quoteTokenIfNeeded(pe.field), pe.stepsStr) + s := "extract" + if !isMsgFieldName(pe.fromField) { + s += " from " + quoteTokenIfNeeded(pe.fromField) + } + s += " " + quoteTokenIfNeeded(pe.format) + return s } func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) { - neededFields.add(pe.field) + neededFields.add(pe.fromField) for _, step := range pe.steps { if step.field != "" { @@ -87,7 +92,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pep.shards[workerID] - c := br.getColumnByName(pep.pe.field) + c := br.getColumnByName(pep.pe.fromField) values := c.getValues(br) ef := shard.ef @@ -110,6 +115,39 @@ func (pep *pipeExtractProcessor) flush() error { return nil } +func parsePipeExtract(lex *lexer) (*pipeExtract, error) { + if !lex.isKeyword("extract") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "extract") + } + lex.nextToken() + + fromField := "_msg" + if lex.isKeyword("from") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'from' field name: %w", err) + } + fromField = f + } + + format, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot read 'format': %w", err) + } + steps, err := parseExtractFormatSteps(format) + if err != nil { + return nil, fmt.Errorf("cannot parse 'format' %q: %w", format, err) + } + + pe := &pipeExtract{ + fromField: fromField, + steps: steps, + format: format, + } + return pe, nil +} + type extractFormat struct { // steps contains steps for extracting fields from string steps []extractFormatStep