lib/storage: stop background merge when storage enters read-only mode

This should prevent from `no space left on device` errors when VictoriaMetrics
under-estimates the additional disk space needed for background merge.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
This commit is contained in:
Aliaksandr Valialkin 2022-06-01 14:21:12 +03:00
parent f133756f02
commit fedfc9e686
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
16 changed files with 113 additions and 54 deletions

View file

@ -23,6 +23,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-promscrape.suppressScrapeErrorsDelay` command-line flag, which can be used for delaying and aggregating the logging of per-target scrape errors. This may reduce the amounts of logs when `vmagent` scrapes many unreliable targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2575). Thanks to @jelmd for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2576). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-promscrape.suppressScrapeErrorsDelay` command-line flag, which can be used for delaying and aggregating the logging of per-target scrape errors. This may reduce the amounts of logs when `vmagent` scrapes many unreliable targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2575). Thanks to @jelmd for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2576).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly apply `alert_relabel_configs` relabeling rules to `-notifier.config` according to [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file). Thanks to @spectvtor for [the bugfix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2633). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly apply `alert_relabel_configs` relabeling rules to `-notifier.config` according to [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file). Thanks to @spectvtor for [the bugfix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2633).
* BUGFIX: deny [background merge](https://valyala.medium.com/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) when the storage enters read-only mode, e.g. when free disk space becomes lower than `-storage.minFreeDiskSpaceBytes`. Background merge needs additional disk space, so it could result in `no space left on device` errors. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603).
## [v1.77.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.2) ## [v1.77.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.2)

View file

@ -91,6 +91,7 @@ type Table struct {
needFlushCallbackCall uint32 needFlushCallbackCall uint32
prepareBlock PrepareBlockCallback prepareBlock PrepareBlockCallback
isReadOnly *uint32
partsLock sync.Mutex partsLock sync.Mutex
parts []*partWrapper parts []*partWrapper
@ -254,7 +255,7 @@ func (pw *partWrapper) decRef() {
// to persistent storage. // to persistent storage.
// //
// The table is created if it doesn't exist yet. // The table is created if it doesn't exist yet.
func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback) (*Table, error) { func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) (*Table, error) {
path = filepath.Clean(path) path = filepath.Clean(path)
logger.Infof("opening table %q...", path) logger.Infof("opening table %q...", path)
startTime := time.Now() startTime := time.Now()
@ -280,6 +281,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
path: path, path: path,
flushCallback: flushCallback, flushCallback: flushCallback,
prepareBlock: prepareBlock, prepareBlock: prepareBlock,
isReadOnly: isReadOnly,
parts: pws, parts: pws,
mergeIdx: uint64(time.Now().UnixNano()), mergeIdx: uint64(time.Now().UnixNano()),
flockF: flockF, flockF: flockF,
@ -799,7 +801,17 @@ func (tb *Table) startPartMergers() {
} }
} }
func (tb *Table) canBackgroundMerge() bool {
return atomic.LoadUint32(tb.isReadOnly) == 0
}
func (tb *Table) mergeExistingParts(isFinal bool) error { func (tb *Table) mergeExistingParts(isFinal bool) error {
if !tb.canBackgroundMerge() {
// Do not perform background merge in read-only mode
// in order to prevent from disk space shortage.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
return nil
}
n := fs.MustGetFreeSpace(tb.path) n := fs.MustGetFreeSpace(tb.path)
// Divide free space by the max number of concurrent merges. // Divide free space by the max number of concurrent merges.
maxOutBytes := n / uint64(mergeWorkersCount) maxOutBytes := n / uint64(mergeWorkersCount)

View file

@ -40,7 +40,8 @@ func TestTableSearchSerial(t *testing.T) {
func() { func() {
// Re-open the table and verify the search works. // Re-open the table and verify the search works.
tb, err := OpenTable(path, nil, nil) var isReadOnly uint32
tb, err := OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
@ -75,7 +76,8 @@ func TestTableSearchConcurrent(t *testing.T) {
// Re-open the table and verify the search works. // Re-open the table and verify the search works.
func() { func() {
tb, err := OpenTable(path, nil, nil) var isReadOnly uint32
tb, err := OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
@ -151,7 +153,8 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
flushCallback := func() { flushCallback := func() {
atomic.AddUint64(&flushes, 1) atomic.AddUint64(&flushes, 1)
} }
tb, err := OpenTable(path, flushCallback, nil) var isReadOnly uint32
tb, err := OpenTable(path, flushCallback, nil, &isReadOnly)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("cannot open table: %w", err) return nil, nil, fmt.Errorf("cannot open table: %w", err)
} }

View file

@ -32,7 +32,8 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
// Force finishing pending merges // Force finishing pending merges
tb.MustClose() tb.MustClose()
tb, err = OpenTable(path, nil, nil) var isReadOnly uint32
tb, err = OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("unexpected error when re-opening table %q: %s", path, err) b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
} }

View file

@ -21,7 +21,8 @@ func TestTableOpenClose(t *testing.T) {
}() }()
// Create a new table // Create a new table
tb, err := OpenTable(path, nil, nil) var isReadOnly uint32
tb, err := OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot create new table: %s", err) t.Fatalf("cannot create new table: %s", err)
} }
@ -31,7 +32,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times. // Re-open created table multiple times.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb, err := OpenTable(path, nil, nil) tb, err := OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open created table: %s", err) t.Fatalf("cannot open created table: %s", err)
} }
@ -45,14 +46,15 @@ func TestTableOpenMultipleTimes(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb1, err := OpenTable(path, nil, nil) var isReadOnly uint32
tb1, err := OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
defer tb1.MustClose() defer tb1.MustClose()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb2, err := OpenTable(path, nil, nil) tb2, err := OpenTable(path, nil, nil, &isReadOnly)
if err == nil { if err == nil {
tb2.MustClose() tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table") t.Fatalf("expecting non-nil error when opening already opened table")
@ -73,7 +75,8 @@ func TestTableAddItemSerial(t *testing.T) {
flushCallback := func() { flushCallback := func() {
atomic.AddUint64(&flushes, 1) atomic.AddUint64(&flushes, 1)
} }
tb, err := OpenTable(path, flushCallback, nil) var isReadOnly uint32
tb, err := OpenTable(path, flushCallback, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -99,7 +102,7 @@ func TestTableAddItemSerial(t *testing.T) {
testReopenTable(t, path, itemsCount) testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts. // Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path, nil, nil) tb, err = OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -132,7 +135,8 @@ func TestTableCreateSnapshotAt(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb, err := OpenTable(path, nil, nil) var isReadOnly uint32
tb, err := OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -163,13 +167,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
}() }()
// Verify snapshots contain all the data. // Verify snapshots contain all the data.
tb1, err := OpenTable(snapshot1, nil, nil) tb1, err := OpenTable(snapshot1, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
defer tb1.MustClose() defer tb1.MustClose()
tb2, err := OpenTable(snapshot2, nil, nil) tb2, err := OpenTable(snapshot2, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -222,7 +226,8 @@ func TestTableAddItemsConcurrent(t *testing.T) {
atomic.AddUint64(&itemsMerged, uint64(len(items))) atomic.AddUint64(&itemsMerged, uint64(len(items)))
return data, items return data, items
} }
tb, err := OpenTable(path, flushCallback, prepareBlock) var isReadOnly uint32
tb, err := OpenTable(path, flushCallback, prepareBlock, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -252,7 +257,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
testReopenTable(t, path, itemsCount) testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts. // Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path, nil, nil) tb, err = OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -294,7 +299,8 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
t.Helper() t.Helper()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb, err := OpenTable(path, nil, nil) var isReadOnly uint32
tb, err := OpenTable(path, nil, nil, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot re-open %q: %s", path, err) t.Fatalf("cannot re-open %q: %s", path, err)
} }

View file

@ -115,9 +115,9 @@ type indexDB struct {
// The last segment of the path should contain unique hex value which // The last segment of the path should contain unique hex value which
// will be then used as indexDB.generation // will be then used as indexDB.generation
// //
// The rotationTimestamp must be set to the current unix timestamp when ipenIndexDB // The rotationTimestamp must be set to the current unix timestamp when openIndexDB
// is called when creating new indexdb during indexdb rotation. // is called when creating new indexdb during indexdb rotation.
func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, error) { func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) (*indexDB, error) {
if s == nil { if s == nil {
logger.Panicf("BUG: Storage must be nin-nil") logger.Panicf("BUG: Storage must be nin-nil")
} }
@ -128,7 +128,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, e
return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err) return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err)
} }
tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows) tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err) return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
} }

