This commit is contained in:
Aliaksandr Valialkin 2024-05-20 16:09:07 +02:00
parent fba053b34d
commit a8dde0aeac
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 270 additions and 32 deletions

View file

@ -98,6 +98,13 @@ func (bm *bitmap) areAllBitsSet() bool {
return true
}
// isSetBit reports whether the bit at position i is set in bm.
//
// Bits are packed 64 per word in bm.a: bit i is stored in word i/64 at offset i%64.
func (bm *bitmap) isSetBit(i int) bool {
	n := uint(i)
	// n>>6 selects the word and n&63 selects the bit inside that word.
	return bm.a[n>>6]&(1<<(n&63)) != 0
}
func (bm *bitmap) andNot(x *bitmap) {
if bm.bitsLen != x.bitsLen {
logger.Panicf("BUG: cannot merge bitmaps with distinct lengths; %d vs %d", bm.bitsLen, x.bitsLen)

View file

@ -1001,6 +1001,7 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | extract from '' 'foo<bar>baz'`, `* | extract "foo<bar>baz"`)
f("* | extract from x `foo<bar>baz`", `* | extract from x "foo<bar>baz"`)
f("* | extract from x foo<bar>baz", `* | extract from x "foo<bar>baz"`)
f("* | extract from x foo<bar>baz if (a:b)", `* | extract from x "foo<bar>baz" if (a:b)`)
// unpack_json pipe
f(`* | unpack_json`, `* | unpack_json`)
@ -1571,14 +1572,24 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
f(`* | extract from s1 "<f1>x<f2>"`, `*`, `f1,f2`)
f(`* | extract from s1 "<f1>x<f2>" if (f3:foo)`, `*`, `f1,f2`)
f(`* | extract from s1 "<f1>x<f2>" if (f1:foo)`, `*`, `f2`)
f(`* | extract from s1 "<f1>x<f2>" | fields foo`, `foo`, ``)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | fields foo`, `foo`, ``)
f(`* | extract from s1 "<f1>x<f2>" | fields foo,s1`, `foo,s1`, ``)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | fields foo,s1`, `foo,s1`, ``)
f(`* | extract from s1 "<f1>x<f2>" | fields foo,f1`, `foo,s1`, ``)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | fields foo,f1`, `foo,s1,x`, ``)
f(`* | extract from s1 "<f1>x<f2>" | fields foo,f1,f2`, `foo,s1`, ``)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | fields foo,f1,f2`, `foo,s1,x`, ``)
f(`* | extract from s1 "<f1>x<f2>" | rm foo`, `*`, `f1,f2,foo`)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | rm foo`, `*`, `f1,f2,foo`)
f(`* | extract from s1 "<f1>x<f2>" | rm foo,s1`, `*`, `f1,f2,foo`)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | rm foo,s1`, `*`, `f1,f2,foo`)
f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1`, `*`, `f1,f2,foo`)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | rm foo,f1`, `*`, `f1,f2,foo`)
f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`)
f(`* | extract from s1 "<f1>x<f2>" if (x:bar) | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`)
f(`* | unpack_json`, `*`, ``)
f(`* | unpack_json from s1`, `*`, ``)

View file

