Aliaksandr Valialkin 2024-05-22 13:04:16 +02:00
parent ddc3914fa7
commit fb251af08a
4 changed files with 47 additions and 36 deletions

lib/logstorage/pipe_extract.go

@@ -11,7 +11,7 @@ type pipeExtract struct {
 	fromField string
 	steps     []patternStep

-	pattern string
+	patternStr string

 	// iff is an optional filter for skipping the extract func
 	iff *ifFilter
@@ -22,7 +22,7 @@ func (pe *pipeExtract) String() string {
 	if !isMsgFieldName(pe.fromField) {
 		s += " from " + quoteTokenIfNeeded(pe.fromField)
 	}
-	s += " " + quoteTokenIfNeeded(pe.pattern)
+	s += " " + quoteTokenIfNeeded(pe.patternStr)
 	if pe.iff != nil {
 		s += " " + pe.iff.String()
 	}
@@ -73,11 +73,11 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f
 		patterns[i] = newPattern(pe.steps)
 	}

-	unpackFunc := func(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
+	unpackFunc := func(uctx *fieldsUnpackerContext, s string) {
 		ptn := patterns[uctx.workerID]
 		ptn.apply(s)
 		for _, f := range ptn.fields {
-			uctx.addField(f.name, *f.value, fieldPrefix)
+			uctx.addField(f.name, *f.value)
 		}
 	}
@@ -101,19 +101,19 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
 	}

 	// parse pattern
-	pattern, err := getCompoundToken(lex)
+	patternStr, err := getCompoundToken(lex)
 	if err != nil {
 		return nil, fmt.Errorf("cannot read 'pattern': %w", err)
 	}
-	steps, err := parsePatternSteps(pattern)
+	steps, err := parsePatternSteps(patternStr)
 	if err != nil {
-		return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", pattern, err)
+		return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", patternStr, err)
 	}

 	pe := &pipeExtract{
 		fromField: fromField,
 		steps:     steps,
-		pattern:   pattern,
+		patternStr: patternStr,
 	}

 	// parse optional if (...)
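The hunks above do two things: a rename (pattern → patternStr, plausibly to stop the field shadowing the pattern type that newPattern returns) and the signature narrowing applied throughout this commit, where fieldPrefix is dropped from every unpack callback because fieldsUnpackerContext now carries it. A minimal, self-contained sketch of that refactoring shape, with hypothetical simplified types rather than the real VictoriaLogs ones:

package main

import "fmt"

// unpackerContext stands in for fieldsUnpackerContext: the prefix is set once
// per block instead of being threaded through every addField call.
type unpackerContext struct {
	fieldPrefix string
	fields      []string
}

// addField no longer takes a prefix argument; it reads it from the context.
func (ctx *unpackerContext) addField(name, value string) {
	ctx.fields = append(ctx.fields, ctx.fieldPrefix+name+"="+value)
}

func main() {
	ctx := &unpackerContext{fieldPrefix: "extracted_"}
	ctx.addField("ip", "1.2.3.4")
	fmt.Println(ctx.fields) // [extracted_ip=1.2.3.4]
}

Moving a per-block constant into context state keeps the hot-loop call sites shorter and makes it impossible for different callbacks to disagree about the prefix.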

lib/logstorage/pipe_unpack.go

@@ -7,13 +7,16 @@ import (
 )

 type fieldsUnpackerContext struct {
 	workerID uint
-	fields   []Field
-	a        arena
+	fieldPrefix string
+
+	fields []Field
+	a      arena
 }

 func (uctx *fieldsUnpackerContext) reset() {
 	uctx.workerID = 0
+	uctx.fieldPrefix = ""
 	uctx.resetFields()
 	uctx.a.reset()
 }
@@ -23,8 +26,16 @@ func (uctx *fieldsUnpackerContext) resetFields() {
 	uctx.fields = uctx.fields[:0]
 }

-func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) {
+func (uctx *fieldsUnpackerContext) init(workerID uint, fieldPrefix string) {
+	uctx.reset()
+
+	uctx.workerID = workerID
+	uctx.fieldPrefix = fieldPrefix
+}
+
+func (uctx *fieldsUnpackerContext) addField(name, value string) {
 	nameCopy := ""
+	fieldPrefix := uctx.fieldPrefix
 	if fieldPrefix != "" {
 		nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
 		copy(nameBuf, fieldPrefix)
@@ -42,14 +53,9 @@ func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) {
 	})
 }

-func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor,
+func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor,
 	fromField, fieldPrefix string, iff *ifFilter) *pipeUnpackProcessor {

-	shards := make([]pipeUnpackProcessorShard, workersCount)
-	for i := range shards {
-		shards[i].wctx.workerID = uint(i)
-	}
-
 	return &pipeUnpackProcessor{
 		unpackFunc: unpackFunc,
 		ppBase:     ppBase,
@@ -63,7 +69,7 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack
 }

 type pipeUnpackProcessor struct {
-	unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string)
+	unpackFunc func(uctx *fieldsUnpackerContext, s string)
 	ppBase     pipeProcessor

 	shards []pipeUnpackProcessorShard
@@ -94,7 +100,8 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	shard := &pup.shards[workerID]
-	shard.wctx.init(workerID, br, pup.ppBase)
+	shard.wctx.init(workerID, pup.ppBase, br)
+	shard.uctx.init(workerID, pup.fieldPrefix)

 	bm := &shard.bm
 	bm.init(len(br.timestamps))
@@ -111,7 +118,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
 	if c.isConst {
 		v := c.valuesEncoded[0]
 		shard.uctx.resetFields()
-		pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix)
+		pup.unpackFunc(&shard.uctx, v)
 		for rowIdx := range br.timestamps {
 			if bm.isSetBit(rowIdx) {
 				shard.wctx.writeRow(rowIdx, shard.uctx.fields)
@@ -126,7 +133,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
 			if bm.isSetBit(i) {
 				if vPrevApplied != v {
 					shard.uctx.resetFields()
-					pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix)
+					pup.unpackFunc(&shard.uctx, v)
 					vPrevApplied = v
 				}
 				shard.wctx.writeRow(i, shard.uctx.fields)
@@ -137,6 +144,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	shard.wctx.flush()
+	shard.wctx.reset()
 	shard.uctx.reset()
 }
@@ -146,10 +154,11 @@ func (pup *pipeUnpackProcessor) flush() error {
 type pipeUnpackWriteContext struct {
 	workerID uint
-	brSrc    *blockResult
-	csSrc    []*blockResultColumn
 	ppBase   pipeProcessor

+	brSrc *blockResult
+	csSrc []*blockResultColumn
+
 	rcs []resultColumn
 	br  blockResult
@@ -162,9 +171,10 @@ type pipeUnpackWriteContext struct {
 func (wctx *pipeUnpackWriteContext) reset() {
 	wctx.workerID = 0
+	wctx.ppBase = nil
+
 	wctx.brSrc = nil
 	wctx.csSrc = nil
-	wctx.ppBase = nil

 	rcs := wctx.rcs
 	for i := range rcs {
@@ -176,13 +186,14 @@ func (wctx *pipeUnpackWriteContext) reset() {
 	wctx.valuesLen = 0
 }

-func (wctx *pipeUnpackWriteContext) init(workerID uint, brSrc *blockResult, ppBase pipeProcessor) {
+func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, brSrc *blockResult) {
 	wctx.reset()

 	wctx.workerID = workerID
+	wctx.ppBase = ppBase
+
 	wctx.brSrc = brSrc
 	wctx.csSrc = brSrc.getColumns()
-	wctx.ppBase = ppBase
 }

 func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
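Two lifecycle details in the hunks above are easy to miss: both contexts now get an init() that calls reset() before applying per-block parameters (which is why the constructor no longer pre-seeds workerID for each shard), and writeBlock resets wctx immediately after flushing it, so no references to the processed block outlive the call. A hedged sketch of that init-resets-first idiom, using a simplified stand-in type rather than the real pipeUnpackWriteContext:

package main

import "fmt"

type writeContext struct {
	workerID uint
	rows     []string
}

func (w *writeContext) reset() {
	w.workerID = 0
	w.rows = w.rows[:0]
}

// init resets before applying new parameters, so callers never have to
// remember a separate reset() between blocks.
func (w *writeContext) init(workerID uint) {
	w.reset()
	w.workerID = workerID
}

func (w *writeContext) flush() { fmt.Println(w.workerID, w.rows) }

func main() {
	var w writeContext
	for block := 0; block < 2; block++ {
		w.init(uint(block))
		w.rows = append(w.rows, fmt.Sprintf("row-from-block-%d", block))
		w.flush()
		w.reset() // mirrors shard.wctx.reset() right after shard.wctx.flush()
	}
}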

lib/logstorage/pipe_unpack_json.go

@@ -52,7 +52,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{},
 	return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
 }

-func unpackJSON(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
+func unpackJSON(uctx *fieldsUnpackerContext, s string) {
 	if len(s) == 0 || s[0] != '{' {
 		// This isn't a JSON object
 		return
@@ -60,7 +60,7 @@ func unpackJSON(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
 	p := GetJSONParser()
 	if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil {
 		for _, f := range p.Fields {
-			uctx.addField(f.Name, f.Value, fieldPrefix)
+			uctx.addField(f.Name, f.Value)
 		}
 	}
 	PutJSONParser(p)
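GetJSONParser, ParseLogMessage and PutJSONParser are VictoriaLogs-internal pooled-parser helpers. As a rough stand-in for unpackJSON's contract after this change (reject anything that is not a JSON object, then push every parsed field through addField with no prefix argument), here is a sketch built on encoding/json; unlike the real parser it only handles flat objects with string values:

package main

import (
	"encoding/json"
	"fmt"
)

// unpackJSONSketch mimics unpackJSON's shape: an early exit for non-objects,
// then one addField call per parsed field.
func unpackJSONSketch(addField func(name, value string), s string) {
	if len(s) == 0 || s[0] != '{' {
		return // not a JSON object
	}
	var m map[string]string
	if err := json.Unmarshal([]byte(s), &m); err == nil {
		for name, value := range m {
			addField(name, value)
		}
	}
}

func main() {
	unpackJSONSketch(func(name, value string) {
		fmt.Printf("%s=%s\n", name, value)
	}, `{"level":"info","msg":"ok"}`)
}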

lib/logstorage/pipe_unpack_logfmt.go

@@ -51,7 +51,7 @@ func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}
 	return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
 }

-func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
+func unpackLogfmt(uctx *fieldsUnpackerContext, s string) {
 	for {
 		// Search for field name
 		n := strings.IndexByte(s, '=')
@@ -63,13 +63,13 @@ func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
 		name := strings.TrimSpace(s[:n])
 		s = s[n+1:]
 		if len(s) == 0 {
-			uctx.addField(name, "", fieldPrefix)
+			uctx.addField(name, "")
 		}

 		// Search for field value
 		value, nOffset := tryUnquoteString(s)
 		if nOffset >= 0 {
-			uctx.addField(name, value, fieldPrefix)
+			uctx.addField(name, value)
 			s = s[nOffset:]
 			if len(s) == 0 {
 				return
@@ -81,10 +81,10 @@ func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
 		} else {
 			n := strings.IndexByte(s, ' ')
 			if n < 0 {
-				uctx.addField(name, s, fieldPrefix)
+				uctx.addField(name, s)
 				return
 			}
-			uctx.addField(name, s[:n], fieldPrefix)
+			uctx.addField(name, s[:n])
 			s = s[n+1:]
 		}
 	}
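tryUnquoteString is internal too, so this last sketch approximates unpackLogfmt's scanning loop with strconv.Unquote: find '=' to get a field name, then take either a quoted value or everything up to the next space. Escaped quotes and the other logfmt corner cases the real code handles are ignored here:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func parseLogfmtSketch(addField func(name, value string), s string) {
	for {
		n := strings.IndexByte(s, '=')
		if n < 0 {
			return // no more name=value pairs
		}
		name := strings.TrimSpace(s[:n])
		s = s[n+1:]
		if len(s) > 0 && s[0] == '"' {
			// quoted value; this sketch assumes no escaped quotes inside
			if end := strings.IndexByte(s[1:], '"'); end >= 0 {
				if v, err := strconv.Unquote(s[:end+2]); err == nil {
					addField(name, v)
					s = s[end+2:]
					continue
				}
			}
		}
		if n := strings.IndexByte(s, ' '); n >= 0 {
			addField(name, s[:n])
			s = s[n+1:]
		} else {
			addField(name, s)
			return
		}
	}
}

func main() {
	parseLogfmtSketch(func(name, value string) {
		fmt.Printf("%s=%q\n", name, value)
	}, `level=info msg="hello world" code=200`)
}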