lib/logstorage: skip log entries if it is impossible to find stream tags for them

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6127
This commit is contained in:
Aliaksandr Valialkin 2024-04-24 23:24:07 +02:00
parent 543a6b9cee
commit e42b9d26a9
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -105,12 +105,14 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
// search rows matching the given filter // search rows matching the given filter
bm := getFilterBitmap(int(bsw.bh.rowsCount)) bm := getFilterBitmap(int(bsw.bh.rowsCount))
defer putFilterBitmap(bm)
bm.setBits() bm.setBits()
bs.bsw.so.filter.apply(bs, bm) bs.bsw.so.filter.apply(bs, bm)
bs.br.mustInit(bs, bm) bs.br.mustInit(bs, bm)
if bm.isZero() { if bm.isZero() {
putFilterBitmap(bm) // The filter doesn't match any logs in the current block.
return return
} }
@ -118,7 +120,11 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
for _, columnName := range bs.bsw.so.resultColumnNames { for _, columnName := range bs.bsw.so.resultColumnNames {
switch columnName { switch columnName {
case "_stream": case "_stream":
bs.br.addStreamColumn(bs) if !bs.br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
bs.br.reset()
return
}
case "_time": case "_time":
bs.br.addTimeColumn() bs.br.addTimeColumn()
default: default:
@ -135,7 +141,6 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
} }
} }
} }
putFilterBitmap(bm)
} }
func (csh *columnsHeader) initFromBlockHeader(p *part, bh *blockHeader) { func (csh *columnsHeader) initFromBlockHeader(p *part, bh *blockHeader) {
@ -458,18 +463,27 @@ func (br *blockResult) addTimeColumn() {
}) })
} }
func (br *blockResult) addStreamColumn(bs *blockSearch) { func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
bb := bbPool.Get() bb := bbPool.Get()
defer bbPool.Put(bb)
bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], &br.streamID) bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], &br.streamID)
if len(bb.B) > 0 { if len(bb.B) == 0 {
st := GetStreamTags() // Couldn't find stream tags by streamID. This may be the case when the corresponding log stream
mustUnmarshalStreamTags(st, bb.B) // was recently registered and its tags aren't visible to search yet.
bb.B = st.marshalString(bb.B[:0]) // The stream tags must become visible in a few seconds.
PutStreamTags(st) // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
return false
} }
st := GetStreamTags()
mustUnmarshalStreamTags(st, bb.B)
bb.B = st.marshalString(bb.B[:0])
PutStreamTags(st)
s := bytesutil.ToUnsafeString(bb.B) s := bytesutil.ToUnsafeString(bb.B)
br.addConstColumn(s) br.addConstColumn(s)
bbPool.Put(bb) return true
} }
func (br *blockResult) addConstColumn(value string) { func (br *blockResult) addConstColumn(value string) {