From 33610de341a46c6870a4ac1809518cfe5362f481 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 25 May 2024 14:37:26 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 4 + docs/VictoriaLogs/LogsQL.md | 55 ++++++ lib/logstorage/parser.go | 19 +- lib/logstorage/pipe.go | 17 ++ lib/logstorage/pipe_copy.go | 12 ++ lib/logstorage/pipe_delete.go | 12 ++ lib/logstorage/pipe_extract.go | 18 ++ lib/logstorage/pipe_field_names.go | 12 ++ lib/logstorage/pipe_fields.go | 12 ++ lib/logstorage/pipe_filter.go | 18 ++ lib/logstorage/pipe_format.go | 18 ++ lib/logstorage/pipe_limit.go | 13 ++ lib/logstorage/pipe_offset.go | 13 ++ lib/logstorage/pipe_rename.go | 12 ++ lib/logstorage/pipe_replace.go | 127 ++++--------- lib/logstorage/pipe_replace_regexp.go | 170 ++++++++++++++++++ lib/logstorage/pipe_replace_regexp_test.go | 200 +++++++++++++++++++++ lib/logstorage/pipe_replace_test.go | 3 +- lib/logstorage/pipe_sort.go | 12 ++ lib/logstorage/pipe_stats.go | 30 ++++ lib/logstorage/pipe_uniq.go | 12 ++ lib/logstorage/pipe_unpack.go | 52 +++++- lib/logstorage/pipe_unpack_json.go | 55 ++---- lib/logstorage/pipe_unpack_logfmt.go | 18 ++ lib/logstorage/pipe_update.go | 103 +++++++++++ lib/logstorage/storage_search.go | 90 +--------- 26 files changed, 868 insertions(+), 239 deletions(-) create mode 100644 lib/logstorage/pipe_replace_regexp.go create mode 100644 lib/logstorage/pipe_replace_regexp_test.go create mode 100644 lib/logstorage/pipe_update.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 109a1d5b7..59ae6447f 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,10 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`replace_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe), which allows updating [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. + +* BUGFIX: properly initialize values for [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-filter) inside [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the `in(...)` contains other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters). For example, `_time:5m | filter ip:in(user_type:admin | fields ip)` now works correctly. + ## [v0.11.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.11.0-victorialogs) Released at 2024-05-25 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index f4b794d0d..8a0cf0d52 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1082,6 +1082,7 @@ LogsQL supports the following pipes: - [`offset`](#offset-pipe) skips the given number of selected logs. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`stats`](#stats-pipe) calculates various stats over the selected logs. - [`uniq`](#uniq-pipe) returns unique log entires. 
@@ -1373,6 +1374,7 @@ See also: - [Conditional format](#conditional-format) - [`replace` pipe](#replace-pipe) +- [`replace_regexp` pipe](#replace_regexp-pipe) - [`extract` pipe](#extract-pipe) @@ -1482,6 +1484,7 @@ See [general performance tips](#performance-tips) for details. See also: - [Conditional replace](#conditional-replace) +- [`replace_regexp` pipe](#replace_regexp-pipe) - [`format` pipe](#format-pipe) - [`extract` pipe](#extract-pipe) @@ -1496,6 +1499,58 @@ only if `user_type` field equals to `admin`: _time:5m | replace if (user_type:=admin) replace ("secret", "***") at password ``` +### replace_regexp pipe + +`| replace_regexp ("regexp", "replacement") at field` [pipe](#pipes) replaces all the substrings matching the given `regexp` with the given `replacement` +in the given [`field`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + +The `regexp` must contain a regular expression with [RE2 syntax](https://github.com/google/re2/wiki/Syntax). +The `replacement` may contain `$N` or `${N}` placeholders, which are substituted with the `N-th` capturing group in the `regexp`. + +For example, the following query replaces all the substrings starting with `host-` and ending with `-foo` with the contents between `host-` and `-foo` in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) for logs over the last 5 minutes: + +```logsql +_time:5m | replace_regexp ("host-(.+?)-foo", "$1") at _msg +``` + +The `at _msg` part can be omitted if the replacement occurs in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). +The following query is equivalent to the previous one: + +```logsql +_time:5m | replace_regexp ("host-(.+?)-foo", "$1") +``` + +The number of replacements can be limited with `limit N` at the end of `replace_regexp`. For example, the following query replaces only the first `password: ...` substring +ending with whitespace with an empty substring at the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) `baz`: + +```logsql +_time:5m | replace_regexp ('password: [^ ]+', '') at baz limit 1 +``` + +Performance tips: + +- It is recommended using [`replace` pipe](#replace-pipe) instead of `replace_regexp` if possible, since it works faster. +- It is recommended using more specific [log filters](#filters) in order to reduce the number of log entries, which are passed to `replace_regexp`. + See [general performance tips](#performance-tips) for details. + +See also: + +- [Conditional replace_regexp](#conditional-replace_regexp) +- [`replace` pipe](#replace-pipe) +- [`format` pipe](#format-pipe) +- [`extract` pipe](#extract-pipe) + +#### Conditional replace_regexp + +If the [`replace_regexp` pipe](#replace_regexp-pipe) mustn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), +then add `if (<filters>)` after `replace_regexp`. +The `<filters>` can contain arbitrary [filters](#filters). For example, the following query replaces `password: ...` substrings ending with whitespace +with `***` in the `foo` field only if `user_type` field equals to `admin`: + +```logsql +_time:5m | replace_regexp if (user_type:=admin) ("password: [^ ]+", "***") at foo +``` + ### sort pipe By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) can be used.
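Reviewer note: the `$N`/`${N}` placeholder expansion and the `limit N` behaviour documented above map directly onto Go's RE2 `regexp` package. The standalone sketch below is illustrative only — it is not code from this patch, and the helper name is made up — showing one way a limited, placeholder-aware replacement can be built from `Regexp.FindStringSubmatchIndex` and `Regexp.ExpandString`:

```go
package main

import (
	"fmt"
	"regexp"
)

// replaceRegexpLimited is an illustrative helper (not code from this patch):
// it replaces every match of re in s with replacement, expanding $N/${N}
// placeholders via RE2's ExpandString, and stops after limit replacements
// when limit > 0 — the behaviour the replace_regexp docs above describe.
func replaceRegexpLimited(s string, re *regexp.Regexp, replacement string, limit int) string {
	var dst []byte
	replacements := 0
	for {
		locs := re.FindStringSubmatchIndex(s)
		if locs == nil {
			return string(append(dst, s...))
		}
		dst = append(dst, s[:locs[0]]...)
		dst = re.ExpandString(dst, replacement, s, locs)
		if locs[1] == len(s) {
			return string(dst)
		}
		if locs[1] == locs[0] {
			// Guard against empty matches so the loop always makes progress.
			dst = append(dst, s[locs[1]])
			s = s[locs[1]+1:]
		} else {
			s = s[locs[1]:]
		}
		replacements++
		if limit > 0 && replacements >= limit {
			return string(append(dst, s...))
		}
	}
}

func main() {
	// Mirrors: _time:5m | replace_regexp ("host-(.+?)-foo", "$1") at _msg
	re := regexp.MustCompile("host-(.+?)-foo")
	fmt.Println(replaceRegexpLimited("conn from host-abc-foo to host-def-foo", re, "$1", 0))
	// Output: conn from abc to def
}
```

Running it prints `conn from abc to def`, matching the documented `replace_regexp ("host-(.+?)-foo", "$1")` example.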
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 4cae9d853..f613ea8c0 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -321,23 +321,10 @@ func (q *Query) Optimize() { // Call Optimize for queries from 'in(query)' filters. optimizeFilterIn(q.f) + + // Optimize individual pipes. for _, p := range q.pipes { - switch t := p.(type) { - case *pipeStats: - for _, f := range t.funcs { - f.iff.optimizeFilterIn() - } - case *pipeReplace: - t.iff.optimizeFilterIn() - case *pipeFormat: - t.iff.optimizeFilterIn() - case *pipeExtract: - t.iff.optimizeFilterIn() - case *pipeUnpackJSON: - t.iff.optimizeFilterIn() - case *pipeUnpackLogfmt: - t.iff.optimizeFilterIn() - } + p.optimize() } } diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 1e4449c1b..dbd852ba8 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -20,6 +20,17 @@ type pipe interface { // // The returned pipeProcessor may call cancel() at any time in order to notify worker goroutines to stop sending new data to pipeProcessor. newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor + + // optimize must optimize the pipe + optimize() + + // hasFilterInWithQuery must return true if the pipe contains 'in(subquery)' filter (recursively). + hasFilterInWithQuery() bool + + // initFilterInValues must return new pipe with the initialized values for 'in(subquery)' filters (recursively). + // + // It is OK to return the pipe itself if it doesn't contain 'in(subquery)' filters. + initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) } // pipeProcessor must process a single pipe. @@ -147,6 +158,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'replace' pipe: %w", err) } return pr, nil + case lex.isKeyword("replace_regexp"): + pr, err := parsePipeReplaceRegexp(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'replace_regexp' pipe: %w", err) + } + return pr, nil case lex.isKeyword("sort"): ps, err := parsePipeSort(lex) if err != nil { diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index 8868b9c90..858cba747 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -50,6 +50,18 @@ func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) { } } +func (pc *pipeCopy) optimize() { + // Nothing to do +} + +func (pc *pipeCopy) hasFilterInWithQuery() bool { + return false +} + +func (pc *pipeCopy) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pc, nil +} + func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { return &pipeCopyProcessor{ pc: pc, diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 3389cc94e..d329482de 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -32,6 +32,18 @@ func (pd *pipeDelete) updateNeededFields(neededFields, unneededFields fieldsSet) } } +func (pd *pipeDelete) optimize() { + // nothing to do +} + +func (pd *pipeDelete) hasFilterInWithQuery() bool { + return false +} + +func (pd *pipeDelete) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pd, nil +} + func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { return &pipeDeleteProcessor{ pd: pd, diff --git
a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 6e06627ba..0171bf313 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -38,6 +38,24 @@ func (pe *pipeExtract) String() string { return s } +func (pe *pipeExtract) optimize() { + pe.iff.optimizeFilterIn() +} + +func (pe *pipeExtract) hasFilterInWithQuery() bool { + return pe.iff.hasFilterInWithQuery() +} + +func (pe *pipeExtract) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pe.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + peNew := *pe + peNew.iff = iffNew + return &peNew, nil +} + func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { unneededFieldsOrig := unneededFields.clone() diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 5feb23cf8..63eff0ab0 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -37,6 +37,18 @@ func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fields } } +func (pf *pipeFieldNames) optimize() { + // nothing to do +} + +func (pf *pipeFieldNames) hasFilterInWithQuery() bool { + return false +} + +func (pf *pipeFieldNames) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pf, nil +} + func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { shards := make([]pipeFieldNamesProcessorShard, workersCount) diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index a391cbd0e..0fa12212f 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -49,6 +49,18 @@ func (pf *pipeFields) updateNeededFields(neededFields, unneededFields fieldsSet) unneededFields.reset() } +func (pf *pipeFields) optimize() { + // nothing to do +} + +func (pf *pipeFields) hasFilterInWithQuery() bool { + return false +} + +func (pf *pipeFields) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pf, nil +} + func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { return &pipeFieldsProcessor{ pf: pf, diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go index a8f8787b6..050901ec8 100644 --- a/lib/logstorage/pipe_filter.go +++ b/lib/logstorage/pipe_filter.go @@ -29,6 +29,24 @@ func (pf *pipeFilter) updateNeededFields(neededFields, unneededFields fieldsSet) } } +func (pf *pipeFilter) optimize() { + optimizeFilterIn(pf.f) +} + +func (pf *pipeFilter) hasFilterInWithQuery() bool { + return hasFilterInWithQueryForFilter(pf.f) +} + +func (pf *pipeFilter) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + fNew, err := initFilterInValuesForFilter(cache, pf.f, getFieldValuesFunc) + if err != nil { + return nil, err + } + pfNew := *pf + pfNew.f = fNew + return &pfNew, nil +} + func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { shards := make([]pipeFilterProcessorShard, workersCount) diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index e47da7c93..805dfaaa0 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -74,6 +74,24 @@ func (pf *pipeFormat)
updateNeededFields(neededFields, unneededFields fieldsSet) } } +func (pf *pipeFormat) optimize() { + pf.iff.optimizeFilterIn() +} + +func (pf *pipeFormat) hasFilterInWithQuery() bool { + return pf.iff.hasFilterInWithQuery() +} + +func (pf *pipeFormat) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pf.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + pfNew := *pf + pfNew.iff = iffNew + return &pfNew, nil +} + func (pf *pipeFormat) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { return &pipeFormatProcessor{ pf: pf, diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index 15e0f4f26..5e5d0692c 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -17,6 +17,19 @@ func (pl *pipeLimit) String() string { } func (pl *pipeLimit) updateNeededFields(_, _ fieldsSet) { + // nothing to do +} + +func (pl *pipeLimit) optimize() { + // nothing to do +} + +func (pl *pipeLimit) hasFilterInWithQuery() bool { + return false +} + +func (pl *pipeLimit) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pl, nil } func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go index 5518d80f8..f6fd2aa1c 100644 --- a/lib/logstorage/pipe_offset.go +++ b/lib/logstorage/pipe_offset.go @@ -17,6 +17,19 @@ func (po *pipeOffset) String() string { } func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) { + // nothing to do +} + +func (po *pipeOffset) optimize() { + // nothing to do +} + +func (po *pipeOffset) hasFilterInWithQuery() bool { + return false +} + +func (po *pipeOffset) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return po, nil } func (po *pipeOffset) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index 76814c3e3..a99cb84bd 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -54,6 +54,18 @@ func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) } } +func (pr *pipeRename) optimize() { + // nothing to do +} + +func (pr *pipeRename) hasFilterInWithQuery() bool { + return false +} + +func (pr *pipeRename) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pr, nil +} + func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { return &pipeRenameProcessor{ pr: pr, diff --git a/lib/logstorage/pipe_replace.go b/lib/logstorage/pipe_replace.go index 69993b14d..f20de6b75 100644 --- a/lib/logstorage/pipe_replace.go +++ b/lib/logstorage/pipe_replace.go @@ -3,16 +3,13 @@ package logstorage import ( "fmt" "strings" - "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeReplace processes '| replace ...' pipe. 
// // See https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe type pipeReplace struct { - srcField string + field string oldSubstr string newSubstr string @@ -29,8 +26,8 @@ func (pr *pipeReplace) String() string { s += " " + pr.iff.String() } s += fmt.Sprintf(" (%s, %s)", quoteTokenIfNeeded(pr.oldSubstr), quoteTokenIfNeeded(pr.newSubstr)) - if pr.srcField != "_msg" { - s += " at " + quoteTokenIfNeeded(pr.srcField) + if pr.field != "_msg" { + s += " at " + quoteTokenIfNeeded(pr.field) } if pr.limit > 0 { s += fmt.Sprintf(" limit %d", pr.limit) @@ -39,97 +36,37 @@ func (pr *pipeReplace) String() string { } func (pr *pipeReplace) updateNeededFields(neededFields, unneededFields fieldsSet) { - if neededFields.contains("*") { - if !unneededFields.contains(pr.srcField) && pr.iff != nil { - unneededFields.removeFields(pr.iff.neededFields) - } - } else { - if neededFields.contains(pr.srcField) && pr.iff != nil { - neededFields.addFields(pr.iff.neededFields) - } + updateNeededFieldsForUpdatePipe(neededFields, unneededFields, pr.field, pr.iff) +} + +func (pr *pipeReplace) optimize() { + pr.iff.optimizeFilterIn() +} + +func (pr *pipeReplace) hasFilterInWithQuery() bool { + return pr.iff.hasFilterInWithQuery() +} + +func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pr.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err } + peNew := *pr + peNew.iff = iffNew + return &peNew, nil } func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - return &pipeReplaceProcessor{ - pr: pr, - ppBase: ppBase, - - shards: make([]pipeReplaceProcessorShard, workersCount), - } -} - -type pipeReplaceProcessor struct { - pr *pipeReplace - ppBase pipeProcessor - - shards []pipeReplaceProcessorShard -} - -type pipeReplaceProcessorShard struct { - pipeReplaceProcessorShardNopad - - // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . 
- _ [128 - unsafe.Sizeof(pipeReplaceProcessorShardNopad{})%128]byte -} - -type pipeReplaceProcessorShardNopad struct { - bm bitmap - - uctx fieldsUnpackerContext - wctx pipeUnpackWriteContext -} - -func (prp *pipeReplaceProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { - return + updateFunc := func(a *arena, v string) string { + bb := bbPool.Get() + bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit) + result := a.copyBytesToString(bb.B) + bbPool.Put(bb) + return result } - shard := &prp.shards[workerID] - shard.wctx.init(workerID, prp.ppBase, false, false, br) - shard.uctx.init(workerID, "") - - pr := prp.pr - bm := &shard.bm - bm.init(len(br.timestamps)) - bm.setBits() - if iff := pr.iff; iff != nil { - iff.f.applyToBlockResult(br, bm) - if bm.isZero() { - prp.ppBase.writeBlock(workerID, br) - return - } - } - - c := br.getColumnByName(pr.srcField) - values := c.getValues(br) - - bb := bbPool.Get() - vPrev := "" - shard.uctx.addField(pr.srcField, "") - for rowIdx, v := range values { - if bm.isSetBit(rowIdx) { - if vPrev != v { - bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit) - s := bytesutil.ToUnsafeString(bb.B) - shard.uctx.resetFields() - shard.uctx.addField(pr.srcField, s) - vPrev = v - } - shard.wctx.writeRow(rowIdx, shard.uctx.fields) - } else { - shard.wctx.writeRow(rowIdx, nil) - } - } - bbPool.Put(bb) - - shard.wctx.flush() - shard.wctx.reset() - shard.uctx.reset() -} - -func (prp *pipeReplaceProcessor) flush() error { - return nil + return newPipeUpdateProcessor(workersCount, updateFunc, ppBase, pr.field, pr.iff) } func parsePipeReplace(lex *lexer) (*pipeReplace, error) { @@ -164,7 +101,7 @@ func parsePipeReplace(lex *lexer) (*pipeReplace, error) { newSubstr, err := getCompoundToken(lex) if err != nil { - return nil, fmt.Errorf("cannot parse newSubstr in 'replace': %w", err) + return nil, fmt.Errorf("cannot parse newSubstr in 'replace(%q': %w", oldSubstr, err) } if !lex.isKeyword(")") { @@ -172,14 +109,14 @@ func parsePipeReplace(lex *lexer) (*pipeReplace, error) { } lex.nextToken() - srcField := "_msg" + field := "_msg" if lex.isKeyword("at") { lex.nextToken() f, err := parseFieldName(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'at' field after 'replace(%q, %q)': %w", oldSubstr, newSubstr, err) } - srcField = f + field = f } limit := uint64(0) @@ -194,7 +131,7 @@ func parsePipeReplace(lex *lexer) (*pipeReplace, error) { } pr := &pipeReplace{ - srcField: srcField, + field: field, oldSubstr: oldSubstr, newSubstr: newSubstr, limit: limit, diff --git a/lib/logstorage/pipe_replace_regexp.go b/lib/logstorage/pipe_replace_regexp.go new file mode 100644 index 000000000..053dbd0fb --- /dev/null +++ b/lib/logstorage/pipe_replace_regexp.go @@ -0,0 +1,170 @@ +package logstorage + +import ( + "fmt" + "regexp" +) + +// pipeReplaceRegexp processes '| replace_regexp ...' pipe. 
+// +// See https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe +type pipeReplaceRegexp struct { + field string + re *regexp.Regexp + replacement string + + // limit limits the number of replacements, which can be performed + limit uint64 + + // iff is an optional filter for skipping the replace_regexp operation + iff *ifFilter +} + +func (pr *pipeReplaceRegexp) String() string { + s := "replace_regexp" + if pr.iff != nil { + s += " " + pr.iff.String() + } + s += fmt.Sprintf(" (%s, %s)", quoteTokenIfNeeded(pr.re.String()), quoteTokenIfNeeded(pr.replacement)) + if pr.field != "_msg" { + s += " at " + quoteTokenIfNeeded(pr.field) + } + if pr.limit > 0 { + s += fmt.Sprintf(" limit %d", pr.limit) + } + return s +} + +func (pr *pipeReplaceRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) { + updateNeededFieldsForUpdatePipe(neededFields, unneededFields, pr.field, pr.iff) +} + +func (pr *pipeReplaceRegexp) optimize() { + pr.iff.optimizeFilterIn() +} + +func (pr *pipeReplaceRegexp) hasFilterInWithQuery() bool { + return pr.iff.hasFilterInWithQuery() +} + +func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pr.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + peNew := *pr + peNew.iff = iffNew + return &peNew, nil +} + +func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + updateFunc := func(a *arena, v string) string { + bb := bbPool.Get() + bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit) + result := a.copyBytesToString(bb.B) + bbPool.Put(bb) + return result + } + + return newPipeUpdateProcessor(workersCount, updateFunc, ppBase, pr.field, pr.iff) + +} + +func parsePipeReplaceRegexp(lex *lexer) (*pipeReplaceRegexp, error) { + if !lex.isKeyword("replace_regexp") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "replace_regexp") + } + lex.nextToken() + + // parse optional if (...) 
+ var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing '(' after 'replace_regexp'") + } + lex.nextToken() + + reStr, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse reStr in 'replace_regexp': %w", err) + } + re, err := regexp.Compile(reStr) + if err != nil { + return nil, fmt.Errorf("cannot parse regexp %q in 'replace_regexp': %w", reStr, err) + } + if !lex.isKeyword(",") { + return nil, fmt.Errorf("missing ',' after 'replace_regexp(%q'", reStr) + } + lex.nextToken() + + replacement, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse replacement in 'replace_regexp(%q': %w", reStr, err) + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("missing ')' after 'replace_regexp(%q, %q'", reStr, replacement) + } + lex.nextToken() + + field := "_msg" + if lex.isKeyword("at") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'at' field after 'replace_regexp(%q, %q)': %w", reStr, replacement, err) + } + field = f + } + + limit := uint64(0) + if lex.isKeyword("limit") { + lex.nextToken() + n, ok := tryParseUint64(lex.token) + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s' in 'replace_regexp'", lex.token) + } + lex.nextToken() + limit = n + } + + pr := &pipeReplaceRegexp{ + field: field, + re: re, + replacement: replacement, + limit: limit, + iff: iff, + } + + return pr, nil +} + +func appendReplaceRegexp(dst []byte, s string, re *regexp.Regexp, replacement string, limit uint64) []byte { + if len(s) == 0 { + return dst + } + + replacements := uint64(0) + for { + locs := re.FindStringSubmatchIndex(s) + if locs == nil { + return append(dst, s...) + } + start := locs[0] + dst = append(dst, s[:start]...) + end := locs[1] + dst = re.ExpandString(dst, replacement, s, locs) + s = s[end:] + replacements++ + if limit > 0 && replacements >= limit { + return append(dst, s...) 
+ } + } +} diff --git a/lib/logstorage/pipe_replace_regexp_test.go b/lib/logstorage/pipe_replace_regexp_test.go new file mode 100644 index 000000000..81e0230d3 --- /dev/null +++ b/lib/logstorage/pipe_replace_regexp_test.go @@ -0,0 +1,200 @@ +package logstorage + +import ( + "regexp" + "testing" +) + +func TestParsePipeReplaceRegexpSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`replace_regexp (foo, bar)`) + f(`replace_regexp ("foo[^ ]+bar|baz", "bar${1}x$0")`) + f(`replace_regexp (" ", "") at x`) + f(`replace_regexp if (x:y) ("-", ":") at a`) + f(`replace_regexp (" ", "") at x limit 10`) + f(`replace_regexp if (x:y) (" ", "") at foo limit 10`) +} + +func TestParsePipeReplaceRegexpFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`replace_regexp`) + f(`replace_regexp if`) + f(`replace_regexp foo`) + f(`replace_regexp (`) + f(`replace_regexp (foo`) + f(`replace_regexp (foo,`) + f(`replace_regexp(foo,bar`) + f(`replace_regexp(foo,bar,baz)`) + f(`replace_regexp(foo,bar) abc`) + f(`replace_regexp(bar,baz) limit`) + f(`replace_regexp(bar,baz) limit N`) + f(`replace_regexp ("foo[", "bar")`) +} + +func TestPipeReplaceRegexp(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // replace_regexp with placeholders + f(`replace_regexp ("foo(.+?)bar", "q-$1-x")`, [][]Field{ + { + {"_msg", `abc foo a bar foobar foo b bar`}, + {"bar", `cde`}, + }, + { + {"_msg", `1234`}, + }, + }, [][]Field{ + { + {"_msg", `abc q- a -x q-bar foo b -x`}, + {"bar", `cde`}, + }, + { + {"_msg", `1234`}, + }, + }) + + // replace_regexp without limits at _msg + f(`replace_regexp ("[_/]", "-")`, [][]Field{ + { + {"_msg", `a_bc_d/ef`}, + {"bar", `cde`}, + }, + { + {"_msg", `1234`}, + }, + }, [][]Field{ + { + {"_msg", `a-bc-d-ef`}, + {"bar", `cde`}, + }, + { + {"_msg", `1234`}, + }, + }) + + // replace_regexp with limit 1 at foo + f(`replace_regexp ("[_/]", "-") at foo limit 1`, [][]Field{ + { + {"foo", `a_bc_d/ef`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }, [][]Field{ + { + {"foo", `a-bc_d/ef`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }) + + // replace_regexp with limit 100 at foo + f(`replace_regexp ("[_/]", "-") at foo limit 100`, [][]Field{ + { + {"foo", `a_bc_d/ef`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }, [][]Field{ + { + {"foo", `a-bc-d-ef`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }) + + // conditional replace_regexp at foo + f(`replace_regexp if (bar:abc) ("[_/]", "") at foo`, [][]Field{ + { + {"foo", `a_bc_d/ef`}, + {"bar", `cde`}, + }, + { + {"foo", `123_45/6`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"foo", `a_bc_d/ef`}, + {"bar", `cde`}, + }, + { + {"foo", `123456`}, + {"bar", "abc"}, + }, + }) +} + +func TestPipeReplaceRegexpUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`replace_regexp ("a", "b") at x`, "*", "", "*", "") + f(`replace_regexp if (f1:q) ("a", "b") at x`, "*", "", "*", "") + + // unneeded fields do not intersect with at field + f(`replace_regexp ("a", "b") at x`, "*", "f1,f2", "*", "f1,f2") + f(`replace_regexp if (f3:q) ("a", "b") at x`, "*", "f1,f2", "*", "f1,f2") + 
f(`replace_regexp if (f2:q) ("a", "b") at x`, "*", "f1,f2", "*", "f1") + + // unneeded fields intersect with at field + f(`replace_regexp ("a", "b") at x`, "*", "x,y", "*", "x,y") + f(`replace_regexp if (f1:q) ("a", "b") at x`, "*", "x,y", "*", "x,y") + f(`replace_regexp if (x:q) ("a", "b") at x`, "*", "x,y", "*", "x,y") + f(`replace_regexp if (y:q) ("a", "b") at x`, "*", "x,y", "*", "x,y") + + // needed fields do not intersect with at field + f(`replace_regexp ("a", "b") at x`, "f2,y", "", "f2,y", "") + f(`replace_regexp if (f1:q) ("a", "b") at x`, "f2,y", "", "f2,y", "") + + // needed fields intersect with at field + f(`replace_regexp ("a", "b") at y`, "f2,y", "", "f2,y", "") + f(`replace_regexp if (f1:q) ("a", "b") at y`, "f2,y", "", "f1,f2,y", "") +} + +func TestAppendReplaceRegexp(t *testing.T) { + f := func(s, reStr, replacement string, limit int, resultExpected string) { + t.Helper() + + re := regexp.MustCompile(reStr) + result := appendReplaceRegexp(nil, s, re, replacement, uint64(limit)) + if string(result) != resultExpected { + t.Fatalf("unexpected result for appendReplaceRegexp(%q, %q, %q, %d)\ngot\n%s\nwant\n%s", s, reStr, replacement, limit, result, resultExpected) + } + } + + f("", "", "", 0, "") + f("", "foo", "bar", 0, "") + f("abc", "foo", "bar", 0, "abc") + f("foo", "fo+", "bar", 0, "bar") + f("foox", "fo+", "bar", 0, "barx") + f("afoo", "fo+", "bar", 0, "abar") + f("afoox", "fo+", "bar", 0, "abarx") + f("foo-bar/baz", "[-/]", "_", 0, "foo_bar_baz") + f("foo bar/ baz ", "[ /]", "", 2, "foobar baz ") + + // placeholders + f("afoo abc barz", "a([^ ]+)", "b${1}x", 0, "bfoox bbcx bbrzx") + f("afoo abc barz", "a([^ ]+)", "b${1}x", 1, "bfoox abc barz") +} diff --git a/lib/logstorage/pipe_replace_test.go b/lib/logstorage/pipe_replace_test.go index 63ed49d08..2663790e1 100644 --- a/lib/logstorage/pipe_replace_test.go +++ b/lib/logstorage/pipe_replace_test.go @@ -163,10 +163,11 @@ func TestAppendReplace(t *testing.T) { f("", "", "", 0, "") f("", "foo", "bar", 0, "") + f("abc", "foo", "bar", 0, "abc") f("foo", "foo", "bar", 0, "bar") f("foox", "foo", "bar", 0, "barx") f("afoo", "foo", "bar", 0, "abar") f("afoox", "foo", "bar", 0, "abarx") f("foo-bar-baz", "-", "_", 0, "foo_bar_baz") - f("foo bar baz ", " ", "", 0, "foobarbaz") + f("foo bar baz ", " ", "", 1, "foobar baz ") } diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index cdc294644..6150b31cf 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -67,6 +67,18 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) { } } +func (ps *pipeSort) optimize() { + // nothing to do +} + +func (ps *pipeSort) hasFilterInWithQuery() bool { + return false +} + +func (ps *pipeSort) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return ps, nil +} + func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { if ps.limit > 0 { return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppBase) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 4643017ae..8f8039e41 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -116,6 +116,36 @@ func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) unneededFields.reset() } +func (ps *pipeStats) optimize() { + for _, f := range ps.funcs { + f.iff.optimizeFilterIn() + } +} + +func (ps *pipeStats) hasFilterInWithQuery() bool { + 
for _, f := range ps.funcs { + if f.iff.hasFilterInWithQuery() { + return true + } + } + return false +} + +func (ps *pipeStats) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + funcsNew := make([]pipeStatsFunc, len(ps.funcs)) + for i, f := range ps.funcs { + iffNew, err := f.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + f.iff = iffNew + funcsNew[i] = f + } + psNew := *ps + psNew.funcs = funcsNew + return &psNew, nil +} + const stateSizeBudgetChunk = 1 << 20 func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 794a25d8e..bac5475cb 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -51,6 +51,18 @@ func (pu *pipeUniq) updateNeededFields(neededFields, unneededFields fieldsSet) { } } +func (pu *pipeUniq) optimize() { + // nothing to do +} + +func (pu *pipeUniq) hasFilterInWithQuery() bool { + return false +} + +func (pu *pipeUniq) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pu, nil +} + func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.2) diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index 86c023959..1f41a3936 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -6,6 +6,49 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) +func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter, neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + unneededFieldsOrig := unneededFields.clone() + unneededFieldsCount := 0 + if len(outFields) > 0 { + for _, f := range outFields { + if unneededFieldsOrig.contains(f) { + unneededFieldsCount++ + } + if !keepOriginalFields && !skipEmptyResults { + unneededFields.add(f) + } + } + } + if len(outFields) == 0 || unneededFieldsCount < len(outFields) { + unneededFields.remove(fromField) + if iff != nil { + unneededFields.removeFields(iff.neededFields) + } + } + } else { + neededFieldsOrig := neededFields.clone() + needFromField := len(outFields) == 0 + if len(outFields) > 0 { + needFromField = false + for _, f := range outFields { + if neededFieldsOrig.contains(f) { + needFromField = true + } + if !keepOriginalFields && !skipEmptyResults { + neededFields.remove(f) + } + } + } + if needFromField { + neededFields.add(fromField) + if iff != nil { + neededFields.addFields(iff.neededFields) + } + } + } +} + type fieldsUnpackerContext struct { workerID uint fieldPrefix string @@ -132,13 +175,16 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { } } else { values := c.getValues(br) - vPrevApplied := "" + vPrev := "" + hadUnpacks := false for i, v := range values { if bm.isSetBit(i) { - if vPrevApplied != v { + if !hadUnpacks || vPrev != v { + vPrev = v + hadUnpacks = true + shard.uctx.resetFields() pup.unpackFunc(&shard.uctx, v) - vPrevApplied = v } shard.wctx.writeRow(i, shard.uctx.fields) } else { diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 0eeecbf23..d2e4c5100 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@
-56,47 +56,22 @@ func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fields updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, neededFields, unneededFields) } -func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter, neededFields, unneededFields fieldsSet) { - if neededFields.contains("*") { - unneededFieldsOrig := unneededFields.clone() - unneededFieldsCount := 0 - if len(outFields) > 0 { - for _, f := range outFields { - if unneededFieldsOrig.contains(f) { - unneededFieldsCount++ - } - if !keepOriginalFields && !skipEmptyResults { - unneededFields.add(f) - } - } - } - if len(outFields) == 0 || unneededFieldsCount < len(outFields) { - unneededFields.remove(fromField) - if iff != nil { - unneededFields.removeFields(iff.neededFields) - } - } - } else { - neededFieldsOrig := neededFields.clone() - needFromField := len(outFields) == 0 - if len(outFields) > 0 { - needFromField = false - for _, f := range outFields { - if neededFieldsOrig.contains(f) { - needFromField = true - } - if !keepOriginalFields && !skipEmptyResults { - neededFields.remove(f) - } - } - } - if needFromField { - neededFields.add(fromField) - if iff != nil { - neededFields.addFields(iff.neededFields) - } - } +func (pu *pipeUnpackJSON) optimize() { + pu.iff.optimizeFilterIn() +} + +func (pu *pipeUnpackJSON) hasFilterInWithQuery() bool { + return pu.iff.hasFilterInWithQuery() +} + +func (pu *pipeUnpackJSON) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err } + puNew := *pu + puNew.iff = iffNew + return &puNew, nil } func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index e2dbfa8f4..5d69786e3 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -54,6 +54,24 @@ func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fiel updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, neededFields, unneededFields) } +func (pu *pipeUnpackLogfmt) optimize() { + pu.iff.optimizeFilterIn() +} + +func (pu *pipeUnpackLogfmt) hasFilterInWithQuery() bool { + return pu.iff.hasFilterInWithQuery() +} + +func (pu *pipeUnpackLogfmt) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + puNew := *pu + puNew.iff = iffNew + return &puNew, nil +} + func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { unpackLogfmt := func(uctx *fieldsUnpackerContext, s string) { p := getLogfmtParser() diff --git a/lib/logstorage/pipe_update.go b/lib/logstorage/pipe_update.go new file mode 100644 index 000000000..490d0ab5b --- /dev/null +++ b/lib/logstorage/pipe_update.go @@ -0,0 +1,103 @@ +package logstorage + +import ( + "unsafe" +) + +func updateNeededFieldsForUpdatePipe(neededFields, unneededFields fieldsSet, field string, iff *ifFilter) { + if neededFields.contains("*") { + if !unneededFields.contains(field) && iff != nil { + 
unneededFields.removeFields(iff.neededFields) + } + } else { + if neededFields.contains(field) && iff != nil { + neededFields.addFields(iff.neededFields) + } + } +} + +func newPipeUpdateProcessor(workersCount int, updateFunc func(a *arena, v string) string, ppBase pipeProcessor, field string, iff *ifFilter) pipeProcessor { + return &pipeUpdateProcessor{ + updateFunc: updateFunc, + + field: field, + iff: iff, + + ppBase: ppBase, + + shards: make([]pipeUpdateProcessorShard, workersCount), + } +} + +type pipeUpdateProcessor struct { + updateFunc func(a *arena, v string) string + + field string + iff *ifFilter + + ppBase pipeProcessor + + shards []pipeUpdateProcessorShard +} + +type pipeUpdateProcessorShard struct { + pipeUpdateProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeUpdateProcessorShardNopad{})%128]byte +} + +type pipeUpdateProcessorShardNopad struct { + bm bitmap + + rc resultColumn + a arena +} + +func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pup.shards[workerID] + + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if iff := pup.iff; iff != nil { + iff.f.applyToBlockResult(br, bm) + if bm.isZero() { + pup.ppBase.writeBlock(workerID, br) + return + } + } + + shard.rc.name = pup.field + + c := br.getColumnByName(pup.field) + values := c.getValues(br) + + hadUpdates := false + vPrev := "" + vNew := "" + for rowIdx, v := range values { + if bm.isSetBit(rowIdx) { + if !hadUpdates || vPrev != v { + vPrev = v + hadUpdates = true + + vNew = pup.updateFunc(&shard.a, v) + } + // Re-use the cached updated value for repeated input values, + // so every selected row gets the updated result. + v = vNew + } + shard.rc.addValue(v) + } + + br.addResultColumn(&shard.rc) + pup.ppBase.writeBlock(workerID, br) + + shard.rc.reset() + shard.a.reset() +} + +func (pup *pipeUpdateProcessor) flush() error { + return nil +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index ad2372376..f7e43b89b 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -429,33 +429,8 @@ func hasFilterInWithQueryForFilter(f filter) bool { func hasFilterInWithQueryForPipes(pipes []pipe) bool { for _, p := range pipes { - switch t := p.(type) { - case *pipeStats: - for _, f := range t.funcs { - if f.iff.hasFilterInWithQuery() { - return true - } - } - case *pipeReplace: - if t.iff.hasFilterInWithQuery() { - return true - } - case *pipeFormat: - if t.iff.hasFilterInWithQuery() { - return true - } - case *pipeExtract: - if t.iff.hasFilterInWithQuery() { - return true - } - case *pipeUnpackJSON: - if t.iff.hasFilterInWithQuery() { - return true - } - case *pipeUnpackLogfmt: - if t.iff.hasFilterInWithQuery() { - return true - } + if p.hasFilterInWithQuery() { + return true } } return false @@ -514,64 +489,11 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldVa func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) { pipesNew := make([]pipe, len(pipes)) for i, p := range pipes { - switch t := p.(type) { - case *pipeStats: - funcsNew := make([]pipeStatsFunc, len(t.funcs)) - for j, f := range t.funcs { - iffNew, err := f.iff.initFilterInValues(cache, getFieldValuesFunc) - if err != nil { - return nil, err - } - f.iff = iffNew - funcsNew[j] = f - } - pipesNew[i] = &pipeStats{ - byFields: t.byFields, - funcs: funcsNew, - } - case *pipeReplace: - iffNew, err := t.iff.initFilterInValues(cache,
getFieldValuesFunc) - if err != nil { - return nil, err - } - pr := *t - pr.iff = iffNew - pipesNew[i] = &pr - case *pipeFormat: - iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) - if err != nil { - return nil, err - } - pf := *t - pf.iff = iffNew - pipesNew[i] = &pf - case *pipeExtract: - iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) - if err != nil { - return nil, err - } - pe := *t - pe.iff = iffNew - pipesNew[i] = &pe - case *pipeUnpackJSON: - iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) - if err != nil { - return nil, err - } - pu := *t - pu.iff = iffNew - pipesNew[i] = &pu - case *pipeUnpackLogfmt: - iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) - if err != nil { - return nil, err - } - pu := *t - pu.iff = iffNew - pipesNew[i] = &pu - default: - pipesNew[i] = p + pNew, err := p.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err } + pipesNew[i] = pNew } return pipesNew, nil }
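Reviewer note: the net effect of the storage_search.go hunks above is that the per-pipe type switches are gone and `hasFilterInWithQueryForPipes` / `initFilterInValuesForPipes` reduce to plain interface dispatch over the new `pipe` methods, so adding a pipe such as `replace_regexp` no longer requires touching these functions. The toy program below is only a sketch of that pattern with simplified, made-up types (it does not use the real VictoriaLogs interfaces):

```go
package main

import "fmt"

// pipe is a trimmed-down stand-in for the interface extended in this patch:
// each pipe answers the "in(subquery)" questions itself.
type pipe interface {
	hasFilterInWithQuery() bool
	initFilterInValues(cache map[string][]string) (pipe, error)
}

// sortLike models pipes without an if (...) filter: they report false and
// return themselves unchanged.
type sortLike struct{}

func (p *sortLike) hasFilterInWithQuery() bool { return false }

func (p *sortLike) initFilterInValues(_ map[string][]string) (pipe, error) { return p, nil }

// filterLike models a pipe carrying an in(subquery) filter: it returns an
// updated copy once the subquery results are available in the cache.
type filterLike struct{ values []string }

func (p *filterLike) hasFilterInWithQuery() bool { return true }

func (p *filterLike) initFilterInValues(cache map[string][]string) (pipe, error) {
	pNew := *p
	pNew.values = cache["subquery"]
	return &pNew, nil
}

// initPipes mirrors the simplified loop that replaced the old type switch:
// no per-pipe cases, just interface dispatch.
func initPipes(cache map[string][]string, pipes []pipe) ([]pipe, error) {
	pipesNew := make([]pipe, len(pipes))
	for i, p := range pipes {
		pNew, err := p.initFilterInValues(cache)
		if err != nil {
			return nil, err
		}
		pipesNew[i] = pNew
	}
	return pipesNew, nil
}

func main() {
	cache := map[string][]string{"subquery": {"10.0.0.1", "10.0.0.2"}}
	pipesNew, err := initPipes(cache, []pipe{&sortLike{}, &filterLike{}})
	if err != nil {
		panic(err)
	}
	fmt.Println(pipesNew[1].(*filterLike).values) // [10.0.0.1 10.0.0.2]
}
```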