VictoriaMetrics/lib/logstorage/block_search.go
Aliaksandr Valialkin 8d968acd0a
lib/logstorage: avoid reading columnsHeader data when field_values pipe is applied directly to log filters
This improves performance of `field_values` pipe when it is applied to large number of data blocks.
This also improves performance of /select/logsql/field_values HTTP API.
2024-10-29 16:44:44 +01:00

587 lines
16 KiB
Go

package logstorage
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// blockSearchWorksPerBatch is the number of blocks to search at once by a single worker.
//
// This number must be increased on systems with many CPU cores in order to amortize
// the overhead for passing the blockSearchWork to worker goroutines.
//
// It is also the initial capacity of blockSearchWorkBatch.bsws allocated by getBlockSearchWorkBatch().
const blockSearchWorksPerBatch = 64
// blockSearchWork describes a single unit of search work: one block of one part
// together with the options to search it with.
type blockSearchWork struct {
// p is the part where the block belongs to.
p *part
// so contains search options for the block search.
so *searchOptions
// bh is the header of the block to search.
//
// It is stored by value; appendBlockSearchWork() fills it via bh.copyFrom().
bh blockHeader
}
// reset clears bsw: the block header is reset and the references
// to the part and to the search options are dropped.
func (bsw *blockSearchWork) reset() {
	bsw.bh.reset()
	bsw.p, bsw.so = nil, nil
}
// blockSearchWorkBatch is a batch of blockSearchWork items.
//
// Batches are obtained via getBlockSearchWorkBatch() and returned via putBlockSearchWorkBatch().
type blockSearchWorkBatch struct {
// bsws holds the batched work items; appendBlockSearchWork() appends to it.
bsws []blockSearchWork
}
// reset resets every work item stored in bswb and truncates bswb.bsws
// to zero length while keeping its underlying storage.
func (bswb *blockSearchWorkBatch) reset() {
	for i := range bswb.bsws {
		bswb.bsws[i].reset()
	}
	bswb.bsws = bswb.bsws[:0]
}
// getBlockSearchWorkBatch returns a blockSearchWorkBatch from blockSearchWorkBatchPool.
//
// If the pool has nothing to offer, a new batch with room
// for blockSearchWorksPerBatch work items is allocated.
func getBlockSearchWorkBatch() *blockSearchWorkBatch {
	if v := blockSearchWorkBatchPool.Get(); v != nil {
		return v.(*blockSearchWorkBatch)
	}

	bswb := &blockSearchWorkBatch{}
	bswb.bsws = make([]blockSearchWork, 0, blockSearchWorksPerBatch)
	return bswb
}
// putBlockSearchWorkBatch resets bswb and returns it to blockSearchWorkBatchPool.
func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) {
// Reset before pooling, so the pooled batch doesn't keep references to parts and search options.
bswb.reset()
blockSearchWorkBatchPool.Put(bswb)
}
// blockSearchWorkBatchPool is the pool of *blockSearchWorkBatch objects
// shared by getBlockSearchWorkBatch() and putBlockSearchWorkBatch().
var blockSearchWorkBatchPool sync.Pool
// appendBlockSearchWork appends a work item for the given p, so and bh to bswb.
//
// The contents of bh are copied into the appended item via copyFrom().
//
// It returns true if bswb still has spare capacity after the append,
// e.g. when len(bswb.bsws) is smaller than cap(bswb.bsws).
func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool {
	bswb.bsws = append(bswb.bsws, blockSearchWork{
		p:  p,
		so: so,
	})

	last := &bswb.bsws[len(bswb.bsws)-1]
	last.bh.copyFrom(bh)

	return len(bswb.bsws) < cap(bswb.bsws)
}
// getBlockSearch returns a blockSearch from blockSearchPool.
//
// A zero blockSearch is allocated if the pool has nothing to offer.
func getBlockSearch() *blockSearch {
	if v := blockSearchPool.Get(); v != nil {
		return v.(*blockSearch)
	}
	return &blockSearch{}
}
// putBlockSearch resets bs and returns it to blockSearchPool.
func putBlockSearch(bs *blockSearch) {
bs.reset()
// reset seenStreams before returning bs to the pool in order to reduce memory usage.
// bs.reset() intentionally leaves seenStreams untouched, so it must be dropped here.
bs.seenStreams = nil
blockSearchPool.Put(bs)
}
// blockSearchPool is the pool of *blockSearch objects shared by getBlockSearch() and putBlockSearch().
var blockSearchPool sync.Pool
// blockSearch holds the state for searching a single block.
//
// It keeps the work item, the search result and a set of per-block caches,
// which are filled lazily and cleared by reset().
type blockSearch struct {
// bsw is the actual work to perform on the given block pointed by bsw.bh
bsw *blockSearchWork
// br contains result for the search in the block after search() call
br blockResult
// timestampsCache contains cached timestamps for the given block.
//
// It is initialized lazily by calling getTimestamps().
timestampsCache *encoding.Int64s
// bloomFilterCache contains cached bloom filters for requested columns in the given block
//
// It is keyed by column name and is filled by getBloomFilterForColumn().
bloomFilterCache map[string]*bloomFilter
// valuesCache contains cached values for requested columns in the given block
//
// It is keyed by column name and is filled by getValuesForColumn().
valuesCache map[string]*stringBucket
// sbu is used for unmarshaling local columns
sbu stringsBlockUnmarshaler
// cshIndexBlockCache holds columnsHeaderIndex data for the given block.
//
// It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexBlockCache []byte
// cshBlockCache holds columnsHeader data for the given block.
//
// It is initialized lazily by calling getColumnsHeaderBlock().
cshBlockCache []byte
// cshBlockInitialized is set to true by getColumnsHeaderBlock() after cshBlockCache is read.
//
// It is set back to false by reset().
cshBlockInitialized bool
// ccsCache is the cache for accessed const columns
//
// It is filled by getConstColumnValue().
ccsCache []Field
// chsCache is the cache for accessed column headers
//
// It is filled by getColumnHeader().
chsCache []columnHeader
// cshIndexCache is the columnsHeaderIndex associated with the given block.
//
// It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexCache *columnsHeaderIndex
// cshCache is the columnsHeader associated with the given block.
//
// It is initialized lazily by calling getColumnsHeader().
cshCache *columnsHeader
// seenStreams contains seen streamIDs for the recent searches.
//
// It is used for speeding up fetching _stream column.
// It is filled by getStreamStr() and isn't cleared by reset().
seenStreams map[u128]string
}
// reset prepares bs for searching another block.
//
// Pooled objects held by the per-block caches are returned to their pools,
// while byte buffers and slices are truncated, so their storage can be reused.
func (bs *blockSearch) reset() {
	bs.bsw = nil
	bs.br.reset()

	if ts := bs.timestampsCache; ts != nil {
		encoding.PutInt64s(ts)
		bs.timestampsCache = nil
	}

	// The maps are emptied entry by entry, since every value must be returned to its pool.
	for name, bf := range bs.bloomFilterCache {
		putBloomFilter(bf)
		delete(bs.bloomFilterCache, name)
	}
	for name, sb := range bs.valuesCache {
		putStringBucket(sb)
		delete(bs.valuesCache, name)
	}

	bs.sbu.reset()

	bs.cshIndexBlockCache = bs.cshIndexBlockCache[:0]
	bs.cshBlockCache = bs.cshBlockCache[:0]
	bs.cshBlockInitialized = false

	for i := range bs.ccsCache {
		bs.ccsCache[i].Reset()
	}
	bs.ccsCache = bs.ccsCache[:0]

	for i := range bs.chsCache {
		bs.chsCache[i].reset()
	}
	bs.chsCache = bs.chsCache[:0]

	if cshIndex := bs.cshIndexCache; cshIndex != nil {
		putColumnsHeaderIndex(cshIndex)
		bs.cshIndexCache = nil
	}
	if csh := bs.cshCache; csh != nil {
		putColumnsHeader(csh)
		bs.cshCache = nil
	}

	// seenStreams is deliberately left untouched, so it survives across blocks.
	// Its lifetime is managed elsewhere (e.g. putBlockSearch() drops it).
}
// partPath returns the path of the part, which holds the block being searched.
func (bs *blockSearch) partPath() string {
	p := bs.bsw.p
	return p.path
}
// search performs the search described by bsw and stores the result in bs.br.
//
// bm is re-initialized to bsw.bh.rowsCount bits and is then passed to the filter from bsw.so.
// If bm is zero after applying the filter, bs.br is left in the reset state.
func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
	bs.reset()
	bs.bsw = bsw

	// Find the rows matching the filter.
	bm.init(int(bsw.bh.rowsCount))
	bm.setBits()
	bs.bsw.so.filter.applyToBlockSearch(bs, bm)
	if bm.isZero() {
		// Nothing in the current block matches the filter.
		return
	}

	bs.br.mustInit(bs, bm)

	// Load the needed columns into bs.br.
	if !bs.bsw.so.needAllColumns {
		bs.br.initRequestedColumns()
		return
	}
	bs.br.initAllColumns()
}
// partFormatVersion returns the format version recorded in the header of the part,
// which holds the block being searched.
func (bs *blockSearch) partFormatVersion() uint {
	p := bs.bsw.p
	return p.ph.FormatVersion
}
// getConstColumnValue returns the value of the const column with the given name in the block.
//
// The "_msg" name is treated as the empty column name.
// An empty string is returned if the block has no const column with the given name.
//
// For parts with format version below 1 the whole columnsHeader is consulted.
// For newer parts the column is located via columnsHeaderIndex and the unmarshaled
// column is remembered in bs.ccsCache, so repeated calls skip unmarshaling.
func (bs *blockSearch) getConstColumnValue(name string) string {
	if name == "_msg" {
		name = ""
	}

	if bs.partFormatVersion() < 1 {
		ccs := bs.getColumnsHeader().constColumns
		for i := range ccs {
			if ccs[i].Name == name {
				return ccs[i].Value
			}
		}
		return ""
	}

	columnNameID, ok := bs.getColumnNameID(name)
	if !ok {
		// The part has no column with such a name at all.
		return ""
	}

	// Check the already unmarshaled const columns first.
	for i := range bs.ccsCache {
		cached := &bs.ccsCache[i]
		if cached.Name == name {
			return cached.Value
		}
	}

	cshIndex := bs.getColumnsHeaderIndex()
	for _, cr := range cshIndex.constColumnsRefs {
		if cr.columnNameID == columnNameID {
			b := bs.getColumnsHeaderBlock()
			if cr.offset > uint64(len(b)) {
				logger.Panicf("FATAL: %s: header offset for const column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
			}
			tail := b[cr.offset:]

			// Unmarshal the column directly into a new slot of the cache.
			n := len(bs.ccsCache)
			bs.ccsCache = slicesutil.SetLength(bs.ccsCache, n+1)
			cc := &bs.ccsCache[n]
			if _, err := cc.unmarshalNoArena(tail, false); err != nil {
				logger.Panicf("FATAL: %s: cannot unmarshal header for const column %q: %s", bs.bsw.p.path, name, err)
			}
			cc.Name = strings.Clone(name)
			return cc.Value
		}
	}
	return ""
}
// getColumnHeader returns the header of the column with the given name in the block.
//
// The "_msg" name is treated as the empty column name.
// nil is returned if the block has no column header with the given name.
//
// For parts with format version below 1 the whole columnsHeader is consulted.
// For newer parts the header is located via columnsHeaderIndex and the unmarshaled
// header is remembered in bs.chsCache, so repeated calls skip unmarshaling.
//
// NOTE(review): the header is unmarshaled with partFormatLatestVersion rather than
// with bs.partFormatVersion(); presumably this is fine while this code path is reached
// only for the latest format - confirm when a newer part format is added.
func (bs *blockSearch) getColumnHeader(name string) *columnHeader {
	if name == "_msg" {
		name = ""
	}

	if bs.partFormatVersion() < 1 {
		chs := bs.getColumnsHeader().columnHeaders
		for i := range chs {
			if chs[i].name == name {
				return &chs[i]
			}
		}
		return nil
	}

	columnNameID, ok := bs.getColumnNameID(name)
	if !ok {
		// The part has no column with such a name at all.
		return nil
	}

	// Check the already unmarshaled column headers first.
	for i := range bs.chsCache {
		cached := &bs.chsCache[i]
		if cached.name == name {
			return cached
		}
	}

	cshIndex := bs.getColumnsHeaderIndex()
	for _, cr := range cshIndex.columnHeadersRefs {
		if cr.columnNameID == columnNameID {
			b := bs.getColumnsHeaderBlock()
			if cr.offset > uint64(len(b)) {
				logger.Panicf("FATAL: %s: header offset for column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
			}
			tail := b[cr.offset:]

			// Unmarshal the header directly into a new slot of the cache.
			n := len(bs.chsCache)
			bs.chsCache = slicesutil.SetLength(bs.chsCache, n+1)
			ch := &bs.chsCache[n]
			if _, err := ch.unmarshalNoArena(tail, partFormatLatestVersion); err != nil {
				logger.Panicf("FATAL: %s: cannot unmarshal header for column %q: %s", bs.bsw.p.path, name, err)
			}
			ch.name = strings.Clone(name)
			return ch
		}
	}
	return nil
}
// getColumnNameID returns the ID registered for the given column name in the part,
// which holds the block being searched.
//
// The second returned value is false if the part has no such column name.
func (bs *blockSearch) getColumnNameID(name string) (uint64, bool) {
	ids := bs.bsw.p.columnNameIDs
	columnNameID, found := ids[name]
	return columnNameID, found
}
// getColumnNameByID returns the column name stored under the given columnNameID in the part,
// which holds the block being searched.
//
// logger.Panicf is called if columnNameID is outside the part's columnNames.
func (bs *blockSearch) getColumnNameByID(columnNameID uint64) string {
	names := bs.bsw.p.columnNames
	if total := uint64(len(names)); columnNameID >= total {
		logger.Panicf("FATAL: %s: too big columnNameID=%d; it must be smaller than %d", bs.bsw.p.path, columnNameID, len(names))
	}
	return names[columnNameID]
}
// getColumnsHeaderIndex returns the columnsHeaderIndex for the block being searched.
//
// The index is read and unmarshaled on the first call and is cached in bs.cshIndexCache
// until the next bs.reset() call.
//
// It must be called only for parts with format version 1 or newer.
func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex {
	if version := bs.partFormatVersion(); version < 1 {
		logger.Panicf("BUG: getColumnsHeaderIndex() can be called only for part encoding v1+, while it has been called for v%d", version)
	}

	if bs.cshIndexCache != nil {
		return bs.cshIndexCache
	}

	bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh)

	cshIndex := getColumnsHeaderIndex()
	bs.cshIndexCache = cshIndex
	if err := cshIndex.unmarshalNoArena(bs.cshIndexBlockCache); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.bsw.p.path, err)
	}
	return cshIndex
}
// getColumnsHeader returns the columnsHeader for the block being searched.
//
// The header is unmarshaled on the first call and is cached in bs.cshCache
// until the next bs.reset() call.
//
// For parts with format version 1 or newer the column names are additionally
// set from columnsHeaderIndex and the part's columnNames.
func (bs *blockSearch) getColumnsHeader() *columnsHeader {
	if bs.cshCache != nil {
		return bs.cshCache
	}

	b := bs.getColumnsHeaderBlock()
	version := bs.partFormatVersion()

	csh := getColumnsHeader()
	if err := csh.unmarshalNoArena(b, version); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err)
	}
	if version >= 1 {
		cshIndex := bs.getColumnsHeaderIndex()
		if err := csh.setColumnNames(cshIndex, bs.bsw.p.columnNames); err != nil {
			logger.Panicf("FATAL: %s: %s", bs.bsw.p.path, err)
		}
	}

	bs.cshCache = csh
	return csh
}
// getColumnsHeaderBlock returns the raw columnsHeader data for the block being searched.
//
// The data is read on the first call and is cached in bs.cshBlockCache
// until the next bs.reset() call.
func (bs *blockSearch) getColumnsHeaderBlock() []byte {
	if bs.cshBlockInitialized {
		return bs.cshBlockCache
	}

	bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
	bs.cshBlockInitialized = true
	return bs.cshBlockCache
}
// readColumnsHeaderIndexBlock reads the columnsHeaderIndex data referred by bh from p,
// places it after the first len(dst) bytes of dst and returns the resulting buffer.
//
// logger.Panicf is called if the size recorded in bh exceeds maxColumnsHeaderIndexSize.
//
// NOTE(review): the buffer is resized with a "NoCopy" helper, so the existing dst contents
// presumably aren't guaranteed to survive a reallocation. Callers in this file always
// pass dst[:0] - confirm before passing a non-empty dst.
func readColumnsHeaderIndexBlock(dst []byte, p *part, bh *blockHeader) []byte {
	size := bh.columnsHeaderIndexSize
	if size > maxColumnsHeaderIndexSize {
		logger.Panicf("FATAL: %s: columns header index size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderIndexSize, size)
	}

	prefixLen := len(dst)
	dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(size)+prefixLen)
	p.columnsHeaderIndexFile.MustReadAt(dst[prefixLen:], int64(bh.columnsHeaderIndexOffset))
	return dst
}
// readColumnsHeaderBlock reads the columnsHeader data referred by bh from p,
// places it after the first len(dst) bytes of dst and returns the resulting buffer.
//
// logger.Panicf is called if the size recorded in bh exceeds maxColumnsHeaderSize.
//
// NOTE(review): the buffer is resized with a "NoCopy" helper, so the existing dst contents
// presumably aren't guaranteed to survive a reallocation. Callers in this file always
// pass dst[:0] - confirm before passing a non-empty dst.
func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte {
	size := bh.columnsHeaderSize
	if size > maxColumnsHeaderSize {
		logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, size)
	}

	prefixLen := len(dst)
	dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(size)+prefixLen)
	p.columnsHeaderFile.MustReadAt(dst[prefixLen:], int64(bh.columnsHeaderOffset))
	return dst
}
// getBloomFilterForColumn returns the bloom filter for the column described by ch.
//
// The bloom filter is read and unmarshaled on the first call for the given ch.name
// and is cached in bs.bloomFilterCache afterwards.
//
// The returned bloom filter is owned by bs, so it mustn't be used after bs is reset.
func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
	if cached := bs.bloomFilterCache[ch.name]; cached != nil {
		return cached
	}

	bloomValuesFile := bs.bsw.p.getBloomValuesFileForColumnName(ch.name)

	// Read the marshaled bloom filter into a temporary buffer.
	bb := longTermBufPool.Get()
	size := ch.bloomFilterSize
	if size > maxBloomFilterBlockSize {
		logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxBloomFilterBlockSize, size)
	}
	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(size))
	bloomValuesFile.bloom.MustReadAt(bb.B, int64(ch.bloomFilterOffset))

	bf := getBloomFilter()
	if err := bf.unmarshal(bb.B); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err)
	}
	longTermBufPool.Put(bb)

	// The cache map is created on first use.
	if bs.bloomFilterCache == nil {
		bs.bloomFilterCache = make(map[string]*bloomFilter)
	}
	bs.bloomFilterCache[ch.name] = bf
	return bf
}
// getValuesForColumn returns the block values for the column described by ch.
//
// The values are read and unmarshaled on the first call for the given ch.name
// and are cached in bs.valuesCache afterwards.
//
// The returned values are owned by bs, so they mustn't be used after bs is reset.
func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
	if cached := bs.valuesCache[ch.name]; cached != nil {
		return cached.a
	}

	bloomValuesFile := bs.bsw.p.getBloomValuesFileForColumnName(ch.name)

	// Read the marshaled values into a temporary buffer.
	bb := longTermBufPool.Get()
	size := ch.valuesSize
	if size > maxValuesBlockSize {
		logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxValuesBlockSize, size)
	}
	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(size))
	bloomValuesFile.values.MustReadAt(bb.B, int64(ch.valuesOffset))

	sb := getStringBucket()
	var err error
	sb.a, err = bs.sbu.unmarshal(sb.a[:0], bb.B, bs.bsw.bh.rowsCount)
	longTermBufPool.Put(bb)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal column %q: %s", bs.partPath(), ch.name, err)
	}

	// The cache map is created on first use.
	if bs.valuesCache == nil {
		bs.valuesCache = make(map[string]*stringBucket)
	}
	bs.valuesCache[ch.name] = sb
	return sb.a
}
// getTimestamps returns the timestamps for the block being searched.
//
// The timestamps are read and unmarshaled on the first call and are cached
// in bs.timestampsCache afterwards.
//
// The returned timestamps are owned by bs, so they mustn't be used after bs is reset.
func (bs *blockSearch) getTimestamps() []int64 {
	if cached := bs.timestampsCache; cached != nil {
		return cached.A
	}

	// Read the marshaled timestamps into a temporary buffer.
	bb := longTermBufPool.Get()
	th := &bs.bsw.bh.timestampsHeader
	size := th.blockSize
	if size > maxTimestampsBlockSize {
		logger.Panicf("FATAL: %s: timestamps block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxTimestampsBlockSize, size)
	}
	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(size))
	bs.bsw.p.timestampsFile.MustReadAt(bb.B, int64(th.blockOffset))

	rowsCount := int(bs.bsw.bh.rowsCount)
	ts := encoding.GetInt64s(rowsCount)
	var err error
	ts.A, err = encoding.UnmarshalTimestamps(ts.A[:0], bb.B, th.marshalType, th.minTimestamp, rowsCount)
	longTermBufPool.Put(bb)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal timestamps: %s", bs.partPath(), err)
	}

	bs.timestampsCache = ts
	return ts.A
}
// mustReadBlockHeaders reads the block headers referred by ih from p,
// appends them to dst and returns the result.
//
// The index block is read from p.indexFile, is decompressed with ZSTD and is then
// unmarshaled according to p.ph.FormatVersion. logger.Panicf is called on any failure.
func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []blockHeader {
	// Read the compressed index block.
	compressed := longTermBufPool.Get()
	size := ih.indexBlockSize
	if size > maxIndexBlockSize {
		logger.Panicf("FATAL: %s: index block size cannot exceed %d bytes; got %d bytes", p.indexFile.Path(), maxIndexBlockSize, size)
	}
	compressed.B = bytesutil.ResizeNoCopyMayOverallocate(compressed.B, int(size))
	p.indexFile.MustReadAt(compressed.B, int64(ih.indexBlockOffset))

	// Decompress it; the compressed buffer isn't needed after that.
	plain := longTermBufPool.Get()
	var err error
	plain.B, err = encoding.DecompressZSTD(plain.B, compressed.B)
	longTermBufPool.Put(compressed)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
	}

	// Unmarshal the block headers from the decompressed data.
	dst, err = unmarshalBlockHeaders(dst, plain.B, p.ph.FormatVersion)
	longTermBufPool.Put(plain)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal block headers read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
	}
	return dst
}
// getStreamStr returns the _stream value for the block being searched.
//
// Non-empty values are remembered in bs.seenStreams, so subsequent calls for blocks
// with the same streamID skip getStreamStrSlow(). Empty values aren't remembered.
func (bs *blockSearch) getStreamStr() string {
	sid := bs.bsw.bh.streamID.id

	// Fast path - the value has been already seen.
	if seen := bs.seenStreams[sid]; seen != "" {
		return seen
	}

	// Slow path - load the value from the storage.
	streamStr := bs.getStreamStrSlow()
	if streamStr == "" {
		return ""
	}

	// Remember the found value. Start from a fresh map if the map is missing
	// or if it holds more than 20_000 entries.
	if bs.seenStreams == nil || len(bs.seenStreams) > 20_000 {
		bs.seenStreams = make(map[u128]string)
	}
	bs.seenStreams[sid] = streamStr
	return streamStr
}
// getStreamStrSlow loads the stream tags for the streamID of the block being searched
// from the part's indexdb and returns them in the string form.
//
// An empty string is returned if the stream tags cannot be found.
func (bs *blockSearch) getStreamStrSlow() string {
	bb := bbPool.Get()
	defer bbPool.Put(bb)

	sid := &bs.bsw.bh.streamID
	bb.B = bs.bsw.p.pt.idb.appendStreamTagsByStreamID(bb.B[:0], sid)
	if len(bb.B) == 0 {
		// The stream tags for sid are missing. This may happen when the corresponding
		// log stream was registered recently, so its tags aren't visible to search yet.
		// The stream tags must become visible in a few seconds.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
		return ""
	}

	// Convert the marshaled stream tags into their string representation, reusing bb.
	st := GetStreamTags()
	mustUnmarshalStreamTags(st, bb.B)
	bb.B = st.marshalString(bb.B[:0])
	PutStreamTags(st)

	return string(bb.B)
}