From 430bebb5f0d044e49c78bc6d64b9184490392aeb Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 13 Jun 2024 18:49:15 +0200
Subject: [PATCH] lib/logstorage: add `top` pipe, which returns top N field sets with the biggest number of matching logs

---
 docs/VictoriaLogs/CHANGELOG.md          |   2 +
 docs/VictoriaLogs/LogsQL.md             |  40 +-
 docs/VictoriaLogs/logsql-examples.md    |   6 +
 lib/logstorage/parser.go                |   2 +-
 lib/logstorage/pipe.go                  |   7 +
 .../{pipe_topk.go => pipe_sort_topk.go} |   0
 lib/logstorage/pipe_top.go              | 500 ++++++++++++++++++
 lib/logstorage/pipe_top_test.go         | 313 +++++++++++
 8 files changed, 868 insertions(+), 2 deletions(-)
 rename lib/logstorage/{pipe_topk.go => pipe_sort_topk.go} (100%)
 create mode 100644 lib/logstorage/pipe_top.go
 create mode 100644 lib/logstorage/pipe_top_test.go

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index c56e8801a..d12602ecc 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 
 ## tip
 
+* FEATURE: add [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) for returning top N sets of the given fields with the maximum number of matching log entries.
+
 ## [v0.19.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.19.0-victorialogs)
 
 Released at 2024-06-11
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 7ba1cc861..5bb6e12a3 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1265,6 +1265,7 @@ LogsQL supports the following pipes:
 - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
 - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`stats`](#stats-pipe) calculates various stats over the selected logs.
+- [`top`](#top-pipe) returns top `N` field sets with the maximum number of matching logs.
 - [`uniq`](#uniq-pipe) returns unique log entries.
 - [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1573,6 +1574,7 @@ If the limit is reached, then the set of returned values is random. Also the num
 See also:
 
 - [`field_names` pipe](#field_names-pipe)
+- [`top` pipe](#top-pipe)
 - [`uniq` pipe](#uniq-pipe)
 
 ### fields pipe
@@ -2139,6 +2141,8 @@ See also:
 
 - [stats pipe functions](#stats-pipe-functions)
 - [`math` pipe](#math-pipe)
 - [`sort` pipe](#sort-pipe)
+- [`uniq` pipe](#uniq-pipe)
+- [`top` pipe](#top-pipe)
 
 #### Stats by fields
@@ -2256,9 +2260,41 @@ _time:5m | stats count() total
 ```
 
+### top pipe
+
+`| top N by (field1, ..., fieldN)` [pipe](#pipes) returns top `N` sets for `(field1, ..., fieldN)` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+with the maximum number of matching log entries.
+
+For example, the following query returns top 7 [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
+with the maximum number of log entries over the last 5 minutes:
+
+```logsql
+_time:5m | top 7 by (_stream)
+```
+
+The `N` is optional. If it is skipped, then top 10 entries are returned. For example, the following query returns top 10 values
+for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) seen in logs for the last 5 minutes:
+
+```logsql
+_time:5m | top by (ip)
+```
+
+The `by (...)` part in the `top` [pipe](#pipes) is optional. If it is skipped, then all the log fields are taken into account
+when determining top field sets. This is useful when the field sets are already limited by other pipes such as [`fields` pipe](#fields-pipe).
+For example, the following query is equivalent to the previous one:
+
+```logsql
+_time:5m | fields ip | top
+```
+
+See also:
+
+- [`uniq` pipe](#uniq-pipe)
+- [`stats` pipe](#stats-pipe)
+
 ### uniq pipe
 
-`| uniq ...` pipe returns unique results over the selected logs. For example, the following LogsQL query
+`| uniq ...` [pipe](#pipes) returns unique results over the selected logs. For example, the following LogsQL query
 returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 over logs for the last 5 minutes:
 
 ```logsql
@@ -2300,6 +2336,8 @@ _time:5m | uniq (host, path) limit 100
 ```
 
 See also:
 
 - [`uniq_values` stats function](#uniq_values-stats)
+- [`top` pipe](#top-pipe)
+- [`stats` pipe](#stats-pipe)
 
 ### unpack_json pipe
 
diff --git a/docs/VictoriaLogs/logsql-examples.md b/docs/VictoriaLogs/logsql-examples.md
index b4706de3e..c676d8b9e 100644
--- a/docs/VictoriaLogs/logsql-examples.md
+++ b/docs/VictoriaLogs/logsql-examples.md
@@ -286,6 +286,12 @@ This query uses the following [LogsQL](https://docs.victoriametrics.com/victoria
 - [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for sorting the stats by `logs` field in descending order.
 - [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) for limiting the number of returned results to 10.
 
+This query can be simplified into the following one, which uses [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe):
+
+```logsql
+_time:5m | top 10 by (_stream)
+```
+
 See also:
 
 - [How to filter out data after stats calculation?](#how-to-filter-out-data-after-stats-calculation)
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 9aaaa3b26..a4e4d119c 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -1316,7 +1316,7 @@ func parseNumber(lex *lexer) (float64, string, error) {
 		return f, s, nil
 	}
 
-	return 0, "", fmt.Errorf("cannot parse %q as float64", s)
+	return 0, s, fmt.Errorf("cannot parse %q as float64", s)
 }
 
 func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) {
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index d9890f043..c330fc331 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -214,6 +214,12 @@ func parsePipe(lex *lexer) (pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
 		}
 		return ps, nil
+	case lex.isKeyword("top"):
+		pt, err := parsePipeTop(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'top' pipe: %w", err)
+		}
+		return pt, nil
 	case lex.isKeyword("uniq"):
 		pu, err := parsePipeUniq(lex)
 		if err != nil {
@@ -287,6 +293,7 @@ var pipeNames = func() map[string]struct{} {
 		"replace_regexp",
 		"sort",
 		"stats",
+		"top",
 		"uniq",
 		"unpack_json",
 		"unpack_logfmt",
diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_sort_topk.go
similarity index 100%
rename from lib/logstorage/pipe_topk.go
rename to lib/logstorage/pipe_sort_topk.go
diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go
new file mode 100644
index 000000000..e633d87c8
--- /dev/null
+++ b/lib/logstorage/pipe_top.go
@@ -0,0 +1,500 @@
+package logstorage
+
+import (
+	"fmt"
+	"slices"
+	"sort"
+	"strings"
+	"sync/atomic"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
+)
+
+// pipeTopDefaultLimit is the default number of entries pipeTop returns.
+const pipeTopDefaultLimit = 10
+
+// pipeTop processes '| top ...' queries.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe
+type pipeTop struct {
+	// fields contains field names for returning top values for.
+	byFields []string
+
+	// limit is the number of top (byFields) sets to return.
+	limit uint64
+
+	// limitStr is string representation of the limit.
+	limitStr string
+
+	// if hitsFieldName isn't empty, then the number of hits per each unique value is returned in this field.
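+	// parsePipeTop always sets this field in practice: it starts with "hits" and appends
+	// "s" while the name collides with one of byFields, so `top by (hits)` returns its
+	// counts in a `hitss` field (see the tests below).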
+	hitsFieldName string
+}
+
+func (pt *pipeTop) String() string {
+	s := "top"
+	if pt.limit != pipeTopDefaultLimit {
+		s += " " + pt.limitStr
+	}
+	if len(pt.byFields) > 0 {
+		s += " by (" + fieldNamesString(pt.byFields) + ")"
+	}
+	return s
+}
+
+func (pt *pipeTop) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	neededFields.reset()
+	unneededFields.reset()
+
+	if len(pt.byFields) == 0 {
+		neededFields.add("*")
+	} else {
+		neededFields.addFields(pt.byFields)
+	}
+}
+
+func (pt *pipeTop) optimize() {
+	// nothing to do
+}
+
+func (pt *pipeTop) hasFilterInWithQuery() bool {
+	return false
+}
+
+func (pt *pipeTop) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
+	return pt, nil
+}
+
+func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
+	maxStateSize := int64(float64(memory.Allowed()) * 0.2)
+
+	shards := make([]pipeTopProcessorShard, workersCount)
+	for i := range shards {
+		shards[i] = pipeTopProcessorShard{
+			pipeTopProcessorShardNopad: pipeTopProcessorShardNopad{
+				pt:              pt,
+				stateSizeBudget: stateSizeBudgetChunk,
+			},
+		}
+		maxStateSize -= stateSizeBudgetChunk
+	}
+
+	ptp := &pipeTopProcessor{
+		pt:     pt,
+		stopCh: stopCh,
+		cancel: cancel,
+		ppNext: ppNext,
+
+		shards: shards,
+
+		maxStateSize: maxStateSize,
+	}
+	ptp.stateSizeBudget.Store(maxStateSize)
+
+	return ptp
+}
+
+type pipeTopProcessor struct {
+	pt     *pipeTop
+	stopCh <-chan struct{}
+	cancel func()
+	ppNext pipeProcessor
+
+	shards []pipeTopProcessorShard
+
+	maxStateSize    int64
+	stateSizeBudget atomic.Int64
+}
+
+type pipeTopProcessorShard struct {
+	pipeTopProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(pipeTopProcessorShardNopad{})%128]byte
+}
+
+type pipeTopProcessorShardNopad struct {
+	// pt points to the parent pipeTop.
+	pt *pipeTop
+
+	// m holds per-row hits.
+	m map[string]*uint64
+
+	// keyBuf is a temporary buffer for building keys for m.
+	keyBuf []byte
+
+	// columnValues is a temporary buffer for the processed column values.
+	columnValues [][]string
+
+	// stateSizeBudget is the remaining budget for the whole state size for the shard.
+	// The per-shard budget is provided in chunks from the parent pipeTopProcessor.
+	stateSizeBudget int
+}
+
+// writeBlock writes br to shard.
+func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
+	byFields := shard.pt.byFields
+	if len(byFields) == 0 {
+		// Take into account all the columns in br.
+		keyBuf := shard.keyBuf
+		cs := br.getColumns()
+		for i := range br.timestamps {
+			keyBuf = keyBuf[:0]
+			for _, c := range cs {
+				v := c.getValueAtRow(br, i)
+				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
+				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
+			}
+			shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
+		}
+		shard.keyBuf = keyBuf
+		return
+	}
+	if len(byFields) == 1 {
+		// Fast path for a single field.
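+		// The const, dict and generic cases below differ only in how hits are counted:
+		// a const column adds len(br.timestamps) hits to a single value, a dict-encoded
+		// column aggregates hits per dictionary value via the encoded uint8 indexes,
+		// and any other column is counted row by row.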
+		c := br.getColumnByName(byFields[0])
+		if c.isConst {
+			v := c.valuesEncoded[0]
+			shard.updateState(v, uint64(len(br.timestamps)))
+			return
+		}
+		if c.valueType == valueTypeDict {
+			a := encoding.GetUint64s(len(c.dictValues))
+			hits := a.A
+			valuesEncoded := c.getValuesEncoded(br)
+			for _, v := range valuesEncoded {
+				idx := unmarshalUint8(v)
+				hits[idx]++
+			}
+			for i, v := range c.dictValues {
+				shard.updateState(v, hits[i])
+			}
+			encoding.PutUint64s(a)
+			return
+		}
+
+		values := c.getValues(br)
+		for _, v := range values {
+			shard.updateState(v, 1)
+		}
+		return
+	}
+
+	// Take into account only the selected columns.
+	columnValues := shard.columnValues[:0]
+	for _, f := range byFields {
+		c := br.getColumnByName(f)
+		values := c.getValues(br)
+		columnValues = append(columnValues, values)
+	}
+	shard.columnValues = columnValues
+
+	keyBuf := shard.keyBuf
+	for i := range br.timestamps {
+		keyBuf = keyBuf[:0]
+		for _, values := range columnValues {
+			keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
+		}
+		shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
+	}
+	shard.keyBuf = keyBuf
+}
+
+func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
+	m := shard.getM()
+	pHits, ok := m[v]
+	if !ok {
+		vCopy := strings.Clone(v)
+		hits := uint64(0)
+		pHits = &hits
+		m[vCopy] = pHits
+		shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits))
+	}
+	*pHits += hits
+}
+
+func (shard *pipeTopProcessorShard) getM() map[string]*uint64 {
+	if shard.m == nil {
+		shard.m = make(map[string]*uint64)
+	}
+	return shard.m
+}
+
+func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
+	shard := &ptp.shards[workerID]
+
+	for shard.stateSizeBudget < 0 {
+		// steal some budget for the state size from the global budget.
+		remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk)
+		if remaining < 0 {
+			// The state size is too big. Stop processing data in order to avoid OOM crash.
+			if remaining+stateSizeBudgetChunk >= 0 {
+				// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
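+				// The query then fails in flush() with a "requires more than ...MB of memory"
+				// error, since the global stateSizeBudget is exhausted at this point.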
+				ptp.cancel()
+			}
+			return
+		}
+		shard.stateSizeBudget += stateSizeBudgetChunk
+	}
+
+	shard.writeBlock(br)
+}
+
+func (ptp *pipeTopProcessor) flush() error {
+	if n := ptp.stateSizeBudget.Load(); n <= 0 {
+		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
+	}
+
+	// merge state across shards
+	shards := ptp.shards
+	m := shards[0].getM()
+	shards = shards[1:]
+	for i := range shards {
+		if needStop(ptp.stopCh) {
+			return nil
+		}
+
+		for k, pHitsSrc := range shards[i].getM() {
+			pHits, ok := m[k]
+			if !ok {
+				m[k] = pHitsSrc
+			} else {
+				*pHits += *pHitsSrc
+			}
+		}
+	}
+
+	// select top entries with the biggest number of hits
+	entries := make([]pipeTopEntry, 0, len(m))
+	for k, pHits := range m {
+		entries = append(entries, pipeTopEntry{
+			k:    k,
+			hits: *pHits,
+		})
+	}
+	sort.Slice(entries, func(i, j int) bool {
+		a, b := &entries[i], &entries[j]
+		if a.hits == b.hits {
+			return a.k < b.k
+		}
+		return a.hits > b.hits
+	})
+	if uint64(len(entries)) > ptp.pt.limit {
+		entries = entries[:ptp.pt.limit]
+	}
+
+	// write result
+	wctx := &pipeTopWriteContext{
+		ptp: ptp,
+	}
+	byFields := ptp.pt.byFields
+	var rowFields []Field
+
+	addHitsField := func(dst []Field, hits uint64) []Field {
+		hitsStr := string(marshalUint64String(nil, hits))
+		dst = append(dst, Field{
+			Name:  ptp.pt.hitsFieldName,
+			Value: hitsStr,
+		})
+		return dst
+	}
+
+	if len(byFields) == 0 {
+		for _, e := range entries {
+			if needStop(ptp.stopCh) {
+				return nil
+			}
+
+			rowFields = rowFields[:0]
+			keyBuf := bytesutil.ToUnsafeBytes(e.k)
+			for len(keyBuf) > 0 {
+				name, nSize := encoding.UnmarshalBytes(keyBuf)
+				if nSize <= 0 {
+					logger.Panicf("BUG: cannot unmarshal field name")
+				}
+				keyBuf = keyBuf[nSize:]
+
+				value, nSize := encoding.UnmarshalBytes(keyBuf)
+				if nSize <= 0 {
+					logger.Panicf("BUG: cannot unmarshal field value")
+				}
+				keyBuf = keyBuf[nSize:]
+
+				rowFields = append(rowFields, Field{
+					Name:  bytesutil.ToUnsafeString(name),
+					Value: bytesutil.ToUnsafeString(value),
+				})
+			}
+			rowFields = addHitsField(rowFields, e.hits)
+			wctx.writeRow(rowFields)
+		}
+	} else if len(byFields) == 1 {
+		fieldName := byFields[0]
+		for _, e := range entries {
+			if needStop(ptp.stopCh) {
+				return nil
+			}
+
+			rowFields = append(rowFields[:0], Field{
+				Name:  fieldName,
+				Value: e.k,
+			})
+			rowFields = addHitsField(rowFields, e.hits)
+			wctx.writeRow(rowFields)
+		}
+	} else {
+		for _, e := range entries {
+			if needStop(ptp.stopCh) {
+				return nil
+			}
+
+			rowFields = rowFields[:0]
+			keyBuf := bytesutil.ToUnsafeBytes(e.k)
+			fieldIdx := 0
+			for len(keyBuf) > 0 {
+				value, nSize := encoding.UnmarshalBytes(keyBuf)
+				if nSize <= 0 {
+					logger.Panicf("BUG: cannot unmarshal field value")
+				}
+				keyBuf = keyBuf[nSize:]
+
+				rowFields = append(rowFields, Field{
+					Name:  byFields[fieldIdx],
+					Value: bytesutil.ToUnsafeString(value),
+				})
+				fieldIdx++
+			}
+			rowFields = addHitsField(rowFields, e.hits)
+			wctx.writeRow(rowFields)
+		}
+	}
+
+	wctx.flush()
+
+	return nil
+}
+
+type pipeTopEntry struct {
+	k    string
+	hits uint64
+}
+
+type pipeTopWriteContext struct {
+	ptp *pipeTopProcessor
+	rcs []resultColumn
+	br  blockResult
+
+	// rowsCount is the number of rows in the current block
+	rowsCount int
+
+	// valuesLen is the total length of values in the current block
+	valuesLen int
+}
+
+func (wctx *pipeTopWriteContext) writeRow(rowFields []Field) {
+	rcs := wctx.rcs
+
+	areEqualColumns := len(rcs) == len(rowFields)
+	if areEqualColumns {
+		for i, f := range rowFields {
+			if rcs[i].name != f.Name {
+				areEqualColumns = false
+				break
+			}
+		}
+	}
+	if !areEqualColumns {
+		// send the current block to ppNext and construct a block with new set of columns
+		wctx.flush()
+
+		rcs = wctx.rcs[:0]
+		for _, f := range rowFields {
+			rcs = appendResultColumnWithName(rcs, f.Name)
+		}
+		wctx.rcs = rcs
+	}
+
+	for i, f := range rowFields {
+		v := f.Value
+		rcs[i].addValue(v)
+		wctx.valuesLen += len(v)
+	}
+
+	wctx.rowsCount++
+	if wctx.valuesLen >= 1_000_000 {
+		wctx.flush()
+	}
+}
+
+func (wctx *pipeTopWriteContext) flush() {
+	rcs := wctx.rcs
+	br := &wctx.br
+
+	wctx.valuesLen = 0
+
+	// Flush rcs to ppNext
+	br.setResultColumns(rcs, wctx.rowsCount)
+	wctx.rowsCount = 0
+	wctx.ptp.ppNext.writeBlock(0, br)
+	br.reset()
+	for i := range rcs {
+		rcs[i].resetValues()
+	}
+}
+
+func parsePipeTop(lex *lexer) (*pipeTop, error) {
+	if !lex.isKeyword("top") {
+		return nil, fmt.Errorf("expecting 'top'; got %q", lex.token)
+	}
+	lex.nextToken()
+
+	limit := uint64(pipeTopDefaultLimit)
+	limitStr := ""
+	if isNumberPrefix(lex.token) {
+		limitF, s, err := parseNumber(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse N in 'top': %w", err)
+		}
+		if limitF < 1 {
+			return nil, fmt.Errorf("N in 'top %s' must be integer bigger than 0", s)
+		}
+		limit = uint64(limitF)
+		limitStr = s
+	}
+
+	var byFields []string
+	if lex.isKeyword("by", "(") {
+		if lex.isKeyword("by") {
+			lex.nextToken()
+		}
+		bfs, err := parseFieldNamesInParens(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'by' clause in 'top': %w", err)
+		}
+		if slices.Contains(bfs, "*") {
+			bfs = nil
+		}
+		byFields = bfs
+	}
+
+	hitsFieldName := "hits"
+	for slices.Contains(byFields, hitsFieldName) {
+		hitsFieldName += "s"
+	}
+
+	pt := &pipeTop{
+		byFields:      byFields,
+		limit:         limit,
+		limitStr:      limitStr,
+		hitsFieldName: hitsFieldName,
+	}
+
+	return pt, nil
+}
diff --git a/lib/logstorage/pipe_top_test.go b/lib/logstorage/pipe_top_test.go
new file mode 100644
index 000000000..edcd4db46
--- /dev/null
+++ b/lib/logstorage/pipe_top_test.go
@@ -0,0 +1,313 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestParsePipeTopSuccess(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeSuccess(t, pipeStr)
+	}
+
+	f(`top`)
+	f(`top 5`)
+	f(`top by (x)`)
+	f(`top 5 by (x)`)
+	f(`top by (x, y)`)
+	f(`top 5 by (x, y)`)
+}
+
+func TestParsePipeTopFailure(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeFailure(t, pipeStr)
+	}
+
+	f(`top 5 foo`)
+	f(`top 5 by`)
+	f(`top 5 by (`)
+	f(`top 5foo`)
+	f(`top foo`)
+	f(`top by`)
+}
+
+func TestPipeTop(t *testing.T) {
+	f := func(pipeStr string, rows, rowsExpected [][]Field) {
+		t.Helper()
+		expectPipeResults(t, pipeStr, rows, rowsExpected)
+	}
+
+	f("top", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"b", "3"},
+			{"hits", "2"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+			{"hits", "1"},
+		},
+	})
+
+	f("top 1", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"b", "3"},
+			{"hits", "2"},
+		},
+	})
+
+	f("top by (a)", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"hits", "3"},
+		},
+	})
+
+	f("top by (b)", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"b", "3"},
+			{"hits", "2"},
+		},
+		{
+			{"b", "54"},
+			{"hits", "1"},
+		},
+	})
+
+	f("top by (hits)", [][]Field{
+		{
+			{"a", `2`},
+			{"hits", `3`},
+		},
+		{
+			{"a", "2"},
+			{"hits", "3"},
+		},
+		{
+			{"a", `2`},
+			{"hits", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"hits", "3"},
+			{"hitss", "2"},
+		},
+		{
+			{"hits", "54"},
+			{"hitss", "1"},
+		},
+	})
+
+	f("top by (c)", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"c", ""},
+			{"hits", "2"},
+		},
+		{
+			{"c", "d"},
+			{"hits", "1"},
+		},
+	})
+
+	f("top by (d)", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"d", ""},
+			{"hits", "3"},
+		},
+	})
+
+	f("top by (a, b)", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"b", "3"},
+			{"hits", "2"},
+		},
+		{
+			{"a", "2"},
+			{"b", "54"},
+			{"hits", "1"},
+		},
+	})
+
+	f("top 10 by (a, b)", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"b", "3"},
+			{"hits", "2"},
+		},
+		{
+			{"a", "2"},
+			{"b", "54"},
+			{"hits", "1"},
+		},
+	})
+
+	f("top 1 by (a, b)", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"b", "3"},
+			{"hits", "2"},
+		},
+	})
+}
+
+func TestPipeTopUpdateNeededFields(t *testing.T) {
+	f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("top", "*", "", "*", "")
+	f("top by()", "*", "", "*", "")
+	f("top by(*)", "*", "", "*", "")
+	f("top by(f1,f2)", "*", "", "f1,f2", "")
+	f("top by(f1,f2)", "*", "", "f1,f2", "")
+
+	// all the needed fields, unneeded fields do not intersect with src
+	f("top by(s1, s2)", "*", "f1,f2", "s1,s2", "")
+	f("top", "*", "f1,f2", "*", "")
+
+	// all the needed fields, unneeded fields intersect with src
+	f("top by(s1, s2)", "*", "s1,f1,f2", "s1,s2", "")
+	f("top by(*)", "*", "s1,f1,f2", "*", "")
+	f("top by(s1, s2)", "*", "s1,s2,f1", "s1,s2", "")
+
+	// needed fields do not intersect with src
+	f("top by (s1, s2)", "f1,f2", "", "s1,s2", "")
+
+	// needed fields intersect with src
+	f("top by (s1, s2)", "s1,f1,f2", "", "s1,s2", "")
+	f("top by (*)", "s1,f1,f2", "", "*", "")
+}
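
Editor's note for reviewers: the ordering contract of the `top` pipe lives in `flush()` above (descending `hits`, ties broken by ascending key, truncated to `limit`). Below is a minimal standalone sketch of just that selection step, outside the patch; the `entry` and `topEntries` names are illustrative, not part of the library:

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct {
	k    string
	hits uint64
}

// topEntries mirrors the selection order used by pipeTopProcessor.flush():
// entries are sorted by descending hits, ties are broken by ascending key,
// and the result is truncated to limit.
func topEntries(m map[string]uint64, limit int) []entry {
	entries := make([]entry, 0, len(m))
	for k, hits := range m {
		entries = append(entries, entry{k: k, hits: hits})
	}
	sort.Slice(entries, func(i, j int) bool {
		a, b := &entries[i], &entries[j]
		if a.hits == b.hits {
			return a.k < b.k
		}
		return a.hits > b.hits
	})
	if len(entries) > limit {
		entries = entries[:limit]
	}
	return entries
}

func main() {
	// Same shape of data as the `top by (b)` test case above:
	// value "3" seen twice, value "54" seen once.
	m := map[string]uint64{"3": 2, "54": 1}
	for _, e := range topEntries(m, 10) {
		fmt.Printf("b=%s hits=%d\n", e.k, e.hits)
	}
}
```

The deterministic tie-break on the key is what makes the `expectPipeResults` assertions in pipe_top_test.go stable across runs even when several field sets share the same hit count.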