diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index eabe7e30f..f33ed3e30 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -4,7 +4,6 @@ import ( "crypto/rand" "flag" "fmt" - "runtime" "sync" "sync/atomic" "time" @@ -12,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/metrics" ) @@ -19,7 +19,7 @@ import ( var disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling") var rollupResultCacheV = &rollupResultCache{ - fastcache.New(1024 * 1024), // This is a cache for testing. + c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing. } var rollupResultCachePath string @@ -43,15 +43,17 @@ var ( func InitRollupResultCache(cachePath string) { rollupResultCachePath = cachePath startTime := time.Now() - var c *fastcache.Cache + cacheSize := getRollupResultCacheSize() + var c *workingsetcache.Cache if len(rollupResultCachePath) > 0 { logger.Infof("loading rollupResult cache from %q...", rollupResultCachePath) - c = fastcache.LoadFromFileOrNew(rollupResultCachePath, getRollupResultCacheSize()) + c = workingsetcache.Load(rollupResultCachePath, cacheSize, time.Hour) } else { - c = fastcache.New(getRollupResultCacheSize()) + c = workingsetcache.New(cacheSize, time.Hour) } if *disableCache { - c.Reset() + c.Stop() + c = nil } stats := &fastcache.Stats{} @@ -96,25 +98,28 @@ func InitRollupResultCache(cachePath string) { // StopRollupResultCache closes the rollupResult cache. func StopRollupResultCache() { if len(rollupResultCachePath) == 0 { - rollupResultCacheV.c.Reset() + if !*disableCache { + rollupResultCacheV.c.Stop() + rollupResultCacheV.c = nil + } return } - gomaxprocs := runtime.GOMAXPROCS(-1) logger.Infof("saving rollupResult cache to %q...", rollupResultCachePath) startTime := time.Now() - if err := rollupResultCacheV.c.SaveToFileConcurrent(rollupResultCachePath, gomaxprocs); err != nil { + if err := rollupResultCacheV.c.Save(rollupResultCachePath); err != nil { logger.Errorf("cannot close rollupResult cache at %q: %s", rollupResultCachePath, err) - } else { - var fcs fastcache.Stats - rollupResultCacheV.c.UpdateStats(&fcs) - rollupResultCacheV.c.Reset() - logger.Infof("saved rollupResult cache to %q in %s; entriesCount: %d, sizeBytes: %d", - rollupResultCachePath, time.Since(startTime), fcs.EntriesCount, fcs.BytesSize) + return } + var fcs fastcache.Stats + rollupResultCacheV.c.UpdateStats(&fcs) + rollupResultCacheV.c.Stop() + rollupResultCacheV.c = nil + logger.Infof("saved rollupResult cache to %q in %s; entriesCount: %d, sizeBytes: %d", + rollupResultCachePath, time.Since(startTime), fcs.EntriesCount, fcs.BytesSize) } type rollupResultCache struct { - c *fastcache.Cache + c *workingsetcache.Cache } var rollupResultCacheResets = metrics.NewCounter(`vm_cache_resets_total{type="promql/rollupResult"}`) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index b6a6d6c61..3fb40e07d 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" xxhash "github.com/cespare/xxhash/v2" ) @@ -52,17 +53,17 @@ type indexDB struct { extDBLock sync.Mutex // Cache for fast TagFilters -> TSIDs lookup. - tagCache *fastcache.Cache + tagCache *workingsetcache.Cache // Cache for fast MetricID -> TSID lookup. - metricIDCache *fastcache.Cache + metricIDCache *workingsetcache.Cache // Cache for fast MetricID -> MetricName lookup. - metricNameCache *fastcache.Cache + metricNameCache *workingsetcache.Cache // Cache holding useless TagFilters entries, which have no tag filters // matching low number of metrics. - uselessTagFiltersCache *fastcache.Cache + uselessTagFiltersCache *workingsetcache.Cache indexSearchPool sync.Pool @@ -101,7 +102,7 @@ type indexDB struct { } // openIndexDB opens index db from the given path with the given caches. -func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) { +func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) { if metricIDCache == nil { logger.Panicf("BUG: metricIDCache must be non-nil") } @@ -130,10 +131,10 @@ func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, c tb: tb, name: name, - tagCache: fastcache.New(mem / 32), + tagCache: workingsetcache.New(mem/32, time.Hour), metricIDCache: metricIDCache, metricNameCache: metricNameCache, - uselessTagFiltersCache: fastcache.New(mem / 128), + uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), currHourMetricIDs: currHourMetricIDs, prevHourMetricIDs: prevHourMetricIDs, @@ -273,8 +274,8 @@ func (db *indexDB) decRef() { db.SetExtDB(nil) // Free space occupied by caches owned by db. - db.tagCache.Reset() - db.uselessTagFiltersCache.Reset() + db.tagCache.Stop() + db.uselessTagFiltersCache.Stop() db.tagCache = nil db.metricIDCache = nil diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index a85998805..744efe899 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -12,7 +12,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/fastcache" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) func TestMarshalUnmarshalTSIDs(t *testing.T) { @@ -57,10 +57,10 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) { } func TestIndexDBOpenClose(t *testing.T) { - metricIDCache := fastcache.New(1234) - metricNameCache := fastcache.New(1234) - defer metricIDCache.Reset() - defer metricNameCache.Reset() + metricIDCache := workingsetcache.New(1234, time.Hour) + metricNameCache := workingsetcache.New(1234, time.Hour) + defer metricIDCache.Stop() + defer metricNameCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -85,10 +85,10 @@ func TestIndexDB(t *testing.T) { const metricGroups = 10 t.Run("serial", func(t *testing.T) { - metricIDCache := fastcache.New(1234) - metricNameCache := fastcache.New(1234) - defer metricIDCache.Reset() - defer metricNameCache.Reset() + metricIDCache := workingsetcache.New(1234, time.Hour) + metricNameCache := workingsetcache.New(1234, time.Hour) + defer metricIDCache.Stop() + defer metricNameCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -142,10 +142,10 @@ func TestIndexDB(t *testing.T) { }) t.Run("concurrent", func(t *testing.T) { - metricIDCache := fastcache.New(1234) - metricNameCache := fastcache.New(1234) - defer metricIDCache.Reset() - defer metricNameCache.Reset() + metricIDCache := workingsetcache.New(1234, time.Hour) + metricNameCache := workingsetcache.New(1234, time.Hour) + defer metricIDCache.Stop() + defer metricNameCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 13c362454..759279b9b 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -6,17 +6,18 @@ import ( "strconv" "sync/atomic" "testing" + "time" - "github.com/VictoriaMetrics/fastcache" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) func BenchmarkIndexDBAddTSIDs(b *testing.B) { const recordsPerLoop = 1e3 - metricIDCache := fastcache.New(1234) - metricNameCache := fastcache.New(1234) - defer metricIDCache.Reset() - defer metricNameCache.Reset() + metricIDCache := workingsetcache.New(1234, time.Hour) + metricNameCache := workingsetcache.New(1234, time.Hour) + defer metricIDCache.Stop() + defer metricNameCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -79,10 +80,10 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs } func BenchmarkIndexDBGetTSIDs(b *testing.B) { - metricIDCache := fastcache.New(1234) - metricNameCache := fastcache.New(1234) - defer metricIDCache.Reset() - defer metricNameCache.Reset() + metricIDCache := workingsetcache.New(1234, time.Hour) + metricNameCache := workingsetcache.New(1234, time.Hour) + defer metricIDCache.Stop() + defer metricNameCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 84d6880be..2e8f3108a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" ) @@ -39,16 +40,16 @@ type Storage struct { tb *table // tsidCache is MetricName -> TSID cache. - tsidCache *fastcache.Cache + tsidCache *workingsetcache.Cache // metricIDCache is MetricID -> TSID cache. - metricIDCache *fastcache.Cache + metricIDCache *workingsetcache.Cache // metricNameCache is MetricID -> MetricName cache. - metricNameCache *fastcache.Cache + metricNameCache *workingsetcache.Cache // dateMetricIDCache is (Date, MetricID) cache. - dateMetricIDCache *fastcache.Cache + dateMetricIDCache *workingsetcache.Cache // Fast cache for MetricID values occured during the current hour. currHourMetricIDs atomic.Value @@ -460,10 +461,10 @@ func (s *Storage) MustClose() { s.idb().MustClose() // Save caches. - s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid") - s.mustSaveCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid") - s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName") - s.mustSaveCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID") + s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid") + s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid") + s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName") + s.mustSaveAndStopCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID") hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids") @@ -542,11 +543,11 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), len(hm.m), len(dst)) } -func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *fastcache.Cache { +func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache { path := s.cachePath + "/" + name logger.Infof("loading %s cache from %q...", info, path) startTime := time.Now() - c := fastcache.LoadFromFileOrNew(path, sizeBytes) + c := workingsetcache.Load(path, sizeBytes, time.Hour) var cs fastcache.Stats c.UpdateStats(&cs) logger.Infof("loaded %s cache from %q in %s; entriesCount: %d; sizeBytes: %d", @@ -554,17 +555,16 @@ func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *fastcache.Cac return c } -func (s *Storage) mustSaveCache(c *fastcache.Cache, info, name string) { - gomaxprocs := runtime.GOMAXPROCS(-1) +func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name string) { path := s.cachePath + "/" + name logger.Infof("saving %s cache to %q...", info, path) startTime := time.Now() - if err := c.SaveToFileConcurrent(path, gomaxprocs); err != nil { + if err := c.Save(path); err != nil { logger.Panicf("FATAL: cannot save %s cache to %q: %s", info, path, err) } var cs fastcache.Stats c.UpdateStats(&cs) - c.Reset() + c.Stop() logger.Infof("saved %s cache to %q in %s; entriesCount: %d; sizeBytes: %d", info, path, time.Since(startTime), cs.EntriesCount, cs.BytesSize) } @@ -971,7 +971,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) { s.tsidCache.Set(metricName, buf) } -func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) { +func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) { if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, nil, fmt.Errorf("cannot create directory %q: %s", path, err) } diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go new file mode 100644 index 000000000..8e6145563 --- /dev/null +++ b/lib/workingsetcache/cache.go @@ -0,0 +1,209 @@ +package workingsetcache + +import ( + "flag" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/fastcache" +) + +var oldBehavior = flag.Bool("cache.oldBehavior", false, "Whether to use old behaviour for caches. Old behavior can give better resuts "+ + "for low-RAM systems serving big number of time series. Systems with enough RAM would consume more RAM when `-cache.oldBehavior` is enabled") + +// Cache is a cache for working set entries. +// +// The cache evicts inactive entries after the given expireDuration. +// Recently accessed entries survive expireDuration. +// +// Comparing to fastcache, this cache minimizes the required RAM size +// to values smaller than maxBytes. +type Cache struct { + curr atomic.Value + prev atomic.Value + + wg sync.WaitGroup + stopCh chan struct{} + + misses uint64 +} + +// Load loads the cache from filePath and limits its size to maxBytes +// and evicts inactive entires after expireDuration. +// +// Stop must be called on the returned cache when it is no longer needed. +func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache { + if !*oldBehavior { + // Split maxBytes between curr and prev caches. + maxBytes /= 2 + } + curr := fastcache.LoadFromFileOrNew(filePath, maxBytes) + return newWorkingSetCache(curr, maxBytes, expireDuration) +} + +// New creates new cache with the given maxBytes size and the given expireDuration +// for inactive entries. +// +// Stop must be called on the returned cache when it is no longer needed. +func New(maxBytes int, expireDuration time.Duration) *Cache { + if !*oldBehavior { + // Split maxBytes between curr and prev caches. + maxBytes /= 2 + } + curr := fastcache.New(maxBytes) + return newWorkingSetCache(curr, maxBytes, expireDuration) +} + +func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time.Duration) *Cache { + prev := fastcache.New(1024) + var c Cache + c.curr.Store(curr) + c.prev.Store(prev) + c.stopCh = make(chan struct{}) + c.wg.Add(1) + go func() { + defer c.wg.Done() + t := time.NewTicker(expireDuration / 2) + for { + select { + case <-c.stopCh: + return + case <-t.C: + } + if *oldBehavior { + // Keep the curr cache for old behavior. + continue + } + + // Do not reuse prev cache, since it can have too big capacity. + prev := c.prev.Load().(*fastcache.Cache) + prev.Reset() + curr := c.curr.Load().(*fastcache.Cache) + c.prev.Store(curr) + curr = fastcache.New(maxBytes) + c.curr.Store(curr) + } + }() + return &c +} + +// Save safes the cache to filePath. +func (c *Cache) Save(filePath string) error { + curr := c.curr.Load().(*fastcache.Cache) + concurrency := runtime.GOMAXPROCS(-1) + return curr.SaveToFileConcurrent(filePath, concurrency) +} + +// Stop stops the cache. +// +// The cache cannot be used after the Stop call. +func (c *Cache) Stop() { + close(c.stopCh) + c.wg.Wait() + + c.Reset() +} + +// Reset resets the cache. +func (c *Cache) Reset() { + prev := c.prev.Load().(*fastcache.Cache) + prev.Reset() + curr := c.curr.Load().(*fastcache.Cache) + curr.Reset() + + c.misses = 0 +} + +// UpdateStats updates fcs with cache stats. +func (c *Cache) UpdateStats(fcs *fastcache.Stats) { + curr := c.curr.Load().(*fastcache.Cache) + fcsOrig := *fcs + curr.UpdateStats(fcs) + if *oldBehavior { + return + } + fcs.Misses = fcsOrig.Misses + atomic.LoadUint64(&c.misses) + + fcsOrig.Reset() + prev := c.prev.Load().(*fastcache.Cache) + prev.UpdateStats(&fcsOrig) + fcs.EntriesCount += fcsOrig.EntriesCount + fcs.BytesSize += fcsOrig.BytesSize +} + +// Get appends the found value for the given key to dst and returns the result. +func (c *Cache) Get(dst, key []byte) []byte { + curr := c.curr.Load().(*fastcache.Cache) + result := curr.Get(dst, key) + if len(result) > len(dst) { + // Fast path - the entry is found in the current cache. + return result + } + if *oldBehavior { + return result + } + + // Search for the entry in the previous cache. + prev := c.prev.Load().(*fastcache.Cache) + result = prev.Get(dst, key) + if len(result) <= len(dst) { + // Nothing found. + atomic.AddUint64(&c.misses, 1) + return result + } + // Cache the found entry in the current cache. + curr.Set(key, result[len(dst):]) + return result +} + +// Has verifies whether the cahce contains the given key. +func (c *Cache) Has(key []byte) bool { + curr := c.curr.Load().(*fastcache.Cache) + if curr.Has(key) { + return true + } + if *oldBehavior { + return false + } + prev := c.prev.Load().(*fastcache.Cache) + return prev.Has(key) +} + +// Set sets the given value for the given key. +func (c *Cache) Set(key, value []byte) { + curr := c.curr.Load().(*fastcache.Cache) + curr.Set(key, value) +} + +// GetBig appends the found value for the given key to dst and returns the result. +func (c *Cache) GetBig(dst, key []byte) []byte { + curr := c.curr.Load().(*fastcache.Cache) + result := curr.GetBig(dst, key) + if len(result) > len(dst) { + // Fast path - the entry is found in the current cache. + return result + } + if *oldBehavior { + return result + } + + // Search for the entry in the previous cache. + prev := c.prev.Load().(*fastcache.Cache) + result = prev.GetBig(dst, key) + if len(result) <= len(dst) { + // Nothing found. + atomic.AddUint64(&c.misses, 1) + return result + } + // Cache the found entry in the current cache. + curr.SetBig(key, result[len(dst):]) + return result +} + +// SetBig sets the given value for the given key. +func (c *Cache) SetBig(key, value []byte) { + curr := c.curr.Load().(*fastcache.Cache) + curr.SetBig(key, value) +}