From d37924900b8c2ada28918f7b60560d3e5fd3468f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 9 Jun 2019 19:06:53 +0300 Subject: [PATCH] lib/storage: optimize time series lookup for recent hours when the db contains many millions of time series with high churn rate (aka frequent deployments in Kubernetes) --- app/vmstorage/main.go | 12 ++ lib/storage/index_db.go | 97 ++++++++++++++- lib/storage/index_db_test.go | 8 +- lib/storage/index_db_timing_test.go | 6 +- lib/storage/storage.go | 135 +++++++++++---------- lib/storage/storage_test.go | 178 ++++++++++++++++++++++++++++ lib/storage/time.go | 2 + 7 files changed, 367 insertions(+), 71 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 1e08b323a..a78a17946 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -285,6 +285,18 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 { return float64(idbm().MissingTSIDsForMetricID) }) + metrics.NewGauge(`vm_recent_hour_metric_ids_search_calls`, func() float64 { + return float64(idbm().RecentHourMetricIDsSearchCalls) + }) + metrics.NewGauge(`vm_recent_hour_metric_ids_search_hits`, func() float64 { + return float64(idbm().RecentHourMetricIDsSearchHits) + }) + metrics.NewGauge(`vm_date_metric_ids_search_calls`, func() float64 { + return float64(idbm().DateMetricIDsSearchCalls) + }) + metrics.NewGauge(`vm_date_metric_ids_search_hits`, func() float64 { + return float64(idbm().DateMetricIDsSearchHits) + }) metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 { return float64(tm().SmallAssistedMerges) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index d6aa7e514..e0069771a 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -71,15 +71,33 @@ type indexDB struct { deletedMetricIDs atomic.Value deletedMetricIDsUpdateLock sync.Mutex + // Global lists of metric ids for the current and the previous hours. + // They are used for fast lookups on small time ranges covering + // up to two last hours. + currHourMetricIDs *atomic.Value + prevHourMetricIDs *atomic.Value + // The number of missing MetricID -> TSID entries. // High rate for this value means corrupted indexDB. missingTSIDsForMetricID uint64 + // The number of calls to search for metric ids for recent hours. + recentHourMetricIDsSearchCalls uint64 + + // The number of cache hits during search for metric ids in recent hours. + recentHourMetricIDsSearchHits uint64 + + // The number of searches for metric ids by days. + dateMetricIDsSearchCalls uint64 + + // The number of successful searches for metric ids by days. + dateMetricIDsSearchHits uint64 + mustDrop uint64 } // openIndexDB opens index db from the given path with the given caches. -func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache) (*indexDB, error) { +func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) { tb, err := mergeset.OpenTable(path) if err != nil { return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err) @@ -99,6 +117,9 @@ func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache) ( tagCache: tagCache, metricIDCache: metricIDCache, metricNameCache: metricNameCache, + + currHourMetricIDs: currHourMetricIDs, + prevHourMetricIDs: prevHourMetricIDs, } is := db.getIndexSearch() @@ -125,6 +146,11 @@ type IndexDBMetrics struct { MissingTSIDsForMetricID uint64 + RecentHourMetricIDsSearchCalls uint64 + RecentHourMetricIDsSearchHits uint64 + DateMetricIDsSearchCalls uint64 + DateMetricIDsSearchHits uint64 + mergeset.TableMetrics } @@ -145,6 +171,10 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) + m.RecentHourMetricIDsSearchCalls += atomic.LoadUint64(&db.recentHourMetricIDsSearchCalls) + m.RecentHourMetricIDsSearchHits += atomic.LoadUint64(&db.recentHourMetricIDsSearchHits) + m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) + m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) db.tb.UpdateMetrics(&m.TableMetrics) db.doExtDB(func(extDB *indexDB) { @@ -1541,6 +1571,16 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m if tr.isZero() { return nil, errMissingMetricIDsForDate } + atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1) + if metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics); ok { + // Fast path: tr covers the current and / or the previous hour. + // Return the full list of metric ids for this time range. + atomic.AddUint64(&is.db.recentHourMetricIDsSearchHits, 1) + return metricIDs, nil + } + + // Slow path: collect the metric ids for all the days covering the given tr. + atomic.AddUint64(&is.db.dateMetricIDsSearchCalls, 1) minDate := tr.MinTimestamp / msecPerDay maxDate := tr.MaxTimestamp / msecPerDay if maxDate-minDate > 40 { @@ -1554,9 +1594,64 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m } minDate++ } + atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1) return metricIDs, nil } +func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (map[uint64]struct{}, bool) { + minHour := uint64(tr.MinTimestamp) / msecPerHour + maxHour := uint64(tr.MaxTimestamp) / msecPerHour + if is.db.currHourMetricIDs == nil { + return nil, false + } + hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) + if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull { + // The tr fits the current hour. + // Return a copy of hmCurr.m, because the caller may modify + // the returned map. + if len(hmCurr.m) > maxMetrics { + return nil, false + } + return getMetricIDsCopy(hmCurr.m), true + } + if is.db.prevHourMetricIDs == nil { + return nil, false + } + hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) + if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { + // The tr fits the previous hour. + // Return a copy of hmPrev.m, because the caller may modify + // the returned map. + if len(hmPrev.m) > maxMetrics { + return nil, false + } + return getMetricIDsCopy(hmPrev.m), true + } + if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { + // The tr spans the previous and the current hours. + if len(hmCurr.m)+len(hmPrev.m) > maxMetrics { + return nil, false + } + metricIDs := make(map[uint64]struct{}, len(hmCurr.m)+len(hmPrev.m)) + for metricID := range hmCurr.m { + metricIDs[metricID] = struct{}{} + } + for metricID := range hmPrev.m { + metricIDs[metricID] = struct{}{} + } + return metricIDs, true + } + return nil, false +} + +func getMetricIDsCopy(src map[uint64]struct{}) map[uint64]struct{} { + dst := make(map[uint64]struct{}, len(src)) + for metricID := range src { + dst[metricID] = struct{}{} + } + return dst +} + func (db *indexDB) storeDateMetricID(date, metricID uint64) error { is := db.getIndexSearch() ok, err := is.hasDateMetricID(date, metricID) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 071859119..1944a7aef 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -61,7 +61,7 @@ func TestIndexDBOpenClose(t *testing.T) { defer metricIDCache.Reset() defer metricNameCache.Reset() for i := 0; i < 5; i++ { - db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache) + db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, nil, nil) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -83,7 +83,7 @@ func TestIndexDB(t *testing.T) { defer metricIDCache.Reset() defer metricNameCache.Reset() dbName := "test-index-db-serial" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -113,7 +113,7 @@ func TestIndexDB(t *testing.T) { // Re-open the db and verify it works as expected. db.MustClose() - db, err = openIndexDB(dbName, metricIDCache, metricNameCache) + db, err = openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -134,7 +134,7 @@ func TestIndexDB(t *testing.T) { defer metricIDCache.Reset() defer metricNameCache.Reset() dbName := "test-index-db-concurrent" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index e830a3777..ae18371ad 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -17,7 +17,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { defer metricIDCache.Reset() defer metricNameCache.Reset() const dbName = "bench-index-db-add-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -77,7 +77,7 @@ func BenchmarkIndexDBSearchTSIDs(b *testing.B) { defer metricIDCache.Reset() defer metricNameCache.Reset() const dbName = "bench-index-db-search-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -148,7 +148,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { defer metricIDCache.Reset() defer metricNameCache.Reset() const dbName = "bench-index-db-get-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 58b148805..31f5f6d4f 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -50,15 +50,20 @@ type Storage struct { // dateMetricIDCache is (Date, MetricID) cache. dateMetricIDCache *fastcache.Cache - // Fast cache for today MetricID values. - todayMetricIDs atomic.Value - pendingTodayMetricIDsLock sync.Mutex - pendingTodayMetricIDs map[uint64]struct{} + // Fast cache for MetricID values occured during the current hour. + currHourMetricIDs atomic.Value + + // Fast cache for MetricID values occured during the previous hour. + prevHourMetricIDs atomic.Value + + // Pending MetricID values to be added to currHourMetricIDs. + pendingHourMetricIDsLock sync.Mutex + pendingHourMetricIDs map[uint64]struct{} stop chan struct{} - todayMetricIDsUpdaterWG sync.WaitGroup - retentionWatcherWG sync.WaitGroup + currHourMetricIDsUpdaterWG sync.WaitGroup + retentionWatcherWG sync.WaitGroup } // OpenStorage opens storage on the given path with the given number of retention months. @@ -113,7 +118,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil { return nil, fmt.Errorf("cannot create %q: %s", idbSnapshotsPath, err) } - idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache) + idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) if err != nil { return nil, fmt.Errorf("cannot open indexdb tables at %q: %s", idbPath, err) } @@ -129,9 +134,9 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { } s.tb = tb - s.todayMetricIDs.Store(&todayMetricIDs{}) - s.pendingTodayMetricIDs = make(map[uint64]struct{}) - s.startTodayMetricIDsUpdater() + s.currHourMetricIDs.Store(&hourMetricIDs{}) + s.pendingHourMetricIDs = make(map[uint64]struct{}) + s.startCurrHourMetricIDsUpdater() s.startRetentionWatcher() @@ -352,25 +357,25 @@ func (s *Storage) retentionWatcher() { } } -func (s *Storage) startTodayMetricIDsUpdater() { - s.todayMetricIDsUpdaterWG.Add(1) +func (s *Storage) startCurrHourMetricIDsUpdater() { + s.currHourMetricIDsUpdaterWG.Add(1) go func() { - s.todayMetricIDsUpdater() - s.todayMetricIDsUpdaterWG.Done() + s.currHourMetricIDsUpdater() + s.currHourMetricIDsUpdaterWG.Done() }() } -var todayMetricIDsUpdateInterval = time.Second * 10 +var currHourMetricIDsUpdateInterval = time.Second * 10 -func (s *Storage) todayMetricIDsUpdater() { - t := time.NewTimer(todayMetricIDsUpdateInterval) +func (s *Storage) currHourMetricIDsUpdater() { + t := time.NewTimer(currHourMetricIDsUpdateInterval) for { select { case <-s.stop: return case <-t.C: - s.updateTodayMetricIDs() - t.Reset(todayMetricIDsUpdateInterval) + s.updateCurrHourMetricIDs() + t.Reset(currHourMetricIDsUpdateInterval) } } } @@ -379,7 +384,7 @@ func (s *Storage) mustRotateIndexDB() { // Create new indexdb table. newTableName := nextIndexDBTableName() idbNewPath := s.path + "/indexdb/" + newTableName - idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache) + idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) if err != nil { logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err) } @@ -413,7 +418,7 @@ func (s *Storage) MustClose() { close(s.stop) s.retentionWatcherWG.Wait() - s.todayMetricIDsUpdaterWG.Wait() + s.currHourMetricIDsUpdaterWG.Wait() s.tb.MustClose() s.idb().MustClose() @@ -700,6 +705,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error { var date uint64 + var hour uint64 var prevTimestamp int64 kb := kbPool.Get() defer kbPool.Put(kb) @@ -711,18 +717,20 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error r := &rows[i] if r.Timestamp != prevTimestamp { date = uint64(r.Timestamp) / msecPerDay + hour = uint64(r.Timestamp) / msecPerHour prevTimestamp = r.Timestamp } metricID := r.TSID.MetricID - tm := s.todayMetricIDs.Load().(*todayMetricIDs) - if date == tm.date { - if _, ok := tm.m[metricID]; ok { - // Fast path: the (date, metricID) entry is in the fast cache with today's metric ids. + hm := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hour == hm.hour { + // The r belongs to the current hour. Check for the current hour cache. + if _, ok := hm.m[metricID]; ok { + // Fast path: the metricID is in the current hour cache. continue } - s.pendingTodayMetricIDsLock.Lock() - s.pendingTodayMetricIDs[metricID] = struct{}{} - s.pendingTodayMetricIDsLock.Unlock() + s.pendingHourMetricIDsLock.Lock() + s.pendingHourMetricIDs[metricID] = struct{}{} + s.pendingHourMetricIDsLock.Unlock() } // Slower path: check global cache for (date, metricID) entry. @@ -744,51 +752,52 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error return errors } -func (s *Storage) updateTodayMetricIDs() { - tm := s.todayMetricIDs.Load().(*todayMetricIDs) - s.pendingTodayMetricIDsLock.Lock() - newMetricIDsLen := len(s.pendingTodayMetricIDs) - s.pendingTodayMetricIDsLock.Unlock() - today := uint64(timestampFromTime(time.Now())) / msecPerDay - if newMetricIDsLen == 0 { - if tm.date == today { - // Fast path: nothing to update. - return - } - // Reset tm because of new day. - tmNew := &todayMetricIDs{ - m: make(map[uint64]struct{}, len(tm.m)), - date: today, - } - s.todayMetricIDs.Store(tmNew) +func (s *Storage) updateCurrHourMetricIDs() { + hm := s.currHourMetricIDs.Load().(*hourMetricIDs) + s.pendingHourMetricIDsLock.Lock() + newMetricIDsLen := len(s.pendingHourMetricIDs) + s.pendingHourMetricIDsLock.Unlock() + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + if newMetricIDsLen == 0 && hm.hour == hour { + // Fast path: nothing to update. return } - // Slow path: tm.m must be updated with non-empty s.pendingTodayMetricIDs. - m := make(map[uint64]struct{}, len(tm.m)+newMetricIDsLen) - if tm.date == today { - for metricID := range tm.m { + // Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs. + var m map[uint64]struct{} + isFull := hm.isFull + if hm.hour == hour { + m = make(map[uint64]struct{}, len(hm.m)+newMetricIDsLen) + for metricID := range hm.m { m[metricID] = struct{}{} } + } else { + m = make(map[uint64]struct{}, newMetricIDsLen) + isFull = true } - s.pendingTodayMetricIDsLock.Lock() - newMetricIDs := s.pendingTodayMetricIDs - s.pendingTodayMetricIDs = make(map[uint64]struct{}, len(newMetricIDs)) - s.pendingTodayMetricIDsLock.Unlock() + s.pendingHourMetricIDsLock.Lock() + newMetricIDs := s.pendingHourMetricIDs + s.pendingHourMetricIDs = make(map[uint64]struct{}, len(newMetricIDs)) + s.pendingHourMetricIDsLock.Unlock() for metricID := range newMetricIDs { m[metricID] = struct{}{} } - tmNew := &todayMetricIDs{ - m: m, - date: today, + hmNew := &hourMetricIDs{ + m: m, + hour: hour, + isFull: isFull, + } + s.currHourMetricIDs.Store(hmNew) + if hm.hour != hour { + s.prevHourMetricIDs.Store(hm) } - s.todayMetricIDs.Store(tmNew) } -type todayMetricIDs struct { - m map[uint64]struct{} - date uint64 +type hourMetricIDs struct { + m map[uint64]struct{} + hour uint64 + isFull bool } func (s *Storage) getTSIDFromCache(dst *TSID, metricName []byte) bool { @@ -802,7 +811,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) { s.tsidCache.Set(metricName, buf) } -func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Cache) (curr, prev *indexDB, err error) { +func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) { if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, nil, fmt.Errorf("cannot create directory %q: %s", path, err) } @@ -863,12 +872,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Ca // Open the last two tables. currPath := path + "/" + tableNames[len(tableNames)-1] - curr, err = openIndexDB(currPath, metricIDCache, metricNameCache) + curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, currHourMetricIDs, prevHourMetricIDs) if err != nil { return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %s", currPath, err) } prevPath := path + "/" + tableNames[len(tableNames)-2] - prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache) + prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, currHourMetricIDs, prevHourMetricIDs) if err != nil { curr.MustClose() return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %s", prevPath, err) diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 5dc2b1dd6..18e9adf99 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -11,6 +11,184 @@ import ( "time" ) +func TestUpdateCurrHourMetricIDs(t *testing.T) { + newStorage := func() *Storage { + var s Storage + s.currHourMetricIDs.Store(&hourMetricIDs{}) + s.prevHourMetricIDs.Store(&hourMetricIDs{}) + s.pendingHourMetricIDs = make(map[uint64]struct{}) + return &s + } + t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) { + s := newStorage() + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hmOrig := &hourMetricIDs{ + m: map[uint64]struct{}{ + 12: {}, + 34: {}, + }, + hour: 123, + } + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs() + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + // It is possible new hour occured. Update the hour and verify it again. + hour = uint64(timestampFromTime(time.Now())) / msecPerHour + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + } + if len(hmCurr.m) != 0 { + t.Fatalf("unexpected length of hm.m; got %d; want %d", len(hmCurr.m), 0) + } + if !hmCurr.isFull { + t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) + } + + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + if !reflect.DeepEqual(hmPrev, hmOrig) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) + } + + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + } + }) + t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) { + s := newStorage() + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hmOrig := &hourMetricIDs{ + m: map[uint64]struct{}{ + 12: {}, + 34: {}, + }, + hour: hour, + } + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs() + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + // It is possible new hour occured. Update the hour and verify it again. + hour = uint64(timestampFromTime(time.Now())) / msecPerHour + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + // Do not run other checks, since they may fail. + return + } + if !reflect.DeepEqual(hmCurr, hmOrig) { + t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig) + } + if hmCurr.isFull { + t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false) + } + + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmEmpty := &hourMetricIDs{} + if !reflect.DeepEqual(hmPrev, hmEmpty) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) + } + + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + } + }) + t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { + s := newStorage() + pendingHourMetricIDs := map[uint64]struct{}{ + 343: {}, + 32424: {}, + 8293432: {}, + } + s.pendingHourMetricIDs = pendingHourMetricIDs + + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hmOrig := &hourMetricIDs{ + m: map[uint64]struct{}{ + 12: {}, + 34: {}, + }, + hour: 123, + } + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs() + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + // It is possible new hour occured. Update the hour and verify it again. + hour = uint64(timestampFromTime(time.Now())) / msecPerHour + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + } + if !reflect.DeepEqual(hmCurr.m, pendingHourMetricIDs) { + t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs) + } + if !hmCurr.isFull { + t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) + } + + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + if !reflect.DeepEqual(hmPrev, hmOrig) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) + } + + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + } + }) + t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { + s := newStorage() + pendingHourMetricIDs := map[uint64]struct{}{ + 343: {}, + 32424: {}, + 8293432: {}, + } + s.pendingHourMetricIDs = pendingHourMetricIDs + + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hmOrig := &hourMetricIDs{ + m: map[uint64]struct{}{ + 12: {}, + 34: {}, + }, + hour: hour, + } + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs() + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + // It is possible new hour occured. Update the hour and verify it again. + hour = uint64(timestampFromTime(time.Now())) / msecPerHour + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + // Do not run other checks, since they may fail. + return + } + m := getMetricIDsCopy(pendingHourMetricIDs) + for metricID := range hmOrig.m { + m[metricID] = struct{}{} + } + if !reflect.DeepEqual(hmCurr.m, m) { + t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) + } + if hmCurr.isFull { + t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false) + } + + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmEmpty := &hourMetricIDs{} + if !reflect.DeepEqual(hmPrev, hmEmpty) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) + } + + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + } + }) +} + func TestMetricRowMarshalUnmarshal(t *testing.T) { var buf []byte typ := reflect.TypeOf(&MetricRow{}) diff --git a/lib/storage/time.go b/lib/storage/time.go index 7593b1959..eaa2a3f29 100644 --- a/lib/storage/time.go +++ b/lib/storage/time.go @@ -67,3 +67,5 @@ func (tr *TimeRange) fromPartitionTime(t time.Time) { } const msecPerDay = 24 * 3600 * 1000 + +const msecPerHour = 3600 * 1000