View file

@ -477,7 +477,8 @@ func TestIndexDBOpenClose(t *testing.T) {
defer stopTestStorage(s) defer stopTestStorage(s)
tableName := nextIndexDBTableName() tableName := nextIndexDBTableName()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
db, err := openIndexDB(tableName, s, 0) var isReadOnly uint32
db, err := openIndexDB(tableName, s, 0, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open indexDB: %s", err) t.Fatalf("cannot open indexDB: %s", err)
} }
@ -498,7 +499,8 @@ func TestIndexDB(t *testing.T) {
defer stopTestStorage(s) defer stopTestStorage(s)
dbName := nextIndexDBTableName() dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0) var isReadOnly uint32
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open indexDB: %s", err) t.Fatalf("cannot open indexDB: %s", err)
} }
@ -528,7 +530,7 @@ func TestIndexDB(t *testing.T) {
// Re-open the db and verify it works as expected. // Re-open the db and verify it works as expected.
db.MustClose() db.MustClose()
db, err = openIndexDB(dbName, s, 0) db, err = openIndexDB(dbName, s, 0, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open indexDB: %s", err) t.Fatalf("cannot open indexDB: %s", err)
} }
@ -548,7 +550,8 @@ func TestIndexDB(t *testing.T) {
defer stopTestStorage(s) defer stopTestStorage(s)
dbName := nextIndexDBTableName() dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0) var isReadOnly uint32
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open indexDB: %s", err) t.Fatalf("cannot open indexDB: %s", err)
} }
@ -1677,7 +1680,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
defer stopTestStorage(s) defer stopTestStorage(s)
dbName := nextIndexDBTableName() dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0) var isReadOnly uint32
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open indexDB: %s", err) t.Fatalf("cannot open indexDB: %s", err)
} }

