lib/storage: properly check for minMissingTimestamps
After the changes in commit 787b9cd, the minimal timestamp check for extDB was performed without taking the index search prefix into account.
It worked fine for the single-node version, but the cluster version uses a different prefix for metricID search requests.
This could lead to incomplete results if the minimal missing timestamp had been cached for a tenant with a different ingestion pattern.
A minimal reproducible case:
- metrics were ingested for tenants 0 and 1
- at some point in time metrics ingestion for tenant 1 stopped
- index records have the following timestamps layout:
tenant 0: 1,2,3,4,5,6
tenant 1: 1,2,3,4
- after indexDB rotation, containsTimeRange lookups may produce
incorrect results:
a time range request for tenant 1 over 5:6 caches 5 as the minimal missing timestamp;
a request for the same or a smaller time range for tenant 0 now returns
empty results.
Second case:
- requests for a tenant without metrics always update the shared atomic value, setting a minimal missing timestamp that is incorrect for other tenants.
This commit replaces the single atomic value with a map keyed by the index search prefix. It adds a slight performance overhead,
but works consistently for the cluster version. minMissingTimestamp is now cached per search prefix key, which includes the tenantID.
Since the map is only populated at runtime, it doesn't hold entries for tenants that are never queried.
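A rough sketch of the caching scheme described above (simplified and illustrative only; the type, constructor and method names below are not taken from this commit, and the real code keeps the map directly on indexDB):

package storage

import "sync"

// minMissingTimestampCache caches, per index search prefix, the minimal
// timestamp known to be missing from the (readonly) previous indexDB.
// The prefix is expected to be built via marshalCommonPrefix, so for the
// cluster version it already includes the tenant (AccountID/ProjectID).
type minMissingTimestampCache struct {
	mu sync.RWMutex
	m  map[string]int64
}

func newMinMissingTimestampCache() *minMissingTimestampCache {
	return &minMissingTimestampCache{m: make(map[string]int64)}
}

// canSkip reports whether a search starting at minTimestamp can skip the
// previous indexDB, because an equal or smaller timestamp is already known
// to be missing for this prefix.
func (c *minMissingTimestampCache) canSkip(prefix []byte, minTimestamp int64) bool {
	c.mu.RLock()
	ts, ok := c.m[string(prefix)]
	c.mu.RUnlock()
	return ok && minTimestamp >= ts
}

// update remembers that minTimestamp is missing for the given prefix.
func (c *minMissingTimestampCache) update(prefix []byte, minTimestamp int64) {
	c.mu.Lock()
	c.m[string(prefix)] = minTimestamp
	c.mu.Unlock()
}

Because entries are keyed by the full search prefix, a lookup for tenant 1 can no longer poison the cached value used for tenant 0.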
Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7417
Parent: 3120dc2054
Commit: 59d2b4c7fc
3 changed files with 226 additions and 19 deletions
@@ -23,10 +23,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).

* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): drop rows that do not belong to the current series during import. The dropped rows should belong to another series whose tags are a superset of the current series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7301) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7330). Thanks to @dpedu for reporting and cooperating with the test.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): keep the order of resulting time series when `limit_offset` is applied. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7068).
* BUGFIX: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): properly handle xFilesFactor=0 for `transformRemoveEmptySeries` function. See [this PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7337) for details.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly check availability of all the backends before giving up when proxying requests. Previously, vmauth could return an error even if there were healthy backends available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3061) for details.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly inherit [`drop_src_path_prefix_parts`](https://docs.victoriametrics.com/vmauth/#dropping-request-path-prefix), [`load_balancing_policy`](https://docs.victoriametrics.com/vmauth/#high-availability), [`retry_status_codes`](https://docs.victoriametrics.com/vmauth/#load-balancing) and [`discover_backend_ips`](https://docs.victoriametrics.com/vmauth/#discovering-backend-ips) options by `url_map` entries if `url_prefix` option isn't set at the [user config level](https://docs.victoriametrics.com/vmauth/#auth-config). These options were inherited only when the `url_prefix` option was set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7519).
* BUGFIX: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards): add `file` label filter to vmalert dashboard panels. Previously, metrics from groups with the same name but different rule files could be mixed in the results.
BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Properly handle [multitenant](https://docs.victoriametrics.com/cluster-victoriametrics/#multitenancy-via-labels) query request errors and correctly perform search for available tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549) for details.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth): properly check availability of all the backends before giving up when proxying requests. Previously, vmauth could return an error even if there were healthy backends available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3061) for details.
* BUGFIX: `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Properly return query results for search requests after index rotation. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7417) for details.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Properly handle [multitenant](https://docs.victoriametrics.com/cluster-victoriametrics/#multitenancy-via-labels) query request errors and correctly perform search for available tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549) for details.

## [v1.106.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.106.0)
@@ -91,13 +91,17 @@ type indexDB struct {
 	missingMetricNamesForMetricID atomic.Uint64

-	// minMissingTimestamp is the minimum timestamp, which is missing in the given indexDB.
+	// minMissingTimestampByKey holds the minimum timestamps by index search key,
+	// which is missing in the given indexDB.
+	// Key must be formed with marshalCommonPrefix function.
 	//
 	// This field is used at containsTimeRange() function only for the previous indexDB,
 	// since this indexDB is readonly.
 	// This field cannot be used for the current indexDB, since it may receive data
 	// with bigger timestamps at any time.
-	minMissingTimestamp atomic.Int64
+	minMissingTimestampByKey map[string]int64
+	// protects minMissingTimestampByKey
+	minMissingTimestampByKeyLock sync.RWMutex

 	// generation identifies the index generation ID
 	// and is used for syncing items from different indexDBs
@@ -162,6 +166,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *atomic.Bool) *indexDB
 		tb:   tb,
 		name: name,

+		minMissingTimestampByKey: make(map[string]int64),
 		tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
 		s:                          s,
 		loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
@@ -2100,25 +2105,36 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) bool {
 		// This means that it may contain data for the given tr with probability close to 100%.
 		return true
 	}

 	// The db corresponds to the previous indexDB, which is readonly.
 	// So it is safe caching the minimum timestamp, which isn't covered by the db.
-	minMissingTimestamp := db.minMissingTimestamp.Load()
-	if minMissingTimestamp != 0 && tr.MinTimestamp >= minMissingTimestamp {
+	// use common prefix as a key for minMissingTimestamp
+	// it's needed to properly track timestamps for cluster version
+	// which uses tenant labels for the index search
+	kb := &is.kb
+	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
+	key := kb.B
+
+	db.minMissingTimestampByKeyLock.RLock()
+	minMissingTimestamp, ok := db.minMissingTimestampByKey[string(key)]
+	db.minMissingTimestampByKeyLock.RUnlock()
+
+	if ok && tr.MinTimestamp >= minMissingTimestamp {
 		return false
 	}

-	if is.containsTimeRangeSlow(tr) {
+	if is.containsTimeRangeSlowForPrefixBuf(kb, tr) {
 		return true
 	}

-	db.minMissingTimestamp.CompareAndSwap(minMissingTimestamp, tr.MinTimestamp)
+	db.minMissingTimestampByKeyLock.Lock()
+	db.minMissingTimestampByKey[string(key)] = tr.MinTimestamp
+	db.minMissingTimestampByKeyLock.Unlock()
+
 	return false
 }

-func (is *indexSearch) containsTimeRangeSlow(tr TimeRange) bool {
+func (is *indexSearch) containsTimeRangeSlowForPrefixBuf(prefixBuf *bytesutil.ByteBuffer, tr TimeRange) bool {
 	ts := &is.ts
-	kb := &is.kb

 	// Verify whether the tr.MinTimestamp is included into `ts` or is smaller than the minimum date stored in `ts`.
 	// Do not check whether tr.MaxTimestamp is included into `ts` or is bigger than the max date stored in `ts` for performance reasons.
@@ -2127,13 +2143,12 @@ func (is *indexSearch) containsTimeRangeSlow(tr TimeRange) bool {
 	// The main practical case allows skipping searching in prev indexdb (`ts`) when `tr`
 	// is located above the max date stored there.
 	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
-	prefix := kb.B
-	kb.B = encoding.MarshalUint64(kb.B, minDate)
-	ts.Seek(kb.B)
+	prefix := prefixBuf.B
+	prefixBuf.B = encoding.MarshalUint64(prefixBuf.B, minDate)
+	ts.Seek(prefixBuf.B)
 	if !ts.NextItem() {
 		if err := ts.Error(); err != nil {
-			logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, kb.B, err)
+			logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
 		}
 		return false
 	}
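One detail worth noting in the read path above: the map is indexed with `string(key)` directly instead of converting the byte slice to a string up front. The Go compiler recognizes the m[string(b)] pattern in map lookups and avoids allocating a temporary string, so the hot lookup stays allocation-free; only the write path, which must retain the key inside the map, pays for the conversion. A minimal illustration of the idiom (hypothetical helper, not part of this patch):

// lookupOrStore shows the m[string(b)] idiom: the conversion in the lookup
// is optimized away by the compiler, while the store allocates the string
// key once so the map can keep it.
func lookupOrStore(m map[string]int64, key []byte, ts int64) int64 {
	if v, ok := m[string(key)]; ok { // no allocation on lookup
		return v
	}
	m[string(key)] = ts // allocates the string key
	return ts
}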
@@ -2216,3 +2216,196 @@ func stopTestStorage(s *Storage) {
 	s.tsidCache.Stop()
 	fs.MustRemoveDirAtomic(s.cachePath)
 }
+
+func TestSearchContainsTimeRange(t *testing.T) {
+	path := t.Name()
+	os.RemoveAll(path)
+	s := MustOpenStorage(path, retentionMax, 0, 0)
+	db := s.idb()
+
+	is := db.getIndexSearch(0, 0, noDeadline)
+
+	// Create a bunch of per-day time series
+	const (
+		days                = 6
+		tenant2IngestionDay = 8
+		metricsPerDay       = 1000
+	)
+	rotationDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC)
+	rotationMillis := uint64(rotationDay.UnixMilli())
+	rotationDate := rotationMillis / msecPerDay
+	var metricNameBuf []byte
+	perDayMetricIDs := make(map[uint64]*uint64set.Set)
+	labelNames := []string{
+		"__name__", "constant", "day", "UniqueId", "some_unique_id",
+	}
+
+	sort.Strings(labelNames)
+
+	newMN := func(name string, day, metric int) MetricName {
+		var mn MetricName
+		mn.MetricGroup = []byte(name)
+		mn.AddTag(
+			"constant",
+			"const",
+		)
+		mn.AddTag(
+			"day",
+			fmt.Sprintf("%v", day),
+		)
+		mn.AddTag(
+			"UniqueId",
+			fmt.Sprintf("%v", metric),
+		)
+		mn.AddTag(
+			"some_unique_id",
+			fmt.Sprintf("%v", day),
+		)
+		mn.sortTags()
+		return mn
+	}
+
+	// ingest metrics for tenant 0:0
+	for day := 0; day < days; day++ {
+		date := rotationDate - uint64(day)
+
+		var metricIDs uint64set.Set
+		for metric := range metricsPerDay {
+			mn := newMN("testMetric", day, metric)
+			metricNameBuf = mn.Marshal(metricNameBuf[:0])
+			var genTSID generationTSID
+			if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
+				generateTSID(&genTSID.TSID, &mn)
+				createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
+			}
+			metricIDs.Add(genTSID.TSID.MetricID)
+		}
+
+		perDayMetricIDs[date] = &metricIDs
+	}
+	db.putIndexSearch(is)
+
+	// ingest metrics for tenant 1:1
+	isTenant2 := db.getIndexSearch(1, 1, noDeadline)
+	{
+		var metricIDs uint64set.Set
+		// ingestion day must be outside of tenant 0:0 data ingestion
+		date := rotationDate - uint64(tenant2IngestionDay)
+
+		for metric := range metricsPerDay {
+			mn := newMN("testMetric2", tenant2IngestionDay, metric)
+			mn.AccountID = 1
+			mn.ProjectID = 1
+			metricNameBuf = mn.Marshal(metricNameBuf[:0])
+			var genTSID generationTSID
+			if !isTenant2.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
+				generateTSID(&genTSID.TSID, &mn)
+				createAllIndexesForMetricName(isTenant2, &mn, &genTSID.TSID, date)
+			}
+			metricIDs.Add(genTSID.TSID.MetricID)
+		}
+		perDayMetricIDs[date] = &metricIDs
+	}
+	db.putIndexSearch(isTenant2)
+
+	// Flush index to disk, so it becomes visible for search
+	s.DebugFlush()
+
+	is2 := db.getIndexSearch(0, 0, noDeadline)
+
+	// Check that all the metrics are found for all the days.
+	for date := rotationDate - days + 1; date <= rotationDate; date++ {
+
+		metricIDs, err := is2.getMetricIDsForDate(date, metricsPerDay)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if !perDayMetricIDs[date].Equal(metricIDs) {
+			t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), perDayMetricIDs[date].AppendTo(nil))
+		}
+	}
+
+	db.putIndexSearch(is2)
+
+	// Check that all metrics for tenant 2 are found at ingestion day
+	is2Tenant2 := db.getIndexSearch(1, 1, noDeadline)
+	{
+		date := rotationDate - uint64(tenant2IngestionDay+1)
+		metricIDs, err := is2Tenant2.getMetricIDsForDate(date, metricsPerDay)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if !perDayMetricIDs[date].Equal(metricIDs) {
+			t.Fatalf("unexpected 1:1 tenant metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), perDayMetricIDs[date].AppendTo(nil))
+		}
+	}
+	db.putIndexSearch(is2Tenant2)
+
+	// rotate indexdb
+	s.mustRotateIndexDB(rotationDay)
+	db = s.idb()
+
+	isExtTenant2 := db.extDB.getIndexSearch(1, 1, noDeadline)
+
+	// search for range covers metrics for 1:1 at prev index
+	tr := TimeRange{
+		MinTimestamp: int64(rotationMillis - msecPerDay*(tenant2IngestionDay) + 1),
+		MaxTimestamp: int64(rotationMillis),
+	}
+	if !isExtTenant2.containsTimeRange(tr) {
+		t.Fatalf("expected to have given time range at prev IndexDB")
+	}
+	// search for range missing for 1:1 at prev index
+	tr = TimeRange{
+		MinTimestamp: int64(rotationMillis - msecPerDay*(tenant2IngestionDay-1)),
+		MaxTimestamp: int64(rotationMillis),
+	}
+	if isExtTenant2.containsTimeRange(tr) {
+		t.Fatalf("not expected to have given time range at prev IndexDB")
+	}
+	key := isExtTenant2.marshalCommonPrefix(nil, nsPrefixDateToMetricID)
+
+	db.extDB.minMissingTimestampByKeyLock.Lock()
+	minMissingTimetamp := db.extDB.minMissingTimestampByKey[string(key)]
+	db.extDB.minMissingTimestampByKeyLock.Unlock()
+
+	if minMissingTimetamp != tr.MinTimestamp {
+		t.Fatalf("unexpected minMissingTimestamp for 1:1 tenant got %d, want %d", minMissingTimetamp, tr.MinTimestamp)
+	}
+	db.extDB.putIndexSearch(isExtTenant2)
+
+	// perform search for 0:0 tenant
+	// results of previous search requests shouldn't affect it
+
+	isExt := db.extDB.getIndexSearch(0, 0, noDeadline)
+	// search for range that covers prev indexDB for dates before ingestion
+	tr = TimeRange{
+		MinTimestamp: int64(rotationMillis - msecPerDay*(days)),
+		MaxTimestamp: int64(rotationMillis),
+	}
+	if !isExt.containsTimeRange(tr) {
+		t.Fatalf("expected to have given time range at prev IndexDB")
+	}
+
+	// search for range not exist at prev indexDB
+	tr = TimeRange{
+		MinTimestamp: int64(rotationMillis + msecPerDay*(days+4)),
+		MaxTimestamp: int64(rotationMillis + msecPerDay*(days+2)),
+	}
+	if isExt.containsTimeRange(tr) {
+		t.Fatalf("not expected to have given time range at prev IndexDB")
+	}
+	key = isExt.marshalCommonPrefix(key[:0], nsPrefixDateToMetricID)
+
+	db.extDB.minMissingTimestampByKeyLock.Lock()
+	minMissingTimetamp = db.extDB.minMissingTimestampByKey[string(key)]
+	db.extDB.minMissingTimestampByKeyLock.Unlock()
+
+	if minMissingTimetamp != tr.MinTimestamp {
+		t.Fatalf("unexpected minMissingTimestamp for 0:0 tenant got %d, want %d", minMissingTimetamp, tr.MinTimestamp)
+	}
+
+	db.extDB.putIndexSearch(isExt)
+	s.MustClose()
+	fs.MustRemoveAll(path)
+}
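Assuming a local checkout of the VictoriaMetrics sources, the test added above can be run on its own with `go test -run TestSearchContainsTimeRange ./lib/storage/` to verify the per-tenant caching behaviour after indexDB rotation.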