From be0ab4fbfe411988c579931bf8977869a4c45f57 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 14 Jul 2020 14:02:14 +0300 Subject: [PATCH] lib/storage: reset `MetricName->TSID` cache after marking metricIDs as deleted This is a follow-up commit after 12b16077c4da153e82ad3011f610e5da5d5d46f7 , which didn't reset the `tsidCache` in all the required places. This could result in indefinite errors like: missing metricName by metricID ...; this could be the case after unclean shutdown; deleting the metricID, so it could be re-created next time Fix this by resetting the cache inside deleteMetricIDs function. --- lib/storage/index_db.go | 15 +++++++++++++-- lib/storage/index_db_test.go | 18 +++++++++++++----- lib/storage/index_db_timing_test.go | 12 +++++++++--- lib/storage/storage.go | 18 +++++++++--------- 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 1d306ec4b..e3edeec02 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -128,6 +128,9 @@ type indexDB struct { // Cache for fast MetricID -> MetricName lookup. metricNameCache *workingsetcache.Cache + // Cache for fast MetricName -> TSID lookups. + tsidCache *workingsetcache.Cache + // Cache for useless TagFilters entries, which have no tag filters // matching low number of metrics. uselessTagFiltersCache *workingsetcache.Cache @@ -155,13 +158,16 @@ type indexDB struct { } // openIndexDB opens index db from the given path with the given caches. 
-func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) { +func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) { if metricIDCache == nil { logger.Panicf("BUG: metricIDCache must be non-nil") } if metricNameCache == nil { logger.Panicf("BUG: metricNameCache must be non-nil") } + if tsidCache == nil { + logger.Panicf("BUG: tsidCache must be nin-nil") + } if currHourMetricIDs == nil { logger.Panicf("BUG: currHourMetricIDs must be non-nil") } @@ -187,6 +193,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca tagCache: workingsetcache.New(mem/32, time.Hour), metricIDCache: metricIDCache, metricNameCache: metricNameCache, + tsidCache: tsidCache, uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour), @@ -360,6 +367,7 @@ func (db *indexDB) decRef() { db.tagCache = nil db.metricIDCache = nil db.metricNameCache = nil + db.tsidCache = nil db.uselessTagFiltersCache = nil db.metricIDsPerDateTagFilterCache = nil @@ -1233,6 +1241,9 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { // Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs. invalidateTagCache() + // Reset MetricName -> TSID cache, since it may contain deleted TSIDs. + db.tsidCache.Reset() + // Do not reset uselessTagFiltersCache, since the found metricIDs // on cache miss are filtered out later with deletedMetricIDs. return nil @@ -1515,7 +1526,7 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics tsids := make([]TSID, len(metricIDs)) i := 0 for _, metricID := range metricIDs { - // Try obtaining TSIDs from db.tsidCache. This is much faster + // Try obtaining TSIDs from MetricID->TSID cache. 
This is much faster // than scanning the mergeset if it contains a lot of metricIDs. tsid := &tsids[i] err := is.db.getFromMetricIDCache(tsid, metricID) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index f1ff6aa62..cc57999d1 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -466,8 +466,10 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) { func TestIndexDBOpenClose(t *testing.T) { metricIDCache := workingsetcache.New(1234, time.Hour) metricNameCache := workingsetcache.New(1234, time.Hour) + tsidCache := workingsetcache.New(1234, time.Hour) defer metricIDCache.Stop() defer metricNameCache.Stop() + defer tsidCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -475,7 +477,7 @@ func TestIndexDBOpenClose(t *testing.T) { hmPrev.Store(&hourMetricIDs{}) for i := 0; i < 5; i++ { - db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -494,8 +496,10 @@ func TestIndexDB(t *testing.T) { t.Run("serial", func(t *testing.T) { metricIDCache := workingsetcache.New(1234, time.Hour) metricNameCache := workingsetcache.New(1234, time.Hour) + tsidCache := workingsetcache.New(1234, time.Hour) defer metricIDCache.Stop() defer metricNameCache.Stop() + defer tsidCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -503,7 +507,7 @@ func TestIndexDB(t *testing.T) { hmPrev.Store(&hourMetricIDs{}) dbName := "test-index-db-serial" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -533,7 +537,7 @@ func TestIndexDB(t *testing.T) { // Re-open the db and verify it works as expected. 
db.MustClose() - db, err = openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -551,8 +555,10 @@ func TestIndexDB(t *testing.T) { t.Run("concurrent", func(t *testing.T) { metricIDCache := workingsetcache.New(1234, time.Hour) metricNameCache := workingsetcache.New(1234, time.Hour) + tsidCache := workingsetcache.New(1234, time.Hour) defer metricIDCache.Stop() defer metricNameCache.Stop() + defer tsidCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -560,7 +566,7 @@ func TestIndexDB(t *testing.T) { hmPrev.Store(&hourMetricIDs{}) dbName := "test-index-db-concurrent" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -1532,8 +1538,10 @@ func TestMatchTagFilters(t *testing.T) { func TestSearchTSIDWithTimeRange(t *testing.T) { metricIDCache := workingsetcache.New(1234, time.Hour) metricNameCache := workingsetcache.New(1234, time.Hour) + tsidCache := workingsetcache.New(1234, time.Hour) defer metricIDCache.Stop() defer metricNameCache.Stop() + defer tsidCache.Stop() currMetricIDs := &hourMetricIDs{ isFull: true, @@ -1551,7 +1559,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { hmPrev.Store(prevMetricIDs) dbName := "test-index-db-ts-range" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 4dba8c6d9..09efcfeb3 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -45,8 +45,10 @@ 
func BenchmarkIndexDBAddTSIDs(b *testing.B) { metricIDCache := workingsetcache.New(1234, time.Hour) metricNameCache := workingsetcache.New(1234, time.Hour) + tsidCache := workingsetcache.New(1234, time.Hour) defer metricIDCache.Stop() defer metricNameCache.Stop() + defer tsidCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -54,7 +56,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { hmPrev.Store(&hourMetricIDs{}) const dbName = "bench-index-db-add-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -116,8 +118,10 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { // See https://www.robustperception.io/evaluating-performance-and-correctness for more details. metricIDCache := workingsetcache.New(1234, time.Hour) metricNameCache := workingsetcache.New(1234, time.Hour) + tsidCache := workingsetcache.New(1234, time.Hour) defer metricIDCache.Stop() defer metricNameCache.Stop() + defer tsidCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ -125,7 +129,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { hmPrev.Store(&hourMetricIDs{}) const dbName = "bench-head-posting-for-matchers" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -302,8 +306,10 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { func BenchmarkIndexDBGetTSIDs(b *testing.B) { metricIDCache := workingsetcache.New(1234, time.Hour) metricNameCache := workingsetcache.New(1234, time.Hour) + tsidCache := workingsetcache.New(1234, time.Hour) defer metricIDCache.Stop() defer metricNameCache.Stop() + defer tsidCache.Stop() var hmCurr atomic.Value hmCurr.Store(&hourMetricIDs{}) @@ 
-311,7 +317,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { hmPrev.Store(&hourMetricIDs{}) const dbName = "bench-index-db-get-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 9664a81ea..fd004ca2d 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -176,7 +176,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil { return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err) } - idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) + idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) if err != nil { return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err) } @@ -552,7 +552,7 @@ func (s *Storage) mustRotateIndexDB() { // Create new indexdb table. newTableName := nextIndexDBTableName() idbNewPath := s.path + "/indexdb/" + newTableName - idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) + idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) if err != nil { logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err) } @@ -942,11 +942,10 @@ func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) { if err != nil { return deletedCount, fmt.Errorf("cannot delete tsids: %w", err) } - // Reset MetricName->TSID cache in order to prevent from adding new data points - // to deleted time series in Storage.add. 
- s.tsidCache.Reset() + // Do not reset MetricName->TSID cache here, since it is already reset inside DeleteTSIDs + // in order to prevent adding new data points to deleted time series in Storage.add. - // Do not reset MetricID -> MetricName cache, since it must be used only + // Do not reset MetricID->MetricName cache, since it must be used only // after filtering out deleted metricIDs. return deletedCount, nil @@ -1730,7 +1729,8 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) { s.tsidCache.Set(metricName, buf) } -func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) { +func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, + currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) { if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err) } @@ -1789,12 +1789,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetca // Open the last two tables. currPath := path + "/" + tableNames[len(tableNames)-1] - curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, currHourMetricIDs, prevHourMetricIDs) + curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs) if err != nil { return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err) } prevPath := path + "/" + tableNames[len(tableNames)-2] - prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, currHourMetricIDs, prevHourMetricIDs) + prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs) if err != nil { curr.MustClose() return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)