From 8248afc9bd3dbc8d183c05963c4f508c10842d98 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 25 May 2024 22:20:27 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe_unroll.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go index e1e4e3be9..180b5687b 100644 --- a/lib/logstorage/pipe_unroll.go +++ b/lib/logstorage/pipe_unroll.go @@ -74,9 +74,10 @@ func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) } } -func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { +func (pu *pipeUnroll) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeUnrollProcessor{ pu: pu, + stopCh: stopCh, ppNext: ppNext, shards: make([]pipeUnrollProcessorShard, workersCount), @@ -85,6 +86,7 @@ func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu type pipeUnrollProcessor struct { pu *pipeUnroll + stopCh <-chan struct{} ppNext pipeProcessor shards []pipeUnrollProcessorShard @@ -139,6 +141,9 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { fields := shard.fields for rowIdx := range br.timestamps { if bm.isSetBit(rowIdx) { + if needStop(pup.stopCh) { + return + } shard.writeUnrolledFields(br, pu.fields, columnValues, rowIdx) } else { fields = fields[:0]