lib/storage: optimize prefetching metric names for the given metricIDs

This commit is contained in:
Aliaksandr Valialkin 2020-08-06 16:48:21 +03:00
parent dbbdfbe7ee
commit ad730d8a17

View file

@ -95,10 +95,9 @@ type Storage struct {
stop chan struct{}
currHourMetricIDsUpdaterWG sync.WaitGroup
nextDayMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
prefetchedMetricIDsCleanerWG sync.WaitGroup
currHourMetricIDsUpdaterWG sync.WaitGroup
nextDayMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
// The snapshotLock prevents from concurrent creation of snapshots,
// since this may result in snapshots without recently added data,
@ -189,7 +188,6 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.startCurrHourMetricIDsUpdater()
s.startNextDayMetricIDsUpdater()
s.startRetentionWatcher()
s.startPrefetchedMetricIDsCleaner()
return s, nil
}
@ -458,27 +456,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
s.tb.UpdateMetrics(&m.TableMetrics)
}
func (s *Storage) startPrefetchedMetricIDsCleaner() {
s.prefetchedMetricIDsCleanerWG.Add(1)
go func() {
s.prefetchedMetricIDsCleaner()
s.prefetchedMetricIDsCleanerWG.Done()
}()
}
func (s *Storage) prefetchedMetricIDsCleaner() {
t := time.NewTicker(7 * time.Minute)
for {
select {
case <-s.stop:
t.Stop()
return
case <-t.C:
s.prefetchedMetricIDs.Store(&uint64set.Set{})
}
}
}
func (s *Storage) startRetentionWatcher() {
s.retentionWatcherWG.Add(1)
go func() {
@ -899,8 +876,13 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
}
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
prefetchedMetricIDsNew := prefetchedMetricIDs.Clone()
prefetchedMetricIDsNew.AddMulti(metricIDs)
if prefetchedMetricIDsNew.SizeBytes() > uint64(memory.Allowed())/32 {
// Reset prefetchedMetricIDsNew if it occupies too much space.
prefetchedMetricIDsNew = &uint64set.Set{}
}
s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
return nil
}