diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index b756ecd17..810256634 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,7 +19,9 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip -* FEATURE: add ability to extract fields only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract). +* FEATURE: add ability to extract fields with [`extract` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) only if the given conditions are met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract). +* FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given conditions are met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json). +* FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given conditions are met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt). ## [v0.8.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.8.0-victorialogs) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index df74d88a7..35b725b85 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1142,7 +1142,7 @@ See also: #### Conditional extract -Sometimes it is needed to skip some entries from applying [`extract` pipe](#extract-pipe). This can be done by adding `if ()` filter to the end of `| extract ...` pipe. +If some log entries must be skipped from [`extract` pipe](#extract-pipe), then add `if ()` filter to the end of `| extract ...` pipe. The `` can contain arbitrary [filters](#filters). For example, the following query extracts `ip` field only if the input [log entry](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) doesn't contain `ip` field or this field is empty: @@ -1594,9 +1594,20 @@ _time:5m | extract '"ip":' See also: +- [Conditional `unpack_json`](#conditional-unpack_json) - [`unpack_logfmt` pipe](#unpack_logfmt-pipe) - [`extract` pipe](#extract-pipe) +#### Conditional unpack_json + +If the [`unpack_json` pipe](#unpack_json-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), +then add `if ()` to the end of `unpack_json ...`. +The `` can contain arbitrary [filters](#filters). For example, the following query unpacks JSON fields only if `ip` field in the current log entry isn't set or empty: + +```logsql +_time:5m | unpack_json if (ip:"") +``` + ### unpack_logfmt pipe `| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields @@ -1635,9 +1646,20 @@ _time:5m | extract ' ip=' See also: +- [Conditional unpack_logfmt](#conditional-unpack_logfmt) - [`unpack_json` pipe](#unpack_json-pipe) - [`extract` pipe](#extract-pipe) +#### Conditional unpack_logfmt + +If the [`unpack_logfmt` pipe](#unpack_logfmt-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), +then add `if ()` to the end of `unpack_logfmt ...`. +The `` can contain arbitrary [filters](#filters). For example, the following query unpacks logfmt fields only if `ip` field in the current log entry isn't set or empty: + +```logsql +_time:5m | unpack_logfmt if (ip:"") +``` + ## stats pipe functions LogsQL supports the following functions for [`stats` pipe](#stats-pipe): diff --git a/lib/logstorage/if_filter.go b/lib/logstorage/if_filter.go new file mode 100644 index 000000000..f6852e604 --- /dev/null +++ b/lib/logstorage/if_filter.go @@ -0,0 +1,75 @@ +package logstorage + +import ( + "fmt" +) + +type ifFilter struct { + f filter + neededFields []string +} + +func (iff *ifFilter) String() string { + return "if (" + iff.f.String() + ")" +} + +func parseIfFilter(lex *lexer) (*ifFilter, error) { + if !lex.isKeyword("if") { + return nil, fmt.Errorf("unexpected keyword %q; expecting 'if'", lex.token) + } + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("unexpected token %q after 'if'; expecting '('", lex.token) + } + lex.nextToken() + + if lex.isKeyword(")") { + lex.nextToken() + iff := &ifFilter{ + f: &filterNoop{}, + } + return iff, nil + } + + f, err := parseFilter(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'if' filter: %w", err) + } + if !lex.isKeyword(")") { + return nil, fmt.Errorf("unexpected token %q after 'if' filter; expecting ')'", lex.token) + } + lex.nextToken() + + neededFields := newFieldsSet() + f.updateNeededFields(neededFields) + + iff := &ifFilter{ + f: f, + neededFields: neededFields.getAll(), + } + + return iff, nil +} + +func (iff *ifFilter) optimizeFilterIn() { + if iff == nil { + return + } + + optimizeFilterIn(iff.f) +} + +func optimizeFilterIn(f filter) { + if f == nil { + return + } + + visitFunc := func(f filter) bool { + fi, ok := f.(*filterIn) + if ok && fi.q != nil { + fi.q.Optimize() + } + return false + } + _ = visitFilter(f, visitFunc) +} diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index fc5832f00..c64adba9e 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -320,10 +320,14 @@ func (q *Query) Optimize() { switch t := p.(type) { case *pipeStats: for _, f := range t.funcs { - optimizeFilterIn(f.iff) + f.iff.optimizeFilterIn() } case *pipeExtract: - optimizeFilterIn(t.iff) + t.iff.optimizeFilterIn() + case *pipeUnpackJSON: + t.iff.optimizeFilterIn() + case *pipeUnpackLogfmt: + t.iff.optimizeFilterIn() } } } @@ -344,21 +348,6 @@ func removeStarFilters(f filter) filter { return f } -func optimizeFilterIn(f filter) { - if f == nil { - return - } - - visitFunc := func(f filter) bool { - fi, ok := f.(*filterIn) - if ok && fi.q != nil { - fi.q.Optimize() - } - return false - } - _ = visitFilter(f, visitFunc) -} - func optimizeSortOffsetPipes(pipes []pipe) []pipe { // Merge 'sort ... | offset ...' into 'sort ... offset ...' i := 1 diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index f829d792d..9099f4fe3 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -15,8 +15,7 @@ type pipeExtract struct { pattern string // iff is an optional filter for skipping the extract func - iff filter - iffNeededFields []string + iff *ifFilter } func (pe *pipeExtract) String() string { @@ -26,7 +25,7 @@ func (pe *pipeExtract) String() string { } s += " " + quoteTokenIfNeeded(pe.pattern) if pe.iff != nil { - s += " if (" + pe.iff.String() + ")" + s += " " + pe.iff.String() } return s } @@ -44,7 +43,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } } if needFromField { - unneededFields.removeFields(pe.iffNeededFields) + if pe.iff != nil { + unneededFields.removeFields(pe.iff.neededFields) + } unneededFields.remove(pe.fromField) } else { unneededFields.add(pe.fromField) @@ -59,7 +60,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } } if needFromField { - neededFields.addFields(pe.iffNeededFields) + if pe.iff != nil { + neededFields.addFields(pe.iff.neededFields) + } neededFields.add(pe.fromField) } } @@ -120,7 +123,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { bm.init(len(br.timestamps)) bm.setBits() if iff := pep.pe.iff; iff != nil { - iff.applyToBlockResult(br, bm) + iff.f.applyToBlockResult(br, bm) if bm.isZero() { // Fast path - nothing to extract. pep.ppBase.writeBlock(workerID, br) @@ -132,31 +135,34 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { if c.isConst { v := c.valuesEncoded[0] ef.apply(v) + for _, f := range ef.fields { + shard.uctx.addField(f.name, *f.value, "") + } for i := range br.timestamps { - shard.uctx.resetFields() if bm.isSetBit(i) { - for _, f := range ef.fields { - shard.uctx.addField(f.name, *f.value, "") - } + shard.wctx.writeRow(i, shard.uctx.fields) + } else { + shard.wctx.writeRow(i, nil) } - shard.wctx.writeRow(i, shard.uctx.fields) } } else { values := c.getValues(br) vPrevApplied := "" for i, v := range values { - shard.uctx.resetFields() if bm.isSetBit(i) { if vPrevApplied != v { ef.apply(v) + shard.uctx.resetFields() + for _, f := range ef.fields { + shard.uctx.addField(f.name, *f.value, "") + } vPrevApplied = v } - for _, f := range ef.fields { - shard.uctx.addField(f.name, *f.value, "") - } + shard.wctx.writeRow(i, shard.uctx.fields) + } else { + shard.wctx.writeRow(i, nil) } - shard.wctx.writeRow(i, shard.uctx.fields) } } @@ -207,10 +213,6 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { return nil, err } pe.iff = iff - - neededFields := newFieldsSet() - iff.updateNeededFields(neededFields) - pe.iffNeededFields = neededFields.getAll() } return pe, nil diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 9b7116633..20dd69f32 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -28,11 +28,8 @@ type pipeStatsFunc struct { // f is stats function to execute f statsFunc - // neededFieldsForFunc contains needed fields for f execution - neededFieldsForFunc []string - // iff is an additional filter, which is applied to results before executing f on them - iff filter + iff *ifFilter // resultName is the name of the output generated by f resultName string @@ -90,7 +87,7 @@ func (ps *pipeStats) String() string { for i, f := range ps.funcs { line := f.f.String() if f.iff != nil { - line += " if (" + f.iff.String() + ")" + line += " " + f.iff.String() } line += " as " + quoteTokenIfNeeded(f.resultName) a[i] = line @@ -112,7 +109,7 @@ func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) if neededFieldsOrig.contains(f.resultName) && !unneededFields.contains(f.resultName) { f.f.updateNeededFields(neededFields) if f.iff != nil { - f.iff.updateNeededFields(neededFields) + neededFields.addFields(f.iff.neededFields) } } } @@ -311,7 +308,7 @@ func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult bm := &shard.bms[i] bm.init(len(brSrc.timestamps)) bm.setBits() - iff.applyToBlockResult(brSrc, bm) + iff.f.applyToBlockResult(brSrc, bm) if bm.areAllBitsSet() { // Fast path - per-function filter doesn't filter out rows brs[i] = brSrc @@ -323,7 +320,7 @@ func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult if bm.isZero() { brDst.reset() } else { - brDst.initFromFilterNeededColumns(brSrc, bm, funcs[i].neededFieldsForFunc) + brDst.initFromFilterNeededColumns(brSrc, bm, iff.neededFields) } brs[i] = brDst } @@ -533,10 +530,6 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { return nil, err } f.iff = iff - - neededFields := newFieldsSet() - iff.updateNeededFields(neededFields) - f.neededFieldsForFunc = neededFields.getAll() } resultName, err := parseResultName(lex) @@ -558,30 +551,6 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { } } -func parseIfFilter(lex *lexer) (filter, error) { - if !lex.isKeyword("if") { - return nil, fmt.Errorf("unexpected keyword %q; expecting 'if'", lex.token) - } - lex.nextToken() - if !lex.isKeyword("(") { - return nil, fmt.Errorf("unexpected token %q after 'if'; expecting '('", lex.token) - } - lex.nextToken() - if lex.isKeyword(")") { - lex.nextToken() - return &filterNoop{}, nil - } - f, err := parseFilter(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'if' filter: %w", err) - } - if !lex.isKeyword(")") { - return nil, fmt.Errorf("unexpected token %q after 'if' filter; expecting ')'", lex.token) - } - lex.nextToken() - return f, nil -} - func parseStatsFunc(lex *lexer) (statsFunc, error) { switch { case lex.isKeyword("count"): diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index 2f232d8ab..1a39872dc 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -40,7 +40,8 @@ func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) { }) } -func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor, fromField, fieldPrefix string) *pipeUnpackProcessor { +func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor, + fromField, fieldPrefix string, iff *ifFilter) *pipeUnpackProcessor { return &pipeUnpackProcessor{ unpackFunc: unpackFunc, ppBase: ppBase, @@ -49,6 +50,7 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack fromField: fromField, fieldPrefix: fieldPrefix, + iff: iff, } } @@ -60,6 +62,8 @@ type pipeUnpackProcessor struct { fromField string fieldPrefix string + + iff *ifFilter } type pipeUnpackProcessorShard struct { @@ -70,6 +74,8 @@ type pipeUnpackProcessorShard struct { } type pipeUnpackProcessorShardNopad struct { + bm bitmap + uctx fieldsUnpackerContext wctx pipeUnpackWriteContext } @@ -82,22 +88,43 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { shard := &pup.shards[workerID] shard.wctx.init(br, pup.ppBase) + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if pup.iff != nil { + pup.iff.f.applyToBlockResult(br, bm) + if bm.isZero() { + pup.ppBase.writeBlock(workerID, br) + return + } + } + c := br.getColumnByName(pup.fromField) if c.isConst { v := c.valuesEncoded[0] shard.uctx.resetFields() pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix) for rowIdx := range br.timestamps { - shard.wctx.writeRow(rowIdx, shard.uctx.fields) + if bm.isSetBit(rowIdx) { + shard.wctx.writeRow(rowIdx, shard.uctx.fields) + } else { + shard.wctx.writeRow(rowIdx, nil) + } } } else { values := c.getValues(br) + vPrevApplied := "" for i, v := range values { - if i == 0 || values[i-1] != v { - shard.uctx.resetFields() - pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix) + if bm.isSetBit(i) { + if vPrevApplied != v { + shard.uctx.resetFields() + pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix) + vPrevApplied = v + } + shard.wctx.writeRow(i, shard.uctx.fields) + } else { + shard.wctx.writeRow(i, nil) } - shard.wctx.writeRow(i, shard.uctx.fields) } } diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 951bae0f7..4d2e15308 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -10,9 +10,14 @@ import ( // // See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe type pipeUnpackJSON struct { + // fromField is the field to unpack json fields from fromField string + // resultPrefix is prefix to add to unpacked field names resultPrefix string + + // iff is an optional filter for skipping unpacking json + iff *ifFilter } func (pu *pipeUnpackJSON) String() string { @@ -23,19 +28,28 @@ func (pu *pipeUnpackJSON) String() string { if pu.resultPrefix != "" { s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) } + if pu.iff != nil { + s += " " + pu.iff.String() + } return s } func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { unneededFields.remove(pu.fromField) + if pu.iff != nil { + unneededFields.removeFields(pu.iff.neededFields) + } } else { neededFields.add(pu.fromField) + if pu.iff != nil { + neededFields.addFields(pu.iff.neededFields) + } } } func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix) + return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff) } func unpackJSON(uctx *fieldsUnpackerContext, s, fieldPrefix string) { @@ -82,5 +96,14 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { fromField: fromField, resultPrefix: resultPrefix, } + + if lex.isKeyword("if") { + iff, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + pu.iff = iff + } + return pu, nil } diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index a1e6bed52..f37cfc51f 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -15,9 +15,13 @@ func TestParsePipeUnpackJSONSuccess(t *testing.T) { } f(`unpack_json`) + f(`unpack_json if (a:x)`) f(`unpack_json from x`) + f(`unpack_json from x if (a:x)`) f(`unpack_json from x result_prefix abc`) + f(`unpack_json from x result_prefix abc if (a:x)`) f(`unpack_json result_prefix abc`) + f(`unpack_json result_prefix abc if (a:x)`) } func TestParsePipeUnpackJSONFailure(t *testing.T) { @@ -27,12 +31,19 @@ func TestParsePipeUnpackJSONFailure(t *testing.T) { } f(`unpack_json foo`) + f(`unpack_json if`) + f(`unpack_json if (x:y) foobar`) f(`unpack_json from`) + f(`unpack_json from if`) f(`unpack_json from x y`) + f(`unpack_json from x if`) f(`unpack_json from x result_prefix`) + f(`unpack_json from x result_prefix if`) f(`unpack_json from x result_prefix a b`) + f(`unpack_json from x result_prefix a if`) f(`unpack_json result_prefix`) f(`unpack_json result_prefix a b`) + f(`unpack_json result_prefix a if`) } func TestPipeUnpackJSON(t *testing.T) { @@ -53,6 +64,30 @@ func TestPipeUnpackJSON(t *testing.T) { }, }) + // failed if condition + f("unpack_json if (x:foo)", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"x", ""}, + }, + }) + + // matched if condition + f("unpack_json if (foo)", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"foo", "bar"}, + }, + }) + // single row, unpack from _msg into _msg f("unpack_json", [][]Field{ { @@ -160,8 +195,8 @@ func TestPipeUnpackJSON(t *testing.T) { }, }) - // multiple rows with distinct number of fields with result_prefix - f("unpack_json from x result_prefix qwe_", [][]Field{ + // multiple rows with distinct number of fields with result_prefix and if condition + f("unpack_json from x result_prefix qwe_ if (y:abc)", [][]Field{ { {"x", `{"foo":"bar","baz":"xyz"}`}, {"y", `abc`}, @@ -184,9 +219,9 @@ func TestPipeUnpackJSON(t *testing.T) { {"y", `abc`}, }, { + {"y", ""}, {"z", `foobar`}, {"x", `{"z":["bar",123]}`}, - {"qwe_z", `["bar",123]`}, }, }) } @@ -419,16 +454,25 @@ func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) { // all the needed fields f("unpack_json from x", "*", "", "*", "") + f("unpack_json from x if (y:z)", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src f("unpack_json from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_json from x if (y:z)", "*", "f1,f2", "*", "f1,f2") + f("unpack_json from x if (f1:z)", "*", "f1,f2", "*", "f2") // all the needed fields, unneeded fields intersect with src f("unpack_json from x", "*", "f2,x", "*", "f2") + f("unpack_json from x if (y:z)", "*", "f2,x", "*", "f2") + f("unpack_json from x if (f2:z)", "*", "f1,f2,x", "*", "f1") // needed fields do not intersect with src f("unpack_json from x", "f1,f2", "", "f1,f2,x", "") + f("unpack_json from x if (y:z)", "f1,f2", "", "f1,f2,x,y", "") + f("unpack_json from x if (f1:z)", "f1,f2", "", "f1,f2,x", "") // needed fields intersect with src f("unpack_json from x", "f2,x", "", "f2,x", "") + f("unpack_json from x if (y:z)", "f2,x", "", "f2,x,y", "") + f("unpack_json from x if (f2:z y:qwe)", "f2,x", "", "f2,x,y", "") } diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index 0859a3f48..d2ad828df 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -9,9 +9,14 @@ import ( // // See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe type pipeUnpackLogfmt struct { + // fromField is the field to unpack logfmt fields from fromField string + // resultPrefix is prefix to add to unpacked field names resultPrefix string + + // iff is an optional filter for skipping unpacking logfmt + iff *ifFilter } func (pu *pipeUnpackLogfmt) String() string { @@ -22,19 +27,28 @@ func (pu *pipeUnpackLogfmt) String() string { if pu.resultPrefix != "" { s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) } + if pu.iff != nil { + s += " " + pu.iff.String() + } return s } func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { unneededFields.remove(pu.fromField) + if pu.iff != nil { + unneededFields.removeFields(pu.iff.neededFields) + } } else { neededFields.add(pu.fromField) + if pu.iff != nil { + neededFields.addFields(pu.iff.neededFields) + } } } func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix) + return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff) } func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) { @@ -106,5 +120,14 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { fromField: fromField, resultPrefix: resultPrefix, } + + if lex.isKeyword("if") { + iff, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + pu.iff = iff + } + return pu, nil } diff --git a/lib/logstorage/pipe_unpack_logfmt_test.go b/lib/logstorage/pipe_unpack_logfmt_test.go index 4cc73c5a6..7da7dba4f 100644 --- a/lib/logstorage/pipe_unpack_logfmt_test.go +++ b/lib/logstorage/pipe_unpack_logfmt_test.go @@ -11,9 +11,13 @@ func TestParsePipeUnpackLogfmtSuccess(t *testing.T) { } f(`unpack_logfmt`) + f(`unpack_logfmt if (a:x)`) f(`unpack_logfmt from x`) + f(`unpack_logfmt from x if (a:x)`) f(`unpack_logfmt from x result_prefix abc`) + f(`unpack_logfmt from x result_prefix abc if (a:x)`) f(`unpack_logfmt result_prefix abc`) + f(`unpack_logfmt result_prefix abc if (a:x)`) } func TestParsePipeUnpackLogfmtFailure(t *testing.T) { @@ -23,12 +27,19 @@ func TestParsePipeUnpackLogfmtFailure(t *testing.T) { } f(`unpack_logfmt foo`) + f(`unpack_logfmt if`) + f(`unpack_logfmt if (x:y) foobar`) f(`unpack_logfmt from`) + f(`unpack_logfmt from if`) f(`unpack_logfmt from x y`) + f(`unpack_logfmt from x if`) f(`unpack_logfmt from x result_prefix`) + f(`unpack_logfmt from x result_prefix if`) f(`unpack_logfmt from x result_prefix a b`) + f(`unpack_logfmt from x result_prefix a if`) f(`unpack_logfmt result_prefix`) f(`unpack_logfmt result_prefix a b`) + f(`unpack_logfmt result_prefix a if`) } func TestPipeUnpackLogfmt(t *testing.T) { @@ -51,6 +62,32 @@ func TestPipeUnpackLogfmt(t *testing.T) { }, }) + // failed if condition + f("unpack_logfmt if (foo:bar)", [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + }, + }, [][]Field{ + { + {"foo", ""}, + {"_msg", `foo=bar baz="x y=z" a=b`}, + }, + }) + + // matched if condition + f("unpack_logfmt if (foo)", [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"foo", "bar"}, + {"baz", "x y=z"}, + {"a", "b"}, + }, + }) + // single row, unpack from _msg into _msg f("unpack_logfmt", [][]Field{ { @@ -148,8 +185,8 @@ func TestPipeUnpackLogfmt(t *testing.T) { }, }) - // multiple rows with distinct number of fields, with result_prefix - f("unpack_logfmt from x result_prefix qwe_", [][]Field{ + // multiple rows with distinct number of fields, with result_prefix and if condition + f("unpack_logfmt from x result_prefix qwe_ if (y:abc)", [][]Field{ { {"x", `foo=bar baz=xyz`}, {"y", `abc`}, @@ -172,9 +209,9 @@ func TestPipeUnpackLogfmt(t *testing.T) { {"y", `abc`}, }, { + {"y", ""}, {"z", `foobar`}, {"x", `z=bar`}, - {"qwe_z", `bar`}, }, }) } @@ -187,16 +224,25 @@ func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) { // all the needed fields f("unpack_logfmt from x", "*", "", "*", "") + f("unpack_logfmt from x if (y:z)", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src f("unpack_logfmt from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_logfmt from x if (y:z)", "*", "f1,f2", "*", "f1,f2") + f("unpack_logfmt from x if (f1:z)", "*", "f1,f2", "*", "f2") // all the needed fields, unneeded fields intersect with src f("unpack_logfmt from x", "*", "f2,x", "*", "f2") + f("unpack_logfmt from x if (y:z)", "*", "f2,x", "*", "f2") + f("unpack_logfmt from x if (f2:z)", "*", "f1,f2,x", "*", "f1") // needed fields do not intersect with src f("unpack_logfmt from x", "f1,f2", "", "f1,f2,x", "") + f("unpack_logfmt from x if (y:z)", "f1,f2", "", "f1,f2,x,y", "") + f("unpack_logfmt from x if (f1:z)", "f1,f2", "", "f1,f2,x", "") // needed fields intersect with src f("unpack_logfmt from x", "f2,x", "", "f2,x", "") + f("unpack_logfmt from x if (y:z)", "f2,x", "", "f2,x,y", "") + f("unpack_logfmt from x if (f2:z y:qwe)", "f2,x", "", "f2,x,y", "") } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 5e79f888f..5aba815fd 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -259,6 +259,13 @@ func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, return qNew, nil } +func (iff *ifFilter) hasFilterInWithQuery() bool { + if iff == nil { + return false + } + return hasFilterInWithQueryForFilter(iff.f) +} + func hasFilterInWithQueryForFilter(f filter) bool { if f == nil { return false @@ -275,12 +282,20 @@ func hasFilterInWithQueryForPipes(pipes []pipe) bool { switch t := p.(type) { case *pipeStats: for _, f := range t.funcs { - if hasFilterInWithQueryForFilter(f.iff) { + if f.iff.hasFilterInWithQuery() { return true } } case *pipeExtract: - if hasFilterInWithQueryForFilter(t.iff) { + if t.iff.hasFilterInWithQuery() { + return true + } + case *pipeUnpackJSON: + if t.iff.hasFilterInWithQuery() { + return true + } + case *pipeUnpackLogfmt: + if t.iff.hasFilterInWithQuery() { return true } } @@ -290,7 +305,26 @@ func hasFilterInWithQueryForPipes(pipes []pipe) bool { type getFieldValuesFunc func(q *Query, fieldName string) ([]string, error) +func (iff *ifFilter) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (*ifFilter, error) { + if iff == nil { + return nil, nil + } + + f, err := initFilterInValuesForFilter(cache, iff.f, getFieldValuesFunc) + if err != nil { + return nil, err + } + + iffNew := *iff + iffNew.f = f + return &iffNew, nil +} + func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldValuesFunc getFieldValuesFunc) (filter, error) { + if f == nil { + return nil, nil + } + visitFunc := func(f filter) bool { fi, ok := f.(*filterIn) return ok && fi.needExecuteQuery @@ -326,13 +360,11 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFiel case *pipeStats: funcsNew := make([]pipeStatsFunc, len(t.funcs)) for j, f := range t.funcs { - if f.iff != nil { - fNew, err := initFilterInValuesForFilter(cache, f.iff, getFieldValuesFunc) - if err != nil { - return nil, err - } - f.iff = fNew + iffNew, err := f.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err } + f.iff = iffNew funcsNew[j] = f } pipesNew[i] = &pipeStats{ @@ -340,13 +372,29 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFiel funcs: funcsNew, } case *pipeExtract: - fNew, err := initFilterInValuesForFilter(cache, t.iff, getFieldValuesFunc) + iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) if err != nil { return nil, err } pe := *t - pe.iff = fNew + pe.iff = iffNew pipesNew[i] = &pe + case *pipeUnpackJSON: + iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + pu := *t + pu.iff = iffNew + pipesNew[i] = &pu + case *pipeUnpackLogfmt: + iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + pu := *t + pu.iff = iffNew + pipesNew[i] = &pu default: pipesNew[i] = p }