mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
aa23832921
commit
f8e034f061
12 changed files with 316 additions and 32 deletions
|
@ -21,7 +21,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
|
||||
* FEATURE: support [comparing](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter) log field values with [special numeric values](https://docs.victoriametrics.com/victorialogs/logsql/#numeric-values). For example, `duration:>1.5s` and `response_size:<15KiB` are valid filters now.
|
||||
* FEATURE: properly sort [durations](https://docs.victoriametrics.com/victorialogs/logsql/#duration-values) and [short numeric values](https://docs.victoriametrics.com/victorialogs/logsql/#short-numeric-values) in [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). For example, `10s` goes in front of `1h`, while `10KB` goes in front of `1GB`.
|
||||
* FEATURE: add an ability to preserve the original non-empty field values when performing [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe), [`unpack_json`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe), [`unpack_logfmt`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) and [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) pipes.
|
||||
* FEATURE: add an ability to preserve the original non-empty field values when executing [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe), [`unpack_json`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe), [`unpack_logfmt`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) and [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) pipes.
|
||||
* FEATURE: add an ability to preserve the original field values if the corresponding unpacked values are empty when executing [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe), [`unpack_json`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe), [`unpack_logfmt`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) and [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) pipes.
|
||||
|
||||
## [v0.10.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.10.0-victorialogs)
|
||||
|
||||
|
|
|
@ -1162,13 +1162,21 @@ For example, the following query extracts `ip` from the corresponding JSON field
|
|||
_time:5m | extract '"ip":"<ip>"'
|
||||
```
|
||||
|
||||
Add `keep_original_fields` to the end of `extract ...` when the original non-empty values of the named fields mentioned in the pattern must be preserved
|
||||
Add `keep_original_fields` to the end of `extract ...` when the original non-empty values of the fields mentioned in the pattern must be preserved
|
||||
instead of overwriting it with the extracted values. For example, the following query extracts `<ip>` only if the original value for `ip` field is missing or is empty:
|
||||
|
||||
```logsql
|
||||
_time:5m | extract 'ip=<ip> ' keep_original_fields
|
||||
```
|
||||
|
||||
By default `extract` writes empty matching fields to the output, which may overwrite existing values. Add `skip_empty_results` to the end of `extract ...`
|
||||
in order to prevent from overwriting the existing values for the corresponding fields with empty values.
|
||||
For example, the following query preserves the original `ip` field value if `foo` field doesn't contain the matching ip:
|
||||
|
||||
```logsql
|
||||
_time:5m | extract 'ip=<ip> ' from foo skip_empty_results
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [Format for extract pipe pattern](#format-for-extract-pipe-pattern)
|
||||
|
@ -1347,6 +1355,13 @@ instead of overwriting it with the `format` results. For example, the following
|
|||
_time:5m | format 'some_text' as foo keep_original_fields
|
||||
```
|
||||
|
||||
Add `skip_empty_results` to the end of `format ...` if emty results shouldn't be written to the output. For example, the following query adds formatted result to `foo` field
|
||||
when at least `field1` or `field2` aren't empty, while preserving the original `foo` value:
|
||||
|
||||
```logsql
|
||||
_time:5m | format "<field1><field2>" as foo skip_empty_results
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [Conditional format](#conditional-format)
|
||||
|
@ -1713,13 +1728,20 @@ fields from JSON value stored in `my_json` [log field](https://docs.victoriametr
|
|||
_time:5m | unpack_json from my_json fields (foo, bar)
|
||||
```
|
||||
|
||||
If it is needed to preserve the original non-empty values of the unpacked fields, then add `keep_original_fields` to the end of `unpack_json ...`. For example,
|
||||
the following query preserves the original non-empty values for `ip` and `host` fields instead of overwriting it with the unpacked values:
|
||||
If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_json ...`. For example,
|
||||
the following query preserves the original non-empty values for `ip` and `host` fields instead of overwriting them with the unpacked values:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_json from foo fields (ip, host) keep_original_fields
|
||||
```
|
||||
|
||||
Add `skip_empty_results` to the end of `unpack_json ...` if the original field values must be preserved when the corresponding unpacked values are empty.
|
||||
For example, the following query preserves the original `ip` and `host` field values for empty unpacked values:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_json fields (ip, host) skip_empty_results
|
||||
```
|
||||
|
||||
Performance tip: if you need extracting a single field from long JSON, it is faster to use [`extract` pipe](#extract-pipe). For example, the following query extracts `"ip"` field from JSON
|
||||
stored in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) at the maximum speed:
|
||||
|
||||
|
@ -1782,13 +1804,20 @@ from logfmt stored in the `my_logfmt` field:
|
|||
_time:5m | unpack_logfmt from my_logfmt fields (foo, bar)
|
||||
```
|
||||
|
||||
If it is needed to preserve the original non-empty values of the unpacked fields, then add `keep_original_fields` to the end of `unpack_logfmt ...`. For example,
|
||||
the following query preserves the original non-empty values for `ip` and `host` fields instead of overwriting it with the unpacked values:
|
||||
If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_logfmt ...`. For example,
|
||||
the following query preserves the original non-empty values for `ip` and `host` fields instead of overwriting them with the unpacked values:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_logfmt from foo fields (ip, host) keep_original_fields
|
||||
```
|
||||
|
||||
Add `skip_empty_results` to the end of `unpack_logfmt ...` if the original field values must be preserved when the corresponding unpacked values are empty.
|
||||
For example, the following query preserves the original `ip` and `host` field values for empty unpacked values:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_logfmt fields (ip, host) skip_empty_results
|
||||
```
|
||||
|
||||
Performance tip: if you need extracting a single field from long [logfmt](https://brandur.org/logfmt) line, it is faster to use [`extract` pipe](#extract-pipe).
|
||||
For example, the following query extracts `"ip"` field from [logfmt](https://brandur.org/logfmt) line stored
|
||||
in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
|
|
|
@ -14,6 +14,7 @@ type pipeExtract struct {
|
|||
patternStr string
|
||||
|
||||
keepOriginalFields bool
|
||||
skipEmptyResults bool
|
||||
|
||||
// iff is an optional filter for skipping the extract func
|
||||
iff *ifFilter
|
||||
|
@ -31,6 +32,9 @@ func (pe *pipeExtract) String() string {
|
|||
if pe.keepOriginalFields {
|
||||
s += " keep_original_fields"
|
||||
}
|
||||
if pe.skipEmptyResults {
|
||||
s += " skip_empty_results"
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -43,7 +47,7 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
|
|||
if !unneededFieldsOrig.contains(step.field) {
|
||||
needFromField = true
|
||||
}
|
||||
if !pe.keepOriginalFields {
|
||||
if !pe.keepOriginalFields && !pe.skipEmptyResults {
|
||||
unneededFields.add(step.field)
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +66,7 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
|
|||
for _, step := range pe.ptn.steps {
|
||||
if step.field != "" && neededFieldsOrig.contains(step.field) {
|
||||
needFromField = true
|
||||
if !pe.keepOriginalFields {
|
||||
if !pe.keepOriginalFields && !pe.skipEmptyResults {
|
||||
neededFields.remove(step.field)
|
||||
}
|
||||
}
|
||||
|
@ -90,7 +94,7 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f
|
|||
}
|
||||
}
|
||||
|
||||
return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.keepOriginalFields, pe.iff)
|
||||
return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.keepOriginalFields, pe.skipEmptyResults, pe.iff)
|
||||
}
|
||||
|
||||
func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
|
||||
|
@ -131,9 +135,14 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
|
|||
}
|
||||
|
||||
keepOriginalFields := false
|
||||
if lex.isKeyword("keep_original_fields") {
|
||||
skipEmptyResults := false
|
||||
switch {
|
||||
case lex.isKeyword("keep_original_fields"):
|
||||
lex.nextToken()
|
||||
keepOriginalFields = true
|
||||
case lex.isKeyword("skip_empty_results"):
|
||||
lex.nextToken()
|
||||
skipEmptyResults = true
|
||||
}
|
||||
|
||||
pe := &pipeExtract{
|
||||
|
@ -141,6 +150,7 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
|
|||
ptn: ptn,
|
||||
patternStr: patternStr,
|
||||
keepOriginalFields: keepOriginalFields,
|
||||
skipEmptyResults: skipEmptyResults,
|
||||
iff: iff,
|
||||
}
|
||||
|
||||
|
|
|
@ -11,10 +11,13 @@ func TestParsePipeExtractSuccess(t *testing.T) {
|
|||
}
|
||||
|
||||
f(`extract "foo<bar>"`)
|
||||
f(`extract "foo<bar>" skip_empty_results`)
|
||||
f(`extract "foo<bar>" keep_original_fields`)
|
||||
f(`extract "foo<bar>" from x`)
|
||||
f(`extract "foo<bar>" from x skip_empty_results`)
|
||||
f(`extract "foo<bar>" from x keep_original_fields`)
|
||||
f(`extract if (x:y) "foo<bar>" from baz`)
|
||||
f(`extract if (x:y) "foo<bar>" from baz skip_empty_results`)
|
||||
f(`extract if (x:y) "foo<bar>" from baz keep_original_fields`)
|
||||
}
|
||||
|
||||
|
@ -26,6 +29,7 @@ func TestParsePipeExtractFailure(t *testing.T) {
|
|||
|
||||
f(`extract`)
|
||||
f(`extract keep_original_fields`)
|
||||
f(`extract skip_empty_results`)
|
||||
f(`extract from`)
|
||||
f(`extract from x`)
|
||||
f(`extract from x "y<foo>"`)
|
||||
|
@ -42,6 +46,36 @@ func TestPipeExtract(t *testing.T) {
|
|||
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||
}
|
||||
|
||||
// skip empty results
|
||||
f(`extract "baz=<abc> a=<aa>" skip_empty_results`, [][]Field{
|
||||
{
|
||||
{"_msg", `foo=bar baz="x y=z" `},
|
||||
{"aa", "foobar"},
|
||||
{"abc", "ippl"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `foo=bar baz="x y=z" `},
|
||||
{"aa", "foobar"},
|
||||
{"abc", "x y=z"},
|
||||
},
|
||||
})
|
||||
|
||||
// no skip empty results
|
||||
f(`extract "baz=<abc> a=<aa>"`, [][]Field{
|
||||
{
|
||||
{"_msg", `foo=bar baz="x y=z" `},
|
||||
{"aa", "foobar"},
|
||||
{"abc", "ippl"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `foo=bar baz="x y=z" `},
|
||||
{"aa", ""},
|
||||
{"abc", "x y=z"},
|
||||
},
|
||||
})
|
||||
|
||||
// keep original fields
|
||||
f(`extract "baz=<abc> a=<aa>" keep_original_fields`, [][]Field{
|
||||
{
|
||||
|
@ -261,51 +295,62 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) {
|
|||
f("extract '<foo>' from x", "*", "", "*", "foo")
|
||||
f("extract if (foo:bar) '<foo>' from x", "*", "", "*", "")
|
||||
f("extract if (foo:bar) '<foo>' from x keep_original_fields", "*", "", "*", "")
|
||||
f("extract if (foo:bar) '<foo>' from x skip_empty_results", "*", "", "*", "")
|
||||
|
||||
// unneeded fields do not intersect with pattern and output fields
|
||||
f("extract '<foo>' from x", "*", "f1,f2", "*", "f1,f2,foo")
|
||||
f("extract '<foo>' from x keep_original_fields", "*", "f1,f2", "*", "f1,f2")
|
||||
f("extract '<foo>' from x skip_empty_results", "*", "f1,f2", "*", "f1,f2")
|
||||
f("extract if (f1:x) '<foo>' from x", "*", "f1,f2", "*", "f2,foo")
|
||||
f("extract if (f1:x) '<foo>' from x keep_original_fields", "*", "f1,f2", "*", "f2")
|
||||
f("extract if (f1:x) '<foo>' from x skip_empty_results", "*", "f1,f2", "*", "f2")
|
||||
f("extract if (foo:bar f1:x) '<foo>' from x", "*", "f1,f2", "*", "f2")
|
||||
|
||||
// unneeded fields intersect with pattern
|
||||
f("extract '<foo>' from x", "*", "f2,x", "*", "f2,foo")
|
||||
f("extract '<foo>' from x keep_original_fields", "*", "f2,x", "*", "f2")
|
||||
f("extract '<foo>' from x skip_empty_results", "*", "f2,x", "*", "f2")
|
||||
f("extract if (f1:abc) '<foo>' from x", "*", "f2,x", "*", "f2,foo")
|
||||
f("extract if (f2:abc) '<foo>' from x", "*", "f2,x", "*", "foo")
|
||||
|
||||
// unneeded fields intersect with output fields
|
||||
f("extract '<foo>x<bar>' from x", "*", "f2,foo", "*", "bar,f2,foo")
|
||||
f("extract '<foo>x<bar>' from x keep_original_fields", "*", "f2,foo", "*", "f2,foo")
|
||||
f("extract '<foo>x<bar>' from x skip_empty_results", "*", "f2,foo", "*", "f2,foo")
|
||||
f("extract if (f1:abc) '<foo>x<bar>' from x", "*", "f2,foo", "*", "bar,f2,foo")
|
||||
f("extract if (f2:abc foo:w) '<foo>x<bar>' from x", "*", "f2,foo", "*", "bar")
|
||||
f("extract if (f2:abc foo:w) '<foo>x<bar>' from x keep_original_fields", "*", "f2,foo", "*", "")
|
||||
f("extract if (f2:abc foo:w) '<foo>x<bar>' from x skip_empty_results", "*", "f2,foo", "*", "")
|
||||
|
||||
// unneeded fields intersect with all the output fields
|
||||
f("extract '<foo>x<bar>' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
|
||||
f("extract if (a:b f2:q x:y foo:w) '<foo>x<bar>' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
|
||||
f("extract if (a:b f2:q x:y foo:w) '<foo>x<bar>' from x keep_original_fields", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
|
||||
f("extract if (a:b f2:q x:y foo:w) '<foo>x<bar>' from x skip_empty_results", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
|
||||
|
||||
// needed fields do not intersect with pattern and output fields
|
||||
f("extract '<foo>x<bar>' from x", "f1,f2", "", "f1,f2", "")
|
||||
f("extract '<foo>x<bar>' from x keep_original_fields", "f1,f2", "", "f1,f2", "")
|
||||
f("extract '<foo>x<bar>' from x skip_empty_results", "f1,f2", "", "f1,f2", "")
|
||||
f("extract if (a:b) '<foo>x<bar>' from x", "f1,f2", "", "f1,f2", "")
|
||||
f("extract if (f1:b) '<foo>x<bar>' from x", "f1,f2", "", "f1,f2", "")
|
||||
|
||||
// needed fields intersect with pattern field
|
||||
f("extract '<foo>x<bar>' from x", "f2,x", "", "f2,x", "")
|
||||
f("extract '<foo>x<bar>' from x keep_original_fields", "f2,x", "", "f2,x", "")
|
||||
f("extract '<foo>x<bar>' from x skip_empty_results", "f2,x", "", "f2,x", "")
|
||||
f("extract if (a:b) '<foo>x<bar>' from x", "f2,x", "", "f2,x", "")
|
||||
|
||||
// needed fields intersect with output fields
|
||||
f("extract '<foo>x<bar>' from x", "f2,foo", "", "f2,x", "")
|
||||
f("extract '<foo>x<bar>' from x keep_original_fields", "f2,foo", "", "foo,f2,x", "")
|
||||
f("extract '<foo>x<bar>' from x skip_empty_results", "f2,foo", "", "foo,f2,x", "")
|
||||
f("extract if (a:b) '<foo>x<bar>' from x", "f2,foo", "", "a,f2,x", "")
|
||||
|
||||
// needed fields intersect with pattern and output fields
|
||||
f("extract '<foo>x<bar>' from x", "f2,foo,x,y", "", "f2,x,y", "")
|
||||
f("extract '<foo>x<bar>' from x keep_original_fields", "f2,foo,x,y", "", "foo,f2,x,y", "")
|
||||
f("extract '<foo>x<bar>' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "")
|
||||
f("extract if (a:b foo:q) '<foo>x<bar>' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "")
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ type pipeFormat struct {
|
|||
resultField string
|
||||
|
||||
keepOriginalFields bool
|
||||
skipEmptyResults bool
|
||||
|
||||
// iff is an optional filter for skipping the format func
|
||||
iff *ifFilter
|
||||
|
@ -35,13 +36,16 @@ func (pf *pipeFormat) String() string {
|
|||
if pf.keepOriginalFields {
|
||||
s += " keep_original_fields"
|
||||
}
|
||||
if pf.skipEmptyResults {
|
||||
s += " skip_empty_results"
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||
if neededFields.contains("*") {
|
||||
if !unneededFields.contains(pf.resultField) {
|
||||
if !pf.keepOriginalFields {
|
||||
if !pf.keepOriginalFields && !pf.skipEmptyResults {
|
||||
unneededFields.add(pf.resultField)
|
||||
}
|
||||
if pf.iff != nil {
|
||||
|
@ -55,7 +59,7 @@ func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet)
|
|||
}
|
||||
} else {
|
||||
if neededFields.contains(pf.resultField) {
|
||||
if !pf.keepOriginalFields {
|
||||
if !pf.keepOriginalFields && !pf.skipEmptyResults {
|
||||
neededFields.remove(pf.resultField)
|
||||
}
|
||||
if pf.iff != nil {
|
||||
|
@ -106,7 +110,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
|
|||
}
|
||||
|
||||
shard := &pfp.shards[workerID]
|
||||
shard.wctx.init(workerID, pfp.ppBase, pfp.pf.keepOriginalFields, br)
|
||||
shard.wctx.init(workerID, pfp.ppBase, pfp.pf.keepOriginalFields, pfp.pf.skipEmptyResults, br)
|
||||
shard.uctx.init(workerID, "")
|
||||
|
||||
bm := &shard.bm
|
||||
|
@ -199,9 +203,14 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
|
|||
}
|
||||
|
||||
keepOriginalFields := false
|
||||
if lex.isKeyword("keep_original_fields") {
|
||||
skipEmptyResults := false
|
||||
switch {
|
||||
case lex.isKeyword("keep_original_fields"):
|
||||
lex.nextToken()
|
||||
keepOriginalFields = true
|
||||
case lex.isKeyword("skip_empty_results"):
|
||||
lex.nextToken()
|
||||
skipEmptyResults = true
|
||||
}
|
||||
|
||||
pf := &pipeFormat{
|
||||
|
@ -209,6 +218,7 @@ func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
|
|||
steps: steps,
|
||||
resultField: resultField,
|
||||
keepOriginalFields: keepOriginalFields,
|
||||
skipEmptyResults: skipEmptyResults,
|
||||
iff: iff,
|
||||
}
|
||||
|
||||
|
|
|
@ -11,16 +11,20 @@ func TestParsePipeFormatSuccess(t *testing.T) {
|
|||
}
|
||||
|
||||
f(`format "foo<bar>"`)
|
||||
f(`format "foo<bar>" skip_empty_results`)
|
||||
f(`format "foo<bar>" keep_original_fields`)
|
||||
f(`format "" as x`)
|
||||
f(`format "<>" as x`)
|
||||
f(`format foo as x`)
|
||||
f(`format foo as x skip_empty_results`)
|
||||
f(`format foo as x keep_original_fields`)
|
||||
f(`format "<foo>"`)
|
||||
f(`format "<foo>bar<baz>"`)
|
||||
f(`format "bar<baz><xyz>bac"`)
|
||||
f(`format "bar<baz><xyz>bac" skip_empty_results`)
|
||||
f(`format "bar<baz><xyz>bac" keep_original_fields`)
|
||||
f(`format if (x:y) "bar<baz><xyz>bac"`)
|
||||
f(`format if (x:y) "bar<baz><xyz>bac" skip_empty_results`)
|
||||
f(`format if (x:y) "bar<baz><xyz>bac" keep_original_fields`)
|
||||
}
|
||||
|
||||
|
@ -43,6 +47,56 @@ func TestPipeFormat(t *testing.T) {
|
|||
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||
}
|
||||
|
||||
// skip_empty_results
|
||||
f(`format '<foo><bar>' as x skip_empty_results`, [][]Field{
|
||||
{
|
||||
{"foo", `abc`},
|
||||
{"bar", `cde`},
|
||||
{"x", "111"},
|
||||
},
|
||||
{
|
||||
{"xfoo", `ppp`},
|
||||
{"xbar", `123`},
|
||||
{"x", "222"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"foo", `abc`},
|
||||
{"bar", `cde`},
|
||||
{"x", `abccde`},
|
||||
},
|
||||
{
|
||||
{"xfoo", `ppp`},
|
||||
{"xbar", `123`},
|
||||
{"x", `222`},
|
||||
},
|
||||
})
|
||||
|
||||
// no skip_empty_results
|
||||
f(`format '<foo><bar>' as x`, [][]Field{
|
||||
{
|
||||
{"foo", `abc`},
|
||||
{"bar", `cde`},
|
||||
{"x", "111"},
|
||||
},
|
||||
{
|
||||
{"xfoo", `ppp`},
|
||||
{"xbar", `123`},
|
||||
{"x", "222"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"foo", `abc`},
|
||||
{"bar", `cde`},
|
||||
{"x", `abccde`},
|
||||
},
|
||||
{
|
||||
{"xfoo", `ppp`},
|
||||
{"xbar", `123`},
|
||||
{"x", ``},
|
||||
},
|
||||
})
|
||||
|
||||
// no keep_original_fields
|
||||
f(`format '{"foo":<q:foo>,"bar":"<bar>"}' as x`, [][]Field{
|
||||
{
|
||||
|
@ -212,9 +266,11 @@ func TestPipeFormatUpdateNeededFields(t *testing.T) {
|
|||
|
||||
// all the needed fields
|
||||
f(`format "foo" as x`, "*", "", "*", "x")
|
||||
f(`format "foo" as x skip_empty_results`, "*", "", "*", "")
|
||||
f(`format "foo" as x keep_original_fields`, "*", "", "*", "")
|
||||
f(`format "<f1>foo" as x`, "*", "", "*", "x")
|
||||
f(`format if (f2:z) "<f1>foo" as x`, "*", "", "*", "x")
|
||||
f(`format if (f2:z) "<f1>foo" as x skip_empty_results`, "*", "", "*", "")
|
||||
f(`format if (f2:z) "<f1>foo" as x keep_original_fields`, "*", "", "*", "")
|
||||
|
||||
// unneeded fields do not intersect with pattern and output field
|
||||
|
@ -222,51 +278,68 @@ func TestPipeFormatUpdateNeededFields(t *testing.T) {
|
|||
f(`format "<f3>foo" as x`, "*", "f1,f2", "*", "f1,f2,x")
|
||||
f(`format if (f4:z) "<f3>foo" as x`, "*", "f1,f2", "*", "f1,f2,x")
|
||||
f(`format if (f1:z) "<f3>foo" as x`, "*", "f1,f2", "*", "f2,x")
|
||||
f(`format if (f1:z) "<f3>foo" as x skip_empty_results`, "*", "f1,f2", "*", "f2")
|
||||
f(`format if (f1:z) "<f3>foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2")
|
||||
|
||||
// unneeded fields intersect with pattern
|
||||
f(`format "<f1>foo" as x`, "*", "f1,f2", "*", "f2,x")
|
||||
f(`format "<f1>foo" as x skip_empty_results`, "*", "f1,f2", "*", "f2")
|
||||
f(`format "<f1>foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2")
|
||||
f(`format if (f4:z) "<f1>foo" as x`, "*", "f1,f2", "*", "f2,x")
|
||||
f(`format if (f4:z) "<f1>foo" as x skip_empty_results`, "*", "f1,f2", "*", "f2")
|
||||
f(`format if (f4:z) "<f1>foo" as x keep_original_fields`, "*", "f1,f2", "*", "f2")
|
||||
f(`format if (f2:z) "<f1>foo" as x`, "*", "f1,f2", "*", "x")
|
||||
f(`format if (f2:z) "<f1>foo" as x skip_empty_results`, "*", "f1,f2", "*", "")
|
||||
f(`format if (f2:z) "<f1>foo" as x keep_original_fields`, "*", "f1,f2", "*", "")
|
||||
|
||||
// unneeded fields intersect with output field
|
||||
f(`format "<f1>foo" as x`, "*", "x,y", "*", "x,y")
|
||||
f(`format "<f1>foo" as x skip_empty_results`, "*", "x,y", "*", "x,y")
|
||||
f(`format "<f1>foo" as x keep_original_fields`, "*", "x,y", "*", "x,y")
|
||||
f(`format if (f2:z) "<f1>foo" as x`, "*", "x,y", "*", "x,y")
|
||||
f(`format if (f2:z) "<f1>foo" as x skip_empty_results`, "*", "x,y", "*", "x,y")
|
||||
f(`format if (f2:z) "<f1>foo" as x keep_original_fields`, "*", "x,y", "*", "x,y")
|
||||
f(`format if (y:z) "<f1>foo" as x`, "*", "x,y", "*", "x,y")
|
||||
f(`format if (y:z) "<f1>foo" as x skip_empty_results`, "*", "x,y", "*", "x,y")
|
||||
f(`format if (y:z) "<f1>foo" as x keep_original_fields`, "*", "x,y", "*", "x,y")
|
||||
|
||||
// needed fields do not intersect with pattern and output field
|
||||
f(`format "<f1>foo" as f2`, "x,y", "", "x,y", "")
|
||||
f(`format "<f1>foo" as f2 keep_original_fields`, "x,y", "", "x,y", "")
|
||||
f(`format "<f1>foo" as f2 skip_empty_results`, "x,y", "", "x,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2`, "x,y", "", "x,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2 skip_empty_results`, "x,y", "", "x,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2 keep_original_fields`, "x,y", "", "x,y", "")
|
||||
f(`format if (x:z) "<f1>foo" as f2`, "x,y", "", "x,y", "")
|
||||
f(`format if (x:z) "<f1>foo" as f2 skip_empty_results`, "x,y", "", "x,y", "")
|
||||
f(`format if (x:z) "<f1>foo" as f2 keep_original_fields`, "x,y", "", "x,y", "")
|
||||
|
||||
// needed fields intersect with pattern field
|
||||
f(`format "<f1>foo" as f2`, "f1,y", "", "f1,y", "")
|
||||
f(`format "<f1>foo" as f2 skip_empty_results`, "f1,y", "", "f1,y", "")
|
||||
f(`format "<f1>foo" as f2 keep_original_fields`, "f1,y", "", "f1,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2`, "f1,y", "", "f1,y", "")
|
||||
f(`format if (x:z) "<f1>foo" as f2`, "f1,y", "", "f1,y", "")
|
||||
f(`format if (x:z) "<f1>foo" as f2 skip_empty_results`, "f1,y", "", "f1,y", "")
|
||||
f(`format if (x:z) "<f1>foo" as f2 keep_original_fields`, "f1,y", "", "f1,y", "")
|
||||
|
||||
// needed fields intersect with output field
|
||||
f(`format "<f1>foo" as f2`, "f2,y", "", "f1,y", "")
|
||||
f(`format "<f1>foo" as f2 skip_empty_results`, "f2,y", "", "f1,f2,y", "")
|
||||
f(`format "<f1>foo" as f2 keep_original_fields`, "f2,y", "", "f1,f2,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2`, "f2,y", "", "f1,f3,y", "")
|
||||
f(`format if (x:z or y:w) "<f1>foo" as f2`, "f2,y", "", "f1,x,y", "")
|
||||
f(`format if (x:z or y:w) "<f1>foo" as f2 skip_empty_results`, "f2,y", "", "f1,f2,x,y", "")
|
||||
f(`format if (x:z or y:w) "<f1>foo" as f2 keep_original_fields`, "f2,y", "", "f1,f2,x,y", "")
|
||||
|
||||
// needed fields intersect with pattern and output fields
|
||||
f(`format "<f1>foo" as f2`, "f1,f2,y", "", "f1,y", "")
|
||||
f(`format "<f1>foo" as f2 skip_empty_results`, "f1,f2,y", "", "f1,f2,y", "")
|
||||
f(`format "<f1>foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2`, "f1,f2,y", "", "f1,f3,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2 skip_empty_results`, "f1,f2,y", "", "f1,f2,f3,y", "")
|
||||
f(`format if (f3:z) "<f1>foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,f3,y", "")
|
||||
f(`format if (x:z or y:w) "<f1>foo" as f2`, "f1,f2,y", "", "f1,x,y", "")
|
||||
f(`format if (x:z or y:w) "<f1>foo" as f2 skip_empty_results`, "f1,f2,y", "", "f1,f2,x,y", "")
|
||||
f(`format if (x:z or y:w) "<f1>foo" as f2 keep_original_fields`, "f1,f2,y", "", "f1,f2,x,y", "")
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) {
|
|||
}
|
||||
|
||||
func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor,
|
||||
fromField string, fieldPrefix string, keepOriginalFields bool, iff *ifFilter) *pipeUnpackProcessor {
|
||||
fromField string, fieldPrefix string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter) *pipeUnpackProcessor {
|
||||
|
||||
return &pipeUnpackProcessor{
|
||||
unpackFunc: unpackFunc,
|
||||
|
@ -65,6 +65,7 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack
|
|||
fromField: fromField,
|
||||
fieldPrefix: fieldPrefix,
|
||||
keepOriginalFields: keepOriginalFields,
|
||||
skipEmptyResults: skipEmptyResults,
|
||||
iff: iff,
|
||||
}
|
||||
}
|
||||
|
@ -78,6 +79,7 @@ type pipeUnpackProcessor struct {
|
|||
fromField string
|
||||
fieldPrefix string
|
||||
keepOriginalFields bool
|
||||
skipEmptyResults bool
|
||||
|
||||
iff *ifFilter
|
||||
}
|
||||
|
@ -102,7 +104,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
|
|||
}
|
||||
|
||||
shard := &pup.shards[workerID]
|
||||
shard.wctx.init(workerID, pup.ppBase, pup.keepOriginalFields, br)
|
||||
shard.wctx.init(workerID, pup.ppBase, pup.keepOriginalFields, pup.skipEmptyResults, br)
|
||||
shard.uctx.init(workerID, pup.fieldPrefix)
|
||||
|
||||
bm := &shard.bm
|
||||
|
@ -158,6 +160,7 @@ type pipeUnpackWriteContext struct {
|
|||
workerID uint
|
||||
ppBase pipeProcessor
|
||||
keepOriginalFields bool
|
||||
skipEmptyResults bool
|
||||
|
||||
brSrc *blockResult
|
||||
csSrc []*blockResultColumn
|
||||
|
@ -190,12 +193,13 @@ func (wctx *pipeUnpackWriteContext) reset() {
|
|||
wctx.valuesLen = 0
|
||||
}
|
||||
|
||||
func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, keepOriginalFields bool, brSrc *blockResult) {
|
||||
func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, keepOriginalFields, skipEmptyResults bool, brSrc *blockResult) {
|
||||
wctx.reset()
|
||||
|
||||
wctx.workerID = workerID
|
||||
wctx.ppBase = ppBase
|
||||
wctx.keepOriginalFields = keepOriginalFields
|
||||
wctx.skipEmptyResults = skipEmptyResults
|
||||
|
||||
wctx.brSrc = brSrc
|
||||
wctx.csSrc = brSrc.getColumns()
|
||||
|
@ -236,7 +240,7 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
|
|||
}
|
||||
for i, f := range extraFields {
|
||||
v := f.Value
|
||||
if wctx.keepOriginalFields {
|
||||
if v == "" && wctx.skipEmptyResults || wctx.keepOriginalFields {
|
||||
idx := getBlockResultColumnIdxByName(csSrc, f.Name)
|
||||
if idx >= 0 {
|
||||
vOrig := csSrc[idx].getValueAtRow(brSrc, rowIdx)
|
||||
|
|
|
@ -23,6 +23,7 @@ type pipeUnpackJSON struct {
|
|||
resultPrefix string
|
||||
|
||||
keepOriginalFields bool
|
||||
skipEmptyResults bool
|
||||
|
||||
// iff is an optional filter for skipping unpacking json
|
||||
iff *ifFilter
|
||||
|
@ -45,14 +46,17 @@ func (pu *pipeUnpackJSON) String() string {
|
|||
if pu.keepOriginalFields {
|
||||
s += " keep_original_fields"
|
||||
}
|
||||
if pu.skipEmptyResults {
|
||||
s += " skip_empty_results"
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.iff, neededFields, unneededFields)
|
||||
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, neededFields, unneededFields)
|
||||
}
|
||||
|
||||
func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields bool, iff *ifFilter, neededFields, unneededFields fieldsSet) {
|
||||
func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter, neededFields, unneededFields fieldsSet) {
|
||||
if neededFields.contains("*") {
|
||||
unneededFieldsOrig := unneededFields.clone()
|
||||
unneededFieldsCount := 0
|
||||
|
@ -61,7 +65,7 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepO
|
|||
if unneededFieldsOrig.contains(f) {
|
||||
unneededFieldsCount++
|
||||
}
|
||||
if !keepOriginalFields {
|
||||
if !keepOriginalFields && !skipEmptyResults {
|
||||
unneededFields.add(f)
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +85,7 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepO
|
|||
if neededFieldsOrig.contains(f) {
|
||||
needFromField = true
|
||||
}
|
||||
if !keepOriginalFields {
|
||||
if !keepOriginalFields && !skipEmptyResults {
|
||||
neededFields.remove(f)
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +134,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{},
|
|||
}
|
||||
PutJSONParser(p)
|
||||
}
|
||||
return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.iff)
|
||||
return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff)
|
||||
}
|
||||
|
||||
func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
|
||||
|
@ -182,9 +186,14 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
|
|||
}
|
||||
|
||||
keepOriginalFields := false
|
||||
if lex.isKeyword("keep_original_fields") {
|
||||
skipEmptyResults := false
|
||||
switch {
|
||||
case lex.isKeyword("keep_original_fields"):
|
||||
lex.nextToken()
|
||||
keepOriginalFields = true
|
||||
case lex.isKeyword("skip_empty_results"):
|
||||
lex.nextToken()
|
||||
skipEmptyResults = true
|
||||
}
|
||||
|
||||
pu := &pipeUnpackJSON{
|
||||
|
@ -192,6 +201,7 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
|
|||
fields: fields,
|
||||
resultPrefix: resultPrefix,
|
||||
keepOriginalFields: keepOriginalFields,
|
||||
skipEmptyResults: skipEmptyResults,
|
||||
iff: iff,
|
||||
}
|
||||
|
||||
|
|
|
@ -15,22 +15,29 @@ func TestParsePipeUnpackJSONSuccess(t *testing.T) {
|
|||
}
|
||||
|
||||
f(`unpack_json`)
|
||||
f(`unpack_json skip_empty_results`)
|
||||
f(`unpack_json keep_original_fields`)
|
||||
f(`unpack_json fields (a)`)
|
||||
f(`unpack_json fields (a, b, c)`)
|
||||
f(`unpack_json fields (a, b, c) skip_empty_results`)
|
||||
f(`unpack_json fields (a, b, c) keep_original_fields`)
|
||||
f(`unpack_json if (a:x)`)
|
||||
f(`unpack_json if (a:x) skip_empty_results`)
|
||||
f(`unpack_json if (a:x) keep_original_fields`)
|
||||
f(`unpack_json from x`)
|
||||
f(`unpack_json from x skip_empty_results`)
|
||||
f(`unpack_json from x keep_original_fields`)
|
||||
f(`unpack_json from x fields (a, b)`)
|
||||
f(`unpack_json if (a:x) from x fields (a, b)`)
|
||||
f(`unpack_json if (a:x) from x fields (a, b) skip_empty_results`)
|
||||
f(`unpack_json if (a:x) from x fields (a, b) keep_original_fields`)
|
||||
f(`unpack_json from x result_prefix abc`)
|
||||
f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc`)
|
||||
f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc skip_empty_results`)
|
||||
f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc keep_original_fields`)
|
||||
f(`unpack_json result_prefix abc`)
|
||||
f(`unpack_json if (a:x) fields (a, b) result_prefix abc`)
|
||||
f(`unpack_json if (a:x) fields (a, b) result_prefix abc skip_empty_results`)
|
||||
f(`unpack_json if (a:x) fields (a, b) result_prefix abc keep_original_fields`)
|
||||
}
|
||||
|
||||
|
@ -62,7 +69,39 @@ func TestPipeUnpackJSON(t *testing.T) {
|
|||
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||
}
|
||||
|
||||
// no keep original fields fields
|
||||
// skip empty results
|
||||
f("unpack_json skip_empty_results", [][]Field{
|
||||
{
|
||||
{"_msg", `{"foo":"bar","z":"q","a":""}`},
|
||||
{"foo", "x"},
|
||||
{"a", "foobar"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `{"foo":"bar","z":"q","a":""}`},
|
||||
{"foo", "bar"},
|
||||
{"z", "q"},
|
||||
{"a", "foobar"},
|
||||
},
|
||||
})
|
||||
|
||||
// no skip empty results
|
||||
f("unpack_json", [][]Field{
|
||||
{
|
||||
{"_msg", `{"foo":"bar","z":"q","a":""}`},
|
||||
{"foo", "x"},
|
||||
{"a", "foobar"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `{"foo":"bar","z":"q","a":""}`},
|
||||
{"foo", "bar"},
|
||||
{"z", "q"},
|
||||
{"a", ""},
|
||||
},
|
||||
})
|
||||
|
||||
// no keep original fields
|
||||
f("unpack_json", [][]Field{
|
||||
{
|
||||
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
|
||||
|
@ -504,48 +543,61 @@ func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) {
|
|||
|
||||
// all the needed fields
|
||||
f("unpack_json from x", "*", "", "*", "")
|
||||
f("unpack_json from x skip_empty_results", "*", "", "*", "")
|
||||
f("unpack_json from x keep_original_fields", "*", "", "*", "")
|
||||
f("unpack_json if (y:z) from x", "*", "", "*", "")
|
||||
f("unpack_json if (y:z) from x fields (a, b)", "*", "", "*", "a,b")
|
||||
f("unpack_json if (y:z) from x fields (a, b) skip_empty_results", "*", "", "*", "")
|
||||
f("unpack_json if (y:z) from x fields (a, b) keep_original_fields", "*", "", "*", "")
|
||||
|
||||
// all the needed fields, unneeded fields do not intersect with src
|
||||
f("unpack_json from x", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_json from x skip_empty_results", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_json from x keep_original_fields", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_json if (y:z) from x", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_json if (f1:z) from x", "*", "f1,f2", "*", "f2")
|
||||
f("unpack_json if (y:z) from x fields (f3)", "*", "f1,f2", "*", "f1,f2,f3")
|
||||
f("unpack_json if (y:z) from x fields (f1)", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_json if (y:z) from x fields (f1) skip_empty_results", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_json if (y:z) from x fields (f1) keep_original_fields", "*", "f1,f2", "*", "f1,f2")
|
||||
|
||||
// all the needed fields, unneeded fields intersect with src
|
||||
f("unpack_json from x", "*", "f2,x", "*", "f2")
|
||||
f("unpack_json from x skip_empty_results", "*", "f2,x", "*", "f2")
|
||||
f("unpack_json from x keep_original_fields", "*", "f2,x", "*", "f2")
|
||||
f("unpack_json if (y:z) from x", "*", "f2,x", "*", "f2")
|
||||
f("unpack_json if (f2:z) from x", "*", "f1,f2,x", "*", "f1")
|
||||
f("unpack_json if (f2:z) from x fields (f3)", "*", "f1,f2,x", "*", "f1,f3")
|
||||
f("unpack_json if (f2:z) from x fields (f3) skip_empty_results", "*", "f1,f2,x", "*", "f1")
|
||||
f("unpack_json if (f2:z) from x fields (f3) keep_original_fields", "*", "f1,f2,x", "*", "f1")
|
||||
|
||||
// needed fields do not intersect with src
|
||||
f("unpack_json from x", "f1,f2", "", "f1,f2,x", "")
|
||||
f("unpack_json from x skip_empty_results", "f1,f2", "", "f1,f2,x", "")
|
||||
f("unpack_json from x keep_original_fields", "f1,f2", "", "f1,f2,x", "")
|
||||
f("unpack_json if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "")
|
||||
f("unpack_json if (f1:z) from x", "f1,f2", "", "f1,f2,x", "")
|
||||
f("unpack_json if (y:z) from x fields (f3)", "f1,f2", "", "f1,f2", "")
|
||||
f("unpack_json if (y:z) from x fields (f3) skip_empty_results", "f1,f2", "", "f1,f2", "")
|
||||
f("unpack_json if (y:z) from x fields (f3) keep_original_fields", "f1,f2", "", "f1,f2", "")
|
||||
f("unpack_json if (y:z) from x fields (f2)", "f1,f2", "", "f1,x,y", "")
|
||||
f("unpack_json if (f2:z) from x fields (f2)", "f1,f2", "", "f1,f2,x", "")
|
||||
f("unpack_json if (f2:z) from x fields (f2) skip_empty_results", "f1,f2", "", "f1,f2,x", "")
|
||||
f("unpack_json if (f2:z) from x fields (f2) keep_original_fields", "f1,f2", "", "f1,f2,x", "")
|
||||
|
||||
// needed fields intersect with src
|
||||
f("unpack_json from x", "f2,x", "", "f2,x", "")
|
||||
f("unpack_json from x skip_empty_results", "f2,x", "", "f2,x", "")
|
||||
f("unpack_json from x keep_original_fields", "f2,x", "", "f2,x", "")
|
||||
f("unpack_json if (y:z) from x", "f2,x", "", "f2,x,y", "")
|
||||
f("unpack_json if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "")
|
||||
f("unpack_json if (y:z) from x fields (f1)", "f2,x", "", "f2,x", "")
|
||||
f("unpack_json if (y:z) from x fields (f1) skip_empty_results", "f2,x", "", "f2,x", "")
|
||||
f("unpack_json if (y:z) from x fields (f1) keep_original_fields", "f2,x", "", "f2,x", "")
|
||||
f("unpack_json if (y:z) from x fields (f2)", "f2,x", "", "x,y", "")
|
||||
f("unpack_json if (y:z) from x fields (f2) skip_empty_results", "f2,x", "", "f2,x,y", "")
|
||||
f("unpack_json if (y:z) from x fields (f2) keep_original_fields", "f2,x", "", "f2,x,y", "")
|
||||
f("unpack_json if (y:z) from x fields (x)", "f2,x", "", "f2,x,y", "")
|
||||
f("unpack_json if (y:z) from x fields (x) skip_empty_results", "f2,x", "", "f2,x,y", "")
|
||||
f("unpack_json if (y:z) from x fields (x) keep_original_fields", "f2,x", "", "f2,x,y", "")
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ type pipeUnpackLogfmt struct {
|
|||
resultPrefix string
|
||||
|
||||
keepOriginalFields bool
|
||||
skipEmptyResults bool
|
||||
|
||||
// iff is an optional filter for skipping unpacking logfmt
|
||||
iff *ifFilter
|
||||
|
@ -43,11 +44,14 @@ func (pu *pipeUnpackLogfmt) String() string {
|
|||
if pu.keepOriginalFields {
|
||||
s += " keep_original_fields"
|
||||
}
|
||||
if pu.skipEmptyResults {
|
||||
s += " skip_empty_results"
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.iff, neededFields, unneededFields)
|
||||
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, neededFields, unneededFields)
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
|
||||
|
@ -78,7 +82,7 @@ func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}
|
|||
putLogfmtParser(p)
|
||||
}
|
||||
|
||||
return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.iff)
|
||||
return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff)
|
||||
|
||||
}
|
||||
|
||||
|
@ -131,9 +135,14 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
|
|||
}
|
||||
|
||||
keepOriginalFields := false
|
||||
if lex.isKeyword("keep_original_fields") {
|
||||
skipEmptyResults := false
|
||||
switch {
|
||||
case lex.isKeyword("keep_original_fields"):
|
||||
lex.nextToken()
|
||||
keepOriginalFields = true
|
||||
case lex.isKeyword("skip_empty_results"):
|
||||
lex.nextToken()
|
||||
skipEmptyResults = true
|
||||
}
|
||||
|
||||
pu := &pipeUnpackLogfmt{
|
||||
|
@ -141,6 +150,7 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
|
|||
fields: fields,
|
||||
resultPrefix: resultPrefix,
|
||||
keepOriginalFields: keepOriginalFields,
|
||||
skipEmptyResults: skipEmptyResults,
|
||||
iff: iff,
|
||||
}
|
||||
|
||||
|
|
|
@ -11,25 +11,32 @@ func TestParsePipeUnpackLogfmtSuccess(t *testing.T) {
|
|||
}
|
||||
|
||||
f(`unpack_logfmt`)
|
||||
f(`unpack_logfmt skip_empty_results`)
|
||||
f(`unpack_logfmt keep_original_fields`)
|
||||
f(`unpack_logfmt fields (a, b)`)
|
||||
f(`unpack_logfmt fields (a, b) skip_empty_results`)
|
||||
f(`unpack_logfmt fields (a, b) keep_original_fields`)
|
||||
f(`unpack_logfmt if (a:x)`)
|
||||
f(`unpack_logfmt if (a:x) skip_empty_results`)
|
||||
f(`unpack_logfmt if (a:x) keep_original_fields`)
|
||||
f(`unpack_logfmt if (a:x) fields (a, b)`)
|
||||
f(`unpack_logfmt from x`)
|
||||
f(`unpack_logfmt from x skip_empty_results`)
|
||||
f(`unpack_logfmt from x keep_original_fields`)
|
||||
f(`unpack_logfmt from x fields (a, b)`)
|
||||
f(`unpack_logfmt from x fields (a, b) skip_empty_results`)
|
||||
f(`unpack_logfmt from x fields (a, b) keep_original_fields`)
|
||||
f(`unpack_logfmt if (a:x) from x`)
|
||||
f(`unpack_logfmt if (a:x) from x fields (a, b)`)
|
||||
f(`unpack_logfmt from x result_prefix abc`)
|
||||
f(`unpack_logfmt if (a:x) from x result_prefix abc`)
|
||||
f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc`)
|
||||
f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc skip_empty_results`)
|
||||
f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc keep_original_fields`)
|
||||
f(`unpack_logfmt result_prefix abc`)
|
||||
f(`unpack_logfmt if (a:x) result_prefix abc`)
|
||||
f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc`)
|
||||
f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc skip_empty_results`)
|
||||
f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc keep_original_fields`)
|
||||
}
|
||||
|
||||
|
@ -75,6 +82,38 @@ func TestPipeUnpackLogfmt(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
// no skip empty results
|
||||
f("unpack_logfmt", [][]Field{
|
||||
{
|
||||
{"_msg", `foo= baz="x y=z" a=b`},
|
||||
{"foo", "321"},
|
||||
{"baz", "abcdef"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `foo= baz="x y=z" a=b`},
|
||||
{"foo", ""},
|
||||
{"baz", "x y=z"},
|
||||
{"a", "b"},
|
||||
},
|
||||
})
|
||||
|
||||
// skip empty results
|
||||
f("unpack_logfmt skip_empty_results", [][]Field{
|
||||
{
|
||||
{"_msg", `foo= baz="x y=z" a=b`},
|
||||
{"foo", "321"},
|
||||
{"baz", "abcdef"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `foo= baz="x y=z" a=b`},
|
||||
{"foo", "321"},
|
||||
{"baz", "x y=z"},
|
||||
{"a", "b"},
|
||||
},
|
||||
})
|
||||
|
||||
// keep original fields
|
||||
f("unpack_logfmt keep_original_fields", [][]Field{
|
||||
{
|
||||
|
@ -268,6 +307,7 @@ func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) {
|
|||
// all the needed fields
|
||||
f("unpack_logfmt", "*", "", "*", "")
|
||||
f("unpack_logfmt fields (f1, f2)", "*", "", "*", "f1,f2")
|
||||
f("unpack_logfmt fields (f1, f2) skip_empty_results", "*", "", "*", "")
|
||||
f("unpack_logfmt fields (f1, f2) keep_original_fields", "*", "", "*", "")
|
||||
f("unpack_logfmt keep_original_fields", "*", "", "*", "")
|
||||
f("unpack_logfmt if (y:z) from x", "*", "", "*", "")
|
||||
|
|
Loading…
Reference in a new issue