VictoriaMetrics/lib/logstorage/pipe_unpack.go

package logstorage

import (
	"unsafe"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
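
// updateNeededFieldsForUnpackPipe updates neededFields and unneededFields for a pipe
// which unpacks outFields from fromField, so that fromField and the fields needed by
// the optional iff filter are requested only when the unpacked results are actually
// used by the downstream pipes.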
func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter, neededFields, unneededFields fieldsSet) {
	if neededFields.isEmpty() {
		if iff != nil {
			neededFields.addFields(iff.neededFields)
		}
		return
	}

	if neededFields.contains("*") {
		unneededFieldsOrig := unneededFields.clone()
		unneededFieldsCount := 0
		if len(outFields) > 0 {
			for _, f := range outFields {
				if unneededFieldsOrig.contains(f) {
					unneededFieldsCount++
				}
				if !keepOriginalFields && !skipEmptyResults {
					unneededFields.add(f)
				}
			}
		}
		if len(outFields) == 0 || unneededFieldsCount < len(outFields) {
			unneededFields.remove(fromField)
			if iff != nil {
				unneededFields.removeFields(iff.neededFields)
			}
		}
	} else {
		neededFieldsOrig := neededFields.clone()
		needFromField := len(outFields) == 0
		if len(outFields) > 0 {
			needFromField = false
			for _, f := range outFields {
				if neededFieldsOrig.contains(f) {
					needFromField = true
				}
				if !keepOriginalFields && !skipEmptyResults {
					neededFields.remove(f)
				}
			}
		}
		if needFromField {
			neededFields.add(fromField)
			if iff != nil {
				neededFields.addFields(iff.neededFields)
			}
		}
	}
}
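
// fieldsUnpackerContext holds the fields produced by a single unpack call.
// Field names and values are copied into the arena a, so they remain valid until reset();
// fieldPrefix, when non-empty, is prepended to every added field name.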
type fieldsUnpackerContext struct {
	fieldPrefix string

	fields []Field
	a      arena
}

func (uctx *fieldsUnpackerContext) reset() {
	uctx.fieldPrefix = ""
	uctx.resetFields()
	uctx.a.reset()
}

func (uctx *fieldsUnpackerContext) resetFields() {
	clear(uctx.fields)
	uctx.fields = uctx.fields[:0]
}

func (uctx *fieldsUnpackerContext) init(fieldPrefix string) {
	uctx.reset()

	uctx.fieldPrefix = fieldPrefix
}
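
// addField appends a field with the given name and value to uctx.fields.
// Both the name (prefixed with uctx.fieldPrefix when it is non-empty) and the value
// are copied into the arena, so the caller may reuse the passed strings.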
func (uctx *fieldsUnpackerContext) addField(name, value string) {
	nameCopy := ""
	fieldPrefix := uctx.fieldPrefix
	if fieldPrefix != "" {
		b := uctx.a.b
		bLen := len(b)
		b = append(b, fieldPrefix...)
		b = append(b, name...)
		uctx.a.b = b
		nameCopy = bytesutil.ToUnsafeString(b[bLen:])
	} else {
		nameCopy = uctx.a.copyString(name)
	}
	valueCopy := uctx.a.copyString(value)

	uctx.fields = append(uctx.fields, Field{
		Name:  nameCopy,
		Value: valueCopy,
	})
}
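
// newPipeUnpackProcessor creates a pipeUnpackProcessor, which unpacks fields from fromField
// via unpackFunc on workersCount worker shards and sends the resulting rows to ppNext.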
func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppNext pipeProcessor,
	fromField string, fieldPrefix string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter) *pipeUnpackProcessor {

	return &pipeUnpackProcessor{
		unpackFunc: unpackFunc,
		ppNext:     ppNext,

		shards: make([]pipeUnpackProcessorShard, workersCount),

		fromField:          fromField,
		fieldPrefix:        fieldPrefix,
		keepOriginalFields: keepOriginalFields,
		skipEmptyResults:   skipEmptyResults,
		iff:                iff,
	}
}
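
// pipeUnpackProcessor is the pipeProcessor shared by unpack-style pipes;
// each worker shard unpacks fromField values via unpackFunc and forwards the result to ppNext.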
type pipeUnpackProcessor struct {
	unpackFunc func(uctx *fieldsUnpackerContext, s string)
	ppNext     pipeProcessor

	shards []pipeUnpackProcessorShard

	fromField          string
	fieldPrefix        string
	keepOriginalFields bool
	skipEmptyResults   bool

	iff *ifFilter
}

type pipeUnpackProcessorShard struct {
	pipeUnpackProcessorShardNopad

	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof(pipeUnpackProcessorShardNopad{})%128]byte
}
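
// pipeUnpackProcessorShardNopad holds per-worker state: the row bitmap for the optional
// iff filter plus the unpack and write contexts, which are reused across blocks.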
type pipeUnpackProcessorShardNopad struct {
	bm bitmap

	uctx fieldsUnpackerContext
	wctx pipeUnpackWriteContext
}
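
// writeBlock unpacks the fromField value for every row matching the optional iff filter
// and forwards the rows, extended with the unpacked fields, to ppNext. Rows not matching
// the filter are forwarded unchanged; consecutive equal values are unpacked only once.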
func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
	if len(br.timestamps) == 0 {
		return
	}

	shard := &pup.shards[workerID]
	shard.wctx.init(workerID, pup.ppNext, pup.keepOriginalFields, pup.skipEmptyResults, br)
	shard.uctx.init(pup.fieldPrefix)

	bm := &shard.bm
	bm.init(len(br.timestamps))
	bm.setBits()
	if pup.iff != nil {
		pup.iff.f.applyToBlockResult(br, bm)
		if bm.isZero() {
			pup.ppNext.writeBlock(workerID, br)
			return
		}
	}

	c := br.getColumnByName(pup.fromField)
	if c.isConst {
		v := c.valuesEncoded[0]
		shard.uctx.resetFields()
		pup.unpackFunc(&shard.uctx, v)
		for rowIdx := range br.timestamps {
			if bm.isSetBit(rowIdx) {
				shard.wctx.writeRow(rowIdx, shard.uctx.fields)
			} else {
				shard.wctx.writeRow(rowIdx, nil)
			}
		}
	} else {
		values := c.getValues(br)
		vPrev := ""
		hadUnpacks := false
		for i, v := range values {
			if bm.isSetBit(i) {
				if !hadUnpacks || vPrev != v {
					vPrev = v
					hadUnpacks = true
					shard.uctx.resetFields()
					pup.unpackFunc(&shard.uctx, v)
				}
				shard.wctx.writeRow(i, shard.uctx.fields)
			} else {
				shard.wctx.writeRow(i, nil)
			}
		}
	}

	shard.wctx.flush()
	shard.wctx.reset()
	shard.uctx.reset()
}

func (pup *pipeUnpackProcessor) flush() error {
	return nil
}
type pipeUnpackWriteContext struct {
	workerID uint
	ppNext   pipeProcessor

	keepOriginalFields bool
	skipEmptyResults   bool

	brSrc *blockResult
	csSrc []*blockResultColumn

	rcs []resultColumn
	br  blockResult

	// rowsCount is the number of rows in the current block
	rowsCount int

	// valuesLen is the total length of values in the current block
	valuesLen int
}

func (wctx *pipeUnpackWriteContext) reset() {
	wctx.workerID = 0
	wctx.ppNext = nil
	wctx.keepOriginalFields = false
	wctx.skipEmptyResults = false

	wctx.brSrc = nil
	wctx.csSrc = nil

	rcs := wctx.rcs
	for i := range rcs {
		rcs[i].reset()
	}
	wctx.rcs = rcs[:0]

	wctx.rowsCount = 0
	wctx.valuesLen = 0
}

func (wctx *pipeUnpackWriteContext) init(workerID uint, ppNext pipeProcessor, keepOriginalFields, skipEmptyResults bool, brSrc *blockResult) {
	wctx.reset()

	wctx.workerID = workerID
	wctx.ppNext = ppNext
	wctx.keepOriginalFields = keepOriginalFields
	wctx.skipEmptyResults = skipEmptyResults

	wctx.brSrc = brSrc
	wctx.csSrc = brSrc.getColumns()
}
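
// writeRow appends the row at rowIdx to the output block: source columns are copied as-is,
// while extraFields holds the unpacked fields. The block is flushed to ppNext whenever the
// output column set changes or the accumulated values exceed ~1MB.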
func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
	csSrc := wctx.csSrc
	rcs := wctx.rcs

	areEqualColumns := len(rcs) == len(csSrc)+len(extraFields)
	if areEqualColumns {
		for i, f := range extraFields {
			if rcs[len(csSrc)+i].name != f.Name {
				areEqualColumns = false
				break
			}
		}
	}
	if !areEqualColumns {
		// send the current block to ppNext and construct a block with a new set of columns
		wctx.flush()

		rcs = wctx.rcs[:0]
		for _, c := range csSrc {
			rcs = appendResultColumnWithName(rcs, c.name)
		}
		for _, f := range extraFields {
			rcs = appendResultColumnWithName(rcs, f.Name)
		}
		wctx.rcs = rcs
	}

	brSrc := wctx.brSrc
	for i, c := range csSrc {
		v := c.getValueAtRow(brSrc, rowIdx)
		rcs[i].addValue(v)
		wctx.valuesLen += len(v)
	}
	for i, f := range extraFields {
		v := f.Value
		if v == "" && wctx.skipEmptyResults || wctx.keepOriginalFields {
			idx := getBlockResultColumnIdxByName(csSrc, f.Name)
			if idx >= 0 {
				vOrig := csSrc[idx].getValueAtRow(brSrc, rowIdx)
				if vOrig != "" {
					v = vOrig
				}
			}
		}
		rcs[len(csSrc)+i].addValue(v)
		wctx.valuesLen += len(v)
	}

	wctx.rowsCount++
	if wctx.valuesLen >= 1_000_000 {
		wctx.flush()
	}
}
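
// flush sends the accumulated rows to ppNext and prepares rcs for the next block.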
func (wctx *pipeUnpackWriteContext) flush() {
	rcs := wctx.rcs

	wctx.valuesLen = 0

	// Flush rcs to ppNext
	br := &wctx.br
	br.setResultColumns(rcs, wctx.rowsCount)
	wctx.rowsCount = 0
	wctx.ppNext.writeBlock(wctx.workerID, br)
	br.reset()
	for i := range rcs {
		rcs[i].resetValues()
	}
}