diff --git a/lib/logstorage/filter_or.go b/lib/logstorage/filter_or.go
index 5349c4370..256337a0c 100644
--- a/lib/logstorage/filter_or.go
+++ b/lib/logstorage/filter_or.go
@@ -2,6 +2,7 @@ package logstorage
 
 import (
 	"strings"
+	"sync"
 )
 
 // filterOr contains filters joined by OR operator.
@@ -9,6 +10,9 @@ import (
 // It is epxressed as `f1 OR f2 ... OR fN` in LogsQL.
 type filterOr struct {
 	filters []filter
+
+	byFieldTokensOnce sync.Once
+	byFieldTokens     []fieldTokens
 }
 
 func (fo *filterOr) String() string {
@@ -51,6 +55,12 @@ func (fo *filterOr) applyToBlockResult(br *blockResult, bm *bitmap) {
 }
 
 func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
+	if !fo.matchBloomFilters(bs) {
+		// Fast path - fo doesn't match bloom filters.
+		bm.resetBits()
+		return
+	}
+
 	bmResult := getBitmap(bm.bitsLen)
 	bmTmp := getBitmap(bm.bitsLen)
 	for _, f := range fo.filters {
@@ -72,3 +82,127 @@ func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
 	bm.copyFrom(bmResult)
 	putBitmap(bmResult)
 }
+
+// matchBloomFilters reports whether bs may contain rows matching fo.
+//
+// It returns false when none of the fields returned by getByFieldTokens
+// contains all the tokens required for it, so bs can be skipped.
+func (fo *filterOr) matchBloomFilters(bs *blockSearch) bool {
+	byFieldTokens := fo.getByFieldTokens()
+	if len(byFieldTokens) == 0 {
+		return true
+	}
+
+	for _, fieldTokens := range byFieldTokens {
+		fieldName := fieldTokens.field
+		tokens := fieldTokens.tokens
+
+		v := bs.csh.getConstColumnValue(fieldName)
+		if v != "" {
+			if matchStringByAllTokens(v, tokens) {
+				return true
+			}
+			continue
+		}
+
+		ch := bs.csh.getColumnHeader(fieldName)
+		if ch == nil {
+			continue
+		}
+
+		if ch.valueType == valueTypeDict {
+			if matchDictValuesByAllTokens(ch.valuesDict.values, tokens) {
+				return true
+			}
+			continue
+		}
+		if matchBloomFilterAllTokens(bs, ch, tokens) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// getByFieldTokens returns per-field tokens for the bloom filter pre-check.
+//
+// The tokens are calculated once on the first call.
+func (fo *filterOr) getByFieldTokens() []fieldTokens {
+	fo.byFieldTokensOnce.Do(fo.initByFieldTokens)
+	return fo.byFieldTokens
+}
+
+// initByFieldTokens initializes fo.byFieldTokens.
+//
+// A field is added only when every OR-ed filter refers to it and needs at least
+// one token. Otherwise a filter, which doesn't depend on the common tokens,
+// could match rows in the block skipped by matchBloomFilters.
+func (fo *filterOr) initByFieldTokens() {
+	m := make(map[string][][]string)
+	var fieldNames []string
+
+	for _, f := range fo.filters {
+		fieldName := ""
+		var tokens []string
+
+		switch t := f.(type) {
+		case *filterExact:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterExactPrefix:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterPhrase:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterPrefix:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterRegexp:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterSequence:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		default:
+			// Tokens cannot be extracted from this filter, so bloom filters
+			// cannot prove that it matches nothing in the block.
+			return
+		}
+		if len(tokens) == 0 {
+			// The filter doesn't need any token, so bloom filters cannot
+			// prove that it matches nothing in the block.
+			return
+		}
+
+		fieldName = getCanonicalColumnName(fieldName)
+		if _, ok := m[fieldName]; !ok {
+			fieldNames = append(fieldNames, fieldName)
+		}
+		m[fieldName] = append(m[fieldName], tokens)
+	}
+
+	var byFieldTokens []fieldTokens
+	for _, fieldName := range fieldNames {
+		tokenss := m[fieldName]
+		if len(tokenss) != len(fo.filters) {
+			// Some of the OR-ed filters refer to other fields, so the tokens
+			// for this field cannot be used for skipping the block.
+			continue
+		}
+		if len(tokenss) < 2 {
+			// It is faster to perform bloom filter match inline when visiting the corresponding column
+			continue
+		}
+
+		commonTokens := getCommonTokens(tokenss)
+		if len(commonTokens) > 0 {
+			byFieldTokens = append(byFieldTokens, fieldTokens{
+				field:  fieldName,
+				tokens: commonTokens,
+			})
+		}
+	}
+
+	fo.byFieldTokens = byFieldTokens
+}