diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index fe2e36d78..fbc4589f1 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -57,7 +57,6 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepO } type fieldsUnpackerContext struct { - workerID uint fieldPrefix string fields []Field @@ -65,7 +64,6 @@ type fieldsUnpackerContext struct { } func (uctx *fieldsUnpackerContext) reset() { - uctx.workerID = 0 uctx.fieldPrefix = "" uctx.resetFields() uctx.a.reset() @@ -76,10 +74,9 @@ func (uctx *fieldsUnpackerContext) resetFields() { uctx.fields = uctx.fields[:0] } -func (uctx *fieldsUnpackerContext) init(workerID uint, fieldPrefix string) { +func (uctx *fieldsUnpackerContext) init(fieldPrefix string) { uctx.reset() - uctx.workerID = workerID uctx.fieldPrefix = fieldPrefix } @@ -157,7 +154,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { shard := &pup.shards[workerID] shard.wctx.init(workerID, pup.ppNext, pup.keepOriginalFields, pup.skipEmptyResults, br) - shard.uctx.init(workerID, pup.fieldPrefix) + shard.uctx.init(pup.fieldPrefix) bm := &shard.bm bm.init(len(br.timestamps)) @@ -234,6 +231,7 @@ func (wctx *pipeUnpackWriteContext) reset() { wctx.workerID = 0 wctx.ppNext = nil wctx.keepOriginalFields = false + wctx.skipEmptyResults = false wctx.brSrc = nil wctx.csSrc = nil