all: use workingsetcache instead of fastcache

This should reduce the amount of RAM required for processing time series
with non-zero churn rate.

The previous cache behavior can be restored with `-cache.oldBehavior` command-line flag.
This commit is contained in:
Aliaksandr Valialkin 2019-08-13 21:35:19 +03:00
parent 99c37c2c96
commit 09fc6e22e5
6 changed files with 278 additions and 62 deletions

View file

@ -4,7 +4,6 @@ import (
"crypto/rand"
"flag"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
@ -12,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metrics"
)
@ -19,7 +19,7 @@ import (
var disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")
var rollupResultCacheV = &rollupResultCache{
fastcache.New(1024 * 1024), // This is a cache for testing.
c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing.
}
var rollupResultCachePath string
@ -43,15 +43,17 @@ var (
func InitRollupResultCache(cachePath string) {
rollupResultCachePath = cachePath
startTime := time.Now()
var c *fastcache.Cache
cacheSize := getRollupResultCacheSize()
var c *workingsetcache.Cache
if len(rollupResultCachePath) > 0 {
logger.Infof("loading rollupResult cache from %q...", rollupResultCachePath)
c = fastcache.LoadFromFileOrNew(rollupResultCachePath, getRollupResultCacheSize())
c = workingsetcache.Load(rollupResultCachePath, cacheSize, time.Hour)
} else {
c = fastcache.New(getRollupResultCacheSize())
c = workingsetcache.New(cacheSize, time.Hour)
}
if *disableCache {
c.Reset()
c.Stop()
c = nil
}
stats := &fastcache.Stats{}
@ -96,25 +98,28 @@ func InitRollupResultCache(cachePath string) {
// StopRollupResultCache closes the rollupResult cache.
func StopRollupResultCache() {
if len(rollupResultCachePath) == 0 {
rollupResultCacheV.c.Reset()
if !*disableCache {
rollupResultCacheV.c.Stop()
rollupResultCacheV.c = nil
}
return
}
gomaxprocs := runtime.GOMAXPROCS(-1)
logger.Infof("saving rollupResult cache to %q...", rollupResultCachePath)
startTime := time.Now()
if err := rollupResultCacheV.c.SaveToFileConcurrent(rollupResultCachePath, gomaxprocs); err != nil {
if err := rollupResultCacheV.c.Save(rollupResultCachePath); err != nil {
logger.Errorf("cannot close rollupResult cache at %q: %s", rollupResultCachePath, err)
} else {
var fcs fastcache.Stats
rollupResultCacheV.c.UpdateStats(&fcs)
rollupResultCacheV.c.Reset()
logger.Infof("saved rollupResult cache to %q in %s; entriesCount: %d, sizeBytes: %d",
rollupResultCachePath, time.Since(startTime), fcs.EntriesCount, fcs.BytesSize)
return
}
var fcs fastcache.Stats
rollupResultCacheV.c.UpdateStats(&fcs)
rollupResultCacheV.c.Stop()
rollupResultCacheV.c = nil
logger.Infof("saved rollupResult cache to %q in %s; entriesCount: %d, sizeBytes: %d",
rollupResultCachePath, time.Since(startTime), fcs.EntriesCount, fcs.BytesSize)
}
type rollupResultCache struct {
c *fastcache.Cache
c *workingsetcache.Cache
}
var rollupResultCacheResets = metrics.NewCounter(`vm_cache_resets_total{type="promql/rollupResult"}`)

View file

@ -18,6 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
xxhash "github.com/cespare/xxhash/v2"
)
@ -52,17 +53,17 @@ type indexDB struct {
extDBLock sync.Mutex
// Cache for fast TagFilters -> TSIDs lookup.
tagCache *fastcache.Cache
tagCache *workingsetcache.Cache
// Cache for fast MetricID -> TSID lookup.
metricIDCache *fastcache.Cache
metricIDCache *workingsetcache.Cache
// Cache for fast MetricID -> MetricName lookup.
metricNameCache *fastcache.Cache
metricNameCache *workingsetcache.Cache
// Cache holding useless TagFilters entries, which have no tag filters
// matching low number of metrics.
uselessTagFiltersCache *fastcache.Cache
uselessTagFiltersCache *workingsetcache.Cache
indexSearchPool sync.Pool
@ -101,7 +102,7 @@ type indexDB struct {
}
// openIndexDB opens index db from the given path with the given caches.
func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
if metricIDCache == nil {
logger.Panicf("BUG: metricIDCache must be non-nil")
}
@ -130,10 +131,10 @@ func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, c
tb: tb,
name: name,
tagCache: fastcache.New(mem / 32),
tagCache: workingsetcache.New(mem/32, time.Hour),
metricIDCache: metricIDCache,
metricNameCache: metricNameCache,
uselessTagFiltersCache: fastcache.New(mem / 128),
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
currHourMetricIDs: currHourMetricIDs,
prevHourMetricIDs: prevHourMetricIDs,
@ -273,8 +274,8 @@ func (db *indexDB) decRef() {
db.SetExtDB(nil)
// Free space occupied by caches owned by db.
db.tagCache.Reset()
db.uselessTagFiltersCache.Reset()
db.tagCache.Stop()
db.uselessTagFiltersCache.Stop()
db.tagCache = nil
db.metricIDCache = nil

View file

@ -12,7 +12,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)
func TestMarshalUnmarshalTSIDs(t *testing.T) {
@ -57,10 +57,10 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) {
}
func TestIndexDBOpenClose(t *testing.T) {
metricIDCache := fastcache.New(1234)
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
@ -85,10 +85,10 @@ func TestIndexDB(t *testing.T) {
const metricGroups = 10
t.Run("serial", func(t *testing.T) {
metricIDCache := fastcache.New(1234)
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
@ -142,10 +142,10 @@ func TestIndexDB(t *testing.T) {
})
t.Run("concurrent", func(t *testing.T) {
metricIDCache := fastcache.New(1234)
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})

View file

@ -6,17 +6,18 @@ import (
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)
func BenchmarkIndexDBAddTSIDs(b *testing.B) {
const recordsPerLoop = 1e3
metricIDCache := fastcache.New(1234)
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
@ -79,10 +80,10 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
}
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
metricIDCache := fastcache.New(1234)
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})

View file

@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
)
@ -39,16 +40,16 @@ type Storage struct {
tb *table
// tsidCache is MetricName -> TSID cache.
tsidCache *fastcache.Cache
tsidCache *workingsetcache.Cache
// metricIDCache is MetricID -> TSID cache.
metricIDCache *fastcache.Cache
metricIDCache *workingsetcache.Cache
// metricNameCache is MetricID -> MetricName cache.
metricNameCache *fastcache.Cache
metricNameCache *workingsetcache.Cache
// dateMetricIDCache is (Date, MetricID) cache.
dateMetricIDCache *fastcache.Cache
dateMetricIDCache *workingsetcache.Cache
// Fast cache for MetricID values occured during the current hour.
currHourMetricIDs atomic.Value
@ -460,10 +461,10 @@ func (s *Storage) MustClose() {
s.idb().MustClose()
// Save caches.
s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
s.mustSaveCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.mustSaveCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID")
s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.mustSaveAndStopCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID")
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
@ -542,11 +543,11 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), len(hm.m), len(dst))
}
func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *fastcache.Cache {
func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache {
path := s.cachePath + "/" + name
logger.Infof("loading %s cache from %q...", info, path)
startTime := time.Now()
c := fastcache.LoadFromFileOrNew(path, sizeBytes)
c := workingsetcache.Load(path, sizeBytes, time.Hour)
var cs fastcache.Stats
c.UpdateStats(&cs)
logger.Infof("loaded %s cache from %q in %s; entriesCount: %d; sizeBytes: %d",
@ -554,17 +555,16 @@ func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *fastcache.Cac
return c
}
func (s *Storage) mustSaveCache(c *fastcache.Cache, info, name string) {
gomaxprocs := runtime.GOMAXPROCS(-1)
func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name string) {
path := s.cachePath + "/" + name
logger.Infof("saving %s cache to %q...", info, path)
startTime := time.Now()
if err := c.SaveToFileConcurrent(path, gomaxprocs); err != nil {
if err := c.Save(path); err != nil {
logger.Panicf("FATAL: cannot save %s cache to %q: %s", info, path, err)
}
var cs fastcache.Stats
c.UpdateStats(&cs)
c.Reset()
c.Stop()
logger.Infof("saved %s cache to %q in %s; entriesCount: %d; sizeBytes: %d",
info, path, time.Since(startTime), cs.EntriesCount, cs.BytesSize)
}
@ -971,7 +971,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
s.tsidCache.Set(metricName, buf)
}
func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) {
func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, nil, fmt.Errorf("cannot create directory %q: %s", path, err)
}

View file

@ -0,0 +1,209 @@
package workingsetcache
import (
"flag"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/fastcache"
)
var oldBehavior = flag.Bool("cache.oldBehavior", false, "Whether to use old behaviour for caches. Old behavior can give better resuts "+
"for low-RAM systems serving big number of time series. Systems with enough RAM would consume more RAM when `-cache.oldBehavior` is enabled")
// Cache is a cache for working set entries.
//
// The cache evicts inactive entries after the given expireDuration.
// Recently accessed entries survive expireDuration.
//
// Comparing to fastcache, this cache minimizes the required RAM size
// to values smaller than maxBytes.
type Cache struct {
curr atomic.Value
prev atomic.Value
wg sync.WaitGroup
stopCh chan struct{}
misses uint64
}
// Load loads the cache from filePath and limits its size to maxBytes
// and evicts inactive entires after expireDuration.
//
// Stop must be called on the returned cache when it is no longer needed.
func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
if !*oldBehavior {
// Split maxBytes between curr and prev caches.
maxBytes /= 2
}
curr := fastcache.LoadFromFileOrNew(filePath, maxBytes)
return newWorkingSetCache(curr, maxBytes, expireDuration)
}
// New creates new cache with the given maxBytes size and the given expireDuration
// for inactive entries.
//
// Stop must be called on the returned cache when it is no longer needed.
func New(maxBytes int, expireDuration time.Duration) *Cache {
if !*oldBehavior {
// Split maxBytes between curr and prev caches.
maxBytes /= 2
}
curr := fastcache.New(maxBytes)
return newWorkingSetCache(curr, maxBytes, expireDuration)
}
func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time.Duration) *Cache {
prev := fastcache.New(1024)
var c Cache
c.curr.Store(curr)
c.prev.Store(prev)
c.stopCh = make(chan struct{})
c.wg.Add(1)
go func() {
defer c.wg.Done()
t := time.NewTicker(expireDuration / 2)
for {
select {
case <-c.stopCh:
return
case <-t.C:
}
if *oldBehavior {
// Keep the curr cache for old behavior.
continue
}
// Do not reuse prev cache, since it can have too big capacity.
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
c.prev.Store(curr)
curr = fastcache.New(maxBytes)
c.curr.Store(curr)
}
}()
return &c
}
// Save safes the cache to filePath.
func (c *Cache) Save(filePath string) error {
curr := c.curr.Load().(*fastcache.Cache)
concurrency := runtime.GOMAXPROCS(-1)
return curr.SaveToFileConcurrent(filePath, concurrency)
}
// Stop stops the cache.
//
// The cache cannot be used after the Stop call.
func (c *Cache) Stop() {
close(c.stopCh)
c.wg.Wait()
c.Reset()
}
// Reset resets the cache.
func (c *Cache) Reset() {
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
curr.Reset()
c.misses = 0
}
// UpdateStats updates fcs with cache stats.
func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
curr := c.curr.Load().(*fastcache.Cache)
fcsOrig := *fcs
curr.UpdateStats(fcs)
if *oldBehavior {
return
}
fcs.Misses = fcsOrig.Misses + atomic.LoadUint64(&c.misses)
fcsOrig.Reset()
prev := c.prev.Load().(*fastcache.Cache)
prev.UpdateStats(&fcsOrig)
fcs.EntriesCount += fcsOrig.EntriesCount
fcs.BytesSize += fcsOrig.BytesSize
}
// Get appends the found value for the given key to dst and returns the result.
func (c *Cache) Get(dst, key []byte) []byte {
curr := c.curr.Load().(*fastcache.Cache)
result := curr.Get(dst, key)
if len(result) > len(dst) {
// Fast path - the entry is found in the current cache.
return result
}
if *oldBehavior {
return result
}
// Search for the entry in the previous cache.
prev := c.prev.Load().(*fastcache.Cache)
result = prev.Get(dst, key)
if len(result) <= len(dst) {
// Nothing found.
atomic.AddUint64(&c.misses, 1)
return result
}
// Cache the found entry in the current cache.
curr.Set(key, result[len(dst):])
return result
}
// Has verifies whether the cahce contains the given key.
func (c *Cache) Has(key []byte) bool {
curr := c.curr.Load().(*fastcache.Cache)
if curr.Has(key) {
return true
}
if *oldBehavior {
return false
}
prev := c.prev.Load().(*fastcache.Cache)
return prev.Has(key)
}
// Set sets the given value for the given key.
func (c *Cache) Set(key, value []byte) {
curr := c.curr.Load().(*fastcache.Cache)
curr.Set(key, value)
}
// GetBig appends the found value for the given key to dst and returns the result.
func (c *Cache) GetBig(dst, key []byte) []byte {
curr := c.curr.Load().(*fastcache.Cache)
result := curr.GetBig(dst, key)
if len(result) > len(dst) {
// Fast path - the entry is found in the current cache.
return result
}
if *oldBehavior {
return result
}
// Search for the entry in the previous cache.
prev := c.prev.Load().(*fastcache.Cache)
result = prev.GetBig(dst, key)
if len(result) <= len(dst) {
// Nothing found.
atomic.AddUint64(&c.misses, 1)
return result
}
// Cache the found entry in the current cache.
curr.SetBig(key, result[len(dst):])
return result
}
// SetBig sets the given value for the given key.
func (c *Cache) SetBig(key, value []byte) {
curr := c.curr.Load().(*fastcache.Cache)
curr.SetBig(key, value)
}