View file

@ -45,7 +45,8 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
defer stopTestStorage(s) defer stopTestStorage(s)
dbName := nextIndexDBTableName() dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0) var isReadOnly uint32
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("cannot open indexDB: %s", err) b.Fatalf("cannot open indexDB: %s", err)
} }
@ -109,7 +110,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
defer stopTestStorage(s) defer stopTestStorage(s)
dbName := nextIndexDBTableName() dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0) var isReadOnly uint32
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("cannot open indexDB: %s", err) b.Fatalf("cannot open indexDB: %s", err)
} }
@ -288,7 +290,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
defer stopTestStorage(s) defer stopTestStorage(s)
dbName := nextIndexDBTableName() dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0) var isReadOnly uint32
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("cannot open indexDB: %s", err) b.Fatalf("cannot open indexDB: %s", err)
} }

View file

@ -126,6 +126,10 @@ type partition struct {
// Used for deleting data outside the retention during background merge. // Used for deleting data outside the retention during background merge.
retentionMsecs int64 retentionMsecs int64
// Whether the storage is in read-only mode.
// Background merge is stopped in read-only mode.
isReadOnly *uint32
// Name is the name of the partition in the form YYYY_MM. // Name is the name of the partition in the form YYYY_MM.
name string name string
@ -199,7 +203,8 @@ func (pw *partWrapper) decRef() {
// createPartition creates new partition for the given timestamp and the given paths // createPartition creates new partition for the given timestamp and the given paths
// to small and big partitions. // to small and big partitions.
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) { func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string,
getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
name := timestampToPartitionName(timestamp) name := timestampToPartitionName(timestamp)
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@ -212,7 +217,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err) return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
} }
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
pt.tr.fromPartitionTimestamp(timestamp) pt.tr.fromPartitionTimestamp(timestamp)
pt.startMergeWorkers() pt.startMergeWorkers()
pt.startRawRowsFlusher() pt.startRawRowsFlusher()
@ -238,7 +243,7 @@ func (pt *partition) Drop() {
} }
// openPartition opens the existing partition from the given paths. // openPartition opens the existing partition from the given paths.
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) { func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
smallPartsPath = filepath.Clean(smallPartsPath) smallPartsPath = filepath.Clean(smallPartsPath)
bigPartsPath = filepath.Clean(bigPartsPath) bigPartsPath = filepath.Clean(bigPartsPath)
@ -262,7 +267,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err) return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
} }
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
pt.smallParts = smallParts pt.smallParts = smallParts
pt.bigParts = bigParts pt.bigParts = bigParts
if err := pt.tr.fromPartitionName(name); err != nil { if err := pt.tr.fromPartitionName(name); err != nil {
@ -276,7 +281,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return pt, nil return pt, nil
} }
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) *partition { func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) *partition {
p := &partition{ p := &partition{
name: name, name: name,
smallPartsPath: smallPartsPath, smallPartsPath: smallPartsPath,
@ -284,6 +289,7 @@ func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs
getDeletedMetricIDs: getDeletedMetricIDs, getDeletedMetricIDs: getDeletedMetricIDs,
retentionMsecs: retentionMsecs, retentionMsecs: retentionMsecs,
isReadOnly: isReadOnly,
mergeIdx: uint64(time.Now().UnixNano()), mergeIdx: uint64(time.Now().UnixNano()),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
@ -993,7 +999,16 @@ func getMaxOutBytes(path string, workersCount int) uint64 {
return maxOutBytes return maxOutBytes
} }
func (pt *partition) canBackgroundMerge() bool {
return atomic.LoadUint32(pt.isReadOnly) == 0
}
func (pt *partition) mergeBigParts(isFinal bool) error { func (pt *partition) mergeBigParts(isFinal bool) error {
if !pt.canBackgroundMerge() {
// Do not perform merge in read-only mode, since this may result in disk space shortage.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
return nil
}
maxOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount) maxOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
pt.partsLock.Lock() pt.partsLock.Lock()
@ -1005,6 +1020,11 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
} }
func (pt *partition) mergeSmallParts(isFinal bool) error { func (pt *partition) mergeSmallParts(isFinal bool) error {
if !pt.canBackgroundMerge() {
// Do not perform merge in read-only mode, since this may result in disk space shortage.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
return nil
}
// Try merging small parts to a big part at first. // Try merging small parts to a big part at first.
maxBigPartOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount) maxBigPartOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
pt.partsLock.Lock() pt.partsLock.Lock()

View file

@ -168,7 +168,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
// Create partition from rowss and test search on it. // Create partition from rowss and test search on it.
retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs) var isReadOnly uint32
pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot create partition: %s", err) t.Fatalf("cannot create partition: %s", err)
} }
@ -192,7 +193,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt.MustClose() pt.MustClose()
// Open the created partition and test search on it. // Open the created partition and test search on it.
pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs) pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open partition: %s", err) t.Fatalf("cannot open partition: %s", err)
} }

