diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index c3e976896..69d89bac1 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -11,7 +11,7 @@ type pipeExtract struct { fromField string steps []patternStep - pattern string + patternStr string // iff is an optional filter for skipping the extract func iff *ifFilter @@ -22,7 +22,7 @@ func (pe *pipeExtract) String() string { if !isMsgFieldName(pe.fromField) { s += " from " + quoteTokenIfNeeded(pe.fromField) } - s += " " + quoteTokenIfNeeded(pe.pattern) + s += " " + quoteTokenIfNeeded(pe.patternStr) if pe.iff != nil { s += " " + pe.iff.String() } @@ -73,11 +73,11 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f patterns[i] = newPattern(pe.steps) } - unpackFunc := func(uctx *fieldsUnpackerContext, s, fieldPrefix string) { + unpackFunc := func(uctx *fieldsUnpackerContext, s string) { ptn := patterns[uctx.workerID] ptn.apply(s) for _, f := range ptn.fields { - uctx.addField(f.name, *f.value, fieldPrefix) + uctx.addField(f.name, *f.value) } } @@ -101,19 +101,19 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { } // parse pattern - pattern, err := getCompoundToken(lex) + patternStr, err := getCompoundToken(lex) if err != nil { return nil, fmt.Errorf("cannot read 'pattern': %w", err) } - steps, err := parsePatternSteps(pattern) + steps, err := parsePatternSteps(patternStr) if err != nil { - return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", pattern, err) + return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", patternStr, err) } pe := &pipeExtract{ - fromField: fromField, - steps: steps, - pattern: pattern, + fromField: fromField, + steps: steps, + patternStr: patternStr, } // parse optional if (...) diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index 92e05270e..5b4333452 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -7,13 +7,16 @@ import ( ) type fieldsUnpackerContext struct { - workerID uint - fields []Field - a arena + workerID uint + fieldPrefix string + + fields []Field + a arena } func (uctx *fieldsUnpackerContext) reset() { uctx.workerID = 0 + uctx.fieldPrefix = "" uctx.resetFields() uctx.a.reset() } @@ -23,8 +26,16 @@ func (uctx *fieldsUnpackerContext) resetFields() { uctx.fields = uctx.fields[:0] } -func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) { +func (uctx *fieldsUnpackerContext) init(workerID uint, fieldPrefix string) { + uctx.reset() + + uctx.workerID = workerID + uctx.fieldPrefix = fieldPrefix +} + +func (uctx *fieldsUnpackerContext) addField(name, value string) { nameCopy := "" + fieldPrefix := uctx.fieldPrefix if fieldPrefix != "" { nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name)) copy(nameBuf, fieldPrefix) @@ -42,14 +53,9 @@ func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) { }) } -func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor, +func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor, fromField, fieldPrefix string, iff *ifFilter) *pipeUnpackProcessor { - shards := make([]pipeUnpackProcessorShard, workersCount) - for i := range shards { - shards[i].wctx.workerID = uint(i) - } - return &pipeUnpackProcessor{ unpackFunc: unpackFunc, ppBase: ppBase, @@ -63,7 +69,7 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack } type pipeUnpackProcessor struct { - unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string) + unpackFunc func(uctx *fieldsUnpackerContext, s string) ppBase pipeProcessor shards []pipeUnpackProcessorShard @@ -94,7 +100,8 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pup.shards[workerID] - shard.wctx.init(workerID, br, pup.ppBase) + shard.wctx.init(workerID, pup.ppBase, br) + shard.uctx.init(workerID, pup.fieldPrefix) bm := &shard.bm bm.init(len(br.timestamps)) @@ -111,7 +118,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { if c.isConst { v := c.valuesEncoded[0] shard.uctx.resetFields() - pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix) + pup.unpackFunc(&shard.uctx, v) for rowIdx := range br.timestamps { if bm.isSetBit(rowIdx) { shard.wctx.writeRow(rowIdx, shard.uctx.fields) @@ -126,7 +133,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { if bm.isSetBit(i) { if vPrevApplied != v { shard.uctx.resetFields() - pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix) + pup.unpackFunc(&shard.uctx, v) vPrevApplied = v } shard.wctx.writeRow(i, shard.uctx.fields) @@ -137,6 +144,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { } shard.wctx.flush() + shard.wctx.reset() shard.uctx.reset() } @@ -146,10 +154,11 @@ func (pup *pipeUnpackProcessor) flush() error { type pipeUnpackWriteContext struct { workerID uint - brSrc *blockResult - csSrc []*blockResultColumn ppBase pipeProcessor + brSrc *blockResult + csSrc []*blockResultColumn + rcs []resultColumn br blockResult @@ -162,9 +171,10 @@ type pipeUnpackWriteContext struct { func (wctx *pipeUnpackWriteContext) reset() { wctx.workerID = 0 + wctx.ppBase = nil + wctx.brSrc = nil wctx.csSrc = nil - wctx.ppBase = nil rcs := wctx.rcs for i := range rcs { @@ -176,13 +186,14 @@ func (wctx *pipeUnpackWriteContext) reset() { wctx.valuesLen = 0 } -func (wctx *pipeUnpackWriteContext) init(workerID uint, brSrc *blockResult, ppBase pipeProcessor) { +func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, brSrc *blockResult) { wctx.reset() wctx.workerID = workerID + wctx.ppBase = ppBase + wctx.brSrc = brSrc wctx.csSrc = brSrc.getColumns() - wctx.ppBase = ppBase } func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) { diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 4d2e15308..3b58f9a41 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -52,7 +52,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff) } -func unpackJSON(uctx *fieldsUnpackerContext, s, fieldPrefix string) { +func unpackJSON(uctx *fieldsUnpackerContext, s string) { if len(s) == 0 || s[0] != '{' { // This isn't a JSON object return @@ -60,7 +60,7 @@ func unpackJSON(uctx *fieldsUnpackerContext, s, fieldPrefix string) { p := GetJSONParser() if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil { for _, f := range p.Fields { - uctx.addField(f.Name, f.Value, fieldPrefix) + uctx.addField(f.Name, f.Value) } } PutJSONParser(p) diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index d2ad828df..d371b011f 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -51,7 +51,7 @@ func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{} return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff) } -func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) { +func unpackLogfmt(uctx *fieldsUnpackerContext, s string) { for { // Search for field name n := strings.IndexByte(s, '=') @@ -63,13 +63,13 @@ func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) { name := strings.TrimSpace(s[:n]) s = s[n+1:] if len(s) == 0 { - uctx.addField(name, "", fieldPrefix) + uctx.addField(name, "") } // Search for field value value, nOffset := tryUnquoteString(s) if nOffset >= 0 { - uctx.addField(name, value, fieldPrefix) + uctx.addField(name, value) s = s[nOffset:] if len(s) == 0 { return @@ -81,10 +81,10 @@ func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) { } else { n := strings.IndexByte(s, ' ') if n < 0 { - uctx.addField(name, s, fieldPrefix) + uctx.addField(name, s) return } - uctx.addField(name, s[:n], fieldPrefix) + uctx.addField(name, s[:n]) s = s[n+1:] } }