From 33a01c659b3fb5400f581fdad4a336991b8c6316 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 15 May 2024 02:45:43 +0200
Subject: [PATCH] wip

---
 docs/VictoriaLogs/CHANGELOG.md   |   1 +
 docs/VictoriaLogs/LogsQL.md      |  12 +
 lib/logstorage/parser_test.go    |   8 +
 lib/logstorage/pipe.go           |   2 +-
 lib/logstorage/pipe_sort.go      | 104 +++---
 lib/logstorage/pipe_stats.go     |   8 +-
 lib/logstorage/pipe_topk.go      | 546 +++++++++++++++++++++++++++++++
 lib/logstorage/pipe_uniq.go      |  12 +-
 lib/logstorage/storage_search.go |  16 +-
 9 files changed, 643 insertions(+), 66 deletions(-)
 create mode 100644 lib/logstorage/pipe_topk.go

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index f88962963..f4575985a 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
 
 ## tip
 
+* FEATURE: add ability to return only the first `N` results from the [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This is useful when the `N` biggest or `N` smallest values must be returned from a large number of logs.
 * FEATURE: add [`quantile`](https://docs.victoriametrics.com/victorialogs/logsql/#quantile-stats) and [`median`](https://docs.victoriametrics.com/victorialogs/logsql/#median-stats) [stats functions](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe).
 
 ## [v0.6.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.1-victorialogs)
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index b4de1dd9a..ae98cd04f 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1198,11 +1198,23 @@ The reverse order can be applied globally via `desc` keyword after `by(...)` cla
 _time:5m | sort by (foo, bar) desc
 ```
 
+Sorting a big number of logs can consume a lot of CPU time and memory. Sometimes it is enough to return only the first `N` entries with the biggest
+or smallest values. This can be done by adding `first N` to the end of the `sort ...` pipe.
+Such a query consumes less memory when sorting a big number of logs, since it keeps only `N` log entries in memory.
+For example, the following query returns the top 10 log entries with the biggest values
+for the `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) during the last hour:
+
+```logsql
+_time:1h | sort by (request_duration desc) first 10
+```
+
 Note that sorting of big number of logs can be slow and can consume a lot of additional memory.
 It is recommended limiting the number of logs before sorting with the following approaches:
 
+- Adding `first N` to the end of the `sort ...` pipe.
 - Reducing the selected time range with [time filter](#time-filter).
 - Using more specific [filters](#filters), so they select less logs.
+- Limiting the number of selected [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) via [`fields` pipe](#fields-pipe).
See also: diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 0595c2de0..e5a04f417 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -974,6 +974,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | sort bY (foo)`, `* | sort by (foo)`) f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`) f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`) + f(`* | sort first 10`, `* | sort first 10`) + f(`* | sort desc first 10`, `* | sort desc first 10`) + f(`* | sort by (foo desc, bar) first 10`, `* | sort by (foo desc, bar) first 10`) + f(`* | sort by (foo desc, bar) desc first 10`, `* | sort by (foo desc, bar) desc first 10`) // uniq pipe f(`* | uniq`, `* | uniq`) @@ -1330,6 +1334,10 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | sort by(baz`) f(`foo | sort by(baz,`) f(`foo | sort by(bar) foo`) + f(`foo | sort by(bar) first`) + f(`foo | sort by(bar) first foo`) + f(`foo | sort by(bar) first -1234`) + f(`foo | sort by(bar) first 12.34`) // invalid uniq pipe f(`foo | uniq bar`) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 55c2bab72..7623b6075 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -65,7 +65,7 @@ func parsePipes(lex *lexer) ([]pipe, error) { var pipes []pipe for !lex.isKeyword(")", "") { if !lex.isKeyword("|") { - return nil, fmt.Errorf("expecting '|'") + return nil, fmt.Errorf("expecting '|'; got %q", lex.token) } if !lex.mustNextToken() { return nil, fmt.Errorf("missing token after '|'") diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index aee2028cf..b7b0f7533 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -25,6 +25,11 @@ type pipeSort struct { // whether to apply descending order isDesc bool + + // how many results to return + // + // if zero, then all the results are returned + limit uint64 } func (ps *pipeSort) String() string { @@ -39,6 +44,9 @@ func (ps *pipeSort) String() string { if ps.isDesc { s += " desc" } + if ps.limit > 0 { + s += fmt.Sprintf(" first %d", ps.limit) + } return s } @@ -55,6 +63,13 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) { } func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + if ps.limit > 0 { + return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppBase) + } + return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppBase) +} + +func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.2) shards := make([]pipeSortProcessorShard, workersCount) @@ -117,6 +132,9 @@ type pipeSortProcessorShardNopad struct { // stateSizeBudget is the remaining budget for the whole state size for the shard. // The per-shard budget is provided in chunks from the parent pipeSortProcessor. stateSizeBudget int + + // columnValues is used as temporary buffer at pipeSortProcessorShard.writeBlock + columnValues [][]string } // sortBlock represents a block of logs for sorting. 
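When `ps.limit > 0`, `newPipeProcessor` above hands the work to `newPipeTopkProcessor`, defined in the new `lib/logstorage/pipe_topk.go` later in this patch. That processor keeps at most `limit` rows per worker shard in a max-heap and evicts the heap root whenever a row that sorts earlier arrives, so memory stays proportional to `limit` instead of the full result set. A self-contained sketch of that bounded-heap technique over plain integers (illustrative names only, not code from the patch):

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// boundedMaxHeap keeps the N smallest values seen so far.
// The largest of them sits at the root, so it can be evicted in O(log N).
type boundedMaxHeap []int

func (h boundedMaxHeap) Len() int           { return len(h) }
func (h boundedMaxHeap) Less(i, j int) bool { return h[i] > h[j] } // inverted: max-heap
func (h boundedMaxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *boundedMaxHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *boundedMaxHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// firstN returns the n smallest values from vs in ascending order,
// keeping at most n values in memory at any time.
func firstN(vs []int, n int) []int {
	if n <= 0 {
		return nil
	}
	h := make(boundedMaxHeap, 0, n)
	for _, v := range vs {
		if len(h) < n {
			heap.Push(&h, v)
		} else if v < h[0] {
			// The new value beats the worst kept value: replace the root.
			h[0] = v
			heap.Fix(&h, 0)
		}
	}
	out := append([]int(nil), h...)
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(firstN([]int{7, 3, 9, 1, 4, 8, 2}, 3)) // [1 2 3]
}
```

`pipeTopkProcessorShard.addRow` later in the patch follows the same pattern, comparing rows with `topkLess` and cloning a row only when it is actually kept.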
@@ -176,16 +194,23 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { if len(byFields) == 0 { // Sort by all the columns + columnValues := shard.columnValues[:0] + for _, c := range cs { + columnValues = append(columnValues, c.getValues(br)) + } + shard.columnValues = columnValues + // Generate byColumns var rc resultColumn + bb := bbPool.Get() - for i := range br.timestamps { - // JSON-encode all the columns per each row into a single string + for rowIdx := range br.timestamps { + // Marshal all the columns per each row into a single string // and sort rows by the resulting string. bb.B = bb.B[:0] - for _, c := range cs { - v := c.getValueAtRow(br, i) - bb.B = marshalJSONKeyValue(bb.B, c.name, v) + for i, values := range columnValues { + v := values[rowIdx] + bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v) bb.B = append(bb.B, ',') } rc.addValue(bytesutil.ToUnsafeString(bb.B)) @@ -358,10 +383,8 @@ func (psp *pipeSortProcessor) flush() error { return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20)) } - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } // Sort every shard in parallel @@ -377,17 +400,15 @@ func (psp *pipeSortProcessor) flush() error { } wg.Wait() - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } // Merge sorted results across shards sh := pipeSortProcessorShardsHeap(make([]*pipeSortProcessorShard, 0, len(shards))) for i := range shards { shard := &shards[i] - if shard.Len() > 0 { + if len(shard.rowRefs) > 0 { sh = append(sh, shard) } } @@ -400,49 +421,43 @@ func (psp *pipeSortProcessor) flush() error { wctx := &pipeSortWriteContext{ psp: psp, } - var shardNext *pipeSortProcessorShard + shardNextIdx := 0 for len(sh) > 1 { shard := sh[0] - wctx.writeRow(shard, shard.rowRefNext) - shard.rowRefNext++ + wctx.writeNextRow(shard) if shard.rowRefNext >= len(shard.rowRefs) { _ = heap.Pop(&sh) - shardNext = nil + shardNextIdx = 0 - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } continue } - if shardNext == nil { - shardNext = sh[1] - if len(sh) > 2 && sortBlockLess(sh[2], sh[2].rowRefNext, shardNext, shardNext.rowRefNext) { - shardNext = sh[2] + if shardNextIdx == 0 { + shardNextIdx = 1 + if len(sh) > 2 && sh.Less(2, 1) { + shardNextIdx = 2 } } - if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) { + if sh.Less(shardNextIdx, 0) { heap.Fix(&sh, 0) - shardNext = nil + shardNextIdx = 0 - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } } } if len(sh) == 1 { shard := sh[0] for shard.rowRefNext < len(shard.rowRefs) { - wctx.writeRow(shard, shard.rowRefNext) - shard.rowRefNext++ + wctx.writeNextRow(shard) } } wctx.flush() @@ -458,7 +473,9 @@ type pipeSortWriteContext struct { valuesLen int } -func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx int) { +func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { + rowIdx := shard.rowRefNext + shard.rowRefNext++ rr := shard.rowRefs[rowIdx] b := &shard.blocks[rr.blockIdx] @@ -671,6 +688,17 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) { ps.isDesc = true } + switch { + case lex.isKeyword("first"): + lex.nextToken() + n, ok := tryParseUint64(lex.token) + lex.nextToken() + if !ok { + return nil, fmt.Errorf("cannot parse 'first %s'", lex.token) + } + ps.limit = n + } + return &ps, nil } @@ -725,13 +753,6 @@ func parseBySortFields(lex *lexer) 
([]*bySortField, error) { } } -func marshalJSONKeyValue(dst []byte, k, v string) []byte { - dst = strconv.AppendQuote(dst, k) - dst = append(dst, ':') - dst = strconv.AppendQuote(dst, v) - return dst -} - func tryParseInt64(s string) (int64, bool) { if len(s) == 0 { return 0, false @@ -756,3 +777,10 @@ func tryParseInt64(s string) (int64, bool) { } return -int64(u64), true } + +func marshalJSONKeyValue(dst []byte, k, v string) []byte { + dst = strconv.AppendQuote(dst, k) + dst = append(dst, ':') + dst = strconv.AppendQuote(dst, v) + return dst +} diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 685a44358..56e086460 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -345,10 +345,8 @@ func (psp *pipeStatsProcessor) flush() error { for key, psg := range shard.m { // shard.m may be quite big, so this loop can take a lot of time and CPU. // Stop processing data as soon as stopCh is closed without wasting additional CPU time. - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } spgBase := m[key] @@ -388,10 +386,8 @@ func (psp *pipeStatsProcessor) flush() error { for key, psg := range m { // m may be quite big, so this loop can take a lot of time and CPU. // Stop processing data as soon as stopCh is closed without wasting additional CPU time. - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } // Unmarshal values for byFields from key. diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go new file mode 100644 index 000000000..d90efa94e --- /dev/null +++ b/lib/logstorage/pipe_topk.go @@ -0,0 +1,546 @@ +package logstorage + +import ( + "container/heap" + "fmt" + "strings" + "sync" + "sync/atomic" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" +) + +func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + maxStateSize := int64(float64(memory.Allowed()) * 0.2) + + shards := make([]pipeTopkProcessorShard, workersCount) + for i := range shards { + shard := &shards[i] + shard.ps = ps + shard.stateSizeBudget = stateSizeBudgetChunk + maxStateSize -= stateSizeBudgetChunk + } + + ptp := &pipeTopkProcessor{ + ps: ps, + stopCh: stopCh, + cancel: cancel, + ppBase: ppBase, + + shards: shards, + + maxStateSize: maxStateSize, + } + ptp.stateSizeBudget.Store(maxStateSize) + + return ptp +} + +type pipeTopkProcessor struct { + ps *pipeSort + stopCh <-chan struct{} + cancel func() + ppBase pipeProcessor + + shards []pipeTopkProcessorShard + + maxStateSize int64 + stateSizeBudget atomic.Int64 +} + +type pipeTopkProcessorShard struct { + pipeTopkProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeTopkProcessorShardNopad{})%128]byte +} + +type pipeTopkProcessorShardNopad struct { + // ps points to the parent pipeSort. + ps *pipeSort + + // rows contains rows tracked by the given shard. + rows []*pipeTopkRow + + // rowNext points to the next index at rows during merge shards phase + rowNext int + + // tmpRow is used as a temporary row when determining whether the next ingested row must be stored in the shard. + tmpRow pipeTopkRow + + // these are aux fields for determining whether the next row must be stored in rows. 
+ byColumnValues [][]string + otherColumnValues []pipeTopkOtherColumn + byColumns []string + otherColumns []Field + + // stateSizeBudget is the remaining budget for the whole state size for the shard. + // The per-shard budget is provided in chunks from the parent pipeTopkProcessor. + stateSizeBudget int +} + +type pipeTopkRow struct { + byColumns []string + otherColumns []Field +} + +type pipeTopkOtherColumn struct { + name string + values []string +} + +func (r *pipeTopkRow) clone() *pipeTopkRow { + byColumnsCopy := make([]string, len(r.byColumns)) + for i := range byColumnsCopy { + byColumnsCopy[i] = strings.Clone(r.byColumns[i]) + } + + otherColumnsCopy := make([]Field, len(r.otherColumns)) + for i := range otherColumnsCopy { + src := &r.otherColumns[i] + dst := &otherColumnsCopy[i] + dst.Name = strings.Clone(src.Name) + dst.Value = strings.Clone(src.Value) + } + + return &pipeTopkRow{ + byColumns: byColumnsCopy, + otherColumns: otherColumnsCopy, + } +} + +func (r *pipeTopkRow) sizeBytes() int { + n := int(unsafe.Sizeof(*r)) + + for _, v := range r.byColumns { + n += len(v) + } + n += len(r.byColumns) * int(unsafe.Sizeof(r.byColumns[0])) + + for _, f := range r.otherColumns { + n += len(f.Name) + len(f.Value) + } + n += len(r.otherColumns) * int(unsafe.Sizeof(r.otherColumns[0])) + + return n +} + +func (shard *pipeTopkProcessorShard) Len() int { + return len(shard.rows) +} + +func (shard *pipeTopkProcessorShard) Swap(i, j int) { + rows := shard.rows + rows[i], rows[j] = rows[j], rows[i] +} + +func (shard *pipeTopkProcessorShard) Less(i, j int) bool { + rows := shard.rows + + // This is max heap + return topkLess(shard.ps, rows[j], rows[i]) +} + +func (shard *pipeTopkProcessorShard) Push(x any) { + r := x.(*pipeTopkRow) + shard.rows = append(shard.rows, r) +} + +func (shard *pipeTopkProcessorShard) Pop() any { + rows := shard.rows + x := rows[len(rows)-1] + rows[len(rows)-1] = nil + shard.rows = rows[:len(rows)-1] + return x +} + +// writeBlock writes br to shard. 
+func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { + cs := br.getColumns() + + byFields := shard.ps.byFields + if len(byFields) == 0 { + // Sort by all the fields + + byColumnValues := shard.byColumnValues[:0] + for _, c := range cs { + byColumnValues = append(byColumnValues, c.getValues(br)) + } + shard.byColumnValues = byColumnValues + + byColumns := shard.byColumns[:0] + otherColumns := shard.otherColumns[:0] + bb := bbPool.Get() + for rowIdx := range br.timestamps { + byColumns = byColumns[:0] + bb.B = bb.B[:0] + for i, values := range byColumnValues { + v := values[rowIdx] + bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v) + bb.B = append(bb.B, ',') + } + byColumns = append(byColumns, bytesutil.ToUnsafeString(bb.B)) + + otherColumns = otherColumns[:0] + for i, values := range byColumnValues { + otherColumns = append(otherColumns, Field{ + Name: cs[i].name, + Value: values[rowIdx], + }) + } + + shard.addRow(byColumns, otherColumns) + } + bbPool.Put(bb) + shard.byColumns = byColumns + shard.otherColumns = otherColumns + } else { + // Sort by byFields + + byColumnValues := shard.byColumnValues[:0] + for _, bf := range byFields { + c := br.getColumnByName(bf.name) + byColumnValues = append(byColumnValues, c.getValues(br)) + } + shard.byColumnValues = byColumnValues + + otherColumnValues := shard.otherColumnValues[:0] + for _, c := range cs { + isByField := false + for _, bf := range byFields { + if bf.name == c.name { + isByField = true + break + } + } + if !isByField { + otherColumnValues = append(otherColumnValues, pipeTopkOtherColumn{ + name: c.name, + values: c.getValues(br), + }) + } + } + shard.otherColumnValues = otherColumnValues + + // add rows to shard + byColumns := shard.byColumns[:0] + otherColumns := shard.otherColumns[:0] + for rowIdx := range br.timestamps { + byColumns = byColumns[:0] + for _, values := range byColumnValues { + byColumns = append(byColumns, values[rowIdx]) + } + + otherColumns = otherColumns[:0] + for _, ocv := range otherColumnValues { + otherColumns = append(otherColumns, Field{ + Name: ocv.name, + Value: ocv.values[rowIdx], + }) + } + + shard.addRow(byColumns, otherColumns) + } + shard.byColumns = byColumns + shard.otherColumns = otherColumns + } +} + +func (shard *pipeTopkProcessorShard) addRow(byColumns []string, otherColumns []Field) { + r := &shard.tmpRow + r.byColumns = byColumns + r.otherColumns = otherColumns + + rows := shard.rows + if len(rows) > 0 && !topkLess(shard.ps, r, rows[0]) { + // Fast path - nothing to add. + return + } + + // Slow path - add r to shard.rows. + r = r.clone() + shard.stateSizeBudget -= r.sizeBytes() + if uint64(len(rows)) < shard.ps.limit { + heap.Push(shard, r) + shard.stateSizeBudget -= int(unsafe.Sizeof(r)) + } else { + shard.stateSizeBudget += rows[0].sizeBytes() + rows[0] = r + heap.Fix(shard, 0) + } +} + +func (shard *pipeTopkProcessorShard) sortRows(stopCh <-chan struct{}) { + rows := shard.rows + for i := len(rows) - 1; i > 0; i-- { + x := heap.Pop(shard) + rows[i] = x.(*pipeTopkRow) + + if needStop(stopCh) { + return + } + } +} + +func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &ptp.shards[workerID] + + for shard.stateSizeBudget < 0 { + // steal some budget for the state size from the global budget. + remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk) + if remaining < 0 { + // The state size is too big. Stop processing data in order to avoid OOM crash. 
+ if remaining+stateSizeBudgetChunk >= 0 { + // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. + ptp.cancel() + } + return + } + shard.stateSizeBudget += stateSizeBudgetChunk + } + + shard.writeBlock(br) +} + +func (ptp *pipeTopkProcessor) flush() error { + if n := ptp.stateSizeBudget.Load(); n <= 0 { + return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.ps.String(), ptp.maxStateSize/(1<<20)) + } + + if needStop(ptp.stopCh) { + return nil + } + + // Sort every shard in parallel + var wg sync.WaitGroup + shards := ptp.shards + for i := range shards { + wg.Add(1) + go func(shard *pipeTopkProcessorShard) { + shard.sortRows(ptp.stopCh) + wg.Done() + }(&shards[i]) + } + wg.Wait() + + if needStop(ptp.stopCh) { + return nil + } + + // Merge sorted results across shards + sh := pipeTopkProcessorShardsHeap(make([]*pipeTopkProcessorShard, 0, len(shards))) + for i := range shards { + shard := &shards[i] + if len(shard.rows) > 0 { + sh = append(sh, shard) + } + } + if len(sh) == 0 { + return nil + } + + heap.Init(&sh) + + wctx := &pipeTopkWriteContext{ + ptp: ptp, + } + shardNextIdx := 0 + + for len(sh) > 1 { + shard := sh[0] + if !wctx.writeNextRow(shard) { + break + } + + if shard.rowNext >= len(shard.rows) { + _ = heap.Pop(&sh) + shardNextIdx = 0 + + if needStop(ptp.stopCh) { + return nil + } + + continue + } + + if shardNextIdx == 0 { + shardNextIdx = 1 + if len(sh) > 2 && sh.Less(2, 1) { + shardNextIdx = 2 + } + } + + if sh.Less(shardNextIdx, 0) { + heap.Fix(&sh, 0) + shardNextIdx = 0 + + if needStop(ptp.stopCh) { + return nil + } + } + } + if len(sh) == 1 { + shard := sh[0] + for shard.rowNext < len(shard.rows) { + if !wctx.writeNextRow(shard) { + break + } + } + } + wctx.flush() + + return nil +} + +type pipeTopkWriteContext struct { + ptp *pipeTopkProcessor + rcs []resultColumn + br blockResult + + rowsWritten uint64 + valuesLen int +} + +func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool { + if wctx.rowsWritten >= wctx.ptp.ps.limit { + return false + } + wctx.rowsWritten++ + + rowIdx := shard.rowNext + shard.rowNext++ + r := shard.rows[rowIdx] + + byFields := shard.ps.byFields + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(byFields)+len(r.otherColumns) + if areEqualColumns { + for i, c := range r.otherColumns { + if rcs[len(byFields)+i].name != c.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to bbBase and construct a block with new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, bf := range byFields { + rcs = append(rcs, resultColumn{ + name: bf.name, + }) + } + for _, c := range r.otherColumns { + rcs = append(rcs, resultColumn{ + name: c.Name, + }) + } + wctx.rcs = rcs + } + + byColumns := r.byColumns + for i := range byFields { + v := byColumns[i] + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + + for i, c := range r.otherColumns { + v := c.Value + rcs[len(byFields)+i].addValue(v) + wctx.valuesLen += len(v) + } + + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } + + return true +} + +func (wctx *pipeTopkWriteContext) flush() { + rcs := wctx.rcs + br := &wctx.br + + wctx.valuesLen = 0 + + if len(rcs) == 0 { + return + } + + // Flush rcs to ppBase + br.setResultColumns(rcs) + wctx.ptp.ppBase.writeBlock(0, br) + br.reset() + for i := range rcs { + rcs[i].resetKeepName() + } +} + +type pipeTopkProcessorShardsHeap []*pipeTopkProcessorShard + +func (sh *pipeTopkProcessorShardsHeap) Len() int { + return 
len(*sh) +} + +func (sh *pipeTopkProcessorShardsHeap) Swap(i, j int) { + a := *sh + a[i], a[j] = a[j], a[i] +} + +func (sh *pipeTopkProcessorShardsHeap) Less(i, j int) bool { + a := *sh + shardA := a[i] + shardB := a[j] + return topkLess(shardA.ps, shardA.rows[shardA.rowNext], shardB.rows[shardB.rowNext]) +} + +func (sh *pipeTopkProcessorShardsHeap) Push(x any) { + shard := x.(*pipeTopkProcessorShard) + *sh = append(*sh, shard) +} + +func (sh *pipeTopkProcessorShardsHeap) Pop() any { + a := *sh + x := a[len(a)-1] + a[len(a)-1] = nil + *sh = a[:len(a)-1] + return x +} + +func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool { + byFields := ps.byFields + + csA := a.byColumns + csB := b.byColumns + + for k := range csA { + isDesc := ps.isDesc + if len(byFields) > 0 && byFields[k].isDesc { + isDesc = !isDesc + } + + vA := csA[k] + vB := csB[k] + + if vA == vB { + continue + } + + if isDesc { + return stringsutil.LessNatural(vB, vA) + } + return stringsutil.LessNatural(vA, vB) + } + return false +} diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 6cdd46fc7..3b72aaac2 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -209,10 +209,8 @@ func (pup *pipeUniqProcessor) flush() error { m := shards[0].m shards = shards[1:] for i := range shards { - select { - case <-pup.stopCh: + if needStop(pup.stopCh) { return nil - default: } for k := range shards[i].m { @@ -229,10 +227,8 @@ func (pup *pipeUniqProcessor) flush() error { if len(byFields) == 0 { for k := range m { - select { - case <-pup.stopCh: + if needStop(pup.stopCh) { return nil - default: } rowFields = rowFields[:0] @@ -259,10 +255,8 @@ func (pup *pipeUniqProcessor) flush() error { } } else { for k := range m { - select { - case <-pup.stopCh: + if needStop(pup.stopCh) { return nil - default: } rowFields = rowFields[:0] diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 2b1ea3ba4..5ed9b80d0 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -182,12 +182,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch bsws := bswb.bsws for i := range bsws { bsw := &bsws[i] - select { - case <-stopCh: + if needStop(stopCh) { // The search has been canceled. Just skip all the scheduled work in order to save CPU time. bsw.reset() continue - default: } bs.search(bsw) @@ -266,11 +264,9 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { - select { - case <-stopCh: + if needStop(stopCh) { // Do not spend CPU time on search, since it is already stopped. return func() {} - default: } tenantIDs := so.tenantIDs @@ -436,10 +432,8 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c // it is assumed that ibhs are sorted ibhs := p.indexBlockHeaders for len(ibhs) > 0 && len(tenantIDs) > 0 { - select { - case <-stopCh: + if needStop(stopCh) { return - default: } // locate tenantID equal or bigger than the tenantID in ibhs[0] @@ -541,10 +535,8 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c ibhs := p.indexBlockHeaders for len(ibhs) > 0 && len(streamIDs) > 0 { - select { - case <-stopCh: + if needStop(stopCh) { return - default: } // locate streamID equal or bigger than the streamID in ibhs[0]
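The repeated `select { case <-stopCh: ... default: }` blocks across the pipes and the storage search code are replaced above with calls to `needStop`. Its definition is not part of this diff; presumably it is nothing more than a non-blocking check on the stop channel, along the lines of this sketch:

```go
package logstorage

// needStop reports whether stopCh has been closed, without blocking.
// (Assumption: the actual helper is defined elsewhere in this package;
// this sketch only illustrates the expected behavior.)
func needStop(stopCh <-chan struct{}) bool {
	select {
	case <-stopCh:
		return true
	default:
		return false
	}
}
```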
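Both `pipeSortProcessor.flush` and the new `pipeTopkProcessor.flush` merge the per-shard sorted results through a heap of shards keyed by each shard's next unread row, with a `shardNextIdx` optimization that skips `heap.Fix` while the current root still holds the smallest row. Stripped of the block and column machinery, the loop is a plain k-way merge; a simplified sketch over int slices (hypothetical names, no relation to the real types):

```go
package main

import (
	"container/heap"
	"fmt"
)

// shard is one pre-sorted slice with a cursor to its next unread element.
type shard struct {
	vals []int
	next int
}

// shardsHeap orders shards by the value at their cursor.
type shardsHeap []*shard

func (h shardsHeap) Len() int           { return len(h) }
func (h shardsHeap) Less(i, j int) bool { return h[i].vals[h[i].next] < h[j].vals[h[j].next] }
func (h shardsHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *shardsHeap) Push(x any)        { *h = append(*h, x.(*shard)) }
func (h *shardsHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// mergeSorted merges the pre-sorted shards into one sorted slice.
func mergeSorted(shards []*shard) []int {
	h := shardsHeap{}
	for _, s := range shards {
		if len(s.vals) > 0 {
			h = append(h, s)
		}
	}
	heap.Init(&h)

	var out []int
	for h.Len() > 0 {
		s := h[0]
		out = append(out, s.vals[s.next])
		s.next++
		if s.next >= len(s.vals) {
			heap.Pop(&h)
		} else {
			// The cursor moved, so the root may no longer hold the minimum.
			heap.Fix(&h, 0)
		}
	}
	return out
}

func main() {
	a := &shard{vals: []int{1, 4, 7}}
	b := &shard{vals: []int{2, 3, 9}}
	c := &shard{vals: []int{5, 6, 8}}
	fmt.Println(mergeSorted([]*shard{a, b, c})) // [1 2 3 4 5 6 7 8 9]
}
```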