mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: search for TSIDs for the given metricIDs in the previous indexdb if they aren't found in the current indexdb
The issue triggers after the indexdb rotation for time series, which stop receiving new samples.
This results in missing data for such time series in query responses.
This commit should address the https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3502
The issue has been introduced in 2dd93449d8
This commit is contained in:
parent
dc0b08efb0
commit
6c98b56935
3 changed files with 75 additions and 32 deletions
|
@ -18,11 +18,10 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: support overriding of `-search.latencyOffset` value via URL param `latency_offset` when performing requests to [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3481).
|
||||
* FEATURE: allow changing field names in JSON logs if VictoriaMetrics components are started with `-loggerFormat=json` command-line flags. The field names can be changed with the `-loggerJSONFields` command-line flag. For example `-loggerJSONFields=ts:timestamp,msg:message` would rename `ts` and `msg` fields on the output JSON to `timestamp` and `message` fields. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2348). Thanks to @michal-kralik for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3488).
|
||||
|
||||
* BUGFIX: properly return query results for time series, which stop receiving new samples after the rotation of `indexdb`. Previously such time series could be missing in query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3502). The issue has been introduced in [v1.83.0](https://docs.victoriametrics.com/CHANGELOG.html#v1830).
|
||||
* BUGFIX: allow specifying values bigger than 2GiB to the following command-line flag values on 32-bit architectures (`386` and `arm`): `-storage.minFreeDiskSpaceBytes` and `-remoteWrite.maxDiskUsagePerURL`. Previously values bigger than 2GiB were incorrectly truncated on these architectures.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): stop dropping metric name by a mistake on the [/metric-relabel-debug](https://docs.victoriametrics.com/vmagent.html#relabel-debug) page.
|
||||
|
||||
* BUGFIX: allow specifying values bigger than 2GiB to the following command-line flag values on 32-bit architectures (`386` and `arm`): `-storage.minFreeDiskSpaceBytes` and `-remoteWrite.maxDiskUsagePerURL`. Previously values bigger than 2GiB were incorrectly truncated on these architectures.
|
||||
|
||||
## [v1.85.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.1)
|
||||
|
||||
Released at 14-12-2022
|
||||
|
|
|
@ -1847,43 +1847,83 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
|
|||
return nil, nil
|
||||
}
|
||||
tsids := make([]TSID, len(metricIDs))
|
||||
is := db.getIndexSearch(deadline)
|
||||
defer db.putIndexSearch(is)
|
||||
var extMetricIDs []uint64
|
||||
i := 0
|
||||
for loopsPaceLimiter, metricID := range metricIDs {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return nil, err
|
||||
err := func() error {
|
||||
is := db.getIndexSearch(deadline)
|
||||
defer db.putIndexSearch(is)
|
||||
for loopsPaceLimiter, metricID := range metricIDs {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
||||
// than scanning the mergeset if it contains a lot of metricIDs.
|
||||
tsid := &tsids[i]
|
||||
err := is.db.getFromMetricIDCache(tsid, metricID)
|
||||
if err == nil {
|
||||
// Fast path - the tsid for metricID is found in cache.
|
||||
i++
|
||||
continue
|
||||
}
|
||||
if err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
|
||||
if err == io.EOF {
|
||||
// Cannot find TSID for the given metricID.
|
||||
// This may be the case on incomplete indexDB
|
||||
// due to snapshot or due to unflushed entries.
|
||||
// Just increment errors counter and skip it.
|
||||
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
|
||||
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
||||
// than scanning the mergeset if it contains a lot of metricIDs.
|
||||
tsid := &tsids[i]
|
||||
err := is.db.getFromMetricIDCache(tsid, metricID)
|
||||
if err == nil {
|
||||
// Fast path - the tsid for metricID is found in cache.
|
||||
i++
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
|
||||
if err == io.EOF {
|
||||
// Postpone searching for the metricID in the extDB.
|
||||
extMetricIDs = append(extMetricIDs, metricID)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
|
||||
}
|
||||
is.db.putToMetricIDCache(metricID, tsid)
|
||||
i++
|
||||
}
|
||||
is.db.putToMetricIDCache(metricID, tsid)
|
||||
i++
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for TISDs by metricIDs in the current indexdb: %w", err)
|
||||
}
|
||||
tsidsFound := i
|
||||
qt.Printf("found %d tsids for %d metricIDs in the current indexdb", tsidsFound, len(metricIDs))
|
||||
|
||||
// Search for extMetricIDs in the extDB.
|
||||
db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(deadline)
|
||||
defer extDB.putIndexSearch(is)
|
||||
for loopsPaceLimiter, metricID := range extMetricIDs {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
// There is no need in searching for TSIDs in MetricID->TSID cache, since
|
||||
// this has been already done in the loop above (the MetricID->TSID cache is global).
|
||||
tsid := &tsids[i]
|
||||
if err = is.getTSIDByMetricID(tsid, metricID); err != nil {
|
||||
if err == io.EOF {
|
||||
// Cannot find TSID for the given metricID.
|
||||
// This may be the case on incomplete indexDB
|
||||
// due to snapshot or due to unflushed entries.
|
||||
// Just increment errors counter and skip it.
|
||||
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
|
||||
continue
|
||||
}
|
||||
err = fmt.Errorf("cannot find tsid for metricID=%d: %w", metricID, err)
|
||||
}
|
||||
is.db.putToMetricIDCache(metricID, tsid)
|
||||
i++
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for TSIDs by metricIDs in the previous indexdb: %w", err)
|
||||
}
|
||||
qt.Printf("found %d tsids for %d metricIDs in the previous indexdb", i-tsidsFound, len(extMetricIDs))
|
||||
|
||||
tsids = tsids[:i]
|
||||
qt.Printf("load %d tsids from %d metricIDs", len(tsids), len(metricIDs))
|
||||
qt.Printf("load %d tsids for %d metricIDs from both current and previous indexdb", len(tsids), len(metricIDs))
|
||||
|
||||
// Sort the found tsids, since they must be passed to TSID search
|
||||
// in the sorted order.
|
||||
|
|
|
@ -91,6 +91,10 @@ func TestMergeSortedMetricIDs(t *testing.T) {
|
|||
f([]uint64{2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
|
||||
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
|
||||
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{2, 3, 4, 5, 6, 7})
|
||||
f([]uint64{}, []uint64{1, 2, 3})
|
||||
f([]uint64{0}, []uint64{1, 2, 3})
|
||||
f([]uint64{1}, []uint64{1, 2, 3})
|
||||
f([]uint64{1, 2}, []uint64{3, 4})
|
||||
}
|
||||
|
||||
func TestReverseBytes(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue