diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 1de341e9a..85ea1cc2d 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "io/ioutil" "math" "os" "path/filepath" @@ -112,6 +113,13 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8) s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32) + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids") + hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") + s.currHourMetricIDs.Store(hmCurr) + s.prevHourMetricIDs.Store(hmPrev) + s.pendingHourMetricIDs = make(map[uint64]struct{}) + // Load indexdb idbPath := path + "/indexdb" idbSnapshotsPath := idbPath + "/snapshots" @@ -134,10 +142,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { } s.tb = tb - s.currHourMetricIDs.Store(&hourMetricIDs{}) - s.pendingHourMetricIDs = make(map[uint64]struct{}) s.startCurrHourMetricIDsUpdater() - s.startRetentionWatcher() return s, nil @@ -429,12 +434,83 @@ func (s *Storage) MustClose() { s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName") s.mustSaveCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID") + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids") + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids") + // Release lock file. if err := s.flockF.Close(); err != nil { logger.Panicf("FATAL: cannot close lock file %q: %s", s.flockF.Name(), err) } } +func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs { + path := s.cachePath + "/" + name + logger.Infof("loading %s from %q...", name, path) + startTime := time.Now() + if !fs.IsPathExist(path) { + logger.Infof("nothing to load from %q", path) + return &hourMetricIDs{} + } + src, err := ioutil.ReadFile(path) + if err != nil { + logger.Panicf("FATAL: cannot read %s: %s", path, err) + } + srcOrigLen := len(src) + if len(src) < 24 { + logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) + return &hourMetricIDs{} + } + isFull := encoding.UnmarshalUint64(src) + src = src[8:] + hourLoaded := encoding.UnmarshalUint64(src) + src = src[8:] + if hourLoaded != hour { + logger.Infof("discarding %s, since it is outdated", name) + return &hourMetricIDs{} + } + hmLen := encoding.UnmarshalUint64(src) + src = src[8:] + if uint64(len(src)) != 8*hmLen { + logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen) + return &hourMetricIDs{} + } + m := make(map[uint64]struct{}, hmLen) + for i := uint64(0); i < hmLen; i++ { + metricID := encoding.UnmarshalUint64(src) + src = src[8:] + m[metricID] = struct{}{} + } + logger.Infof("loaded %s from %q in %s; entriesCount: %d; bytesSize: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) + return &hourMetricIDs{ + m: m, + hour: hourLoaded, + isFull: isFull != 0, + } +} + +func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { + path := s.cachePath + "/" + name + logger.Infof("saving %s to %q...", name, path) + startTime := time.Now() + dst := make([]byte, 0, len(hm.m)*8+24) + isFull := uint64(0) + if hm.isFull { + isFull = 1 + } + dst = encoding.MarshalUint64(dst, isFull) + dst = encoding.MarshalUint64(dst, hm.hour) + dst = encoding.MarshalUint64(dst, uint64(len(hm.m))) + for metricID := range hm.m { + dst = encoding.MarshalUint64(dst, metricID) + } + if err := ioutil.WriteFile(path, dst, 0644); err != nil { + logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) + } + logger.Infof("saved %s to %q in %s; entriesCount: %d; bytesSize: %d", name, path, time.Since(startTime), len(hm.m), len(dst)) +} + func (s *Storage) mustLoadCache(info, name string, bytesSize int) *fastcache.Cache { path := s.cachePath + "/" + name logger.Infof("loading %s cache from %q...", info, path)