From e42b9d26a998d198f2bd562d85e4df559b1cf03d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 24 Apr 2024 23:24:07 +0200 Subject: [PATCH] lib/logstorage: skip log entries if it is impossible to find stream tags for them Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6127 --- lib/logstorage/block_search.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index d9be8ea29..b0f7a324d 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -105,12 +105,14 @@ func (bs *blockSearch) search(bsw *blockSearchWork) { // search rows matching the given filter bm := getFilterBitmap(int(bsw.bh.rowsCount)) + defer putFilterBitmap(bm) + bm.setBits() bs.bsw.so.filter.apply(bs, bm) bs.br.mustInit(bs, bm) if bm.isZero() { - putFilterBitmap(bm) + // The filter doesn't match any logs in the current block. return } @@ -118,7 +120,11 @@ func (bs *blockSearch) search(bsw *blockSearchWork) { for _, columnName := range bs.bsw.so.resultColumnNames { switch columnName { case "_stream": - bs.br.addStreamColumn(bs) + if !bs.br.addStreamColumn(bs) { + // Skip the current block, since the associated stream tags are missing. + bs.br.reset() + return + } case "_time": bs.br.addTimeColumn() default: @@ -135,7 +141,6 @@ func (bs *blockSearch) search(bsw *blockSearchWork) { } } } - putFilterBitmap(bm) } func (csh *columnsHeader) initFromBlockHeader(p *part, bh *blockHeader) { @@ -458,18 +463,27 @@ func (br *blockResult) addTimeColumn() { }) } -func (br *blockResult) addStreamColumn(bs *blockSearch) { +func (br *blockResult) addStreamColumn(bs *blockSearch) bool { bb := bbPool.Get() + defer bbPool.Put(bb) + bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], &br.streamID) - if len(bb.B) > 0 { - st := GetStreamTags() - mustUnmarshalStreamTags(st, bb.B) - bb.B = st.marshalString(bb.B[:0]) - PutStreamTags(st) + if len(bb.B) == 0 { + // Couldn't find stream tags by streamID. This may be the case when the corresponding log stream + // was recently registered and its tags aren't visible to search yet. + // The stream tags must become visible in a few seconds. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042 + return false } + + st := GetStreamTags() + mustUnmarshalStreamTags(st, bb.B) + bb.B = st.marshalString(bb.B[:0]) + PutStreamTags(st) + s := bytesutil.ToUnsafeString(bb.B) br.addConstColumn(s) - bbPool.Put(bb) + return true } func (br *blockResult) addConstColumn(value string) {