diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 370fa853a..d5d4baefb 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/metrics" ) @@ -395,7 +396,8 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) { // Otherwise it tries sending the block to remote storage indefinitely. func (c *client) sendBlockHTTP(block []byte) bool { c.rl.register(len(block), c.stopCh) - retryDuration := time.Second + maxRetryDuration := timeutil.AddJitterToDuration(time.Minute) + retryDuration := timeutil.AddJitterToDuration(time.Second) retriesCount := 0 again: @@ -405,8 +407,8 @@ again: if err != nil { c.errorsCount.Inc() retryDuration *= 2 - if retryDuration > time.Minute { - retryDuration = time.Minute + if retryDuration > maxRetryDuration { + retryDuration = maxRetryDuration } logger.Warnf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds", len(block), c.sanitizedURL, err, retryDuration.Seconds()) @@ -452,8 +454,8 @@ again: // Unexpected status code returned retriesCount++ retryDuration *= 2 - if retryDuration > time.Minute { - retryDuration = time.Minute + if retryDuration > maxRetryDuration { + retryDuration = maxRetryDuration } body, err := io.ReadAll(resp.Body) _ = resp.Body.Close() diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 3d582f91f..5fa35f511 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/metrics" "github.com/golang/snappy" ) @@ -69,7 +70,8 @@ func (ps *pendingSeries) periodicFlusher() { if flushSeconds <= 0 { flushSeconds = 1 } - ticker := time.NewTicker(*flushInterval) + d := timeutil.AddJitterToDuration(*flushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 1c182532d..cca39859c 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -20,7 +22,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" - "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) var ( @@ -400,7 +402,8 @@ func initStaleSnapshotsRemover(strg *storage.Storage) { staleSnapshotsRemoverWG.Add(1) go func() { defer staleSnapshotsRemoverWG.Done() - t := time.NewTicker(11 * time.Second) + d := timeutil.AddJitterToDuration(time.Second * 11) + t := time.NewTicker(d) defer t.Stop() for { select { diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go index fecb62b97..9192ffd70 100644 --- a/lib/blockcache/blockcache.go +++ b/lib/blockcache/blockcache.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/cespare/xxhash/v2" ) @@ -138,9 +139,12 @@ func (c *Cache) Misses() uint64 { } func (c *Cache) cleaner() { - ticker := time.NewTicker(57 * time.Second) + d := timeutil.AddJitterToDuration(time.Minute) + ticker := time.NewTicker(d) defer ticker.Stop() - perKeyMissesTicker := time.NewTicker(3 * time.Minute) + + d = timeutil.AddJitterToDuration(time.Minute * 3) + perKeyMissesTicker := time.NewTicker(d) defer perKeyMissesTicker.Stop() for { select { diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index 2c7b0e25f..570111003 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) @@ -318,7 +319,8 @@ func (s *Storage) runRetentionWatcher() { } func (s *Storage) watchRetention() { - ticker := time.NewTicker(time.Hour) + d := timeutil.AddJitterToDuration(time.Hour) + ticker := time.NewTicker(d) defer ticker.Stop() for { var ptwsToDelete []*partitionWrapper diff --git a/lib/lrucache/lrucache.go b/lib/lrucache/lrucache.go index 0c5acbcff..3d545cb4a 100644 --- a/lib/lrucache/lrucache.go +++ b/lib/lrucache/lrucache.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/cespare/xxhash/v2" ) @@ -127,7 +128,8 @@ func (c *Cache) Misses() uint64 { } func (c *Cache) cleaner() { - ticker := time.NewTicker(53 * time.Second) + d := timeutil.AddJitterToDuration(time.Second * 53) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 3fa5e48c5..6cf669615 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -19,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) // maxInmemoryParts is the maximum number of inmemory parts in the table. @@ -353,7 +354,8 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC go func() { // call flushCallback once per 10 seconds in order to improve the effectiveness of caches, // which are reset by the flushCallback. - tc := time.NewTicker(10 * time.Second) + d := timeutil.AddJitterToDuration(time.Second * 10) + tc := time.NewTicker(d) for { select { case <-tb.stopCh: @@ -603,7 +605,8 @@ func (tb *Table) startPendingItemsFlusher() { } func (tb *Table) inmemoryPartsFlusher() { - ticker := time.NewTicker(dataFlushInterval) + d := timeutil.AddJitterToDuration(dataFlushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index a27ad3c5f..48544f264 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -24,6 +24,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) var ( @@ -581,7 +582,7 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { } func (uw *urlWatcher) recreateScrapeWorks() { - const minSleepTime = 5 * time.Second + minSleepTime := timeutil.AddJitterToDuration(5 * time.Second) sleepTime := minSleepTime gw := uw.gw stopCh := gw.ctx.Done() @@ -756,8 +757,9 @@ func (uw *urlWatcher) reloadObjects() string { func (uw *urlWatcher) watchForUpdates() { gw := uw.gw stopCh := gw.ctx.Done() - backoffDelay := time.Second - maxBackoffDelay := 30 * time.Second + minBackoffDelay := timeutil.AddJitterToDuration(time.Second) + maxBackoffDelay := timeutil.AddJitterToDuration(time.Second * 30) + backoffDelay := minBackoffDelay backoffSleep := func() { t := timerpool.Get(backoffDelay) select { @@ -802,7 +804,7 @@ func (uw *urlWatcher) watchForUpdates() { if resp.StatusCode != http.StatusOK { if resp.StatusCode == 410 { // There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses - backoffDelay = time.Second + backoffDelay = minBackoffDelay uw.staleResourceVersions.Inc() uw.resourceVersion = "" } else { @@ -813,7 +815,7 @@ func (uw *urlWatcher) watchForUpdates() { } continue } - backoffDelay = time.Second + backoffDelay = minBackoffDelay err = uw.readObjectUpdateStream(resp.Body) _ = resp.Body.Close() if err != nil { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 256c8fb90..acec2bb18 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) // The maximum size of big part. @@ -815,7 +816,8 @@ func (pt *partition) startPendingRowsFlusher() { } func (pt *partition) inmemoryPartsFlusher() { - ticker := time.NewTicker(dataFlushInterval) + d := timeutil.AddJitterToDuration(dataFlushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -828,7 +830,8 @@ func (pt *partition) inmemoryPartsFlusher() { } func (pt *partition) pendingRowsFlusher() { - ticker := time.NewTicker(pendingRowsFlushInterval) + d := timeutil.AddJitterToDuration(pendingRowsFlushInterval) + ticker := time.NewTicker(d) defer ticker.Stop() var rows []rawRow for { @@ -1574,7 +1577,8 @@ func (pt *partition) startStalePartsRemover() { } func (pt *partition) stalePartsRemover() { - ticker := time.NewTicker(7 * time.Minute) + d := timeutil.AddJitterToDuration(7 * time.Minute) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 9f0417da0..4302366d8 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -26,11 +26,11 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/metricsql" - "github.com/valyala/fastrand" ) const ( @@ -671,7 +671,8 @@ func (s *Storage) startFreeDiskSpaceWatcher() { s.freeDiskSpaceWatcherWG.Add(1) go func() { defer s.freeDiskSpaceWatcherWG.Done() - ticker := time.NewTicker(time.Second) + d := timeutil.AddJitterToDuration(time.Second) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -720,10 +721,9 @@ func (s *Storage) startNextDayMetricIDsUpdater() { }() } -var currHourMetricIDsUpdateInterval = time.Second * 10 - func (s *Storage) currHourMetricIDsUpdater() { - ticker := time.NewTicker(currHourMetricIDsUpdateInterval) + d := timeutil.AddJitterToDuration(time.Second * 10) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -738,10 +738,9 @@ func (s *Storage) currHourMetricIDsUpdater() { } } -var nextDayMetricIDsUpdateInterval = time.Second * 11 - func (s *Storage) nextDayMetricIDsUpdater() { - ticker := time.NewTicker(nextDayMetricIDsUpdateInterval) + d := timeutil.AddJitterToDuration(time.Second * 11) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -1214,9 +1213,8 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin if fasttime.UnixTimestamp() > atomic.LoadUint64(&s.prefetchedMetricIDsDeadline) { // Periodically reset the prefetchedMetricIDs in order to limit its size. s.prefetchedMetricIDs = &uint64set.Set{} - const deadlineSec = 20 * 60 - jitterSec := fastrand.Uint32n(deadlineSec / 10) - metricIDsDeadline := fasttime.UnixTimestamp() + deadlineSec + uint64(jitterSec) + d := timeutil.AddJitterToDuration(time.Second * 20 * 60) + metricIDsDeadline := fasttime.UnixTimestamp() + uint64(d.Seconds()) atomic.StoreUint64(&s.prefetchedMetricIDsDeadline, metricIDsDeadline) } s.prefetchedMetricIDs.AddMulti(metricIDs) diff --git a/lib/storage/table.go b/lib/storage/table.go index 29ba9616b..d3b2efbf3 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) // table represents a single table with time series data. @@ -366,7 +367,8 @@ func (tb *table) startRetentionWatcher() { } func (tb *table) retentionWatcher() { - ticker := time.NewTicker(time.Minute) + d := timeutil.AddJitterToDuration(time.Minute) + ticker := time.NewTicker(d) defer ticker.Stop() for { select { @@ -433,7 +435,8 @@ func (tb *table) finalDedupWatcher() { } } } - t := time.NewTicker(time.Hour) + d := timeutil.AddJitterToDuration(time.Hour) + t := time.NewTicker(d) defer t.Stop() for { select { diff --git a/lib/timeutil/timeutil.go b/lib/timeutil/timeutil.go new file mode 100644 index 000000000..919bd0f46 --- /dev/null +++ b/lib/timeutil/timeutil.go @@ -0,0 +1,19 @@ +package timeutil + +import ( + "time" + + "github.com/valyala/fastrand" +) + +// AddJitterToDuration adds up to 10% random jitter to d and returns the resulting duration. +// +// The maximum jitter is limited by 10 seconds. +func AddJitterToDuration(d time.Duration) time.Duration { + dv := d / 10 + if dv > 10*time.Second { + dv = 10 * time.Second + } + p := float64(fastrand.Uint32()) / (1 << 32) + return d + time.Duration(p*float64(dv)) +} diff --git a/lib/timeutil/timeutil_test.go b/lib/timeutil/timeutil_test.go new file mode 100644 index 000000000..55032fb35 --- /dev/null +++ b/lib/timeutil/timeutil_test.go @@ -0,0 +1,27 @@ +package timeutil + +import ( + "testing" + "time" +) + +func TestAddJitterToDuration(t *testing.T) { + f := func(d time.Duration) { + t.Helper() + result := AddJitterToDuration(d) + if result < d { + t.Fatalf("unexpected negative jitter") + } + variance := (float64(result) - float64(d)) / float64(d) + if variance > 0.1 { + t.Fatalf("too big variance=%.2f for result=%s, d=%s; mustn't exceed 0.1", variance, result, d) + } + } + + f(time.Nanosecond) + f(time.Microsecond) + f(time.Millisecond) + f(time.Second) + f(time.Hour) + f(24 * time.Hour) +} diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index 74d54d81f..cc27fcb08 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/fastcache" ) @@ -132,7 +133,7 @@ func (c *Cache) runWatchers(expireDuration time.Duration) { } func (c *Cache) expirationWatcher(expireDuration time.Duration) { - expireDuration += timeJitter(expireDuration / 10) + expireDuration = timeutil.AddJitterToDuration(expireDuration) t := time.NewTicker(expireDuration) defer t.Stop() for { @@ -170,8 +171,7 @@ func (c *Cache) prevCacheWatcher() { // Watch for the usage of the prev cache and drop it whenever it receives // less than prevCacheRemovalPercent requests comparing to the curr cache during the last 60 seconds. - checkInterval := 60 * time.Second - checkInterval += timeJitter(checkInterval / 10) + checkInterval := timeutil.AddJitterToDuration(time.Second * 60) t := time.NewTicker(checkInterval) defer t.Stop() prevGetCalls := uint64(0) @@ -216,8 +216,7 @@ func (c *Cache) prevCacheWatcher() { } func (c *Cache) cacheSizeWatcher() { - checkInterval := 1500 * time.Millisecond - checkInterval += timeJitter(checkInterval / 10) + checkInterval := timeutil.AddJitterToDuration(time.Millisecond * 1500) t := time.NewTicker(checkInterval) defer t.Stop() @@ -451,8 +450,3 @@ func (c *Cache) SetBig(key, value []byte) { curr := c.curr.Load() curr.SetBig(key, value) } - -func timeJitter(d time.Duration) time.Duration { - n := float64(time.Now().UnixNano()%1e9) / 1e9 - return time.Duration(float64(d) * n) -}