mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
lib/storage: add storage node id
Generate random node ID on start if it is missing or load from disk. Save to storage on storage shutdown. Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
parent
00b1ee6b5c
commit
8d8073a24d
2 changed files with 34 additions and 2 deletions
|
@ -10,6 +10,8 @@ const (
|
||||||
|
|
||||||
appliedRetentionFilename = "appliedRetention.txt"
|
appliedRetentionFilename = "appliedRetention.txt"
|
||||||
resetCacheOnStartupFilename = "reset_cache_on_startup"
|
resetCacheOnStartupFilename = "reset_cache_on_startup"
|
||||||
|
|
||||||
|
nodeIDFilename = "node_id.bin"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
@ -15,6 +16,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/fastcache"
|
||||||
|
"github.com/VictoriaMetrics/metricsql"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
@ -29,8 +33,6 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||||
"github.com/VictoriaMetrics/fastcache"
|
|
||||||
"github.com/VictoriaMetrics/metricsql"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -64,6 +66,9 @@ type Storage struct {
|
||||||
cachePath string
|
cachePath string
|
||||||
retentionMsecs int64
|
retentionMsecs int64
|
||||||
|
|
||||||
|
// Used to uniquely identify storage node
|
||||||
|
nodeID uint64
|
||||||
|
|
||||||
// lock file for exclusive access to the storage on the given path.
|
// lock file for exclusive access to the storage on the given path.
|
||||||
flockF *os.File
|
flockF *os.File
|
||||||
|
|
||||||
|
@ -209,6 +214,19 @@ func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxD
|
||||||
logger.Panicf("FATAL: incomplete vmrestore run; run vmrestore again or remove lock file %q", restoreLockF)
|
logger.Panicf("FATAL: incomplete vmrestore run; run vmrestore again or remove lock file %q", restoreLockF)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nodeIDFileF := filepath.Join(path, nodeIDFilename)
|
||||||
|
if fs.IsPathExist(nodeIDFileF) {
|
||||||
|
r, err := os.Open(nodeIDFileF)
|
||||||
|
nodeID, err := io.ReadAll(r)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("FATAL: cannot read nodeID from %q: %s", nodeIDFileF, err)
|
||||||
|
}
|
||||||
|
s.nodeID = encoding.UnmarshalUint64(nodeID)
|
||||||
|
} else {
|
||||||
|
nodeID := rand.Uint64()
|
||||||
|
s.nodeID = nodeID
|
||||||
|
}
|
||||||
|
|
||||||
// Pre-create snapshots directory if it is missing.
|
// Pre-create snapshots directory if it is missing.
|
||||||
snapshotsPath := filepath.Join(path, snapshotsDirname)
|
snapshotsPath := filepath.Join(path, snapshotsDirname)
|
||||||
fs.MustMkdirIfNotExist(snapshotsPath)
|
fs.MustMkdirIfNotExist(snapshotsPath)
|
||||||
|
@ -897,6 +915,8 @@ func (s *Storage) MustClose() {
|
||||||
nextDayMetricIDs := s.nextDayMetricIDs.Load()
|
nextDayMetricIDs := s.nextDayMetricIDs.Load()
|
||||||
s.mustSaveNextDayMetricIDs(nextDayMetricIDs)
|
s.mustSaveNextDayMetricIDs(nextDayMetricIDs)
|
||||||
|
|
||||||
|
s.mustSaveNodeID()
|
||||||
|
|
||||||
// Release lock file.
|
// Release lock file.
|
||||||
fs.MustClose(s.flockF)
|
fs.MustClose(s.flockF)
|
||||||
s.flockF = nil
|
s.flockF = nil
|
||||||
|
@ -1032,6 +1052,16 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||||
return hm
|
return hm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Storage) mustSaveNodeID() {
|
||||||
|
path := filepath.Join(s.path, nodeIDFilename)
|
||||||
|
dst := make([]byte, 0)
|
||||||
|
dst = encoding.MarshalUint64(dst, s.nodeID)
|
||||||
|
|
||||||
|
if err := os.WriteFile(path, dst, 0644); err != nil {
|
||||||
|
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
|
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
|
||||||
name := "next_day_metric_ids_v2"
|
name := "next_day_metric_ids_v2"
|
||||||
path := filepath.Join(s.cachePath, name)
|
path := filepath.Join(s.cachePath, name)
|
||||||
|
|
Loading…
Reference in a new issue