mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: handle fatal errors inside indexSearch.getTSIDByMetricID() instead of returning them to the caller
This simplifies the code a bit at caller side
This commit is contained in:
parent
bf30ded160
commit
24cec79763
2 changed files with 33 additions and 37 deletions
|
@ -1849,6 +1849,8 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, accountID, proj
|
|||
if len(metricIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Search for TSIDs in the current indexdb
|
||||
tsids := make([]TSID, len(metricIDs))
|
||||
var extMetricIDs []uint64
|
||||
i := 0
|
||||
|
@ -1873,13 +1875,10 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, accountID, proj
|
|||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
|
||||
if err == io.EOF {
|
||||
// Postpone searching for the metricID in the extDB.
|
||||
extMetricIDs = append(extMetricIDs, metricID)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
|
||||
if !is.getTSIDByMetricID(tsid, metricID) {
|
||||
// Postpone searching for the missing metricID in the extDB.
|
||||
extMetricIDs = append(extMetricIDs, metricID)
|
||||
continue
|
||||
}
|
||||
is.db.putToMetricIDCache(metricID, tsid)
|
||||
i++
|
||||
|
@ -1892,40 +1891,37 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, accountID, proj
|
|||
tsidsFound := i
|
||||
qt.Printf("found %d tsids for %d metricIDs in the current indexdb", tsidsFound, len(metricIDs))
|
||||
|
||||
// Search for extMetricIDs in the extDB.
|
||||
db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||
defer extDB.putIndexSearch(is)
|
||||
for loopsPaceLimiter, metricID := range extMetricIDs {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return
|
||||
if len(extMetricIDs) > 0 {
|
||||
// Search for extMetricIDs in the previous indexdb (aka extDB)
|
||||
db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||
defer extDB.putIndexSearch(is)
|
||||
for loopsPaceLimiter, metricID := range extMetricIDs {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
// There is no need in searching for TSIDs in MetricID->TSID cache, since
|
||||
// this has been already done in the loop above (the MetricID->TSID cache is global).
|
||||
tsid := &tsids[i]
|
||||
if err = is.getTSIDByMetricID(tsid, metricID); err != nil {
|
||||
if err == io.EOF {
|
||||
// There is no need in searching for TSIDs in MetricID->TSID cache, since
|
||||
// this has been already done in the loop above (the MetricID->TSID cache is global).
|
||||
tsid := &tsids[i]
|
||||
if !is.getTSIDByMetricID(tsid, metricID) {
|
||||
// Cannot find TSID for the given metricID.
|
||||
// This may be the case on incomplete indexDB
|
||||
// due to snapshot or due to unflushed entries.
|
||||
// Just increment errors counter and skip it for now.
|
||||
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
err = fmt.Errorf("cannot find tsid for metricID=%d: %w", metricID, err)
|
||||
return
|
||||
is.db.putToMetricIDCache(metricID, tsid)
|
||||
i++
|
||||
}
|
||||
is.db.putToMetricIDCache(metricID, tsid)
|
||||
i++
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for TSIDs by metricIDs in the previous indexdb: %w", err)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for TSIDs by metricIDs in the previous indexdb: %w", err)
|
||||
qt.Printf("found %d tsids for %d metricIDs in the previous indexdb", i-tsidsFound, len(extMetricIDs))
|
||||
}
|
||||
qt.Printf("found %d tsids for %d metricIDs in the previous indexdb", i-tsidsFound, len(extMetricIDs))
|
||||
|
||||
tsids = tsids[:i]
|
||||
qt.Printf("load %d tsids for %d metricIDs from both current and previous indexdb", len(tsids), len(metricIDs))
|
||||
|
@ -2039,7 +2035,7 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
|
||||
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) bool {
|
||||
// There is no need in checking for deleted metricIDs here, since they
|
||||
// must be checked by the caller.
|
||||
ts := &is.ts
|
||||
|
@ -2048,19 +2044,19 @@ func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
|
|||
kb.B = encoding.MarshalUint64(kb.B, metricID)
|
||||
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
||||
if err == io.EOF {
|
||||
return err
|
||||
return false
|
||||
}
|
||||
return fmt.Errorf("error when searching TSID by metricID; searchPrefix %q: %w", kb.B, err)
|
||||
logger.Panicf("FATAL: error when searching TSID by metricID=%d; searchPrefix %q: %s", metricID, kb.B, err)
|
||||
}
|
||||
v := ts.Item[len(kb.B):]
|
||||
tail, err := dst.Unmarshal(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot unmarshal TSID=%X: %w", v, err)
|
||||
logger.Panicf("FATAL: cannot unmarshal the found TSID=%X for metricID=%d: %s", v, metricID, err)
|
||||
}
|
||||
if len(tail) > 0 {
|
||||
return fmt.Errorf("unexpected non-zero tail left after unmarshaling TSID: %X", tail)
|
||||
logger.Panicf("FATAL: unexpected non-zero tail left after unmarshaling TSID for metricID=%d: %X", metricID, tail)
|
||||
}
|
||||
return nil
|
||||
return true
|
||||
}
|
||||
|
||||
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
|
||||
|
|
|
@ -1223,7 +1223,7 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, accountID, project
|
|||
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs))
|
||||
if len(metricIDs) < 500 {
|
||||
// It is cheaper to skip pre-fetching and obtain metricNames inline.
|
||||
qt.Printf("skip pre-fetching metric names for low number of metrid ids=%d", len(metricIDs))
|
||||
qt.Printf("skip pre-fetching metric names for low number of metric ids=%d", len(metricIDs))
|
||||
return nil
|
||||
}
|
||||
atomic.AddUint64(&s.slowMetricNameLoads, uint64(len(metricIDs)))
|
||||
|
|
Loading…
Reference in a new issue