diff --git a/lib/logstorage/bitmap.go b/lib/logstorage/bitmap.go index b12680848..ac1c10695 100644 --- a/lib/logstorage/bitmap.go +++ b/lib/logstorage/bitmap.go @@ -98,6 +98,13 @@ func (bm *bitmap) areAllBitsSet() bool { return true } +func (bm *bitmap) isSetBit(i int) bool { + wordIdx := uint(i) / 64 + wordOffset := uint(i) % 64 + word := bm.a[wordIdx] + return (word & (1 << wordOffset)) != 0 +} + func (bm *bitmap) andNot(x *bitmap) { if bm.bitsLen != x.bitsLen { logger.Panicf("BUG: cannot merge bitmaps with distinct lengths; %d vs %d", bm.bitsLen, x.bitsLen) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index bd45658d8..7b986feee 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1001,6 +1001,7 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | extract from '' 'foobaz'`, `* | extract "foobaz"`) f("* | extract from x `foobaz`", `* | extract from x "foobaz"`) f("* | extract from x foobaz", `* | extract from x "foobaz"`) + f("* | extract from x foobaz if (a:b)", `* | extract from x "foobaz" if (a:b)`) // unpack_json pipe f(`* | unpack_json`, `* | unpack_json`) @@ -1571,14 +1572,24 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`) f(`* | extract from s1 "x"`, `*`, `f1,f2`) + f(`* | extract from s1 "x" if (f3:foo)`, `*`, `f1,f2`) + f(`* | extract from s1 "x" if (f1:foo)`, `*`, `f2`) f(`* | extract from s1 "x" | fields foo`, `foo`, ``) + f(`* | extract from s1 "x" if (x:bar) | fields foo`, `foo`, ``) f(`* | extract from s1 "x" | fields foo,s1`, `foo,s1`, ``) + f(`* | extract from s1 "x" if (x:bar) | fields foo,s1`, `foo,s1`, ``) f(`* | extract from s1 "x" | fields foo,f1`, `foo,s1`, ``) + f(`* | extract from s1 "x" if (x:bar) | fields foo,f1`, `foo,s1,x`, ``) f(`* | extract from s1 "x" | fields foo,f1,f2`, `foo,s1`, ``) + f(`* | extract from s1 "x" if (x:bar) | fields foo,f1,f2`, `foo,s1,x`, ``) f(`* | extract from s1 "x" | rm foo`, `*`, `f1,f2,foo`) + f(`* | extract from s1 "x" if (x:bar) | rm foo`, `*`, `f1,f2,foo`) f(`* | extract from s1 "x" | rm foo,s1`, `*`, `f1,f2,foo`) + f(`* | extract from s1 "x" if (x:bar) | rm foo,s1`, `*`, `f1,f2,foo`) f(`* | extract from s1 "x" | rm foo,f1`, `*`, `f1,f2,foo`) + f(`* | extract from s1 "x" if (x:bar) | rm foo,f1`, `*`, `f1,f2,foo`) f(`* | extract from s1 "x" | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`) + f(`* | extract from s1 "x" if (x:bar) | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`) f(`* | unpack_json`, `*`, ``) f(`* | unpack_json from s1`, `*`, ``) diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index eab780856..46bef0415 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -13,6 +13,10 @@ type pipeExtract struct { steps []patternStep pattern string + + // iff is an optional filter for skipping the extract func + iff filter + iffNeededFields []string } func (pe *pipeExtract) String() string { @@ -21,6 +25,9 @@ func (pe *pipeExtract) String() string { s += " from " + quoteTokenIfNeeded(pe.fromField) } s += " " + quoteTokenIfNeeded(pe.pattern) + if pe.iff != nil { + s += " if (" + pe.iff.String() + ")" + } return s } @@ -37,19 +44,22 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } } if needFromField { + unneededFields.removeFields(pe.iffNeededFields) unneededFields.remove(pe.fromField) } else { unneededFields.add(pe.fromField) } } else { + neededFieldsOrig := neededFields.clone() needFromField := false for _, step := range pe.steps { - if step.field != "" && neededFields.contains(step.field) { + if step.field != "" && neededFieldsOrig.contains(step.field) { needFromField = true neededFields.remove(step.field) } } if needFromField { + neededFields.addFields(pe.iffNeededFields) neededFields.add(pe.fromField) } } @@ -59,14 +69,9 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f shards := make([]pipeExtractProcessorShard, workersCount) for i := range shards { ef := newPattern(pe.steps) - rcs := make([]resultColumn, len(ef.fields)) - for j := range rcs { - rcs[j].name = ef.fields[j].name - } shards[i] = pipeExtractProcessorShard{ pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{ - ef: ef, - rcs: rcs, + ef: ef, }, } } @@ -97,7 +102,10 @@ type pipeExtractProcessorShard struct { type pipeExtractProcessorShardNopad struct { ef *pattern - rcs []resultColumn + bm bitmap + + uctx fieldsUnpackerContext + wctx pipeUnpackWriteContext } func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { @@ -106,38 +114,55 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pep.shards[workerID] + shard.wctx.init(br, pep.ppBase) ef := shard.ef - rcs := shard.rcs + + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if iff := pep.pe.iff; iff != nil { + iff.applyToBlockResult(br, bm) + if bm.isZero() { + // Fast path - nothing to extract. + pep.ppBase.writeBlock(workerID, br) + return + } + } c := br.getColumnByName(pep.pe.fromField) if c.isConst { v := c.valuesEncoded[0] ef.apply(v) - for i, f := range ef.fields { - fieldValue := *f.value - rc := &rcs[i] - for range br.timestamps { - rc.addValue(fieldValue) + for i := range br.timestamps { + shard.uctx.resetFields() + if bm.isSetBit(i) { + for _, f := range ef.fields { + shard.uctx.addField(f.name, *f.value, "") + } } + shard.wctx.writeRow(i, shard.uctx.fields) + } } else { values := c.getValues(br) + vPrevApplied := "" for i, v := range values { - if i == 0 || values[i-1] != v { - ef.apply(v) - } - for j, f := range ef.fields { - rcs[j].addValue(*f.value) + shard.uctx.resetFields() + if bm.isSetBit(i) { + if vPrevApplied != v { + ef.apply(v) + vPrevApplied = v + } + for _, f := range ef.fields { + shard.uctx.addField(f.name, *f.value, "") + } } + shard.wctx.writeRow(i, shard.uctx.fields) } } - br.addResultColumns(rcs) - pep.ppBase.writeBlock(workerID, br) - - for i := range rcs { - rcs[i].resetValues() - } + shard.wctx.flush() + shard.uctx.reset() } func (pep *pipeExtractProcessor) flush() error { @@ -160,6 +185,7 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { fromField = f } + // parse pattern pattern, err := getCompoundToken(lex) if err != nil { return nil, fmt.Errorf("cannot read 'pattern': %w", err) @@ -174,5 +200,19 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { steps: steps, pattern: pattern, } + + // parse optional if (...) + if lex.isKeyword("if") { + iff, err := parseIfFilter(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'if' filter for %s: %w", pe, err) + } + pe.iff = iff + + neededFields := newFieldsSet() + iff.updateNeededFields(neededFields) + pe.iffNeededFields = neededFields.getAll() + } + return pe, nil } diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go index 940b610a9..8f3437c1b 100644 --- a/lib/logstorage/pipe_extract_test.go +++ b/lib/logstorage/pipe_extract_test.go @@ -4,6 +4,167 @@ import ( "testing" ) +func TestPipeExtract(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // single row, extract from _msg + f(`extract "baz= a="`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", "x y=z"}, + {"aa", "b"}, + }, + }) + + // single row, extract from _msg into _msg + f(`extract "msg=<_msg>"`, [][]Field{ + { + {"_msg", `msg=bar`}, + }, + }, [][]Field{ + { + {"_msg", "bar"}, + }, + }) + + // single row, extract from non-existing field + f(`extract from x "foo="`, [][]Field{ + { + {"_msg", `foo=bar`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar`}, + {"bar", ""}, + }, + }) + + // single row, pattern mismatch + f(`extract from x "foo="`, [][]Field{ + { + {"x", `foobar`}, + }, + }, [][]Field{ + { + {"x", `foobar`}, + {"bar", ""}, + }, + }) + + // single row, partial partern match + f(`extract from x "foo= baz="`, [][]Field{ + { + {"x", `a foo="a\"b\\c" cde baz=aa`}, + }, + }, [][]Field{ + { + {"x", `a foo="a\"b\\c" cde baz=aa`}, + {"bar", `a"b\c`}, + {"xx", ""}, + }, + }) + + // single row, overwirte existing column + f(`extract from x "foo= baz="`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `cc`}, + {"xx", `aa b`}, + }, + }) + + // single row, if match + f(`extract from x "foo= baz=" if (x:baz)`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `cc`}, + {"xx", `aa b`}, + }, + }) + + // single row, if mismatch + f(`extract from x "foo= baz=" if (bar:"")`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `abc`}, + }, + }) + + // multiple rows with distinct set of labels + f(`extract "ip= " if (!ip:keep)`, [][]Field{ + { + {"foo", "bar"}, + {"_msg", "request from ip=1.2.3.4 xxx"}, + {"f3", "y"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=5.4.3.1 abcd"}, + {"ip", "keep"}, + {"a", "b"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=34.32.11.94 abcd"}, + {"ip", "ppp"}, + {"a", "b"}, + }, + { + {"foo", "klkfs"}, + {"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"}, + {"ip", "bbbsd"}, + {"a", "klo2i"}, + }, + }, [][]Field{ + { + {"foo", "bar"}, + {"_msg", "request from ip=1.2.3.4 xxx"}, + {"f3", "y"}, + {"ip", "1.2.3.4"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=5.4.3.1 abcd"}, + {"ip", "keep"}, + {"a", "b"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=34.32.11.94 abcd"}, + {"ip", "34.32.11.94"}, + {"a", "b"}, + }, + { + {"foo", "klkfs"}, + {"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"}, + {"ip", "123"}, + {"a", "klo2i"}, + }, + }) +} + func TestPipeExtractUpdateNeededFields(t *testing.T) { f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { t.Helper() @@ -12,28 +173,41 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) { // all the needed fields f("extract from x ''", "*", "", "*", "foo") + f("extract from x '' if (foo:bar)", "*", "", "*", "") - // all the needed fields, unneeded fields do not intersect with fromField and output fields + // unneeded fields do not intersect with pattern and output fields f("extract from x ''", "*", "f1,f2", "*", "f1,f2,foo") + f("extract from x '' if (f1:x)", "*", "f1,f2", "*", "f2,foo") + f("extract from x '' if (foo:bar f1:x)", "*", "f1,f2", "*", "f2") - // all the needed fields, unneeded fields intersect with fromField + // unneeded fields intersect with pattern f("extract from x ''", "*", "f2,x", "*", "f2,foo") + f("extract from x '' if (f1:abc)", "*", "f2,x", "*", "f2,foo") + f("extract from x '' if (f2:abc)", "*", "f2,x", "*", "foo") - // all the needed fields, unneeded fields intersect with output fields + // unneeded fields intersect with output fields f("extract from x 'x'", "*", "f2,foo", "*", "bar,f2,foo") + f("extract from x 'x' if (f1:abc)", "*", "f2,foo", "*", "bar,f2,foo") + f("extract from x 'x' if (f2:abc foo:w)", "*", "f2,foo", "*", "bar") - // all the needed fields, unneeded fields intersect with all the output fields + // unneeded fields intersect with all the output fields f("extract from x 'x'", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract from x 'x if (a:b f2:q x:y foo:w)'", "*", "f2,foo,bar", "*", "bar,f2,foo,x") - // needed fields do not intersect with fromField and output fields + // needed fields do not intersect with pattern and output fields f("extract from x 'x'", "f1,f2", "", "f1,f2", "") + f("extract from x 'x' if (a:b)", "f1,f2", "", "f1,f2", "") + f("extract from x 'x' if (f1:b)", "f1,f2", "", "f1,f2", "") - // needed fields intersect with fromField + // needed fields intersect with pattern field f("extract from x 'x'", "f2,x", "", "f2,x", "") + f("extract from x 'x' if (a:b)", "f2,x", "", "f2,x", "") // needed fields intersect with output fields f("extract from x 'x'", "f2,foo", "", "f2,x", "") + f("extract from x 'x' if (a:b)", "f2,foo", "", "a,f2,x", "") - // needed fields intersect with fromField and output fields + // needed fields intersect with pattern and output fields f("extract from x 'x'", "f2,foo,x,y", "", "f2,x,y", "") + f("extract from x 'x' if (a:b foo:q)", "f2,foo,x,y", "", "a,f2,foo,x,y", "") } diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index 2f04d5737..99e0fccd3 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -173,6 +173,11 @@ func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Fiel t.Fatalf("unexpected error when parsing %q: %s", pipeStr, err) } + pipeStrResult := p.String() + if pipeStrResult != pipeStr { + t.Fatalf("unexpected string representation for the pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr) + } + workersCount := 5 stopCh := make(chan struct{}) cancel := func() {} @@ -240,6 +245,7 @@ func (brw *testBlockResultWriter) flush() { for i := range brw.rcs { brw.rcs[i].resetValues() } + brw.ppBase.flush() } func newTestPipeProcessor() *testPipeProcessor {