lib/storage: add a hint about the type of parts being merged (#7998)

The hint allows choosing the type of cache used for index search:
- In-memory parts store recently ingested samples and should use the main
cache. This improves ingestion speed and the cache hit ratio for
queries that access recently ingested samples.
- Merges of file parts are performed in the background; using a separate
cache avoids polluting the main cache with irrelevant entries (see the
sketch below).

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7182

---------

Signed-off-by: f41gh7 <nik@victoriametrics.com>
Author: Nikolay
Date: 2025-01-10 13:01:39 +01:00 (committed by GitHub)
Commit: e9f86af7f5 (parent: 9ada784983)
21 changed files with 116 additions and 48 deletions


@ -67,6 +67,8 @@ var (
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBDataBlocksSparse = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocksSparse", 0, "Overrides max size for indexdb/dataBlocksSparse cache. "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
)
@ -100,6 +102,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.IntN())
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN())
if retentionPeriod.Duration() < 24*time.Hour {
logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod)
@ -581,6 +584,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="storage/next_day_metric_ids"}`, m.NextDayMetricIDCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="storage/indexBlocks"}`, tm.IndexBlocksCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="storage/regexps"}`, uint64(storage.RegexpCacheSize()))
@ -592,6 +596,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/metricName"}`, m.MetricNameCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/indexBlocks"}`, tm.IndexBlocksCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/date_metricID"}`, m.DateMetricIDCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/hour_metric_ids"}`, m.HourMetricIDCacheSizeBytes)
@ -606,6 +611,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="storage/metricName"}`, m.MetricNameCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="storage/indexBlocks"}`, tm.IndexBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="storage/regexps"}`, uint64(storage.RegexpCacheMaxSizeBytes()))
@ -616,6 +622,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/metricName"}`, m.MetricNameCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/indexBlocks"}`, tm.IndexBlocksCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/regexps"}`, storage.RegexpCacheRequests())
@ -626,6 +633,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/metricName"}`, m.MetricNameCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/indexBlocks"}`, tm.IndexBlocksCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/regexps"}`, storage.RegexpCacheMisses())


@ -1924,6 +1924,9 @@ Below is the output for `/path/to/vmstorage -help`:
-storage.cacheSizeIndexDBDataBlocks size
Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBDataBlocksSparse size
Overrides max size for indexdb/dataBlocksSparse cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBIndexBlocks size
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)


@ -3300,6 +3300,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storage.cacheSizeIndexDBDataBlocks size
Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBDataBlocksSparse size
Overrides max size for indexdb/dataBlocksSparse cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBIndexBlocks size
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)


@ -166,7 +166,7 @@ func (idb *indexdb) getIndexSearch() *indexSearch {
}
}
is := v.(*indexSearch)
is.ts.Init(idb.tb)
is.ts.Init(idb.tb, false)
return is
}


@ -14,6 +14,7 @@ import (
var idxbCache = blockcache.NewCache(getMaxIndexBlocksCacheSize)
var ibCache = blockcache.NewCache(getMaxInmemoryBlocksCacheSize)
var ibSparseCache = blockcache.NewCache(getMaxInmemoryBlocksSparseCacheSize)
// SetIndexBlocksCacheSize overrides the default size of indexdb/indexBlocks cache
func SetIndexBlocksCacheSize(size int) {
@ -48,9 +49,26 @@ func getMaxInmemoryBlocksCacheSize() int {
return maxInmemoryBlockCacheSize
}
// SetDataBlocksSparseCacheSize overrides the default size of indexdb/dataBlocksSparse cache
func SetDataBlocksSparseCacheSize(size int) {
maxInmemorySparseMergeCacheSize = size
}
func getMaxInmemoryBlocksSparseCacheSize() int {
maxInmemoryBlockSparseCacheSizeOnce.Do(func() {
if maxInmemorySparseMergeCacheSize <= 0 {
maxInmemorySparseMergeCacheSize = int(0.05 * float64(memory.Allowed()))
}
})
return maxInmemorySparseMergeCacheSize
}
var (
maxInmemoryBlockCacheSize int
maxInmemoryBlockCacheSizeOnce sync.Once
maxInmemorySparseMergeCacheSize int
maxInmemoryBlockSparseCacheSizeOnce sync.Once
)
type part struct {
@ -118,6 +136,7 @@ func (p *part) MustClose() {
idxbCache.RemoveBlocksForPart(p)
ibCache.RemoveBlocksForPart(p)
ibSparseCache.RemoveBlocksForPart(p)
}
type indexBlock struct {

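For sizing intuition: per the diff above, the sparse cache defaults to 5% of the memory allowed to the process and can be overridden with -storage.cacheSizeIndexDBDataBlocksSparse. A worked example with a hypothetical 4 GiB allowance:

package main

import "fmt"

func main() {
	// Hypothetical value; the real code derives this from memory.Allowed().
	allowed := 4 << 30 // 4 GiB

	// Mirrors getMaxInmemoryBlocksSparseCacheSize above: 5% of allowed memory.
	sparse := int(0.05 * float64(allowed))

	fmt.Printf("sparse cache default: %d bytes (~%.1f MiB)\n",
		sparse, float64(sparse)/(1<<20)) // ~204.8 MiB
}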

@ -36,6 +36,8 @@ type partSearch struct {
ib *inmemoryBlock
ibItemIdx int
sparse bool
}
func (ps *partSearch) reset() {
@ -52,15 +54,17 @@ func (ps *partSearch) reset() {
ps.ib = nil
ps.ibItemIdx = 0
ps.sparse = false
}
// Init initializes ps for search in the p.
//
// Use Seek for search in p.
func (ps *partSearch) Init(p *part) {
func (ps *partSearch) Init(p *part, sparse bool) {
ps.reset()
ps.p = p
ps.sparse = sparse
}
// Seek seeks for the first item greater or equal to k in ps.
@ -299,18 +303,22 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
}
func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) {
cache := ibCache
if ps.sparse {
cache = ibSparseCache
}
ibKey := blockcache.Key{
Part: ps.p,
Offset: bh.itemsBlockOffset,
}
b := ibCache.GetBlock(ibKey)
b := cache.GetBlock(ibKey)
if b == nil {
ib, err := ps.readInmemoryBlock(bh)
if err != nil {
return nil, err
}
b = ib
ibCache.PutBlock(ibKey, b)
cache.PutBlock(ibKey, b)
}
ib := b.(*inmemoryBlock)
return ib, nil


@ -54,7 +54,7 @@ func testPartSearchConcurrent(p *part, items []string) error {
func testPartSearchSerial(r *rand.Rand, p *part, items []string) error {
var ps partSearch
ps.Init(p)
ps.Init(p, true)
var k []byte
// Search for the item smaller than the items[0]


@ -574,6 +574,12 @@ type TableMetrics struct {
DataBlocksCacheRequests uint64
DataBlocksCacheMisses uint64
DataBlocksSparseCacheSize uint64
DataBlocksSparseCacheSizeBytes uint64
DataBlocksSparseCacheSizeMaxBytes uint64
DataBlocksSparseCacheRequests uint64
DataBlocksSparseCacheMisses uint64
IndexBlocksCacheSize uint64
IndexBlocksCacheSizeBytes uint64
IndexBlocksCacheSizeMaxBytes uint64
@ -635,6 +641,12 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.DataBlocksCacheRequests = ibCache.Requests()
m.DataBlocksCacheMisses = ibCache.Misses()
m.DataBlocksSparseCacheSize = uint64(ibSparseCache.Len())
m.DataBlocksSparseCacheSizeBytes = uint64(ibSparseCache.SizeBytes())
m.DataBlocksSparseCacheSizeMaxBytes = uint64(ibSparseCache.SizeMaxBytes())
m.DataBlocksSparseCacheRequests = ibSparseCache.Requests()
m.DataBlocksSparseCacheMisses = ibSparseCache.Misses()
m.IndexBlocksCacheSize = uint64(idxbCache.Len())
m.IndexBlocksCacheSizeBytes = uint64(idxbCache.SizeBytes())
m.IndexBlocksCacheSizeMaxBytes = uint64(idxbCache.SizeMaxBytes())


@ -59,7 +59,7 @@ func (ts *TableSearch) reset() {
// Init initializes ts for searching in the tb.
//
// MustClose must be called when the ts is no longer needed.
func (ts *TableSearch) Init(tb *Table) {
func (ts *TableSearch) Init(tb *Table, sparse bool) {
if ts.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init")
}
@ -74,7 +74,7 @@ func (ts *TableSearch) Init(tb *Table) {
// Initialize the psPool.
ts.psPool = slicesutil.SetLength(ts.psPool, len(ts.pws))
for i, pw := range ts.pws {
ts.psPool[i].Init(pw.p)
ts.psPool[i].Init(pw.p, sparse)
}
}


@ -107,7 +107,7 @@ func testTableSearchConcurrent(tb *Table, items []string) error {
func testTableSearchSerial(tb *Table, items []string) error {
var ts TableSearch
ts.Init(tb)
ts.Init(tb, false)
for _, key := range []string{
"",
"123",


@ -83,7 +83,7 @@ func benchmarkTableSearchKeysExt(b *testing.B, tb *Table, keys [][]byte, stripSu
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(1))
var ts TableSearch
ts.Init(tb)
ts.Init(tb, false)
defer ts.MustClose()
for pb.Next() {
startIdx := r.Intn(len(keys) - searchKeysCount)


@ -139,9 +139,9 @@ func TestTableCreateSnapshotAt(t *testing.T) {
tb2 := MustOpenTable(snapshot2, nil, nil, &isReadOnly)
var ts, ts1, ts2 TableSearch
ts.Init(tb)
ts1.Init(tb1)
ts2.Init(tb2)
ts.Init(tb, false)
ts1.Init(tb1, false)
ts2.Init(tb2, false)
for i := 0; i < itemsCount; i++ {
key := []byte(fmt.Sprintf("item %d", i))
if err := ts.FirstItemWithPrefix(key); err != nil {


@ -21,6 +21,9 @@ type blockStreamMerger struct {
// The last error
err error
// A flag to indicate which cache to use: sparse or regular.
useSparseCache bool
}
func (bsm *blockStreamMerger) reset() {
@ -34,10 +37,11 @@ func (bsm *blockStreamMerger) reset() {
bsm.retentionDeadline = 0
bsm.nextBlockNoop = false
bsm.err = nil
bsm.useSparseCache = false
}
// Init initializes bsm with the given bsrs.
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64) {
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64, useSparseCache bool) {
bsm.reset()
bsm.retentionDeadline = retentionDeadline
for _, bsr := range bsrs {
@ -59,6 +63,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline
heap.Init(&bsm.bsrHeap)
bsm.Block = &bsm.bsrHeap[0].Block
bsm.nextBlockNoop = true
bsm.useSparseCache = useSparseCache
}
func (bsm *blockStreamMerger) getRetentionDeadline(_ *blockHeader) int64 {


@ -14,6 +14,9 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/fastcache"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -25,8 +28,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
"github.com/cespare/xxhash/v2"
)
const (
@ -525,7 +526,12 @@ type indexSearch struct {
deadline uint64
}
// getIndexSearch returns an indexSearch with default configuration
func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
return db.getIndexSearchInternal(deadline, false)
}
func (db *indexDB) getIndexSearchInternal(deadline uint64, sparse bool) *indexSearch {
v := db.indexSearchPool.Get()
if v == nil {
v = &indexSearch{
@ -533,7 +539,7 @@ func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
}
}
is := v.(*indexSearch)
is.ts.Init(db.tb)
is.ts.Init(db.tb, sparse)
is.deadline = deadline
return is
}
@ -1522,31 +1528,35 @@ func (th *topHeap) Pop() any {
panic(fmt.Errorf("BUG: Pop shouldn't be called"))
}
// searchMetricNameWithCache appends metric name for the given metricID to dst
// searchMetricName appends metric name for the given metricID to dst
// and returns the result.
func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, bool) {
metricName := db.getMetricNameFromCache(dst, metricID)
if len(metricName) > len(dst) {
return metricName, true
func (db *indexDB) searchMetricName(dst []byte, metricID uint64, noCache bool) ([]byte, bool) {
if !noCache {
metricName := db.getMetricNameFromCache(dst, metricID)
if len(metricName) > len(dst) {
return metricName, true
}
}
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearchInternal(noDeadline, noCache)
var ok bool
dst, ok = is.searchMetricName(dst, metricID)
db.putIndexSearch(is)
if ok {
// There is no need in verifying whether the given metricID is deleted,
// since the filtering must be performed before calling this func.
db.putMetricNameToCache(metricID, dst)
if !noCache {
db.putMetricNameToCache(metricID, dst)
}
return dst, true
}
// Try searching in the external indexDB.
db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(noDeadline)
is := extDB.getIndexSearchInternal(noDeadline, noCache)
dst, ok = is.searchMetricName(dst, metricID)
extDB.putIndexSearch(is)
if ok {
if ok && !noCache {
// There is no need in verifying whether the given metricID is deleted,
// since the filtering must be performed before calling this func.
extDB.putMetricNameToCache(metricID, dst)

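The searchMetricName change above applies the same idea to a result cache rather than a block cache: callers acting on behalf of background work pass noCache=true and skip both the read and the write of the shared metricName cache. A self-contained sketch under invented names (metricNameCache and searchIndex are stand-ins):

package main

import "fmt"

// metricNameCache is a stand-in for the shared metricID -> metricName cache.
var metricNameCache = map[uint64][]byte{}

// searchIndex is a stand-in for the on-disk index lookup.
func searchIndex(metricID uint64) ([]byte, bool) {
	return []byte(fmt.Sprintf("metric_%d", metricID)), true
}

func searchMetricName(dst []byte, metricID uint64, noCache bool) ([]byte, bool) {
	if !noCache {
		if name, ok := metricNameCache[metricID]; ok {
			return append(dst, name...), true
		}
	}
	name, ok := searchIndex(metricID)
	if !ok {
		return dst, false
	}
	if !noCache {
		metricNameCache[metricID] = name // only query-path results are cached
	}
	return append(dst, name...), true
}

func main() {
	name, _ := searchMetricName(nil, 42, true) // merge path: cache stays empty
	fmt.Printf("%s, cached entries: %d\n", name, len(metricNameCache))
}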

@ -729,7 +729,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
// Search for metric name for the given metricID.
var ok bool
metricNameCopy, ok = db.searchMetricNameWithCache(metricNameCopy[:0], genTSID.TSID.MetricID)
metricNameCopy, ok = db.searchMetricName(metricNameCopy[:0], genTSID.TSID.MetricID, false)
if !ok {
return fmt.Errorf("cannot find metricName for metricID=%d; i=%d", genTSID.TSID.MetricID, i)
}
@ -738,7 +738,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Try searching metric name for non-existent MetricID.
buf, found := db.searchMetricNameWithCache(nil, 1)
buf, found := db.searchMetricName(nil, 1, false)
if found {
return fmt.Errorf("unexpected metricName found for non-existing metricID; got %X", buf)
}


@ -15,11 +15,11 @@ import (
//
// rowsMerged is atomically updated with the number of merged rows during the merge.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, s *Storage, retentionDeadline int64,
rowsMerged, rowsDeleted *atomic.Uint64) error {
rowsMerged, rowsDeleted *atomic.Uint64, useSparseCache bool) error {
ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger)
bsm.Init(bsrs, retentionDeadline)
bsm.Init(bsrs, retentionDeadline, useSparseCache)
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, s, rowsMerged, rowsDeleted)
bsm.reset()
bsmPool.Put(bsm)


@ -376,7 +376,7 @@ func TestMergeForciblyStop(t *testing.T) {
close(ch)
strg := newTestStorage()
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted, true); !errors.Is(err, errForciblyStopped) {
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
}
if n := rowsMerged.Load(); n != 0 {
@ -398,7 +398,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
strg := newTestStorage()
var rowsMerged, rowsDeleted atomic.Uint64
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted, true); err != nil {
t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
}
stopTestStorage(strg)


@ -43,7 +43,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
}
mpOut.Reset()
bsw.MustInitFromInmemoryPart(&mpOut, -5)
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted, true); err != nil {
panic(fmt.Errorf("cannot merge block streams: %w", err))
}
}


@ -634,7 +634,7 @@ func (pt *partition) inmemoryPartsMerger() {
}
inmemoryPartsConcurrencyCh <- struct{}{}
err := pt.mergeParts(pws, pt.stopCh, false)
err := pt.mergeParts(pws, pt.stopCh, false, false)
<-inmemoryPartsConcurrencyCh
if err == nil {
@ -667,7 +667,7 @@ func (pt *partition) smallPartsMerger() {
}
smallPartsConcurrencyCh <- struct{}{}
err := pt.mergeParts(pws, pt.stopCh, false)
err := pt.mergeParts(pws, pt.stopCh, false, false)
<-smallPartsConcurrencyCh
if err == nil {
@ -700,7 +700,7 @@ func (pt *partition) bigPartsMerger() {
}
bigPartsConcurrencyCh <- struct{}{}
err := pt.mergeParts(pws, pt.stopCh, false)
err := pt.mergeParts(pws, pt.stopCh, false, false)
<-bigPartsConcurrencyCh
if err == nil {
@ -799,7 +799,7 @@ func (pt *partition) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrappe
// Merge parts.
// The merge shouldn't be interrupted by stopCh, so use nil stopCh.
ph, err := pt.mergePartsInternal("", bsw, bsrs, partInmemory, nil)
ph, err := pt.mergePartsInternal("", bsw, bsrs, partInmemory, nil, time.Now().UnixMilli(), false)
putBlockStreamWriter(bsw)
for _, bsr := range bsrs {
putBlockStreamReader(bsr)
@ -1107,7 +1107,7 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) {
}
pt.partsLock.Unlock()
if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh); err != nil {
if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh, false); err != nil {
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
}
}
@ -1172,7 +1172,7 @@ func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow {
return dst
}
func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{}, concurrencyCh chan struct{}) error {
func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{}, concurrencyCh chan struct{}, useSparseCache bool) error {
pwsLen := len(pws)
var errGlobal error
@ -1188,7 +1188,7 @@ func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{
wg.Done()
}()
if err := pt.mergeParts(pwsChunk, stopCh, true); err != nil && !errors.Is(err, errForciblyStopped) {
if err := pt.mergeParts(pwsChunk, stopCh, true, useSparseCache); err != nil && !errors.Is(err, errForciblyStopped) {
errGlobalLock.Lock()
if errGlobal == nil {
errGlobal = err
@ -1228,7 +1228,7 @@ func (pt *partition) ForceMergeAllParts(stopCh <-chan struct{}) error {
// If len(pws) == 1, then the merge must run anyway.
// This allows applying the configured retention, removing the deleted series
// and performing de-duplication if needed.
if err := pt.mergePartsToFiles(pws, stopCh, bigPartsConcurrencyCh); err != nil {
if err := pt.mergePartsToFiles(pws, stopCh, bigPartsConcurrencyCh, true); err != nil {
return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
}
@ -1379,7 +1379,7 @@ func getMinDedupInterval(pws []*partWrapper) int64 {
//
// All the parts inside pws must have isInMerge field set to true.
// The isInMerge field inside pws parts is set to false before returning from the function.
func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal, useSparseCache bool) error {
if len(pws) == 0 {
logger.Panicf("BUG: empty pws cannot be passed to mergeParts()")
}
@ -1417,6 +1417,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
}
rowsPerBlock := float64(srcRowsCount) / float64(srcBlocksCount)
compressLevel := getCompressLevel(rowsPerBlock)
currentTimestamp := startTime.UnixMilli()
bsw := getBlockStreamWriter()
var mpNew *inmemoryPart
if dstPartType == partInmemory {
@ -1430,8 +1431,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel)
}
// Merge source parts to destination part.
ph, err := pt.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh)
ph, err := pt.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh, currentTimestamp, useSparseCache)
putBlockStreamWriter(bsw)
for _, bsr := range bsrs {
putBlockStreamReader(bsr)
@ -1543,7 +1543,7 @@ func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {
return bsrs
}
func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}, currentTimestamp int64, useSparseCache bool) (*partHeader, error) {
var ph partHeader
var rowsMerged *atomic.Uint64
var rowsDeleted *atomic.Uint64
@ -1568,9 +1568,9 @@ func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWrit
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
retentionDeadline := timestampFromTime(time.Now()) - pt.s.retentionMsecs
retentionDeadline := currentTimestamp - pt.s.retentionMsecs
activeMerges.Add(1)
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted)
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted, useSparseCache)
activeMerges.Add(-1)
mergesCount.Add(1)
if err != nil {

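A side change visible in the partition merge hunks above: the retention deadline is now computed from a timestamp captured once at merge start (startTime.UnixMilli()) and threaded through, instead of calling time.Now() inside mergePartsInternal, presumably so a single merge applies one consistent deadline. A sketch of the arithmetic (the 31-day retention is an invented example):

package main

import (
	"fmt"
	"time"
)

// Invented example retention of 31 days, in milliseconds as in the diff above.
const retentionMsecs = 31 * 24 * 3600 * 1000

func main() {
	// Captured once when the merge starts, then passed down unchanged.
	currentTimestamp := time.Now().UnixMilli()
	retentionDeadline := currentTimestamp - retentionMsecs
	fmt.Println("rows with timestamps <", retentionDeadline, "are dropped by this merge")
}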

@ -214,7 +214,7 @@ func (s *Search) NextMetricBlock() bool {
continue
}
var ok bool
s.MetricBlockRef.MetricName, ok = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID)
s.MetricBlockRef.MetricName, ok = s.idb.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID, false)
if !ok {
// Skip missing metricName for tsid.MetricID.
// It should be automatically fixed. See indexDB.searchMetricNameWithCache for details.


@ -1159,7 +1159,7 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
}
}
var ok bool
metricName, ok = idb.searchMetricNameWithCache(metricName[:0], metricID)
metricName, ok = idb.searchMetricName(metricName[:0], metricID, false)
if !ok {
// Skip missing metricName for metricID.
// It should be automatically fixed. See indexDB.searchMetricNameWithCache for details.