VictoriaMetrics/lib/logstorage/pipe_unpack.go

package logstorage

import (
	"unsafe"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
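
// fieldsUnpackerContext holds the fields unpacked from a single input value,
// together with the arena that owns the memory backing their names and values.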
type fieldsUnpackerContext struct {
	fields []Field
	a      arena
}

func (uctx *fieldsUnpackerContext) reset() {
	uctx.resetFields()
	uctx.a.reset()
}

func (uctx *fieldsUnpackerContext) resetFields() {
	clear(uctx.fields)
	uctx.fields = uctx.fields[:0]
}
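
// addField appends the (fieldPrefix+name, value) pair to uctx.fields.
// Both strings are copied into the arena, so the resulting Field remains
// valid until uctx.reset() is called.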
func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) {
	nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
	copy(nameBuf, fieldPrefix)
	copy(nameBuf[len(fieldPrefix):], name)
	nameCopy := bytesutil.ToUnsafeString(nameBuf)

	valueCopy := uctx.a.copyString(value)

	uctx.fields = append(uctx.fields, Field{
		Name:  nameCopy,
		Value: valueCopy,
	})
}
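
// newPipeUnpackProcessor returns a processor which unpacks fields from fromField
// in every row via unpackFunc, prepends fieldPrefix to their names and writes
// the extended rows to ppBase.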
func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor, fromField, fieldPrefix string) *pipeUnpackProcessor {
	return &pipeUnpackProcessor{
		unpackFunc:  unpackFunc,
		ppBase:      ppBase,
		shards:      make([]pipeUnpackProcessorShard, workersCount),
		fromField:   fromField,
		fieldPrefix: fieldPrefix,
	}
}
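
// pipeUnpackProcessor implements pipeProcessor for pipes which unpack fields
// from a single source field. Each worker gets its own shard, so no locking
// is needed in writeBlock.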
type pipeUnpackProcessor struct {
	unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string)
	ppBase     pipeProcessor

	shards []pipeUnpackProcessorShard

	fromField   string
	fieldPrefix string
}

type pipeUnpackProcessorShard struct {
	pipeUnpackProcessorShardNopad

	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0.
	_ [128 - unsafe.Sizeof(pipeUnpackProcessorShardNopad{})%128]byte
}

type pipeUnpackProcessorShardNopad struct {
	uctx fieldsUnpackerContext
	wctx pipeUnpackWriteContext
}
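
// writeBlock unpacks pup.fromField in every row of br and sends the rows,
// extended with the unpacked fields, to pup.ppBase.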
func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
	if len(br.timestamps) == 0 {
		return
	}

	shard := &pup.shards[workerID]
	shard.wctx.init(br, pup.ppBase)

	c := br.getColumnByName(pup.fromField)
	if c.isConst {
		// Fast path - the column is constant, so its value can be unpacked
		// only once and re-used for all the rows in the block.
		v := c.valuesEncoded[0]
		shard.uctx.resetFields()
		pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix)
		for rowIdx := range br.timestamps {
			shard.wctx.writeRow(rowIdx, shard.uctx.fields)
		}
	} else {
		// Slow path - unpack the value in every row. Adjacent rows with
		// identical values re-use the previously unpacked fields.
		values := c.getValues(br)
		for i, v := range values {
			if i == 0 || values[i-1] != v {
				shard.uctx.resetFields()
				pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix)
			}
			shard.wctx.writeRow(i, shard.uctx.fields)
		}
	}

	shard.wctx.flush()
	shard.uctx.reset()
}

func (pup *pipeUnpackProcessor) flush() error {
	return nil
}
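
// pipeUnpackWriteContext accumulates the resulting rows into a block before
// sending it to ppBase, so downstream processors receive blocks instead of
// individual rows.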
type pipeUnpackWriteContext struct {
	brSrc *blockResult
	csSrc []*blockResultColumn

	ppBase pipeProcessor

	rcs []resultColumn
	br  blockResult

	valuesLen int
}

func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) {
	wctx.brSrc = brSrc
	wctx.csSrc = brSrc.getColumns()

	wctx.ppBase = ppBase
}
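
// writeRow appends the row at rowIdx from the source block, extended with
// extraFields, to the output columns, flushing them once the accumulated
// values become large enough.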
func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
	csSrc := wctx.csSrc
	rcs := wctx.rcs

	areEqualColumns := len(rcs) == len(csSrc)+len(extraFields)
	if areEqualColumns {
		for i, f := range extraFields {
			if rcs[len(csSrc)+i].name != f.Name {
				areEqualColumns = false
				break
			}
		}
	}
	if !areEqualColumns {
		// send the current block to ppBase and construct a block with the new set of columns
		wctx.flush()

		rcs = wctx.rcs[:0]
		for _, c := range csSrc {
			rcs = appendResultColumnWithName(rcs, c.name)
		}
		for _, f := range extraFields {
			rcs = appendResultColumnWithName(rcs, f.Name)
		}
		wctx.rcs = rcs
	}

	brSrc := wctx.brSrc
	for i, c := range csSrc {
		v := c.getValueAtRow(brSrc, rowIdx)
		rcs[i].addValue(v)
		wctx.valuesLen += len(v)
	}
	for i, f := range extraFields {
		v := f.Value
		rcs[len(csSrc)+i].addValue(v)
		wctx.valuesLen += len(v)
	}
	if wctx.valuesLen >= 1_000_000 {
		// Flush the accumulated block to ppBase once its values occupy
		// more than 1MB, in order to limit memory usage.
		wctx.flush()
	}
}

func (wctx *pipeUnpackWriteContext) flush() {
	rcs := wctx.rcs

	wctx.valuesLen = 0

	if len(rcs) == 0 {
		return
	}

	// Flush rcs to ppBase
	br := &wctx.br
	br.setResultColumns(rcs)
	wctx.ppBase.writeBlock(0, br)
	br.reset()
	for i := range rcs {
		rcs[i].resetValues()
	}
}