This commit is contained in:
Aliaksandr Valialkin 2024-05-13 14:00:33 +02:00
parent 51eb3134c0
commit ecd51e48ec
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 73 additions and 29 deletions

View file

@ -149,8 +149,8 @@ func (c *column) resizeValues(valuesLen int) []string {
// mustWriteTo writes c to sw and updates ch accordingly.
//
// ch is valid until a.reset() is called.
func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
// ch is valid until c is changed.
func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
ch.reset()
valuesWriter := &sw.fieldValuesWriter
@ -160,7 +160,7 @@ func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = a.copyString(c.name)
ch.name = c.name
// encode values
ve := getValuesEncoder()
@ -454,20 +454,18 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
// Marshal columns
cs := b.columns
a := getArena()
csh := getColumnsHeader()
chs := csh.resizeColumnHeaders(len(cs))
for i := range cs {
cs[i].mustWriteTo(a, &chs[i], sw)
cs[i].mustWriteToNoArena(&chs[i], sw)
}
csh.constColumns = appendFields(a, csh.constColumns[:0], b.constColumns)
csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
bb := longTermBufPool.Get()
bb.B = csh.marshal(bb.B)
putColumnsHeader(csh)
putArena(a)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))

View file

@ -110,20 +110,18 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
// Marshal columns
cds := bd.columnsData
a := getArena()
csh := getColumnsHeader()
chs := csh.resizeColumnHeaders(len(cds))
for i := range cds {
cds[i].mustWriteTo(a, &chs[i], sw)
cds[i].mustWriteToNoArena(&chs[i], sw)
}
csh.constColumns = appendFields(a, csh.constColumns[:0], bd.constColumns)
csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)
bb := longTermBufPool.Get()
bb.B = csh.marshal(bb.B)
putColumnsHeader(csh)
putArena(a)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))
@ -310,8 +308,8 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) {
// mustWriteTo writes cd to sw and updates ch accordingly.
//
// ch is valid until a.reset() is called.
func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
// ch is valid until cd is changed.
func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
ch.reset()
valuesWriter := &sw.fieldValuesWriter
@ -321,12 +319,12 @@ func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters)
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = a.copyString(cd.name)
ch.name = cd.name
ch.valueType = cd.valueType
ch.minValue = cd.minValue
ch.maxValue = cd.maxValue
ch.valuesDict.copyFrom(a, &cd.valuesDict)
ch.valuesDict.copyFromNoArena(&cd.valuesDict)
// marshal values
ch.valuesSize = uint64(len(cd.valuesData))

View file

@ -3,6 +3,8 @@ package logstorage
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// filterAnd contains filters joined by AND opertor.
@ -30,19 +32,10 @@ func (fa *filterAnd) String() string {
}
func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) {
if tokens := fa.getMsgTokens(); len(tokens) > 0 {
// Verify whether fa tokens for the _msg field match bloom filter.
ch := bs.csh.getColumnHeader("_msg")
if ch == nil {
// Fast path - there is no _msg field in the block.
bm.resetBits()
return
}
if !matchBloomFilterAllTokens(bs, ch, tokens) {
// Fast path - fa tokens for the _msg field do not match bloom filter.
bm.resetBits()
return
}
if !fa.matchMessageBloomFilter(bs) {
// Fast path - fa doesn't match _msg bloom filter.
bm.resetBits()
return
}
// Slow path - verify every filter separately.
@ -56,7 +49,29 @@ func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) {
}
}
func (fa *filterAnd) getMsgTokens() []string {
func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool {
tokens := fa.getMessageTokens()
if len(tokens) == 0 {
return true
}
v := bs.csh.getConstColumnValue("_msg")
if v != "" {
return matchStringByAllTokens(v, tokens)
}
ch := bs.csh.getColumnHeader("_msg")
if ch == nil {
return false
}
if ch.valueType == valueTypeDict {
return matchDictValuesByAllTokens(ch.valuesDict.values, tokens)
}
return matchBloomFilterAllTokens(bs, ch, tokens)
}
func (fa *filterAnd) getMessageTokens() []string {
fa.msgTokensOnce.Do(fa.initMsgTokens)
return fa.msgTokens
}
@ -89,3 +104,24 @@ func (fa *filterAnd) initMsgTokens() {
}
fa.msgTokens = a
}
func matchStringByAllTokens(v string, tokens []string) bool {
for _, token := range tokens {
if !matchPhrase(v, token) {
return false
}
}
return true
}
func matchDictValuesByAllTokens(dictValues, tokens []string) bool {
bb := bbPool.Get()
for _, v := range dictValues {
bb.B = append(bb.B, v...)
bb.B = append(bb.B, ',')
}
v := bytesutil.ToUnsafeString(bb.B)
ok := matchStringByAllTokens(v, tokens)
bbPool.Put(bb)
return ok
}

View file

@ -70,6 +70,8 @@ func (ve *valuesEncoder) reset() {
}
// encode encodes values to ve.values and returns the encoded value type with min/max encoded values.
//
// ve.values and dict is valid until values are changed.
func (ve *valuesEncoder) encode(values []string, dict *valuesDict) (valueType, uint64, uint64) {
ve.reset()
@ -1091,6 +1093,16 @@ func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) {
vd.values = dstValues
}
func (vd *valuesDict) copyFromNoArena(src *valuesDict) {
vd.reset()
dstValues := vd.values
for _, v := range src.values {
dstValues = append(dstValues, v)
}
vd.values = dstValues
}
func (vd *valuesDict) getOrAdd(k string) (byte, bool) {
if len(k) > maxDictSizeBytes {
return 0, false