From eceaf13e5e1eb5463770fff3ee19b0f3b51d9713 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 13 Feb 2020 12:55:58 +0200 Subject: [PATCH] lib/{storage,mergeset}: use time.Ticker instead of time.Timer where appropriate It has been appeared that time.Timer was used in places where time.Ticker must be used instead. This could result in blocked goroutines as in the https://github.com/VictoriaMetrics/VictoriaMetrics/issues/316 . --- lib/mergeset/part.go | 17 +++++++++-------- lib/mergeset/table.go | 9 ++++----- lib/storage/part.go | 11 ++++++----- lib/storage/partition.go | 23 +++++++++++------------ lib/storage/storage.go | 6 +++--- lib/storage/table.go | 6 +++--- 6 files changed, 36 insertions(+), 36 deletions(-) diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 3d4c085c8b..a5231e58e2 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -216,13 +216,13 @@ func (idxbc *indexBlockCache) MustClose() { // cleaner periodically cleans least recently used items. func (idxbc *indexBlockCache) cleaner() { - t := time.NewTimer(5 * time.Second) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { - case <-t.C: + case <-ticker.C: idxbc.cleanByTimeout() case <-idxbc.cleanerStopCh: - t.Stop() return } } @@ -373,13 +373,13 @@ func (ibc *inmemoryBlockCache) MustClose() { // cleaner periodically cleans least recently used items. func (ibc *inmemoryBlockCache) cleaner() { - t := time.NewTimer(5 * time.Second) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { - case <-t.C: + case <-ticker.C: ibc.cleanByTimeout() case <-ibc.cleanerStopCh: - t.Stop() return } } @@ -468,8 +468,9 @@ func (ibc *inmemoryBlockCache) Misses() uint64 { func init() { go func() { - t := time.NewTimer(time.Second) - for tm := range t.C { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for tm := range ticker.C { t := uint64(tm.Unix()) atomic.StoreUint64(¤tTimestamp, t) } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 7a0c0df460..ea81ebd7c8 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -403,16 +403,15 @@ func (tb *Table) startRawItemsFlusher() { } func (tb *Table) rawItemsFlusher() { - t := time.NewTimer(rawItemsFlushInterval) + ticker := time.NewTicker(rawItemsFlushInterval) + defer ticker.Stop() for { select { case <-tb.stopCh: return - case <-t.C: - t.Reset(rawItemsFlushInterval) + case <-ticker.C: + tb.flushRawItems(false) } - - tb.flushRawItems(false) } } diff --git a/lib/storage/part.go b/lib/storage/part.go index e12ba82046..21071888a9 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -223,13 +223,13 @@ func (ibc *indexBlockCache) MustClose(isBig bool) { // cleaner periodically cleans least recently used items. func (ibc *indexBlockCache) cleaner() { - t := time.NewTimer(5 * time.Second) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { - case <-t.C: + case <-ticker.C: ibc.cleanByTimeout() case <-ibc.cleanerStopCh: - t.Stop() return } } @@ -317,8 +317,9 @@ func (ibc *indexBlockCache) Len() uint64 { func init() { go func() { - t := time.NewTimer(time.Second) - for tm := range t.C { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for tm := range ticker.C { t := uint64(tm.Unix()) atomic.StoreUint64(¤tTimestamp, t) } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index feaad32bec..b9cf1b2c56 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -688,15 +688,15 @@ func (pt *partition) startRawRowsFlusher() { } func (pt *partition) rawRowsFlusher() { - t := time.NewTimer(rawRowsFlushInterval) + ticker := time.NewTicker(rawRowsFlushInterval) + defer ticker.Stop() for { select { case <-pt.stopCh: return - case <-t.C: - t.Reset(rawRowsFlushInterval) + case <-ticker.C: + pt.flushRawRows(false) } - pt.flushRawRows(false) } } @@ -736,20 +736,19 @@ func (pt *partition) startInmemoryPartsFlusher() { } func (pt *partition) inmemoryPartsFlusher() { - t := time.NewTimer(inmemoryPartsFlushInterval) + ticker := time.NewTicker(inmemoryPartsFlushInterval) + defer ticker.Stop() var pwsBuf []*partWrapper var err error for { select { case <-pt.stopCh: return - case <-t.C: - t.Reset(inmemoryPartsFlushInterval) - } - - pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false) - if err != nil { - logger.Panicf("FATAL: cannot flush inmemory parts: %s", err) + case <-ticker.C: + pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false) + if err != nil { + logger.Panicf("FATAL: cannot flush inmemory parts: %s", err) + } } } } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 0e5a4ae2c9..65c98a7098 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -443,15 +443,15 @@ func (s *Storage) startCurrHourMetricIDsUpdater() { var currHourMetricIDsUpdateInterval = time.Second * 10 func (s *Storage) currHourMetricIDsUpdater() { - t := time.NewTimer(currHourMetricIDsUpdateInterval) + ticker := time.NewTicker(currHourMetricIDsUpdateInterval) + defer ticker.Stop() for { select { case <-s.stop: s.updateCurrHourMetricIDs() return - case <-t.C: + case <-ticker.C: s.updateCurrHourMetricIDs() - t.Reset(currHourMetricIDsUpdateInterval) } } } diff --git a/lib/storage/table.go b/lib/storage/table.go index 698b4569d6..19e3035c91 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -375,13 +375,13 @@ func (tb *table) startRetentionWatcher() { } func (tb *table) retentionWatcher() { - t := time.NewTimer(time.Minute) + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() for { select { case <-tb.stop: return - case <-t.C: - t.Reset(time.Minute) + case <-ticker.C: } minTimestamp := timestampFromTime(time.Now()) - tb.retentionMilliseconds