mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: improve Search.NextMetricBlock performance by using MetricID->MetricName cache
This commit is contained in:
parent
e17eb35147
commit
8e2afdf568
4 changed files with 48 additions and 24 deletions
|
@ -1501,9 +1501,14 @@ func (th *topHeap) Pop() interface{} {
|
|||
panic(fmt.Errorf("BUG: Pop shouldn't be called"))
|
||||
}
|
||||
|
||||
// searchMetricName appends metric name for the given metricID to dst
|
||||
// searchMetricNameWithCache appends metric name for the given metricID to dst
|
||||
// and returns the result.
|
||||
func (db *indexDB) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
|
||||
func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
|
||||
metricName := db.getMetricNameFromCache(dst, metricID)
|
||||
if len(metricName) > len(dst) {
|
||||
return metricName, nil
|
||||
}
|
||||
|
||||
is := db.getIndexSearch(noDeadline)
|
||||
dst, err := is.searchMetricName(dst, metricID)
|
||||
db.putIndexSearch(is)
|
||||
|
@ -1753,12 +1758,15 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
|
|||
return io.EOF
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
|
||||
func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
|
||||
metricName := is.db.getMetricNameFromCache(dst, metricID)
|
||||
if len(metricName) > len(dst) {
|
||||
return metricName, nil
|
||||
}
|
||||
return is.searchMetricName(dst, metricID)
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
|
||||
ts := &is.ts
|
||||
kb := &is.kb
|
||||
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToMetricName)
|
||||
|
@ -1929,7 +1937,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
|
|||
}
|
||||
}
|
||||
var err error
|
||||
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID)
|
||||
metricName.B, err = is.searchMetricNameWithCache(metricName.B[:0], metricID)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
// It is likely the metricID->metricName entry didn't propagate to inverted index yet.
|
||||
|
@ -3015,7 +3023,7 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
|||
// There is no need in searching for metric name in is.db.extDB,
|
||||
// Since the storeDateMetricID function is called only after the metricID->metricName
|
||||
// is added into the current is.db.
|
||||
kb.B, err = is.searchMetricName(kb.B[:0], metricID)
|
||||
kb.B, err = is.searchMetricNameWithCache(kb.B[:0], metricID)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
logger.Errorf("missing metricName by metricID %d; this could be the case after unclean shutdown; "+
|
||||
|
|
|
@ -746,16 +746,16 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
|||
|
||||
// Search for metric name for the given metricID.
|
||||
var err error
|
||||
metricNameCopy, err = db.searchMetricName(metricNameCopy[:0], tsidCopy.MetricID)
|
||||
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], tsidCopy.MetricID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in searchMetricName for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err)
|
||||
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err)
|
||||
}
|
||||
if !bytes.Equal(metricName, metricNameCopy) {
|
||||
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName)
|
||||
}
|
||||
|
||||
// Try searching metric name for non-existent MetricID.
|
||||
buf, err := db.searchMetricName(nil, 1)
|
||||
buf, err := db.searchMetricNameWithCache(nil, 1)
|
||||
if err != io.EOF {
|
||||
return fmt.Errorf("expecting io.EOF error when searching for non-existing metricID; got %v", err)
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ type Search struct {
|
|||
// MetricBlockRef is updated with each Search.NextMetricBlock call.
|
||||
MetricBlockRef MetricBlockRef
|
||||
|
||||
storage *Storage
|
||||
idb *indexDB
|
||||
|
||||
ts tableSearch
|
||||
|
||||
|
@ -115,7 +115,7 @@ func (s *Search) reset() {
|
|||
s.MetricBlockRef.MetricName = s.MetricBlockRef.MetricName[:0]
|
||||
s.MetricBlockRef.BlockRef = nil
|
||||
|
||||
s.storage = nil
|
||||
s.idb = nil
|
||||
s.ts.reset()
|
||||
s.tr = TimeRange{}
|
||||
s.tfss = nil
|
||||
|
@ -155,7 +155,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMet
|
|||
return 0
|
||||
}
|
||||
|
||||
s.storage = storage
|
||||
s.idb = storage.idb()
|
||||
return len(tsids)
|
||||
}
|
||||
|
||||
|
@ -191,7 +191,7 @@ func (s *Search) NextMetricBlock() bool {
|
|||
s.loops++
|
||||
tsid := &s.ts.BlockRef.bh.TSID
|
||||
var err error
|
||||
s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID)
|
||||
s.MetricBlockRef.MetricName, err = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
// Skip missing metricName for tsid.MetricID.
|
||||
|
|
|
@ -867,14 +867,12 @@ func (s *Storage) SearchMetricNames(tfss []*TagFilters, tr TimeRange, maxMetrics
|
|||
return nil, err
|
||||
}
|
||||
idb := s.idb()
|
||||
is := idb.getIndexSearch(deadline)
|
||||
defer idb.putIndexSearch(is)
|
||||
mns := make([]MetricName, 0, len(tsids))
|
||||
var metricName []byte
|
||||
for i := range tsids {
|
||||
metricID := tsids[i].MetricID
|
||||
var err error
|
||||
metricName, err = is.searchMetricName(metricName[:0], metricID)
|
||||
metricName, err = idb.searchMetricNameWithCache(metricName[:0], metricID)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
// Skip missing metricName for metricID.
|
||||
|
@ -939,7 +937,7 @@ var (
|
|||
|
||||
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
|
||||
//
|
||||
// This should speed-up further searchMetricName calls for metricIDs from tsids.
|
||||
// This should speed-up further searchMetricNameWithCache calls for metricIDs from tsids.
|
||||
func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
|
||||
if len(tsids) == 0 {
|
||||
return nil
|
||||
|
@ -962,6 +960,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
|
|||
|
||||
// Pre-fetch metricIDs.
|
||||
sort.Sort(metricIDs)
|
||||
var missingMetricIDs []uint64
|
||||
var metricName []byte
|
||||
var err error
|
||||
idb := s.idb()
|
||||
|
@ -973,11 +972,34 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
metricName, err = is.searchMetricName(metricName[:0], metricID)
|
||||
if err != nil && err != io.EOF {
|
||||
metricName, err = is.searchMetricNameWithCache(metricName[:0], metricID)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
missingMetricIDs = append(missingMetricIDs, metricID)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err)
|
||||
}
|
||||
}
|
||||
idb.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(deadline)
|
||||
defer extDB.putIndexSearch(is)
|
||||
for loops, metricID := range missingMetricIDs {
|
||||
if loops&paceLimiterSlowIterationsMask == 0 {
|
||||
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
metricName, err = is.searchMetricNameWithCache(metricName[:0], metricID)
|
||||
if err != nil && err != io.EOF {
|
||||
err = fmt.Errorf("error in pre-fetching metricName for metricID=%d in extDB: %w", metricID, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
|
||||
|
||||
|
@ -1011,12 +1033,6 @@ func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) {
|
|||
return deletedCount, nil
|
||||
}
|
||||
|
||||
// searchMetricName appends metric name for the given metricID to dst
|
||||
// and returns the result.
|
||||
func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
|
||||
return s.idb().searchMetricName(dst, metricID)
|
||||
}
|
||||
|
||||
// SearchTagKeysOnTimeRange searches for tag keys on tr.
|
||||
func (s *Storage) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
|
||||
return s.idb().SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)
|
||||
|
|
Loading…
Reference in a new issue