From 1985110de2fcc4efdaf522502d1908a5d8867f1e Mon Sep 17 00:00:00 2001
From: Nikolay
Date: Fri, 15 Nov 2024 16:18:32 +0100
Subject: [PATCH] lib/storage: properly check for minMissingTimestamps

After the changes in commit 787b9cd, the minimal timestamp for the extDB
check was evaluated without the context of the index search prefix. This
worked fine for the single-node version, but the cluster version uses a
different prefix for metricID search requests. As a result, queries could
return incomplete results if the minimal missing timestamp was cached for
a tenant with a different ingestion pattern.

Minimal reproducible case:
- metrics were ingested for tenants 0 and 1
- at some point in time, metrics ingestion for tenant 1 stopped
- index records have the following timestamps layout:
    tenant 0: 1,2,3,4,5,6
    tenant 1: 1,2,3,4
- after indexDB rotation, containsTimeRange lookups may produce incorrect
  results: a time range request of 5:6 for tenant 1 caches 5 as the minimal
  missing timestamp; a request for the same or a smaller time range for
  tenant 0 now returns empty results.

Second case:
- requests for a tenant without metrics always update the atomic value
  with a minimal missing timestamp that is incorrect for other tenants.

This commit replaces the single atomic value with a map keyed by the index
search prefix. This adds a slight performance overhead, but works
consistently for the cluster version. minMissingTimestamp is now cached
per search prefix key, which includes the tenantID. Since the map is
populated only at runtime, it doesn't hold entries for tenants that are
never queried.
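For illustration only, here is a minimal sketch of the per-prefix caching
pattern described above. The type and the literal prefix strings
(missingTimestampCache, "tenant0|...", "tenant1|...") are hypothetical and
do not correspond to the actual indexDB fields; the sketch only shows why
keying the cache by search prefix keeps tenants isolated:

    package main

    import (
        "fmt"
        "sync"
    )

    // missingTimestampCache caches the minimal missing timestamp per index
    // search prefix instead of keeping a single shared value.
    type missingTimestampCache struct {
        mu sync.RWMutex
        m  map[string]int64
    }

    func newMissingTimestampCache() *missingTimestampCache {
        return &missingTimestampCache{m: make(map[string]int64)}
    }

    // covered reports whether a lookup for the given prefix and minTimestamp
    // can be skipped, because an equal or smaller timestamp is already known
    // to be missing for that prefix.
    func (c *missingTimestampCache) covered(prefix []byte, minTimestamp int64) bool {
        c.mu.RLock()
        ts, ok := c.m[string(prefix)]
        c.mu.RUnlock()
        return ok && minTimestamp >= ts
    }

    // store remembers that minTimestamp is missing for the given prefix.
    func (c *missingTimestampCache) store(prefix []byte, minTimestamp int64) {
        c.mu.Lock()
        c.m[string(prefix)] = minTimestamp
        c.mu.Unlock()
    }

    func main() {
        c := newMissingTimestampCache()
        // Tenant 1 has no data after timestamp 4, so 5 is cached as missing
        // for its prefix only.
        c.store([]byte("tenant1|dateToMetricID"), 5)
        fmt.Println(c.covered([]byte("tenant1|dateToMetricID"), 5)) // true: skip the slow lookup
        fmt.Println(c.covered([]byte("tenant0|dateToMetricID"), 5)) // false: tenant 0 must still be checked
    }

With a single shared value, the second lookup would also be skipped and
tenant 0 would get empty results, which is exactly the bug fixed here.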
Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7417

---
 docs/changelog/CHANGELOG.md  |   7 +-
 lib/storage/index_db.go      |  45 ++++++++-----
 lib/storage/index_db_test.go | 127 +++++++++++++++++++++++++++++++++++
 3 files changed, 160 insertions(+), 19 deletions(-)

diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index e507bc482..eb8414aa3 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -24,10 +24,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): drop rows that do not belong to the current series during import. The dropped rows should belong to another series whose tags are a superset of the current series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7301) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7330). Thanks to @dpedu for reporting and cooperating with the test.
 * BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): keep the order of resulting time series when `limit_offset` is applied. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7068).
 * BUGFIX: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): properly handle xFilesFactor=0 for `transformRemoveEmptySeries` function. See [this PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7337) for details.
-* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly check availability of all the backends before giving up when proxying requests. Previously, vmauth could return an error even if there were healthy backends available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3061) for details.
-* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly inherit [`drop_src_path_prefix_parts`](https://docs.victoriametrics.com/vmauth/#dropping-request-path-prefix), [`load_balancing_policy`](https://docs.victoriametrics.com/vmauth/#high-availability), [`retry_status_codes`](https://docs.victoriametrics.com/vmauth/#load-balancing) and [`discover_backend_ips`](https://docs.victoriametrics.com/vmauth/#discovering-backend-ips) options by `url_map` entries if `url_prefix` option isn't set at the [user config level](https://docs.victoriametrics.com/vmauth/#auth-config). These options were inherited only when the `url_prefix` option was set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7519).
-* BUGFIX: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards): add `file` label filter to vmalert dashboard panels. Previously, metrics from groups with the same name but different rule files could be mixed in the results.
-BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Properly handle [multitenant](https://docs.victoriametrics.com/cluster-victoriametrics/#multitenancy-via-labels) query request errors and correctly perform search for available tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549) for details.
+* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth): properly check availability of all the backends before giving up when proxying requests. Previously, vmauth could return an error even if there were healthy backends available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3061) for details.
+* BUGFIX: `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Properly return query results for search requests after index rotation. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7417) for details.
+* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Properly handle [multitenant](https://docs.victoriametrics.com/cluster-victoriametrics/#multitenancy-via-labels) query request errors and correctly perform search for available tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549) for details.
 
 ## [v1.106.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.106.0)
 
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index c935e33b8..50bc3b3dd 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -91,13 +91,17 @@ type indexDB struct {
 	// The db must be automatically recovered after that.
 	missingMetricNamesForMetricID atomic.Uint64
 
-	// minMissingTimestamp is the minimum timestamp, which is missing in the given indexDB.
+	// minMissingTimestampByKey holds the minimum timestamps per index search key,
+	// which are missing in the given indexDB.
+	// The key must be formed with the marshalCommonPrefix function.
 	//
 	// This field is used at containsTimeRange() function only for the previous indexDB,
 	// since this indexDB is readonly.
 	// This field cannot be used for the current indexDB, since it may receive data
 	// with bigger timestamps at any time.
-	minMissingTimestamp atomic.Int64
+	minMissingTimestampByKey map[string]int64
+	// protects minMissingTimestampByKey
+	minMissingTimestampByKeyLock sync.RWMutex
 
 	// generation identifies the index generation ID
 	// and is used for syncing items from different indexDBs
@@ -162,6 +166,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *atomic.Bool) *indexDB
 		tb:   tb,
 		name: name,
 
+		minMissingTimestampByKey:   make(map[string]int64),
 		tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
 		s:                          s,
 		loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
@@ -1945,25 +1950,36 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) bool {
 		// This means that it may contain data for the given tr with probability close to 100%.
 		return true
 	}
-
 	// The db corresponds to the previous indexDB, which is readonly.
 	// So it is safe caching the minimum timestamp, which isn't covered by the db.
-	minMissingTimestamp := db.minMissingTimestamp.Load()
-	if minMissingTimestamp != 0 && tr.MinTimestamp >= minMissingTimestamp {
+
+	// Use the common prefix as the key for minMissingTimestamp.
+	// This is needed to properly track timestamps in the cluster version,
+	// which uses tenant labels for the index search.
+	kb := &is.kb
+	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
+	key := kb.B
+
+	db.minMissingTimestampByKeyLock.RLock()
+	minMissingTimestamp, ok := db.minMissingTimestampByKey[string(key)]
+	db.minMissingTimestampByKeyLock.RUnlock()
+
+	if ok && tr.MinTimestamp >= minMissingTimestamp {
 		return false
 	}
-
-	if is.containsTimeRangeSlow(tr) {
+	if is.containsTimeRangeSlowForPrefixBuf(kb, tr) {
 		return true
 	}
 
-	db.minMissingTimestamp.CompareAndSwap(minMissingTimestamp, tr.MinTimestamp)
+	db.minMissingTimestampByKeyLock.Lock()
+	db.minMissingTimestampByKey[string(key)] = tr.MinTimestamp
+	db.minMissingTimestampByKeyLock.Unlock()
+
 	return false
 }
 
-func (is *indexSearch) containsTimeRangeSlow(tr TimeRange) bool {
+func (is *indexSearch) containsTimeRangeSlowForPrefixBuf(prefixBuf *bytesutil.ByteBuffer, tr TimeRange) bool {
 	ts := &is.ts
-	kb := &is.kb
 
 	// Verify whether the tr.MinTimestamp is included into `ts` or is smaller than the minimum date stored in `ts`.
 	// Do not check whether tr.MaxTimestamp is included into `ts` or is bigger than the max date stored in `ts` for performance reasons.
@@ -1972,13 +1988,12 @@ func (is *indexSearch) containsTimeRangeSlow(tr TimeRange) bool {
 	// The main practical case allows skipping searching in prev indexdb (`ts`) when `tr`
 	// is located above the max date stored there.
 	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
-	prefix := kb.B
-	kb.B = encoding.MarshalUint64(kb.B, minDate)
-	ts.Seek(kb.B)
+	prefix := prefixBuf.B
+	prefixBuf.B = encoding.MarshalUint64(prefixBuf.B, minDate)
+	ts.Seek(prefixBuf.B)
 	if !ts.NextItem() {
 		if err := ts.Error(); err != nil {
-			logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, kb.B, err)
+			logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
 		}
 		return false
 	}
diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go
index d8f7dcfd3..9aae673ce 100644
--- a/lib/storage/index_db_test.go
+++ b/lib/storage/index_db_test.go
@@ -2101,3 +2101,130 @@ func stopTestStorage(s *Storage) {
 	s.tsidCache.Stop()
 	fs.MustRemoveDirAtomic(s.cachePath)
 }
+
+func TestSearchContainsTimeRange(t *testing.T) {
+	path := t.Name()
+	os.RemoveAll(path)
+	s := MustOpenStorage(path, retentionMax, 0, 0)
+	db := s.idb()
+
+	is := db.getIndexSearch(noDeadline)
+
+	// Create a bunch of per-day time series
+	const (
+		days                = 6
+		tenant2IngestionDay = 8
+		metricsPerDay       = 1000
+	)
+	rotationDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC)
+	rotationMillis := uint64(rotationDay.UnixMilli())
+	rotationDate := rotationMillis / msecPerDay
+	var metricNameBuf []byte
+	perDayMetricIDs := make(map[uint64]*uint64set.Set)
+	labelNames := []string{
+		"__name__", "constant", "day", "UniqueId", "some_unique_id",
+	}
+
+	sort.Strings(labelNames)
+
+	newMN := func(name string, day, metric int) MetricName {
+		var mn MetricName
+		mn.MetricGroup = []byte(name)
+		mn.AddTag(
+			"constant",
+			"const",
+		)
+		mn.AddTag(
+			"day",
+			fmt.Sprintf("%v", day),
+		)
+		mn.AddTag(
+			"UniqueId",
+			fmt.Sprintf("%v", metric),
+		)
+		mn.AddTag(
+			"some_unique_id",
+			fmt.Sprintf("%v", day),
+		)
+		mn.sortTags()
+		return mn
+	}
+
+	// ingest metrics for tenant 0:0
+	for day := 0; day < days; day++ {
+		date := rotationDate - uint64(day)
+
+		var metricIDs uint64set.Set
+		for metric := range metricsPerDay {
+			mn := newMN("testMetric", day, metric)
+			metricNameBuf = mn.Marshal(metricNameBuf[:0])
+			var genTSID generationTSID
+			if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
+				generateTSID(&genTSID.TSID, &mn)
+				createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
+			}
+			metricIDs.Add(genTSID.TSID.MetricID)
+		}
+
+		perDayMetricIDs[date] = &metricIDs
+	}
+	db.putIndexSearch(is)
+
+	// Flush index to disk, so it becomes visible for search
+	s.DebugFlush()
+
+	is2 := db.getIndexSearch(noDeadline)
+
+	// Check that all the metrics are found for all the days.
+	for date := rotationDate - days + 1; date <= rotationDate; date++ {
+		metricIDs, err := is2.getMetricIDsForDate(date, metricsPerDay)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if !perDayMetricIDs[date].Equal(metricIDs) {
+			t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), perDayMetricIDs[date].AppendTo(nil))
+		}
+	}
+
+	db.putIndexSearch(is2)
+
+	// Rotate the indexDB, so the data ingested above moves to the previous indexDB (extDB).
+	s.mustRotateIndexDB(rotationDay)
+	db = s.idb()
+
+	// Perform searches for tenant 0:0.
+	// Results of previous search requests shouldn't affect them.
+
+	isExt := db.extDB.getIndexSearch(noDeadline)
+	// Search for a range covered by the prev indexDB (dates with ingested metrics).
+	tr := TimeRange{
+		MinTimestamp: int64(rotationMillis - msecPerDay*(days)),
+		MaxTimestamp: int64(rotationMillis),
+	}
+	if !isExt.containsTimeRange(tr) {
+		t.Fatalf("expected the prev indexDB to contain the given time range")
+	}
+
+	// Search for a range that doesn't exist in the prev indexDB.
+	tr = TimeRange{
+		MinTimestamp: int64(rotationMillis + msecPerDay*(days+4)),
+		MaxTimestamp: int64(rotationMillis + msecPerDay*(days+2)),
+	}
+	if isExt.containsTimeRange(tr) {
+		t.Fatalf("expected the prev indexDB to not contain the given time range")
+	}
+	key := isExt.marshalCommonPrefix(nil, nsPrefixDateToMetricID)
+
+	db.extDB.minMissingTimestampByKeyLock.Lock()
+	minMissingTimestamp := db.extDB.minMissingTimestampByKey[string(key)]
+	db.extDB.minMissingTimestampByKeyLock.Unlock()
+
+	if minMissingTimestamp != tr.MinTimestamp {
+		t.Fatalf("unexpected minMissingTimestamp for tenant 0:0; got %d, want %d", minMissingTimestamp, tr.MinTimestamp)
+	}
+
+	db.extDB.putIndexSearch(isExt)
+	s.MustClose()
+	fs.MustRemoveAll(path)
+}