mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-19 15:30:17 +00:00
lib/storage: follow-up for 4b8088e377
- Clarify the bugfix description at docs/CHANGELOG.md - Simplify the code by accessing prefetchedMetricIDs struct under the lock instead of using lockless access to immutable struct. This shouldn't worsen code scalability too much on busy systems with many CPU cores, since the code executed under the lock is quite small and fast. This allows removing cloning of prefetchedMetricIDs struct every time new metric names are pre-fetched. This should reduce load on Go GC, since the cloning of uin64set.Set struct allocates many new objects.
This commit is contained in:
parent
30ff086f4d
commit
a2366e3b75
2 changed files with 18 additions and 21 deletions
|
@ -17,8 +17,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
|
||||
* BUGFIX: `vmstorage`: added missing `-inmemoryDataFlushInterval` command-line flag, which was missing in [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) after implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337) in [v1.85.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.0).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle queries, which wrap [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) with multiple arguments without explicitly specified lookbehind window in square brackets into [aggregate functions](https://docs.victoriametrics.com/MetricsQL.html#aggregate-functions). For example, `sum(quantile_over_time(0.5, process_resident_memory_bytes))` was resulting to `expecting at least 2 args to ...; got 1 args' error. Thanks to @atykhyy for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5414).
|
||||
* BUGFIX: `vmstorage`: properly expire `storage/prefetchedMetricIDs` cache. Previously this cache was never expired, so it could grow big under [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). This could result in increasing CPU load over time.
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly return results from [bottomk](https://docs.victoriametrics.com/MetricsQL.html#bottomk) and `bottomk_*()` functions when some of these results contain NaN values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506). Thanks to @xiaozongyang for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5509).
|
||||
* BUGFIX: `vmstorage`: properly check for `storage/prefetchedMetricIDs` cache expiration deadline. Before, this cache was limited only by size.
|
||||
|
||||
## [v1.87.12](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.12)
|
||||
|
||||
|
|
|
@ -100,14 +100,12 @@ type Storage struct {
|
|||
pendingNextDayMetricIDs *uint64set.Set
|
||||
|
||||
// prefetchedMetricIDs contains metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
|
||||
prefetchedMetricIDs atomic.Value
|
||||
prefetchedMetricIDsLock sync.Mutex
|
||||
prefetchedMetricIDs *uint64set.Set
|
||||
|
||||
// prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series.
|
||||
prefetchedMetricIDsDeadline uint64
|
||||
|
||||
// prefetchedMetricIDsLock is used for serializing updates of prefetchedMetricIDs from concurrent goroutines.
|
||||
prefetchedMetricIDsLock sync.Mutex
|
||||
|
||||
stop chan struct{}
|
||||
|
||||
currHourMetricIDsUpdaterWG sync.WaitGroup
|
||||
|
@ -216,7 +214,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
|||
s.nextDayMetricIDs.Store(nextDayMetricIDs)
|
||||
s.pendingNextDayMetricIDs = &uint64set.Set{}
|
||||
|
||||
s.prefetchedMetricIDs.Store(&uint64set.Set{})
|
||||
s.prefetchedMetricIDs = &uint64set.Set{}
|
||||
|
||||
// Load metadata
|
||||
metadataDir := path + "/metadata"
|
||||
|
@ -583,9 +581,11 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
m.NextDayMetricIDCacheSize += uint64(nextDayMetricIDs.Len())
|
||||
m.NextDayMetricIDCacheSizeBytes += nextDayMetricIDs.SizeBytes()
|
||||
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
|
||||
s.prefetchedMetricIDsLock.Lock()
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs
|
||||
m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
|
||||
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
|
||||
s.prefetchedMetricIDsLock.Unlock()
|
||||
|
||||
m.NextRetentionSeconds = uint64(nextRetentionDuration(s.retentionMsecs).Seconds())
|
||||
|
||||
|
@ -1127,14 +1127,18 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin
|
|||
qt.Printf("nothing to prefetch")
|
||||
return nil
|
||||
}
|
||||
|
||||
var metricIDs uint64Sorter
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
|
||||
s.prefetchedMetricIDsLock.Lock()
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs
|
||||
for _, metricID := range srcMetricIDs {
|
||||
if prefetchedMetricIDs.Has(metricID) {
|
||||
continue
|
||||
}
|
||||
metricIDs = append(metricIDs, metricID)
|
||||
}
|
||||
s.prefetchedMetricIDsLock.Unlock()
|
||||
|
||||
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs))
|
||||
if len(metricIDs) < 500 {
|
||||
// It is cheaper to skip pre-fetching and obtain metricNames inline.
|
||||
|
@ -1189,24 +1193,17 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin
|
|||
|
||||
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
|
||||
s.prefetchedMetricIDsLock.Lock()
|
||||
var prefetchedMetricIDsNew *uint64set.Set
|
||||
if fasttime.UnixTimestamp() > atomic.LoadUint64(&s.prefetchedMetricIDsDeadline) {
|
||||
// Periodically reset the prefetchedMetricIDs in order to limit its size.
|
||||
prefetchedMetricIDsNew = &uint64set.Set{}
|
||||
deadlineSec := 73 * 60
|
||||
jitterSec := fastrand.Uint32n(uint32(deadlineSec / 10))
|
||||
metricIDsDeadline := fasttime.UnixTimestamp() + uint64(deadlineSec) + uint64(jitterSec)
|
||||
s.prefetchedMetricIDs = &uint64set.Set{}
|
||||
const deadlineSec = 20 * 60
|
||||
jitterSec := fastrand.Uint32n(deadlineSec / 10)
|
||||
metricIDsDeadline := fasttime.UnixTimestamp() + deadlineSec + uint64(jitterSec)
|
||||
atomic.StoreUint64(&s.prefetchedMetricIDsDeadline, metricIDsDeadline)
|
||||
} else {
|
||||
prefetchedMetricIDsNew = prefetchedMetricIDs.Clone()
|
||||
}
|
||||
prefetchedMetricIDsNew.AddMulti(metricIDs)
|
||||
if prefetchedMetricIDsNew.SizeBytes() > uint64(memory.Allowed())/32 {
|
||||
// Reset prefetchedMetricIDsNew if it occupies too much space.
|
||||
prefetchedMetricIDsNew = &uint64set.Set{}
|
||||
}
|
||||
s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
|
||||
s.prefetchedMetricIDs.AddMulti(metricIDs)
|
||||
s.prefetchedMetricIDsLock.Unlock()
|
||||
|
||||
qt.Printf("cache metric ids for pre-fetched metric names")
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue