diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go
index b1d7b4821..742a0fb40 100644
--- a/lib/logstorage/filter_in.go
+++ b/lib/logstorage/filter_in.go
@@ -3,6 +3,7 @@ package logstorage
 import (
 	"fmt"
 	"math"
+	"slices"
 	"strings"
 	"sync"
 
@@ -27,8 +28,9 @@ type filterIn struct {
 	// qFieldName must be set to field name for obtaining values from if q is non-nil.
 	qFieldName string
 
-	tokenSetsOnce sync.Once
-	tokenSets     [][]string
+	tokensOnce   sync.Once
+	commonTokens []string
+	tokenSets    [][]string
 
 	stringValuesOnce sync.Once
 	stringValues     map[string]struct{}
@@ -74,28 +76,15 @@ func (fi *filterIn) updateNeededFields(neededFields fieldsSet) {
 	neededFields.add(fi.fieldName)
 }
 
-func (fi *filterIn) getTokenSets() [][]string {
-	fi.tokenSetsOnce.Do(fi.initTokenSets)
-	return fi.tokenSets
+func (fi *filterIn) getTokens() ([]string, [][]string) {
+	fi.tokensOnce.Do(fi.initTokens)
+	return fi.commonTokens, fi.tokenSets
 }
 
-// It is faster to match every row in the block instead of checking too big number of tokenSets against bloom filter.
-const maxTokenSetsToInit = 1000
+func (fi *filterIn) initTokens() {
+	commonTokens, tokenSets := getCommonTokensAndTokenSets(fi.values)
 
-func (fi *filterIn) initTokenSets() {
-	values := fi.values
-	tokenSetsLen := len(values)
-	if tokenSetsLen > maxTokenSetsToInit {
-		tokenSetsLen = maxTokenSetsToInit
-	}
-	tokenSets := make([][]string, 0, tokenSetsLen+1)
-	for _, v := range values {
-		tokens := tokenizeStrings(nil, []string{v})
-		tokenSets = append(tokenSets, tokens)
-		if len(tokens) > maxTokenSetsToInit {
-			break
-		}
-	}
+	fi.commonTokens = commonTokens
 	fi.tokenSets = tokenSets
 }
 
@@ -385,47 +374,47 @@ func (fi *filterIn) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
 		return
 	}
 
-	tokenSets := fi.getTokenSets()
+	commonTokens, tokenSets := fi.getTokens()
 
 	switch ch.valueType {
 	case valueTypeString:
 		stringValues := fi.getStringValues()
-		matchAnyValue(bs, ch, bm, stringValues, tokenSets)
+		matchAnyValue(bs, ch, bm, stringValues, commonTokens, tokenSets)
 	case valueTypeDict:
 		stringValues := fi.getStringValues()
 		matchValuesDictByAnyValue(bs, ch, bm, stringValues)
 	case valueTypeUint8:
 		binValues := fi.getUint8Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeUint16:
 		binValues := fi.getUint16Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeUint32:
 		binValues := fi.getUint32Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeUint64:
 		binValues := fi.getUint64Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeFloat64:
 		binValues := fi.getFloat64Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeIPv4:
 		binValues := fi.getIPv4Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeTimestampISO8601:
 		binValues := fi.getTimestampISO8601Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	default:
 		logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
 	}
 }
 
-func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, tokenSets [][]string) {
+func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, commonTokens []string, tokenSets [][]string) {
 	if len(values) == 0 {
 		bm.resetBits()
 		return
 	}
-	if !matchBloomFilterAnyTokenSet(bs, ch, tokenSets) {
+	if !matchBloomFilterAnyTokenSet(bs, ch, commonTokens, tokenSets) {
 		bm.resetBits()
 		return
 	}
@@ -435,7 +424,10 @@ func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[str
 	})
 }
 
-func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets [][]string) bool {
+func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, commonTokens []string, tokenSets [][]string) bool {
+	if len(commonTokens) > 0 {
+		return matchBloomFilterAllTokens(bs, ch, commonTokens)
+	}
 	if len(tokenSets) == 0 {
 		return false
 	}
@@ -453,6 +445,9 @@ func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets []
 	return false
 }
 
