lib/logstorage: improve the performance of obtaining _stream column value

Substitute global streamTagsCache with per-blockSearch cache for ((stream.id) -> (_stream value)) entries.
This improves scalability of obtaining _stream values on a machine with many CPU cores, since every CPU
has its own blockSearch instance.

This should also reduce memory usage when querying logs over a big number of streams, since the per-blockSearch
cache of ((stream.id) -> (_stream value)) entries is limited in size, and its lifetime is bounded by a single query.
This commit is contained in:
Aliaksandr Valialkin 2024-09-24 20:51:13 +02:00
parent cf2e7d0d92
commit 7f1ba18719
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 57 additions and 80 deletions

View file

@ -500,43 +500,11 @@ func (br *blockResult) addStreamIDColumn(bs *blockSearch) {
}
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
if !bs.prevStreamID.equal(&bs.bsw.bh.streamID) {
return br.addStreamColumnSlow(bs)
}
if len(bs.prevStream) == 0 {
streamStr := bs.getStreamStr()
if streamStr == "" {
return false
}
br.addConstColumn("_stream", bytesutil.ToUnsafeString(bs.prevStream))
return true
}
// addStreamColumnSlow resolves the _stream value for the block currently
// selected in bs by loading stream tags from storage, and adds it to br
// as a const column.
//
// On both success and failure it updates bs.prevStreamID / bs.prevStream,
// so that subsequent blocks belonging to the same stream can take the fast path.
// It returns false when the stream tags cannot be found for the block's streamID.
func (br *blockResult) addStreamColumnSlow(bs *blockSearch) bool {
	bb := bbPool.Get()
	defer bbPool.Put(bb)

	streamID := &bs.bsw.bh.streamID
	bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID)
	if len(bb.B) == 0 {
		// Couldn't find stream tags by streamID. This may be the case when the corresponding log stream
		// was recently registered and its tags aren't visible to search yet.
		// The stream tags must become visible in a few seconds.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
		bs.prevStreamID = *streamID
		bs.prevStream = bs.prevStream[:0]
		return false
	}

	st := GetStreamTags()
	mustUnmarshalStreamTags(st, bb.B)
	bb.B = st.marshalString(bb.B[:0])
	PutStreamTags(st)

	// NOTE(review): s aliases bb.B, which is returned to bbPool on return —
	// this assumes addConstColumn copies the value; verify.
	s := bytesutil.ToUnsafeString(bb.B)
	br.addConstColumn("_stream", s)

	// Cache the resolved value for the next block from the same stream.
	bs.prevStreamID = *streamID
	bs.prevStream = append(bs.prevStream[:0], s...)
	return true
}

View file

