app/vmstorage: export vm_cache_size_max_bytes metrics for determining capacity of various caches

The vm_cache_size_max_bytes metric can be used for determining caches which reach their capacity via the following query:

   vm_cache_size_bytes / vm_cache_size_max_bytes > 0.9
This commit is contained in:
Aliaksandr Valialkin 2021-12-02 10:28:45 +02:00
parent 2f63dec2e3
commit 7275ebf91a
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
13 changed files with 131 additions and 59 deletions

View file

@ -713,6 +713,31 @@ func registerStorageMetrics() {
return float64(m().PrefetchedMetricIDsSizeBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/metricIDs"}`, func() float64 {
return float64(m().MetricIDCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/smallIndexBlocks"}`, func() float64 {
return float64(tm().SmallIndexBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/dataBlocks"}`, func() float64 {
return float64(idbm().DataBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheRequests)
})

View file

@ -10,6 +10,7 @@ sort: 15
* FEATURE: vmauth: allow using optional `name` field in configs. This field is then used as `username` label value for `vmauth_user_requests_total` metric. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1805).
* FEATURE: vmagent: export `vm_persistentqueue_read_duration_seconds_total` and `vm_persistentqueue_write_duration_seconds_total` metrics, which can be used for detecting persistent queue saturation with `rate(vm_persistentqueue_write_duration_seconds_total) > 0.9` alerting rule.
* FEATURE: export `vm_filestream_read_duration_seconds_total` and `vm_filestream_write_duration_seconds_total` metrics, which can be used for detecting persistent disk saturation with `rate(vm_filestream_read_duration_seconds_total) > 0.9` alerting rule.
* FEATURE: export `vm_cache_size_max_bytes` metrics, which show capacity for various caches. These metrics can be used for determining caches reaches its capacity with `vm_cache_size_bytes / vm_cache_size_max_bytes > 0.9` query.
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html), [vmrestore](https://docs.victoriametrics.com/vmrestore.html): add `-s3ForcePathStyle` command-line flag, which can be used for making backups to [Aliyun OSS](https://www.aliyun.com/product/oss). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1802).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): improve data migration from OpenTSDB. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1809). Thanks to @johnseekins .
* FEATURE: suppress `connection reset by peer` errors when remote client resets TCP connection to VictoriaMetrics / vmagent while ingesting the data via InfluxDB line protocol, Graphite protocol or OpenTSDB protocol. This error is expected, so there is no need in logging it.

2
go.mod
View file

@ -2,7 +2,7 @@ module github.com/VictoriaMetrics/VictoriaMetrics
require (
cloud.google.com/go/storage v1.18.2
github.com/VictoriaMetrics/fastcache v1.7.0
github.com/VictoriaMetrics/fastcache v1.8.0
// Do not use the original github.com/valyala/fasthttp because of issues
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b

4
go.sum
View file

@ -101,8 +101,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
github.com/SAP/go-hdb v0.14.1/go.mod h1:7fdQLVC2lER3urZLjZCm0AuMQfApof92n3aylBPEkMo=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/VictoriaMetrics/fastcache v1.7.0 h1:E6GibaGI685TafrI7E/QqZPkMsOzRw+3gpICQx08ISg=
github.com/VictoriaMetrics/fastcache v1.7.0/go.mod h1:n7Sl+ioh/HlWeYHLSIBIE8TcZFHg/+xgvomWSS5xuEE=
github.com/VictoriaMetrics/fastcache v1.8.0 h1:ybZqS7kRy8YVzYsI09GLzQhs7iqS6cOEH2avtknD1SU=
github.com/VictoriaMetrics/fastcache v1.8.0/go.mod h1:n7Sl+ioh/HlWeYHLSIBIE8TcZFHg/+xgvomWSS5xuEE=
github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a/ueoLdLL0=
github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ=
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=

View file

@ -296,6 +296,15 @@ func (idxbc *indexBlockCache) SizeBytes() uint64 {
return uint64(n)
}
func (idxbc *indexBlockCache) SizeMaxBytes() uint64 {
avgBlockSize := float64(64 * 1024)
blocksCount := idxbc.Len()
if blocksCount > 0 {
avgBlockSize = float64(idxbc.SizeBytes()) / float64(blocksCount)
}
return uint64(avgBlockSize * float64(getMaxCachedIndexBlocksPerPart()))
}
func (idxbc *indexBlockCache) Requests() uint64 {
return atomic.LoadUint64(&idxbc.requests)
}
@ -463,6 +472,15 @@ func (ibc *inmemoryBlockCache) SizeBytes() uint64 {
return uint64(n)
}
func (ibc *inmemoryBlockCache) SizeMaxBytes() uint64 {
avgBlockSize := float64(128 * 1024)
blocksCount := ibc.Len()
if blocksCount > 0 {
avgBlockSize = float64(ibc.SizeBytes()) / float64(blocksCount)
}
return uint64(avgBlockSize * float64(getMaxCachedInmemoryBlocksPerPart()))
}
func (ibc *inmemoryBlockCache) Requests() uint64 {
return atomic.LoadUint64(&ibc.requests)
}

View file

@ -410,15 +410,17 @@ type TableMetrics struct {
ItemsCount uint64
SizeBytes uint64
DataBlocksCacheSize uint64
DataBlocksCacheSizeBytes uint64
DataBlocksCacheRequests uint64
DataBlocksCacheMisses uint64
DataBlocksCacheSize uint64
DataBlocksCacheSizeBytes uint64
DataBlocksCacheSizeMaxBytes uint64
DataBlocksCacheRequests uint64
DataBlocksCacheMisses uint64
IndexBlocksCacheSize uint64
IndexBlocksCacheSizeBytes uint64
IndexBlocksCacheRequests uint64
IndexBlocksCacheMisses uint64
IndexBlocksCacheSize uint64
IndexBlocksCacheSizeBytes uint64
IndexBlocksCacheSizeMaxBytes uint64
IndexBlocksCacheRequests uint64
IndexBlocksCacheMisses uint64
PartsRefCount uint64
}
@ -443,11 +445,13 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.DataBlocksCacheSize += p.ibCache.Len()
m.DataBlocksCacheSizeBytes += p.ibCache.SizeBytes()
m.DataBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes()
m.DataBlocksCacheRequests += p.ibCache.Requests()
m.DataBlocksCacheMisses += p.ibCache.Misses()
m.IndexBlocksCacheSize += p.idxbCache.Len()
m.IndexBlocksCacheSizeBytes += p.idxbCache.SizeBytes()
m.IndexBlocksCacheSizeMaxBytes += p.idxbCache.SizeMaxBytes()
m.IndexBlocksCacheRequests += p.idxbCache.Requests()
m.IndexBlocksCacheMisses += p.idxbCache.Misses()

View file

@ -130,10 +130,11 @@ const noDeadline = 1<<64 - 1
// IndexDBMetrics contains essential metrics for indexDB.
type IndexDBMetrics struct {
TagFiltersCacheSize uint64
TagFiltersCacheSizeBytes uint64
TagFiltersCacheRequests uint64
TagFiltersCacheMisses uint64
TagFiltersCacheSize uint64
TagFiltersCacheSizeBytes uint64
TagFiltersCacheSizeMaxBytes uint64
TagFiltersCacheRequests uint64
TagFiltersCacheMisses uint64
DeletedMetricsCount uint64
@ -173,6 +174,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
db.tagFiltersCache.UpdateStats(&cs)
m.TagFiltersCacheSize += cs.EntriesCount
m.TagFiltersCacheSizeBytes += cs.BytesSize
m.TagFiltersCacheSizeMaxBytes += cs.MaxBytesSize
m.TagFiltersCacheRequests += cs.GetCalls
m.TagFiltersCacheMisses += cs.Misses

View file

@ -280,3 +280,12 @@ func (ibc *indexBlockCache) SizeBytes() uint64 {
ibc.mu.Unlock()
return uint64(n)
}
func (ibc *indexBlockCache) SizeMaxBytes() uint64 {
avgBlockSize := float64(64 * 1024)
blocksCount := ibc.Len()
if blocksCount > 0 {
avgBlockSize = float64(ibc.SizeBytes()) / float64(blocksCount)
}
return uint64(avgBlockSize * float64(getMaxCachedIndexBlocksPerPart()))
}

View file

@ -306,15 +306,17 @@ func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs
type partitionMetrics struct {
PendingRows uint64
BigIndexBlocksCacheSize uint64
BigIndexBlocksCacheSizeBytes uint64
BigIndexBlocksCacheRequests uint64
BigIndexBlocksCacheMisses uint64
BigIndexBlocksCacheSize uint64
BigIndexBlocksCacheSizeBytes uint64
BigIndexBlocksCacheSizeMaxBytes uint64
BigIndexBlocksCacheRequests uint64
BigIndexBlocksCacheMisses uint64
SmallIndexBlocksCacheSize uint64
SmallIndexBlocksCacheSizeBytes uint64
SmallIndexBlocksCacheRequests uint64
SmallIndexBlocksCacheMisses uint64
SmallIndexBlocksCacheSize uint64
SmallIndexBlocksCacheSizeBytes uint64
SmallIndexBlocksCacheSizeMaxBytes uint64
SmallIndexBlocksCacheRequests uint64
SmallIndexBlocksCacheMisses uint64
BigSizeBytes uint64
SmallSizeBytes uint64
@ -362,6 +364,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.BigIndexBlocksCacheSize += p.ibCache.Len()
m.BigIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes()
m.BigIndexBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes()
m.BigIndexBlocksCacheRequests += p.ibCache.Requests()
m.BigIndexBlocksCacheMisses += p.ibCache.Misses()
m.BigRowsCount += p.ph.RowsCount
@ -375,6 +378,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.SmallIndexBlocksCacheSize += p.ibCache.Len()
m.SmallIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes()
m.SmallIndexBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes()
m.SmallIndexBlocksCacheRequests += p.ibCache.Requests()
m.SmallIndexBlocksCacheMisses += p.ibCache.Misses()
m.SmallRowsCount += p.ph.RowsCount

View file

@ -439,23 +439,26 @@ type Metrics struct {
TimestampsBlocksMerged uint64
TimestampsBytesSaved uint64
TSIDCacheSize uint64
TSIDCacheSizeBytes uint64
TSIDCacheRequests uint64
TSIDCacheMisses uint64
TSIDCacheCollisions uint64
TSIDCacheSize uint64
TSIDCacheSizeBytes uint64
TSIDCacheSizeMaxBytes uint64
TSIDCacheRequests uint64
TSIDCacheMisses uint64
TSIDCacheCollisions uint64
MetricIDCacheSize uint64
MetricIDCacheSizeBytes uint64
MetricIDCacheRequests uint64
MetricIDCacheMisses uint64
MetricIDCacheCollisions uint64
MetricIDCacheSize uint64
MetricIDCacheSizeBytes uint64
MetricIDCacheSizeMaxBytes uint64
MetricIDCacheRequests uint64
MetricIDCacheMisses uint64
MetricIDCacheCollisions uint64
MetricNameCacheSize uint64
MetricNameCacheSizeBytes uint64
MetricNameCacheRequests uint64
MetricNameCacheMisses uint64
MetricNameCacheCollisions uint64
MetricNameCacheSize uint64
MetricNameCacheSizeBytes uint64
MetricNameCacheSizeMaxBytes uint64
MetricNameCacheRequests uint64
MetricNameCacheMisses uint64
MetricNameCacheCollisions uint64
DateMetricIDCacheSize uint64
DateMetricIDCacheSizeBytes uint64
@ -515,6 +518,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
s.tsidCache.UpdateStats(&cs)
m.TSIDCacheSize += cs.EntriesCount
m.TSIDCacheSizeBytes += cs.BytesSize
m.TSIDCacheSizeMaxBytes += cs.MaxBytesSize
m.TSIDCacheRequests += cs.GetCalls
m.TSIDCacheMisses += cs.Misses
m.TSIDCacheCollisions += cs.Collisions
@ -523,6 +527,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
s.metricIDCache.UpdateStats(&cs)
m.MetricIDCacheSize += cs.EntriesCount
m.MetricIDCacheSizeBytes += cs.BytesSize
m.MetricIDCacheSizeMaxBytes += cs.MaxBytesSize
m.MetricIDCacheRequests += cs.GetCalls
m.MetricIDCacheMisses += cs.Misses
m.MetricIDCacheCollisions += cs.Collisions
@ -531,6 +536,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
s.metricNameCache.UpdateStats(&cs)
m.MetricNameCacheSize += cs.EntriesCount
m.MetricNameCacheSizeBytes += cs.BytesSize
m.MetricNameCacheSizeMaxBytes += cs.MaxBytesSize
m.MetricNameCacheRequests += cs.GetCalls
m.MetricNameCacheMisses += cs.Misses
m.MetricNameCacheCollisions += cs.Collisions

View file

@ -21,9 +21,6 @@ const (
//
// The cache evicts inactive entries after the given expireDuration.
// Recently accessed entries survive expireDuration.
//
// Comparing to fastcache, this cache minimizes the required RAM size
// to values smaller than maxBytes.
type Cache struct {
curr atomic.Value
prev atomic.Value
@ -36,9 +33,6 @@ type Cache struct {
// After the process of switching, this flag will be set to whole.
mode uint32
// The maximum cache size in bytes.
maxBytes int
// mu serializes access to curr, prev and mode
// in expirationWatcher and cacheSizeWatcher.
mu sync.Mutex
@ -65,7 +59,7 @@ func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
// Try loading it again with maxBytes / 2 size.
curr := fastcache.New(maxBytes / 2)
prev := fastcache.LoadFromFileOrNew(filePath, maxBytes/2)
c := newCacheInternal(curr, prev, maxBytes, split)
c := newCacheInternal(curr, prev, split)
c.runWatchers(expireDuration)
return c
}
@ -74,24 +68,23 @@ func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
// Set its' mode to `whole`.
// There is no need in runWatchers call.
prev := fastcache.New(1024)
return newCacheInternal(curr, prev, maxBytes, whole)
return newCacheInternal(curr, prev, whole)
}
// New creates new cache with the given maxBytes capcity and the given expireDuration
// New creates new cache with the given maxBytes capacity and the given expireDuration
// for inactive entries.
//
// Stop must be called on the returned cache when it is no longer needed.
func New(maxBytes int, expireDuration time.Duration) *Cache {
curr := fastcache.New(maxBytes / 2)
prev := fastcache.New(1024)
c := newCacheInternal(curr, prev, maxBytes, split)
c := newCacheInternal(curr, prev, split)
c.runWatchers(expireDuration)
return c
}
func newCacheInternal(curr, prev *fastcache.Cache, maxBytes, mode int) *Cache {
func newCacheInternal(curr, prev *fastcache.Cache, mode int) *Cache {
var c Cache
c.maxBytes = maxBytes
c.curr.Store(curr)
c.prev.Store(prev)
c.stopCh = make(chan struct{})
@ -128,13 +121,15 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
c.mu.Unlock()
return
}
// Expire prev cache and create fresh curr cache with c.maxBytes/2 capacity.
// Do not reuse prev cache, since it can have too big capacity.
// Expire prev cache and create fresh curr cache with the same capacity.
// Do not reuse prev cache, since it can occupy too big amounts of memory.
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
var cs fastcache.Stats
curr.UpdateStats(&cs)
c.prev.Store(curr)
curr = fastcache.New(c.maxBytes / 2)
curr = fastcache.New(int(cs.MaxBytesSize))
c.curr.Store(curr)
c.mu.Unlock()
}
@ -144,6 +139,7 @@ func (c *Cache) cacheSizeWatcher() {
t := time.NewTicker(time.Minute)
defer t.Stop()
var maxBytesSize uint64
for {
select {
case <-c.stopCh:
@ -153,12 +149,13 @@ func (c *Cache) cacheSizeWatcher() {
var cs fastcache.Stats
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&cs)
if cs.BytesSize >= uint64(0.95*float64(c.maxBytes)/2) {
if cs.BytesSize >= uint64(0.9*float64(cs.MaxBytesSize)) {
maxBytesSize = cs.MaxBytesSize
break
}
}
// curr cache size exceeds 50% of its capacity. It is better
// curr cache size exceeds 90% of its capacity. It is better
// to double the size of curr cache and stop using prev cache,
// since this will result in higher summary cache capacity.
//
@ -166,7 +163,7 @@ func (c *Cache) cacheSizeWatcher() {
// 1) switch to mode=switching
// 2) move curr cache to prev
// 3) create curr with the double size
// 4) wait until curr size exceeds c.maxBytes/2, i.e. it is populated with new data
// 4) wait until curr size exceeds maxBytesSize, i.e. it is populated with new data
// 5) switch to mode=whole
// 6) drop prev
@ -176,7 +173,7 @@ func (c *Cache) cacheSizeWatcher() {
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
c.prev.Store(curr)
c.curr.Store(fastcache.New(c.maxBytes))
c.curr.Store(fastcache.New(int(maxBytesSize * 2)))
c.mu.Unlock()
for {
@ -188,7 +185,7 @@ func (c *Cache) cacheSizeWatcher() {
var cs fastcache.Stats
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&cs)
if cs.BytesSize >= uint64(c.maxBytes)/2 {
if cs.BytesSize >= maxBytesSize {
break
}
}
@ -245,6 +242,7 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
fcs.Corruptions += cs.Corruptions
fcs.EntriesCount += cs.EntriesCount
fcs.BytesSize += cs.BytesSize
fcs.MaxBytesSize += cs.MaxBytesSize
fcs.GetCalls += atomic.LoadUint64(&c.cs.GetCalls)
fcs.SetCalls += atomic.LoadUint64(&c.cs.SetCalls)
@ -255,6 +253,7 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
prev.UpdateStats(&cs)
fcs.EntriesCount += cs.EntriesCount
fcs.BytesSize += cs.BytesSize
fcs.MaxBytesSize += cs.MaxBytesSize
}
// Get appends the found value for the given key to dst and returns the result.

View file

@ -53,6 +53,9 @@ type Stats struct {
// BytesSize is the current size of the cache in bytes.
BytesSize uint64
// MaxBytesSize is the maximum allowed size of the cache in bytes (aka capacity).
MaxBytesSize uint64
// BigStats contains stats for GetBig/SetBig methods.
BigStats
}
@ -296,6 +299,7 @@ func (b *bucket) UpdateStats(s *Stats) {
for _, chunk := range b.chunks {
s.BytesSize += uint64(cap(chunk))
}
s.MaxBytesSize += uint64(len(b.chunks))*chunkSize
b.mu.RUnlock()
}

2
vendor/modules.txt vendored
View file

@ -10,7 +10,7 @@ cloud.google.com/go/internal/version
## explicit
cloud.google.com/go/storage
cloud.google.com/go/storage/internal/apiv2
# github.com/VictoriaMetrics/fastcache v1.7.0
# github.com/VictoriaMetrics/fastcache v1.8.0
## explicit
github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/fasthttp v1.1.0