Aliaksandr Valialkin 2024-05-24 18:31:49 +02:00
parent 761a110392
commit a3032067bd
11 changed files with 348 additions and 42 deletions


@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
## tip
* FEATURE: add an ability to preserve the original non-empty field values when performing [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe), [`unpack_json`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe), [`unpack_logfmt`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) and [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) pipes.
## [v0.10.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.10.0-victorialogs)
Released at 2024-05-24


@ -1162,6 +1162,13 @@ For example, the following query extracts `ip` from the corresponding JSON field
_time:5m | extract '"ip":"<ip>"'
```
Add `keep_original_fields` to the end of `extract ...` when the original non-empty values of the named fields mentioned in the pattern must be preserved
instead of being overwritten with the extracted values. For example, the following query extracts `<ip>` only if the original value of the `ip` field is missing or empty:
```logsql
_time:5m | extract 'ip=<ip> ' keep_original_fields
```
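For illustration only (this is not VictoriaLogs code), the precedence rule can be modeled in Go with a log row represented as a `map[string]string`; the `mergeExtracted` helper name is invented for this sketch:

```go
// Illustration only: `extract ... keep_original_fields` keeps an existing
// non-empty field value and only fills in missing or empty fields.
package main

import "fmt"

// mergeExtracted merges values extracted by a pattern into the row.
func mergeExtracted(row, extracted map[string]string, keepOriginalFields bool) {
	for name, v := range extracted {
		if keepOriginalFields && row[name] != "" {
			continue // the original non-empty value wins
		}
		row[name] = v
	}
}

func main() {
	row := map[string]string{"_msg": "ip=1.2.3.4 ", "ip": "10.0.0.1"}
	// values that `extract 'ip=<ip> '` would produce from _msg
	mergeExtracted(row, map[string]string{"ip": "1.2.3.4"}, true)
	fmt.Println(row["ip"]) // 10.0.0.1, the original value is preserved
}
```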
See also:
- [Format for extract pipe pattern](#format-for-extract-pipe-pattern)
@ -1244,6 +1251,13 @@ if the input [log entry](https://docs.victoriametrics.com/VictoriaLogs/keyConcep
_time:5m | extract if (ip:"") "ip=<ip> "
```
An alternative approach is to add `keep_original_fields` to the end of `extract`, in order to keep the original non-empty values for the extracted fields.
For example, the following query is equivalent to the previous one:
```logsql
_time:5m | extract "ip=<ip> " keep_original_fields
```
### field_names pipe
`| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
@ -1302,7 +1316,7 @@ See also:
### format pipe
`| format "pattern" as result_field` [pipe](#pipe) combines [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
according to the `pattern` and stores it to the `result_field`. All the other fields remain unchanged after the `| format ...` pipe.
according to the `pattern` and stores it to the `result_field`.
For example, the following query stores `request from <ip>:<port>` text into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field),
by substituting `<ip>` and `<port>` with the corresponding [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values:
@ -1326,6 +1340,13 @@ and stores it into `my_json` output field:
_time:5m | format '{"_msg":<q:_msg>,"stacktrace":<q:stacktrace>}' as my_json
```
Add `keep_original_fields` to the end of `format ... as result_field` when the original non-empty value of the `result_field` must be preserved
instead of being overwritten with the `format` result. For example, the following query writes the formatted result to the `foo` field only if it is missing or empty:
```logsql
_time:5m | format 'some_text' as foo keep_original_fields
```
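The same precedence applies to the single `result_field` of `format`. A minimal Go sketch under the same map-based row model (the `formatIntoField` helper is invented for illustration, not part of VictoriaLogs):

```go
// Illustration only: with keep_original_fields, `format ... as result_field`
// writes its result only when result_field is missing or empty.
package main

import "fmt"

func formatIntoField(row map[string]string, resultField, formatted string, keepOriginalFields bool) {
	if keepOriginalFields && row[resultField] != "" {
		return // preserve the original non-empty value of result_field
	}
	row[resultField] = formatted
}

func main() {
	row := map[string]string{"foo": "original value"}
	formatIntoField(row, "foo", "some_text", true)
	fmt.Println(row["foo"]) // original value
}
```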
See also:
- [Conditional format](#conditional-format)
@ -1692,6 +1713,13 @@ fields from JSON value stored in `my_json` [log field](https://docs.victoriametr
_time:5m | unpack_json from my_json fields (foo, bar)
```
If you need to preserve the original non-empty values of the unpacked fields, add `keep_original_fields` to the end of `unpack_json ...`. For example,
the following query preserves the original non-empty values of the `ip` and `host` fields instead of overwriting them with the unpacked values:
```logsql
_time:5m | unpack_json from foo fields (ip, host) keep_original_fields
```
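For illustration, the same rule combined with standard-library JSON decoding. This is a simplified sketch that copies only top-level string values; the `unpackJSONStrings` helper is invented here and is not the VictoriaLogs implementation:

```go
// Illustration only: simplified unpack_json with keep_original_fields,
// copying only top-level string values from the JSON stored in fromField.
package main

import (
	"encoding/json"
	"fmt"
)

func unpackJSONStrings(row map[string]string, fromField string, fields []string, keepOriginalFields bool) error {
	var m map[string]any
	if err := json.Unmarshal([]byte(row[fromField]), &m); err != nil {
		return err
	}
	for _, name := range fields {
		s, ok := m[name].(string)
		if !ok {
			continue // non-string values are skipped in this sketch
		}
		if keepOriginalFields && row[name] != "" {
			continue // keep the original non-empty value
		}
		row[name] = s
	}
	return nil
}

func main() {
	row := map[string]string{
		"foo": `{"ip":"1.2.3.4","host":"h1"}`,
		"ip":  "10.0.0.1", // already non-empty, so it is preserved
	}
	if err := unpackJSONStrings(row, "foo", []string{"ip", "host"}, true); err != nil {
		panic(err)
	}
	fmt.Println(row["ip"], row["host"]) // 10.0.0.1 h1
}
```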
Performance tip: if you need to extract a single field from a long JSON value, it is faster to use the [`extract` pipe](#extract-pipe). For example, the following query extracts the `"ip"` field from the JSON
stored in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) at maximum speed:
@ -1754,6 +1782,13 @@ from logfmt stored in the `my_logfmt` field:
_time:5m | unpack_logfmt from my_logfmt fields (foo, bar)
```
If you need to preserve the original non-empty values of the unpacked fields, add `keep_original_fields` to the end of `unpack_logfmt ...`. For example,
the following query preserves the original non-empty values of the `ip` and `host` fields instead of overwriting them with the unpacked values:
```logsql
_time:5m | unpack_logfmt from foo fields (ip, host) keep_original_fields
```
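A similar sketch for logfmt. The tokenizer here is deliberately simplified: it unpacks all `key=value` pairs and ignores quoted values, while the real pipe handles quoting and can restrict output with `fields (...)`; the `unpackLogfmtPairs` helper is invented for illustration:

```go
// Illustration only: simplified unpack_logfmt with keep_original_fields.
// Quoted logfmt values are not handled by this sketch.
package main

import (
	"fmt"
	"strings"
)

func unpackLogfmtPairs(row map[string]string, fromField string, keepOriginalFields bool) {
	for _, tok := range strings.Fields(row[fromField]) {
		name, value, ok := strings.Cut(tok, "=")
		if !ok {
			continue
		}
		if keepOriginalFields && row[name] != "" {
			continue // keep the original non-empty value
		}
		row[name] = value
	}
}

func main() {
	row := map[string]string{"foo": "ip=1.2.3.4 host=h1", "ip": "10.0.0.1"}
	unpackLogfmtPairs(row, "foo", true)
	fmt.Println(row["ip"], row["host"]) // 10.0.0.1 h1
}
```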
Performance tip: if you need to extract a single field from a long [logfmt](https://brandur.org/logfmt) line, it is faster to use the [`extract` pipe](#extract-pipe).
For example, the following query extracts the `"ip"` field from the [logfmt](https://brandur.org/logfmt) line stored
in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):


@ -9,10 +9,12 @@ import (
// See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe
type pipeExtract struct {
fromField string
ptn *pattern
ptn *pattern
patternStr string
keepOriginalFields bool
// iff is an optional filter for skipping the extract func
iff *ifFilter
}
@ -26,6 +28,9 @@ func (pe *pipeExtract) String() string {
if !isMsgFieldName(pe.fromField) {
s += " from " + quoteTokenIfNeeded(pe.fromField)
}
if pe.keepOriginalFields {
s += " keep_original_fields"
}
return s
}
@ -38,7 +43,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
if !unneededFieldsOrig.contains(step.field) {
needFromField = true
}
unneededFields.add(step.field)
if !pe.keepOriginalFields {
unneededFields.add(step.field)
}
}
}
if needFromField {
@ -55,7 +62,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
for _, step := range pe.ptn.steps {
if step.field != "" && neededFieldsOrig.contains(step.field) {
needFromField = true
neededFields.remove(step.field)
if !pe.keepOriginalFields {
neededFields.remove(step.field)
}
}
}
if needFromField {
@ -81,7 +90,7 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f
}
}
return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.iff)
return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.keepOriginalFields, pe.iff)
}
func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
@ -121,11 +130,18 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
fromField = f
}
keepOriginalFields := false
if lex.isKeyword("keep_original_fields") {
lex.nextToken()
keepOriginalFields = true
}
pe := &pipeExtract{
fromField: fromField,
ptn: ptn,
patternStr: patternStr,
iff: iff,
fromField: fromField,
ptn: ptn,
patternStr: patternStr,
keepOriginalFields: keepOriginalFields,
iff: iff,
}
return pe, nil


@ -11,8 +11,11 @@ func TestParsePipeExtractSuccess(t *testing.T) {
}
f(`extract "foo<bar>"`)
f(`extract "foo<bar>" keep_original_fields`)
f(`extract "foo<bar>" from x`)
f(`extract "foo<bar>" from x keep_original_fields`)
f(`extract if (x:y) "foo<bar>" from baz`)
f(`extract if (x:y) "foo<bar>" from baz keep_original_fields`)
}
func TestParsePipeExtractFailure(t *testing.T) {
@ -22,6 +25,7 @@ func TestParsePipeExtractFailure(t *testing.T) {
}
f(`extract`)
f(`extract keep_original_fields`)
f(`extract from`)
f(`extract from x`)
f(`extract from x "y<foo>"`)
@ -38,6 +42,36 @@ func TestPipeExtract(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// keep original fields
f(`extract "baz=<abc> a=<aa>" keep_original_fields`, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"aa", "foobar"},
{"abc", ""},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"abc", "x y=z"},
{"aa", "foobar"},
},
})
// no keep original fields
f(`extract "baz=<abc> a=<aa>"`, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"aa", "foobar"},
{"abc", ""},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"abc", "x y=z"},
{"aa", "b"},
},
})
// single row, extract from _msg
f(`extract "baz=<abc> a=<aa>"`, [][]Field{
{
@ -226,41 +260,52 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) {
// all the needed fields
f("extract '<foo>' from x", "*", "", "*", "foo")
f("extract if (foo:bar) '<foo>' from x", "*", "", "*", "")
f("extract if (foo:bar) '<foo>' from x keep_original_fields", "*", "", "*", "")
// unneeded fields do not intersect with pattern and output fields
f("extract '<foo>' from x", "*", "f1,f2", "*", "f1,f2,foo")
f("extract '<foo>' from x keep_original_fields", "*", "f1,f2", "*", "f1,f2")
f("extract if (f1:x) '<foo>' from x", "*", "f1,f2", "*", "f2,foo")
f("extract if (f1:x) '<foo>' from x keep_original_fields", "*", "f1,f2", "*", "f2")
f("extract if (foo:bar f1:x) '<foo>' from x", "*", "f1,f2", "*", "f2")
// unneeded fields intersect with pattern
f("extract '<foo>' from x", "*", "f2,x", "*", "f2,foo")
f("extract '<foo>' from x keep_original_fields", "*", "f2,x", "*", "f2")
f("extract if (f1:abc) '<foo>' from x", "*", "f2,x", "*", "f2,foo")
f("extract if (f2:abc) '<foo>' from x", "*", "f2,x", "*", "foo")
// unneeded fields intersect with output fields
f("extract '<foo>x<bar>' from x", "*", "f2,foo", "*", "bar,f2,foo")
f("extract '<foo>x<bar>' from x keep_original_fields", "*", "f2,foo", "*", "f2,foo")
f("extract if (f1:abc) '<foo>x<bar>' from x", "*", "f2,foo", "*", "bar,f2,foo")
f("extract if (f2:abc foo:w) '<foo>x<bar>' from x", "*", "f2,foo", "*", "bar")
f("extract if (f2:abc foo:w) '<foo>x<bar>' from x keep_original_fields", "*", "f2,foo", "*", "")
// unneeded fields intersect with all the output fields
f("extract '<foo>x<bar>' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
f("extract if (a:b f2:q x:y foo:w) '<foo>x<bar>' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
f("extract if (a:b f2:q x:y foo:w) '<foo>x<bar>' from x keep_original_fields", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
// needed fields do not intersect with pattern and output fields
f("extract '<foo>x<bar>' from x", "f1,f2", "", "f1,f2", "")
f("extract '<foo>x<bar>' from x keep_original_fields", "f1,f2", "", "f1,f2", "")
f("extract if (a:b) '<foo>x<bar>' from x", "f1,f2", "", "f1,f2", "")
f("extract if (f1:b) '<foo>x<bar>' from x", "f1,f2", "", "f1,f2", "")
// needed fields intersect with pattern field
f("extract '<foo>x<bar>' from x", "f2,x", "", "f2,x", "")
f("extract '<foo>x<bar>' from x keep_original_fields", "f2,x", "", "f2,x", "")
f("extract if (a:b) '<foo>x<bar>' from x", "f2,x", "", "f2,x", "")
// needed fields intersect with output fields
f("extract '<foo>x<bar>' from x", "f2,foo", "", "f2,x", "")
f("extract '<foo>x<bar>' from x keep_original_fields", "f2,foo", "", "foo,f2,x", "")
f("extract if (a:b) '<foo>x<bar>' from x", "f2,foo", "", "a,f2,x", "")
// needed fields intersect with pattern and output fields
f("extract '<foo>x<bar>' from x", "f2,foo,x,y", "", "f2,x,y", "")
f("extract '<foo>x<bar>' from x keep_original_fields", "f2,foo,x,y", "", "foo,f2,x,y", "")
f("extract if (a:b foo:q) '<foo>x<bar>' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "")
}
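The expectations above also show how `keep_original_fields` changes needed-fields tracking: because the original value of an output field may pass through unchanged, that field can no longer be dropped from the set of fields read from the source block. Below is a minimal hedged model of just that pruning decision (illustration only, not repository code; the real `updateNeededFields` additionally marks the `from` field as needed):

```go
// Illustration only: why keep_original_fields disables pruning of output fields.
package main

import "fmt"

// pruneOutputFields models the neededFields update for a pipe that writes
// outputFields: without keep_original_fields the pipe always regenerates an
// output field, so upstream no longer has to provide it; with the flag set,
// the original value may survive, so the field stays needed.
func pruneOutputFields(needed map[string]bool, outputFields []string, keepOriginalFields bool) {
	if keepOriginalFields {
		return
	}
	for _, f := range outputFields {
		delete(needed, f)
	}
}

func main() {
	needed := map[string]bool{"f2": true, "foo": true}
	pruneOutputFields(needed, []string{"foo", "bar"}, false)
	fmt.Println(needed) // map[f2:true], foo will be re-extracted

	needed = map[string]bool{"f2": true, "foo": true}
	pruneOutputFields(needed, []string{"foo", "bar"}, true)
	fmt.Println(needed) // map[f2:true foo:true], the original foo may be kept
}
```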


@ -17,6 +17,8 @@ type pipeFormat struct {
resultField string
keepOriginalFields bool
// iff is an optional filter for skipping the format func
iff *ifFilter
}
@ -30,13 +32,18 @@ func (pf *pipeFormat) String() string {
if !isMsgFieldName(pf.resultField) {
s += " as " + quoteTokenIfNeeded(pf.resultField)
}
if pf.keepOriginalFields {
s += " keep_original_fields"
}
return s
}
func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
if !unneededFields.contains(pf.resultField) {
unneededFields.add(pf.resultField)
if !pf.keepOriginalFields {
unneededFields.add(pf.resultField)
}
if pf.iff != nil {
unneededFields.removeFields(pf.iff.neededFields)
}
@ -48,7 +55,9 @@ func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet)
}
} else {
if neededFields.contains(pf.resultField) {
neededFields.remove(pf.resultField)
if !pf.keepOriginalFields {
neededFields.remove(pf.resultField)
}
if pf.iff != nil {
neededFields.addFields(pf.iff.neededFields)
}
@ -97,7 +106,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
}
shard := &pfp.shards[workerID]
shard.wctx.init(workerID, pfp.ppBase, br)
shard.wctx.init(workerID, pfp.ppBase, pfp.pf.keepOriginalFields, br)
shard.uctx.init(workerID, "")
bm := &shard.bm
@ -189,11 +198,18 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
resultField = field
}
keepOriginalFields := false
if lex.isKeyword("keep_original_fields") {
lex.nextToken()
keepOriginalFields = true
}
pf := &pipeFormat{
formatStr: formatStr,
steps: steps,
resultField: resultField,
iff: iff,
formatStr: formatStr,
steps: steps,
resultField: resultField,
keepOriginalFields: keepOriginalFields,
iff: iff,
}
return pf, nil


@ -11,13 +11,17 @@ func TestParsePipeFormatSuccess(t *testing.T) {
}
f(`format "foo<bar>"`)
f(`format "foo<bar>" keep_original_fields`)
f(`format "" as x`)
f(`format "<>" as x`)
f(`format foo as x`)
f(`format foo as x keep_original_fields`)
f(`format "<foo>"`)
f(`format "<foo>bar<baz>"`)
f(`format "bar<baz><xyz>bac"`)
f(`format "bar<baz><xyz>bac" keep_original_fields`)
f(`format if (x:y) "bar<baz><xyz>bac"`)
f(`format if (x:y) "bar<baz><xyz>bac" keep_original_fields`)
}
func TestParsePipeFormatFailure(t *testing.T) {
@ -39,6 +43,54 @@ func TestPipeFormat(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// no keep_original_fields
f(`format '{"foo":<q:foo>,"bar":"<bar>"}' as x`, [][]Field{
{
{"foo", `abc`},
{"bar", `cde`},
{"x", "qwe"},
},
{
{"foo", `ppp`},
{"bar", `123`},
},
}, [][]Field{
{
{"foo", `abc`},
{"bar", `cde`},
{"x", `{"foo":"abc","bar":"cde"}`},
},
{
{"foo", `ppp`},
{"bar", `123`},
{"x", `{"foo":"ppp","bar":"123"}`},
},
})
// keep_original_fields
f(`format '{"foo":<q:foo>,"bar":"<bar>"}' as x keep_original_fields`, [][]Field{
{
{"foo", `abc`},
{"bar", `cde`},
{"x", "qwe"},
},
{
{"foo", `ppp`},
{"bar", `123`},
},
}, [][]Field{
{
{"foo", `abc`},
{"bar", `cde`},
{"x", `qwe`},
},
{
{"foo", `ppp`},
{"bar", `123`},
{"x", `{"foo":"ppp","bar":"123"}`},
},
})
// plain string into a single field
f(`format '{"foo":<q:foo>,"bar":"<bar>"}' as x`, [][]Field{
{
@ -160,42 +212,61 @@ func TestPipeFormatUpdateNeededFields(t *testing.T) {
// all the needed fields
f(`format "foo" as x`, "*", "", "*", "x")
f(`format "foo" as x keep_original_fields`, "*", "", "*", "")
f(`format "<f1>foo" as x`, "*", "", "*", "x")
f(`format if (f2:z) "<f1>foo" as x`, "*", "", "*", "x")
f(`format if (f2:z) "<f1>foo" as x keep_original_fields`, "*", "", "*", "")
// unneeded fields do not intersect with pattern and output field
f(`format "foo" as x`, "*", "f1,f2", "*", "f1,f2,x")
f(`format "<f3>foo" as x`, "*", "f1,f2", "*", "f1,f2,x")
f(`format if (f4:z) "<f3>foo" as x`, "*", "f1,f2", "*", "f1,f2,x")
f(`format if (f1:z) "<f3>foo" as x`, "*", "f1,f2", "*", "f2,x")
f(`format if (f1:z) "<f3>foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2")
// unneeded fields intersect with pattern
f(`format "<f1>foo" as x`, "*", "f1,f2", "*", "f2,x")
f(`format "<f1>foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2")
f(`format if (f4:z) "<f1>foo" as x`, "*", "f1,f2", "*", "f2,x")
f(`format if (f4:z) "<f1>foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2")
f(`format if (f2:z) "<f1>foo" as x`, "*", "f1,f2", "*", "x")
f(`format if (f2:z) "<f1>foo" as x keep_original_fields`, "*", "f1,f2", "*", "")
// unneeded fields intersect with output field
f(`format "<f1>foo" as x`, "*", "x,y", "*", "x,y")
f(`format "<f1>foo" as x keep_original_fields`, "*", "x,y", "*", "x,y")
f(`format if (f2:z) "<f1>foo" as x`, "*", "x,y", "*", "x,y")
f(`format if (f2:z) "<f1>foo" as x keep_original_fields`, "*", "x,y", "*", "x,y")
f(`format if (y:z) "<f1>foo" as x`, "*", "x,y", "*", "x,y")
f(`format if (y:z) "<f1>foo" as x keep_original_fields`, "*", "x,y", "*", "x,y")
// needed fields do not intersect with pattern and output field
f(`format "<f1>foo" as f2`, "x,y", "", "x,y", "")
f(`format "<f1>foo" as f2 keep_original_fields`, "x,y", "", "x,y", "")
f(`format if (f3:z) "<f1>foo" as f2`, "x,y", "", "x,y", "")
f(`format if (f3:z) "<f1>foo" as f2 keep_original_fields`, "x,y", "", "x,y", "")
f(`format if (x:z) "<f1>foo" as f2`, "x,y", "", "x,y", "")
f(`format if (x:z) "<f1>foo" as f2 keep_original_fields`, "x,y", "", "x,y", "")
// needed fields intersect with pattern field
f(`format "<f1>foo" as f2`, "f1,y", "", "f1,y", "")
f(`format "<f1>foo" as f2 keep_original_fields`, "f1,y", "", "f1,y", "")
f(`format if (f3:z) "<f1>foo" as f2`, "f1,y", "", "f1,y", "")
f(`format if (x:z) "<f1>foo" as f2`, "f1,y", "", "f1,y", "")
f(`format if (x:z) "<f1>foo" as f2 keep_original_fields`, "f1,y", "", "f1,y", "")
// needed fields intersect with output field
f(`format "<f1>foo" as f2`, "f2,y", "", "f1,y", "")
f(`format "<f1>foo" as f2 keep_original_fields`, "f2,y", "", "f1,f2,y", "")
f(`format if (f3:z) "<f1>foo" as f2`, "f2,y", "", "f1,f3,y", "")
f(`format if (x:z or y:w) "<f1>foo" as f2`, "f2,y", "", "f1,x,y", "")
f(`format if (x:z or y:w) "<f1>foo" as f2 keep_original_fields`, "f2,y", "", "f1,f2,x,y", "")
// needed fields intersect with pattern and output fields
f(`format "<f1>foo" as f2`, "f1,f2,y", "", "f1,y", "")
f(`format "<f1>foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,y", "")
f(`format if (f3:z) "<f1>foo" as f2`, "f1,f2,y", "", "f1,f3,y", "")
f(`format if (f3:z) "<f1>foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,f3,y", "")
f(`format if (x:z or y:w) "<f1>foo" as f2`, "f1,f2,y", "", "f1,x,y", "")
f(`format if (x:z or y:w) "<f1>foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,x,y", "")
}


@ -54,7 +54,7 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) {
}
func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor,
fromField, fieldPrefix string, iff *ifFilter) *pipeUnpackProcessor {
fromField string, fieldPrefix string, keepOriginalFields bool, iff *ifFilter) *pipeUnpackProcessor {
return &pipeUnpackProcessor{
unpackFunc: unpackFunc,
@ -62,9 +62,10 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack
shards: make([]pipeUnpackProcessorShard, workersCount),
fromField: fromField,
fieldPrefix: fieldPrefix,
iff: iff,
fromField: fromField,
fieldPrefix: fieldPrefix,
keepOriginalFields: keepOriginalFields,
iff: iff,
}
}
@ -74,8 +75,9 @@ type pipeUnpackProcessor struct {
shards []pipeUnpackProcessorShard
fromField string
fieldPrefix string
fromField string
fieldPrefix string
keepOriginalFields bool
iff *ifFilter
}
@ -100,7 +102,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
}
shard := &pup.shards[workerID]
shard.wctx.init(workerID, pup.ppBase, br)
shard.wctx.init(workerID, pup.ppBase, pup.keepOriginalFields, br)
shard.uctx.init(workerID, pup.fieldPrefix)
bm := &shard.bm
@ -153,8 +155,9 @@ func (pup *pipeUnpackProcessor) flush() error {
}
type pipeUnpackWriteContext struct {
workerID uint
ppBase pipeProcessor
workerID uint
ppBase pipeProcessor
keepOriginalFields bool
brSrc *blockResult
csSrc []*blockResultColumn
@ -172,6 +175,7 @@ type pipeUnpackWriteContext struct {
func (wctx *pipeUnpackWriteContext) reset() {
wctx.workerID = 0
wctx.ppBase = nil
wctx.keepOriginalFields = false
wctx.brSrc = nil
wctx.csSrc = nil
@ -186,11 +190,12 @@ func (wctx *pipeUnpackWriteContext) reset() {
wctx.valuesLen = 0
}
func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, brSrc *blockResult) {
func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, keepOriginalFields bool, brSrc *blockResult) {
wctx.reset()
wctx.workerID = workerID
wctx.ppBase = ppBase
wctx.keepOriginalFields = keepOriginalFields
wctx.brSrc = brSrc
wctx.csSrc = brSrc.getColumns()
@ -231,6 +236,15 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
}
for i, f := range extraFields {
v := f.Value
if wctx.keepOriginalFields {
idx := getBlockResultColumnIdxByName(csSrc, f.Name)
if idx >= 0 {
vOrig := csSrc[idx].getValueAtRow(brSrc, rowIdx)
if vOrig != "" {
v = vOrig
}
}
}
rcs[len(csSrc)+i].addValue(v)
wctx.valuesLen += len(v)
}


@ -22,6 +22,8 @@ type pipeUnpackJSON struct {
// resultPrefix is prefix to add to unpacked field names
resultPrefix string
keepOriginalFields bool
// iff is an optional filter for skipping unpacking json
iff *ifFilter
}
@ -40,14 +42,17 @@ func (pu *pipeUnpackJSON) String() string {
if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
}
if pu.keepOriginalFields {
s += " keep_original_fields"
}
return s
}
func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, unneededFields)
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.iff, neededFields, unneededFields)
}
func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff *ifFilter, neededFields, unneededFields fieldsSet) {
func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields bool, iff *ifFilter, neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
unneededFieldsOrig := unneededFields.clone()
unneededFieldsCount := 0
@ -56,7 +61,9 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff *
if unneededFieldsOrig.contains(f) {
unneededFieldsCount++
}
unneededFields.add(f)
if !keepOriginalFields {
unneededFields.add(f)
}
}
}
if len(outFields) == 0 || unneededFieldsCount < len(outFields) {
@ -74,7 +81,9 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff *
if neededFieldsOrig.contains(f) {
needFromField = true
}
neededFields.remove(f)
if !keepOriginalFields {
neededFields.remove(f)
}
}
}
if needFromField {
@ -121,7 +130,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{},
}
PutJSONParser(p)
}
return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.iff)
}
func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
@ -172,11 +181,18 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
resultPrefix = p
}
keepOriginalFields := false
if lex.isKeyword("keep_original_fields") {
lex.nextToken()
keepOriginalFields = true
}
pu := &pipeUnpackJSON{
fromField: fromField,
fields: fields,
resultPrefix: resultPrefix,
iff: iff,
fromField: fromField,
fields: fields,
resultPrefix: resultPrefix,
keepOriginalFields: keepOriginalFields,
iff: iff,
}
return pu, nil


@ -15,16 +15,23 @@ func TestParsePipeUnpackJSONSuccess(t *testing.T) {
}
f(`unpack_json`)
f(`unpack_json keep_original_fields`)
f(`unpack_json fields (a)`)
f(`unpack_json fields (a, b, c)`)
f(`unpack_json fields (a, b, c) keep_original_fields`)
f(`unpack_json if (a:x)`)
f(`unpack_json if (a:x) keep_original_fields`)
f(`unpack_json from x`)
f(`unpack_json from x keep_original_fields`)
f(`unpack_json from x fields (a, b)`)
f(`unpack_json if (a:x) from x fields (a, b)`)
f(`unpack_json if (a:x) from x fields (a, b) keep_original_fields`)
f(`unpack_json from x result_prefix abc`)
f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc`)
f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc keep_original_fields`)
f(`unpack_json result_prefix abc`)
f(`unpack_json if (a:x) fields (a, b) result_prefix abc`)
f(`unpack_json if (a:x) fields (a, b) result_prefix abc keep_original_fields`)
}
func TestParsePipeUnpackJSONFailure(t *testing.T) {
@ -55,6 +62,38 @@ func TestPipeUnpackJSON(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// no keep original fields
f("unpack_json", [][]Field{
{
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
{"foo", "x"},
{"a", ""},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
{"foo", "bar"},
{"z", "q"},
{"a", "b"},
},
})
// keep original fields
f("unpack_json keep_original_fields", [][]Field{
{
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
{"foo", "x"},
{"a", ""},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
{"foo", "x"},
{"z", "q"},
{"a", "b"},
},
})
// unpack only the requested fields
f("unpack_json fields (foo, b)", [][]Field{
{
@ -465,35 +504,48 @@ func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) {
// all the needed fields
f("unpack_json from x", "*", "", "*", "")
f("unpack_json from x keep_original_fields", "*", "", "*", "")
f("unpack_json if (y:z) from x", "*", "", "*", "")
f("unpack_json if (y:z) from x fields (a, b)", "*", "", "*", "a,b")
f("unpack_json if (y:z) from x fields (a, b) keep_original_fields", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("unpack_json from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_json from x keep_original_fields", "*", "f1,f2", "*", "f1,f2")
f("unpack_json if (y:z) from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_json if (f1:z) from x", "*", "f1,f2", "*", "f2")
f("unpack_json if (y:z) from x fields (f3)", "*", "f1,f2", "*", "f1,f2,f3")
f("unpack_json if (y:z) from x fields (f1)", "*", "f1,f2", "*", "f1,f2")
f("unpack_json if (y:z) from x fields (f1) keep_original_fields", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with src
f("unpack_json from x", "*", "f2,x", "*", "f2")
f("unpack_json from x keep_original_fields", "*", "f2,x", "*", "f2")
f("unpack_json if (y:z) from x", "*", "f2,x", "*", "f2")
f("unpack_json if (f2:z) from x", "*", "f1,f2,x", "*", "f1")
f("unpack_json if (f2:z) from x fields (f3)", "*", "f1,f2,x", "*", "f1,f3")
f("unpack_json if (f2:z) from x fields (f3) keep_original_fields", "*", "f1,f2,x", "*", "f1")
// needed fields do not intersect with src
f("unpack_json from x", "f1,f2", "", "f1,f2,x", "")
f("unpack_json from x keep_original_fields", "f1,f2", "", "f1,f2,x", "")
f("unpack_json if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "")
f("unpack_json if (f1:z) from x", "f1,f2", "", "f1,f2,x", "")
f("unpack_json if (y:z) from x fields (f3)", "f1,f2", "", "f1,f2", "")
f("unpack_json if (y:z) from x fields (f3) keep_original_fields", "f1,f2", "", "f1,f2", "")
f("unpack_json if (y:z) from x fields (f2)", "f1,f2", "", "f1,x,y", "")
f("unpack_json if (f2:z) from x fields (f2)", "f1,f2", "", "f1,f2,x", "")
f("unpack_json if (f2:z) from x fields (f2) keep_original_fields", "f1,f2", "", "f1,f2,x", "")
// needed fields intersect with src
f("unpack_json from x", "f2,x", "", "f2,x", "")
f("unpack_json from x keep_original_fields", "f2,x", "", "f2,x", "")
f("unpack_json if (y:z) from x", "f2,x", "", "f2,x,y", "")
f("unpack_json if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "")
f("unpack_json if (y:z) from x fields (f1)", "f2,x", "", "f2,x", "")
f("unpack_json if (y:z) from x fields (f1) keep_original_fields", "f2,x", "", "f2,x", "")
f("unpack_json if (y:z) from x fields (f2)", "f2,x", "", "x,y", "")
f("unpack_json if (y:z) from x fields (f2) keep_original_fields", "f2,x", "", "f2,x,y", "")
f("unpack_json if (y:z) from x fields (x)", "f2,x", "", "f2,x,y", "")
f("unpack_json if (y:z) from x fields (x) keep_original_fields", "f2,x", "", "f2,x,y", "")
}


@ -20,6 +20,8 @@ type pipeUnpackLogfmt struct {
// resultPrefix is prefix to add to unpacked field names
resultPrefix string
keepOriginalFields bool
// iff is an optional filter for skipping unpacking logfmt
iff *ifFilter
}
@ -38,11 +40,14 @@ func (pu *pipeUnpackLogfmt) String() string {
if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
}
if pu.keepOriginalFields {
s += " keep_original_fields"
}
return s
}
func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, unneededFields)
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.iff, neededFields, unneededFields)
}
func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
@ -73,7 +78,7 @@ func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}
putLogfmtParser(p)
}
return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.iff)
}
@ -125,11 +130,18 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
resultPrefix = p
}
keepOriginalFields := false
if lex.isKeyword("keep_original_fields") {
lex.nextToken()
keepOriginalFields = true
}
pu := &pipeUnpackLogfmt{
fromField: fromField,
fields: fields,
resultPrefix: resultPrefix,
iff: iff,
fromField: fromField,
fields: fields,
resultPrefix: resultPrefix,
keepOriginalFields: keepOriginalFields,
iff: iff,
}
return pu, nil


@ -11,19 +11,26 @@ func TestParsePipeUnpackLogfmtSuccess(t *testing.T) {
}
f(`unpack_logfmt`)
f(`unpack_logfmt keep_original_fields`)
f(`unpack_logfmt fields (a, b)`)
f(`unpack_logfmt fields (a, b) keep_original_fields`)
f(`unpack_logfmt if (a:x)`)
f(`unpack_logfmt if (a:x) keep_original_fields`)
f(`unpack_logfmt if (a:x) fields (a, b)`)
f(`unpack_logfmt from x`)
f(`unpack_logfmt from x keep_original_fields`)
f(`unpack_logfmt from x fields (a, b)`)
f(`unpack_logfmt from x fields (a, b) keep_original_fields`)
f(`unpack_logfmt if (a:x) from x`)
f(`unpack_logfmt if (a:x) from x fields (a, b)`)
f(`unpack_logfmt from x result_prefix abc`)
f(`unpack_logfmt if (a:x) from x result_prefix abc`)
f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc`)
f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc keep_original_fields`)
f(`unpack_logfmt result_prefix abc`)
f(`unpack_logfmt if (a:x) result_prefix abc`)
f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc`)
f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc keep_original_fields`)
}
func TestParsePipeUnpackLogfmtFailure(t *testing.T) {
@ -57,6 +64,7 @@ func TestPipeUnpackLogfmt(t *testing.T) {
f("unpack_logfmt fields (foo, a, b)", [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"a", "xxx"},
},
}, [][]Field{
{
@ -67,10 +75,26 @@ func TestPipeUnpackLogfmt(t *testing.T) {
},
})
// keep original fields
f("unpack_logfmt keep_original_fields", [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"baz", "abcdef"},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"foo", "bar"},
{"baz", "abcdef"},
{"a", "b"},
},
})
// single row, unpack from _msg
f("unpack_logfmt", [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"baz", "abcdef"},
},
}, [][]Field{
{
@ -242,7 +266,10 @@ func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) {
}
// all the needed fields
f("unpack_logfmt from x", "*", "", "*", "")
f("unpack_logfmt", "*", "", "*", "")
f("unpack_logfmt fields (f1, f2)", "*", "", "*", "f1,f2")
f("unpack_logfmt fields (f1, f2) keep_original_fields", "*", "", "*", "")
f("unpack_logfmt keep_original_fields", "*", "", "*", "")
f("unpack_logfmt if (y:z) from x", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src