mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: do not pass retentionMsecs and isReadOnly args explicitly - access them via Storage arg
This makes code easier to read.
This is a follow-up after d2d30581a0
This commit is contained in:
parent
89a1108b1a
commit
e2f0b76ebf
10 changed files with 37 additions and 57 deletions
|
@ -2109,6 +2109,7 @@ func newTestStorage() *Storage {
|
|||
metricNameCache: workingsetcache.New(1234),
|
||||
tsidCache: workingsetcache.New(1234),
|
||||
dateMetricIDCache: newDateMetricIDCache(),
|
||||
retentionMsecs: maxRetentionMsecs,
|
||||
}
|
||||
s.setDeletedMetricIDs(&uint64set.Set{})
|
||||
return s
|
||||
|
|
|
@ -120,14 +120,6 @@ type partition struct {
|
|||
// The parent storage.
|
||||
s *Storage
|
||||
|
||||
// data retention in milliseconds.
|
||||
// Used for deleting data outside the retention during background merge.
|
||||
retentionMsecs int64
|
||||
|
||||
// Whether the storage is in read-only mode.
|
||||
// Background merge is stopped in read-only mode.
|
||||
isReadOnly *uint32
|
||||
|
||||
// Name is the name of the partition in the form YYYY_MM.
|
||||
name string
|
||||
|
||||
|
@ -201,7 +193,7 @@ func (pw *partWrapper) decRef() {
|
|||
|
||||
// createPartition creates new partition for the given timestamp and the given paths
|
||||
// to small and big partitions.
|
||||
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
|
||||
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) (*partition, error) {
|
||||
name := timestampToPartitionName(timestamp)
|
||||
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
|
||||
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
|
||||
|
@ -214,7 +206,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
|
|||
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
|
||||
}
|
||||
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
|
||||
pt.tr.fromPartitionTimestamp(timestamp)
|
||||
pt.startMergeWorkers()
|
||||
pt.startRawRowsFlusher()
|
||||
|
@ -240,7 +232,7 @@ func (pt *partition) Drop() {
|
|||
}
|
||||
|
||||
// openPartition opens the existing partition from the given paths.
|
||||
func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
|
||||
func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, error) {
|
||||
smallPartsPath = filepath.Clean(smallPartsPath)
|
||||
bigPartsPath = filepath.Clean(bigPartsPath)
|
||||
|
||||
|
@ -264,7 +256,7 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMse
|
|||
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
|
||||
}
|
||||
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
|
||||
pt.smallParts = smallParts
|
||||
pt.bigParts = bigParts
|
||||
if err := pt.tr.fromPartitionName(name); err != nil {
|
||||
|
@ -278,15 +270,13 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMse
|
|||
return pt, nil
|
||||
}
|
||||
|
||||
func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) *partition {
|
||||
func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition {
|
||||
p := &partition{
|
||||
name: name,
|
||||
smallPartsPath: smallPartsPath,
|
||||
bigPartsPath: bigPartsPath,
|
||||
|
||||
s: s,
|
||||
retentionMsecs: retentionMsecs,
|
||||
isReadOnly: isReadOnly,
|
||||
s: s,
|
||||
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
stopCh: make(chan struct{}),
|
||||
|
@ -1030,7 +1020,7 @@ func getMaxOutBytes(path string, workersCount int) uint64 {
|
|||
}
|
||||
|
||||
func (pt *partition) canBackgroundMerge() bool {
|
||||
return atomic.LoadUint32(pt.isReadOnly) == 0
|
||||
return atomic.LoadUint32(&pt.s.isReadOnly) == 0
|
||||
}
|
||||
|
||||
var errReadOnlyMode = fmt.Errorf("storage is in readonly mode")
|
||||
|
@ -1217,7 +1207,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||
atomic.AddUint64(&pt.smallMergesCount, 1)
|
||||
atomic.AddUint64(&pt.activeSmallMerges, 1)
|
||||
}
|
||||
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
|
||||
retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs
|
||||
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted)
|
||||
if isBigPart {
|
||||
atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
|
||||
|
@ -1387,7 +1377,7 @@ func (pt *partition) stalePartsRemover() {
|
|||
func (pt *partition) removeStaleParts() {
|
||||
m := make(map[*partWrapper]bool)
|
||||
startTime := time.Now()
|
||||
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
|
||||
retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs
|
||||
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pt.bigParts {
|
||||
|
@ -1418,7 +1408,7 @@ func (pt *partition) removeStaleParts() {
|
|||
// consistent snapshots with table.CreateSnapshot().
|
||||
pt.snapshotLock.RLock()
|
||||
for pw := range m {
|
||||
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000)
|
||||
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.s.retentionMsecs/1000)
|
||||
fs.MustRemoveDirAtomic(pw.p.path)
|
||||
}
|
||||
// There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath,
|
||||
|
|
|
@ -166,9 +166,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
|
||||
// Create partition from rowss and test search on it.
|
||||
strg := newTestStorage()
|
||||
retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
|
||||
var isReadOnly uint32
|
||||
pt, err := createPartition(ptt, "./small-table", "./big-table", strg, retentionMsecs, &isReadOnly)
|
||||
strg.retentionMsecs = timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
|
||||
pt, err := createPartition(ptt, "./small-table", "./big-table", strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create partition: %s", err)
|
||||
}
|
||||
|
@ -192,7 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
pt.MustClose()
|
||||
|
||||
// Open the created partition and test search on it.
|
||||
pt, err = openPartition(smallPartsPath, bigPartsPath, strg, retentionMsecs, &isReadOnly)
|
||||
pt, err = openPartition(smallPartsPath, bigPartsPath, strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open partition: %s", err)
|
||||
}
|
||||
|
|
|
@ -258,7 +258,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
|||
|
||||
// Load data
|
||||
tablePath := path + "/data"
|
||||
tb, err := openTable(tablePath, s, retentionMsecs, &s.isReadOnly)
|
||||
tb, err := openTable(tablePath, s)
|
||||
if err != nil {
|
||||
s.idb().MustClose()
|
||||
return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)
|
||||
|
|
|
@ -20,9 +20,7 @@ type table struct {
|
|||
smallPartitionsPath string
|
||||
bigPartitionsPath string
|
||||
|
||||
s *Storage
|
||||
retentionMsecs int64
|
||||
isReadOnly *uint32
|
||||
s *Storage
|
||||
|
||||
ptws []*partitionWrapper
|
||||
ptwsLock sync.Mutex
|
||||
|
@ -78,12 +76,10 @@ func (ptw *partitionWrapper) scheduleToDrop() {
|
|||
atomic.AddUint64(&ptw.mustDrop, 1)
|
||||
}
|
||||
|
||||
// openTable opens a table on the given path with the given retentionMsecs.
|
||||
// openTable opens a table on the given path.
|
||||
//
|
||||
// The table is created if it doesn't exist.
|
||||
//
|
||||
// Data older than the retentionMsecs may be dropped at any time.
|
||||
func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
|
||||
func openTable(path string, s *Storage) (*table, error) {
|
||||
path = filepath.Clean(path)
|
||||
|
||||
// Create a directory for the table if it doesn't exist yet.
|
||||
|
@ -121,7 +117,7 @@ func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32
|
|||
fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
|
||||
|
||||
// Open partitions.
|
||||
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s, retentionMsecs, isReadOnly)
|
||||
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
|
||||
}
|
||||
|
@ -131,8 +127,6 @@ func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32
|
|||
smallPartitionsPath: smallPartitionsPath,
|
||||
bigPartitionsPath: bigPartitionsPath,
|
||||
s: s,
|
||||
retentionMsecs: retentionMsecs,
|
||||
isReadOnly: isReadOnly,
|
||||
|
||||
flockF: flockF,
|
||||
|
||||
|
@ -365,7 +359,7 @@ func (tb *table) AddRows(rows []rawRow) error {
|
|||
continue
|
||||
}
|
||||
|
||||
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s, tb.retentionMsecs, tb.isReadOnly)
|
||||
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s)
|
||||
if err != nil {
|
||||
// Return only the first error, since it has no sense in returning all errors.
|
||||
tb.ptwsLock.Unlock()
|
||||
|
@ -381,7 +375,7 @@ func (tb *table) AddRows(rows []rawRow) error {
|
|||
|
||||
func (tb *table) getMinMaxTimestamps() (int64, int64) {
|
||||
now := int64(fasttime.UnixTimestamp() * 1000)
|
||||
minTimestamp := now - tb.retentionMsecs
|
||||
minTimestamp := now - tb.s.retentionMsecs
|
||||
maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
|
||||
if minTimestamp < 0 {
|
||||
// Negative timestamps aren't supported by the storage.
|
||||
|
@ -411,7 +405,7 @@ func (tb *table) retentionWatcher() {
|
|||
case <-ticker.C:
|
||||
}
|
||||
|
||||
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMsecs
|
||||
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.s.retentionMsecs
|
||||
var ptwsDrop []*partitionWrapper
|
||||
tb.ptwsLock.Lock()
|
||||
dst := tb.ptws[:0]
|
||||
|
@ -503,7 +497,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
|
|||
}
|
||||
}
|
||||
|
||||
func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
|
||||
func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ([]*partition, error) {
|
||||
// Certain partition directories in either `big` or `small` dir may be missing
|
||||
// after restoring from backup. So populate partition names from both dirs.
|
||||
ptNames := make(map[string]bool)
|
||||
|
@ -517,7 +511,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, r
|
|||
for ptName := range ptNames {
|
||||
smallPartsPath := smallPartitionsPath + "/" + ptName
|
||||
bigPartsPath := bigPartitionsPath + "/" + ptName
|
||||
pt, err := openPartition(smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
|
||||
pt, err := openPartition(smallPartsPath, bigPartsPath, s)
|
||||
if err != nil {
|
||||
mustClosePartitions(pts)
|
||||
return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)
|
||||
|
|
|
@ -66,7 +66,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
|
|||
// Adjust tr.MinTimestamp, so it doesn't obtain data older
|
||||
// than the tb retention.
|
||||
now := int64(fasttime.UnixTimestamp() * 1000)
|
||||
minTimestamp := now - tb.retentionMsecs
|
||||
minTimestamp := now - tb.s.retentionMsecs
|
||||
if tr.MinTimestamp < minTimestamp {
|
||||
tr.MinTimestamp = minTimestamp
|
||||
}
|
||||
|
|
|
@ -182,8 +182,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
|
|||
|
||||
// Create a table from rowss and test search on it.
|
||||
strg := newTestStorage()
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
|
||||
tb, err := openTable("./test-table", strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create table: %s", err)
|
||||
}
|
||||
|
@ -204,7 +203,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
|
|||
tb.MustClose()
|
||||
|
||||
// Open the created table and test search on it.
|
||||
tb, err = openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
|
||||
tb, err = openTable("./test-table", strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
|
|
|
@ -45,8 +45,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
|
|||
createdBenchTables[path] = true
|
||||
}
|
||||
strg := newTestStorage()
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
|
||||
tb, err := openTable(path, strg)
|
||||
if err != nil {
|
||||
b.Fatalf("cnanot open table %q: %s", path, err)
|
||||
}
|
||||
|
@ -70,8 +69,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
|
|||
b.Helper()
|
||||
|
||||
strg := newTestStorage()
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
|
||||
tb, err := openTable(path, strg)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open table %q: %s", path, err)
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ func TestTableOpenClose(t *testing.T) {
|
|||
|
||||
// Create a new table
|
||||
strg := newTestStorage()
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
|
||||
strg.retentionMsecs = retentionMsecs
|
||||
tb, err := openTable(path, strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new table: %s", err)
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ func TestTableOpenClose(t *testing.T) {
|
|||
|
||||
// Re-open created table multiple times.
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
|
||||
tb, err := openTable(path, strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open created table: %s", err)
|
||||
}
|
||||
|
@ -46,15 +46,15 @@ func TestTableOpenMultipleTimes(t *testing.T) {
|
|||
}()
|
||||
|
||||
strg := newTestStorage()
|
||||
var isReadOnly uint32
|
||||
tb1, err := openTable(path, strg, retentionMsecs, &isReadOnly)
|
||||
strg.retentionMsecs = retentionMsecs
|
||||
tb1, err := openTable(path, strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table the first time: %s", err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb2, err := openTable(path, strg, retentionMsecs, &isReadOnly)
|
||||
tb2, err := openTable(path, strg)
|
||||
if err == nil {
|
||||
tb2.MustClose()
|
||||
t.Fatalf("expecting non-nil error when opening already opened table")
|
||||
|
|
|
@ -47,8 +47,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
|
|||
tablePath := "./benchmarkTableAddRows"
|
||||
strg := newTestStorage()
|
||||
for i := 0; i < b.N; i++ {
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
|
||||
tb, err := openTable(tablePath, strg)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open table %q: %s", tablePath, err)
|
||||
}
|
||||
|
@ -96,7 +95,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
|
|||
tb.MustClose()
|
||||
|
||||
// Open the table from files and verify the rows count on it
|
||||
tb, err = openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
|
||||
tb, err = openTable(tablePath, strg)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open table %q: %s", tablePath, err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue