mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
all: consistently use timers from timerpool
This commit is contained in:
parent
3149ac7a7e
commit
de3c662e8a
2 changed files with 14 additions and 8 deletions
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
|
@ -238,12 +239,13 @@ again:
|
|||
}
|
||||
logger.Errorf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds",
|
||||
len(block), c.sanitizedURL, err, retryDuration.Seconds())
|
||||
t := time.NewTimer(retryDuration)
|
||||
t := timerpool.Get(retryDuration)
|
||||
select {
|
||||
case <-c.stopCh:
|
||||
t.Stop()
|
||||
timerpool.Put(t)
|
||||
return
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
}
|
||||
c.retriesCount.Inc()
|
||||
goto again
|
||||
|
@ -280,12 +282,13 @@ again:
|
|||
logger.Errorf("unexpected status code received after sending a block with size %d bytes to %q during retry #%d: %d; response body=%q; "+
|
||||
"re-sending the block in %.3f seconds", len(block), c.sanitizedURL, retriesCount, statusCode, body, retryDuration.Seconds())
|
||||
}
|
||||
t := time.NewTimer(retryDuration)
|
||||
t := timerpool.Get(retryDuration)
|
||||
select {
|
||||
case <-c.stopCh:
|
||||
t.Stop()
|
||||
timerpool.Put(t)
|
||||
return
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
}
|
||||
c.retriesCount.Inc()
|
||||
goto again
|
||||
|
@ -312,12 +315,13 @@ func (rl *rateLimiter) register(dataLen int, stopCh <-chan struct{}) {
|
|||
now := time.Now()
|
||||
if d := rl.deadline.Sub(now); d > 0 {
|
||||
rl.limitReached.Inc()
|
||||
t := time.NewTimer(retryDuration)
|
||||
t := timerpool.Get(d)
|
||||
select {
|
||||
case <-stopCh:
|
||||
t.Stop()
|
||||
timerpool.Put(t)
|
||||
return
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
}
|
||||
}
|
||||
rl.budget += limit
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
xxhash "github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
@ -193,14 +194,15 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
|
|||
randSleep += uint64(scrapeInterval)
|
||||
}
|
||||
randSleep -= sleepOffset
|
||||
timer := time.NewTimer(time.Duration(randSleep))
|
||||
timer := timerpool.Get(time.Duration(randSleep))
|
||||
var timestamp int64
|
||||
var ticker *time.Ticker
|
||||
select {
|
||||
case <-stopCh:
|
||||
timer.Stop()
|
||||
timerpool.Put(timer)
|
||||
return
|
||||
case <-timer.C:
|
||||
timerpool.Put(timer)
|
||||
ticker = time.NewTicker(scrapeInterval)
|
||||
timestamp = time.Now().UnixNano() / 1e6
|
||||
sw.scrapeAndLogError(timestamp, timestamp)
|
||||
|
|
Loading…
Reference in a new issue