From 8f6b1262df90ebd88617440ca571eb63c67f3c94 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sat, 11 May 2024 08:01:31 +0200
Subject: [PATCH] wip

---
 docs/VictoriaLogs/CHANGELOG.md   |   1 +
 docs/VictoriaLogs/LogsQL.md      |  26 ++
 lib/logstorage/parser_test.go    |  25 ++
 lib/logstorage/pipe.go           |   6 +
 lib/logstorage/pipe_sort.go      |   8 +-
 lib/logstorage/pipe_uniq.go      | 391 +++++++++++++++++++++++++++++++
 lib/logstorage/pipe_uniq_test.go |  45 ++++
 7 files changed, 498 insertions(+), 4 deletions(-)
 create mode 100644 lib/logstorage/pipe_uniq.go
 create mode 100644 lib/logstorage/pipe_uniq_test.go

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 65cf35c96..9b64a8446 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -23,6 +23,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
 * FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe).
 * FEATURE: add support for calculating various stats over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for details.
 * FEATURE: add support for sorting the returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe).
+* FEATURE: add support for returning unique results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe).
 * FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
 * FEATURE: add support for copying and renaming the selected log fields. See [these](https://docs.victoriametrics.com/victorialogs/logsql/#copy-pipe) and [these](https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe) docs.
 * FEATURE: allow using `_` inside numbers. For example, `score:range[1_000, 5_000_000]` for [`range` filter](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter).
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index f51d6fae6..7faa4d0c3 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1051,6 +1051,7 @@ LogsQL supports the following pipes:
 - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
 - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
 - [`stats`](#stats-pipe) calculates various stats over the selected logs.
+- [`uniq`](#uniq-pipe) returns unique log entries.
 
 ### copy pipe
 
@@ -1206,6 +1207,31 @@ See also:
 
 - [`limit` pipe](#limit-pipe)
 - [`offset` pipe](#offset-pipe)
+
+### uniq pipe
+
+`| uniq ...` pipe allows returning only unique results over the selected logs. For example, the following LogsQL query
+returns unique values for the `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+over the logs for the last 5 minutes:
+
+```logsql
+_time:5m | uniq by (ip)
+```
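+
+The `by(...)` clause is optional. If it is skipped or contains `*`, then unique entries are selected over the whole set
+of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), as covered by the `uniq` parser tests
+in this patch. For example, the following query returns unique log entries over the last 5 minutes:
+
+```logsql
+_time:5m | uniq
+```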
+
+It is possible to specify multiple fields inside the `by(...)` clause. In this case all the unique combinations of values
+for the given fields are returned. For example, the following query returns all the unique `(host, path)` pairs
+for the logs over the last 5 minutes:
+
+```logsql
+_time:5m | uniq by (host, path)
+```
+
+Unique entries are stored in memory during the query execution, so a big number of selected unique entries may require a lot of memory.
+Sometimes it is enough to return up to `N` unique entries. This can be done by adding `limit N` after the `by (...)` clause,
+which also limits memory usage. For example, the following query returns up to 100 unique `(host, path)` pairs for the logs over the last 5 minutes:
+
+```logsql
+_time:5m | uniq by (host, path) limit 100
+```
+
 ### stats pipe
 
 `| stats ...` pipe allows calculating various stats over the selected logs. For example, the following LogsQL query
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 7c37230bd..97a06ee1d 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -954,6 +954,15 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
 	f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`)
 
+	// uniq pipe
+	f(`* | uniq`, `* | uniq`)
+	f(`* | uniq by()`, `* | uniq`)
+	f(`* | uniq by(*)`, `* | uniq`)
+	f(`* | uniq by(foo,*,bar)`, `* | uniq`)
+	f(`* | uniq by(f1,f2)`, `* | uniq by (f1, f2)`)
+	f(`* | uniq by(f1,f2) limit 10`, `* | uniq by (f1, f2) limit 10`)
+	f(`* | uniq limit 10`, `* | uniq limit 10`)
+
 	// multiple different pipes
 	f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
 	f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@@ -1288,6 +1297,16 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | sort by(baz`)
 	f(`foo | sort by(baz,`)
 	f(`foo | sort by(bar) foo`)
+
+	// invalid uniq pipe
+	f(`foo | uniq bar`)
+	f(`foo | uniq limit`)
+	f(`foo | uniq by(`)
+	f(`foo | uniq by(a`)
+	f(`foo | uniq by(a,`)
+	f(`foo | uniq by(a) bar`)
+	f(`foo | uniq by(a) limit -10`)
+	f(`foo | uniq by(a) limit foo`)
 }
 
 func TestQueryGetNeededColumns(t *testing.T) {
@@ -1398,6 +1417,12 @@ func TestQueryGetNeededColumns(t *testing.T) {
 	f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f2) r1, count(r1) r2 | fields r2`, `f1,f2,f3,f4`, ``)
 	f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f3) r1, count(r1) r2 | fields r1`, `f3,f4`, ``)
 
+	f(`* | uniq`, `*`, ``)
+	f(`* | uniq by (f1,f2)`, `f1,f2`, ``)
+	f(`* | uniq by (f1,f2) | fields f1,f3`, `f1,f2`, ``)
+	f(`* | uniq by (f1,f2) | rm f1,f3`, `f1,f2`, ``)
+	f(`* | uniq by (f1,f2) | fields f3`, `f1,f2`, ``)
+
 	f(`* | rm f1, f2`, `*`, `f1,f2`)
 	f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
 	f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index c1576f465..55c2bab72 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -83,6 +83,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 				return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
 			}
 			pipes = append(pipes, ps)
+		case lex.isKeyword("uniq"):
+			pu, err := parsePipeUniq(lex)
+			if err != nil {
+				return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err)
+			}
+			pipes = append(pipes, pu)
 		case lex.isKeyword("limit", "head"):
 			pl, err := parsePipeLimit(lex)
 			if err != nil {
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
index c411a5be2..4b9ddf11b 100644
--- a/lib/logstorage/pipe_sort.go
+++ b/lib/logstorage/pipe_sort.go
@@ -30,8 
+30,8 @@ func (ps *pipeSort) String() string { s := "sort" if len(ps.byFields) > 0 { a := make([]string, len(ps.byFields)) - for i := range ps.byFields { - a[i] = ps.byFields[i].String() + for i, bf := range ps.byFields { + a[i] = bf.String() } s += " by (" + strings.Join(a, ", ") + ")" } @@ -99,7 +99,7 @@ type pipeSortProcessorShard struct { } type pipeSortProcessorShardNopad struct { - // ps point to the parent pipeSort. + // ps points to the parent pipeSort. ps *pipeSort // blocks holds all the blocks with logs written to the shard. @@ -165,7 +165,7 @@ func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 { return c.f64Values[rowIdx] } -// writeBlock writes br with the given byFields to shard. +// writeBlock writes br to shard. func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { // clone br, so it could be owned by shard br = br.clone() diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go new file mode 100644 index 000000000..95936fb52 --- /dev/null +++ b/lib/logstorage/pipe_uniq.go @@ -0,0 +1,391 @@ +package logstorage + +import ( + "fmt" + "slices" + "sync/atomic" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" +) + +// pipeUniq processes '| uniq ...' queries. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe +type pipeUniq struct { + // fields contains field names for returning unique values + byFields []string + + limit uint64 +} + +func (pu *pipeUniq) String() string { + s := "uniq" + if len(pu.byFields) > 0 { + s += " by (" + fieldNamesString(pu.byFields) + ")" + } + if pu.limit > 0 { + s += fmt.Sprintf(" limit %d", pu.limit) + } + return s +} + +func (pu *pipeUniq) updateNeededFields(neededFields, unneededFields fieldsSet) { + neededFields.reset() + unneededFields.reset() + + if len(pu.byFields) == 0 { + neededFields.add("*") + } else { + neededFields.addAll(pu.byFields) + } +} + +func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + maxStateSize := int64(float64(memory.Allowed()) * 0.2) + + shards := make([]pipeUniqProcessorShard, workersCount) + for i := range shards { + shard := &shards[i] + shard.pu = pu + shard.m = make(map[string]struct{}) + shard.stateSizeBudget = stateSizeBudgetChunk + maxStateSize -= stateSizeBudgetChunk + } + + pup := &pipeUniqProcessor{ + pu: pu, + stopCh: stopCh, + cancel: cancel, + ppBase: ppBase, + + shards: shards, + + maxStateSize: maxStateSize, + } + pup.stateSizeBudget.Store(maxStateSize) + + return pup +} + +type pipeUniqProcessor struct { + pu *pipeUniq + stopCh <-chan struct{} + cancel func() + ppBase pipeProcessor + + shards []pipeUniqProcessorShard + + maxStateSize int64 + stateSizeBudget atomic.Int64 +} + +type pipeUniqProcessorShard struct { + pipeUniqProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeUniqProcessorShardNopad{})%128]byte +} + +type pipeUniqProcessorShardNopad struct { + // pu points to the parent pipeUniq. + pu *pipeUniq + + // m holds unique rows. + m map[string]struct{} + + // keyBuf is a temporary buffer for building keys for m. + keyBuf []byte + + // columnValues is a temporary buffer for the processed column values. 
+ columnValues [][]string + + // stateSizeBudget is the remaining budget for the whole state size for the shard. + // The per-shard budget is provided in chunks from the parent pipeUniqProcessor. + stateSizeBudget int +} + +// writeBlock writes br to shard. +// +// It returns false if the block cannot be written because of the exceeded limit. +func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { + if limit := shard.pu.limit; limit > 0 && uint64(len(shard.m)) >= limit { + return false + } + + m := shard.m + byFields := shard.pu.byFields + if len(byFields) == 0 { + // Take into account all the columns in br. + keyBuf := shard.keyBuf + for i := range br.timestamps { + keyBuf = keyBuf[:0] + for _, c := range br.getColumns() { + v := c.getValueAtRow(br, i) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + shard.stateSizeBudget -= len(keyBuf) + int(unsafe.Sizeof("")) + } + } + shard.keyBuf = keyBuf + return true + } + + // Take into account only the selected columns. + columnValues := shard.columnValues[:0] + for _, f := range byFields { + c := br.getColumnByName(f) + columnValues = append(columnValues, c.getValues(br)) + } + shard.columnValues = columnValues + + keyBuf := shard.keyBuf + for i := range br.timestamps { + seenValue := true + for _, values := range columnValues { + if i == 0 || values[i-1] != values[i] { + seenValue = false + break + } + } + if seenValue { + continue + } + + keyBuf = keyBuf[:0] + for _, values := range columnValues { + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + shard.stateSizeBudget -= len(keyBuf) + int(unsafe.Sizeof("")) + } + } + shard.keyBuf = keyBuf + + return true +} + +func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pup.shards[workerID] + + for shard.stateSizeBudget < 0 { + // steal some budget for the state size from the global budget. + remaining := pup.stateSizeBudget.Add(-stateSizeBudgetChunk) + if remaining < 0 { + // The state size is too big. Stop processing data in order to avoid OOM crash. + if remaining+stateSizeBudgetChunk >= 0 { + // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. 
+ pup.cancel() + } + return + } + shard.stateSizeBudget += stateSizeBudgetChunk + } + + if !shard.writeBlock(br) { + pup.cancel() + } +} + +func (pup *pipeUniqProcessor) flush() error { + if n := pup.stateSizeBudget.Load(); n <= 0 { + return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20)) + } + + // merge state across shards + shards := pup.shards + m := shards[0].m + shards = shards[1:] + for i := range shards { + select { + case <-pup.stopCh: + return nil + default: + } + + for k := range shards[i].m { + m[k] = struct{}{} + } + } + + // write result + wctx := &pipeUniqWriteContext{ + pup: pup, + } + byFields := pup.pu.byFields + var rowFields []Field + + if len(byFields) == 0 { + for k := range m { + select { + case <-pup.stopCh: + return nil + default: + } + + rowFields = rowFields[:0] + keyBuf := bytesutil.ToUnsafeBytes(k) + for len(keyBuf) > 0 { + tail, name, err := encoding.UnmarshalBytes(keyBuf) + if err != nil { + logger.Panicf("BUG: cannot unmarshal field name: %s", err) + } + keyBuf = tail + + tail, value, err := encoding.UnmarshalBytes(keyBuf) + if err != nil { + logger.Panicf("BUG: cannot unmarshal field value: %s", err) + } + keyBuf = tail + + rowFields = append(rowFields, Field{ + Name: bytesutil.ToUnsafeString(name), + Value: bytesutil.ToUnsafeString(value), + }) + } + wctx.writeRow(rowFields) + } + } else { + for k := range m { + select { + case <-pup.stopCh: + return nil + default: + } + + rowFields = rowFields[:0] + keyBuf := bytesutil.ToUnsafeBytes(k) + fieldIdx := 0 + for len(keyBuf) > 0 { + tail, value, err := encoding.UnmarshalBytes(keyBuf) + if err != nil { + logger.Panicf("BUG: cannot unmarshal field value: %s", err) + } + keyBuf = tail + + rowFields = append(rowFields, Field{ + Name: byFields[fieldIdx], + Value: bytesutil.ToUnsafeString(value), + }) + fieldIdx++ + } + wctx.writeRow(rowFields) + } + } + + wctx.flush() + + return nil +} + +type pipeUniqWriteContext struct { + pup *pipeUniqProcessor + rcs []resultColumn + br blockResult + + rowsWritten uint64 + + valuesLen int +} + +func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) { + if limit := wctx.pup.pu.limit; limit > 0 && wctx.rowsWritten >= limit { + return + } + wctx.rowsWritten++ + + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(rowFields) + if areEqualColumns { + for i, f := range rowFields { + if rcs[i].name != f.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to bbBase and construct a block with new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, f := range rowFields { + rcs = append(rcs, resultColumn{ + name: f.Name, + }) + } + wctx.rcs = rcs + } + + for i, f := range rowFields { + v := f.Value + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } +} + +func (wctx *pipeUniqWriteContext) flush() { + rcs := wctx.rcs + br := &wctx.br + + wctx.valuesLen = 0 + + if len(rcs) == 0 { + return + } + + // Flush rcs to ppBase + br.setResultColumns(rcs) + wctx.pup.ppBase.writeBlock(0, br) + br.reset() + for i := range rcs { + rcs[i].resetKeepName() + } +} + +func parsePipeUniq(lex *lexer) (*pipeUniq, error) { + if !lex.isKeyword("uniq") { + return nil, fmt.Errorf("expecting 'uniq'; got %q", lex.token) + } + lex.nextToken() + + var pu pipeUniq + if lex.isKeyword("by") { + lex.nextToken() + bfs, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'by' clause: 
%w", err) + } + if slices.Contains(bfs, "*") { + bfs = nil + } + pu.byFields = bfs + } + + if lex.isKeyword("limit") { + lex.nextToken() + n, ok := tryParseUint64(lex.token) + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token) + } + lex.nextToken() + pu.limit = n + } + + return &pu, nil +} diff --git a/lib/logstorage/pipe_uniq_test.go b/lib/logstorage/pipe_uniq_test.go new file mode 100644 index 000000000..6896c3b04 --- /dev/null +++ b/lib/logstorage/pipe_uniq_test.go @@ -0,0 +1,45 @@ +package logstorage + +import ( + "testing" +) + +func TestPipeUniqUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + p, err := parsePipeUniq(lex) + if err != nil { + t.Fatalf("cannot parse %s: %s", s, err) + } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("uniq", "*", "", "*", "") + f("uniq by()", "*", "", "*", "") + f("uniq by(*)", "*", "", "*", "") + f("uniq by(f1,f2)", "*", "", "f1,f2", "") + + // all the needed fields, unneeded fields do not intersect with src + f("uniq by(s1, s2)", "*", "f1,f2", "s1,s2", "") + f("uniq", "*", "f1,f2", "*", "") + + // all the needed fields, unneeded fields intersect with src + f("uniq by(s1, s2)", "*", "s1,f1,f2", "s1,s2", "") + f("uniq by(*)", "*", "s1,f1,f2", "*", "") + f("uniq by(s1, s2)", "*", "s1,s2,f1", "s1,s2", "") + + // needed fields do not intersect with src + f("uniq by (s1, s2)", "f1,f2", "", "s1,s2", "") + + // needed fields intersect with src + f("uniq by (s1, s2)", "s1,f1,f2", "", "s1,s2", "") + f("uniq by (*)", "s1,f1,f2", "", "*", "") +}