@ -85,6 +85,10 @@ func getBlockSearch() *blockSearch {
// putBlockSearch returns s to blockSearchPool for later reuse.
func putBlockSearch(s *blockSearch) {
	s.reset()

	// Drop seenStreams before pooling s, so pooled blockSearch objects
	// do not keep stream strings alive between queries.
	s.seenStreams = nil

	blockSearchPool.Put(s)
}
@ -115,10 +119,9 @@ type blockSearch struct {
// a is used for storing unmarshaled data in csh
a arena
// prevStreamID and prevStream are used for speeding up fetching _stream columns
// across sequential blocks belonging to the same stream.
prevStreamID streamID
prevStream []byte
// seenStreams contains seen streamIDs for the recent searches.
// It is used for speeding up fetching _stream column.
seenStreams map[u128]string
}
func (bs *blockSearch) reset() {
@ -145,6 +148,8 @@ func (bs *blockSearch) reset() {
bs.sbu.reset()
bs.csh.reset()
bs.a.reset()
// Do not reset seenStreams, since its lifetime is managed by blockResult.addStreamColumn() code.
}
func (bs *blockSearch) partPath() string {
@ -326,3 +331,48 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b
return dst
}
// getStreamStr returns the _stream value for the block currently selected in bs.
//
// An empty string is returned when the stream tags for the block's streamID
// aren't available yet.
func (bs *blockSearch) getStreamStr() string {
	sid := bs.bsw.bh.streamID.id

	// Fast path — the value was already resolved during this search.
	// Only non-empty values are ever stored, so the comma-ok check is
	// equivalent to comparing the value against "".
	if s, ok := bs.seenStreams[sid]; ok {
		return s
	}

	// Slow path — load the value from the storage.
	s := bs.getStreamStrSlow()
	if s == "" {
		return ""
	}

	// Remember the resolved value, bounding the cache size so its memory
	// usage stays limited within a single query.
	if len(bs.seenStreams) > 20_000 {
		bs.seenStreams = nil
	}
	if bs.seenStreams == nil {
		bs.seenStreams = make(map[u128]string)
	}
	bs.seenStreams[sid] = s
	return s
}
// getStreamStrSlow loads the _stream value for the current block from the
// index db, bypassing any caching.
//
// It returns an empty string when no stream tags are found for the block's
// streamID.
func (bs *blockSearch) getStreamStrSlow() string {
	buf := bbPool.Get()
	defer bbPool.Put(buf)

	buf.B = bs.bsw.p.pt.idb.appendStreamTagsByStreamID(buf.B[:0], &bs.bsw.bh.streamID)
	if len(buf.B) == 0 {
		// Couldn't find stream tags by sid. This may be the case when the corresponding log stream
		// was recently registered and its tags aren't visible to search yet.
		// The stream tags must become visible in a few seconds.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
		return ""
	}

	tags := GetStreamTags()
	mustUnmarshalStreamTags(tags, buf.B)
	buf.B = tags.marshalString(buf.B[:0])
	PutStreamTags(tags)

	// Copy out of the pooled buffer before it is returned to bbPool.
	return string(buf.B)
}

View file

@ -159,32 +159,6 @@ func (pt *partition) logIngestedRows(lr *LogRows) {
}
}
// appendStreamTagsByStreamID appends canonical representation of stream tags
// for the given sid to dst and returns the result.
func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
	key := bbPool.Get()
	defer bbPool.Put(key)

	// StreamTags are uniquely identified by streamID, so the partition name
	// doesn't need to be part of the cache key.
	key.B = sid.marshal(key.B)

	dstLen := len(dst)
	if dst = pt.s.streamTagsCache.GetBig(dst, key.B); len(dst) > dstLen {
		// Fast path — the StreamTags were found in the cache.
		return dst
	}

	// Slow path — look the StreamTags up in idb and cache them on success.
	dst = pt.idb.appendStreamTagsByStreamID(dst, sid)
	if len(dst) > dstLen {
		pt.s.streamTagsCache.SetBig(key.B, dst[dstLen:])
	}
	return dst
}
func (pt *partition) hasStreamIDInCache(sid *streamID) bool {
var result [1]byte

View file

@ -135,15 +135,6 @@ type Storage struct {
// the check whether the given stream is already registered in the persistent storage.
streamIDCache *workingsetcache.Cache
// streamTagsCache caches StreamTags entries keyed by streamID.
//
// There is no need to put partition into the key for StreamTags,
// since StreamTags are uniquely identified by streamID.
//
// It reduces the load on persistent storage during querying
// when StreamTags must be found for the particular streamID
streamTagsCache *workingsetcache.Cache
// filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
//
// It reduces the load on persistent storage during querying by _stream:{...} filter.
@ -253,8 +244,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
streamIDCachePath := filepath.Join(path, cacheDirname, streamIDCacheFilename)
streamIDCache := workingsetcache.Load(streamIDCachePath, mem/16)
streamTagsCache := workingsetcache.New(mem / 10)
filterStreamCache := workingsetcache.New(mem / 10)
s := &Storage{
@ -270,7 +259,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
stopCh: make(chan struct{}),
streamIDCache: streamIDCache,
streamTagsCache: streamTagsCache,
filterStreamCache: filterStreamCache,
}
@ -474,9 +462,6 @@ func (s *Storage) MustClose() {
s.streamIDCache.Stop()
s.streamIDCache = nil
s.streamTagsCache.Stop()
s.streamTagsCache = nil
s.filterStreamCache.Stop()
s.filterStreamCache = nil