diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 2ef7fe857..2dc30a3fb 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -500,43 +500,11 @@ func (br *blockResult) addStreamIDColumn(bs *blockSearch) { } func (br *blockResult) addStreamColumn(bs *blockSearch) bool { - if !bs.prevStreamID.equal(&bs.bsw.bh.streamID) { - return br.addStreamColumnSlow(bs) - } - - if len(bs.prevStream) == 0 { + streamStr := bs.getStreamStr() + if streamStr == "" { return false } - br.addConstColumn("_stream", bytesutil.ToUnsafeString(bs.prevStream)) - return true -} - -func (br *blockResult) addStreamColumnSlow(bs *blockSearch) bool { - bb := bbPool.Get() - defer bbPool.Put(bb) - - streamID := &bs.bsw.bh.streamID - bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID) - if len(bb.B) == 0 { - // Couldn't find stream tags by streamID. This may be the case when the corresponding log stream - // was recently registered and its tags aren't visible to search yet. - // The stream tags must become visible in a few seconds. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042 - bs.prevStreamID = *streamID - bs.prevStream = bs.prevStream[:0] - return false - } - - st := GetStreamTags() - mustUnmarshalStreamTags(st, bb.B) - bb.B = st.marshalString(bb.B[:0]) - PutStreamTags(st) - - s := bytesutil.ToUnsafeString(bb.B) - br.addConstColumn("_stream", s) - - bs.prevStreamID = *streamID - bs.prevStream = append(bs.prevStream[:0], s...) + br.addConstColumn("_stream", streamStr) return true } diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index cf3995412..3512dd321 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -85,6 +85,10 @@ func getBlockSearch() *blockSearch { func putBlockSearch(bs *blockSearch) { bs.reset() + + // reset seenStreams before returning bs to the pool in order to reduce memory usage. 
+ bs.seenStreams = nil + blockSearchPool.Put(bs) } @@ -115,10 +119,9 @@ type blockSearch struct { // a is used for storing unmarshaled data in csh a arena - // prevStreamID and prevStream are used for speeding up fetching _stream columns - // across sequential blocks belonging to the same stream. - prevStreamID streamID - prevStream []byte + // seenStreams contains seen streamIDs for the recent searches. + // It is used for speeding up fetching _stream column. + seenStreams map[u128]string } func (bs *blockSearch) reset() { @@ -145,6 +148,8 @@ func (bs *blockSearch) reset() { bs.sbu.reset() bs.csh.reset() bs.a.reset() + + // Do not reset seenStreams here, since its lifetime is managed by getStreamStr() and putBlockSearch(). } func (bs *blockSearch) partPath() string { @@ -326,3 +331,48 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b return dst } + +// getStreamStr returns _stream value for the given block at bs. +func (bs *blockSearch) getStreamStr() string { + sid := bs.bsw.bh.streamID.id + streamStr := bs.seenStreams[sid] + if streamStr != "" { + // Fast path - streamStr is found in the seenStreams. + return streamStr + } + + // Slow path - load streamStr from the storage. + streamStr = bs.getStreamStrSlow() + if streamStr != "" { + // Store the found streamStr in seenStreams. + if len(bs.seenStreams) > 20_000 { + bs.seenStreams = nil + } + if bs.seenStreams == nil { + bs.seenStreams = make(map[u128]string) + } + bs.seenStreams[sid] = streamStr + } + return streamStr +} + +func (bs *blockSearch) getStreamStrSlow() string { + bb := bbPool.Get() + defer bbPool.Put(bb) + + bb.B = bs.bsw.p.pt.idb.appendStreamTagsByStreamID(bb.B[:0], &bs.bsw.bh.streamID) + if len(bb.B) == 0 { + // Couldn't find stream tags by sid. This may be the case when the corresponding log stream + // was recently registered and its tags aren't visible to search yet. + // The stream tags must become visible in a few seconds. 
+ // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042 + return "" + } + + st := GetStreamTags() + mustUnmarshalStreamTags(st, bb.B) + bb.B = st.marshalString(bb.B[:0]) + PutStreamTags(st) + + return string(bb.B) +} diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go index 64465de20..fc88510ec 100644 --- a/lib/logstorage/partition.go +++ b/lib/logstorage/partition.go @@ -159,32 +159,6 @@ func (pt *partition) logIngestedRows(lr *LogRows) { } } -// appendStreamTagsByStreamID appends canonical representation of stream tags for the given sid to dst -// and returns the result. -func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte { - // Search for the StreamTags in the cache. - key := bbPool.Get() - defer bbPool.Put(key) - - // There is no need in putting partition name into key here, - // since StreamTags is uniquely identified by streamID. - key.B = sid.marshal(key.B) - dstLen := len(dst) - dst = pt.s.streamTagsCache.GetBig(dst, key.B) - if len(dst) > dstLen { - // Fast path - the StreamTags have been found in cache. - return dst - } - - // Slow path - search for StreamTags in idb - dst = pt.idb.appendStreamTagsByStreamID(dst, sid) - if len(dst) > dstLen { - // Store the found StreamTags to cache - pt.s.streamTagsCache.SetBig(key.B, dst[dstLen:]) - } - return dst -} - func (pt *partition) hasStreamIDInCache(sid *streamID) bool { var result [1]byte diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index b0d94babf..b729ffe30 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -135,15 +135,6 @@ type Storage struct { // the check whether the given stream is already registered in the persistent storage. streamIDCache *workingsetcache.Cache - // streamTagsCache caches StreamTags entries keyed by streamID. - // - // There is no need to put partition into the key for StreamTags, - // since StreamTags are uniquely identified by streamID. 
- // - // It reduces the load on persistent storage during querying - // when StreamTags must be found for the particular streamID - streamTagsCache *workingsetcache.Cache - // filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter). // // It reduces the load on persistent storage during querying by _stream:{...} filter. @@ -253,8 +244,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage { streamIDCachePath := filepath.Join(path, cacheDirname, streamIDCacheFilename) streamIDCache := workingsetcache.Load(streamIDCachePath, mem/16) - streamTagsCache := workingsetcache.New(mem / 10) - filterStreamCache := workingsetcache.New(mem / 10) s := &Storage{ @@ -270,7 +259,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage { stopCh: make(chan struct{}), streamIDCache: streamIDCache, - streamTagsCache: streamTagsCache, filterStreamCache: filterStreamCache, } @@ -474,9 +462,6 @@ func (s *Storage) MustClose() { s.streamIDCache.Stop() s.streamIDCache = nil - s.streamTagsCache.Stop() - s.streamTagsCache = nil - s.filterStreamCache.Stop() s.filterStreamCache = nil