lib/storage: pre-fetch metricNames for the found metricIDs in Search.Init

This should speed up the Search.NextMetricBlock loop for a big number of found time series.
This commit is contained in:
Aliaksandr Valialkin 2020-01-30 01:59:43 +02:00
parent 5bb9ccb6bf
commit d68546aa4a
3 changed files with 84 additions and 3 deletions

View file

@ -461,6 +461,9 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 { metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSize()) return float64(storage.RegexpCacheSize())
}) })
metrics.NewGauge(`vm_cache_size_entries{type="storage/prefetchedMetricIDs"}`, func() float64 {
return float64(m().PrefetchedMetricIDsSize)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/tsid"}`, func() float64 { metrics.NewGauge(`vm_cache_size_bytes{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheSizeBytes) return float64(m().TSIDCacheSizeBytes)
@ -483,6 +486,9 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 { metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheSizeBytes) return float64(idbm().UselessTagFiltersCacheSizeBytes)
}) })
metrics.NewGauge(`vm_cache_size_bytes{type="storage/prefetchedMetricIDs"}`, func() float64 {
return float64(m().PrefetchedMetricIDsSizeBytes)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 { metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheRequests) return float64(m().TSIDCacheRequests)

View file

@ -112,7 +112,9 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchD
s.needClosing = true s.needClosing = true
tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics) tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics)
if err == nil {
err = storage.prefetchMetricNames(tsids)
}
// It is ok to call Init on error from storage.searchTSIDs. // It is ok to call Init on error from storage.searchTSIDs.
// Init must be called before returning because it will fail // Init must be called before returning because it will fail
// on Seach.MustClose otherwise. // on Seach.MustClose otherwise.

View file

@ -71,10 +71,14 @@ type Storage struct {
pendingHourEntriesLock sync.Mutex pendingHourEntriesLock sync.Mutex
pendingHourEntries *uint64set.Set pendingHourEntries *uint64set.Set
// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
prefetchedMetricIDs atomic.Value
stop chan struct{} stop chan struct{}
currHourMetricIDsUpdaterWG sync.WaitGroup currHourMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup retentionWatcherWG sync.WaitGroup
prefetchedMetricIDsCleanerWG sync.WaitGroup
} }
// OpenStorage opens storage on the given path with the given number of retention months. // OpenStorage opens storage on the given path with the given number of retention months.
@ -127,6 +131,8 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.prevHourMetricIDs.Store(hmPrev) s.prevHourMetricIDs.Store(hmPrev)
s.pendingHourEntries = &uint64set.Set{} s.pendingHourEntries = &uint64set.Set{}
s.prefetchedMetricIDs.Store(&uint64set.Set{})
// Load indexdb // Load indexdb
idbPath := path + "/indexdb" idbPath := path + "/indexdb"
idbSnapshotsPath := idbPath + "/snapshots" idbSnapshotsPath := idbPath + "/snapshots"
@ -151,6 +157,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.startCurrHourMetricIDsUpdater() s.startCurrHourMetricIDsUpdater()
s.startRetentionWatcher() s.startRetentionWatcher()
s.startPrefetchedMetricIDsCleaner()
return s, nil return s, nil
} }
@ -313,6 +320,9 @@ type Metrics struct {
HourMetricIDCacheSize uint64 HourMetricIDCacheSize uint64
HourMetricIDCacheSizeBytes uint64 HourMetricIDCacheSizeBytes uint64
PrefetchedMetricIDsSize uint64
PrefetchedMetricIDsSizeBytes uint64
IndexDBMetrics IndexDBMetrics IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics TableMetrics TableMetrics
} }
@ -372,10 +382,35 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes() m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes() m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
s.idb().UpdateMetrics(&m.IndexDBMetrics) s.idb().UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics) s.tb.UpdateMetrics(&m.TableMetrics)
} }
// startPrefetchedMetricIDsCleaner launches the background goroutine that
// periodically resets the prefetchedMetricIDs set. The goroutine is tracked
// via prefetchedMetricIDsCleanerWG so shutdown can wait for it.
func (s *Storage) startPrefetchedMetricIDsCleaner() {
	s.prefetchedMetricIDsCleanerWG.Add(1)
	go func() {
		defer s.prefetchedMetricIDsCleanerWG.Done()
		s.prefetchedMetricIDsCleaner()
	}()
}
// prefetchedMetricIDsCleaner drops the accumulated prefetchedMetricIDs set
// every 7 minutes, so stale entries don't pin memory forever and metric names
// are eventually re-fetched. It exits when s.stop is closed.
func (s *Storage) prefetchedMetricIDsCleaner() {
	t := time.NewTicker(7 * time.Minute)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			// Replace the set wholesale; readers holding the old set keep a
			// consistent (if stale) snapshot.
			s.prefetchedMetricIDs.Store(&uint64set.Set{})
		case <-s.stop:
			return
		}
	}
}
func (s *Storage) startRetentionWatcher() { func (s *Storage) startRetentionWatcher() {
s.retentionWatcherWG.Add(1) s.retentionWatcherWG.Add(1)
go func() { go func() {
@ -616,6 +651,44 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
return tsids, nil return tsids, nil
} }
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
//
// This should speed-up further searchMetricName calls for metricIDs from tsids.
func (s *Storage) prefetchMetricNames(tsids []TSID) error {
	prefetched := s.prefetchedMetricIDs.Load().(*uint64set.Set)

	// Collect only the metricIDs that haven't been pre-fetched yet.
	var pending uint64Sorter
	for i := range tsids {
		if id := tsids[i].MetricID; !prefetched.Has(id) {
			pending = append(pending, id)
		}
	}
	if len(pending) < 500 {
		// It is cheaper to skip pre-fetching and obtain metricNames inline.
		return nil
	}

	// Pre-fetch metricIDs in sorted order; the lookup reuses a single buffer
	// across iterations to avoid per-call allocations.
	sort.Sort(pending)
	var buf []byte
	for _, id := range pending {
		var err error
		buf, err = s.searchMetricName(buf[:0], id)
		if err != nil {
			return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %s", id, err)
		}
	}

	// Remember the pre-fetched metricIDs, so they aren't pre-fetched next time.
	// NOTE(review): this Load/Clone/Store sequence isn't atomic — two concurrent
	// searches may each clone the same base set and one Store will drop the
	// other's additions. The consequence is only a redundant re-prefetch later,
	// but confirm this is acceptable (a mutex would make it exact).
	updated := prefetched.Clone()
	for _, id := range pending {
		updated.Add(id)
	}
	s.prefetchedMetricIDs.Store(updated)
	return nil
}
// DeleteMetrics deletes all the metrics matching the given tfss. // DeleteMetrics deletes all the metrics matching the given tfss.
// //
// Returns the number of metrics deleted. // Returns the number of metrics deleted.