diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 795076dc5..9a5c46bd4 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -95,10 +95,9 @@ type Storage struct { stop chan struct{} - currHourMetricIDsUpdaterWG sync.WaitGroup - nextDayMetricIDsUpdaterWG sync.WaitGroup - retentionWatcherWG sync.WaitGroup - prefetchedMetricIDsCleanerWG sync.WaitGroup + currHourMetricIDsUpdaterWG sync.WaitGroup + nextDayMetricIDsUpdaterWG sync.WaitGroup + retentionWatcherWG sync.WaitGroup // The snapshotLock prevents from concurrent creation of snapshots, // since this may result in snapshots without recently added data, @@ -189,7 +188,6 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { s.startCurrHourMetricIDsUpdater() s.startNextDayMetricIDsUpdater() s.startRetentionWatcher() - s.startPrefetchedMetricIDsCleaner() return s, nil } @@ -458,27 +456,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) { s.tb.UpdateMetrics(&m.TableMetrics) } -func (s *Storage) startPrefetchedMetricIDsCleaner() { - s.prefetchedMetricIDsCleanerWG.Add(1) - go func() { - s.prefetchedMetricIDsCleaner() - s.prefetchedMetricIDsCleanerWG.Done() - }() -} - -func (s *Storage) prefetchedMetricIDsCleaner() { - t := time.NewTicker(7 * time.Minute) - for { - select { - case <-s.stop: - t.Stop() - return - case <-t.C: - s.prefetchedMetricIDs.Store(&uint64set.Set{}) - } - } -} - func (s *Storage) startRetentionWatcher() { s.retentionWatcherWG.Add(1) go func() { @@ -899,8 +876,13 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error { } // Store the pre-fetched metricIDs, so they aren't pre-fetched next time. + prefetchedMetricIDsNew := prefetchedMetricIDs.Clone() prefetchedMetricIDsNew.AddMulti(metricIDs) + if prefetchedMetricIDsNew.SizeBytes() > uint64(memory.Allowed())/32 { + // Reset prefetchedMetricIDsNew if it occupies too much space. + prefetchedMetricIDsNew = &uint64set.Set{} + } s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew) return nil }