VictoriaMetrics/lib/logstorage/pipe_update.go
Aliaksandr Valialkin 33610de341
wip
2024-05-25 14:37:26 +02:00

103 lines
2 KiB
Go

package logstorage
import (
"unsafe"
)
func updateNeededFieldsForUpdatePipe(neededFields, unneededFields fieldsSet, field string, iff *ifFilter) {
if neededFields.contains("*") {
if !unneededFields.contains(field) && iff != nil {
unneededFields.removeFields(iff.neededFields)
}
} else {
if neededFields.contains(field) && iff != nil {
neededFields.addFields(iff.neededFields)
}
}
}
func newPipeUpdateProcessor(workersCount int, updateFunc func(a *arena, v string) string, ppBase pipeProcessor, field string, iff *ifFilter) pipeProcessor {
return &pipeUpdateProcessor{
updateFunc: updateFunc,
field: field,
iff: iff,
ppBase: ppBase,
shards: make([]pipeUpdateProcessorShard, workersCount),
}
}
type pipeUpdateProcessor struct {
updateFunc func(a *arena, v string) string
field string
iff *ifFilter
ppBase pipeProcessor
shards []pipeUpdateProcessorShard
}
type pipeUpdateProcessorShard struct {
pipeUpdateProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeUpdateProcessorShardNopad{})%128]byte
}
type pipeUpdateProcessorShardNopad struct {
bm bitmap
rc resultColumn
a arena
}
func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pup.shards[workerID]
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
if iff := pup.iff; iff != nil {
iff.f.applyToBlockResult(br, bm)
if bm.isZero() {
pup.ppBase.writeBlock(workerID, br)
return
}
}
shard.rc.name = pup.field
c := br.getColumnByName(pup.field)
values := c.getValues(br)
hadUpdates := false
vPrev := ""
for rowIdx, v := range values {
if bm.isSetBit(rowIdx) {
if !hadUpdates || vPrev != v {
vPrev = v
hadUpdates = true
v = pup.updateFunc(&shard.a, v)
}
}
shard.rc.addValue(v)
}
br.addResultColumn(&shard.rc)
pup.ppBase.writeBlock(workerID, br)
shard.rc.reset()
shard.a.reset()
}
func (pup *pipeUpdateProcessor) flush() error {
return nil
}