VictoriaMetrics/lib/logstorage/pipe_unpack.go

259 lines
5.2 KiB
Go
Raw Normal View History

2024-05-20 12:09:39 +00:00
package logstorage
import (
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
type fieldsUnpackerContext struct {
2024-05-22 11:04:16 +00:00
workerID uint
fieldPrefix string
fields []Field
a arena
2024-05-20 12:09:39 +00:00
}
func (uctx *fieldsUnpackerContext) reset() {
2024-05-22 09:38:10 +00:00
uctx.workerID = 0
2024-05-22 11:04:16 +00:00
uctx.fieldPrefix = ""
2024-05-20 12:09:39 +00:00
uctx.resetFields()
uctx.a.reset()
}
func (uctx *fieldsUnpackerContext) resetFields() {
clear(uctx.fields)
uctx.fields = uctx.fields[:0]
}
2024-05-22 11:04:16 +00:00
func (uctx *fieldsUnpackerContext) init(workerID uint, fieldPrefix string) {
uctx.reset()
uctx.workerID = workerID
uctx.fieldPrefix = fieldPrefix
}
func (uctx *fieldsUnpackerContext) addField(name, value string) {
2024-05-20 21:23:22 +00:00
nameCopy := ""
2024-05-22 11:04:16 +00:00
fieldPrefix := uctx.fieldPrefix
2024-05-20 21:23:22 +00:00
if fieldPrefix != "" {
nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
copy(nameBuf, fieldPrefix)
copy(nameBuf[len(fieldPrefix):], name)
nameCopy = bytesutil.ToUnsafeString(nameBuf)
} else {
nameCopy = uctx.a.copyString(name)
}
2024-05-20 12:09:39 +00:00
valueCopy := uctx.a.copyString(value)
uctx.fields = append(uctx.fields, Field{
Name: nameCopy,
Value: valueCopy,
})
}
2024-05-22 11:04:16 +00:00
func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor,
2024-05-21 10:55:11 +00:00
fromField, fieldPrefix string, iff *ifFilter) *pipeUnpackProcessor {
2024-05-22 09:38:10 +00:00
2024-05-20 12:09:39 +00:00
return &pipeUnpackProcessor{
unpackFunc: unpackFunc,
ppBase: ppBase,
shards: make([]pipeUnpackProcessorShard, workersCount),
fromField: fromField,
fieldPrefix: fieldPrefix,
2024-05-21 10:55:11 +00:00
iff: iff,
2024-05-20 12:09:39 +00:00
}
}
type pipeUnpackProcessor struct {
2024-05-22 11:04:16 +00:00
unpackFunc func(uctx *fieldsUnpackerContext, s string)
2024-05-20 12:09:39 +00:00
ppBase pipeProcessor
shards []pipeUnpackProcessorShard
fromField string
fieldPrefix string
2024-05-21 10:55:11 +00:00
iff *ifFilter
2024-05-20 12:09:39 +00:00
}
type pipeUnpackProcessorShard struct {
pipeUnpackProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeUnpackProcessorShardNopad{})%128]byte
}
type pipeUnpackProcessorShardNopad struct {
2024-05-21 10:55:11 +00:00
bm bitmap
2024-05-20 12:09:39 +00:00
uctx fieldsUnpackerContext
wctx pipeUnpackWriteContext
}
func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pup.shards[workerID]
2024-05-22 11:04:16 +00:00
shard.wctx.init(workerID, pup.ppBase, br)
shard.uctx.init(workerID, pup.fieldPrefix)
2024-05-20 12:09:39 +00:00
2024-05-21 10:55:11 +00:00
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
if pup.iff != nil {
pup.iff.f.applyToBlockResult(br, bm)
if bm.isZero() {
pup.ppBase.writeBlock(workerID, br)
return
}
}
2024-05-20 12:09:39 +00:00
c := br.getColumnByName(pup.fromField)
if c.isConst {
v := c.valuesEncoded[0]
shard.uctx.resetFields()
2024-05-22 11:04:16 +00:00
pup.unpackFunc(&shard.uctx, v)
2024-05-20 12:09:39 +00:00
for rowIdx := range br.timestamps {
2024-05-21 10:55:11 +00:00
if bm.isSetBit(rowIdx) {
shard.wctx.writeRow(rowIdx, shard.uctx.fields)
} else {
shard.wctx.writeRow(rowIdx, nil)
}
2024-05-20 12:09:39 +00:00
}
} else {
values := c.getValues(br)
2024-05-21 10:55:11 +00:00
vPrevApplied := ""
2024-05-20 12:09:39 +00:00
for i, v := range values {
2024-05-21 10:55:11 +00:00
if bm.isSetBit(i) {
if vPrevApplied != v {
shard.uctx.resetFields()
2024-05-22 11:04:16 +00:00
pup.unpackFunc(&shard.uctx, v)
2024-05-21 10:55:11 +00:00
vPrevApplied = v
}
shard.wctx.writeRow(i, shard.uctx.fields)
} else {
shard.wctx.writeRow(i, nil)
2024-05-20 12:09:39 +00:00
}
}
}
shard.wctx.flush()
2024-05-22 11:04:16 +00:00
shard.wctx.reset()
2024-05-20 12:09:39 +00:00
shard.uctx.reset()
}
func (pup *pipeUnpackProcessor) flush() error {
return nil
}
type pipeUnpackWriteContext struct {
2024-05-22 00:03:31 +00:00
workerID uint
ppBase pipeProcessor
2024-05-20 12:09:39 +00:00
2024-05-22 11:04:16 +00:00
brSrc *blockResult
csSrc []*blockResultColumn
2024-05-20 12:09:39 +00:00
rcs []resultColumn
br blockResult
2024-05-21 08:39:02 +00:00
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
2024-05-20 12:09:39 +00:00
valuesLen int
}
2024-05-20 21:23:22 +00:00
func (wctx *pipeUnpackWriteContext) reset() {
2024-05-22 00:03:31 +00:00
wctx.workerID = 0
2024-05-22 11:04:16 +00:00
wctx.ppBase = nil
2024-05-20 21:23:22 +00:00
wctx.brSrc = nil
wctx.csSrc = nil
rcs := wctx.rcs
for i := range rcs {
rcs[i].reset()
}
wctx.rcs = rcs[:0]
2024-05-21 08:39:02 +00:00
wctx.rowsCount = 0
2024-05-20 21:23:22 +00:00
wctx.valuesLen = 0
}
2024-05-22 11:04:16 +00:00
func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, brSrc *blockResult) {
2024-05-20 21:23:22 +00:00
wctx.reset()
2024-05-22 00:03:31 +00:00
wctx.workerID = workerID
2024-05-22 11:04:16 +00:00
wctx.ppBase = ppBase
2024-05-20 12:09:39 +00:00
wctx.brSrc = brSrc
wctx.csSrc = brSrc.getColumns()
}
func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
csSrc := wctx.csSrc
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(csSrc)+len(extraFields)
if areEqualColumns {
for i, f := range extraFields {
if rcs[len(csSrc)+i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
2024-05-20 21:23:22 +00:00
// send the current block to ppBase and construct a block with new set of columns
2024-05-20 12:09:39 +00:00
wctx.flush()
rcs = wctx.rcs[:0]
for _, c := range csSrc {
rcs = appendResultColumnWithName(rcs, c.name)
}
for _, f := range extraFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
brSrc := wctx.brSrc
for i, c := range csSrc {
v := c.getValueAtRow(brSrc, rowIdx)
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
for i, f := range extraFields {
v := f.Value
rcs[len(csSrc)+i].addValue(v)
wctx.valuesLen += len(v)
}
2024-05-21 08:39:02 +00:00
wctx.rowsCount++
2024-05-20 12:09:39 +00:00
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeUnpackWriteContext) flush() {
rcs := wctx.rcs
wctx.valuesLen = 0
// Flush rcs to ppBase
br := &wctx.br
2024-05-21 08:39:02 +00:00
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
2024-05-22 00:03:31 +00:00
wctx.ppBase.writeBlock(wctx.workerID, br)
2024-05-20 12:09:39 +00:00
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}