This commit is contained in:
Aliaksandr Valialkin 2024-05-22 02:03:31 +02:00
parent 46bfdf6796
commit a4d5ef8382
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 10 additions and 7 deletions

View file

@ -116,7 +116,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
} }
shard := &pep.shards[workerID] shard := &pep.shards[workerID]
shard.wctx.init(br, pep.ppBase) shard.wctx.init(workerID, br, pep.ppBase)
ef := shard.ef ef := shard.ef
bm := &shard.bm bm := &shard.bm

View file

@ -86,7 +86,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
} }
shard := &pup.shards[workerID] shard := &pup.shards[workerID]
shard.wctx.init(br, pup.ppBase) shard.wctx.init(workerID, br, pup.ppBase)
bm := &shard.bm bm := &shard.bm
bm.init(len(br.timestamps)) bm.init(len(br.timestamps))
@ -137,9 +137,10 @@ func (pup *pipeUnpackProcessor) flush() error {
} }
type pipeUnpackWriteContext struct { type pipeUnpackWriteContext struct {
brSrc *blockResult workerID uint
csSrc []*blockResultColumn brSrc *blockResult
ppBase pipeProcessor csSrc []*blockResultColumn
ppBase pipeProcessor
rcs []resultColumn rcs []resultColumn
br blockResult br blockResult
@ -152,6 +153,7 @@ type pipeUnpackWriteContext struct {
} }
func (wctx *pipeUnpackWriteContext) reset() { func (wctx *pipeUnpackWriteContext) reset() {
wctx.workerID = 0
wctx.brSrc = nil wctx.brSrc = nil
wctx.csSrc = nil wctx.csSrc = nil
wctx.ppBase = nil wctx.ppBase = nil
@ -166,9 +168,10 @@ func (wctx *pipeUnpackWriteContext) reset() {
wctx.valuesLen = 0 wctx.valuesLen = 0
} }
func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) { func (wctx *pipeUnpackWriteContext) init(workerID uint, brSrc *blockResult, ppBase pipeProcessor) {
wctx.reset() wctx.reset()
wctx.workerID = workerID
wctx.brSrc = brSrc wctx.brSrc = brSrc
wctx.csSrc = brSrc.getColumns() wctx.csSrc = brSrc.getColumns()
wctx.ppBase = ppBase wctx.ppBase = ppBase
@ -228,7 +231,7 @@ func (wctx *pipeUnpackWriteContext) flush() {
br := &wctx.br br := &wctx.br
br.setResultColumns(rcs, wctx.rowsCount) br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0 wctx.rowsCount = 0
wctx.ppBase.writeBlock(0, br) wctx.ppBase.writeBlock(wctx.workerID, br)
br.reset() br.reset()
for i := range rcs { for i := range rcs {
rcs[i].resetValues() rcs[i].resetValues()