mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/logstorage: improve the performance of obtaining _stream column value
Substitute global streamTagsCache with per-blockSearch cache for ((stream.id) -> (_stream value)) entries. This improves scalability of obtaining _stream values on a machine with many CPU cores, since every CPU has its own blockSearch instance. This also should reduce memory usage when querying logs over big number of streams, since per-blockSearch cache of ((stream.id) -> (_stream value)) entries is limited in size, and its lifetime is bounded by a single query.
This commit is contained in:
parent
9d11a21541
commit
180137a377
4 changed files with 57 additions and 80 deletions
|
@ -500,43 +500,11 @@ func (br *blockResult) addStreamIDColumn(bs *blockSearch) {
|
|||
}
|
||||
|
||||
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
|
||||
if !bs.prevStreamID.equal(&bs.bsw.bh.streamID) {
|
||||
return br.addStreamColumnSlow(bs)
|
||||
}
|
||||
|
||||
if len(bs.prevStream) == 0 {
|
||||
streamStr := bs.getStreamStr()
|
||||
if streamStr == "" {
|
||||
return false
|
||||
}
|
||||
br.addConstColumn("_stream", bytesutil.ToUnsafeString(bs.prevStream))
|
||||
return true
|
||||
}
|
||||
|
||||
func (br *blockResult) addStreamColumnSlow(bs *blockSearch) bool {
|
||||
bb := bbPool.Get()
|
||||
defer bbPool.Put(bb)
|
||||
|
||||
streamID := &bs.bsw.bh.streamID
|
||||
bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID)
|
||||
if len(bb.B) == 0 {
|
||||
// Couldn't find stream tags by streamID. This may be the case when the corresponding log stream
|
||||
// was recently registered and its tags aren't visible to search yet.
|
||||
// The stream tags must become visible in a few seconds.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
|
||||
bs.prevStreamID = *streamID
|
||||
bs.prevStream = bs.prevStream[:0]
|
||||
return false
|
||||
}
|
||||
|
||||
st := GetStreamTags()
|
||||
mustUnmarshalStreamTags(st, bb.B)
|
||||
bb.B = st.marshalString(bb.B[:0])
|
||||
PutStreamTags(st)
|
||||
|
||||
s := bytesutil.ToUnsafeString(bb.B)
|
||||
br.addConstColumn("_stream", s)
|
||||
|
||||
bs.prevStreamID = *streamID
|
||||
bs.prevStream = append(bs.prevStream[:0], s...)
|
||||
br.addConstColumn("_stream", streamStr)
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
@ -85,6 +85,10 @@ func getBlockSearch() *blockSearch {
|
|||
|
||||
func putBlockSearch(bs *blockSearch) {
|
||||
bs.reset()
|
||||
|
||||
// reset seenStreams before returning bs to the pool in order to reduce memory usage.
|
||||
bs.seenStreams = nil
|
||||
|
||||
blockSearchPool.Put(bs)
|
||||
}
|
||||
|
||||
|
@ -115,10 +119,9 @@ type blockSearch struct {
|
|||
// a is used for storing unmarshaled data in csh
|
||||
a arena
|
||||
|
||||
// prevStreamID and prevStream are used for speeding up fetching _stream columns
|
||||
// across sequential blocks belonging to the same stream.
|
||||
prevStreamID streamID
|
||||
prevStream []byte
|
||||
// seenStreams contains seen streamIDs for the recent searches.
|
||||
// It is used for speeding up fetching _stream column.
|
||||
seenStreams map[u128]string
|
||||
}
|
||||
|
||||
func (bs *blockSearch) reset() {
|
||||
|
@ -145,6 +148,8 @@ func (bs *blockSearch) reset() {
|
|||
bs.sbu.reset()
|
||||
bs.csh.reset()
|
||||
bs.a.reset()
|
||||
|
||||
// Do not reset seenStreams, since its' lifetime is managed by blockResult.addStreamColumn() code.
|
||||
}
|
||||
|
||||
func (bs *blockSearch) partPath() string {
|
||||
|
@ -326,3 +331,48 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b
|
|||
|
||||
return dst
|
||||
}
|
||||
|
||||
// getStreamStr returns _stream value for the given block at bs.
|
||||
func (bs *blockSearch) getStreamStr() string {
|
||||
sid := bs.bsw.bh.streamID.id
|
||||
streamStr := bs.seenStreams[sid]
|
||||
if streamStr != "" {
|
||||
// Fast path - streamStr is found in the seenStreams.
|
||||
return streamStr
|
||||
}
|
||||
|
||||
// Slow path - load streamStr from the storage.
|
||||
streamStr = bs.getStreamStrSlow()
|
||||
if streamStr != "" {
|
||||
// Store the found streamStr in seenStreams.
|
||||
if len(bs.seenStreams) > 20_000 {
|
||||
bs.seenStreams = nil
|
||||
}
|
||||
if bs.seenStreams == nil {
|
||||
bs.seenStreams = make(map[u128]string)
|
||||
}
|
||||
bs.seenStreams[sid] = streamStr
|
||||
}
|
||||
return streamStr
|
||||
}
|
||||
|
||||
func (bs *blockSearch) getStreamStrSlow() string {
|
||||
bb := bbPool.Get()
|
||||
defer bbPool.Put(bb)
|
||||
|
||||
bb.B = bs.bsw.p.pt.idb.appendStreamTagsByStreamID(bb.B[:0], &bs.bsw.bh.streamID)
|
||||
if len(bb.B) == 0 {
|
||||
// Couldn't find stream tags by sid. This may be the case when the corresponding log stream
|
||||
// was recently registered and its tags aren't visible to search yet.
|
||||
// The stream tags must become visible in a few seconds.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
|
||||
return ""
|
||||
}
|
||||
|
||||
st := GetStreamTags()
|
||||
mustUnmarshalStreamTags(st, bb.B)
|
||||
bb.B = st.marshalString(bb.B[:0])
|
||||
PutStreamTags(st)
|
||||
|
||||
return string(bb.B)
|
||||
}
|
||||
|
|
|
@ -159,32 +159,6 @@ func (pt *partition) logIngestedRows(lr *LogRows) {
|
|||
}
|
||||
}
|
||||
|
||||
// appendStreamTagsByStreamID appends canonical representation of stream tags for the given sid to dst
|
||||
// and returns the result.
|
||||
func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
|
||||
// Search for the StreamTags in the cache.
|
||||
key := bbPool.Get()
|
||||
defer bbPool.Put(key)
|
||||
|
||||
// There is no need in putting partition name into key here,
|
||||
// since StreamTags is uniquely identified by streamID.
|
||||
key.B = sid.marshal(key.B)
|
||||
dstLen := len(dst)
|
||||
dst = pt.s.streamTagsCache.GetBig(dst, key.B)
|
||||
if len(dst) > dstLen {
|
||||
// Fast path - the StreamTags have been found in cache.
|
||||
return dst
|
||||
}
|
||||
|
||||
// Slow path - search for StreamTags in idb
|
||||
dst = pt.idb.appendStreamTagsByStreamID(dst, sid)
|
||||
if len(dst) > dstLen {
|
||||
// Store the found StreamTags to cache
|
||||
pt.s.streamTagsCache.SetBig(key.B, dst[dstLen:])
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func (pt *partition) hasStreamIDInCache(sid *streamID) bool {
|
||||
var result [1]byte
|
||||
|
||||
|
|
|
@ -135,15 +135,6 @@ type Storage struct {
|
|||
// the check whether the given stream is already registered in the persistent storage.
|
||||
streamIDCache *workingsetcache.Cache
|
||||
|
||||
// streamTagsCache caches StreamTags entries keyed by streamID.
|
||||
//
|
||||
// There is no need to put partition into the key for StreamTags,
|
||||
// since StreamTags are uniquely identified by streamID.
|
||||
//
|
||||
// It reduces the load on persistent storage during querying
|
||||
// when StreamTags must be found for the particular streamID
|
||||
streamTagsCache *workingsetcache.Cache
|
||||
|
||||
// filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
|
||||
//
|
||||
// It reduces the load on persistent storage during querying by _stream:{...} filter.
|
||||
|
@ -253,8 +244,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
|
|||
streamIDCachePath := filepath.Join(path, cacheDirname, streamIDCacheFilename)
|
||||
streamIDCache := workingsetcache.Load(streamIDCachePath, mem/16)
|
||||
|
||||
streamTagsCache := workingsetcache.New(mem / 10)
|
||||
|
||||
filterStreamCache := workingsetcache.New(mem / 10)
|
||||
|
||||
s := &Storage{
|
||||
|
@ -270,7 +259,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
|
|||
stopCh: make(chan struct{}),
|
||||
|
||||
streamIDCache: streamIDCache,
|
||||
streamTagsCache: streamTagsCache,
|
||||
filterStreamCache: filterStreamCache,
|
||||
}
|
||||
|
||||
|
@ -474,9 +462,6 @@ func (s *Storage) MustClose() {
|
|||
s.streamIDCache.Stop()
|
||||
s.streamIDCache = nil
|
||||
|
||||
s.streamTagsCache.Stop()
|
||||
s.streamTagsCache = nil
|
||||
|
||||
s.filterStreamCache.Stop()
|
||||
s.filterStreamCache = nil
|
||||
|
||||
|
|
Loading…
Reference in a new issue