diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go index 2d797a4e9..27a9125b0 100644 --- a/lib/logstorage/filter_and.go +++ b/lib/logstorage/filter_and.go @@ -13,8 +13,13 @@ import ( type filterAnd struct { filters []filter - msgTokensOnce sync.Once - msgTokens []string + byFieldTokensOnce sync.Once + byFieldTokens []fieldTokens +} + +type fieldTokens struct { + field string + tokens []string } func (fa *filterAnd) String() string { @@ -49,8 +54,8 @@ func (fa *filterAnd) applyToBlockResult(br *blockResult, bm *bitmap) { } func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) { - if !fa.matchMessageBloomFilter(bs) { - // Fast path - fa doesn't match _msg bloom filter. + if !fa.matchBloomFilters(bs) { + // Fast path - fa doesn't match bloom filters. bm.resetBits() return } @@ -66,64 +71,103 @@ func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) { } } -func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool { - tokens := fa.getMessageTokens() - if len(tokens) == 0 { +func (fa *filterAnd) matchBloomFilters(bs *blockSearch) bool { + byFieldTokens := fa.getByFieldTokens() + if len(byFieldTokens) == 0 { return true } - v := bs.csh.getConstColumnValue("_msg") - if v != "" { - return matchStringByAllTokens(v, tokens) - } + for _, fieldTokens := range byFieldTokens { + fieldName := fieldTokens.field + tokens := fieldTokens.tokens - ch := bs.csh.getColumnHeader("_msg") - if ch == nil { - return false - } + v := bs.csh.getConstColumnValue(fieldName) + if v != "" { + if !matchStringByAllTokens(v, tokens) { + return false + } + continue + } - if ch.valueType == valueTypeDict { - return matchDictValuesByAllTokens(ch.valuesDict.values, tokens) - } - return matchBloomFilterAllTokens(bs, ch, tokens) -} + ch := bs.csh.getColumnHeader(fieldName) + if ch == nil { + return false + } -func (fa *filterAnd) getMessageTokens() []string { - fa.msgTokensOnce.Do(fa.initMsgTokens) - return fa.msgTokens -} - -func (fa *filterAnd) initMsgTokens() { - var a []string - for _, f := range fa.filters { - switch t := f.(type) { - case *filterExact: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } - case *filterExactPrefix: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } - case *filterPhrase: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } - case *filterPrefix: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } - case *filterRegexp: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } - case *filterSequence: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) + if ch.valueType == valueTypeDict { + if !matchDictValuesByAllTokens(ch.valuesDict.values, tokens) { + return false } + continue + } + if !matchBloomFilterAllTokens(bs, ch, tokens) { + return false } } - fa.msgTokens = a + + return true +} + +func (fa *filterAnd) getByFieldTokens() []fieldTokens { + fa.byFieldTokensOnce.Do(fa.initByFieldTokens) + return fa.byFieldTokens +} + +func (fa *filterAnd) initByFieldTokens() { + m := make(map[string][]string) + byFieldFilters := make(map[string]int) + var fieldNames []string + + for _, f := range fa.filters { + fieldName := "" + var tokens []string + + switch t := f.(type) { + case *filterExact: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterExactPrefix: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterPhrase: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterPrefix: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterRegexp: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterSequence: + fieldName = t.fieldName + tokens = t.getTokens() + } + + fieldName = getCanonicalColumnName(fieldName) + byFieldFilters[fieldName]++ + + if len(tokens) > 0 { + a, ok := m[fieldName] + if !ok { + fieldNames = append(fieldNames, fieldName) + } + m[fieldName] = append(a, tokens...) + } + } + + var byFieldTokens []fieldTokens + for _, fieldName := range fieldNames { + if byFieldFilters[fieldName] < 2 { + // It is faster to perform bloom filter match inline when visiting the corresponding column + continue + } + byFieldTokens = append(byFieldTokens, fieldTokens{ + field: fieldName, + tokens: m[fieldName], + }) + } + + fa.byFieldTokens = byFieldTokens } func matchStringByAllTokens(v string, tokens []string) bool {