diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 212840203..2b0d1848a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,8 @@ sort: 15 * FEATURE: vmagent: add service discovery for DigitalOcean (aka [digitalocean_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config)). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1367). * FEATURE: vmagent: show the number of samples the target returned during the last scrape on `/targets` and `/api/v1/targets` pages. This should simplify debugging targets, which may return too big or too low number of samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1377). +* BUGFIX: prevent adding new samples to deleted time series after the rotation of the inverted index (the rotation is performed once per `-retentionPeriod`). See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347#issuecomment-861232136) for details. + ## [v1.61.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.61.1) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index f2244438a..5123f98c3 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -103,15 +103,6 @@ type indexDB struct { loopsPerDateTagFilterCache *workingsetcache.Cache indexSearchPool sync.Pool - - // An inmemory set of deleted metricIDs. - // - // The set holds deleted metricIDs for the current db and for the extDB. - // - // It is safe to keep the set in memory even for big number of deleted - // metricIDs, since it usually requires 1 bit per deleted metricID. - deletedMetricIDs atomic.Value - deletedMetricIDsUpdateLock sync.Mutex } // openIndexDB opens index db from the given path with the given caches. 
@@ -140,14 +131,6 @@ func openIndexDB(path string, s *Storage) (*indexDB, error) { uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour), } - - is := db.getIndexSearch(noDeadline) - dmis, err := is.loadDeletedMetricIDs() - db.putIndexSearch(is) - if err != nil { - return nil, fmt.Errorf("cannot load deleted metricIDs: %w", err) - } - db.setDeletedMetricIDs(dmis) return db, nil } @@ -214,7 +197,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.UselessTagFiltersCacheRequests += cs.GetCalls m.UselessTagFiltersCacheMisses += cs.Misses - m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len()) + m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated) @@ -260,12 +243,6 @@ func (db *indexDB) doExtDB(f func(extDB *indexDB)) bool { // // It decrements refCount for the previous extDB. func (db *indexDB) SetExtDB(extDB *indexDB) { - // Add deleted metricIDs from extDB to db. 
- if extDB != nil { - dmisExt := extDB.getDeletedMetricIDs() - db.updateDeletedMetricIDs(dmisExt) - } - db.extDBLock.Lock() prevExtDB := db.extDB db.extDB = extDB @@ -737,7 +714,7 @@ func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = encoding.MarshalUint64(kb.B, date) @@ -817,7 +794,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) prefix := kb.B @@ -935,7 +912,7 @@ func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []b kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = encoding.MarshalUint64(kb.B, date) @@ -1021,7 +998,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m kb := &is.kb mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) kb.B = marshalTagValue(kb.B, tagKey) @@ -1175,7 +1152,7 @@ func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, ts := &is.ts mp := &is.mp mp.Reset() - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() loopsPaceLimiter := 0 ts.Seek(prefix) for len(tvss) < maxTagValueSuffixes && ts.NextItem() { @@ -1616,7 +1593,7 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { // atomically add deleted metricIDs to an inmemory map. 
dmis := &uint64set.Set{} dmis.AddMulti(metricIDs) - db.updateDeletedMetricIDs(dmis) + db.s.updateDeletedMetricIDs(dmis) // Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs. invalidateTagCache() @@ -1643,21 +1620,14 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { return err } -func (db *indexDB) getDeletedMetricIDs() *uint64set.Set { - return db.deletedMetricIDs.Load().(*uint64set.Set) -} - -func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) { - db.deletedMetricIDs.Store(dmis) -} - -func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) { - db.deletedMetricIDsUpdateLock.Lock() - dmisOld := db.getDeletedMetricIDs() - dmisNew := dmisOld.Clone() - dmisNew.Union(metricIDs) - db.setDeletedMetricIDs(dmisNew) - db.deletedMetricIDsUpdateLock.Unlock() +func (db *indexDB) loadDeletedMetricIDs() (*uint64set.Set, error) { + is := db.getIndexSearch(noDeadline) + dmis, err := is.loadDeletedMetricIDs() + db.putIndexSearch(is) + if err != nil { + return nil, err + } + return dmis, nil } func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { @@ -1751,7 +1721,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, var tagFiltersKeyBufPool bytesutil.ByteBufferPool func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error { - dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() ts := &is.ts kb := &is.kb kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID) @@ -2315,7 +2285,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr sortedMetricIDs := metricIDs.AppendTo(nil) // Filter out deleted metricIDs. 
- dmis := is.db.getDeletedMetricIDs() + dmis := is.db.s.getDeletedMetricIDs() if dmis.Len() > 0 { metricIDsFiltered := sortedMetricIDs[:0] for _, metricID := range sortedMetricIDs { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 681dd8525..6cdeac28a 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1711,13 +1711,15 @@ func toTFPointers(tfs []tagFilter) []*tagFilter { } func newTestStorage() *Storage { - return &Storage{ + s := &Storage{ cachePath: "test-storage-cache", metricIDCache: workingsetcache.New(1234, time.Hour), metricNameCache: workingsetcache.New(1234, time.Hour), tsidCache: workingsetcache.New(1234, time.Hour), } + s.setDeletedMetricIDs(&uint64set.Set{}) + return s } func stopTestStorage(s *Storage) { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 239dce203..bcf129207 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -120,6 +120,13 @@ type Storage struct { // The minimum timestamp when composite index search can be used. minTimestampForCompositeIndex int64 + + // An inmemory set of deleted metricIDs. + // + // It is safe to keep the set in memory even for big number of deleted + // metricIDs, since it usually requires 1 bit per deleted metricID. + deletedMetricIDs atomic.Value + deletedMetricIDsUpdateLock sync.Mutex } // OpenStorage opens storage on the given path with the given retentionMsecs. 
@@ -208,6 +215,18 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer idbCurr.SetExtDB(idbPrev) s.idbCurr.Store(idbCurr) + // Load deleted metricIDs from idbCurr and idbPrev + dmisCurr, err := idbCurr.loadDeletedMetricIDs() + if err != nil { + return nil, fmt.Errorf("cannot load deleted metricIDs for the current indexDB: %w", err) + } + dmisPrev, err := idbPrev.loadDeletedMetricIDs() + if err != nil { + return nil, fmt.Errorf("cannot load deleted metricIDs for the previous indexDB: %w", err) + } + s.setDeletedMetricIDs(dmisCurr) + s.updateDeletedMetricIDs(dmisPrev) + // Load data tablePath := path + "/data" tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs) @@ -224,16 +243,29 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer return s, nil } +func (s *Storage) getDeletedMetricIDs() *uint64set.Set { + return s.deletedMetricIDs.Load().(*uint64set.Set) +} + +func (s *Storage) setDeletedMetricIDs(dmis *uint64set.Set) { + s.deletedMetricIDs.Store(dmis) +} + +func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) { + s.deletedMetricIDsUpdateLock.Lock() + dmisOld := s.getDeletedMetricIDs() + dmisNew := dmisOld.Clone() + dmisNew.Union(metricIDs) + s.setDeletedMetricIDs(dmisNew) + s.deletedMetricIDsUpdateLock.Unlock() +} + // DebugFlush flushes recently added storage data, so it becomes visible to search. func (s *Storage) DebugFlush() { s.tb.flushRawRows() s.idb().tb.DebugFlush() } -func (s *Storage) getDeletedMetricIDs() *uint64set.Set { - return s.idb().getDeletedMetricIDs() -} - // CreateSnapshot creates snapshot for s and returns the snapshot name. func (s *Storage) CreateSnapshot() (string, error) { logger.Infof("creating Storage snapshot for %q...", s.path)