From a19611336c50290e4dbca93f62cff8762731e336 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 6 May 2024 19:58:08 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe_sort.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 2e7d42d29..a0e3366f2 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -399,6 +399,10 @@ func (psp *pipeSortProcessor) flush() error { sh = append(sh, shard) } } + if len(sh) == 0 { + return nil + } + heap.Init(&sh) wctx := &pipeSortWriteContext{ @@ -413,6 +417,13 @@ func (psp *pipeSortProcessor) flush() error { if shard.rowRefNext >= uint(len(shard.rowRefs)) { _ = heap.Pop(&sh) shardNext = nil + + select { + case <-psp.stopCh: + return nil + default: + } + continue } @@ -473,7 +484,7 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx // send the current block to bbBase and construct new columns wctx.flush() - rcs = rcs[:0] + rcs = wctx.rcs[:0] for _, bf := range byFields { rcs = append(rcs, resultColumn{ name: bf.name,