diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 9099f4fe3..ef135e722 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -116,7 +116,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pep.shards[workerID] - shard.wctx.init(br, pep.ppBase) + shard.wctx.init(workerID, br, pep.ppBase) ef := shard.ef bm := &shard.bm diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index 1a39872dc..7dff8cd03 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -86,7 +86,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pup.shards[workerID] - shard.wctx.init(br, pup.ppBase) + shard.wctx.init(workerID, br, pup.ppBase) bm := &shard.bm bm.init(len(br.timestamps)) @@ -137,9 +137,10 @@ func (pup *pipeUnpackProcessor) flush() error { } type pipeUnpackWriteContext struct { - brSrc *blockResult - csSrc []*blockResultColumn - ppBase pipeProcessor + workerID uint + brSrc *blockResult + csSrc []*blockResultColumn + ppBase pipeProcessor rcs []resultColumn br blockResult @@ -152,6 +153,7 @@ type pipeUnpackWriteContext struct { } func (wctx *pipeUnpackWriteContext) reset() { + wctx.workerID = 0 wctx.brSrc = nil wctx.csSrc = nil wctx.ppBase = nil @@ -166,9 +168,10 @@ func (wctx *pipeUnpackWriteContext) reset() { wctx.valuesLen = 0 } -func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) { +func (wctx *pipeUnpackWriteContext) init(workerID uint, brSrc *blockResult, ppBase pipeProcessor) { wctx.reset() + wctx.workerID = workerID wctx.brSrc = brSrc wctx.csSrc = brSrc.getColumns() wctx.ppBase = ppBase @@ -228,7 +231,7 @@ func (wctx *pipeUnpackWriteContext) flush() { br := &wctx.br br.setResultColumns(rcs, wctx.rowsCount) wctx.rowsCount = 0 - wctx.ppBase.writeBlock(0, br) + wctx.ppBase.writeBlock(wctx.workerID, br) br.reset() for i := range rcs { rcs[i].resetValues()