mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
lib/logstorage: optimize typical AND case
This commit is contained in:
parent
ffdafc32c6
commit
984dd2abaf
1 changed files with 60 additions and 6 deletions
|
@ -214,6 +214,9 @@ func (of *orFilter) apply(bs *blockSearch, bm *filterBitmap) {
|
|||
// It is expressed as `f1 AND f2 ... AND fN` in LogsQL.
|
||||
type andFilter struct {
|
||||
filters []filter
|
||||
|
||||
msgTokensOnce sync.Once
|
||||
msgTokens []string
|
||||
}
|
||||
|
||||
func (af *andFilter) String() string {
|
||||
|
@ -237,6 +240,22 @@ func (af *andFilter) updateReferencedColumnNames(m map[string]struct{}) {
|
|||
}
|
||||
|
||||
func (af *andFilter) apply(bs *blockSearch, bm *filterBitmap) {
|
||||
if tokens := af.getMsgTokens(); len(tokens) > 0 {
|
||||
// Verify whether af tokens for the _msg field match bloom filter.
|
||||
ch := bs.csh.getColumnHeader("_msg")
|
||||
if ch == nil {
|
||||
// Fast path - there is no _msg field in the block.
|
||||
bm.resetBits()
|
||||
return
|
||||
}
|
||||
if !matchBloomFilterAllTokens(bs, ch, tokens) {
|
||||
// Fast path - af tokens for the _msg field do not match bloom filter.
|
||||
bm.resetBits()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path - verify every filter separately.
|
||||
for _, f := range af.filters {
|
||||
f.apply(bs, bm)
|
||||
if bm.isZero() {
|
||||
|
@ -247,6 +266,40 @@ func (af *andFilter) apply(bs *blockSearch, bm *filterBitmap) {
|
|||
}
|
||||
}
|
||||
|
||||
func (af *andFilter) getMsgTokens() []string {
|
||||
af.msgTokensOnce.Do(af.initMsgTokens)
|
||||
return af.msgTokens
|
||||
}
|
||||
|
||||
func (af *andFilter) initMsgTokens() {
|
||||
var a []string
|
||||
for _, f := range af.filters {
|
||||
switch t := f.(type) {
|
||||
case *phraseFilter:
|
||||
if isMsgFieldName(t.fieldName) {
|
||||
a = append(a, t.getTokens()...)
|
||||
}
|
||||
case *sequenceFilter:
|
||||
if isMsgFieldName(t.fieldName) {
|
||||
a = append(a, t.getTokens()...)
|
||||
}
|
||||
case *exactFilter:
|
||||
if isMsgFieldName(t.fieldName) {
|
||||
a = append(a, t.getTokens()...)
|
||||
}
|
||||
case *exactPrefixFilter:
|
||||
if isMsgFieldName(t.fieldName) {
|
||||
a = append(a, t.getTokens()...)
|
||||
}
|
||||
case *prefixFilter:
|
||||
if isMsgFieldName(t.fieldName) {
|
||||
a = append(a, t.getTokens()...)
|
||||
}
|
||||
}
|
||||
}
|
||||
af.msgTokens = a
|
||||
}
|
||||
|
||||
// notFilter negates the filter.
|
||||
//
|
||||
// It is expressed as `NOT f` or `!f` in LogsQL.
|
||||
|
@ -2933,11 +2986,8 @@ type stringBucket struct {
|
|||
}
|
||||
|
||||
func (sb *stringBucket) reset() {
|
||||
a := sb.a
|
||||
for i := range a {
|
||||
a[i] = ""
|
||||
}
|
||||
sb.a = a[:0]
|
||||
clear(sb.a)
|
||||
sb.a = sb.a[:0]
|
||||
}
|
||||
|
||||
func getStringBucket() *stringBucket {
|
||||
|
@ -2983,12 +3033,16 @@ func toUint64Clamp(f float64) uint64 {
|
|||
}
|
||||
|
||||
func quoteFieldNameIfNeeded(s string) string {
|
||||
if s == "_msg" || s == "" {
|
||||
if isMsgFieldName(s) {
|
||||
return ""
|
||||
}
|
||||
return quoteTokenIfNeeded(s) + ":"
|
||||
}
|
||||
|
||||
func isMsgFieldName(fieldName string) bool {
|
||||
return fieldName == "" || fieldName == "_msg"
|
||||
}
|
||||
|
||||
func toUint8String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string {
|
||||
if len(v) != 1 {
|
||||
logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 1", bs.partPath(), len(v))
|
||||
|
|
Loading…
Reference in a new issue