From a3032067bd60cca1c15cf4fa4a764bd415de2316 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 24 May 2024 18:31:49 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 2 + docs/VictoriaLogs/LogsQL.md | 37 +++++++++++- lib/logstorage/pipe_extract.go | 32 +++++++--- lib/logstorage/pipe_extract_test.go | 45 ++++++++++++++ lib/logstorage/pipe_format.go | 30 +++++++--- lib/logstorage/pipe_format_test.go | 71 +++++++++++++++++++++++ lib/logstorage/pipe_unpack.go | 34 +++++++---- lib/logstorage/pipe_unpack_json.go | 34 ++++++++--- lib/logstorage/pipe_unpack_json_test.go | 52 +++++++++++++++++ lib/logstorage/pipe_unpack_logfmt.go | 24 ++++++-- lib/logstorage/pipe_unpack_logfmt_test.go | 29 ++++++++- 11 files changed, 348 insertions(+), 42 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 2258d921c..df69bc6c8 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FEATURE: add an ability to preserve the original non-empty field values when performing [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe), [`unpack_json`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe), [`unpack_logfmt`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) and [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) pipes. 
+ ## [v0.10.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.10.0-victorialogs) Released at 2024-05-24 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 6a5e3ff70..944d1abc7 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1162,6 +1162,13 @@ For example, the following query extracts `ip` from the corresponding JSON field _time:5m | extract '"ip":"<ip>"' ``` +Add `keep_original_fields` to the end of `extract ...` when the original non-empty values of the named fields mentioned in the pattern must be preserved +instead of overwriting them with the extracted values. For example, the following query extracts `<ip>` only if the original value for `ip` field is missing or is empty: + +```logsql +_time:5m | extract 'ip=<ip> ' keep_original_fields +``` + See also: - [Format for extract pipe pattern](#format-for-extract-pipe-pattern) @@ -1244,6 +1251,13 @@ if the input [log entry](https://docs.victoriametrics.com/VictoriaLogs/keyConcep _time:5m | extract if (ip:"") "ip=<ip> " ``` +An alternative approach is to add `keep_original_fields` to the end of `extract`, in order to keep the original non-empty values for the extracted fields. +For example, the following query is equivalent to the previous one: + +```logsql +_time:5m | extract "ip=<ip> " keep_original_fields +``` + ### field_names pipe `| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) @@ -1302,7 +1316,7 @@ See also: ### format pipe `| format "pattern" as result_field` [pipe](#pipe) combines [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -according to the `pattern` and stores it to the `result_field`. All the other fields remain unchanged after the `| format ...` pipe. +according to the `pattern` and stores it to the `result_field`. 
For example, the following query stores `request from <ip>:<port>` text into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field), by substituting `<ip>` and `<port>` with the corresponding [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values: @@ -1326,6 +1340,13 @@ and stores it into `my_json` output field: _time:5m | format '{"_msg":<q:_msg>,"stacktrace":<q:stacktrace>}' as my_json ``` +Add `keep_original_fields` to the end of `format ... as result_field` when the original non-empty value of the `result_field` must be preserved +instead of overwriting it with the `format` results. For example, the following query adds formatted result to `foo` field only if it was missing or empty: + +```logsql +_time:5m | format 'some_text' as foo keep_original_fields +``` + See also: - [Conditional format](#conditional-format) @@ -1692,6 +1713,13 @@ fields from JSON value stored in `my_json` [log field](https://docs.victoriametr _time:5m | unpack_json from my_json fields (foo, bar) ``` +If it is needed to preserve the original non-empty values of the unpacked fields, then add `keep_original_fields` to the end of `unpack_json ...`. For example, +the following query preserves the original non-empty values for `ip` and `host` fields instead of overwriting them with the unpacked values: + +```logsql +_time:5m | unpack_json from foo fields (ip, host) keep_original_fields +``` + Performance tip: if you need extracting a single field from long JSON, it is faster to use [`extract` pipe](#extract-pipe). 
For example, the following query extracts `"ip"` field from JSON stored in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) at the maximum speed: @@ -1754,6 +1782,13 @@ from logfmt stored in the `my_logfmt` field: _time:5m | unpack_logfmt from my_logfmt fields (foo, bar) ``` +If it is needed to preserve the original non-empty values of the unpacked fields, then add `keep_original_fields` to the end of `unpack_logfmt ...`. For example, +the following query preserves the original non-empty values for `ip` and `host` fields instead of overwriting it with the unpacked values: + +```logsql +_time:5m | unpack_logfmt from foo fields (ip, host) keep_original_fields +``` + Performance tip: if you need extracting a single field from long [logfmt](https://brandur.org/logfmt) line, it is faster to use [`extract` pipe](#extract-pipe). For example, the following query extracts `"ip"` field from [logfmt](https://brandur.org/logfmt) line stored in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field): diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index b9b4c3ae1..6ba89d800 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -9,10 +9,12 @@ import ( // See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe type pipeExtract struct { fromField string - ptn *pattern + ptn *pattern patternStr string + keepOriginalFields bool + // iff is an optional filter for skipping the extract func iff *ifFilter } @@ -26,6 +28,9 @@ func (pe *pipeExtract) String() string { if !isMsgFieldName(pe.fromField) { s += " from " + quoteTokenIfNeeded(pe.fromField) } + if pe.keepOriginalFields { + s += " keep_original_fields" + } return s } @@ -38,7 +43,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet if !unneededFieldsOrig.contains(step.field) { needFromField = true } - unneededFields.add(step.field) + if 
!pe.keepOriginalFields { + unneededFields.add(step.field) + } } } if needFromField { @@ -55,7 +62,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet for _, step := range pe.ptn.steps { if step.field != "" && neededFieldsOrig.contains(step.field) { needFromField = true - neededFields.remove(step.field) + if !pe.keepOriginalFields { + neededFields.remove(step.field) + } } } if needFromField { @@ -81,7 +90,7 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f } } - return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.iff) + return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.keepOriginalFields, pe.iff) } func parsePipeExtract(lex *lexer) (*pipeExtract, error) { @@ -121,11 +130,18 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { fromField = f } + keepOriginalFields := false + if lex.isKeyword("keep_original_fields") { + lex.nextToken() + keepOriginalFields = true + } + pe := &pipeExtract{ - fromField: fromField, - ptn: ptn, - patternStr: patternStr, - iff: iff, + fromField: fromField, + ptn: ptn, + patternStr: patternStr, + keepOriginalFields: keepOriginalFields, + iff: iff, } return pe, nil diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go index a80803a28..661bca7b3 100644 --- a/lib/logstorage/pipe_extract_test.go +++ b/lib/logstorage/pipe_extract_test.go @@ -11,8 +11,11 @@ func TestParsePipeExtractSuccess(t *testing.T) { } f(`extract "foo"`) + f(`extract "foo" keep_original_fields`) f(`extract "foo" from x`) + f(`extract "foo" from x keep_original_fields`) f(`extract if (x:y) "foo" from baz`) + f(`extract if (x:y) "foo" from baz keep_original_fields`) } func TestParsePipeExtractFailure(t *testing.T) { @@ -22,6 +25,7 @@ func TestParsePipeExtractFailure(t *testing.T) { } f(`extract`) + f(`extract keep_original_fields`) f(`extract from`) f(`extract from x`) f(`extract from x "y"`) @@ -38,6 
+42,36 @@ func TestPipeExtract(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // keep original fields + f(`extract "baz= a=" keep_original_fields`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"aa", "foobar"}, + {"abc", ""}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", "x y=z"}, + {"aa", "foobar"}, + }, + }) + + // no keep original fields + f(`extract "baz= a="`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"aa", "foobar"}, + {"abc", ""}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", "x y=z"}, + {"aa", "b"}, + }, + }) + // single row, extract from _msg f(`extract "baz= a="`, [][]Field{ { @@ -226,41 +260,52 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) { // all the needed fields f("extract '' from x", "*", "", "*", "foo") f("extract if (foo:bar) '' from x", "*", "", "*", "") + f("extract if (foo:bar) '' from x keep_original_fields", "*", "", "*", "") // unneeded fields do not intersect with pattern and output fields f("extract '' from x", "*", "f1,f2", "*", "f1,f2,foo") + f("extract '' from x keep_original_fields", "*", "f1,f2", "*", "f1,f2") f("extract if (f1:x) '' from x", "*", "f1,f2", "*", "f2,foo") + f("extract if (f1:x) '' from x keep_original_fields", "*", "f1,f2", "*", "f2") f("extract if (foo:bar f1:x) '' from x", "*", "f1,f2", "*", "f2") // unneeded fields intersect with pattern f("extract '' from x", "*", "f2,x", "*", "f2,foo") + f("extract '' from x keep_original_fields", "*", "f2,x", "*", "f2") f("extract if (f1:abc) '' from x", "*", "f2,x", "*", "f2,foo") f("extract if (f2:abc) '' from x", "*", "f2,x", "*", "foo") // unneeded fields intersect with output fields f("extract 'x' from x", "*", "f2,foo", "*", "bar,f2,foo") + f("extract 'x' from x keep_original_fields", "*", "f2,foo", "*", "f2,foo") f("extract if (f1:abc) 'x' from x", "*", "f2,foo", "*", "bar,f2,foo") f("extract if (f2:abc foo:w) 'x' from x", "*", "f2,foo", "*", "bar") + 
f("extract if (f2:abc foo:w) 'x' from x keep_original_fields", "*", "f2,foo", "*", "") // unneeded fields intersect with all the output fields f("extract 'x' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x") f("extract if (a:b f2:q x:y foo:w) 'x' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract if (a:b f2:q x:y foo:w) 'x' from x keep_original_fields", "*", "f2,foo,bar", "*", "bar,f2,foo,x") // needed fields do not intersect with pattern and output fields f("extract 'x' from x", "f1,f2", "", "f1,f2", "") + f("extract 'x' from x keep_original_fields", "f1,f2", "", "f1,f2", "") f("extract if (a:b) 'x' from x", "f1,f2", "", "f1,f2", "") f("extract if (f1:b) 'x' from x", "f1,f2", "", "f1,f2", "") // needed fields intersect with pattern field f("extract 'x' from x", "f2,x", "", "f2,x", "") + f("extract 'x' from x keep_original_fields", "f2,x", "", "f2,x", "") f("extract if (a:b) 'x' from x", "f2,x", "", "f2,x", "") // needed fields intersect with output fields f("extract 'x' from x", "f2,foo", "", "f2,x", "") + f("extract 'x' from x keep_original_fields", "f2,foo", "", "foo,f2,x", "") f("extract if (a:b) 'x' from x", "f2,foo", "", "a,f2,x", "") // needed fields intersect with pattern and output fields f("extract 'x' from x", "f2,foo,x,y", "", "f2,x,y", "") + f("extract 'x' from x keep_original_fields", "f2,foo,x,y", "", "foo,f2,x,y", "") f("extract if (a:b foo:q) 'x' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "") } diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index 1fe18b5ba..e2cff8eb0 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -17,6 +17,8 @@ type pipeFormat struct { resultField string + keepOriginalFields bool + // iff is an optional filter for skipping the format func iff *ifFilter } @@ -30,13 +32,18 @@ func (pf *pipeFormat) String() string { if !isMsgFieldName(pf.resultField) { s += " as " + quoteTokenIfNeeded(pf.resultField) } + if pf.keepOriginalFields { + s += " keep_original_fields" + 
} return s } func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { if !unneededFields.contains(pf.resultField) { - unneededFields.add(pf.resultField) + if !pf.keepOriginalFields { + unneededFields.add(pf.resultField) + } if pf.iff != nil { unneededFields.removeFields(pf.iff.neededFields) } @@ -48,7 +55,9 @@ func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) } } else { if neededFields.contains(pf.resultField) { - neededFields.remove(pf.resultField) + if !pf.keepOriginalFields { + neededFields.remove(pf.resultField) + } if pf.iff != nil { neededFields.addFields(pf.iff.neededFields) } @@ -97,7 +106,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pfp.shards[workerID] - shard.wctx.init(workerID, pfp.ppBase, br) + shard.wctx.init(workerID, pfp.ppBase, pfp.pf.keepOriginalFields, br) shard.uctx.init(workerID, "") bm := &shard.bm @@ -189,11 +198,18 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) { resultField = field } + keepOriginalFields := false + if lex.isKeyword("keep_original_fields") { + lex.nextToken() + keepOriginalFields = true + } + pf := &pipeFormat{ - formatStr: formatStr, - steps: steps, - resultField: resultField, - iff: iff, + formatStr: formatStr, + steps: steps, + resultField: resultField, + keepOriginalFields: keepOriginalFields, + iff: iff, } return pf, nil diff --git a/lib/logstorage/pipe_format_test.go b/lib/logstorage/pipe_format_test.go index bda91e588..2ee159dfc 100644 --- a/lib/logstorage/pipe_format_test.go +++ b/lib/logstorage/pipe_format_test.go @@ -11,13 +11,17 @@ func TestParsePipeFormatSuccess(t *testing.T) { } f(`format "foo"`) + f(`format "foo" keep_original_fields`) f(`format "" as x`) f(`format "<>" as x`) f(`format foo as x`) + f(`format foo as x keep_original_fields`) f(`format ""`) f(`format "bar"`) f(`format "barbac"`) + f(`format "barbac" keep_original_fields`) f(`format if (x:y) "barbac"`) 
+ f(`format if (x:y) "barbac" keep_original_fields`) } func TestParsePipeFormatFailure(t *testing.T) { @@ -39,6 +43,54 @@ func TestPipeFormat(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // no keep_original_fields + f(`format '{"foo":,"bar":""}' as x`, [][]Field{ + { + {"foo", `abc`}, + {"bar", `cde`}, + {"x", "qwe"}, + }, + { + {"foo", `ppp`}, + {"bar", `123`}, + }, + }, [][]Field{ + { + {"foo", `abc`}, + {"bar", `cde`}, + {"x", `{"foo":"abc","bar":"cde"}`}, + }, + { + {"foo", `ppp`}, + {"bar", `123`}, + {"x", `{"foo":"ppp","bar":"123"}`}, + }, + }) + + // keep_original_fields + f(`format '{"foo":,"bar":""}' as x keep_original_fields`, [][]Field{ + { + {"foo", `abc`}, + {"bar", `cde`}, + {"x", "qwe"}, + }, + { + {"foo", `ppp`}, + {"bar", `123`}, + }, + }, [][]Field{ + { + {"foo", `abc`}, + {"bar", `cde`}, + {"x", `qwe`}, + }, + { + {"foo", `ppp`}, + {"bar", `123`}, + {"x", `{"foo":"ppp","bar":"123"}`}, + }, + }) + // plain string into a single field f(`format '{"foo":,"bar":""}' as x`, [][]Field{ { @@ -160,42 +212,61 @@ func TestPipeFormatUpdateNeededFields(t *testing.T) { // all the needed fields f(`format "foo" as x`, "*", "", "*", "x") + f(`format "foo" as x keep_original_fields`, "*", "", "*", "") f(`format "foo" as x`, "*", "", "*", "x") f(`format if (f2:z) "foo" as x`, "*", "", "*", "x") + f(`format if (f2:z) "foo" as x keep_original_fields`, "*", "", "*", "") // unneeded fields do not intersect with pattern and output field f(`format "foo" as x`, "*", "f1,f2", "*", "f1,f2,x") f(`format "foo" as x`, "*", "f1,f2", "*", "f1,f2,x") f(`format if (f4:z) "foo" as x`, "*", "f1,f2", "*", "f1,f2,x") f(`format if (f1:z) "foo" as x`, "*", "f1,f2", "*", "f2,x") + f(`format if (f1:z) "foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2") // unneeded fields intersect with pattern f(`format "foo" as x`, "*", "f1,f2", "*", "f2,x") + f(`format "foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2") f(`format if (f4:z) "foo" as x`, "*", 
"f1,f2", "*", "f2,x") + f(`format if (f4:z) "foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2") f(`format if (f2:z) "foo" as x`, "*", "f1,f2", "*", "x") + f(`format if (f2:z) "foo" as x keep_original_fields`, "*", "f1,f2", "*", "") // unneeded fields intersect with output field f(`format "foo" as x`, "*", "x,y", "*", "x,y") + f(`format "foo" as x keep_original_fields`, "*", "x,y", "*", "x,y") f(`format if (f2:z) "foo" as x`, "*", "x,y", "*", "x,y") + f(`format if (f2:z) "foo" as x keep_original_fields`, "*", "x,y", "*", "x,y") f(`format if (y:z) "foo" as x`, "*", "x,y", "*", "x,y") + f(`format if (y:z) "foo" as x keep_original_fields`, "*", "x,y", "*", "x,y") // needed fields do not intersect with pattern and output field f(`format "foo" as f2`, "x,y", "", "x,y", "") + f(`format "foo" as f2 keep_original_fields`, "x,y", "", "x,y", "") f(`format if (f3:z) "foo" as f2`, "x,y", "", "x,y", "") + f(`format if (f3:z) "foo" as f2 keep_original_fields`, "x,y", "", "x,y", "") f(`format if (x:z) "foo" as f2`, "x,y", "", "x,y", "") + f(`format if (x:z) "foo" as f2 keep_original_fields`, "x,y", "", "x,y", "") // needed fields intersect with pattern field f(`format "foo" as f2`, "f1,y", "", "f1,y", "") + f(`format "foo" as f2 keep_original_fields`, "f1,y", "", "f1,y", "") f(`format if (f3:z) "foo" as f2`, "f1,y", "", "f1,y", "") f(`format if (x:z) "foo" as f2`, "f1,y", "", "f1,y", "") + f(`format if (x:z) "foo" as f2 keep_original_fields`, "f1,y", "", "f1,y", "") // needed fields intersect with output field f(`format "foo" as f2`, "f2,y", "", "f1,y", "") + f(`format "foo" as f2 keep_original_fields`, "f2,y", "", "f1,f2,y", "") f(`format if (f3:z) "foo" as f2`, "f2,y", "", "f1,f3,y", "") f(`format if (x:z or y:w) "foo" as f2`, "f2,y", "", "f1,x,y", "") + f(`format if (x:z or y:w) "foo" as f2 keep_original_fields`, "f2,y", "", "f1,f2,x,y", "") // needed fields intersect with pattern and output fields f(`format "foo" as f2`, "f1,f2,y", "", "f1,y", "") + f(`format "foo" as 
f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,y", "") f(`format if (f3:z) "foo" as f2`, "f1,f2,y", "", "f1,f3,y", "") + f(`format if (f3:z) "foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,f3,y", "") f(`format if (x:z or y:w) "foo" as f2`, "f1,f2,y", "", "f1,x,y", "") + f(`format if (x:z or y:w) "foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,x,y", "") } diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index 5b4333452..4db27570e 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -54,7 +54,7 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) { } func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor, - fromField, fieldPrefix string, iff *ifFilter) *pipeUnpackProcessor { + fromField string, fieldPrefix string, keepOriginalFields bool, iff *ifFilter) *pipeUnpackProcessor { return &pipeUnpackProcessor{ unpackFunc: unpackFunc, @@ -62,9 +62,10 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack shards: make([]pipeUnpackProcessorShard, workersCount), - fromField: fromField, - fieldPrefix: fieldPrefix, - iff: iff, + fromField: fromField, + fieldPrefix: fieldPrefix, + keepOriginalFields: keepOriginalFields, + iff: iff, } } @@ -74,8 +75,9 @@ type pipeUnpackProcessor struct { shards []pipeUnpackProcessorShard - fromField string - fieldPrefix string + fromField string + fieldPrefix string + keepOriginalFields bool iff *ifFilter } @@ -100,7 +102,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pup.shards[workerID] - shard.wctx.init(workerID, pup.ppBase, br) + shard.wctx.init(workerID, pup.ppBase, pup.keepOriginalFields, br) shard.uctx.init(workerID, pup.fieldPrefix) bm := &shard.bm @@ -153,8 +155,9 @@ func (pup *pipeUnpackProcessor) flush() error { } type pipeUnpackWriteContext struct { - workerID uint - ppBase pipeProcessor + workerID uint + 
ppBase pipeProcessor + keepOriginalFields bool brSrc *blockResult csSrc []*blockResultColumn @@ -172,6 +175,7 @@ type pipeUnpackWriteContext struct { func (wctx *pipeUnpackWriteContext) reset() { wctx.workerID = 0 wctx.ppBase = nil + wctx.keepOriginalFields = false wctx.brSrc = nil wctx.csSrc = nil @@ -186,11 +190,12 @@ func (wctx *pipeUnpackWriteContext) reset() { wctx.valuesLen = 0 } -func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, brSrc *blockResult) { +func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, keepOriginalFields bool, brSrc *blockResult) { wctx.reset() wctx.workerID = workerID wctx.ppBase = ppBase + wctx.keepOriginalFields = keepOriginalFields wctx.brSrc = brSrc wctx.csSrc = brSrc.getColumns() @@ -231,6 +236,15 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) { } for i, f := range extraFields { v := f.Value + if wctx.keepOriginalFields { + idx := getBlockResultColumnIdxByName(csSrc, f.Name) + if idx >= 0 { + vOrig := csSrc[idx].getValueAtRow(brSrc, rowIdx) + if vOrig != "" { + v = vOrig + } + } + } rcs[len(csSrc)+i].addValue(v) wctx.valuesLen += len(v) } diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 790950c45..2883d098f 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -22,6 +22,8 @@ type pipeUnpackJSON struct { // resultPrefix is prefix to add to unpacked field names resultPrefix string + keepOriginalFields bool + // iff is an optional filter for skipping unpacking json iff *ifFilter } @@ -40,14 +42,17 @@ func (pu *pipeUnpackJSON) String() string { if pu.resultPrefix != "" { s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) } + if pu.keepOriginalFields { + s += " keep_original_fields" + } return s } func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { - updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, 
unneededFields) + updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.iff, neededFields, unneededFields) } -func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff *ifFilter, neededFields, unneededFields fieldsSet) { +func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields bool, iff *ifFilter, neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { unneededFieldsOrig := unneededFields.clone() unneededFieldsCount := 0 @@ -56,7 +61,9 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff * if unneededFieldsOrig.contains(f) { unneededFieldsCount++ } - unneededFields.add(f) + if !keepOriginalFields { + unneededFields.add(f) + } } } if len(outFields) == 0 || unneededFieldsCount < len(outFields) { @@ -74,7 +81,9 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff * if neededFieldsOrig.contains(f) { needFromField = true } - neededFields.remove(f) + if !keepOriginalFields { + neededFields.remove(f) + } } } if needFromField { @@ -121,7 +130,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, } PutJSONParser(p) } - return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff) + return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.iff) } func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { @@ -172,11 +181,18 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { resultPrefix = p } + keepOriginalFields := false + if lex.isKeyword("keep_original_fields") { + lex.nextToken() + keepOriginalFields = true + } + pu := &pipeUnpackJSON{ - fromField: fromField, - fields: fields, - resultPrefix: resultPrefix, - iff: iff, + fromField: fromField, + fields: fields, + resultPrefix: resultPrefix, + keepOriginalFields: keepOriginalFields, + iff: iff, } return pu, nil 
diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index 53650f071..1256d49d5 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -15,16 +15,23 @@ func TestParsePipeUnpackJSONSuccess(t *testing.T) { } f(`unpack_json`) + f(`unpack_json keep_original_fields`) f(`unpack_json fields (a)`) f(`unpack_json fields (a, b, c)`) + f(`unpack_json fields (a, b, c) keep_original_fields`) f(`unpack_json if (a:x)`) + f(`unpack_json if (a:x) keep_original_fields`) f(`unpack_json from x`) + f(`unpack_json from x keep_original_fields`) f(`unpack_json from x fields (a, b)`) f(`unpack_json if (a:x) from x fields (a, b)`) + f(`unpack_json if (a:x) from x fields (a, b) keep_original_fields`) f(`unpack_json from x result_prefix abc`) f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc`) + f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc keep_original_fields`) f(`unpack_json result_prefix abc`) f(`unpack_json if (a:x) fields (a, b) result_prefix abc`) + f(`unpack_json if (a:x) fields (a, b) result_prefix abc keep_original_fields`) } func TestParsePipeUnpackJSONFailure(t *testing.T) { @@ -55,6 +62,38 @@ func TestPipeUnpackJSON(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // no keep original fields fields + f("unpack_json", [][]Field{ + { + {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, + {"foo", "x"}, + {"a", ""}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, + {"foo", "bar"}, + {"z", "q"}, + {"a", "b"}, + }, + }) + + // keep original fields + f("unpack_json keep_original_fields", [][]Field{ + { + {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, + {"foo", "x"}, + {"a", ""}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, + {"foo", "x"}, + {"z", "q"}, + {"a", "b"}, + }, + }) + // unpack only the requested fields f("unpack_json fields (foo, b)", [][]Field{ { @@ -465,35 +504,48 @@ func 
TestPipeUnpackJSONUpdateNeededFields(t *testing.T) { // all the needed fields f("unpack_json from x", "*", "", "*", "") + f("unpack_json from x keep_original_fields", "*", "", "*", "") f("unpack_json if (y:z) from x", "*", "", "*", "") f("unpack_json if (y:z) from x fields (a, b)", "*", "", "*", "a,b") + f("unpack_json if (y:z) from x fields (a, b) keep_original_fields", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src f("unpack_json from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_json from x keep_original_fields", "*", "f1,f2", "*", "f1,f2") f("unpack_json if (y:z) from x", "*", "f1,f2", "*", "f1,f2") f("unpack_json if (f1:z) from x", "*", "f1,f2", "*", "f2") f("unpack_json if (y:z) from x fields (f3)", "*", "f1,f2", "*", "f1,f2,f3") f("unpack_json if (y:z) from x fields (f1)", "*", "f1,f2", "*", "f1,f2") + f("unpack_json if (y:z) from x fields (f1) keep_original_fields", "*", "f1,f2", "*", "f1,f2") // all the needed fields, unneeded fields intersect with src f("unpack_json from x", "*", "f2,x", "*", "f2") + f("unpack_json from x keep_original_fields", "*", "f2,x", "*", "f2") f("unpack_json if (y:z) from x", "*", "f2,x", "*", "f2") f("unpack_json if (f2:z) from x", "*", "f1,f2,x", "*", "f1") f("unpack_json if (f2:z) from x fields (f3)", "*", "f1,f2,x", "*", "f1,f3") + f("unpack_json if (f2:z) from x fields (f3) keep_original_fields", "*", "f1,f2,x", "*", "f1") // needed fields do not intersect with src f("unpack_json from x", "f1,f2", "", "f1,f2,x", "") + f("unpack_json from x keep_original_fields", "f1,f2", "", "f1,f2,x", "") f("unpack_json if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "") f("unpack_json if (f1:z) from x", "f1,f2", "", "f1,f2,x", "") f("unpack_json if (y:z) from x fields (f3)", "f1,f2", "", "f1,f2", "") + f("unpack_json if (y:z) from x fields (f3) keep_original_fields", "f1,f2", "", "f1,f2", "") f("unpack_json if (y:z) from x fields (f2)", "f1,f2", "", "f1,x,y", "") f("unpack_json if (f2:z) from x fields (f2)", 
"f1,f2", "", "f1,f2,x", "") + f("unpack_json if (f2:z) from x fields (f2) keep_original_fields", "f1,f2", "", "f1,f2,x", "") // needed fields intersect with src f("unpack_json from x", "f2,x", "", "f2,x", "") + f("unpack_json from x keep_original_fields", "f2,x", "", "f2,x", "") f("unpack_json if (y:z) from x", "f2,x", "", "f2,x,y", "") f("unpack_json if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "") f("unpack_json if (y:z) from x fields (f1)", "f2,x", "", "f2,x", "") + f("unpack_json if (y:z) from x fields (f1) keep_original_fields", "f2,x", "", "f2,x", "") f("unpack_json if (y:z) from x fields (f2)", "f2,x", "", "x,y", "") + f("unpack_json if (y:z) from x fields (f2) keep_original_fields", "f2,x", "", "f2,x,y", "") f("unpack_json if (y:z) from x fields (x)", "f2,x", "", "f2,x,y", "") + f("unpack_json if (y:z) from x fields (x) keep_original_fields", "f2,x", "", "f2,x,y", "") } diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index bc294a9b5..fb41699a8 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -20,6 +20,8 @@ type pipeUnpackLogfmt struct { // resultPrefix is prefix to add to unpacked field names resultPrefix string + keepOriginalFields bool + // iff is an optional filter for skipping unpacking logfmt iff *ifFilter } @@ -38,11 +40,14 @@ func (pu *pipeUnpackLogfmt) String() string { if pu.resultPrefix != "" { s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) } + if pu.keepOriginalFields { + s += " keep_original_fields" + } return s } func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) { - updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, unneededFields) + updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.iff, neededFields, unneededFields) } func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { 
@@ -73,7 +78,7 @@ func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{} putLogfmtParser(p) } - return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff) + return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.iff) } @@ -125,11 +130,18 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { resultPrefix = p } + keepOriginalFields := false + if lex.isKeyword("keep_original_fields") { + lex.nextToken() + keepOriginalFields = true + } + pu := &pipeUnpackLogfmt{ - fromField: fromField, - fields: fields, - resultPrefix: resultPrefix, - iff: iff, + fromField: fromField, + fields: fields, + resultPrefix: resultPrefix, + keepOriginalFields: keepOriginalFields, + iff: iff, } return pu, nil diff --git a/lib/logstorage/pipe_unpack_logfmt_test.go b/lib/logstorage/pipe_unpack_logfmt_test.go index a5c7426f8..bbc538fae 100644 --- a/lib/logstorage/pipe_unpack_logfmt_test.go +++ b/lib/logstorage/pipe_unpack_logfmt_test.go @@ -11,19 +11,26 @@ func TestParsePipeUnpackLogfmtSuccess(t *testing.T) { } f(`unpack_logfmt`) + f(`unpack_logfmt keep_original_fields`) f(`unpack_logfmt fields (a, b)`) + f(`unpack_logfmt fields (a, b) keep_original_fields`) f(`unpack_logfmt if (a:x)`) + f(`unpack_logfmt if (a:x) keep_original_fields`) f(`unpack_logfmt if (a:x) fields (a, b)`) f(`unpack_logfmt from x`) + f(`unpack_logfmt from x keep_original_fields`) f(`unpack_logfmt from x fields (a, b)`) + f(`unpack_logfmt from x fields (a, b) keep_original_fields`) f(`unpack_logfmt if (a:x) from x`) f(`unpack_logfmt if (a:x) from x fields (a, b)`) f(`unpack_logfmt from x result_prefix abc`) f(`unpack_logfmt if (a:x) from x result_prefix abc`) f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc`) + f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc keep_original_fields`) f(`unpack_logfmt result_prefix abc`) f(`unpack_logfmt if 
(a:x) result_prefix abc`) f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc`) + f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc keep_original_fields`) } func TestParsePipeUnpackLogfmtFailure(t *testing.T) { @@ -57,6 +64,7 @@ func TestPipeUnpackLogfmt(t *testing.T) { f("unpack_logfmt fields (foo, a, b)", [][]Field{ { {"_msg", `foo=bar baz="x y=z" a=b`}, + {"a", "xxx"}, }, }, [][]Field{ { @@ -67,10 +75,26 @@ func TestPipeUnpackLogfmt(t *testing.T) { }, }) + // keep original fields + f("unpack_logfmt keep_original_fields", [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"baz", "abcdef"}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"foo", "bar"}, + {"baz", "abcdef"}, + {"a", "b"}, + }, + }) + // single row, unpack from _msg f("unpack_logfmt", [][]Field{ { {"_msg", `foo=bar baz="x y=z" a=b`}, + {"baz", "abcdef"}, }, }, [][]Field{ { @@ -242,7 +266,10 @@ func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) { } // all the needed fields - f("unpack_logfmt from x", "*", "", "*", "") + f("unpack_logfmt", "*", "", "*", "") + f("unpack_logfmt fields (f1, f2)", "*", "", "*", "f1,f2") + f("unpack_logfmt fields (f1, f2) keep_original_fields", "*", "", "*", "") + f("unpack_logfmt keep_original_fields", "*", "", "*", "") f("unpack_logfmt if (y:z) from x", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src