View file

@ -269,7 +269,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
// Load data // Load data
tablePath := path + "/data" tablePath := path + "/data"
tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs) tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs, &s.isReadOnly)
if err != nil { if err != nil {
s.idb().MustClose() s.idb().MustClose()
return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err) return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)
@ -737,7 +737,7 @@ func (s *Storage) mustRotateIndexDB() {
newTableName := nextIndexDBTableName() newTableName := nextIndexDBTableName()
idbNewPath := s.path + "/indexdb/" + newTableName idbNewPath := s.path + "/indexdb/" + newTableName
rotationTimestamp := fasttime.UnixTimestamp() rotationTimestamp := fasttime.UnixTimestamp()
idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp) idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly)
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err) logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
} }
@ -2681,12 +2681,12 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
// Open the last two tables. // Open the last two tables.
currPath := path + "/" + tableNames[len(tableNames)-1] currPath := path + "/" + tableNames[len(tableNames)-1]
curr, err = openIndexDB(currPath, s, 0) curr, err = openIndexDB(currPath, s, 0, &s.isReadOnly)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err) return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
} }
prevPath := path + "/" + tableNames[len(tableNames)-2] prevPath := path + "/" + tableNames[len(tableNames)-2]
prev, err = openIndexDB(prevPath, s, 0) prev, err = openIndexDB(prevPath, s, 0, &s.isReadOnly)
if err != nil { if err != nil {
curr.MustClose() curr.MustClose()
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err) return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)

View file

