diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 2e7d42d29..a0e3366f2 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -399,6 +399,10 @@ func (psp *pipeSortProcessor) flush() error { sh = append(sh, shard) } } + if len(sh) == 0 { + return nil + } + heap.Init(&sh) wctx := &pipeSortWriteContext{ @@ -413,6 +417,13 @@ func (psp *pipeSortProcessor) flush() error { if shard.rowRefNext >= uint(len(shard.rowRefs)) { _ = heap.Pop(&sh) shardNext = nil + + select { + case <-psp.stopCh: + return nil + default: + } + continue } @@ -473,7 +484,7 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx // send the current block to bbBase and construct new columns wctx.flush() - rcs = rcs[:0] + rcs = wctx.rcs[:0] for _, bf := range byFields { rcs = append(rcs, resultColumn{ name: bf.name,