+// It is faster to match every row in the block than to check too many tokenSets against the bloom filter.
+const maxTokenSetsToInit = 1000
+
 func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}) {
 	bb := bbPool.Get()
 	for _, v := range ch.valuesDict.values {
@@ -465,3 +460,52 @@ func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, va
 	matchEncodedValuesDict(bs, ch, bm, bb.B)
 	bbPool.Put(bb)
 }
+
+// getCommonTokensAndTokenSets tokenizes every value in values.
+//
+// If some tokens are present in all the values, then these common tokens are returned with nil token sets.
+// Otherwise nil common tokens are returned together with per-value token sets in the order of values.
+func getCommonTokensAndTokenSets(values []string) ([]string, [][]string) {
+	tokenSets := make([][]string, len(values))
+	for i, v := range values {
+		tokenSets[i] = tokenizeStrings(nil, []string{v})
+	}
+
+	commonTokens := getCommonTokens(tokenSets)
+	if len(commonTokens) == 0 {
+		return nil, tokenSets
+	}
+	return commonTokens, nil
+}
+
+// getCommonTokens returns tokens present in every set of tokenSets, in arbitrary order.
+//
+// The result is empty (possibly nil) if tokenSets is empty or the sets have no tokens in common.
+func getCommonTokens(tokenSets [][]string) []string {
+	if len(tokenSets) == 0 {
+		return nil
+	}
+
+	m := make(map[string]struct{}, len(tokenSets[0]))
+	for _, token := range tokenSets[0] {
+		m[token] = struct{}{}
+	}
+
+	for _, tokens := range tokenSets[1:] {
+		if len(m) == 0 {
+			// No common tokens are left, so there is no need to inspect the remaining sets.
+			return nil
+		}
+		for token := range m {
+			if !slices.Contains(tokens, token) {
+				delete(m, token)
+			}
+		}
+	}
+
+	tokens := make([]string, 0, len(m))
+	for token := range m {
+		tokens = append(tokens, token)
+	}
+	return tokens
+}
diff --git a/lib/logstorage/filter_in_test.go b/lib/logstorage/filter_in_test.go
index ffe8e944b..29f1e35ff 100644
--- a/lib/logstorage/filter_in_test.go
+++ b/lib/logstorage/filter_in_test.go
@@ -1,6 +1,8 @@
 package logstorage
 
 import (
+	"reflect"
+	"slices"
 	"testing"
 )
 
@@ -688,3 +690,37 @@ func TestFilterIn(t *testing.T) {
 		testFilterMatchForColumns(t, columns, fi, "_msg", nil)
 	})
 }
+
+func TestGetCommonTokensAndTokenSets(t *testing.T) {
+	f := func(values []string, commonTokensExpected []string, tokenSetsExpected [][]string) {
+		t.Helper()
+
+		commonTokens, tokenSets := getCommonTokensAndTokenSets(values)
+		slices.Sort(commonTokens)
+
+		if !reflect.DeepEqual(commonTokens, commonTokensExpected) {
+			t.Fatalf("unexpected commonTokens for values=%q\ngot\n%q\nwant\n%q", values, commonTokens, commonTokensExpected)
+		}
+
+		// Compare lengths first: the loop below cannot detect missing token sets and would panic on extra ones.
+		if len(tokenSets) != len(tokenSetsExpected) {
+			t.Fatalf("unexpected number of tokenSets for values=%q; got %d; want %d", values, len(tokenSets), len(tokenSetsExpected))
+		}
+
+		for i, tokens := range tokenSets {
+			slices.Sort(tokens)
+			tokensExpected := tokenSetsExpected[i]
+			if !reflect.DeepEqual(tokens, tokensExpected) {
+				t.Fatalf("unexpected tokens for value=%q\ngot\n%q\nwant\n%q", values[i], tokens, tokensExpected)
+			}
+		}
+	}
+
+	f(nil, nil, nil)
+	f([]string{"foo"}, []string{"foo"}, nil)
+	f([]string{"foo", "foo"}, []string{"foo"}, nil)
+	f([]string{"foo", "bar", "bar", "foo"}, nil, [][]string{{"foo"}, {"bar"}, {"bar"}, {"foo"}})
+	f([]string{"foo", "foo bar", "bar foo"}, []string{"foo"}, nil)
+	f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, nil)
+	f([]string{"a xfoo bar", "xbar abc foo", "foo abc a bar"}, nil, [][]string{{"a", "bar", "xfoo"}, {"abc", "foo", "xbar"}, {"a", "abc", "bar", "foo"}})
+}
diff --git a/lib/logstorage/pipe_math_test.go b/lib/logstorage/pipe_math_test.go
index 2b4dc8db0..0703464c4 100644
--- a/lib/logstorage/pipe_math_test.go
+++ b/lib/logstorage/pipe_math_test.go
@@ -167,3 +167,37 @@ func TestPipeMath(t *testing.T) {
 		},
 	})
 }
+
+func TestPipeMathUpdateNeededFields(t *testing.T) {
+	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("math (x + 1) as y", "*", "", "*", "y")
+
+	// all the needed fields, unneeded fields do not intersect with src and dst
+	f("math (x + 1) as y", "*", "f1,f2", "*", "f1,f2,y")
+
+	// all the needed fields, unneeded fields intersect with src
+	f("math (x + 1) as y", "*", "f1,x", "*", "f1,y")
+
+	// all the needed fields, unneeded fields intersect with dst
+	f("math (x + 1) as y", "*", "f1,y", "*", "f1,y")
+
+	// all the needed fields, unneeded fields intersect with src and dst
+	f("math (x + 1) as y", "*", "f1,x,y", "*", "f1,x,y")
+
+	// needed fields do not intersect with src and dst
+	f("math (x + 1) as y", "f1,f2", "", "f1,f2", "")
+
+	// needed fields intersect with src
+	f("math (x + 1) as y", "f1,x", "", "f1,x", "")
+
+	// needed fields intersect with dst
+	f("math (x + 1) as y", "f1,y", "", "f1,x", "")
+
+	// needed fields intersect with src and dst
+	f("math (x + 1) as y", "f1,x,y", "", "f1,x", "")
+}