This commit is contained in:
Aliaksandr Valialkin 2024-05-25 16:09:59 +02:00
parent 46bc1c3435
commit 22c2671205
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 39 additions and 25 deletions

View file

@ -1343,8 +1343,8 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn {
// Create missing empty column
br.csEmpty = append(br.csEmpty, blockResultColumn{
name: br.a.copyString(columnName),
isConst: true,
name: br.a.copyString(columnName),
isConst: true,
valuesEncoded: getEmptyStrings(1),
})
return &br.csEmpty[len(br.csEmpty)-1]

View file

@ -128,7 +128,9 @@ type pipeExtractProcessorShardNopad struct {
bm bitmap
ptn *pattern
resultValues []string
resultColumns []*blockResultColumn
resultValues []string
rcs []resultColumn
a arena
}
@ -166,8 +168,15 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
c := br.getColumnByName(pe.fromField)
values := c.getValues(br)
shard.resultColumns = slicesutil.SetLength(shard.resultColumns, len(rcs))
resultColumns := shard.resultColumns
for i := range resultColumns {
resultColumns[i] = br.getColumnByName(rcs[i].name)
}
shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
resultValues := shard.resultValues
hadUpdates := false
vPrev := ""
for rowIdx, v := range values {
@ -181,7 +190,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
for i, f := range ptn.fields {
v := *f.value
if v == "" && pe.skipEmptyResults || pe.keepOriginalFields {
c := br.getColumnByName(rcs[i].name)
c := resultColumns[i]
if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" {
v = vOrig
}
@ -192,10 +201,8 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
}
}
} else {
for i := range rcs {
c := br.getColumnByName(rcs[i].name)
v := c.getValueAtRow(br, rowIdx)
resultValues[i] = v
for i, c := range resultColumns {
resultValues[i] = c.getValueAtRow(br, rowIdx)
}
}

View file

@ -4,8 +4,6 @@ import (
"fmt"
"strconv"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// pipeFormat processes '| format ...' pipe.
@ -118,8 +116,8 @@ type pipeFormatProcessorShard struct {
type pipeFormatProcessorShardNopad struct {
bm bitmap
uctx fieldsUnpackerContext
wctx pipeUnpackWriteContext
a arena
rc resultColumn
}
func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
@ -128,13 +126,12 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
}
shard := &pfp.shards[workerID]
shard.wctx.init(workerID, pfp.ppBase, pfp.pf.keepOriginalFields, pfp.pf.skipEmptyResults, br)
shard.uctx.init(workerID, "")
pf := pfp.pf
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
if iff := pfp.pf.iff; iff != nil {
if iff := pf.iff; iff != nil {
iff.f.applyToBlockResult(br, bm)
if bm.isZero() {
pfp.ppBase.writeBlock(workerID, br)
@ -142,25 +139,36 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
}
}
shard.rc.name = pf.resultField
resultColumn := br.getColumnByName(pf.resultField)
for rowIdx := range br.timestamps {
v := ""
if bm.isSetBit(rowIdx) {
shard.formatRow(pfp.pf, br, rowIdx)
shard.wctx.writeRow(rowIdx, shard.uctx.fields)
v = shard.formatRow(pf, br, rowIdx)
if v == "" && pf.skipEmptyResults || pf.keepOriginalFields {
if vOrig := resultColumn.getValueAtRow(br, rowIdx); vOrig != "" {
v = vOrig
}
}
} else {
shard.wctx.writeRow(rowIdx, nil)
v = resultColumn.getValueAtRow(br, rowIdx)
}
shard.rc.addValue(v)
}
shard.wctx.flush()
shard.wctx.reset()
shard.uctx.reset()
br.addResultColumn(&shard.rc)
pfp.ppBase.writeBlock(workerID, br)
shard.a.reset()
shard.rc.reset()
}
func (pfp *pipeFormatProcessor) flush() error {
return nil
}
func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) {
func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string {
bb := bbPool.Get()
b := bb.B
for _, step := range pf.steps {
@ -177,10 +185,9 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult
}
bb.B = b
s := bytesutil.ToUnsafeString(b)
shard.uctx.resetFields()
shard.uctx.addField(pf.resultField, s)
v := shard.a.copyBytesToString(b)
bbPool.Put(bb)
return v
}
func parsePipeFormat(lex *lexer) (*pipeFormat, error) {