@ -13,6 +13,10 @@ type pipeExtract struct {
steps []patternStep
pattern string
// iff is an optional filter for skipping the extract func
iff filter
iffNeededFields []string
}
func (pe *pipeExtract) String() string {
@ -21,6 +25,9 @@ func (pe *pipeExtract) String() string {
s += " from " + quoteTokenIfNeeded(pe.fromField)
}
s += " " + quoteTokenIfNeeded(pe.pattern)
if pe.iff != nil {
s += " if (" + pe.iff.String() + ")"
}
return s
}
@ -37,19 +44,22 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
}
}
if needFromField {
unneededFields.removeFields(pe.iffNeededFields)
unneededFields.remove(pe.fromField)
} else {
unneededFields.add(pe.fromField)
}
} else {
neededFieldsOrig := neededFields.clone()
needFromField := false
for _, step := range pe.steps {
if step.field != "" && neededFields.contains(step.field) {
if step.field != "" && neededFieldsOrig.contains(step.field) {
needFromField = true
neededFields.remove(step.field)
}
}
if needFromField {
neededFields.addFields(pe.iffNeededFields)
neededFields.add(pe.fromField)
}
}
@ -59,14 +69,9 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f
shards := make([]pipeExtractProcessorShard, workersCount)
for i := range shards {
ef := newPattern(pe.steps)
rcs := make([]resultColumn, len(ef.fields))
for j := range rcs {
rcs[j].name = ef.fields[j].name
}
shards[i] = pipeExtractProcessorShard{
pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{
ef: ef,
rcs: rcs,
ef: ef,
},
}
}
@ -97,7 +102,10 @@ type pipeExtractProcessorShard struct {
type pipeExtractProcessorShardNopad struct {
ef *pattern
rcs []resultColumn
bm bitmap
uctx fieldsUnpackerContext
wctx pipeUnpackWriteContext
}
func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
@ -106,38 +114,55 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
}
shard := &pep.shards[workerID]
shard.wctx.init(br, pep.ppBase)
ef := shard.ef
rcs := shard.rcs
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
if iff := pep.pe.iff; iff != nil {
iff.applyToBlockResult(br, bm)
if bm.isZero() {
// Fast path - nothing to extract.
pep.ppBase.writeBlock(workerID, br)
return
}
}
c := br.getColumnByName(pep.pe.fromField)
if c.isConst {
v := c.valuesEncoded[0]
ef.apply(v)
for i, f := range ef.fields {
fieldValue := *f.value
rc := &rcs[i]
for range br.timestamps {
rc.addValue(fieldValue)
for i := range br.timestamps {
shard.uctx.resetFields()
if bm.isSetBit(i) {
for _, f := range ef.fields {
shard.uctx.addField(f.name, *f.value, "")
}
}
shard.wctx.writeRow(i, shard.uctx.fields)
}
} else {
values := c.getValues(br)
vPrevApplied := ""
for i, v := range values {
if i == 0 || values[i-1] != v {
ef.apply(v)
}
for j, f := range ef.fields {
rcs[j].addValue(*f.value)
shard.uctx.resetFields()
if bm.isSetBit(i) {
if vPrevApplied != v {
ef.apply(v)
vPrevApplied = v
}
for _, f := range ef.fields {
shard.uctx.addField(f.name, *f.value, "")
}
}
shard.wctx.writeRow(i, shard.uctx.fields)
}
}
br.addResultColumns(rcs)
pep.ppBase.writeBlock(workerID, br)
for i := range rcs {
rcs[i].resetValues()
}
shard.wctx.flush()
shard.uctx.reset()
}
func (pep *pipeExtractProcessor) flush() error {
@ -160,6 +185,7 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
fromField = f
}
// parse pattern
pattern, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot read 'pattern': %w", err)
@ -174,5 +200,19 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
steps: steps,
pattern: pattern,
}
// parse optional if (...)
if lex.isKeyword("if") {
iff, err := parseIfFilter(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'if' filter for %s: %w", pe, err)
}
pe.iff = iff
neededFields := newFieldsSet()
iff.updateNeededFields(neededFields)
pe.iffNeededFields = neededFields.getAll()
}
return pe, nil
}

View file

@ -4,6 +4,167 @@ import (
"testing"
)
// TestPipeExtract runs the extract pipe over small sets of input rows
// and compares the produced rows with the expected ones.
func TestPipeExtract(t *testing.T) {
	check := func(pipeStr string, rows, rowsExpected [][]Field) {
		t.Helper()
		expectPipeResults(t, pipeStr, rows, rowsExpected)
	}

	// one row; the pattern is applied to _msg
	check(`extract "baz=<abc> a=<aa>"`,
		[][]Field{{
			{"_msg", `foo=bar baz="x y=z" a=b`},
		}},
		[][]Field{{
			{"_msg", `foo=bar baz="x y=z" a=b`},
			{"abc", "x y=z"},
			{"aa", "b"},
		}},
	)

	// one row; the value extracted from _msg is stored back into _msg
	check(`extract "msg=<_msg>"`,
		[][]Field{{
			{"_msg", `msg=bar`},
		}},
		[][]Field{{
			{"_msg", "bar"},
		}},
	)

	// one row; the source field is missing, so the output field is empty
	check(`extract from x "foo=<bar>"`,
		[][]Field{{
			{"_msg", `foo=bar`},
		}},
		[][]Field{{
			{"_msg", `foo=bar`},
			{"bar", ""},
		}},
	)

	// one row; the pattern does not match the source field
	check(`extract from x "foo=<bar>"`,
		[][]Field{{
			{"x", `foobar`},
		}},
		[][]Field{{
			{"x", `foobar`},
			{"bar", ""},
		}},
	)

	// one row; only a part of the pattern matches
	check(`extract from x "foo=<bar> baz=<xx>"`,
		[][]Field{{
			{"x", `a foo="a\"b\\c" cde baz=aa`},
		}},
		[][]Field{{
			{"x", `a foo="a\"b\\c" cde baz=aa`},
			{"bar", `a"b\c`},
			{"xx", ""},
		}},
	)

	// one row; an already existing column is overwritten
	check(`extract from x "foo=<bar> baz=<xx>"`,
		[][]Field{{
			{"x", `a foo=cc baz=aa b`},
			{"bar", "abc"},
		}},
		[][]Field{{
			{"x", `a foo=cc baz=aa b`},
			{"bar", `cc`},
			{"xx", `aa b`},
		}},
	)

	// one row; the if filter matches, so the extraction is performed
	check(`extract from x "foo=<bar> baz=<xx>" if (x:baz)`,
		[][]Field{{
			{"x", `a foo=cc baz=aa b`},
			{"bar", "abc"},
		}},
		[][]Field{{
			{"x", `a foo=cc baz=aa b`},
			{"bar", `cc`},
			{"xx", `aa b`},
		}},
	)

	// one row; the if filter does not match, so the row is left untouched
	check(`extract from x "foo=<bar> baz=<xx>" if (bar:"")`,
		[][]Field{{
			{"x", `a foo=cc baz=aa b`},
			{"bar", "abc"},
		}},
		[][]Field{{
			{"x", `a foo=cc baz=aa b`},
			{"bar", `abc`},
		}},
	)

	// several rows with different sets of fields; the row with ip=keep is skipped by the if filter
	check(`extract "ip=<ip> " if (!ip:keep)`,
		[][]Field{
			{
				{"foo", "bar"},
				{"_msg", "request from ip=1.2.3.4 xxx"},
				{"f3", "y"},
			},
			{
				{"foo", "aaa"},
				{"_msg", "ip=5.4.3.1 abcd"},
				{"ip", "keep"},
				{"a", "b"},
			},
			{
				{"foo", "aaa"},
				{"_msg", "ip=34.32.11.94 abcd"},
				{"ip", "ppp"},
				{"a", "b"},
			},
			{
				{"foo", "klkfs"},
				{"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"},
				{"ip", "bbbsd"},
				{"a", "klo2i"},
			},
		},
		[][]Field{
			{
				{"foo", "bar"},
				{"_msg", "request from ip=1.2.3.4 xxx"},
				{"f3", "y"},
				{"ip", "1.2.3.4"},
			},
			{
				{"foo", "aaa"},
				{"_msg", "ip=5.4.3.1 abcd"},
				{"ip", "keep"},
				{"a", "b"},
			},
			{
				{"foo", "aaa"},
				{"_msg", "ip=34.32.11.94 abcd"},
				{"ip", "34.32.11.94"},
				{"a", "b"},
			},
			{
				{"foo", "klkfs"},
				{"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"},
				{"ip", "123"},
				{"a", "klo2i"},
			},
		},
	)
}
func TestPipeExtractUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
@ -12,28 +173,41 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) {
// all the needed fields
f("extract from x '<foo>'", "*", "", "*", "foo")
f("extract from x '<foo>' if (foo:bar)", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with fromField and output fields
// unneeded fields do not intersect with pattern and output fields
f("extract from x '<foo>'", "*", "f1,f2", "*", "f1,f2,foo")
f("extract from x '<foo>' if (f1:x)", "*", "f1,f2", "*", "f2,foo")
f("extract from x '<foo>' if (foo:bar f1:x)", "*", "f1,f2", "*", "f2")
// all the needed fields, unneeded fields intersect with fromField
// unneeded fields intersect with pattern
f("extract from x '<foo>'", "*", "f2,x", "*", "f2,foo")
f("extract from x '<foo>' if (f1:abc)", "*", "f2,x", "*", "f2,foo")
f("extract from x '<foo>' if (f2:abc)", "*", "f2,x", "*", "foo")
// all the needed fields, unneeded fields intersect with output fields
// unneeded fields intersect with output fields
f("extract from x '<foo>x<bar>'", "*", "f2,foo", "*", "bar,f2,foo")
f("extract from x '<foo>x<bar>' if (f1:abc)", "*", "f2,foo", "*", "bar,f2,foo")
f("extract from x '<foo>x<bar>' if (f2:abc foo:w)", "*", "f2,foo", "*", "bar")
// all the needed fields, unneeded fields intersect with all the output fields
// unneeded fields intersect with all the output fields
f("extract from x '<foo>x<bar>'", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
// every output field is unneeded, so nothing is extracted and the fields from the if filter do not become needed
f("extract from x '<foo>x<bar>' if (a:b f2:q x:y foo:w)", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
// needed fields do not intersect with fromField and output fields
// needed fields do not intersect with pattern and output fields
f("extract from x '<foo>x<bar>'", "f1,f2", "", "f1,f2", "")
f("extract from x '<foo>x<bar>' if (a:b)", "f1,f2", "", "f1,f2", "")
f("extract from x '<foo>x<bar>' if (f1:b)", "f1,f2", "", "f1,f2", "")
// needed fields intersect with fromField
// needed fields intersect with pattern field
f("extract from x '<foo>x<bar>'", "f2,x", "", "f2,x", "")
f("extract from x '<foo>x<bar>' if (a:b)", "f2,x", "", "f2,x", "")
// needed fields intersect with output fields
f("extract from x '<foo>x<bar>'", "f2,foo", "", "f2,x", "")
f("extract from x '<foo>x<bar>' if (a:b)", "f2,foo", "", "a,f2,x", "")
// needed fields intersect with fromField and output fields
// needed fields intersect with pattern and output fields
f("extract from x '<foo>x<bar>'", "f2,foo,x,y", "", "f2,x,y", "")
f("extract from x '<foo>x<bar>' if (a:b foo:q)", "f2,foo,x,y", "", "a,f2,foo,x,y", "")
}

View file

@ -173,6 +173,11 @@ func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Fiel
t.Fatalf("unexpected error when parsing %q: %s", pipeStr, err)
}
pipeStrResult := p.String()
if pipeStrResult != pipeStr {
t.Fatalf("unexpected string representation for the pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr)
}
workersCount := 5
stopCh := make(chan struct{})
cancel := func() {}
@ -240,6 +245,7 @@ func (brw *testBlockResultWriter) flush() {
for i := range brw.rcs {
brw.rcs[i].resetValues()
}
brw.ppBase.flush()
}
func newTestPipeProcessor() *testPipeProcessor {