lib/storage: pre-fetch metricNames for the found metricIDs in Search.Init

This should speed up Search.NextMetricBlock loop for big number of found time series.
This commit is contained in:
Aliaksandr Valialkin 2020-01-30 01:59:43 +02:00
parent ced989c966
commit 4ed5e9a7ce
3 changed files with 84 additions and 3 deletions

View file

@ -406,6 +406,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSize())
})
metrics.NewGauge(`vm_cache_size_entries{type="storage/prefetchedMetricIDs"}`, func() float64 {
return float64(m().PrefetchedMetricIDsSize)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheSizeBytes)
@ -428,6 +431,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/prefetchedMetricIDs"}`, func() float64 {
return float64(m().PrefetchedMetricIDsSizeBytes)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheRequests)

View file

@ -112,7 +112,9 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchD
s.needClosing = true
tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics)
if err == nil {
err = storage.prefetchMetricNames(tsids)
}
// It is ok to call Init on error from storage.searchTSIDs.
// Init must be called before returning because it will fail
// on Seach.MustClose otherwise.

View file

@ -71,10 +71,14 @@ type Storage struct {
pendingHourEntriesLock sync.Mutex
pendingHourEntries []pendingHourMetricIDEntry
// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
prefetchedMetricIDs atomic.Value
stop chan struct{}
currHourMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
prefetchedMetricIDsCleanerWG sync.WaitGroup
}
type pendingHourMetricIDEntry struct {
@ -137,6 +141,8 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev)
s.prefetchedMetricIDs.Store(&uint64set.Set{})
// Load indexdb
idbPath := path + "/indexdb"
idbSnapshotsPath := idbPath + "/snapshots"
@ -161,6 +167,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.startCurrHourMetricIDsUpdater()
s.startRetentionWatcher()
s.startPrefetchedMetricIDsCleaner()
return s, nil
}
@ -323,6 +330,9 @@ type Metrics struct {
HourMetricIDCacheSize uint64
HourMetricIDCacheSizeBytes uint64
PrefetchedMetricIDsSize uint64
PrefetchedMetricIDsSizeBytes uint64
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
@ -382,10 +392,35 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
s.idb().UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics)
}
func (s *Storage) startPrefetchedMetricIDsCleaner() {
s.prefetchedMetricIDsCleanerWG.Add(1)
go func() {
s.prefetchedMetricIDsCleaner()
s.prefetchedMetricIDsCleanerWG.Done()
}()
}
func (s *Storage) prefetchedMetricIDsCleaner() {
t := time.NewTicker(7 * time.Minute)
for {
select {
case <-s.stop:
t.Stop()
return
case <-t.C:
s.prefetchedMetricIDs.Store(&uint64set.Set{})
}
}
}
func (s *Storage) startRetentionWatcher() {
s.retentionWatcherWG.Add(1)
go func() {
@ -676,6 +711,44 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
return tsids, nil
}
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
//
// This should speed-up further searchMetricName calls for metricIDs from tsids.
func (s *Storage) prefetchMetricNames(tsids []TSID) error {
var metricIDs uint64Sorter
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
for i := range tsids {
metricID := tsids[i].MetricID
if prefetchedMetricIDs.Has(metricID) {
continue
}
metricIDs = append(metricIDs, metricID)
}
if len(metricIDs) < 500 {
// It is cheaper to skip pre-fetching and obtain metricNames inline.
return nil
}
// Pre-fetch metricIDs.
sort.Sort(metricIDs)
var metricName []byte
var err error
for _, metricID := range metricIDs {
metricName, err = s.searchMetricName(metricName[:0], metricID)
if err != nil {
return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %s", metricID, err)
}
}
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
prefetchedMetricIDsNew := prefetchedMetricIDs.Clone()
for _, metricID := range metricIDs {
prefetchedMetricIDsNew.Add(metricID)
}
s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
return nil
}
// DeleteMetrics deletes all the metrics matching the given tfss.
//
// Returns the number of metrics deleted.