mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/storage: consistency renaming: tagCache -> tagFiltersCache
This improves code readability
This commit is contained in:
parent
21abf487c3
commit
22c6e64bbc
2 changed files with 32 additions and 33 deletions
|
@ -532,7 +532,7 @@ func registerStorageMetrics(strg *storage.Storage) {
|
||||||
return float64(idbm().IndexBlocksCacheSize)
|
return float64(idbm().IndexBlocksCacheSize)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 {
|
||||||
return float64(idbm().TagCacheSize)
|
return float64(idbm().TagFiltersCacheSize)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_entries{type="indexdb/uselessTagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_entries{type="indexdb/uselessTagFilters"}`, func() float64 {
|
||||||
return float64(idbm().UselessTagFiltersCacheSize)
|
return float64(idbm().UselessTagFiltersCacheSize)
|
||||||
|
@ -575,7 +575,7 @@ func registerStorageMetrics(strg *storage.Storage) {
|
||||||
return float64(m().NextDayMetricIDCacheSizeBytes)
|
return float64(m().NextDayMetricIDCacheSizeBytes)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
|
||||||
return float64(idbm().TagCacheSizeBytes)
|
return float64(idbm().TagFiltersCacheSizeBytes)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
|
||||||
return float64(idbm().UselessTagFiltersCacheSizeBytes)
|
return float64(idbm().UselessTagFiltersCacheSizeBytes)
|
||||||
|
@ -606,7 +606,7 @@ func registerStorageMetrics(strg *storage.Storage) {
|
||||||
return float64(idbm().IndexBlocksCacheRequests)
|
return float64(idbm().IndexBlocksCacheRequests)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 {
|
||||||
return float64(idbm().TagCacheRequests)
|
return float64(idbm().TagFiltersCacheRequests)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/uselessTagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/uselessTagFilters"}`, func() float64 {
|
||||||
return float64(idbm().UselessTagFiltersCacheRequests)
|
return float64(idbm().UselessTagFiltersCacheRequests)
|
||||||
|
@ -637,7 +637,7 @@ func registerStorageMetrics(strg *storage.Storage) {
|
||||||
return float64(idbm().IndexBlocksCacheMisses)
|
return float64(idbm().IndexBlocksCacheMisses)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 {
|
||||||
return float64(idbm().TagCacheMisses)
|
return float64(idbm().TagFiltersCacheMisses)
|
||||||
})
|
})
|
||||||
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/uselessTagFilters"}`, func() float64 {
|
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/uselessTagFilters"}`, func() float64 {
|
||||||
return float64(idbm().UselessTagFiltersCacheMisses)
|
return float64(idbm().UselessTagFiltersCacheMisses)
|
||||||
|
|
|
@ -89,7 +89,7 @@ type indexDB struct {
|
||||||
extDBLock sync.Mutex
|
extDBLock sync.Mutex
|
||||||
|
|
||||||
// Cache for fast TagFilters -> TSIDs lookup.
|
// Cache for fast TagFilters -> TSIDs lookup.
|
||||||
tagCache *workingsetcache.Cache
|
tagFiltersCache *workingsetcache.Cache
|
||||||
|
|
||||||
// The parent storage.
|
// The parent storage.
|
||||||
s *Storage
|
s *Storage
|
||||||
|
@ -111,14 +111,14 @@ func openIndexDB(path string, s *Storage) (*indexDB, error) {
|
||||||
logger.Panicf("BUG: Storage must be nin-nil")
|
logger.Panicf("BUG: Storage must be nin-nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
|
tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
|
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
name := filepath.Base(path)
|
name := filepath.Base(path)
|
||||||
|
|
||||||
// Do not persist tagCache in files, since it is very volatile.
|
// Do not persist tagFiltersCache in files, since it is very volatile.
|
||||||
mem := memory.Allowed()
|
mem := memory.Allowed()
|
||||||
|
|
||||||
db := &indexDB{
|
db := &indexDB{
|
||||||
|
@ -126,7 +126,7 @@ func openIndexDB(path string, s *Storage) (*indexDB, error) {
|
||||||
tb: tb,
|
tb: tb,
|
||||||
name: name,
|
name: name,
|
||||||
|
|
||||||
tagCache: workingsetcache.New(mem/32, time.Hour),
|
tagFiltersCache: workingsetcache.New(mem/32, time.Hour),
|
||||||
s: s,
|
s: s,
|
||||||
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
|
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
|
||||||
loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
|
loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
|
||||||
|
@ -138,10 +138,10 @@ const noDeadline = 1<<64 - 1
|
||||||
|
|
||||||
// IndexDBMetrics contains essential metrics for indexDB.
|
// IndexDBMetrics contains essential metrics for indexDB.
|
||||||
type IndexDBMetrics struct {
|
type IndexDBMetrics struct {
|
||||||
TagCacheSize uint64
|
TagFiltersCacheSize uint64
|
||||||
TagCacheSizeBytes uint64
|
TagFiltersCacheSizeBytes uint64
|
||||||
TagCacheRequests uint64
|
TagFiltersCacheRequests uint64
|
||||||
TagCacheMisses uint64
|
TagFiltersCacheMisses uint64
|
||||||
|
|
||||||
UselessTagFiltersCacheSize uint64
|
UselessTagFiltersCacheSize uint64
|
||||||
UselessTagFiltersCacheSizeBytes uint64
|
UselessTagFiltersCacheSizeBytes uint64
|
||||||
|
@ -184,11 +184,11 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
|
||||||
var cs fastcache.Stats
|
var cs fastcache.Stats
|
||||||
|
|
||||||
cs.Reset()
|
cs.Reset()
|
||||||
db.tagCache.UpdateStats(&cs)
|
db.tagFiltersCache.UpdateStats(&cs)
|
||||||
m.TagCacheSize += cs.EntriesCount
|
m.TagFiltersCacheSize += cs.EntriesCount
|
||||||
m.TagCacheSizeBytes += cs.BytesSize
|
m.TagFiltersCacheSizeBytes += cs.BytesSize
|
||||||
m.TagCacheRequests += cs.GetCalls
|
m.TagFiltersCacheRequests += cs.GetCalls
|
||||||
m.TagCacheMisses += cs.Misses
|
m.TagFiltersCacheMisses += cs.Misses
|
||||||
|
|
||||||
cs.Reset()
|
cs.Reset()
|
||||||
db.uselessTagFiltersCache.UpdateStats(&cs)
|
db.uselessTagFiltersCache.UpdateStats(&cs)
|
||||||
|
@ -276,11 +276,11 @@ func (db *indexDB) decRef() {
|
||||||
db.SetExtDB(nil)
|
db.SetExtDB(nil)
|
||||||
|
|
||||||
// Free space occupied by caches owned by db.
|
// Free space occupied by caches owned by db.
|
||||||
db.tagCache.Stop()
|
db.tagFiltersCache.Stop()
|
||||||
db.uselessTagFiltersCache.Stop()
|
db.uselessTagFiltersCache.Stop()
|
||||||
db.loopsPerDateTagFilterCache.Stop()
|
db.loopsPerDateTagFilterCache.Stop()
|
||||||
|
|
||||||
db.tagCache = nil
|
db.tagFiltersCache = nil
|
||||||
db.s = nil
|
db.s = nil
|
||||||
db.uselessTagFiltersCache = nil
|
db.uselessTagFiltersCache = nil
|
||||||
db.loopsPerDateTagFilterCache = nil
|
db.loopsPerDateTagFilterCache = nil
|
||||||
|
@ -294,10 +294,10 @@ func (db *indexDB) decRef() {
|
||||||
logger.Infof("indexDB %q has been dropped", tbPath)
|
logger.Infof("indexDB %q has been dropped", tbPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *indexDB) getFromTagCache(key []byte) ([]TSID, bool) {
|
func (db *indexDB) getFromTagFiltersCache(key []byte) ([]TSID, bool) {
|
||||||
compressedBuf := tagBufPool.Get()
|
compressedBuf := tagBufPool.Get()
|
||||||
defer tagBufPool.Put(compressedBuf)
|
defer tagBufPool.Put(compressedBuf)
|
||||||
compressedBuf.B = db.tagCache.GetBig(compressedBuf.B[:0], key)
|
compressedBuf.B = db.tagFiltersCache.GetBig(compressedBuf.B[:0], key)
|
||||||
if len(compressedBuf.B) == 0 {
|
if len(compressedBuf.B) == 0 {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
@ -306,24 +306,24 @@ func (db *indexDB) getFromTagCache(key []byte) ([]TSID, bool) {
|
||||||
var err error
|
var err error
|
||||||
buf.B, err = encoding.DecompressZSTD(buf.B[:0], compressedBuf.B)
|
buf.B, err = encoding.DecompressZSTD(buf.B[:0], compressedBuf.B)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("FATAL: cannot decompress tsids from tagCache: %s", err)
|
logger.Panicf("FATAL: cannot decompress tsids from tagFiltersCache: %s", err)
|
||||||
}
|
}
|
||||||
tsids, err := unmarshalTSIDs(nil, buf.B)
|
tsids, err := unmarshalTSIDs(nil, buf.B)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("FATAL: cannot unmarshal tsids from tagCache: %s", err)
|
logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
|
||||||
}
|
}
|
||||||
return tsids, true
|
return tsids, true
|
||||||
}
|
}
|
||||||
|
|
||||||
var tagBufPool bytesutil.ByteBufferPool
|
var tagBufPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
func (db *indexDB) putToTagCache(tsids []TSID, key []byte) {
|
func (db *indexDB) putToTagFiltersCache(tsids []TSID, key []byte) {
|
||||||
buf := tagBufPool.Get()
|
buf := tagBufPool.Get()
|
||||||
buf.B = marshalTSIDs(buf.B[:0], tsids)
|
buf.B = marshalTSIDs(buf.B[:0], tsids)
|
||||||
compressedBuf := tagBufPool.Get()
|
compressedBuf := tagBufPool.Get()
|
||||||
compressedBuf.B = encoding.CompressZSTDLevel(compressedBuf.B[:0], buf.B, 1)
|
compressedBuf.B = encoding.CompressZSTDLevel(compressedBuf.B[:0], buf.B, 1)
|
||||||
tagBufPool.Put(buf)
|
tagBufPool.Put(buf)
|
||||||
db.tagCache.SetBig(key, compressedBuf.B)
|
db.tagFiltersCache.SetBig(key, compressedBuf.B)
|
||||||
tagBufPool.Put(compressedBuf)
|
tagBufPool.Put(compressedBuf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,7 +394,7 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
func invalidateTagCache() {
|
func invalidateTagFiltersCache() {
|
||||||
// This function must be fast, since it is called each
|
// This function must be fast, since it is called each
|
||||||
// time new timeseries is added.
|
// time new timeseries is added.
|
||||||
atomic.AddUint64(&tagFiltersKeyGen, 1)
|
atomic.AddUint64(&tagFiltersKeyGen, 1)
|
||||||
|
@ -549,8 +549,7 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// There is no need in invalidating tag cache, since it is invalidated
|
// There is no need in invalidating tag cache, since it is invalidated
|
||||||
// on db.tb flush via invalidateTagCache flushCallback passed to OpenTable.
|
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to OpenTable.
|
||||||
|
|
||||||
atomic.AddUint64(&db.newTimeseriesCreated, 1)
|
atomic.AddUint64(&db.newTimeseriesCreated, 1)
|
||||||
if logNewSeries {
|
if logNewSeries {
|
||||||
logger.Infof("new series created: %s", mn.String())
|
logger.Infof("new series created: %s", mn.String())
|
||||||
|
@ -1619,7 +1618,7 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
|
||||||
db.s.updateDeletedMetricIDs(dmis)
|
db.s.updateDeletedMetricIDs(dmis)
|
||||||
|
|
||||||
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
|
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
|
||||||
invalidateTagCache()
|
invalidateTagFiltersCache()
|
||||||
|
|
||||||
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
|
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
|
||||||
db.s.resetAndSaveTSIDCache()
|
db.s.resetAndSaveTSIDCache()
|
||||||
|
@ -1690,7 +1689,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
|
||||||
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
|
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
|
||||||
|
|
||||||
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
|
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
|
||||||
tsids, ok := db.getFromTagCache(tfKeyBuf.B)
|
tsids, ok := db.getFromTagFiltersCache(tfKeyBuf.B)
|
||||||
if ok {
|
if ok {
|
||||||
// Fast path - tsids found in the cache.
|
// Fast path - tsids found in the cache.
|
||||||
return tsids, nil
|
return tsids, nil
|
||||||
|
@ -1713,7 +1712,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
|
||||||
|
|
||||||
// Data in extDB cannot be changed, so use unversioned keys for tag cache.
|
// Data in extDB cannot be changed, so use unversioned keys for tag cache.
|
||||||
tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
|
tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
|
||||||
tsids, ok := extDB.getFromTagCache(tfKeyExtBuf.B)
|
tsids, ok := extDB.getFromTagFiltersCache(tfKeyExtBuf.B)
|
||||||
if ok {
|
if ok {
|
||||||
extTSIDs = tsids
|
extTSIDs = tsids
|
||||||
return
|
return
|
||||||
|
@ -1723,7 +1722,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
|
|
||||||
sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) })
|
sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) })
|
||||||
extDB.putToTagCache(extTSIDs, tfKeyExtBuf.B)
|
extDB.putToTagFiltersCache(extTSIDs, tfKeyExtBuf.B)
|
||||||
}) {
|
}) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1738,7 +1737,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
|
||||||
sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
|
sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
|
||||||
|
|
||||||
// Store TSIDs in the cache.
|
// Store TSIDs in the cache.
|
||||||
db.putToTagCache(tsids, tfKeyBuf.B)
|
db.putToTagFiltersCache(tsids, tfKeyBuf.B)
|
||||||
|
|
||||||
return tsids, err
|
return tsids, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue