diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 068d48f3af..66268a10fb 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -178,7 +178,8 @@ func (sn *storageNode) closeBrokenConn() { } func (sn *storageNode) run(stopCh <-chan struct{}) { - t := time.NewTimer(time.Second) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() mustStop := false for !mustStop { select { @@ -186,7 +187,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}) { mustStop = true // Make sure flushBufLocked is called last time before returning // in order to send the remaining bits of data. - case <-t.C: + case <-ticker.C: } sn.mu.Lock() @@ -195,14 +196,12 @@ func (sn *storageNode) run(stopCh <-chan struct{}) { logger.Errorf("cannot flush data to storageNode %q: %s", sn.dialer.Addr(), err) } sn.mu.Unlock() - - t.Reset(time.Second) } - t.Stop() } func rerouteWorker(stopCh <-chan struct{}) { - t := time.NewTimer(time.Second) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() var buf []byte mustStop := false for !mustStop { @@ -211,7 +210,7 @@ func rerouteWorker(stopCh <-chan struct{}) { mustStop = true // Make sure spreadReroutedBufToStorageNodes is called last time before returning // in order to reroute the remaining data to healthy vmstorage nodes. - case <-t.C: + case <-ticker.C: } var err error @@ -220,9 +219,7 @@ func rerouteWorker(stopCh <-chan struct{}) { rerouteErrors.Inc() logger.Errorf("cannot reroute data among healthy vmstorage nodes: %s", err) } - t.Reset(time.Second) } - t.Stop() } // storageNode is a client sending data to vmstorage node. diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 3d4c085c8b..a5231e58e2 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -216,13 +216,13 @@ func (idxbc *indexBlockCache) MustClose() { // cleaner periodically cleans least recently used items. func (idxbc *indexBlockCache) cleaner() { - t := time.NewTimer(5 * time.Second) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { - case <-t.C: + case <-ticker.C: idxbc.cleanByTimeout() case <-idxbc.cleanerStopCh: - t.Stop() return } } @@ -373,13 +373,13 @@ func (ibc *inmemoryBlockCache) MustClose() { // cleaner periodically cleans least recently used items. func (ibc *inmemoryBlockCache) cleaner() { - t := time.NewTimer(5 * time.Second) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { - case <-t.C: + case <-ticker.C: ibc.cleanByTimeout() case <-ibc.cleanerStopCh: - t.Stop() return } } @@ -468,8 +468,9 @@ func (ibc *inmemoryBlockCache) Misses() uint64 { func init() { go func() { - t := time.NewTimer(time.Second) - for tm := range t.C { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for tm := range ticker.C { t := uint64(tm.Unix()) atomic.StoreUint64(¤tTimestamp, t) } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 7a0c0df460..ea81ebd7c8 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -403,16 +403,15 @@ func (tb *Table) startRawItemsFlusher() { } func (tb *Table) rawItemsFlusher() { - t := time.NewTimer(rawItemsFlushInterval) + ticker := time.NewTicker(rawItemsFlushInterval) + defer ticker.Stop() for { select { case <-tb.stopCh: return - case <-t.C: - t.Reset(rawItemsFlushInterval) + case <-ticker.C: + tb.flushRawItems(false) } - - tb.flushRawItems(false) } } diff --git a/lib/storage/part.go b/lib/storage/part.go index e12ba82046..21071888a9 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -223,13 +223,13 @@ func (ibc *indexBlockCache) MustClose(isBig bool) { // cleaner periodically cleans least recently used items. func (ibc *indexBlockCache) cleaner() { - t := time.NewTimer(5 * time.Second) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { - case <-t.C: + case <-ticker.C: ibc.cleanByTimeout() case <-ibc.cleanerStopCh: - t.Stop() return } } @@ -317,8 +317,9 @@ func (ibc *indexBlockCache) Len() uint64 { func init() { go func() { - t := time.NewTimer(time.Second) - for tm := range t.C { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for tm := range ticker.C { t := uint64(tm.Unix()) atomic.StoreUint64(¤tTimestamp, t) } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index feaad32bec..b9cf1b2c56 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -688,15 +688,15 @@ func (pt *partition) startRawRowsFlusher() { } func (pt *partition) rawRowsFlusher() { - t := time.NewTimer(rawRowsFlushInterval) + ticker := time.NewTicker(rawRowsFlushInterval) + defer ticker.Stop() for { select { case <-pt.stopCh: return - case <-t.C: - t.Reset(rawRowsFlushInterval) + case <-ticker.C: + pt.flushRawRows(false) } - pt.flushRawRows(false) } } @@ -736,20 +736,19 @@ func (pt *partition) startInmemoryPartsFlusher() { } func (pt *partition) inmemoryPartsFlusher() { - t := time.NewTimer(inmemoryPartsFlushInterval) + ticker := time.NewTicker(inmemoryPartsFlushInterval) + defer ticker.Stop() var pwsBuf []*partWrapper var err error for { select { case <-pt.stopCh: return - case <-t.C: - t.Reset(inmemoryPartsFlushInterval) - } - - pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false) - if err != nil { - logger.Panicf("FATAL: cannot flush inmemory parts: %s", err) + case <-ticker.C: + pwsBuf, err = pt.flushInmemoryParts(pwsBuf[:0], false) + if err != nil { + logger.Panicf("FATAL: cannot flush inmemory parts: %s", err) + } } } } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 13e89a106b..db2f4f5fb6 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -453,15 +453,15 @@ func (s *Storage) startCurrHourMetricIDsUpdater() { var currHourMetricIDsUpdateInterval = time.Second * 10 func (s *Storage) currHourMetricIDsUpdater() { - t := time.NewTimer(currHourMetricIDsUpdateInterval) + ticker := time.NewTicker(currHourMetricIDsUpdateInterval) + defer ticker.Stop() for { select { case <-s.stop: s.updateCurrHourMetricIDs() return - case <-t.C: + case <-ticker.C: s.updateCurrHourMetricIDs() - t.Reset(currHourMetricIDsUpdateInterval) } } } diff --git a/lib/storage/table.go b/lib/storage/table.go index 698b4569d6..19e3035c91 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -375,13 +375,13 @@ func (tb *table) startRetentionWatcher() { } func (tb *table) retentionWatcher() { - t := time.NewTimer(time.Minute) + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() for { select { case <-tb.stop: return - case <-t.C: - t.Reset(time.Minute) + case <-ticker.C: } minTimestamp := timestampFromTime(time.Now()) - tb.retentionMilliseconds