From 7654bd60f8ff4067bdfade735e921202eb32de76 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 25 May 2024 00:22:14 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/LogsQL.md | 46 ++++++ lib/logstorage/parser.go | 2 + lib/logstorage/pipe.go | 6 + lib/logstorage/pipe_replace.go | 229 ++++++++++++++++++++++++++++ lib/logstorage/pipe_replace_test.go | 172 +++++++++++++++++++++ lib/logstorage/storage_search.go | 12 ++ 7 files changed, 468 insertions(+) create mode 100644 lib/logstorage/pipe_replace.go create mode 100644 lib/logstorage/pipe_replace_test.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 12d2ae68a..15aa26473 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`replace` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe), which allows replacing substrings in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: support [comparing](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter) log field values with [special numeric values](https://docs.victoriametrics.com/victorialogs/logsql/#numeric-values). For example, `duration:>1.5s` and `response_size:<15KiB` are valid filters now. * FEATURE: properly sort [durations](https://docs.victoriametrics.com/victorialogs/logsql/#duration-values) and [short numeric values](https://docs.victoriametrics.com/victorialogs/logsql/#short-numeric-values) in [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). For example, `10s` goes in front of `1h`, while `10KB` goes in front of `1GB`. * FEATURE: add an ability to preserve the original non-empty field values when executing [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe), [`unpack_json`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe), [`unpack_logfmt`](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) and [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) pipes. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index b3ba96c14..508b0b221 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1081,6 +1081,7 @@ LogsQL supports the following pipes: - [`limit`](#limit-pipe) limits the number selected logs. - [`offset`](#offset-pipe) skips the given number of selected logs. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`stats`](#stats-pipe) calculates various stats over the selected logs. - [`uniq`](#uniq-pipe) returns unique log entires. @@ -1365,6 +1366,7 @@ _time:5m | format "" as foo skip_empty_results See also: - [Conditional format](#conditional-format) +- [`replace` pipe](#replace-pipe) - [`extract` pipe](#extract-pipe) @@ -1442,6 +1444,49 @@ See also: - [`fields` pipe](#fields-pipe) - [`delete` pipe](#delete-pipe) +### replace pipe + +`| replace ("old", "new") at field` [pipe](#pipes) replaces all the occurences of the `old` substring with the `new` substring +in the given [`field`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + +For example, the following query replaces all the `secret-password` substrings with `***` in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +for logs over the last 5 minutes: + +```logsql +_time:5m | replace ("secret-password", "***") at _msg +``` + +The `at _msg` part can be omitted if the replacement occurs in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). +The following query is equivalent to the previous one: + +```logsql +_time:5m | replace ("secret-password", "***") +``` + +The number of replacements can be limited with `limit N` at the end of `replace`. For example, the following query replaces only the first `foo` substring with `bar` +at the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) `baz`: + +```logsql +_time:5m | replace ('foo', 'bar') at baz limit 1 +``` + +See also: + +- [Conditional replace](#conditional-replace) +- [`format` pipe](#format-pipe) +- [`extract` pipe](#extract-pipe) + +#### Conditional replace + +If the [`replace` pipe](#replace-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), +then add `if ()` after `replace`. +The `` can contain arbitrary [filters](#filters). For example, the following query replaces `secret` with `***` in the `password` field +only if `user_type` field equals to `admin`: + +```logsql +_time:5m | replace if (user_type:=admin) replace ("secret", "***") at password +``` + ### sort pipe By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) can be used. @@ -2213,6 +2258,7 @@ LogsQL supports the following transformations on the log entries selected with [ - Unpacking JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](#unpack_json-pipe). - Unpacking [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](#unpack_logfmt-pipe). - Creating a new field from existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) according to the provided format. See [these docs](#format-pipe). +- Replacing substrings in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](#replace-pipe). LogsQL will support the following transformations in the future: diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 67d36ca80..4cae9d853 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -327,6 +327,8 @@ func (q *Query) Optimize() { for _, f := range t.funcs { f.iff.optimizeFilterIn() } + case *pipeReplace: + t.iff.optimizeFilterIn() case *pipeFormat: t.iff.optimizeFilterIn() case *pipeExtract: diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index ccc4dbeff..1e4449c1b 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -141,6 +141,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err) } return pr, nil + case lex.isKeyword("replace"): + pr, err := parsePipeReplace(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'replace' pipe: %w", err) + } + return pr, nil case lex.isKeyword("sort"): ps, err := parsePipeSort(lex) if err != nil { diff --git a/lib/logstorage/pipe_replace.go b/lib/logstorage/pipe_replace.go new file mode 100644 index 000000000..69993b14d --- /dev/null +++ b/lib/logstorage/pipe_replace.go @@ -0,0 +1,229 @@ +package logstorage + +import ( + "fmt" + "strings" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +// pipeReplace processes '| replace ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe +type pipeReplace struct { + srcField string + oldSubstr string + newSubstr string + + // limit limits the number of replacements, which can be performed + limit uint64 + + // iff is an optional filter for skipping the replace operation + iff *ifFilter +} + +func (pr *pipeReplace) String() string { + s := "replace" + if pr.iff != nil { + s += " " + pr.iff.String() + } + s += fmt.Sprintf(" (%s, %s)", quoteTokenIfNeeded(pr.oldSubstr), quoteTokenIfNeeded(pr.newSubstr)) + if pr.srcField != "_msg" { + s += " at " + quoteTokenIfNeeded(pr.srcField) + } + if pr.limit > 0 { + s += fmt.Sprintf(" limit %d", pr.limit) + } + return s +} + +func (pr *pipeReplace) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + if !unneededFields.contains(pr.srcField) && pr.iff != nil { + unneededFields.removeFields(pr.iff.neededFields) + } + } else { + if neededFields.contains(pr.srcField) && pr.iff != nil { + neededFields.addFields(pr.iff.neededFields) + } + } +} + +func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + return &pipeReplaceProcessor{ + pr: pr, + ppBase: ppBase, + + shards: make([]pipeReplaceProcessorShard, workersCount), + } +} + +type pipeReplaceProcessor struct { + pr *pipeReplace + ppBase pipeProcessor + + shards []pipeReplaceProcessorShard +} + +type pipeReplaceProcessorShard struct { + pipeReplaceProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeReplaceProcessorShardNopad{})%128]byte +} + +type pipeReplaceProcessorShardNopad struct { + bm bitmap + + uctx fieldsUnpackerContext + wctx pipeUnpackWriteContext +} + +func (prp *pipeReplaceProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &prp.shards[workerID] + shard.wctx.init(workerID, prp.ppBase, false, false, br) + shard.uctx.init(workerID, "") + + pr := prp.pr + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if iff := pr.iff; iff != nil { + iff.f.applyToBlockResult(br, bm) + if bm.isZero() { + prp.ppBase.writeBlock(workerID, br) + return + } + } + + c := br.getColumnByName(pr.srcField) + values := c.getValues(br) + + bb := bbPool.Get() + vPrev := "" + shard.uctx.addField(pr.srcField, "") + for rowIdx, v := range values { + if bm.isSetBit(rowIdx) { + if vPrev != v { + bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit) + s := bytesutil.ToUnsafeString(bb.B) + shard.uctx.resetFields() + shard.uctx.addField(pr.srcField, s) + vPrev = v + } + shard.wctx.writeRow(rowIdx, shard.uctx.fields) + } else { + shard.wctx.writeRow(rowIdx, nil) + } + } + bbPool.Put(bb) + + shard.wctx.flush() + shard.wctx.reset() + shard.uctx.reset() +} + +func (prp *pipeReplaceProcessor) flush() error { + return nil +} + +func parsePipeReplace(lex *lexer) (*pipeReplace, error) { + if !lex.isKeyword("replace") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "replace") + } + lex.nextToken() + + // parse optional if (...) + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing '(' after 'replace'") + } + lex.nextToken() + + oldSubstr, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse oldSubstr in 'replace': %w", err) + } + if !lex.isKeyword(",") { + return nil, fmt.Errorf("missing ',' after 'replace(%q'", oldSubstr) + } + lex.nextToken() + + newSubstr, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse newSubstr in 'replace': %w", err) + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("missing ')' after 'replace(%q, %q'", oldSubstr, newSubstr) + } + lex.nextToken() + + srcField := "_msg" + if lex.isKeyword("at") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'at' field after 'replace(%q, %q)': %w", oldSubstr, newSubstr, err) + } + srcField = f + } + + limit := uint64(0) + if lex.isKeyword("limit") { + lex.nextToken() + n, ok := tryParseUint64(lex.token) + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s' in 'replace'", lex.token) + } + lex.nextToken() + limit = n + } + + pr := &pipeReplace{ + srcField: srcField, + oldSubstr: oldSubstr, + newSubstr: newSubstr, + limit: limit, + iff: iff, + } + + return pr, nil +} + +func appendReplace(dst []byte, s, oldSubstr, newSubstr string, limit uint64) []byte { + if len(s) == 0 { + return dst + } + if len(oldSubstr) == 0 { + return append(dst, s...) + } + + replacements := uint64(0) + for { + n := strings.Index(s, oldSubstr) + if n < 0 { + return append(dst, s...) + } + dst = append(dst, s[:n]...) + dst = append(dst, newSubstr...) + s = s[n+len(oldSubstr):] + replacements++ + if limit > 0 && replacements >= limit { + return append(dst, s...) + } + } +} diff --git a/lib/logstorage/pipe_replace_test.go b/lib/logstorage/pipe_replace_test.go new file mode 100644 index 000000000..63ed49d08 --- /dev/null +++ b/lib/logstorage/pipe_replace_test.go @@ -0,0 +1,172 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeReplaceSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`replace (foo, bar)`) + f(`replace (" ", "") at x`) + f(`replace if (x:y) ("-", ":") at a`) + f(`replace (" ", "") at x limit 10`) + f(`replace if (x:y) (" ", "") at foo limit 10`) +} + +func TestParsePipeReplaceFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`replace`) + f(`replace if`) + f(`replace foo`) + f(`replace (`) + f(`replace (foo`) + f(`replace (foo,`) + f(`replace(foo,bar`) + f(`replace(foo,bar,baz)`) + f(`replace(foo,bar) abc`) + f(`replace(bar,baz) limit`) + f(`replace(bar,baz) limit N`) +} + +func TestPipeReplace(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // replace without limits at _msg + f(`replace ("_", "-")`, [][]Field{ + { + {"_msg", `a_bc_def`}, + {"bar", `cde`}, + }, + { + {"_msg", `1234`}, + }, + }, [][]Field{ + { + {"_msg", `a-bc-def`}, + {"bar", `cde`}, + }, + { + {"_msg", `1234`}, + }, + }) + + // replace with limit 1 at foo + f(`replace ("_", "-") at foo limit 1`, [][]Field{ + { + {"foo", `a_bc_def`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }, [][]Field{ + { + {"foo", `a-bc_def`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }) + + // replace with limit 100 at foo + f(`replace ("_", "-") at foo limit 100`, [][]Field{ + { + {"foo", `a_bc_def`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }, [][]Field{ + { + {"foo", `a-bc-def`}, + {"bar", `cde`}, + }, + { + {"foo", `1234`}, + }, + }) + + // conditional replace at foo + f(`replace if (bar:abc) ("_", "") at foo`, [][]Field{ + { + {"foo", `a_bc_def`}, + {"bar", `cde`}, + }, + { + {"foo", `123_456`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"foo", `a_bc_def`}, + {"bar", `cde`}, + }, + { + {"foo", `123456`}, + {"bar", "abc"}, + }, + }) +} + +func TestPipeReplaceUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`replace ("a", "b") at x`, "*", "", "*", "") + f(`replace if (f1:q) ("a", "b") at x`, "*", "", "*", "") + + // unneeded fields do not intersect with at field + f(`replace ("a", "b") at x`, "*", "f1,f2", "*", "f1,f2") + f(`replace if (f3:q) ("a", "b") at x`, "*", "f1,f2", "*", "f1,f2") + f(`replace if (f2:q) ("a", "b") at x`, "*", "f1,f2", "*", "f1") + + // unneeded fields intersect with at field + f(`replace ("a", "b") at x`, "*", "x,y", "*", "x,y") + f(`replace if (f1:q) ("a", "b") at x`, "*", "x,y", "*", "x,y") + f(`replace if (x:q) ("a", "b") at x`, "*", "x,y", "*", "x,y") + f(`replace if (y:q) ("a", "b") at x`, "*", "x,y", "*", "x,y") + + // needed fields do not intersect with at field + f(`replace ("a", "b") at x`, "f2,y", "", "f2,y", "") + f(`replace if (f1:q) ("a", "b") at x`, "f2,y", "", "f2,y", "") + + // needed fields intersect with at field + f(`replace ("a", "b") at y`, "f2,y", "", "f2,y", "") + f(`replace if (f1:q) ("a", "b") at y`, "f2,y", "", "f1,f2,y", "") +} + +func TestAppendReplace(t *testing.T) { + f := func(s, oldSubstr, newSubstr string, limit int, resultExpected string) { + t.Helper() + + result := appendReplace(nil, s, oldSubstr, newSubstr, uint64(limit)) + if string(result) != resultExpected { + t.Fatalf("unexpected result for appendReplace(%q, %q, %q, %d)\ngot\n%s\nwant\n%s", s, oldSubstr, newSubstr, limit, result, resultExpected) + } + } + + f("", "", "", 0, "") + f("", "foo", "bar", 0, "") + f("foo", "foo", "bar", 0, "bar") + f("foox", "foo", "bar", 0, "barx") + f("afoo", "foo", "bar", 0, "abar") + f("afoox", "foo", "bar", 0, "abarx") + f("foo-bar-baz", "-", "_", 0, "foo_bar_baz") + f("foo bar baz ", " ", "", 0, "foobarbaz") +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 5ff0384f6..de754582d 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -436,6 +436,10 @@ func hasFilterInWithQueryForPipes(pipes []pipe) bool { return true } } + case *pipeReplace: + if t.iff.hasFilterInWithQuery() { + return true + } case *pipeFormat: if t.iff.hasFilterInWithQuery() { return true @@ -525,6 +529,14 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFiel byFields: t.byFields, funcs: funcsNew, } + case *pipeReplace: + iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + pr := *t + pr.iff = iffNew + pipesNew[i] = &pr case *pipeFormat: iffNew, err := t.iff.initFilterInValues(cache, getFieldValuesFunc) if err != nil {