@ -23,6 +23,7 @@ type table struct {
getDeletedMetricIDs func() *uint64set.Set getDeletedMetricIDs func() *uint64set.Set
retentionMsecs int64 retentionMsecs int64
isReadOnly *uint32
ptws []*partitionWrapper ptws []*partitionWrapper
ptwsLock sync.Mutex ptwsLock sync.Mutex
@ -83,7 +84,7 @@ func (ptw *partitionWrapper) scheduleToDrop() {
// The table is created if it doesn't exist. // The table is created if it doesn't exist.
// //
// Data older than the retentionMsecs may be dropped at any time. // Data older than the retentionMsecs may be dropped at any time.
func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*table, error) { func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
path = filepath.Clean(path) path = filepath.Clean(path)
// Create a directory for the table if it doesn't exist yet. // Create a directory for the table if it doesn't exist yet.
@ -116,7 +117,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
} }
// Open partitions. // Open partitions.
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs) pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err) return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
} }
@ -127,6 +128,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
bigPartitionsPath: bigPartitionsPath, bigPartitionsPath: bigPartitionsPath,
getDeletedMetricIDs: getDeletedMetricIDs, getDeletedMetricIDs: getDeletedMetricIDs,
retentionMsecs: retentionMsecs, retentionMsecs: retentionMsecs,
isReadOnly: isReadOnly,
flockF: flockF, flockF: flockF,
@ -359,7 +361,7 @@ func (tb *table) AddRows(rows []rawRow) error {
continue continue
} }
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs) pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs, tb.isReadOnly)
if err != nil { if err != nil {
// Return only the first error, since it has no sense in returning all errors. // Return only the first error, since it has no sense in returning all errors.
tb.ptwsLock.Unlock() tb.ptwsLock.Unlock()
@ -497,7 +499,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
} }
} }
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) ([]*partition, error) { func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
// Certain partition directories in either `big` or `small` dir may be missing // Certain partition directories in either `big` or `small` dir may be missing
// after restoring from backup. So populate partition names from both dirs. // after restoring from backup. So populate partition names from both dirs.
ptNames := make(map[string]bool) ptNames := make(map[string]bool)
@ -511,7 +513,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
for ptName := range ptNames { for ptName := range ptNames {
smallPartsPath := smallPartitionsPath + "/" + ptName smallPartsPath := smallPartitionsPath + "/" + ptName
bigPartsPath := bigPartitionsPath + "/" + ptName bigPartsPath := bigPartitionsPath + "/" + ptName
pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
if err != nil { if err != nil {
mustClosePartitions(pts) mustClosePartitions(pts)
return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err) return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)

View file

@ -181,7 +181,8 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
}) })
// Create a table from rowss and test search on it. // Create a table from rowss and test search on it.
tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs) var isReadOnly uint32
tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot create table: %s", err) t.Fatalf("cannot create table: %s", err)
} }
@ -202,7 +203,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
tb.MustClose() tb.MustClose()
// Open the created table and test search on it. // Open the created table and test search on it.
tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs) tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }

View file

@ -48,7 +48,8 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount) createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
createdBenchTables[path] = true createdBenchTables[path] = true
} }
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs) var isReadOnly uint32
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("cnanot open table %q: %s", path, err) b.Fatalf("cnanot open table %q: %s", path, err)
} }
@ -71,7 +72,8 @@ var createdBenchTables = make(map[string]bool)
func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) { func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
b.Helper() b.Helper()
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs) var isReadOnly uint32
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("cannot open table %q: %s", path, err) b.Fatalf("cannot open table %q: %s", path, err)
} }

View file

@ -17,7 +17,8 @@ func TestTableOpenClose(t *testing.T) {
}() }()
// Create a new table // Create a new table
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) var isReadOnly uint32
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot create new table: %s", err) t.Fatalf("cannot create new table: %s", err)
} }
@ -27,7 +28,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times. // Re-open created table multiple times.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open created table: %s", err) t.Fatalf("cannot open created table: %s", err)
} }
@ -43,14 +44,15 @@ func TestTableOpenMultipleTimes(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) var isReadOnly uint32
tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
if err != nil { if err != nil {
t.Fatalf("cannot open table the first time: %s", err) t.Fatalf("cannot open table the first time: %s", err)
} }
defer tb1.MustClose() defer tb1.MustClose()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
if err == nil { if err == nil {
tb2.MustClose() tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table") t.Fatalf("expecting non-nil error when opening already opened table")

View file

@ -46,7 +46,8 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
b.SetBytes(int64(rowsCountExpected)) b.SetBytes(int64(rowsCountExpected))
tablePath := "./benchmarkTableAddRows" tablePath := "./benchmarkTableAddRows"
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs) var isReadOnly uint32
tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err) b.Fatalf("cannot open table %q: %s", tablePath, err)
} }
@ -94,7 +95,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
tb.MustClose() tb.MustClose()
// Open the table from files and verify the rows count on it // Open the table from files and verify the rows count on it
tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs) tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
if err != nil { if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err) b.Fatalf("cannot open table %q: %s", tablePath, err)
} }