lib/storage: persist metric ids for the current and the previous hour on graceful shutdown

This should improve performance after restart when the db contains a lot of time series
with high time series churn (i.e. metrics from Kubernetes with many pods and frequent deployments)
This commit is contained in:
Aliaksandr Valialkin 2019-06-14 07:52:32 +03:00
parent ba3cccd471
commit f9e1d32168

View file

@ -2,6 +2,7 @@ package storage
import ( import (
"fmt" "fmt"
"io/ioutil"
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
@ -112,6 +113,13 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8) s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8)
s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32) s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32)
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids")
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev)
s.pendingHourMetricIDs = make(map[uint64]struct{})
// Load indexdb // Load indexdb
idbPath := path + "/indexdb" idbPath := path + "/indexdb"
idbSnapshotsPath := idbPath + "/snapshots" idbSnapshotsPath := idbPath + "/snapshots"
@ -134,10 +142,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
} }
s.tb = tb s.tb = tb
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.pendingHourMetricIDs = make(map[uint64]struct{})
s.startCurrHourMetricIDsUpdater() s.startCurrHourMetricIDsUpdater()
s.startRetentionWatcher() s.startRetentionWatcher()
return s, nil return s, nil
@ -429,12 +434,83 @@ func (s *Storage) MustClose() {
s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName") s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.mustSaveCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID") s.mustSaveCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID")
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids")
// Release lock file. // Release lock file.
if err := s.flockF.Close(); err != nil { if err := s.flockF.Close(); err != nil {
logger.Panicf("FATAL: cannot close lock file %q: %s", s.flockF.Name(), err) logger.Panicf("FATAL: cannot close lock file %q: %s", s.flockF.Name(), err)
} }
} }
func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs {
path := s.cachePath + "/" + name
logger.Infof("loading %s from %q...", name, path)
startTime := time.Now()
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return &hourMetricIDs{}
}
src, err := ioutil.ReadFile(path)
if err != nil {
logger.Panicf("FATAL: cannot read %s: %s", path, err)
}
srcOrigLen := len(src)
if len(src) < 24 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
return &hourMetricIDs{}
}
isFull := encoding.UnmarshalUint64(src)
src = src[8:]
hourLoaded := encoding.UnmarshalUint64(src)
src = src[8:]
if hourLoaded != hour {
logger.Infof("discarding %s, since it is outdated", name)
return &hourMetricIDs{}
}
hmLen := encoding.UnmarshalUint64(src)
src = src[8:]
if uint64(len(src)) != 8*hmLen {
logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{}
}
m := make(map[uint64]struct{}, hmLen)
for i := uint64(0); i < hmLen; i++ {
metricID := encoding.UnmarshalUint64(src)
src = src[8:]
m[metricID] = struct{}{}
}
logger.Infof("loaded %s from %q in %s; entriesCount: %d; bytesSize: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
return &hourMetricIDs{
m: m,
hour: hourLoaded,
isFull: isFull != 0,
}
}
func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
path := s.cachePath + "/" + name
logger.Infof("saving %s to %q...", name, path)
startTime := time.Now()
dst := make([]byte, 0, len(hm.m)*8+24)
isFull := uint64(0)
if hm.isFull {
isFull = 1
}
dst = encoding.MarshalUint64(dst, isFull)
dst = encoding.MarshalUint64(dst, hm.hour)
dst = encoding.MarshalUint64(dst, uint64(len(hm.m)))
for metricID := range hm.m {
dst = encoding.MarshalUint64(dst, metricID)
}
if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
}
logger.Infof("saved %s to %q in %s; entriesCount: %d; bytesSize: %d", name, path, time.Since(startTime), len(hm.m), len(dst))
}
func (s *Storage) mustLoadCache(info, name string, bytesSize int) *fastcache.Cache { func (s *Storage) mustLoadCache(info, name string, bytesSize int) *fastcache.Cache {
path := s.cachePath + "/" + name path := s.cachePath + "/" + name
logger.Infof("loading %s cache from %q...", info, path) logger.Infof("loading %s cache from %q...", info, path)