diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index 3b6cd073f..0d1f9026a 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -149,8 +149,8 @@ func (c *column) resizeValues(valuesLen int) []string { // mustWriteTo writes c to sw and updates ch accordingly. // -// ch is valid until a.reset() is called. -func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) { +// ch is valid until c is changed. +func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { ch.reset() valuesWriter := &sw.fieldValuesWriter @@ -160,7 +160,7 @@ func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) { bloomFilterWriter = &sw.messageBloomFilterWriter } - ch.name = a.copyString(c.name) + ch.name = c.name // encode values ve := getValuesEncoder() @@ -454,20 +454,18 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) { // Marshal columns cs := b.columns - a := getArena() csh := getColumnsHeader() chs := csh.resizeColumnHeaders(len(cs)) for i := range cs { - cs[i].mustWriteTo(a, &chs[i], sw) + cs[i].mustWriteToNoArena(&chs[i], sw) } - csh.constColumns = appendFields(a, csh.constColumns[:0], b.constColumns) + csh.constColumns = append(csh.constColumns[:0], b.constColumns...) bb := longTermBufPool.Get() bb.B = csh.marshal(bb.B) putColumnsHeader(csh) - putArena(a) bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten bh.columnsHeaderSize = uint64(len(bb.B)) diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go index 308e4d109..9f009c098 100644 --- a/lib/logstorage/block_data.go +++ b/lib/logstorage/block_data.go @@ -110,20 +110,18 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) { // Marshal columns cds := bd.columnsData - a := getArena() csh := getColumnsHeader() chs := csh.resizeColumnHeaders(len(cds)) for i := range cds { - cds[i].mustWriteTo(a, &chs[i], sw) + cds[i].mustWriteToNoArena(&chs[i], sw) } - csh.constColumns = appendFields(a, csh.constColumns[:0], bd.constColumns) + csh.constColumns = append(csh.constColumns[:0], bd.constColumns...) bb := longTermBufPool.Get() bb.B = csh.marshal(bb.B) putColumnsHeader(csh) - putArena(a) bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten bh.columnsHeaderSize = uint64(len(bb.B)) @@ -310,8 +308,8 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) { // mustWriteTo writes cd to sw and updates ch accordingly. // -// ch is valid until a.reset() is called. -func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) { +// ch is valid until cd is changed. +func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { ch.reset() valuesWriter := &sw.fieldValuesWriter @@ -321,12 +319,12 @@ func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) bloomFilterWriter = &sw.messageBloomFilterWriter } - ch.name = a.copyString(cd.name) + ch.name = cd.name ch.valueType = cd.valueType ch.minValue = cd.minValue ch.maxValue = cd.maxValue - ch.valuesDict.copyFrom(a, &cd.valuesDict) + ch.valuesDict.copyFromNoArena(&cd.valuesDict) // marshal values ch.valuesSize = uint64(len(cd.valuesData)) diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go index 7b4a46e02..fd5765004 100644 --- a/lib/logstorage/filter_and.go +++ b/lib/logstorage/filter_and.go @@ -3,6 +3,8 @@ package logstorage import ( "strings" "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // filterAnd contains filters joined by AND opertor. @@ -30,19 +32,10 @@ func (fa *filterAnd) String() string { } func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) { - if tokens := fa.getMsgTokens(); len(tokens) > 0 { - // Verify whether fa tokens for the _msg field match bloom filter. - ch := bs.csh.getColumnHeader("_msg") - if ch == nil { - // Fast path - there is no _msg field in the block. - bm.resetBits() - return - } - if !matchBloomFilterAllTokens(bs, ch, tokens) { - // Fast path - fa tokens for the _msg field do not match bloom filter. - bm.resetBits() - return - } + if !fa.matchMessageBloomFilter(bs) { + // Fast path - fa doesn't match _msg bloom filter. + bm.resetBits() + return } // Slow path - verify every filter separately. @@ -56,7 +49,29 @@ func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) { } } -func (fa *filterAnd) getMsgTokens() []string { +func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool { + tokens := fa.getMessageTokens() + if len(tokens) == 0 { + return true + } + + v := bs.csh.getConstColumnValue("_msg") + if v != "" { + return matchStringByAllTokens(v, tokens) + } + + ch := bs.csh.getColumnHeader("_msg") + if ch == nil { + return false + } + + if ch.valueType == valueTypeDict { + return matchDictValuesByAllTokens(ch.valuesDict.values, tokens) + } + return matchBloomFilterAllTokens(bs, ch, tokens) +} + +func (fa *filterAnd) getMessageTokens() []string { fa.msgTokensOnce.Do(fa.initMsgTokens) return fa.msgTokens } @@ -89,3 +104,24 @@ func (fa *filterAnd) initMsgTokens() { } fa.msgTokens = a } + +func matchStringByAllTokens(v string, tokens []string) bool { + for _, token := range tokens { + if !matchPhrase(v, token) { + return false + } + } + return true +} + +func matchDictValuesByAllTokens(dictValues, tokens []string) bool { + bb := bbPool.Get() + for _, v := range dictValues { + bb.B = append(bb.B, v...) + bb.B = append(bb.B, ',') + } + v := bytesutil.ToUnsafeString(bb.B) + ok := matchStringByAllTokens(v, tokens) + bbPool.Put(bb) + return ok +} diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 131721b70..70f4c1f5f 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -70,6 +70,8 @@ func (ve *valuesEncoder) reset() { } // encode encodes values to ve.values and returns the encoded value type with min/max encoded values. +// +// ve.values and dict is valid until values are changed. func (ve *valuesEncoder) encode(values []string, dict *valuesDict) (valueType, uint64, uint64) { ve.reset() @@ -1091,6 +1093,16 @@ func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) { vd.values = dstValues } +func (vd *valuesDict) copyFromNoArena(src *valuesDict) { + vd.reset() + + dstValues := vd.values + for _, v := range src.values { + dstValues = append(dstValues, v) + } + vd.values = dstValues +} + func (vd *valuesDict) getOrAdd(k string) (byte, bool) { if len(k) > maxDictSizeBytes { return 0, false