Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
lib/storage: persist metric ids for the current and the previous hour on graceful shutdown
This should improve performance after restart when the db contains a lot of time series with high time series churn (e.g. metrics from Kubernetes with many pods and frequent deployments).
This commit is contained in:
parent beb479b8f1
commit d2c801029b
1 changed file with 79 additions and 3 deletions
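The diff below persists each hour's metric-ID set as a flat binary blob: a 24-byte header holding an isFull flag, the hour number, and the entry count, followed by one uint64 per metric ID. The following standalone sketch reproduces that layout for illustration only; it is not the repository's code, it substitutes the standard encoding/binary package for lib/encoding (whose big-endian byte order is an assumption here), and its function names are made up.

package main

import (
	"encoding/binary"
	"fmt"
)

// appendUint64 appends v in big-endian order, standing in for encoding.MarshalUint64.
func appendUint64(dst []byte, v uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], v)
	return append(dst, buf[:]...)
}

// marshalHourMetricIDs builds the 24-byte header (isFull, hour, count)
// followed by 8 bytes per metric ID.
func marshalHourMetricIDs(m map[uint64]struct{}, hour uint64, isFull bool) []byte {
	dst := make([]byte, 0, 24+8*len(m))
	full := uint64(0)
	if isFull {
		full = 1
	}
	dst = appendUint64(dst, full)
	dst = appendUint64(dst, hour)
	dst = appendUint64(dst, uint64(len(m)))
	for metricID := range m {
		dst = appendUint64(dst, metricID)
	}
	return dst
}

// unmarshalHourMetricIDs mirrors the validation done in mustLoadHourMetricIDs:
// header length, expected hour, then body length.
func unmarshalHourMetricIDs(src []byte, wantHour uint64) (map[uint64]struct{}, bool, error) {
	if len(src) < 24 {
		return nil, false, fmt.Errorf("broken header: got %d bytes; want at least 24", len(src))
	}
	isFull := binary.BigEndian.Uint64(src) != 0
	hour := binary.BigEndian.Uint64(src[8:])
	count := binary.BigEndian.Uint64(src[16:])
	src = src[24:]
	if hour != wantHour {
		return nil, false, fmt.Errorf("outdated: file holds hour %d; want %d", hour, wantHour)
	}
	if uint64(len(src)) != 8*count {
		return nil, false, fmt.Errorf("broken body: got %d bytes; want %d", len(src), 8*count)
	}
	m := make(map[uint64]struct{}, count)
	for i := uint64(0); i < count; i++ {
		m[binary.BigEndian.Uint64(src[8*i:])] = struct{}{}
	}
	return m, isFull, nil
}

func main() {
	ids := map[uint64]struct{}{7: {}, 42: {}}
	blob := marshalHourMetricIDs(ids, 433000, true)
	got, isFull, err := unmarshalHourMetricIDs(blob, 433000)
	fmt.Println(len(got), isFull, err) // 2 true <nil>
}

Because the IDs come from a map, they land in the file in no particular order; as in the diff, the loader only checks that the byte count matches the recorded entry count.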
@@ -2,6 +2,7 @@ package storage

 import (
 	"fmt"
+	"io/ioutil"
 	"math"
 	"os"
 	"path/filepath"
@@ -112,6 +113,13 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8)
 	s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32)

+	hour := uint64(timestampFromTime(time.Now())) / msecPerHour
+	hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids")
+	hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
+	s.currHourMetricIDs.Store(hmCurr)
+	s.prevHourMetricIDs.Store(hmPrev)
+	s.pendingHourMetricIDs = make(map[uint64]struct{})
+
 	// Load indexdb
 	idbPath := path + "/indexdb"
 	idbSnapshotsPath := idbPath + "/snapshots"
@@ -134,10 +142,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	}
 	s.tb = tb
-
-	s.currHourMetricIDs.Store(&hourMetricIDs{})
-	s.pendingHourMetricIDs = make(map[uint64]struct{})
 	s.startCurrHourMetricIDsUpdater()

 	s.startRetentionWatcher()

 	return s, nil
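The Store calls above and the Load().(*hourMetricIDs) calls in MustClose below follow the sync/atomic.Value snapshot pattern: each hour's set lives in an immutable *hourMetricIDs value that readers fetch without locking, while the goroutine started by startCurrHourMetricIDsUpdater swaps in a replacement wholesale. A minimal sketch of that pattern, reusing the struct fields shown in the diff; the surrounding code here is hypothetical.

package main

import (
	"fmt"
	"sync/atomic"
)

// hourMetricIDs mirrors the struct used in the diff: an immutable snapshot
// of the metric IDs seen during one hour.
type hourMetricIDs struct {
	m      map[uint64]struct{}
	hour   uint64
	isFull bool
}

func main() {
	var curr atomic.Value // holds *hourMetricIDs

	// Publish an initial (empty) snapshot, as OpenStorage does.
	curr.Store(&hourMetricIDs{})

	// An updater would periodically replace the whole snapshot...
	curr.Store(&hourMetricIDs{
		m:    map[uint64]struct{}{123: {}},
		hour: 433000,
	})

	// ...while readers take a consistent view without any locking.
	hm := curr.Load().(*hourMetricIDs)
	fmt.Println(len(hm.m), hm.hour) // 1 433000
}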
@@ -429,12 +434,83 @@ func (s *Storage) MustClose() {
 	s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
 	s.mustSaveCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID")

+	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+	s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
+	hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+	s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids")
+
 	// Release lock file.
 	if err := s.flockF.Close(); err != nil {
 		logger.Panicf("FATAL: cannot close lock file %q: %s", s.flockF.Name(), err)
 	}
 }

+func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs {
+	path := s.cachePath + "/" + name
+	logger.Infof("loading %s from %q...", name, path)
+	startTime := time.Now()
+	if !fs.IsPathExist(path) {
+		logger.Infof("nothing to load from %q", path)
+		return &hourMetricIDs{}
+	}
+	src, err := ioutil.ReadFile(path)
+	if err != nil {
+		logger.Panicf("FATAL: cannot read %s: %s", path, err)
+	}
+	srcOrigLen := len(src)
+	if len(src) < 24 {
+		logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
+		return &hourMetricIDs{}
+	}
+	isFull := encoding.UnmarshalUint64(src)
+	src = src[8:]
+	hourLoaded := encoding.UnmarshalUint64(src)
+	src = src[8:]
+	if hourLoaded != hour {
+		logger.Infof("discarding %s, since it is outdated", name)
+		return &hourMetricIDs{}
+	}
+	hmLen := encoding.UnmarshalUint64(src)
+	src = src[8:]
+	if uint64(len(src)) != 8*hmLen {
+		logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
+		return &hourMetricIDs{}
+	}
+	m := make(map[uint64]struct{}, hmLen)
+	for i := uint64(0); i < hmLen; i++ {
+		metricID := encoding.UnmarshalUint64(src)
+		src = src[8:]
+		m[metricID] = struct{}{}
+	}
+	logger.Infof("loaded %s from %q in %s; entriesCount: %d; bytesSize: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
+	return &hourMetricIDs{
+		m:      m,
+		hour:   hourLoaded,
+		isFull: isFull != 0,
+	}
+}
+
+func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
+	path := s.cachePath + "/" + name
+	logger.Infof("saving %s to %q...", name, path)
+	startTime := time.Now()
+	dst := make([]byte, 0, len(hm.m)*8+24)
+	isFull := uint64(0)
+	if hm.isFull {
+		isFull = 1
+	}
+	dst = encoding.MarshalUint64(dst, isFull)
+	dst = encoding.MarshalUint64(dst, hm.hour)
+	dst = encoding.MarshalUint64(dst, uint64(len(hm.m)))
+	for metricID := range hm.m {
+		dst = encoding.MarshalUint64(dst, metricID)
+	}
+	if err := ioutil.WriteFile(path, dst, 0644); err != nil {
+		logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
+	}
+	logger.Infof("saved %s to %q in %s; entriesCount: %d; bytesSize: %d", name, path, time.Since(startTime), len(hm.m), len(dst))
+}
+
 func (s *Storage) mustLoadCache(info, name string, bytesSize int) *fastcache.Cache {
 	path := s.cachePath + "/" + name
 	logger.Infof("loading %s cache from %q...", info, path)
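mustLoadHourMetricIDs only trusts the persisted file when its stored hour matches the hour bucket computed at startup from the current wall clock, so a restart that crosses an hour boundary simply falls back to an empty set instead of resurrecting stale IDs. A small sketch of that bucket arithmetic, assuming timestampFromTime returns Unix milliseconds and msecPerHour is 3600*1000; both are assumptions about the storage package's helpers.

package main

import (
	"fmt"
	"time"
)

const msecPerHour = 3600 * 1000 // assumed value of the storage package's constant

// hourBucket mimics `uint64(timestampFromTime(time.Now())) / msecPerHour`.
func hourBucket(t time.Time) uint64 {
	return uint64(t.UnixMilli()) / msecPerHour
}

func main() {
	savedHour := hourBucket(time.Now())                        // written into the file by MustClose
	loadedHour := hourBucket(time.Now().Add(90 * time.Minute)) // a restart 90 minutes later
	if loadedHour != savedHour {
		// Corresponds to the "discarding ..., since it is outdated" branch in the loader.
		fmt.Println("persisted hour metric IDs are outdated; starting with an empty set")
	}
}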