diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index 99982b3b7..938d57c01 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -406,6 +406,9 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
 		return float64(storage.RegexpCacheSize())
 	})
+	metrics.NewGauge(`vm_cache_size_entries{type="storage/prefetchedMetricIDs"}`, func() float64 {
+		return float64(m().PrefetchedMetricIDsSize)
+	})
 
 	metrics.NewGauge(`vm_cache_size_bytes{type="storage/tsid"}`, func() float64 {
 		return float64(m().TSIDCacheSizeBytes)
@@ -428,6 +431,9 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
 		return float64(idbm().UselessTagFiltersCacheSizeBytes)
 	})
+	metrics.NewGauge(`vm_cache_size_bytes{type="storage/prefetchedMetricIDs"}`, func() float64 {
+		return float64(m().PrefetchedMetricIDsSizeBytes)
+	})
 
 	metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 {
 		return float64(m().TSIDCacheRequests)
diff --git a/lib/storage/search.go b/lib/storage/search.go
index a2cfa4eb4..e82b7ed99 100644
--- a/lib/storage/search.go
+++ b/lib/storage/search.go
@@ -112,7 +112,9 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchD
 	s.needClosing = true
 
 	tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics)
-
+	if err == nil {
+		err = storage.prefetchMetricNames(tsids)
+	}
 	// It is ok to call Init on error from storage.searchTSIDs.
 	// Init must be called before returning because it will fail
 	// on Seach.MustClose otherwise.
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index a6ea9b9cf..d8a711913 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -71,10 +71,14 @@ type Storage struct {
 	pendingHourEntriesLock sync.Mutex
 	pendingHourEntries     []pendingHourMetricIDEntry
 
+	// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
+	prefetchedMetricIDs atomic.Value
+
 	stop chan struct{}
 
-	currHourMetricIDsUpdaterWG sync.WaitGroup
-	retentionWatcherWG         sync.WaitGroup
+	currHourMetricIDsUpdaterWG   sync.WaitGroup
+	retentionWatcherWG           sync.WaitGroup
+	prefetchedMetricIDsCleanerWG sync.WaitGroup
 }
 
 type pendingHourMetricIDEntry struct {
@@ -137,6 +141,8 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	s.currHourMetricIDs.Store(hmCurr)
 	s.prevHourMetricIDs.Store(hmPrev)
 
+	s.prefetchedMetricIDs.Store(&uint64set.Set{})
+
 	// Load indexdb
 	idbPath := path + "/indexdb"
 	idbSnapshotsPath := idbPath + "/snapshots"
@@ -161,6 +167,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 
 	s.startCurrHourMetricIDsUpdater()
 	s.startRetentionWatcher()
+	s.startPrefetchedMetricIDsCleaner()
 
 	return s, nil
 }
@@ -323,6 +330,9 @@ type Metrics struct {
 	HourMetricIDCacheSize      uint64
 	HourMetricIDCacheSizeBytes uint64
 
+	PrefetchedMetricIDsSize      uint64
+	PrefetchedMetricIDsSizeBytes uint64
+
 	IndexDBMetrics IndexDBMetrics
 	TableMetrics   TableMetrics
 }
@@ -382,10 +392,35 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
 	m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
 
+	prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
+	m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
+	m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
+
 	s.idb().UpdateMetrics(&m.IndexDBMetrics)
 	s.tb.UpdateMetrics(&m.TableMetrics)
 }
 
+func (s *Storage) startPrefetchedMetricIDsCleaner() {
+	s.prefetchedMetricIDsCleanerWG.Add(1)
+	go func() {
+		s.prefetchedMetricIDsCleaner()
+		s.prefetchedMetricIDsCleanerWG.Done()
+	}()
+}
+
+func (s *Storage) prefetchedMetricIDsCleaner() {
+	t := time.NewTicker(7 * time.Minute)
+	for {
+		select {
+		case <-s.stop:
+			t.Stop()
+			return
+		case <-t.C:
+			s.prefetchedMetricIDs.Store(&uint64set.Set{})
+		}
+	}
+}
+
 func (s *Storage) startRetentionWatcher() {
 	s.retentionWatcherWG.Add(1)
 	go func() {
@@ -676,6 +711,44 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
 	return tsids, nil
 }
 
+// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
+//
+// This should speed-up further searchMetricName calls for metricIDs from tsids.
+func (s *Storage) prefetchMetricNames(tsids []TSID) error {
+	var metricIDs uint64Sorter
+	prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
+	for i := range tsids {
+		metricID := tsids[i].MetricID
+		if prefetchedMetricIDs.Has(metricID) {
+			continue
+		}
+		metricIDs = append(metricIDs, metricID)
+	}
+	if len(metricIDs) < 500 {
+		// It is cheaper to skip pre-fetching and obtain metricNames inline.
+		return nil
+	}
+
+	// Pre-fetch metricIDs.
+	sort.Sort(metricIDs)
+	var metricName []byte
+	var err error
+	for _, metricID := range metricIDs {
+		metricName, err = s.searchMetricName(metricName[:0], metricID)
+		if err != nil {
+			return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %s", metricID, err)
+		}
+	}
+
+	// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
+	prefetchedMetricIDsNew := prefetchedMetricIDs.Clone()
+	for _, metricID := range metricIDs {
+		prefetchedMetricIDsNew.Add(metricID)
+	}
+	s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
+	return nil
+}
+
 // DeleteMetrics deletes all the metrics matching the given tfss.
 //
 // Returns the number of metrics deleted.
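
For reference, the bookkeeping of already pre-fetched metricIDs in the diff above follows a copy-on-write pattern: readers load an immutable set from an `atomic.Value` without locking, a writer clones the set, adds the new IDs and stores the clone back, and a background goroutine periodically resets the set so it cannot grow without bound. The sketch below illustrates only that pattern; it is not the VictoriaMetrics code. It substitutes a plain `map[uint64]struct{}` for the internal `uint64set.Set` type, and the names `prefetchTracker`, `markPrefetched` and `startCleaner` are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// prefetchTracker is a hypothetical stand-in for the prefetchedMetricIDs field:
// an immutable set held in an atomic.Value and replaced wholesale on update.
type prefetchTracker struct {
	ids  atomic.Value // holds a map[uint64]struct{}
	stop chan struct{}
}

func newPrefetchTracker() *prefetchTracker {
	pt := &prefetchTracker{stop: make(chan struct{})}
	pt.ids.Store(map[uint64]struct{}{})
	return pt
}

// markPrefetched returns the metricIDs that haven't been seen yet and records them
// via copy-on-write: the current set is cloned, extended and stored back atomically.
func (pt *prefetchTracker) markPrefetched(metricIDs []uint64) []uint64 {
	curr := pt.ids.Load().(map[uint64]struct{})
	var missing []uint64
	for _, id := range metricIDs {
		if _, ok := curr[id]; !ok {
			missing = append(missing, id)
		}
	}
	if len(missing) == 0 {
		return nil
	}
	next := make(map[uint64]struct{}, len(curr)+len(missing))
	for id := range curr {
		next[id] = struct{}{}
	}
	for _, id := range missing {
		next[id] = struct{}{}
	}
	pt.ids.Store(next)
	return missing
}

// startCleaner periodically resets the set, mirroring the role of
// prefetchedMetricIDsCleaner, so the tracked IDs don't accumulate forever.
func (pt *prefetchTracker) startCleaner(interval time.Duration) {
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-pt.stop:
				return
			case <-t.C:
				pt.ids.Store(map[uint64]struct{}{})
			}
		}
	}()
}

func main() {
	pt := newPrefetchTracker()
	pt.startCleaner(7 * time.Minute)
	fmt.Println(pt.markPrefetched([]uint64{1, 2, 3})) // [1 2 3]: all unseen
	fmt.Println(pt.markPrefetched([]uint64{2, 3, 4})) // [4]: only the new ID
	close(pt.stop)
}
```

The trade-off is the same one the diff makes: lookups on the hot search path stay lock-free, while each update pays the cost of cloning the set, and the periodic reset bounds both that cost and the memory reported by the new `vm_cache_size_*{type="storage/prefetchedMetricIDs"}` gauges.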