diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index ef135e722..c3e976896 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -2,7 +2,6 @@ package logstorage import ( "fmt" - "unsafe" ) // pipeExtract processes '| extract from ' pipe. @@ -69,109 +68,20 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - shards := make([]pipeExtractProcessorShard, workersCount) - for i := range shards { - shards[i] = pipeExtractProcessorShard{ - pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{ - ef: newPattern(pe.steps), - }, + patterns := make([]*pattern, workersCount) + for i := range patterns { + patterns[i] = newPattern(pe.steps) + } + + unpackFunc := func(uctx *fieldsUnpackerContext, s, fieldPrefix string) { + ptn := patterns[uctx.workerID] + ptn.apply(s) + for _, f := range ptn.fields { + uctx.addField(f.name, *f.value, fieldPrefix) } } - pep := &pipeExtractProcessor{ - pe: pe, - ppBase: ppBase, - - shards: shards, - } - return pep -} - -type pipeExtractProcessor struct { - pe *pipeExtract - ppBase pipeProcessor - - shards []pipeExtractProcessorShard -} - -type pipeExtractProcessorShard struct { - pipeExtractProcessorShardNopad - - // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . - _ [128 - unsafe.Sizeof(pipeExtractProcessorShardNopad{})%128]byte -} - -type pipeExtractProcessorShardNopad struct { - ef *pattern - - bm bitmap - - uctx fieldsUnpackerContext - wctx pipeUnpackWriteContext -} - -func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { - return - } - - shard := &pep.shards[workerID] - shard.wctx.init(workerID, br, pep.ppBase) - ef := shard.ef - - bm := &shard.bm - bm.init(len(br.timestamps)) - bm.setBits() - if iff := pep.pe.iff; iff != nil { - iff.f.applyToBlockResult(br, bm) - if bm.isZero() { - // Fast path - nothing to extract. - pep.ppBase.writeBlock(workerID, br) - return - } - } - - c := br.getColumnByName(pep.pe.fromField) - if c.isConst { - v := c.valuesEncoded[0] - ef.apply(v) - for _, f := range ef.fields { - shard.uctx.addField(f.name, *f.value, "") - } - for i := range br.timestamps { - if bm.isSetBit(i) { - shard.wctx.writeRow(i, shard.uctx.fields) - } else { - shard.wctx.writeRow(i, nil) - } - - } - } else { - values := c.getValues(br) - vPrevApplied := "" - for i, v := range values { - if bm.isSetBit(i) { - if vPrevApplied != v { - ef.apply(v) - shard.uctx.resetFields() - for _, f := range ef.fields { - shard.uctx.addField(f.name, *f.value, "") - } - vPrevApplied = v - } - shard.wctx.writeRow(i, shard.uctx.fields) - } else { - shard.wctx.writeRow(i, nil) - } - } - } - - shard.wctx.flush() - shard.uctx.reset() -} - -func (pep *pipeExtractProcessor) flush() error { - return nil + return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.iff) } func parsePipeExtract(lex *lexer) (*pipeExtract, error) { diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index 7dff8cd03..92e05270e 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -7,11 +7,13 @@ import ( ) type fieldsUnpackerContext struct { - fields []Field - a arena + workerID uint + fields []Field + a arena } func (uctx *fieldsUnpackerContext) reset() { + uctx.workerID = 0 uctx.resetFields() uctx.a.reset() } @@ -42,6 +44,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) { func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor, fromField, fieldPrefix string, iff *ifFilter) *pipeUnpackProcessor { + + shards := make([]pipeUnpackProcessorShard, workersCount) + for i := range shards { + shards[i].wctx.workerID = uint(i) + } + return &pipeUnpackProcessor{ unpackFunc: unpackFunc, ppBase: ppBase,