diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go index e1e4e3be9..180b5687b 100644 --- a/lib/logstorage/pipe_unroll.go +++ b/lib/logstorage/pipe_unroll.go @@ -74,9 +74,10 @@ func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) } } -func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { +func (pu *pipeUnroll) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeUnrollProcessor{ pu: pu, + stopCh: stopCh, ppNext: ppNext, shards: make([]pipeUnrollProcessorShard, workersCount), @@ -85,6 +86,7 @@ func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu type pipeUnrollProcessor struct { pu *pipeUnroll + stopCh <-chan struct{} ppNext pipeProcessor shards []pipeUnrollProcessorShard @@ -139,6 +141,9 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { fields := shard.fields for rowIdx := range br.timestamps { if bm.isSetBit(rowIdx) { + if needStop(pup.stopCh) { + return + } shard.writeUnrolledFields(br, pu.fields, columnValues, rowIdx) } else { fields = fields[:0]