From 347aaba79d44df3b21bef1f8f107352a133a635c Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu, 13 Feb 2020 12:55:58 +0200
Subject: [PATCH] lib/{storage,mergeset}: use time.Ticker instead of
 time.Timer where appropriate

It turned out that time.Timer was used in places where time.Ticker
must be used instead. This could result in blocked goroutines, as in
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/316 .
---
 app/vminsert/netstorage/netstorage.go | 15 ++++++---------
 lib/mergeset/part.go                  | 17 +++++++++--------
 lib/mergeset/table.go                 |  9 ++++-----
 lib/storage/part.go                   | 11 ++++++-----
 lib/storage/partition.go              | 23 +++++++++++------------
 lib/storage/storage.go                |  6 +++---
 lib/storage/table.go                  |  6 +++---
 7 files changed, 42 insertions(+), 45 deletions(-)

diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go
index 068d48f3af..66268a10fb 100644
--- a/app/vminsert/netstorage/netstorage.go
+++ b/app/vminsert/netstorage/netstorage.go
@@ -178,7 +178,8 @@ func (sn *storageNode) closeBrokenConn() {
 }
 
 func (sn *storageNode) run(stopCh <-chan struct{}) {
-	t := time.NewTimer(time.Second)
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	mustStop := false
 	for !mustStop {
 		select {
@@ -186,7 +187,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}) {
 			mustStop = true
 			// Make sure flushBufLocked is called last time before returning
 			// in order to send the remaining bits of data.
-		case <-t.C:
+		case <-ticker.C:
 		}
 
 		sn.mu.Lock()
@@ -195,14 +196,12 @@ func (sn *storageNode) run(stopCh <-chan struct{}) {
 			logger.Errorf("cannot flush data to storageNode %q: %s", sn.dialer.Addr(), err)
 		}
 		sn.mu.Unlock()
-
-		t.Reset(time.Second)
 	}
-	t.Stop()
 }
 
 func rerouteWorker(stopCh <-chan struct{}) {
-	t := time.NewTimer(time.Second)
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	var buf []byte
 	mustStop := false
 	for !mustStop {
@@ -211,7 +210,7 @@ func rerouteWorker(stopCh <-chan struct{}) {
 			mustStop = true
 			// Make sure spreadReroutedBufToStorageNodes is called last time before returning
 			// in order to reroute the remaining data to healthy vmstorage nodes.
-		case <-t.C:
+		case <-ticker.C:
 		}
 
 		var err error
@@ -220,9 +219,7 @@ func rerouteWorker(stopCh <-chan struct{}) {
 			rerouteErrors.Inc()
 			logger.Errorf("cannot reroute data among healthy vmstorage nodes: %s", err)
 		}
-		t.Reset(time.Second)
 	}
-	t.Stop()
 }
 
 // storageNode is a client sending data to vmstorage node.
diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go
index 3d4c085c8b..a5231e58e2 100644
--- a/lib/mergeset/part.go
+++ b/lib/mergeset/part.go
@@ -216,13 +216,13 @@ func (idxbc *indexBlockCache) MustClose() {
 
 // cleaner periodically cleans least recently used items.
 func (idxbc *indexBlockCache) cleaner() {
-	t := time.NewTimer(5 * time.Second)
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-t.C:
+		case <-ticker.C:
 			idxbc.cleanByTimeout()
 		case <-idxbc.cleanerStopCh:
-			t.Stop()
 			return
 		}
 	}
@@ -373,13 +373,13 @@ func (ibc *inmemoryBlockCache) MustClose() {
 
 // cleaner periodically cleans least recently used items.
 func (ibc *inmemoryBlockCache) cleaner() {
-	t := time.NewTimer(5 * time.Second)
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-t.C:
+		case <-ticker.C:
 			ibc.cleanByTimeout()
 		case <-ibc.cleanerStopCh:
-			t.Stop()
 			return
 		}
 	}
@@ -468,8 +468,9 @@ func (ibc *inmemoryBlockCache) Misses() uint64 {
 
 func init() {
 	go func() {
-		t := time.NewTimer(time.Second)
-		for tm := range t.C {
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+		for tm := range ticker.C {
 			t := uint64(tm.Unix())
 			atomic.StoreUint64(&currentTimestamp, t)
 		}
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 7a0c0df460..ea81ebd7c8 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -403,16 +403,15 @@ func (tb *Table) startRawItemsFlusher() {
 }
 
 func (tb *Table) rawItemsFlusher() {
-	t := time.NewTimer(rawItemsFlushInterval)
+	ticker := time.NewTicker(rawItemsFlushInterval)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-tb.stopCh:
 			return
-		case <-t.C:
-			t.Reset(rawItemsFlushInterval)
+		case <-ticker.C:
+			tb.flushRawItems(false)
 		}
-
-		tb.flushRawItems(false)
 	}
 }
 
diff --git a/lib/storage/part.go b/lib/storage/part.go
index e12ba82046..21071888a9 100644
--- a/lib/storage/part.go
+++ b/lib/storage/part.go
@@ -223,13 +223,13 @@ func (ibc *indexBlockCache) MustClose(isBig bool) {
 
 // cleaner periodically cleans least recently used items.
 func (ibc *indexBlockCache) cleaner() {
-	t := time.NewTimer(5 * time.Second)
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-t.C:
+		case <-ticker.C:
 			ibc.cleanByTimeout()
 		case <-ibc.cleanerStopCh:
-			t.Stop()
 			return
 		}
 	}
@@ -317,8 +317,9 @@ func (ibc *indexBlockCache) Len() uint64 {
 
 func init() {
 	go func() {
-		t := time.NewTimer(time.Second)
-		for tm := range t.C {
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+		for tm := range ticker.C {
 			t := uint64(tm.Unix())
 			atomic.StoreUint64(&currentTimestamp, t)
 		}
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index feaad32bec..b9cf1b2c56 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -688,15 +688,15 @@ func (pt *partition) startRawRowsFlusher() {
 }
 
 func (pt *partition) rawRowsFlusher() {
-	t := time.NewTimer(rawRowsFlushInterval)
+	ticker := time.NewTicker(rawRowsFlushInterval)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-pt.stopCh:
 			return
-		case <-t.C:
-			t.Reset(rawRowsFlushInterval)
+		case <-ticker.C:
+			pt.flushRawRows(false)
 		}
-		pt.flushRawRows(false)
 	}
 }
 
@@ -736,20 +736,19 @@ func (pt *partition) startInmemoryPartsFlusher() {
 }
 
 func (pt *partition) inmemoryPartsFlusher() {
-	t := time.NewTimer(inmemoryPartsFlushInterval)
+	ticker := time.NewTicker(inmemoryPartsFlushInterval)
+	defer ticker.Stop()
 	var pwsBuf []*partWrapper
 	var err error
 	for {
 		select {
 		case <-pt.stopCh:
 			return
-		case <-t.C:
-			t.Reset(inmemoryPartsFlushInterval)
-		}
-
-		pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false)
-		if err != nil {
-			logger.Panicf("FATAL: cannot flush inmemory parts: %s", err)
+		case <-ticker.C:
+			pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false)
+			if err != nil {
+				logger.Panicf("FATAL: cannot flush inmemory parts: %s", err)
+			}
 		}
 	}
 }
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 13e89a106b..db2f4f5fb6 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -453,15 +453,15 @@ func (s *Storage) startCurrHourMetricIDsUpdater() {
 var currHourMetricIDsUpdateInterval = time.Second * 10
 
 func (s *Storage) currHourMetricIDsUpdater() {
-	t := time.NewTimer(currHourMetricIDsUpdateInterval)
+	ticker := time.NewTicker(currHourMetricIDsUpdateInterval)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-s.stop:
 			s.updateCurrHourMetricIDs()
 			return
-		case <-t.C:
+		case <-ticker.C:
 			s.updateCurrHourMetricIDs()
-			t.Reset(currHourMetricIDsUpdateInterval)
 		}
 	}
 }
diff --git a/lib/storage/table.go b/lib/storage/table.go
index 698b4569d6..19e3035c91 100644
--- a/lib/storage/table.go
+++ b/lib/storage/table.go
@@ -375,13 +375,13 @@ func (tb *table) startRetentionWatcher() {
 }
 
 func (tb *table) retentionWatcher() {
-	t := time.NewTimer(time.Minute)
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-tb.stop:
 			return
-		case <-t.C:
-			t.Reset(time.Minute)
+		case <-ticker.C:
 		}
 		minTimestamp := timestampFromTime(time.Now()) - tb.retentionMilliseconds
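
For reference, the failure mode this patch fixes can be demonstrated standalone.
The sketch below is illustrative only and uses hypothetical names (timerFlusher,
tickerFlusher, stopCh) rather than code from the repository: a time.Timer fires
exactly once, so a flush loop that re-enters its select without calling Reset
blocks on <-t.C forever, which is how goroutines could end up blocked as in
issue 316. A time.Ticker delivers ticks repeatedly with no per-iteration Reset,
and a single defer ticker.Stop() replaces the manual t.Stop() calls on each
exit path.

package main

import (
	"fmt"
	"time"
)

// timerFlusher reproduces the bug: time.Timer fires exactly once, so when a
// loop iteration skips t.Reset, the next <-t.C blocks forever and the
// periodic work silently stops until stopCh fires.
func timerFlusher(stopCh <-chan struct{}) {
	t := time.NewTimer(100 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-t.C:
			fmt.Println("flush (timer)")
			// BUG: without t.Reset(100 * time.Millisecond) here, this case
			// never fires again; "flush (timer)" is printed only once.
		}
	}
}

// tickerFlusher is the patched pattern: time.Ticker delivers ticks
// repeatedly, so no per-iteration Reset is needed and the goroutine cannot
// get stuck waiting on a spent timer.
func tickerFlusher(stopCh <-chan struct{}) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			fmt.Println("flush (ticker)")
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	go timerFlusher(stopCh)  // prints once, then blocks until stopCh closes
	go tickerFlusher(stopCh) // prints roughly every 100ms
	time.Sleep(450 * time.Millisecond)
	close(stopCh)
}

Running main prints "flush (timer)" once but "flush (ticker)" several times,
matching the blocked-goroutine symptom described in the commit message.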