From d68546aa4ac7bbbadc59a128db85225b26279a33 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 30 Jan 2020 01:59:43 +0200
Subject: [PATCH] lib/storage: pre-fetch metricNames for the found metricIDs
 in Search.Init

This should speed up the Search.NextMetricBlock loop when a large number of
time series is found.
---
 app/vmstorage/main.go  |  6 ++++
 lib/storage/search.go  |  4 ++-
 lib/storage/storage.go | 77 ++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index f25648ad57..0b775a8cc1 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -461,6 +461,9 @@ func registerStorageMetrics() {
 	metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
 		return float64(storage.RegexpCacheSize())
 	})
+	metrics.NewGauge(`vm_cache_size_entries{type="storage/prefetchedMetricIDs"}`, func() float64 {
+		return float64(m().PrefetchedMetricIDsSize)
+	})
 
 	metrics.NewGauge(`vm_cache_size_bytes{type="storage/tsid"}`, func() float64 {
 		return float64(m().TSIDCacheSizeBytes)
@@ -483,6 +486,9 @@ func registerStorageMetrics() {
 	metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
 		return float64(idbm().UselessTagFiltersCacheSizeBytes)
 	})
+	metrics.NewGauge(`vm_cache_size_bytes{type="storage/prefetchedMetricIDs"}`, func() float64 {
+		return float64(m().PrefetchedMetricIDsSizeBytes)
+	})
 
 	metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 {
 		return float64(m().TSIDCacheRequests)
diff --git a/lib/storage/search.go b/lib/storage/search.go
index 51f0761198..8318186866 100644
--- a/lib/storage/search.go
+++ b/lib/storage/search.go
@@ -112,7 +112,9 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchD
 	s.needClosing = true
 
 	tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics)
-
+	if err == nil {
+		err = storage.prefetchMetricNames(tsids)
+	}
 	// It is ok to call Init on error from storage.searchTSIDs.
 	// Init must be called before returning because it will fail
 	// on Seach.MustClose otherwise.
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 95d7958465..0418ea24f8 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -71,10 +71,14 @@ type Storage struct {
 	pendingHourEntriesLock sync.Mutex
 	pendingHourEntries     *uint64set.Set
 
+	// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
+	prefetchedMetricIDs atomic.Value
+
 	stop chan struct{}
 
-	currHourMetricIDsUpdaterWG sync.WaitGroup
-	retentionWatcherWG         sync.WaitGroup
+	currHourMetricIDsUpdaterWG   sync.WaitGroup
+	retentionWatcherWG           sync.WaitGroup
+	prefetchedMetricIDsCleanerWG sync.WaitGroup
 }
 
 // OpenStorage opens storage on the given path with the given number of retention months.
@@ -127,6 +131,8 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	s.prevHourMetricIDs.Store(hmPrev)
 	s.pendingHourEntries = &uint64set.Set{}
 
+	s.prefetchedMetricIDs.Store(&uint64set.Set{})
+
 	// Load indexdb
 	idbPath := path + "/indexdb"
 	idbSnapshotsPath := idbPath + "/snapshots"
@@ -151,6 +157,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 
 	s.startCurrHourMetricIDsUpdater()
 	s.startRetentionWatcher()
+	s.startPrefetchedMetricIDsCleaner()
 
 	return s, nil
 }
@@ -313,6 +320,9 @@ type Metrics struct {
 	HourMetricIDCacheSize      uint64
 	HourMetricIDCacheSizeBytes uint64
 
+	PrefetchedMetricIDsSize      uint64
+	PrefetchedMetricIDsSizeBytes uint64
+
 	IndexDBMetrics IndexDBMetrics
 	TableMetrics   TableMetrics
 }
@@ -372,10 +382,35 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
 	m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
 
+	prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
+	m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
+	m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
+
 	s.idb().UpdateMetrics(&m.IndexDBMetrics)
 	s.tb.UpdateMetrics(&m.TableMetrics)
 }
 
+func (s *Storage) startPrefetchedMetricIDsCleaner() {
+	s.prefetchedMetricIDsCleanerWG.Add(1)
+	go func() {
+		s.prefetchedMetricIDsCleaner()
+		s.prefetchedMetricIDsCleanerWG.Done()
+	}()
+}
+
+func (s *Storage) prefetchedMetricIDsCleaner() {
+	t := time.NewTicker(7 * time.Minute)
+	for {
+		select {
+		case <-s.stop:
+			t.Stop()
+			return
+		case <-t.C:
+			s.prefetchedMetricIDs.Store(&uint64set.Set{})
+		}
+	}
+}
+
 func (s *Storage) startRetentionWatcher() {
 	s.retentionWatcherWG.Add(1)
 	go func() {
@@ -616,6 +651,44 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
 	return tsids, nil
 }
 
+// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
+//
+// This should speed-up further searchMetricName calls for metricIDs from tsids.
+func (s *Storage) prefetchMetricNames(tsids []TSID) error {
+	var metricIDs uint64Sorter
+	prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
+	for i := range tsids {
+		metricID := tsids[i].MetricID
+		if prefetchedMetricIDs.Has(metricID) {
+			continue
+		}
+		metricIDs = append(metricIDs, metricID)
+	}
+	if len(metricIDs) < 500 {
+		// It is cheaper to skip pre-fetching and obtain metricNames inline.
+		return nil
+	}
+
+	// Pre-fetch metricIDs.
+	sort.Sort(metricIDs)
+	var metricName []byte
+	var err error
+	for _, metricID := range metricIDs {
+		metricName, err = s.searchMetricName(metricName[:0], metricID)
+		if err != nil {
+			return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %s", metricID, err)
+		}
+	}
+
+	// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
+	prefetchedMetricIDsNew := prefetchedMetricIDs.Clone()
+	for _, metricID := range metricIDs {
+		prefetchedMetricIDsNew.Add(metricID)
+	}
+	s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
+	return nil
+}
+
 // DeleteMetrics deletes all the metrics matching the given tfss.
 //
 // Returns the number of metrics deleted.
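For readers who want the gist of the technique without walking the full patch, the following is a minimal, self-contained Go sketch of the same pattern: an atomic.Value holds an immutable "already pre-fetched" set that readers load without locks, writers clone the set, add the newly fetched entries and atomically swap the clone in, and a background goroutine periodically resets the set so it cannot grow without bound. The names here (prefetcher, prefetch, cleaner), the plain map[uint64]struct{} standing in for uint64set.Set, and the one-minute reset interval are illustrative assumptions rather than VictoriaMetrics APIs; the real patch also skips pre-fetching when fewer than 500 metricIDs are new, which the sketch omits for brevity.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// prefetcher sketches the copy-on-write pattern used by the patch:
// readers load an immutable set via atomic.Value, writers clone it,
// add new entries and atomically swap the clone in, and a background
// goroutine periodically resets the set to bound its size.
// (Hypothetical type for illustration; not part of VictoriaMetrics.)
type prefetcher struct {
	prefetched atomic.Value // holds a map[uint64]struct{}
	stop       chan struct{}
}

func newPrefetcher() *prefetcher {
	p := &prefetcher{stop: make(chan struct{})}
	p.prefetched.Store(map[uint64]struct{}{})
	go p.cleaner(time.Minute) // assumed interval; the patch resets every 7 minutes
	return p
}

// prefetch fetches names only for metricIDs that haven't been seen yet and
// records them in a cloned set, mirroring Storage.prefetchMetricNames.
func (p *prefetcher) prefetch(metricIDs []uint64, fetch func(uint64) error) error {
	seen := p.prefetched.Load().(map[uint64]struct{})
	var missing []uint64
	for _, id := range metricIDs {
		if _, ok := seen[id]; !ok {
			missing = append(missing, id)
		}
	}
	if len(missing) == 0 {
		return nil
	}
	for _, id := range missing {
		if err := fetch(id); err != nil {
			return fmt.Errorf("cannot pre-fetch metricID=%d: %w", id, err)
		}
	}
	// Copy-on-write: never mutate the set other goroutines may be reading.
	updated := make(map[uint64]struct{}, len(seen)+len(missing))
	for id := range seen {
		updated[id] = struct{}{}
	}
	for _, id := range missing {
		updated[id] = struct{}{}
	}
	p.prefetched.Store(updated)
	return nil
}

// cleaner periodically drops the accumulated set, like prefetchedMetricIDsCleaner.
func (p *prefetcher) cleaner(interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-p.stop:
			return
		case <-t.C:
			p.prefetched.Store(map[uint64]struct{}{})
		}
	}
}

func main() {
	p := newPrefetcher()
	fetch := func(id uint64) error { fmt.Println("fetched name for", id); return nil }
	_ = p.prefetch([]uint64{1, 2, 3}, fetch)
	_ = p.prefetch([]uint64{2, 3, 4}, fetch) // only 4 is fetched the second time
	close(p.stop)
}

The clone-and-swap keeps the read path lock-free; two concurrent updates may overwrite each other's additions, which is harmless here since the worst case is an extra pre-fetch later. The periodic reset (7 minutes in the patch) bounds the memory held by the set in long-running vmstorage processes, again at the cost of occasionally re-fetching metricNames that are already cached.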