lib/storage: move deletedMetricIDs set from indexDB to Storage

This makes the list of deleted metricIDs consistent when it is used from both the current indexDB and the previous indexDB (aka extDB).
This should fix an issue that could lead to storing new samples under deleted metricIDs after indexDB rotation.
See more details at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347#issuecomment-861232136 .

Thanks to @tangqipengleoo for the initial analysis and the pull request - https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1383 .

This commit resolves the issue in a more generic way compared to https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1383 .

The downside of this commit is that the deletedMetricIDs set is no longer cleaned of metricIDs that fall outside the retention period; an app restart is needed for that.
This should be OK in most cases.
Author: Aliaksandr Valialkin
Date: 2021-06-15 14:56:51 +03:00
Parent: e028ad241a
Commit: 84fb59b0ba
4 changed files with 58 additions and 52 deletions
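
For readers unfamiliar with the pattern being moved: the deleted set is published through an atomic.Value and updated copy-on-write under a mutex, so readers never block. Below is a minimal standalone sketch of this pattern; the deletedSet type and its method names are hypothetical illustrations written for this note, not code from the commit (the real fields live on Storage, as the diff below shows), and it assumes only the lib/uint64set package from this repository.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// deletedSet is a hypothetical stand-in for the Storage-level fields
// deletedMetricIDs (atomic.Value) and deletedMetricIDsUpdateLock (sync.Mutex).
type deletedSet struct {
    v  atomic.Value // always holds an immutable *uint64set.Set snapshot
    mu sync.Mutex   // serializes writers; readers never take it
}

func newDeletedSet() *deletedSet {
    ds := &deletedSet{}
    ds.v.Store(&uint64set.Set{})
    return ds
}

// get returns the current snapshot; safe for concurrent use without locks.
func (ds *deletedSet) get() *uint64set.Set {
    return ds.v.Load().(*uint64set.Set)
}

// update publishes a clone merged with metricIDs, so in-flight readers
// keep working with the previous immutable snapshot.
func (ds *deletedSet) update(metricIDs *uint64set.Set) {
    ds.mu.Lock()
    merged := ds.get().Clone()
    merged.Union(metricIDs)
    ds.v.Store(merged)
    ds.mu.Unlock()
}

func main() {
    ds := newDeletedSet()
    ids := &uint64set.Set{}
    ids.Add(42)
    ds.update(ids)
    fmt.Println(ds.get().Has(42)) // true
}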

docs/CHANGELOG.md

@@ -9,6 +9,8 @@ sort: 15
 * FEATURE: vmagent: add service discovery for DigitalOcean (aka [digitalocean_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config)). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1367).
 * FEATURE: vmagent: show the number of samples the target returned during the last scrape on `/targets` and `/api/v1/targets` pages. This should simplify debugging targets, which may return too big or too low number of samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1377).
 
+* BUGFIX: prevent from adding new samples to deleted time series after the rotation of the inverted index (the rotation is performed once per `-retentionPeriod`). See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347#issuecomment-861232136) for details.
+
 ## [v1.61.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.61.1)

lib/storage/index_db.go

@@ -103,15 +103,6 @@ type indexDB struct {
     loopsPerDateTagFilterCache *workingsetcache.Cache
 
     indexSearchPool sync.Pool
-
-    // An inmemory set of deleted metricIDs.
-    //
-    // The set holds deleted metricIDs for the current db and for the extDB.
-    //
-    // It is safe to keep the set in memory even for big number of deleted
-    // metricIDs, since it usually requires 1 bit per deleted metricID.
-    deletedMetricIDs           atomic.Value
-    deletedMetricIDsUpdateLock sync.Mutex
 }
 
 // openIndexDB opens index db from the given path with the given caches.
@@ -140,14 +131,6 @@ func openIndexDB(path string, s *Storage) (*indexDB, error) {
         uselessTagFiltersCache:     workingsetcache.New(mem/128, time.Hour),
         loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
     }
-
-    is := db.getIndexSearch(noDeadline)
-    dmis, err := is.loadDeletedMetricIDs()
-    db.putIndexSearch(is)
-    if err != nil {
-        return nil, fmt.Errorf("cannot load deleted metricIDs: %w", err)
-    }
-    db.setDeletedMetricIDs(dmis)
 
     return db, nil
 }
@@ -214,7 +197,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
     m.UselessTagFiltersCacheRequests += cs.GetCalls
     m.UselessTagFiltersCacheMisses += cs.Misses
 
-    m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len())
+    m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
 
     m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
     m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
@@ -260,12 +243,6 @@ func (db *indexDB) doExtDB(f func(extDB *indexDB)) bool {
 //
 // It decrements refCount for the previous extDB.
 func (db *indexDB) SetExtDB(extDB *indexDB) {
-    // Add deleted metricIDs from extDB to db.
-    if extDB != nil {
-        dmisExt := extDB.getDeletedMetricIDs()
-        db.updateDeletedMetricIDs(dmisExt)
-    }
-
     db.extDBLock.Lock()
     prevExtDB := db.extDB
     db.extDB = extDB
@@ -737,7 +714,7 @@ func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64,
     kb := &is.kb
     mp := &is.mp
     mp.Reset()
-    dmis := is.db.getDeletedMetricIDs()
+    dmis := is.db.s.getDeletedMetricIDs()
     loopsPaceLimiter := 0
     kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
     kb.B = encoding.MarshalUint64(kb.B, date)
@@ -817,7 +794,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
     kb := &is.kb
     mp := &is.mp
     mp.Reset()
-    dmis := is.db.getDeletedMetricIDs()
+    dmis := is.db.s.getDeletedMetricIDs()
     loopsPaceLimiter := 0
     kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
     prefix := kb.B
@@ -935,7 +912,7 @@ func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []b
     kb := &is.kb
     mp := &is.mp
     mp.Reset()
-    dmis := is.db.getDeletedMetricIDs()
+    dmis := is.db.s.getDeletedMetricIDs()
     loopsPaceLimiter := 0
     kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
     kb.B = encoding.MarshalUint64(kb.B, date)
@@ -1021,7 +998,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
     kb := &is.kb
     mp := &is.mp
     mp.Reset()
-    dmis := is.db.getDeletedMetricIDs()
+    dmis := is.db.s.getDeletedMetricIDs()
     loopsPaceLimiter := 0
     kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
     kb.B = marshalTagValue(kb.B, tagKey)
@@ -1175,7 +1152,7 @@ func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{},
     ts := &is.ts
     mp := &is.mp
     mp.Reset()
-    dmis := is.db.getDeletedMetricIDs()
+    dmis := is.db.s.getDeletedMetricIDs()
     loopsPaceLimiter := 0
     ts.Seek(prefix)
     for len(tvss) < maxTagValueSuffixes && ts.NextItem() {
@@ -1616,7 +1593,7 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
     // atomically add deleted metricIDs to an inmemory map.
     dmis := &uint64set.Set{}
     dmis.AddMulti(metricIDs)
-    db.updateDeletedMetricIDs(dmis)
+    db.s.updateDeletedMetricIDs(dmis)
 
     // Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
     invalidateTagCache()
@@ -1643,21 +1620,14 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
     return err
 }
 
-func (db *indexDB) getDeletedMetricIDs() *uint64set.Set {
-    return db.deletedMetricIDs.Load().(*uint64set.Set)
-}
-
-func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) {
-    db.deletedMetricIDs.Store(dmis)
-}
-
-func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
-    db.deletedMetricIDsUpdateLock.Lock()
-    dmisOld := db.getDeletedMetricIDs()
-    dmisNew := dmisOld.Clone()
-    dmisNew.Union(metricIDs)
-    db.setDeletedMetricIDs(dmisNew)
-    db.deletedMetricIDsUpdateLock.Unlock()
+func (db *indexDB) loadDeletedMetricIDs() (*uint64set.Set, error) {
+    is := db.getIndexSearch(noDeadline)
+    dmis, err := is.loadDeletedMetricIDs()
+    db.putIndexSearch(is)
+    if err != nil {
+        return nil, err
+    }
+    return dmis, nil
 }
 
 func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
@@ -1751,7 +1721,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
 var tagFiltersKeyBufPool bytesutil.ByteBufferPool
 
 func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
-    dmis := is.db.getDeletedMetricIDs()
+    dmis := is.db.s.getDeletedMetricIDs()
     ts := &is.ts
     kb := &is.kb
     kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID)
@@ -2315,7 +2285,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
     sortedMetricIDs := metricIDs.AppendTo(nil)
 
     // Filter out deleted metricIDs.
-    dmis := is.db.getDeletedMetricIDs()
+    dmis := is.db.s.getDeletedMetricIDs()
     if dmis.Len() > 0 {
         metricIDsFiltered := sortedMetricIDs[:0]
         for _, metricID := range sortedMetricIDs {
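
The hunk above is the read-side guard: search results are filtered against the shared deleted set before TSIDs are returned. Below is a minimal, self-contained sketch of that filtering step; filterDeleted is a hypothetical helper written for illustration, assuming only the lib/uint64set package from this repository.

package main

import (
    "fmt"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// filterDeleted drops metricIDs present in dmis, mirroring the loop in
// searchMetricIDs above. It reuses the input slice's backing array to
// avoid an allocation, so the input must not be used afterwards.
func filterDeleted(sortedMetricIDs []uint64, dmis *uint64set.Set) []uint64 {
    if dmis.Len() == 0 {
        return sortedMetricIDs
    }
    filtered := sortedMetricIDs[:0]
    for _, metricID := range sortedMetricIDs {
        if !dmis.Has(metricID) {
            filtered = append(filtered, metricID)
        }
    }
    return filtered
}

func main() {
    dmis := &uint64set.Set{}
    dmis.Add(2)
    fmt.Println(filterDeleted([]uint64{1, 2, 3}, dmis)) // [1 3]
}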

lib/storage/index_db_test.go

@@ -1711,13 +1711,15 @@ func toTFPointers(tfs []tagFilter) []*tagFilter {
 }
 
 func newTestStorage() *Storage {
-    return &Storage{
+    s := &Storage{
         cachePath:       "test-storage-cache",
         metricIDCache:   workingsetcache.New(1234, time.Hour),
         metricNameCache: workingsetcache.New(1234, time.Hour),
         tsidCache:       workingsetcache.New(1234, time.Hour),
     }
+    s.setDeletedMetricIDs(&uint64set.Set{})
+    return s
 }
 
 func stopTestStorage(s *Storage) {

lib/storage/storage.go

@@ -120,6 +120,13 @@ type Storage struct {
 
     // The minimum timestamp when composite index search can be used.
     minTimestampForCompositeIndex int64
+
+    // An inmemory set of deleted metricIDs.
+    //
+    // It is safe to keep the set in memory even for big number of deleted
+    // metricIDs, since it usually requires 1 bit per deleted metricID.
+    deletedMetricIDs           atomic.Value
+    deletedMetricIDsUpdateLock sync.Mutex
 }
 
 // OpenStorage opens storage on the given path with the given retentionMsecs.
@@ -208,6 +215,18 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
     idbCurr.SetExtDB(idbPrev)
     s.idbCurr.Store(idbCurr)
 
+    // Load deleted metricIDs from idbCurr and idbPrev
+    dmisCurr, err := idbCurr.loadDeletedMetricIDs()
+    if err != nil {
+        return nil, fmt.Errorf("cannot load deleted metricIDs for the current indexDB: %w", err)
+    }
+    dmisPrev, err := idbPrev.loadDeletedMetricIDs()
+    if err != nil {
+        return nil, fmt.Errorf("cannot load deleted metricIDs for the previous indexDB: %w", err)
+    }
+    s.setDeletedMetricIDs(dmisCurr)
+    s.updateDeletedMetricIDs(dmisPrev)
+
     // Load data
     tablePath := path + "/data"
     tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs)
@@ -224,16 +243,29 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
     return s, nil
 }
 
+func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
+    return s.deletedMetricIDs.Load().(*uint64set.Set)
+}
+
+func (s *Storage) setDeletedMetricIDs(dmis *uint64set.Set) {
+    s.deletedMetricIDs.Store(dmis)
+}
+
+func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
+    s.deletedMetricIDsUpdateLock.Lock()
+    dmisOld := s.getDeletedMetricIDs()
+    dmisNew := dmisOld.Clone()
+    dmisNew.Union(metricIDs)
+    s.setDeletedMetricIDs(dmisNew)
+    s.deletedMetricIDsUpdateLock.Unlock()
+}
+
 // DebugFlush flushes recently added storage data, so it becomes visible to search.
 func (s *Storage) DebugFlush() {
     s.tb.flushRawRows()
     s.idb().tb.DebugFlush()
 }
 
-func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
-    return s.idb().getDeletedMetricIDs()
-}
-
 // CreateSnapshot creates snapshot for s and returns the snapshot name.
 func (s *Storage) CreateSnapshot() (string, error) {
     logger.Infof("creating Storage snapshot for %q...", s.path)
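
The OpenStorage hunk above is the heart of the fix: deleted metricIDs loaded from the current and the previous indexDB are unioned into the single Storage-level set, so lookups through either indexDB observe the same deletions. A toy sketch of that merge with made-up metricIDs, again assuming only the lib/uint64set package:

package main

import (
    "fmt"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

func main() {
    // Deletions loaded from the previous indexDB (before rotation)...
    dmisPrev := &uint64set.Set{}
    dmisPrev.Add(42)

    // ...and from the current indexDB.
    dmisCurr := &uint64set.Set{}
    dmisCurr.Add(7)

    // OpenStorage-style merge: start from the current deletions, then
    // union in the previous ones, yielding one shared set.
    dmis := dmisCurr.Clone()
    dmis.Union(dmisPrev)

    // A metricID deleted before the indexDB rotation stays deleted.
    fmt.Println(dmis.Has(42), dmis.Has(7)) // true true
}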