From 044a3448641fab163ac637df2114c74bcddd7eb8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 28 May 2024 19:11:15 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/LogsQL.md | 29 ++ lib/logstorage/pipe.go | 8 +- lib/logstorage/pipe_delete.go | 4 +- lib/logstorage/pipe_extract.go | 20 +- lib/logstorage/pipe_extract_regexp.go | 334 +++++++++++++++++++++ lib/logstorage/pipe_extract_regexp_test.go | 329 ++++++++++++++++++++ lib/logstorage/pipe_extract_test.go | 28 -- lib/logstorage/pipe_math_test.go | 14 + lib/logstorage/pipe_utils_test.go | 28 ++ 10 files changed, 756 insertions(+), 39 deletions(-) create mode 100644 lib/logstorage/pipe_extract_regexp.go create mode 100644 lib/logstorage/pipe_extract_regexp_test.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 615a6a15d..18ce37d44 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`extract_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) for extracting arbitrary substrings from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax). * FEATURE: add [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: add [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), which returns unique values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: allow omitting `stats` prefix in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). 
For example, `_time:5m | count() rows` is a valid query now. It is equivalent to `_time:5m | stats count() as rows`. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index c3e95c7e8..72317d1a4 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1152,6 +1152,7 @@ LogsQL supports the following pipes: - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`extract`](#extract-pipe) extracts the sepcified text into the given log fields. +- [`extract_regexp`](#extract_regexp-pipe) extracts the specified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax). - [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`field_values`](#field_values-pipe) returns all the values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1354,6 +1355,34 @@ For example, the following query is equivalent to the previous one: _time:5m | extract "ip= " keep_original_fields ``` +### extract_regexp pipe + +`| extract_regexp "pattern" from field_name` [pipe](#pipes) extracts substrings from the [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +according to the provided `pattern`, and stores them into field names according to the named fields inside the `pattern`. +The `pattern` must contain [RE2 regular expression](https://github.com/google/re2/wiki/Syntax) with named fields (aka capturing groups) in the form `(?P<capture_field_name>...)`. 
+Matching substrings are stored to the given `capture_field_name` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +For example, the following query extracts ipv4 addresses from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +and puts them into `ip` field for logs over the last 5 minutes: + +```logsql +_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" from _msg +``` + +The `from _msg` part can be omitted if the data extraction is performed from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). +So the following query is equivalent to the previous one: + +```logsql +_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" +``` + +Performance tip: it is recommended to use [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance. + +See also: + +- [`extract` pipe](#extract-pipe) +- [`replace_regexp` pipe](#replace_regexp-pipe) +- [`unpack_json` pipe](#unpack_json-pipe) + ### field_names pipe `| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 89780da49..32a72d0dc 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -100,7 +100,7 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err) } return pc, nil - case lex.isKeyword("delete", "del", "rm"): + case lex.isKeyword("delete", "del", "rm", "drop"): pd, err := parsePipeDelete(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) @@ -112,6 +112,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err) } return pe, nil + case lex.isKeyword("extract_regexp"): + pe, err := parsePipeExtractRegexp(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 
'extract_regexp' pipe: %w", err) + } + return pe, nil case lex.isKeyword("field_names"): pf, err := parsePipeFieldNames(lex) if err != nil { diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 9f2ca5a68..543b769fb 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -70,8 +70,8 @@ func (pdp *pipeDeleteProcessor) flush() error { } func parsePipeDelete(lex *lexer) (*pipeDelete, error) { - if !lex.isKeyword("delete", "del", "rm") { - return nil, fmt.Errorf("expecting 'delete', 'del' or 'rm'; got %q", lex.token) + if !lex.isKeyword("delete", "del", "rm", "drop") { + return nil, fmt.Errorf("expecting 'delete', 'del', 'rm' or 'drop'; got %q", lex.token) } var fields []string diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index e5c950592..a0cfe16f4 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -64,13 +64,14 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet unneededFieldsOrig := unneededFields.clone() needFromField := false for _, step := range pe.ptn.steps { - if step.field != "" { - if !unneededFieldsOrig.contains(step.field) { - needFromField = true - } - if !pe.keepOriginalFields && !pe.skipEmptyResults { - unneededFields.add(step.field) - } + if step.field == "" { + continue + } + if !unneededFieldsOrig.contains(step.field) { + needFromField = true + } + if !pe.keepOriginalFields && !pe.skipEmptyResults { + unneededFields.add(step.field) } } if needFromField { @@ -85,7 +86,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet neededFieldsOrig := neededFields.clone() needFromField := false for _, step := range pe.ptn.steps { - if step.field != "" && neededFieldsOrig.contains(step.field) { + if step.field == "" { + continue + } + if neededFieldsOrig.contains(step.field) { needFromField = true if !pe.keepOriginalFields && !pe.skipEmptyResults { neededFields.remove(step.field) diff 
--git a/lib/logstorage/pipe_extract_regexp.go b/lib/logstorage/pipe_extract_regexp.go new file mode 100644 index 000000000..5a5c4017a --- /dev/null +++ b/lib/logstorage/pipe_extract_regexp.go @@ -0,0 +1,334 @@ +package logstorage + +import ( + "fmt" + "regexp" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" +) + +// pipeExtractRegexp processes '| extract_regexp ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe +type pipeExtractRegexp struct { + fromField string + + re *regexp.Regexp + reFields []string + + keepOriginalFields bool + skipEmptyResults bool + + // iff is an optional filter for skipping the extract func + iff *ifFilter +} + +func (pe *pipeExtractRegexp) String() string { + s := "extract_regexp" + if pe.iff != nil { + s += " " + pe.iff.String() + } + reStr := pe.re.String() + s += " " + quoteTokenIfNeeded(reStr) + if !isMsgFieldName(pe.fromField) { + s += " from " + quoteTokenIfNeeded(pe.fromField) + } + if pe.keepOriginalFields { + s += " keep_original_fields" + } + if pe.skipEmptyResults { + s += " skip_empty_results" + } + return s +} + +func (pe *pipeExtractRegexp) optimize() { + pe.iff.optimizeFilterIn() +} + +func (pe *pipeExtractRegexp) hasFilterInWithQuery() bool { + return pe.iff.hasFilterInWithQuery() +} + +func (pe *pipeExtractRegexp) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pe.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + peNew := *pe + peNew.iff = iffNew + return &peNew, nil +} + +func (pe *pipeExtractRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + unneededFieldsOrig := unneededFields.clone() + needFromField := false + for _, f := range pe.reFields { + if f == "" { + continue + } + if !unneededFieldsOrig.contains(f) { + needFromField = true + } + if !pe.keepOriginalFields && 
!pe.skipEmptyResults { + unneededFields.add(f) + } + } + if needFromField { + unneededFields.remove(pe.fromField) + if pe.iff != nil { + unneededFields.removeFields(pe.iff.neededFields) + } + } else { + unneededFields.add(pe.fromField) + } + } else { + neededFieldsOrig := neededFields.clone() + needFromField := false + for _, f := range pe.reFields { + if f == "" { + continue + } + if neededFieldsOrig.contains(f) { + needFromField = true + if !pe.keepOriginalFields && !pe.skipEmptyResults { + neededFields.remove(f) + } + } + } + if needFromField { + neededFields.add(pe.fromField) + if pe.iff != nil { + neededFields.addFields(pe.iff.neededFields) + } + } + } +} + +func (pe *pipeExtractRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return &pipeExtractRegexpProcessor{ + pe: pe, + ppNext: ppNext, + + shards: make([]pipeExtractRegexpProcessorShard, workersCount), + } +} + +type pipeExtractRegexpProcessor struct { + pe *pipeExtractRegexp + ppNext pipeProcessor + + shards []pipeExtractRegexpProcessorShard +} + +type pipeExtractRegexpProcessorShard struct { + pipeExtractRegexpProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . 
+ _ [128 - unsafe.Sizeof(pipeExtractRegexpProcessorShardNopad{})%128]byte +} + +func (shard *pipeExtractRegexpProcessorShard) apply(re *regexp.Regexp, v string) { + shard.fields = slicesutil.SetLength(shard.fields, len(shard.rcs)) + fields := shard.fields + clear(fields) + + locs := re.FindStringSubmatchIndex(v) + if locs == nil { + return + } + + for i := range fields { + start := locs[2*i] + if start < 0 { + // mismatch + continue + } + end := locs[2*i+1] + fields[i] = v[start:end] + } +} + +type pipeExtractRegexpProcessorShardNopad struct { + bm bitmap + + resultColumns []*blockResultColumn + resultValues []string + + rcs []resultColumn + a arena + + fields []string +} + +func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + pe := pep.pe + shard := &pep.shards[workerID] + + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if iff := pe.iff; iff != nil { + iff.f.applyToBlockResult(br, bm) + if bm.isZero() { + pep.ppNext.writeBlock(workerID, br) + return + } + } + + reFields := pe.reFields + + shard.rcs = slicesutil.SetLength(shard.rcs, len(reFields)) + rcs := shard.rcs + for i := range reFields { + rcs[i].name = reFields[i] + } + + c := br.getColumnByName(pe.fromField) + values := c.getValues(br) + + shard.resultColumns = slicesutil.SetLength(shard.resultColumns, len(rcs)) + resultColumns := shard.resultColumns + for i := range resultColumns { + if reFields[i] != "" { + resultColumns[i] = br.getColumnByName(rcs[i].name) + } + } + + shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs)) + resultValues := shard.resultValues + + hadUpdates := false + vPrev := "" + for rowIdx, v := range values { + if bm.isSetBit(rowIdx) { + if !hadUpdates || vPrev != v { + vPrev = v + hadUpdates = true + + shard.apply(pe.re, v) + + for i, v := range shard.fields { + if reFields[i] == "" { + continue + } + if v == "" && pe.skipEmptyResults || pe.keepOriginalFields { + c := 
resultColumns[i] + if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" { + v = vOrig + } + } else { + v = shard.a.copyString(v) + } + resultValues[i] = v + } + } + } else { + for i, c := range resultColumns { + if reFields[i] != "" { + resultValues[i] = c.getValueAtRow(br, rowIdx) + } + } + } + + for i, v := range resultValues { + if reFields[i] != "" { + rcs[i].addValue(v) + } + } + } + + for i := range rcs { + if reFields[i] != "" { + br.addResultColumn(&rcs[i]) + } + } + pep.ppNext.writeBlock(workerID, br) + + for i := range rcs { + rcs[i].reset() + } + shard.a.reset() +} + +func (pep *pipeExtractRegexpProcessor) flush() error { + return nil +} + +func parsePipeExtractRegexp(lex *lexer) (*pipeExtractRegexp, error) { + if !lex.isKeyword("extract_regexp") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "extract_regexp") + } + lex.nextToken() + + // parse optional if (...) + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + // parse pattern + patternStr, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot read 'pattern': %w", err) + } + re, err := regexp.Compile(patternStr) + if err != nil { + return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", patternStr, err) + } + reFields := re.SubexpNames() + + hasNamedFields := false + for _, f := range reFields { + if f != "" { + hasNamedFields = true + break + } + } + if !hasNamedFields { + return nil, fmt.Errorf("the 'pattern' %q must contain at least a single named group in the form (?P...)", patternStr) + } + + // parse optional 'from ...' 
part + fromField := "_msg" + if lex.isKeyword("from") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'from' field name: %w", err) + } + fromField = f + } + + keepOriginalFields := false + skipEmptyResults := false + switch { + case lex.isKeyword("keep_original_fields"): + lex.nextToken() + keepOriginalFields = true + case lex.isKeyword("skip_empty_results"): + lex.nextToken() + skipEmptyResults = true + } + + pe := &pipeExtractRegexp{ + fromField: fromField, + re: re, + reFields: reFields, + keepOriginalFields: keepOriginalFields, + skipEmptyResults: skipEmptyResults, + iff: iff, + } + + return pe, nil +} diff --git a/lib/logstorage/pipe_extract_regexp_test.go b/lib/logstorage/pipe_extract_regexp_test.go new file mode 100644 index 000000000..e4fc7ced4 --- /dev/null +++ b/lib/logstorage/pipe_extract_regexp_test.go @@ -0,0 +1,329 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeExtractRegexpSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`extract_regexp "foo(?P.*)"`) + f(`extract_regexp "foo(?P.*)" skip_empty_results`) + f(`extract_regexp "foo(?P.*)" keep_original_fields`) + f(`extract_regexp "foo(?P.*)" from x`) + f(`extract_regexp "foo(?P.*)" from x skip_empty_results`) + f(`extract_regexp "foo(?P.*)" from x keep_original_fields`) + f(`extract_regexp if (x:y) "foo(?P.*)" from baz`) + f(`extract_regexp if (x:y) "foo(?P.*)" from baz skip_empty_results`) + f(`extract_regexp if (x:y) "foo(?P.*)" from baz keep_original_fields`) +} + +func TestParsePipeExtractRegexpFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`extract_regexp`) + f(`extract_regexp keep_original_fields`) + f(`extract_regexp skip_empty_results`) + f(`extract_regexp from`) + f(`extract_regexp from x`) + f(`extract_regexp from x "y(?P.*)"`) + f(`extract_regexp if (x:y)`) + f(`extract_regexp 
"a(?P.*)" if (x:y)`) + f(`extract_regexp "a"`) + f(`extract_regexp "(foo)"`) +} + +func TestPipeExtractRegexp(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // skip empty results + f(`extract_regexp "baz=(?P.*) a=(?P.*)" skip_empty_results`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", "foobar"}, + {"abc", "ippl"}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", "foobar"}, + {"abc", `"x y=z"`}, + }, + }) + + // no skip empty results + f(`extract_regexp "baz=(?P.*) a=(?P.*)"`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", "foobar"}, + {"abc", "ippl"}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", ""}, + {"abc", `"x y=z"`}, + }, + }) + + // keep original fields + f(`extract_regexp "baz=(?P.*) a=(?P.*)" keep_original_fields`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"aa", "foobar"}, + {"abc", ""}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", `"x y=z"`}, + {"aa", "foobar"}, + }, + }) + + // no keep original fields + f(`extract_regexp "baz=(?P.*) a=(?P.*)"`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"aa", "foobar"}, + {"abc", ""}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", `"x y=z"`}, + {"aa", "b"}, + }, + }) + + // single row, extract from _msg + f(`extract_regexp "baz=(?P.*) a=(?P.*)"`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", `"x y=z"`}, + {"aa", "b"}, + }, + }) + + // single row, extract from _msg into _msg + f(`extract_regexp "msg=(?P<_msg>.*)"`, [][]Field{ + { + {"_msg", `msg=bar`}, + }, + }, [][]Field{ + { + {"_msg", "bar"}, + }, + }) + + // single row, extract from non-existing field + f(`extract_regexp "foo=(?P.*)" from x`, [][]Field{ + { + {"_msg", `foo=bar`}, + }, + }, [][]Field{ + { + {"_msg", 
`foo=bar`}, + {"bar", ""}, + }, + }) + + // single row, pattern mismatch + f(`extract_regexp "foo=(?P.*)" from x`, [][]Field{ + { + {"x", `foobar`}, + }, + }, [][]Field{ + { + {"x", `foobar`}, + {"bar", ""}, + }, + }) + + f(`extract_regexp "foo=(?P.*) baz=(?P.*)" from x`, [][]Field{ + { + {"x", `a foo="a\"b\\c" cde baz=aa`}, + }, + }, [][]Field{ + { + {"x", `a foo="a\"b\\c" cde baz=aa`}, + {"bar", `"a\"b\\c" cde`}, + {"xx", "aa"}, + }, + }) + + // single row, overwirte existing column + f(`extract_regexp "foo=(?P.*) baz=(?P.*)" from x`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `cc`}, + {"xx", `aa b`}, + }, + }) + + // single row, if match + f(`extract_regexp if (x:baz) "foo=(?P.*) baz=(?P.*)" from "x"`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `cc`}, + {"xx", `aa b`}, + }, + }) + + // single row, if mismatch + f(`extract_regexp if (bar:"") "foo=(?P.*) baz=(?P.*)" from 'x'`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `abc`}, + }, + }) + + // multiple rows with distinct set of labels + f(`extract_regexp if (!ip:keep) "ip=(?P([0-9]+[.]){3}[0-9]+) "`, [][]Field{ + { + {"foo", "bar"}, + {"_msg", "request from ip=1.2.3.4 xxx"}, + {"f3", "y"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=5.4.3.1 abcd"}, + {"ip", "keep"}, + {"a", "b"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=34.32.11.94 abcd"}, + {"ip", "ppp"}, + {"a", "b"}, + }, + { + {"foo", "klkfs"}, + {"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"}, + {"ip", "bbbsd"}, + {"a", "klo2i"}, + }, + }, [][]Field{ + { + {"foo", "bar"}, + {"_msg", "request from ip=1.2.3.4 xxx"}, + {"f3", "y"}, + {"ip", "1.2.3.4"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=5.4.3.1 abcd"}, + {"ip", "keep"}, + {"a", "b"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=34.32.11.94 abcd"}, + {"ip", 
"34.32.11.94"}, + {"a", "b"}, + }, + { + {"foo", "klkfs"}, + {"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"}, + {"ip", ""}, + {"a", "klo2i"}, + }, + }) +} + +func TestPipeExtractRegexpUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("extract_regexp '(?P.*)' from x", "*", "", "*", "foo") + f("extract_regexp if (foo:bar) '(?P.*)' from x", "*", "", "*", "") + f("extract_regexp if (foo:bar) '(?P.*)' from x keep_original_fields", "*", "", "*", "") + f("extract_regexp if (foo:bar) '(?P.*)' from x skip_empty_results", "*", "", "*", "") + + // unneeded fields do not intersect with pattern and output fields + f("extract_regexp '(?P.*)' from x", "*", "f1,f2", "*", "f1,f2,foo") + f("extract_regexp '(?P.*)' from x keep_original_fields", "*", "f1,f2", "*", "f1,f2") + f("extract_regexp '(?P.*)' from x skip_empty_results", "*", "f1,f2", "*", "f1,f2") + f("extract_regexp if (f1:x) '(?P.*)' from x", "*", "f1,f2", "*", "f2,foo") + f("extract_regexp if (f1:x) '(?P.*)' from x keep_original_fields", "*", "f1,f2", "*", "f2") + f("extract_regexp if (f1:x) '(?P.*)' from x skip_empty_results", "*", "f1,f2", "*", "f2") + f("extract_regexp if (foo:bar f1:x) '(?P.*)' from x", "*", "f1,f2", "*", "f2") + + // unneeded fields intersect with pattern + f("extract_regexp '(?P.*)' from x", "*", "f2,x", "*", "f2,foo") + f("extract_regexp '(?P.*)' from x keep_original_fields", "*", "f2,x", "*", "f2") + f("extract_regexp '(?P.*)' from x skip_empty_results", "*", "f2,x", "*", "f2") + f("extract_regexp if (f1:abc) '(?P.*)' from x", "*", "f2,x", "*", "f2,foo") + f("extract_regexp if (f2:abc) '(?P.*)' from x", "*", "f2,x", "*", "foo") + + // unneeded fields intersect with output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "*", "f2,foo", "*", "bar,f2,foo") 
+ f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "*", "f2,foo", "*", "f2,foo") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "*", "f2,foo", "*", "f2,foo") + f("extract_regexp if (f1:abc) '(?P.*)x(?P.*)' from x", "*", "f2,foo", "*", "bar,f2,foo") + f("extract_regexp if (f2:abc foo:w) '(?P.*)x(?P.*)' from x", "*", "f2,foo", "*", "bar") + f("extract_regexp if (f2:abc foo:w) '(?P.*)x(?P.*)' from x keep_original_fields", "*", "f2,foo", "*", "") + f("extract_regexp if (f2:abc foo:w) '(?P.*)x(?P.*)' from x skip_empty_results", "*", "f2,foo", "*", "") + + // unneeded fields intersect with all the output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract_regexp if (a:b f2:q x:y foo:w) '(?P.*)x(?P.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract_regexp if (a:b f2:q x:y foo:w) '(?P.*)x(?P.*)' from x keep_original_fields", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract_regexp if (a:b f2:q x:y foo:w) '(?P.*)x(?P.*)' from x skip_empty_results", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + + // needed fields do not intersect with pattern and output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "f1,f2", "", "f1,f2", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f1,f2", "", "f1,f2", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f1,f2", "", "f1,f2", "") + f("extract_regexp if (a:b) '(?P.*)x(?P.*)' from x", "f1,f2", "", "f1,f2", "") + f("extract_regexp if (f1:b) '(?P.*)x(?P.*)' from x", "f1,f2", "", "f1,f2", "") + + // needed fields intersect with pattern field + f("extract_regexp '(?P.*)x(?P.*)' from x", "f2,x", "", "f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f2,x", "", "f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f2,x", "", "f2,x", "") + f("extract_regexp if (a:b) '(?P.*)x(?P.*)' from x", "f2,x", "", "f2,x", "") + + // needed fields intersect with output 
fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "f2,foo", "", "f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f2,foo", "", "foo,f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f2,foo", "", "foo,f2,x", "") + f("extract_regexp if (a:b) '(?P.*)x(?P.*)' from x", "f2,foo", "", "a,f2,x", "") + + // needed fields intersect with pattern and output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "f2,foo,x,y", "", "f2,x,y", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f2,foo,x,y", "", "foo,f2,x,y", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "") + f("extract_regexp if (a:b foo:q) '(?P.*)x(?P.*)' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "") +} diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go index 08c94c520..740420513 100644 --- a/lib/logstorage/pipe_extract_test.go +++ b/lib/logstorage/pipe_extract_test.go @@ -353,31 +353,3 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) { f("extract 'x' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "") f("extract if (a:b foo:q) 'x' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "") } - -func expectParsePipeFailure(t *testing.T, pipeStr string) { - t.Helper() - - lex := newLexer(pipeStr) - p, err := parsePipe(lex) - if err == nil && lex.isEnd() { - t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p) - } -} - -func expectParsePipeSuccess(t *testing.T, pipeStr string) { - t.Helper() - - lex := newLexer(pipeStr) - p, err := parsePipe(lex) - if err != nil { - t.Fatalf("cannot parse [%s]: %s", pipeStr, err) - } - if !lex.isEnd() { - t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s) - } - - pipeStrResult := p.String() - if pipeStrResult != pipeStr { - t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr) - } -} diff --git a/lib/logstorage/pipe_math_test.go 
b/lib/logstorage/pipe_math_test.go index 1500e33e6..66a9da24f 100644 --- a/lib/logstorage/pipe_math_test.go +++ b/lib/logstorage/pipe_math_test.go @@ -49,6 +49,20 @@ func TestPipeMath(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + f("math b+1 as a, a*2 as b, b-10.5+c as c", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "3"}, + {"b", "6"}, + {"c", "-1.5"}, + }, + }) + f("math 1 as a", [][]Field{ { {"a", "v1"}, diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go index af6c3b9fd..418f235bc 100644 --- a/lib/logstorage/pipe_utils_test.go +++ b/lib/logstorage/pipe_utils_test.go @@ -8,6 +8,34 @@ import ( "testing" ) +func expectParsePipeFailure(t *testing.T, pipeStr string) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err == nil && lex.isEnd() { + t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p) + } +} + +func expectParsePipeSuccess(t *testing.T, pipeStr string) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", pipeStr, err) + } + if !lex.isEnd() { + t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s) + } + + pipeStrResult := p.String() + if pipeStrResult != pipeStr { + t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr) + } +} + func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) { t.Helper()