From 84fb59b0baa2bfe0ebc9495d220b5658f3b0c799 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 15 Jun 2021 14:56:51 +0300 Subject: [PATCH] lib/storage: move deletedMetricIDs set from indexDB to Storage This makes consistent the list of deleted metricIDs when it is used from both the current indexDB and the previous indexDB (aka extDB). This should fix the issue, which could lead to storing new samples under deleted metricIDs after indexDB rotation. See more details at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347#issuecomment-861232136 . Thanks to @tangqipengleoo for the initial analysis and the pull request - https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1383 . This commit resolves the issue in a more generic way compared to https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1383 . The downside of the commit is the deletedMetricIDs set isn't cleaned from the metricIDs outside the retention. It needs app restart. This should be OK in most cases. --- docs/CHANGELOG.md | 2 ++ lib/storage/index_db.go | 64 ++++++++++-------------------------- lib/storage/index_db_test.go | 4 ++- lib/storage/storage.go | 40 +++++++++++++++++++--- 4 files changed, 58 insertions(+), 52 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2128402039..2b0d1848a7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,8 @@ sort: 15 * FEATURE: vmagent: add service discovery for DigitalOcean (aka [digitalocean_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config)). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1367). * FEATURE: vmagent: show the number of samples the target returned during the last scrape on `/targets` and `/api/v1/targets` pages. This should simplify debugging targets, which may return too big or too low number of samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1377). 
+* BUGFIX: prevent from adding new samples to deleted time series after the rotation of the inverted index (the rotation is performed once per `-retentionPeriod`). See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347#issuecomment-861232136) for details. + ## [v1.61.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.61.1) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index f2244438ad..5123f98c39 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -103,15 +103,6 @@ type indexDB struct { loopsPerDateTagFilterCache *workingsetcache.Cache indexSearchPool sync.Pool - - // An inmemory set of deleted metricIDs. - // - // The set holds deleted metricIDs for the current db and for the extDB. - // - // It is safe to keep the set in memory even for big number of deleted - // metricIDs, since it usually requires 1 bit per deleted metricID. - deletedMetricIDs atomic.Value - deletedMetricIDsUpdateLock sync.Mutex } // openIndexDB opens index db from the given path with the given caches. 
@@ -140,14 +131,6 @@ func openIndexDB(path string, s *Storage) (*indexDB, error) { uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour), } - - is := db.getIndexSearch(noDeadline) - dmis, err := is.loadDeletedMetricIDs() - db.putIndexSearch(is) - if err != nil { - return nil, fmt.Errorf("cannot load deleted metricIDs: %w", err) - } - db.setDeletedMetricIDs(dmis) return db, nil } @@ -214,7 +197,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.UselessTagFiltersCacheRequests += cs.GetCalls m.UselessTagFiltersCacheMisses += cs.Misses - m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len()) + m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated) @@ -260,12 +243,6 @@ func (db *indexDB) doExtDB(f func(extDB *indexDB)) bool { // // It decrements refCount for the previous extDB. func (db *indexDB) SetExtDB(extDB *indexDB) { - // Add deleted metricIDs from extDB to db. 
- if extDB != nil { - dmisExt := extDB.getDeletedMetricIDs() - db.updateDeletedMetricIDs(dmisExt) - } - db.extDBLock.Lock() prevExtDB := db.extDB db.extDB = extDB @@ -737,7 +714,7 @@ func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = encoding.MarshalUint64(kb.B, date) @@ -817,7 +794,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) prefix := kb.B @@ -935,7 +912,7 @@ func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []b kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = encoding.MarshalUint64(kb.B, date) @@ -1021,7 +998,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) kb.B = marshalTagValue(kb.B, tagKey) @@ -1175,7 +1152,7 @@ func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, ts := &is.ts mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 ts.Seek(prefix) for len(tvss) < maxTagValueSuffixes && ts.NextItem() { @@ -1616,7 +1593,7 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { // atomically add deleted metricIDs to an inmemory map. 
dmis := &uint64set.Set{} dmis.AddMulti(metricIDs) - db.updateDeletedMetricIDs(dmis) + db.s.updateDeletedMetricIDs(dmis) // Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs. invalidateTagCache() @@ -1643,21 +1620,14 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { return err } -func (db *indexDB) getDeletedMetricIDs() *uint64set.Set { - return db.deletedMetricIDs.Load().(*uint64set.Set) -} - -func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) { - db.deletedMetricIDs.Store(dmis) -} - -func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) { - db.deletedMetricIDsUpdateLock.Lock() - dmisOld := db.getDeletedMetricIDs() - dmisNew := dmisOld.Clone() - dmisNew.Union(metricIDs) - db.setDeletedMetricIDs(dmisNew) - db.deletedMetricIDsUpdateLock.Unlock() +func (db *indexDB) loadDeletedMetricIDs() (*uint64set.Set, error) { + is := db.getIndexSearch(noDeadline) + dmis, err := is.loadDeletedMetricIDs() + db.putIndexSearch(is) + if err != nil { + return nil, err + } + return dmis, nil } func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { @@ -1751,7 +1721,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, var tagFiltersKeyBufPool bytesutil.ByteBufferPool func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error { - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() ts := &is.ts kb := &is.kb kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID) @@ -2315,7 +2285,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr sortedMetricIDs := metricIDs.AppendTo(nil) // Filter out deleted metricIDs. 
- dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() if dmis.Len() > 0 { metricIDsFiltered := sortedMetricIDs[:0] for _, metricID := range sortedMetricIDs { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 681dd85259..6cdeac28a0 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1711,13 +1711,15 @@ func toTFPointers(tfs []tagFilter) []*tagFilter { } func newTestStorage() *Storage { - return &Storage{ + s := &Storage{ cachePath: "test-storage-cache", metricIDCache: workingsetcache.New(1234, time.Hour), metricNameCache: workingsetcache.New(1234, time.Hour), tsidCache: workingsetcache.New(1234, time.Hour), } + s.setDeletedMetricIDs(&uint64set.Set{}) + return s } func stopTestStorage(s *Storage) { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 239dce2034..bcf1292078 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -120,6 +120,13 @@ type Storage struct { // The minimum timestamp when composite index search can be used. minTimestampForCompositeIndex int64 + + // An inmemory set of deleted metricIDs. + // + // It is safe to keep the set in memory even for big number of deleted + // metricIDs, since it usually requires 1 bit per deleted metricID. + deletedMetricIDs atomic.Value + deletedMetricIDsUpdateLock sync.Mutex } // OpenStorage opens storage on the given path with the given retentionMsecs. 
@@ -208,6 +215,18 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer idbCurr.SetExtDB(idbPrev) s.idbCurr.Store(idbCurr) + // Load deleted metricIDs from idbCurr and idbPrev + dmisCurr, err := idbCurr.loadDeletedMetricIDs() + if err != nil { + return nil, fmt.Errorf("cannot load deleted metricIDs for the current indexDB: %w", err) + } + dmisPrev, err := idbPrev.loadDeletedMetricIDs() + if err != nil { + return nil, fmt.Errorf("cannot load deleted metricIDs for the previous indexDB: %w", err) + } + s.setDeletedMetricIDs(dmisCurr) + s.updateDeletedMetricIDs(dmisPrev) + // Load data tablePath := path + "/data" tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs) @@ -224,16 +243,29 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer return s, nil } +func (s *Storage) getDeletedMetricIDs() *uint64set.Set { + return s.deletedMetricIDs.Load().(*uint64set.Set) +} + +func (s *Storage) setDeletedMetricIDs(dmis *uint64set.Set) { + s.deletedMetricIDs.Store(dmis) +} + +func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) { + s.deletedMetricIDsUpdateLock.Lock() + dmisOld := s.getDeletedMetricIDs() + dmisNew := dmisOld.Clone() + dmisNew.Union(metricIDs) + s.setDeletedMetricIDs(dmisNew) + s.deletedMetricIDsUpdateLock.Unlock() +} + // DebugFlush flushes recently added storage data, so it becomes visible to search. func (s *Storage) DebugFlush() { s.tb.flushRawRows() s.idb().tb.DebugFlush() } -func (s *Storage) getDeletedMetricIDs() *uint64set.Set { - return s.idb().getDeletedMetricIDs() -} - // CreateSnapshot creates snapshot for s and returns the snapshot name. func (s *Storage) CreateSnapshot() (string, error) { logger.Infof("creating Storage snapshot for %q...", s.path)