mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/storage: handle fatal errors inside indexSearch.getTSIDByMetricID() instead of returning them to the caller
This simplifies the code a bit on the caller side.
This commit is contained in:
parent
9f3f085d7f
commit
a09c680170
2 changed files with 33 additions and 37 deletions
|
@ -1695,6 +1695,8 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
|
||||||
if len(metricIDs) == 0 {
|
if len(metricIDs) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Search for TSIDs in the current indexdb
|
||||||
tsids := make([]TSID, len(metricIDs))
|
tsids := make([]TSID, len(metricIDs))
|
||||||
var extMetricIDs []uint64
|
var extMetricIDs []uint64
|
||||||
i := 0
|
i := 0
|
||||||
|
@ -1719,13 +1721,10 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
|
if !is.getTSIDByMetricID(tsid, metricID) {
|
||||||
if err == io.EOF {
|
// Postpone searching for the missing metricID in the extDB.
|
||||||
// Postpone searching for the metricID in the extDB.
|
extMetricIDs = append(extMetricIDs, metricID)
|
||||||
extMetricIDs = append(extMetricIDs, metricID)
|
continue
|
||||||
continue
|
|
||||||
}
|
|
||||||
return fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
|
|
||||||
}
|
}
|
||||||
is.db.putToMetricIDCache(metricID, tsid)
|
is.db.putToMetricIDCache(metricID, tsid)
|
||||||
i++
|
i++
|
||||||
|
@ -1738,40 +1737,37 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
|
||||||
tsidsFound := i
|
tsidsFound := i
|
||||||
qt.Printf("found %d tsids for %d metricIDs in the current indexdb", tsidsFound, len(metricIDs))
|
qt.Printf("found %d tsids for %d metricIDs in the current indexdb", tsidsFound, len(metricIDs))
|
||||||
|
|
||||||
// Search for extMetricIDs in the extDB.
|
if len(extMetricIDs) > 0 {
|
||||||
db.doExtDB(func(extDB *indexDB) {
|
// Search for extMetricIDs in the previous indexdb (aka extDB)
|
||||||
is := extDB.getIndexSearch(deadline)
|
db.doExtDB(func(extDB *indexDB) {
|
||||||
defer extDB.putIndexSearch(is)
|
is := extDB.getIndexSearch(deadline)
|
||||||
for loopsPaceLimiter, metricID := range extMetricIDs {
|
defer extDB.putIndexSearch(is)
|
||||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
for loopsPaceLimiter, metricID := range extMetricIDs {
|
||||||
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
|
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||||
return
|
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
// There is no need in searching for TSIDs in MetricID->TSID cache, since
|
||||||
// There is no need in searching for TSIDs in MetricID->TSID cache, since
|
// this has been already done in the loop above (the MetricID->TSID cache is global).
|
||||||
// this has been already done in the loop above (the MetricID->TSID cache is global).
|
tsid := &tsids[i]
|
||||||
tsid := &tsids[i]
|
if !is.getTSIDByMetricID(tsid, metricID) {
|
||||||
if err = is.getTSIDByMetricID(tsid, metricID); err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
// Cannot find TSID for the given metricID.
|
// Cannot find TSID for the given metricID.
|
||||||
// This may be the case on incomplete indexDB
|
// This may be the case on incomplete indexDB
|
||||||
// due to snapshot or due to unflushed entries.
|
// due to snapshot or due to unflushed entries.
|
||||||
// Just increment errors counter and skip it for now.
|
// Just increment errors counter and skip it for now.
|
||||||
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
|
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
|
||||||
err = nil
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = fmt.Errorf("cannot find tsid for metricID=%d: %w", metricID, err)
|
is.db.putToMetricIDCache(metricID, tsid)
|
||||||
return
|
i++
|
||||||
}
|
}
|
||||||
is.db.putToMetricIDCache(metricID, tsid)
|
})
|
||||||
i++
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error when searching for TSIDs by metricIDs in the previous indexdb: %w", err)
|
||||||
}
|
}
|
||||||
})
|
qt.Printf("found %d tsids for %d metricIDs in the previous indexdb", i-tsidsFound, len(extMetricIDs))
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error when searching for TSIDs by metricIDs in the previous indexdb: %w", err)
|
|
||||||
}
|
}
|
||||||
qt.Printf("found %d tsids for %d metricIDs in the previous indexdb", i-tsidsFound, len(extMetricIDs))
|
|
||||||
|
|
||||||
tsids = tsids[:i]
|
tsids = tsids[:i]
|
||||||
qt.Printf("load %d tsids for %d metricIDs from both current and previous indexdb", len(tsids), len(metricIDs))
|
qt.Printf("load %d tsids for %d metricIDs from both current and previous indexdb", len(tsids), len(metricIDs))
|
||||||
|
@ -1884,7 +1880,7 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
|
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) bool {
|
||||||
// There is no need in checking for deleted metricIDs here, since they
|
// There is no need in checking for deleted metricIDs here, since they
|
||||||
// must be checked by the caller.
|
// must be checked by the caller.
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
|
@ -1893,19 +1889,19 @@ func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
|
||||||
kb.B = encoding.MarshalUint64(kb.B, metricID)
|
kb.B = encoding.MarshalUint64(kb.B, metricID)
|
||||||
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return err
|
return false
|
||||||
}
|
}
|
||||||
return fmt.Errorf("error when searching TSID by metricID; searchPrefix %q: %w", kb.B, err)
|
logger.Panicf("FATAL: error when searching TSID by metricID=%d; searchPrefix %q: %s", metricID, kb.B, err)
|
||||||
}
|
}
|
||||||
v := ts.Item[len(kb.B):]
|
v := ts.Item[len(kb.B):]
|
||||||
tail, err := dst.Unmarshal(v)
|
tail, err := dst.Unmarshal(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal TSID=%X: %w", v, err)
|
logger.Panicf("FATAL: cannot unmarshal the found TSID=%X for metricID=%d: %s", v, metricID, err)
|
||||||
}
|
}
|
||||||
if len(tail) > 0 {
|
if len(tail) > 0 {
|
||||||
return fmt.Errorf("unexpected non-zero tail left after unmarshaling TSID: %X", tail)
|
logger.Panicf("FATAL: unexpected non-zero tail left after unmarshaling TSID for metricID=%d: %X", metricID, tail)
|
||||||
}
|
}
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
|
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
|
||||||
|
|
|
@ -1156,7 +1156,7 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin
|
||||||
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs))
|
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs))
|
||||||
if len(metricIDs) < 500 {
|
if len(metricIDs) < 500 {
|
||||||
// It is cheaper to skip pre-fetching and obtain metricNames inline.
|
// It is cheaper to skip pre-fetching and obtain metricNames inline.
|
||||||
qt.Printf("skip pre-fetching metric names for low number of metrid ids=%d", len(metricIDs))
|
qt.Printf("skip pre-fetching metric names for low number of metric ids=%d", len(metricIDs))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
atomic.AddUint64(&s.slowMetricNameLoads, uint64(len(metricIDs)))
|
atomic.AddUint64(&s.slowMetricNameLoads, uint64(len(metricIDs)))
|
||||||
|
|
Loading…
Reference in a new issue