diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index e88916cc6..d6d35825f 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1343,8 +1343,8 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn { // Create missing empty column br.csEmpty = append(br.csEmpty, blockResultColumn{ - name: br.a.copyString(columnName), - isConst: true, + name: br.a.copyString(columnName), + isConst: true, valuesEncoded: getEmptyStrings(1), }) return &br.csEmpty[len(br.csEmpty)-1] diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index b0e16ae35..31bfcbd4b 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -128,7 +128,9 @@ type pipeExtractProcessorShardNopad struct { bm bitmap ptn *pattern - resultValues []string + resultColumns []*blockResultColumn + resultValues []string + rcs []resultColumn a arena } @@ -166,8 +168,15 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { c := br.getColumnByName(pe.fromField) values := c.getValues(br) + shard.resultColumns = slicesutil.SetLength(shard.resultColumns, len(rcs)) + resultColumns := shard.resultColumns + for i := range resultColumns { + resultColumns[i] = br.getColumnByName(rcs[i].name) + } + shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs)) resultValues := shard.resultValues + hadUpdates := false vPrev := "" for rowIdx, v := range values { @@ -181,7 +190,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { for i, f := range ptn.fields { v := *f.value if v == "" && pe.skipEmptyResults || pe.keepOriginalFields { - c := br.getColumnByName(rcs[i].name) + c := resultColumns[i] if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" { v = vOrig } @@ -192,10 +201,8 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { } } } else { - for i := range rcs { - c := br.getColumnByName(rcs[i].name) - v := c.getValueAtRow(br, rowIdx) - resultValues[i] = v + for i, c := range resultColumns { + resultValues[i] = c.getValueAtRow(br, rowIdx) } } diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index 805dfaaa0..76cf74209 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -4,8 +4,6 @@ import ( "fmt" "strconv" "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeFormat processes '| format ...' pipe. @@ -118,8 +116,8 @@ type pipeFormatProcessorShard struct { type pipeFormatProcessorShardNopad struct { bm bitmap - uctx fieldsUnpackerContext - wctx pipeUnpackWriteContext + a arena + rc resultColumn } func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { @@ -128,13 +126,12 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pfp.shards[workerID] - shard.wctx.init(workerID, pfp.ppBase, pfp.pf.keepOriginalFields, pfp.pf.skipEmptyResults, br) - shard.uctx.init(workerID, "") + pf := pfp.pf bm := &shard.bm bm.init(len(br.timestamps)) bm.setBits() - if iff := pfp.pf.iff; iff != nil { + if iff := pf.iff; iff != nil { iff.f.applyToBlockResult(br, bm) if bm.isZero() { pfp.ppBase.writeBlock(workerID, br) @@ -142,25 +139,36 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { } } + shard.rc.name = pf.resultField + + resultColumn := br.getColumnByName(pf.resultField) for rowIdx := range br.timestamps { + v := "" if bm.isSetBit(rowIdx) { - shard.formatRow(pfp.pf, br, rowIdx) - shard.wctx.writeRow(rowIdx, shard.uctx.fields) + v = shard.formatRow(pf, br, rowIdx) + if v == "" && pf.skipEmptyResults || pf.keepOriginalFields { + if vOrig := resultColumn.getValueAtRow(br, rowIdx); vOrig != "" { + v = vOrig + } + } } else { - shard.wctx.writeRow(rowIdx, nil) + v = resultColumn.getValueAtRow(br, rowIdx) } + shard.rc.addValue(v) } - shard.wctx.flush() - shard.wctx.reset() - shard.uctx.reset() + br.addResultColumn(&shard.rc) + pfp.ppBase.writeBlock(workerID, br) + + shard.a.reset() + shard.rc.reset() } func (pfp *pipeFormatProcessor) flush() error { return nil } -func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) { +func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string { bb := bbPool.Get() b := bb.B for _, step := range pf.steps { @@ -177,10 +185,9 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult } bb.B = b - s := bytesutil.ToUnsafeString(b) - shard.uctx.resetFields() - shard.uctx.addField(pf.resultField, s) + v := shard.a.copyBytesToString(b) bbPool.Put(bb) + return v } func parsePipeFormat(lex *lexer) (*pipeFormat, error) {