From 3449d563bdd86efbe7a43eba5388aa996ac752c1 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 22 Jan 2024 18:12:37 +0200 Subject: [PATCH] all: add up to 10% random jitter to the interval between periodic tasks performed by various components This should smooth CPU and RAM usage spikes related to these periodic tasks, by reducing the probability that multiple concurrent periodic tasks are performed at the same time. --- app/vmagent/remotewrite/client.go | 12 +++++---- app/vmagent/remotewrite/pendingseries.go | 4 ++- app/vmstorage/main.go | 7 +++-- lib/blockcache/blockcache.go | 8 ++++-- lib/logstorage/storage.go | 4 ++- lib/lrucache/lrucache.go | 4 ++- lib/mergeset/table.go | 7 +++-- .../discovery/kubernetes/api_watcher.go | 12 +++++---- lib/storage/partition.go | 10 ++++--- lib/storage/storage.go | 20 +++++++------- lib/storage/table.go | 7 +++-- lib/timeutil/timeutil.go | 19 +++++++++++++ lib/timeutil/timeutil_test.go | 27 +++++++++++++++++++ lib/workingsetcache/cache.go | 14 +++------- 14 files changed, 110 insertions(+), 45 deletions(-) create mode 100644 lib/timeutil/timeutil.go create mode 100644 lib/timeutil/timeutil_test.go diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 370fa853a..d5d4baefb 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/metrics" ) @@ -395,7 +396,8 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) { // Otherwise it tries sending the block to remote storage indefinitely. func (c *client) sendBlockHTTP(block []byte) bool { c.rl.register(len(block), c.stopCh) - retryDuration := time.Second + maxRetryDuration := timeutil.AddJitterToDuration(time.Minute) + retryDuration := timeutil.AddJitterToDuration(time.Second) retriesCount := 0 again: @@ -405,8 +407,8 @@ again: if err != nil { c.errorsCount.Inc() retryDuration *= 2 - if retryDuration > time.Minute { - retryDuration = time.Minute + if retryDuration > maxRetryDuration { + retryDuration = maxRetryDuration } logger.Warnf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds", len(block), c.sanitizedURL, err, retryDuration.Seconds()) @@ -452,8 +454,8 @@ again: // Unexpected status code returned retriesCount++ retryDuration *= 2 - if retryDuration > time.Minute { - retryDuration = time.Minute + if retryDuration > maxRetryDuration { + retryDuration = maxRetryDuration } body, err := io.ReadAll(resp.Body) _ = resp.Body.Close() diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 3d582f91f..5fa35f511 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/metrics" "github.com/golang/snappy" ) @@ -69,7 +70,8 @@ func (ps *pendingSeries) periodicFlusher() { if flushSeconds <= 0 { flushSeconds = 1 } - ticker := time.NewTicker(*flushInterval) + d := timeutil.AddJitterToDuration(*flushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 1c182532d..cca39859c 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -20,7 +22,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" - "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) var ( @@ -400,7 +402,8 @@ func initStaleSnapshotsRemover(strg *storage.Storage) { staleSnapshotsRemoverWG.Add(1) go func() { defer staleSnapshotsRemoverWG.Done() - t := time.NewTicker(11 * time.Second) + d := timeutil.AddJitterToDuration(time.Second * 11) + t := time.NewTicker(d) defer t.Stop() for { select { diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go index fecb62b97..9192ffd70 100644 --- a/lib/blockcache/blockcache.go +++ b/lib/blockcache/blockcache.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/cespare/xxhash/v2" ) @@ -138,9 +139,12 @@ func (c *Cache) Misses() uint64 { } func (c *Cache) cleaner() { - ticker := time.NewTicker(57 * time.Second) + d := timeutil.AddJitterToDuration(time.Minute) + ticker := time.NewTicker(d) defer ticker.Stop() - perKeyMissesTicker := time.NewTicker(3 * time.Minute) + + d = timeutil.AddJitterToDuration(time.Minute * 3) + perKeyMissesTicker := time.NewTicker(d) defer perKeyMissesTicker.Stop() for { select { diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index 2c7b0e25f..570111003 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) @@ -318,7 +319,8 @@ func (s *Storage) runRetentionWatcher() { } func (s *Storage) watchRetention() { - ticker := time.NewTicker(time.Hour) + d := timeutil.AddJitterToDuration(time.Hour) + ticker := time.NewTicker(d) defer ticker.Stop() for { var ptwsToDelete []*partitionWrapper diff --git a/lib/lrucache/lrucache.go b/lib/lrucache/lrucache.go index 0c5acbcff..3d545cb4a 100644 --- a/lib/lrucache/lrucache.go +++ b/lib/lrucache/lrucache.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/cespare/xxhash/v2" ) @@ -127,7 +128,8 @@ func (c *Cache) Misses() uint64 { } func (c *Cache) cleaner() { - ticker := time.NewTicker(53 * time.Second) + d := timeutil.AddJitterToDuration(time.Second * 53) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 3fa5e48c5..6cf669615 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -19,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) // maxInmemoryParts is the maximum number of inmemory parts in the table. @@ -353,7 +354,8 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC go func() { // call flushCallback once per 10 seconds in order to improve the effectiveness of caches, // which are reset by the flushCallback. - tc := time.NewTicker(10 * time.Second) + d := timeutil.AddJitterToDuration(time.Second * 10) + tc := time.NewTicker(d) for { select { case <-tb.stopCh: @@ -603,7 +605,8 @@ func (tb *Table) startPendingItemsFlusher() { } func (tb *Table) inmemoryPartsFlusher() { - ticker := time.NewTicker(dataFlushInterval) + d := timeutil.AddJitterToDuration(dataFlushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index a27ad3c5f..48544f264 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -24,6 +24,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) var ( @@ -581,7 +582,7 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { } func (uw *urlWatcher) recreateScrapeWorks() { - const minSleepTime = 5 * time.Second + minSleepTime := timeutil.AddJitterToDuration(5 * time.Second) sleepTime := minSleepTime gw := uw.gw stopCh := gw.ctx.Done() @@ -756,8 +757,9 @@ func (uw *urlWatcher) reloadObjects() string { func (uw *urlWatcher) watchForUpdates() { gw := uw.gw stopCh := gw.ctx.Done() - backoffDelay := time.Second - maxBackoffDelay := 30 * time.Second + minBackoffDelay := timeutil.AddJitterToDuration(time.Second) + maxBackoffDelay := timeutil.AddJitterToDuration(time.Second * 30) + backoffDelay := minBackoffDelay backoffSleep := func() { t := timerpool.Get(backoffDelay) select { @@ -802,7 +804,7 @@ func (uw *urlWatcher) watchForUpdates() { if resp.StatusCode != http.StatusOK { if resp.StatusCode == 410 { // There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses - backoffDelay = time.Second + backoffDelay = minBackoffDelay uw.staleResourceVersions.Inc() uw.resourceVersion = "" } else { @@ -813,7 +815,7 @@ func (uw *urlWatcher) watchForUpdates() { } continue } - backoffDelay = time.Second + backoffDelay = minBackoffDelay err = uw.readObjectUpdateStream(resp.Body) _ = resp.Body.Close() if err != nil { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 256c8fb90..acec2bb18 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) // The maximum size of big part. @@ -815,7 +816,8 @@ func (pt *partition) startPendingRowsFlusher() { } func (pt *partition) inmemoryPartsFlusher() { - ticker := time.NewTicker(dataFlushInterval) + d := timeutil.AddJitterToDuration(dataFlushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -828,7 +830,8 @@ func (pt *partition) inmemoryPartsFlusher() { } func (pt *partition) pendingRowsFlusher() { - ticker := time.NewTicker(pendingRowsFlushInterval) + d := timeutil.AddJitterToDuration(pendingRowsFlushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() var rows []rawRow for { @@ -1574,7 +1577,8 @@ func (pt *partition) startStalePartsRemover() { } func (pt *partition) stalePartsRemover() { - ticker := time.NewTicker(7 * time.Minute) + d := timeutil.AddJitterToDuration(7 * time.Minute) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 9f0417da0..4302366d8 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -26,11 +26,11 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/metricsql" - "github.com/valyala/fastrand" ) const ( @@ -671,7 +671,8 @@ func (s *Storage) startFreeDiskSpaceWatcher() { s.freeDiskSpaceWatcherWG.Add(1) go func() { defer s.freeDiskSpaceWatcherWG.Done() - ticker := time.NewTicker(time.Second) + d := timeutil.AddJitterToDuration(time.Second) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -720,10 +721,9 @@ func (s *Storage) startNextDayMetricIDsUpdater() { }() } -var currHourMetricIDsUpdateInterval = time.Second * 10 - func (s *Storage) currHourMetricIDsUpdater() { - ticker := time.NewTicker(currHourMetricIDsUpdateInterval) + d := timeutil.AddJitterToDuration(time.Second * 10) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -738,10 +738,9 @@ func (s *Storage) currHourMetricIDsUpdater() { } } -var nextDayMetricIDsUpdateInterval = time.Second * 11 - func (s *Storage) nextDayMetricIDsUpdater() { - ticker := time.NewTicker(nextDayMetricIDsUpdateInterval) + d := timeutil.AddJitterToDuration(time.Second * 11) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -1214,9 +1213,8 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin if fasttime.UnixTimestamp() > atomic.LoadUint64(&s.prefetchedMetricIDsDeadline) { // Periodically reset the prefetchedMetricIDs in order to limit its size. s.prefetchedMetricIDs = &uint64set.Set{} - const deadlineSec = 20 * 60 - jitterSec := fastrand.Uint32n(deadlineSec / 10) - metricIDsDeadline := fasttime.UnixTimestamp() + deadlineSec + uint64(jitterSec) + d := timeutil.AddJitterToDuration(time.Second * 20 * 60) + metricIDsDeadline := fasttime.UnixTimestamp() + uint64(d.Seconds()) atomic.StoreUint64(&s.prefetchedMetricIDsDeadline, metricIDsDeadline) } s.prefetchedMetricIDs.AddMulti(metricIDs) diff --git a/lib/storage/table.go b/lib/storage/table.go index 29ba9616b..d3b2efbf3 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) // table represents a single table with time series data. @@ -366,7 +367,8 @@ func (tb *table) startRetentionWatcher() { } func (tb *table) retentionWatcher() { - ticker := time.NewTicker(time.Minute) + d := timeutil.AddJitterToDuration(time.Minute) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -433,7 +435,8 @@ func (tb *table) finalDedupWatcher() { } } } - t := time.NewTicker(time.Hour) + d := timeutil.AddJitterToDuration(time.Hour) + t := time.NewTicker(d) defer t.Stop() for { select { diff --git a/lib/timeutil/timeutil.go b/lib/timeutil/timeutil.go new file mode 100644 index 000000000..919bd0f46 --- /dev/null +++ b/lib/timeutil/timeutil.go @@ -0,0 +1,19 @@ +package timeutil + +import ( + "time" + + "github.com/valyala/fastrand" +) + +// AddJitterToDuration adds up to 10% random jitter to d and returns the resulting duration. +// +// The maximum jitter is limited by 10 seconds. +func AddJitterToDuration(d time.Duration) time.Duration { + dv := d / 10 + if dv > 10*time.Second { + dv = 10 * time.Second + } + p := float64(fastrand.Uint32()) / (1 << 32) + return d + time.Duration(p*float64(dv)) +} diff --git a/lib/timeutil/timeutil_test.go b/lib/timeutil/timeutil_test.go new file mode 100644 index 000000000..55032fb35 --- /dev/null +++ b/lib/timeutil/timeutil_test.go @@ -0,0 +1,27 @@ +package timeutil + +import ( + "testing" + "time" +) + +func TestAddJitterToDuration(t *testing.T) { + f := func(d time.Duration) { + t.Helper() + result := AddJitterToDuration(d) + if result < d { + t.Fatalf("unexpected negative jitter") + } + variance := (float64(result) - float64(d)) / float64(d) + if variance > 0.1 { + t.Fatalf("too big variance=%.2f for result=%s, d=%s; mustn't exceed 0.1", variance, result, d) + } + } + + f(time.Nanosecond) + f(time.Microsecond) + f(time.Millisecond) + f(time.Second) + f(time.Hour) + f(24 * time.Hour) +} diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index 74d54d81f..cc27fcb08 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/fastcache" ) @@ -132,7 +133,7 @@ func (c *Cache) runWatchers(expireDuration time.Duration) { } func (c *Cache) expirationWatcher(expireDuration time.Duration) { - expireDuration += timeJitter(expireDuration / 10) + expireDuration = timeutil.AddJitterToDuration(expireDuration) t := time.NewTicker(expireDuration) defer t.Stop() for { @@ -170,8 +171,7 @@ func (c *Cache) prevCacheWatcher() { // Watch for the usage of the prev cache and drop it whenever it receives // less than prevCacheRemovalPercent requests comparing to the curr cache during the last 60 seconds. - checkInterval := 60 * time.Second - checkInterval += timeJitter(checkInterval / 10) + checkInterval := timeutil.AddJitterToDuration(time.Second * 60) t := time.NewTicker(checkInterval) defer t.Stop() prevGetCalls := uint64(0) @@ -216,8 +216,7 @@ func (c *Cache) prevCacheWatcher() { } func (c *Cache) cacheSizeWatcher() { - checkInterval := 1500 * time.Millisecond - checkInterval += timeJitter(checkInterval / 10) + checkInterval := timeutil.AddJitterToDuration(time.Millisecond * 1500) t := time.NewTicker(checkInterval) defer t.Stop() @@ -451,8 +450,3 @@ func (c *Cache) SetBig(key, value []byte) { curr := c.curr.Load() curr.SetBig(key, value) } - -func timeJitter(d time.Duration) time.Duration { - n := float64(time.Now().UnixNano()%1e9) / 1e9 - return time.Duration(float64(d) * n) -}