diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index ab399bf29..9db1625cf 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1501,9 +1501,14 @@ func (th *topHeap) Pop() interface{} { panic(fmt.Errorf("BUG: Pop shouldn't be called")) } -// searchMetricName appends metric name for the given metricID to dst +// searchMetricNameWithCache appends metric name for the given metricID to dst // and returns the result. -func (db *indexDB) searchMetricName(dst []byte, metricID uint64) ([]byte, error) { +func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) { + metricName := db.getMetricNameFromCache(dst, metricID) + if len(metricName) > len(dst) { + return metricName, nil + } + is := db.getIndexSearch(noDeadline) dst, err := is.searchMetricName(dst, metricID) db.putIndexSearch(is) @@ -1753,12 +1758,15 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error { return io.EOF } -func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, error) { +func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) { metricName := is.db.getMetricNameFromCache(dst, metricID) if len(metricName) > len(dst) { return metricName, nil } + return is.searchMetricName(dst, metricID) +} +func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, error) { ts := &is.ts kb := &is.kb kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToMetricName) @@ -1929,7 +1937,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs } } var err error - metricName.B, err = is.searchMetricName(metricName.B[:0], metricID) + metricName.B, err = is.searchMetricNameWithCache(metricName.B[:0], metricID) if err != nil { if err == io.EOF { // It is likely the metricID->metricName entry didn't propagate to inverted index yet. @@ -3015,7 +3023,7 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64) error { // There is no need in searching for metric name in is.db.extDB, // Since the storeDateMetricID function is called only after the metricID->metricName // is added into the current is.db. - kb.B, err = is.searchMetricName(kb.B[:0], metricID) + kb.B, err = is.searchMetricNameWithCache(kb.B[:0], metricID) if err != nil { if err == io.EOF { logger.Errorf("missing metricName by metricID %d; this could be the case after unclean shutdown; "+ diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 25fbe858c..98a4943d5 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -746,16 +746,16 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC // Search for metric name for the given metricID. var err error - metricNameCopy, err = db.searchMetricName(metricNameCopy[:0], tsidCopy.MetricID) + metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], tsidCopy.MetricID) if err != nil { - return fmt.Errorf("error in searchMetricName for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err) + return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err) } if !bytes.Equal(metricName, metricNameCopy) { return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName) } // Try searching metric name for non-existent MetricID. - buf, err := db.searchMetricName(nil, 1) + buf, err := db.searchMetricNameWithCache(nil, 1) if err != io.EOF { return fmt.Errorf("expecting io.EOF error when searching for non-existing metricID; got %v", err) } diff --git a/lib/storage/search.go b/lib/storage/search.go index ac644cc65..10bb6f86d 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -91,7 +91,7 @@ type Search struct { // MetricBlockRef is updated with each Search.NextMetricBlock call. MetricBlockRef MetricBlockRef - storage *Storage + idb *indexDB ts tableSearch @@ -115,7 +115,7 @@ func (s *Search) reset() { s.MetricBlockRef.MetricName = s.MetricBlockRef.MetricName[:0] s.MetricBlockRef.BlockRef = nil - s.storage = nil + s.idb = nil s.ts.reset() s.tr = TimeRange{} s.tfss = nil @@ -155,7 +155,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMet return 0 } - s.storage = storage + s.idb = storage.idb() return len(tsids) } @@ -191,7 +191,7 @@ func (s *Search) NextMetricBlock() bool { s.loops++ tsid := &s.ts.BlockRef.bh.TSID var err error - s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID) + s.MetricBlockRef.MetricName, err = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID) if err != nil { if err == io.EOF { // Skip missing metricName for tsid.MetricID. diff --git a/lib/storage/storage.go b/lib/storage/storage.go index d299fcd81..63733fd1a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -867,14 +867,12 @@ func (s *Storage) SearchMetricNames(tfss []*TagFilters, tr TimeRange, maxMetrics return nil, err } idb := s.idb() - is := idb.getIndexSearch(deadline) - defer idb.putIndexSearch(is) mns := make([]MetricName, 0, len(tsids)) var metricName []byte for i := range tsids { metricID := tsids[i].MetricID var err error - metricName, err = is.searchMetricName(metricName[:0], metricID) + metricName, err = idb.searchMetricNameWithCache(metricName[:0], metricID) if err != nil { if err == io.EOF { // Skip missing metricName for metricID. @@ -939,7 +937,7 @@ var ( // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache. // -// This should speed-up further searchMetricName calls for metricIDs from tsids. +// This should speed-up further searchMetricNameWithCache calls for metricIDs from tsids. func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error { if len(tsids) == 0 { return nil @@ -962,6 +960,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error { // Pre-fetch metricIDs. sort.Sort(metricIDs) + var missingMetricIDs []uint64 var metricName []byte var err error idb := s.idb() @@ -973,11 +972,34 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error { return err } } - metricName, err = is.searchMetricName(metricName[:0], metricID) - if err != nil && err != io.EOF { + metricName, err = is.searchMetricNameWithCache(metricName[:0], metricID) + if err != nil { + if err == io.EOF { + missingMetricIDs = append(missingMetricIDs, metricID) + continue + } return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err) } } + idb.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(deadline) + defer extDB.putIndexSearch(is) + for loops, metricID := range missingMetricIDs { + if loops&paceLimiterSlowIterationsMask == 0 { + if err = checkSearchDeadlineAndPace(is.deadline); err != nil { + return + } + } + metricName, err = is.searchMetricNameWithCache(metricName[:0], metricID) + if err != nil && err != io.EOF { + err = fmt.Errorf("error in pre-fetching metricName for metricID=%d in extDB: %w", metricID, err) + return + } + } + }) + if err != nil { + return err + } // Store the pre-fetched metricIDs, so they aren't pre-fetched next time. @@ -1011,12 +1033,6 @@ func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) { return deletedCount, nil } -// searchMetricName appends metric name for the given metricID to dst -// and returns the result. -func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error) { - return s.idb().searchMetricName(dst, metricID) -} - // SearchTagKeysOnTimeRange searches for tag keys on tr. func (s *Storage) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { return s.idb().SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)