From fedfc9e6860cc63f0a454fb38754b5a949bac2c1 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 1 Jun 2022 14:21:12 +0300 Subject: [PATCH] lib/storage: stop background merge when storage enters read-only mode This should prevent from `no space left on device` errors when VictoriaMetrics under-estimates the additional disk space needed for background merge. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603 --- docs/CHANGELOG.md | 1 + lib/mergeset/table.go | 14 ++++++++++- lib/mergeset/table_search_test.go | 9 ++++--- lib/mergeset/table_search_timing_test.go | 3 ++- lib/mergeset/table_test.go | 30 ++++++++++++++---------- lib/storage/index_db.go | 6 ++--- lib/storage/index_db_test.go | 14 +++++++---- lib/storage/index_db_timing_test.go | 9 ++++--- lib/storage/partition.go | 30 ++++++++++++++++++++---- lib/storage/partition_search_test.go | 5 ++-- lib/storage/storage.go | 8 +++---- lib/storage/table.go | 12 ++++++---- lib/storage/table_search_test.go | 5 ++-- lib/storage/table_search_timing_test.go | 6 +++-- lib/storage/table_test.go | 10 ++++---- lib/storage/table_timing_test.go | 5 ++-- 16 files changed, 113 insertions(+), 54 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 149741746..9571f0b7d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -23,6 +23,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-promscrape.suppressScrapeErrorsDelay` command-line flag, which can be used for delaying and aggregating the logging of per-target scrape errors. This may reduce the amounts of logs when `vmagent` scrapes many unreliable targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2575). Thanks to @jelmd for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2576). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly apply `alert_relabel_configs` relabeling rules to `-notifier.config` according to [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file). Thanks to @spectvtor for [the bugfix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2633). +* BUGFIX: deny [background merge](https://valyala.medium.com/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) when the storage enters read-only mode, e.g. when free disk space becomes lower than `-storage.minFreeDiskSpaceBytes`. Background merge needs additional disk space, so it could result in `no space left on device` errors. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603). ## [v1.77.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.2) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 217c454e8..eea3bf93c 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -91,6 +91,7 @@ type Table struct { needFlushCallbackCall uint32 prepareBlock PrepareBlockCallback + isReadOnly *uint32 partsLock sync.Mutex parts []*partWrapper @@ -254,7 +255,7 @@ func (pw *partWrapper) decRef() { // to persistent storage. // // The table is created if it doesn't exist yet. -func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback) (*Table, error) { +func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) (*Table, error) { path = filepath.Clean(path) logger.Infof("opening table %q...", path) startTime := time.Now() @@ -280,6 +281,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb path: path, flushCallback: flushCallback, prepareBlock: prepareBlock, + isReadOnly: isReadOnly, parts: pws, mergeIdx: uint64(time.Now().UnixNano()), flockF: flockF, @@ -799,7 +801,17 @@ func (tb *Table) startPartMergers() { } } +func (tb *Table) canBackgroundMerge() bool { + return atomic.LoadUint32(tb.isReadOnly) == 0 +} + func (tb *Table) mergeExistingParts(isFinal bool) error { + if !tb.canBackgroundMerge() { + // Do not perform background merge in read-only mode + // in order to prevent from disk space shortage. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603 + return nil + } n := fs.MustGetFreeSpace(tb.path) // Divide free space by the max number of concurrent merges. maxOutBytes := n / uint64(mergeWorkersCount) diff --git a/lib/mergeset/table_search_test.go b/lib/mergeset/table_search_test.go index 1d91c1ae3..249aa3109 100644 --- a/lib/mergeset/table_search_test.go +++ b/lib/mergeset/table_search_test.go @@ -40,7 +40,8 @@ func TestTableSearchSerial(t *testing.T) { func() { // Re-open the table and verify the search works. - tb, err := OpenTable(path, nil, nil) + var isReadOnly uint32 + tb, err := OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open table: %s", err) } @@ -75,7 +76,8 @@ func TestTableSearchConcurrent(t *testing.T) { // Re-open the table and verify the search works. func() { - tb, err := OpenTable(path, nil, nil) + var isReadOnly uint32 + tb, err := OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open table: %s", err) } @@ -151,7 +153,8 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) { flushCallback := func() { atomic.AddUint64(&flushes, 1) } - tb, err := OpenTable(path, flushCallback, nil) + var isReadOnly uint32 + tb, err := OpenTable(path, flushCallback, nil, &isReadOnly) if err != nil { return nil, nil, fmt.Errorf("cannot open table: %w", err) } diff --git a/lib/mergeset/table_search_timing_test.go b/lib/mergeset/table_search_timing_test.go index add04e46e..555fd8578 100644 --- a/lib/mergeset/table_search_timing_test.go +++ b/lib/mergeset/table_search_timing_test.go @@ -32,7 +32,8 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { // Force finishing pending merges tb.MustClose() - tb, err = OpenTable(path, nil, nil) + var isReadOnly uint32 + tb, err = OpenTable(path, nil, nil, &isReadOnly) if err != nil { b.Fatalf("unexpected error when re-opening table %q: %s", path, err) } diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index f61d2e635..f99ab937c 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -21,7 +21,8 @@ func TestTableOpenClose(t *testing.T) { }() // Create a new table - tb, err := OpenTable(path, nil, nil) + var isReadOnly uint32 + tb, err := OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot create new table: %s", err) } @@ -31,7 +32,7 @@ func TestTableOpenClose(t *testing.T) { // Re-open created table multiple times. for i := 0; i < 10; i++ { - tb, err := OpenTable(path, nil, nil) + tb, err := OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open created table: %s", err) } @@ -45,14 +46,15 @@ func TestTableOpenMultipleTimes(t *testing.T) { _ = os.RemoveAll(path) }() - tb1, err := OpenTable(path, nil, nil) + var isReadOnly uint32 + tb1, err := OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open table: %s", err) } defer tb1.MustClose() for i := 0; i < 10; i++ { - tb2, err := OpenTable(path, nil, nil) + tb2, err := OpenTable(path, nil, nil, &isReadOnly) if err == nil { tb2.MustClose() t.Fatalf("expecting non-nil error when opening already opened table") @@ -73,7 +75,8 @@ func TestTableAddItemSerial(t *testing.T) { flushCallback := func() { atomic.AddUint64(&flushes, 1) } - tb, err := OpenTable(path, flushCallback, nil) + var isReadOnly uint32 + tb, err := OpenTable(path, flushCallback, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -99,7 +102,7 @@ func TestTableAddItemSerial(t *testing.T) { testReopenTable(t, path, itemsCount) // Add more items in order to verify merge between inmemory parts and file-based parts. - tb, err = OpenTable(path, nil, nil) + tb, err = OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -132,7 +135,8 @@ func TestTableCreateSnapshotAt(t *testing.T) { _ = os.RemoveAll(path) }() - tb, err := OpenTable(path, nil, nil) + var isReadOnly uint32 + tb, err := OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -163,13 +167,13 @@ func TestTableCreateSnapshotAt(t *testing.T) { }() // Verify snapshots contain all the data. - tb1, err := OpenTable(snapshot1, nil, nil) + tb1, err := OpenTable(snapshot1, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } defer tb1.MustClose() - tb2, err := OpenTable(snapshot2, nil, nil) + tb2, err := OpenTable(snapshot2, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -222,7 +226,8 @@ func TestTableAddItemsConcurrent(t *testing.T) { atomic.AddUint64(&itemsMerged, uint64(len(items))) return data, items } - tb, err := OpenTable(path, flushCallback, prepareBlock) + var isReadOnly uint32 + tb, err := OpenTable(path, flushCallback, prepareBlock, &isReadOnly) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -252,7 +257,7 @@ func TestTableAddItemsConcurrent(t *testing.T) { testReopenTable(t, path, itemsCount) // Add more items in order to verify merge between inmemory parts and file-based parts. - tb, err = OpenTable(path, nil, nil) + tb, err = OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -294,7 +299,8 @@ func testReopenTable(t *testing.T, path string, itemsCount int) { t.Helper() for i := 0; i < 10; i++ { - tb, err := OpenTable(path, nil, nil) + var isReadOnly uint32 + tb, err := OpenTable(path, nil, nil, &isReadOnly) if err != nil { t.Fatalf("cannot re-open %q: %s", path, err) } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 8194eb18a..f8e760d09 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -115,9 +115,9 @@ type indexDB struct { // The last segment of the path should contain unique hex value which // will be then used as indexDB.generation // -// The rotationTimestamp must be set to the current unix timestamp when ipenIndexDB +// The rotationTimestamp must be set to the current unix timestamp when openIndexDB // is called when creating new indexdb during indexdb rotation. -func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, error) { +func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) (*indexDB, error) { if s == nil { logger.Panicf("BUG: Storage must be nin-nil") } @@ -128,7 +128,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, e return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err) } - tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows) + tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly) if err != nil { return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err) } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index f96af2062..4bdea0a6e 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -477,7 +477,8 @@ func TestIndexDBOpenClose(t *testing.T) { defer stopTestStorage(s) tableName := nextIndexDBTableName() for i := 0; i < 5; i++ { - db, err := openIndexDB(tableName, s, 0) + var isReadOnly uint32 + db, err := openIndexDB(tableName, s, 0, &isReadOnly) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -498,7 +499,8 @@ func TestIndexDB(t *testing.T) { defer stopTestStorage(s) dbName := nextIndexDBTableName() - db, err := openIndexDB(dbName, s, 0) + var isReadOnly uint32 + db, err := openIndexDB(dbName, s, 0, &isReadOnly) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -528,7 +530,7 @@ func TestIndexDB(t *testing.T) { // Re-open the db and verify it works as expected. db.MustClose() - db, err = openIndexDB(dbName, s, 0) + db, err = openIndexDB(dbName, s, 0, &isReadOnly) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -548,7 +550,8 @@ func TestIndexDB(t *testing.T) { defer stopTestStorage(s) dbName := nextIndexDBTableName() - db, err := openIndexDB(dbName, s, 0) + var isReadOnly uint32 + db, err := openIndexDB(dbName, s, 0, &isReadOnly) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -1677,7 +1680,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { defer stopTestStorage(s) dbName := nextIndexDBTableName() - db, err := openIndexDB(dbName, s, 0) + var isReadOnly uint32 + db, err := openIndexDB(dbName, s, 0, &isReadOnly) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 754e39f19..aaeb3c81b 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -45,7 +45,8 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { defer stopTestStorage(s) dbName := nextIndexDBTableName() - db, err := openIndexDB(dbName, s, 0) + var isReadOnly uint32 + db, err := openIndexDB(dbName, s, 0, &isReadOnly) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -109,7 +110,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { defer stopTestStorage(s) dbName := nextIndexDBTableName() - db, err := openIndexDB(dbName, s, 0) + var isReadOnly uint32 + db, err := openIndexDB(dbName, s, 0, &isReadOnly) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -288,7 +290,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { defer stopTestStorage(s) dbName := nextIndexDBTableName() - db, err := openIndexDB(dbName, s, 0) + var isReadOnly uint32 + db, err := openIndexDB(dbName, s, 0, &isReadOnly) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 43439edcc..c61e281f1 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -126,6 +126,10 @@ type partition struct { // Used for deleting data outside the retention during background merge. retentionMsecs int64 + // Whether the storage is in read-only mode. + // Background merge is stopped in read-only mode. + isReadOnly *uint32 + // Name is the name of the partition in the form YYYY_MM. name string @@ -199,7 +203,8 @@ func (pw *partWrapper) decRef() { // createPartition creates new partition for the given timestamp and the given paths // to small and big partitions. -func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) { +func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, + getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) { name := timestampToPartitionName(timestamp) smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name @@ -212,7 +217,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err) } - pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) + pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly) pt.tr.fromPartitionTimestamp(timestamp) pt.startMergeWorkers() pt.startRawRowsFlusher() @@ -238,7 +243,7 @@ func (pt *partition) Drop() { } // openPartition opens the existing partition from the given paths. -func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) { +func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) { smallPartsPath = filepath.Clean(smallPartsPath) bigPartsPath = filepath.Clean(bigPartsPath) @@ -262,7 +267,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err) } - pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) + pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly) pt.smallParts = smallParts pt.bigParts = bigParts if err := pt.tr.fromPartitionName(name); err != nil { @@ -276,7 +281,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func return pt, nil } -func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) *partition { +func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) *partition { p := &partition{ name: name, smallPartsPath: smallPartsPath, @@ -284,6 +289,7 @@ func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs getDeletedMetricIDs: getDeletedMetricIDs, retentionMsecs: retentionMsecs, + isReadOnly: isReadOnly, mergeIdx: uint64(time.Now().UnixNano()), stopCh: make(chan struct{}), @@ -993,7 +999,16 @@ func getMaxOutBytes(path string, workersCount int) uint64 { return maxOutBytes } +func (pt *partition) canBackgroundMerge() bool { + return atomic.LoadUint32(pt.isReadOnly) == 0 +} + func (pt *partition) mergeBigParts(isFinal bool) error { + if !pt.canBackgroundMerge() { + // Do not perform merge in read-only mode, since this may result in disk space shortage. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603 + return nil + } maxOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount) pt.partsLock.Lock() @@ -1005,6 +1020,11 @@ func (pt *partition) mergeBigParts(isFinal bool) error { } func (pt *partition) mergeSmallParts(isFinal bool) error { + if !pt.canBackgroundMerge() { + // Do not perform merge in read-only mode, since this may result in disk space shortage. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603 + return nil + } // Try merging small parts to a big part at first. maxBigPartOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount) pt.partsLock.Lock() diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 42c90fe83..2ea714e33 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -168,7 +168,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma // Create partition from rowss and test search on it. retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 - pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs) + var isReadOnly uint32 + pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly) if err != nil { t.Fatalf("cannot create partition: %s", err) } @@ -192,7 +193,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. - pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs) + pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly) if err != nil { t.Fatalf("cannot open partition: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index be805caca..fe280b7c4 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -269,7 +269,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load data tablePath := path + "/data" - tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs) + tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs, &s.isReadOnly) if err != nil { s.idb().MustClose() return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err) @@ -737,7 +737,7 @@ func (s *Storage) mustRotateIndexDB() { newTableName := nextIndexDBTableName() idbNewPath := s.path + "/indexdb/" + newTableName rotationTimestamp := fasttime.UnixTimestamp() - idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp) + idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly) if err != nil { logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err) } @@ -2681,12 +2681,12 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error // Open the last two tables. currPath := path + "/" + tableNames[len(tableNames)-1] - curr, err = openIndexDB(currPath, s, 0) + curr, err = openIndexDB(currPath, s, 0, &s.isReadOnly) if err != nil { return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err) } prevPath := path + "/" + tableNames[len(tableNames)-2] - prev, err = openIndexDB(prevPath, s, 0) + prev, err = openIndexDB(prevPath, s, 0, &s.isReadOnly) if err != nil { curr.MustClose() return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err) diff --git a/lib/storage/table.go b/lib/storage/table.go index c75d548b3..2d691ce07 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -23,6 +23,7 @@ type table struct { getDeletedMetricIDs func() *uint64set.Set retentionMsecs int64 + isReadOnly *uint32 ptws []*partitionWrapper ptwsLock sync.Mutex @@ -83,7 +84,7 @@ func (ptw *partitionWrapper) scheduleToDrop() { // The table is created if it doesn't exist. // // Data older than the retentionMsecs may be dropped at any time. -func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*table, error) { +func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*table, error) { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -116,7 +117,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention } // Open partitions. - pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs) + pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly) if err != nil { return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err) } @@ -127,6 +128,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention bigPartitionsPath: bigPartitionsPath, getDeletedMetricIDs: getDeletedMetricIDs, retentionMsecs: retentionMsecs, + isReadOnly: isReadOnly, flockF: flockF, @@ -359,7 +361,7 @@ func (tb *table) AddRows(rows []rawRow) error { continue } - pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs) + pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs, tb.isReadOnly) if err != nil { // Return only the first error, since it has no sense in returning all errors. tb.ptwsLock.Unlock() @@ -497,7 +499,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) { } } -func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) ([]*partition, error) { +func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) { // Certain partition directories in either `big` or `small` dir may be missing // after restoring from backup. So populate partition names from both dirs. ptNames := make(map[string]bool) @@ -511,7 +513,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet for ptName := range ptNames { smallPartsPath := smallPartitionsPath + "/" + ptName bigPartsPath := bigPartitionsPath + "/" + ptName - pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) + pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly) if err != nil { mustClosePartitions(pts) return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err) diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index cda14c8b8..bd0889df9 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -181,7 +181,8 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount }) // Create a table from rowss and test search on it. - tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs) + var isReadOnly uint32 + tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly) if err != nil { t.Fatalf("cannot create table: %s", err) } @@ -202,7 +203,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount tb.MustClose() // Open the created table and test search on it. - tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs) + tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly) if err != nil { t.Fatalf("cannot open table: %s", err) } diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index cbe939d4b..193ea1525 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -48,7 +48,8 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount) createdBenchTables[path] = true } - tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs) + var isReadOnly uint32 + tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly) if err != nil { b.Fatalf("cnanot open table %q: %s", path, err) } @@ -71,7 +72,8 @@ var createdBenchTables = make(map[string]bool) func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) { b.Helper() - tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs) + var isReadOnly uint32 + tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly) if err != nil { b.Fatalf("cannot open table %q: %s", path, err) } diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go index 68a76dbc9..1d1ce4cd5 100644 --- a/lib/storage/table_test.go +++ b/lib/storage/table_test.go @@ -17,7 +17,8 @@ func TestTableOpenClose(t *testing.T) { }() // Create a new table - tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) + var isReadOnly uint32 + tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly) if err != nil { t.Fatalf("cannot create new table: %s", err) } @@ -27,7 +28,7 @@ func TestTableOpenClose(t *testing.T) { // Re-open created table multiple times. for i := 0; i < 10; i++ { - tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) + tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly) if err != nil { t.Fatalf("cannot open created table: %s", err) } @@ -43,14 +44,15 @@ func TestTableOpenMultipleTimes(t *testing.T) { _ = os.RemoveAll(path) }() - tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) + var isReadOnly uint32 + tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly) if err != nil { t.Fatalf("cannot open table the first time: %s", err) } defer tb1.MustClose() for i := 0; i < 10; i++ { - tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) + tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly) if err == nil { tb2.MustClose() t.Fatalf("expecting non-nil error when opening already opened table") diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index bd88dd247..8125d83ce 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -46,7 +46,8 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { b.SetBytes(int64(rowsCountExpected)) tablePath := "./benchmarkTableAddRows" for i := 0; i < b.N; i++ { - tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs) + var isReadOnly uint32 + tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly) if err != nil { b.Fatalf("cannot open table %q: %s", tablePath, err) } @@ -94,7 +95,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tb.MustClose() // Open the table from files and verify the rows count on it - tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs) + tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly) if err != nil { b.Fatalf("cannot open table %q: %s", tablePath, err) }