lib/{storage,mergeset}: use time.Ticker instead of time.Timer where appropriate

It has been appeared that time.Timer was used in places where time.Ticker must be used instead.
This could result in blocked goroutines as in the https://github.com/VictoriaMetrics/VictoriaMetrics/issues/316 .
This commit is contained in:
Aliaksandr Valialkin 2020-02-13 12:55:58 +02:00
parent 6e0013ca39
commit 347aaba79d
7 changed files with 42 additions and 45 deletions

View file

@ -178,7 +178,8 @@ func (sn *storageNode) closeBrokenConn() {
}
func (sn *storageNode) run(stopCh <-chan struct{}) {
t := time.NewTimer(time.Second)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
mustStop := false
for !mustStop {
select {
@ -186,7 +187,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}) {
mustStop = true
// Make sure flushBufLocked is called last time before returning
// in order to send the remaining bits of data.
case <-t.C:
case <-ticker.C:
}
sn.mu.Lock()
@ -195,14 +196,12 @@ func (sn *storageNode) run(stopCh <-chan struct{}) {
logger.Errorf("cannot flush data to storageNode %q: %s", sn.dialer.Addr(), err)
}
sn.mu.Unlock()
t.Reset(time.Second)
}
t.Stop()
}
func rerouteWorker(stopCh <-chan struct{}) {
t := time.NewTimer(time.Second)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var buf []byte
mustStop := false
for !mustStop {
@ -211,7 +210,7 @@ func rerouteWorker(stopCh <-chan struct{}) {
mustStop = true
// Make sure spreadReroutedBufToStorageNodes is called last time before returning
// in order to reroute the remaining data to healthy vmstorage nodes.
case <-t.C:
case <-ticker.C:
}
var err error
@ -220,9 +219,7 @@ func rerouteWorker(stopCh <-chan struct{}) {
rerouteErrors.Inc()
logger.Errorf("cannot reroute data among healthy vmstorage nodes: %s", err)
}
t.Reset(time.Second)
}
t.Stop()
}
// storageNode is a client sending data to vmstorage node.

View file

@ -216,13 +216,13 @@ func (idxbc *indexBlockCache) MustClose() {
// cleaner periodically cleans least recently used items.
func (idxbc *indexBlockCache) cleaner() {
t := time.NewTimer(5 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-t.C:
case <-ticker.C:
idxbc.cleanByTimeout()
case <-idxbc.cleanerStopCh:
t.Stop()
return
}
}
@ -373,13 +373,13 @@ func (ibc *inmemoryBlockCache) MustClose() {
// cleaner periodically cleans least recently used items.
func (ibc *inmemoryBlockCache) cleaner() {
t := time.NewTimer(5 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-t.C:
case <-ticker.C:
ibc.cleanByTimeout()
case <-ibc.cleanerStopCh:
t.Stop()
return
}
}
@ -468,8 +468,9 @@ func (ibc *inmemoryBlockCache) Misses() uint64 {
func init() {
go func() {
t := time.NewTimer(time.Second)
for tm := range t.C {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for tm := range ticker.C {
t := uint64(tm.Unix())
atomic.StoreUint64(&currentTimestamp, t)
}

View file

@ -403,16 +403,15 @@ func (tb *Table) startRawItemsFlusher() {
}
func (tb *Table) rawItemsFlusher() {
t := time.NewTimer(rawItemsFlushInterval)
ticker := time.NewTicker(rawItemsFlushInterval)
defer ticker.Stop()
for {
select {
case <-tb.stopCh:
return
case <-t.C:
t.Reset(rawItemsFlushInterval)
case <-ticker.C:
tb.flushRawItems(false)
}
tb.flushRawItems(false)
}
}

View file

@ -223,13 +223,13 @@ func (ibc *indexBlockCache) MustClose(isBig bool) {
// cleaner periodically cleans least recently used items.
func (ibc *indexBlockCache) cleaner() {
t := time.NewTimer(5 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-t.C:
case <-ticker.C:
ibc.cleanByTimeout()
case <-ibc.cleanerStopCh:
t.Stop()
return
}
}
@ -317,8 +317,9 @@ func (ibc *indexBlockCache) Len() uint64 {
func init() {
go func() {
t := time.NewTimer(time.Second)
for tm := range t.C {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for tm := range ticker.C {
t := uint64(tm.Unix())
atomic.StoreUint64(&currentTimestamp, t)
}

View file

@ -688,15 +688,15 @@ func (pt *partition) startRawRowsFlusher() {
}
func (pt *partition) rawRowsFlusher() {
t := time.NewTimer(rawRowsFlushInterval)
ticker := time.NewTicker(rawRowsFlushInterval)
defer ticker.Stop()
for {
select {
case <-pt.stopCh:
return
case <-t.C:
t.Reset(rawRowsFlushInterval)
case <-ticker.C:
pt.flushRawRows(false)
}
pt.flushRawRows(false)
}
}
@ -736,20 +736,19 @@ func (pt *partition) startInmemoryPartsFlusher() {
}
func (pt *partition) inmemoryPartsFlusher() {
t := time.NewTimer(inmemoryPartsFlushInterval)
ticker := time.NewTicker(inmemoryPartsFlushInterval)
defer ticker.Stop()
var pwsBuf []*partWrapper
var err error
for {
select {
case <-pt.stopCh:
return
case <-t.C:
t.Reset(inmemoryPartsFlushInterval)
}
pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false)
if err != nil {
logger.Panicf("FATAL: cannot flush inmemory parts: %s", err)
case <-ticker.C:
pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false)
if err != nil {
logger.Panicf("FATAL: cannot flush inmemory parts: %s", err)
}
}
}
}

View file

@ -453,15 +453,15 @@ func (s *Storage) startCurrHourMetricIDsUpdater() {
var currHourMetricIDsUpdateInterval = time.Second * 10
func (s *Storage) currHourMetricIDsUpdater() {
t := time.NewTimer(currHourMetricIDsUpdateInterval)
ticker := time.NewTicker(currHourMetricIDsUpdateInterval)
defer ticker.Stop()
for {
select {
case <-s.stop:
s.updateCurrHourMetricIDs()
return
case <-t.C:
case <-ticker.C:
s.updateCurrHourMetricIDs()
t.Reset(currHourMetricIDsUpdateInterval)
}
}
}

View file

@ -375,13 +375,13 @@ func (tb *table) startRetentionWatcher() {
}
func (tb *table) retentionWatcher() {
t := time.NewTimer(time.Minute)
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-tb.stop:
return
case <-t.C:
t.Reset(time.Minute)
case <-ticker.C:
}
minTimestamp := timestampFromTime(time.Now()) - tb.retentionMilliseconds