mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: stop background merge when storage enters read-only mode
This should prevent from `no space left on device` errors when VictoriaMetrics under-estimates the additional disk space needed for background merge. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
This commit is contained in:
parent
642eb1c534
commit
ea06d2fd3c
16 changed files with 112 additions and 53 deletions
|
@ -25,6 +25,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-promscrape.suppressScrapeErrorsDelay` command-line flag, which can be used for delaying and aggregating the logging of per-target scrape errors. This may reduce the amounts of logs when `vmagent` scrapes many unreliable targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2575). Thanks to @jelmd for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2576).
|
||||
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly apply `alert_relabel_configs` relabeling rules to `-notifier.config` according to [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file). Thanks to @spectvtor for [the bugfix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2633).
|
||||
* BUGFIX: deny [background merge](https://valyala.medium.com/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) when the storage enters read-only mode, e.g. when free disk space becomes lower than `-storage.minFreeDiskSpaceBytes`. Background merge needs additional disk space, so it could result in `no space left on device` errors. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603).
|
||||
|
||||
## [v1.77.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.2)
|
||||
|
||||
|
|
|
@ -91,6 +91,7 @@ type Table struct {
|
|||
needFlushCallbackCall uint32
|
||||
|
||||
prepareBlock PrepareBlockCallback
|
||||
isReadOnly *uint32
|
||||
|
||||
partsLock sync.Mutex
|
||||
parts []*partWrapper
|
||||
|
@ -254,7 +255,7 @@ func (pw *partWrapper) decRef() {
|
|||
// to persistent storage.
|
||||
//
|
||||
// The table is created if it doesn't exist yet.
|
||||
func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback) (*Table, error) {
|
||||
func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) (*Table, error) {
|
||||
path = filepath.Clean(path)
|
||||
logger.Infof("opening table %q...", path)
|
||||
startTime := time.Now()
|
||||
|
@ -280,6 +281,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
|
|||
path: path,
|
||||
flushCallback: flushCallback,
|
||||
prepareBlock: prepareBlock,
|
||||
isReadOnly: isReadOnly,
|
||||
parts: pws,
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
flockF: flockF,
|
||||
|
@ -799,7 +801,17 @@ func (tb *Table) startPartMergers() {
|
|||
}
|
||||
}
|
||||
|
||||
func (tb *Table) canBackgroundMerge() bool {
|
||||
return atomic.LoadUint32(tb.isReadOnly) == 0
|
||||
}
|
||||
|
||||
func (tb *Table) mergeExistingParts(isFinal bool) error {
|
||||
if !tb.canBackgroundMerge() {
|
||||
// Do not perform background merge in read-only mode
|
||||
// in order to prevent from disk space shortage.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
|
||||
return nil
|
||||
}
|
||||
n := fs.MustGetFreeSpace(tb.path)
|
||||
// Divide free space by the max number of concurrent merges.
|
||||
maxOutBytes := n / uint64(mergeWorkersCount)
|
||||
|
|
|
@ -40,7 +40,8 @@ func TestTableSearchSerial(t *testing.T) {
|
|||
|
||||
func() {
|
||||
// Re-open the table and verify the search works.
|
||||
tb, err := OpenTable(path, nil, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
|
@ -75,7 +76,8 @@ func TestTableSearchConcurrent(t *testing.T) {
|
|||
|
||||
// Re-open the table and verify the search works.
|
||||
func() {
|
||||
tb, err := OpenTable(path, nil, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
|
@ -151,7 +153,8 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
|
|||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, flushCallback, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open table: %w", err)
|
||||
}
|
||||
|
|
|
@ -32,7 +32,8 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
|
|||
|
||||
// Force finishing pending merges
|
||||
tb.MustClose()
|
||||
tb, err = OpenTable(path, nil, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err = OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
|
||||
}
|
||||
|
|
|
@ -21,7 +21,8 @@ func TestTableOpenClose(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Create a new table
|
||||
tb, err := OpenTable(path, nil, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new table: %s", err)
|
||||
}
|
||||
|
@ -31,7 +32,7 @@ func TestTableOpenClose(t *testing.T) {
|
|||
|
||||
// Re-open created table multiple times.
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := OpenTable(path, nil, nil)
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open created table: %s", err)
|
||||
}
|
||||
|
@ -45,14 +46,15 @@ func TestTableOpenMultipleTimes(t *testing.T) {
|
|||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb1, err := OpenTable(path, nil, nil)
|
||||
var isReadOnly uint32
|
||||
tb1, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb2, err := OpenTable(path, nil, nil)
|
||||
tb2, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err == nil {
|
||||
tb2.MustClose()
|
||||
t.Fatalf("expecting non-nil error when opening already opened table")
|
||||
|
@ -73,7 +75,8 @@ func TestTableAddItemSerial(t *testing.T) {
|
|||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, flushCallback, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -99,7 +102,7 @@ func TestTableAddItemSerial(t *testing.T) {
|
|||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path, nil, nil)
|
||||
tb, err = OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -132,7 +135,8 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
|||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb, err := OpenTable(path, nil, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -163,13 +167,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Verify snapshots contain all the data.
|
||||
tb1, err := OpenTable(snapshot1, nil, nil)
|
||||
tb1, err := OpenTable(snapshot1, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
tb2, err := OpenTable(snapshot2, nil, nil)
|
||||
tb2, err := OpenTable(snapshot2, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -222,7 +226,8 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
atomic.AddUint64(&itemsMerged, uint64(len(items)))
|
||||
return data, items
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback, prepareBlock)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, flushCallback, prepareBlock, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -252,7 +257,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path, nil, nil)
|
||||
tb, err = OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -294,7 +299,8 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
|
|||
t.Helper()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := OpenTable(path, nil, nil)
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot re-open %q: %s", path, err)
|
||||
}
|
||||
|
|
|
@ -131,7 +131,7 @@ func getTagFilterCacheSize() int {
|
|||
//
|
||||
// The rotationTimestamp must be set to the current unix timestamp when openIndexDB
|
||||
// is called when creating new indexdb during indexdb rotation.
|
||||
func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, error) {
|
||||
func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) (*indexDB, error) {
|
||||
if s == nil {
|
||||
logger.Panicf("BUG: Storage must be nin-nil")
|
||||
}
|
||||
|
@ -142,7 +142,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, e
|
|||
return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err)
|
||||
}
|
||||
|
||||
tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows)
|
||||
tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
|
||||
}
|
||||
|
|
|
@ -461,7 +461,8 @@ func TestIndexDBOpenClose(t *testing.T) {
|
|||
defer stopTestStorage(s)
|
||||
tableName := nextIndexDBTableName()
|
||||
for i := 0; i < 5; i++ {
|
||||
db, err := openIndexDB(tableName, s, 0)
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(tableName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -480,7 +481,8 @@ func TestIndexDB(t *testing.T) {
|
|||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
db, err := openIndexDB(dbName, s, 0)
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -510,7 +512,7 @@ func TestIndexDB(t *testing.T) {
|
|||
|
||||
// Re-open the db and verify it works as expected.
|
||||
db.MustClose()
|
||||
db, err = openIndexDB(dbName, s, 0)
|
||||
db, err = openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -530,7 +532,8 @@ func TestIndexDB(t *testing.T) {
|
|||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
db, err := openIndexDB(dbName, s, 0)
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -1606,7 +1609,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
db, err := openIndexDB(dbName, s, 0)
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
|
|
@ -44,7 +44,8 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
|||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
db, err := openIndexDB(dbName, s, 0)
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -105,7 +106,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
|||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
db, err := openIndexDB(dbName, s, 0)
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -280,7 +282,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
|||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
db, err := openIndexDB(dbName, s, 0)
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
|
|
@ -126,6 +126,10 @@ type partition struct {
|
|||
// Used for deleting data outside the retention during background merge.
|
||||
retentionMsecs int64
|
||||
|
||||
// Whether the storage is in read-only mode.
|
||||
// Background merge is stopped in read-only mode.
|
||||
isReadOnly *uint32
|
||||
|
||||
// Name is the name of the partition in the form YYYY_MM.
|
||||
name string
|
||||
|
||||
|
@ -199,7 +203,8 @@ func (pw *partWrapper) decRef() {
|
|||
|
||||
// createPartition creates new partition for the given timestamp and the given paths
|
||||
// to small and big partitions.
|
||||
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) {
|
||||
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string,
|
||||
getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
|
||||
name := timestampToPartitionName(timestamp)
|
||||
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
|
||||
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
|
||||
|
@ -212,7 +217,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
|
|||
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
|
||||
}
|
||||
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
|
||||
pt.tr.fromPartitionTimestamp(timestamp)
|
||||
pt.startMergeWorkers()
|
||||
pt.startRawRowsFlusher()
|
||||
|
@ -238,7 +243,7 @@ func (pt *partition) Drop() {
|
|||
}
|
||||
|
||||
// openPartition opens the existing partition from the given paths.
|
||||
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) {
|
||||
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
|
||||
smallPartsPath = filepath.Clean(smallPartsPath)
|
||||
bigPartsPath = filepath.Clean(bigPartsPath)
|
||||
|
||||
|
@ -262,7 +267,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
|
|||
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
|
||||
}
|
||||
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
|
||||
pt.smallParts = smallParts
|
||||
pt.bigParts = bigParts
|
||||
if err := pt.tr.fromPartitionName(name); err != nil {
|
||||
|
@ -276,7 +281,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
|
|||
return pt, nil
|
||||
}
|
||||
|
||||
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) *partition {
|
||||
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) *partition {
|
||||
p := &partition{
|
||||
name: name,
|
||||
smallPartsPath: smallPartsPath,
|
||||
|
@ -284,6 +289,7 @@ func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs
|
|||
|
||||
getDeletedMetricIDs: getDeletedMetricIDs,
|
||||
retentionMsecs: retentionMsecs,
|
||||
isReadOnly: isReadOnly,
|
||||
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
stopCh: make(chan struct{}),
|
||||
|
@ -993,7 +999,16 @@ func getMaxOutBytes(path string, workersCount int) uint64 {
|
|||
return maxOutBytes
|
||||
}
|
||||
|
||||
func (pt *partition) canBackgroundMerge() bool {
|
||||
return atomic.LoadUint32(pt.isReadOnly) == 0
|
||||
}
|
||||
|
||||
func (pt *partition) mergeBigParts(isFinal bool) error {
|
||||
if !pt.canBackgroundMerge() {
|
||||
// Do not perform merge in read-only mode, since this may result in disk space shortage.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
|
||||
return nil
|
||||
}
|
||||
maxOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
|
||||
|
||||
pt.partsLock.Lock()
|
||||
|
@ -1005,6 +1020,11 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
|
|||
}
|
||||
|
||||
func (pt *partition) mergeSmallParts(isFinal bool) error {
|
||||
if !pt.canBackgroundMerge() {
|
||||
// Do not perform merge in read-only mode, since this may result in disk space shortage.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
|
||||
return nil
|
||||
}
|
||||
// Try merging small parts to a big part at first.
|
||||
maxBigPartOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
|
||||
pt.partsLock.Lock()
|
||||
|
|
|
@ -168,7 +168,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
|
||||
// Create partition from rowss and test search on it.
|
||||
retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
|
||||
pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs)
|
||||
var isReadOnly uint32
|
||||
pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create partition: %s", err)
|
||||
}
|
||||
|
@ -192,7 +193,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
pt.MustClose()
|
||||
|
||||
// Open the created partition and test search on it.
|
||||
pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs)
|
||||
pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open partition: %s", err)
|
||||
}
|
||||
|
|
|
@ -259,7 +259,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
|||
|
||||
// Load data
|
||||
tablePath := path + "/data"
|
||||
tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs)
|
||||
tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs, &s.isReadOnly)
|
||||
if err != nil {
|
||||
s.idb().MustClose()
|
||||
return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)
|
||||
|
@ -722,7 +722,7 @@ func (s *Storage) mustRotateIndexDB() {
|
|||
newTableName := nextIndexDBTableName()
|
||||
idbNewPath := s.path + "/indexdb/" + newTableName
|
||||
rotationTimestamp := fasttime.UnixTimestamp()
|
||||
idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp)
|
||||
idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
|
||||
}
|
||||
|
@ -2526,12 +2526,12 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
|
|||
// Open the last two tables.
|
||||
currPath := path + "/" + tableNames[len(tableNames)-1]
|
||||
|
||||
curr, err = openIndexDB(currPath, s, 0)
|
||||
curr, err = openIndexDB(currPath, s, 0, &s.isReadOnly)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
|
||||
}
|
||||
prevPath := path + "/" + tableNames[len(tableNames)-2]
|
||||
prev, err = openIndexDB(prevPath, s, 0)
|
||||
prev, err = openIndexDB(prevPath, s, 0, &s.isReadOnly)
|
||||
if err != nil {
|
||||
curr.MustClose()
|
||||
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
|
||||
|
|
|
@ -23,6 +23,7 @@ type table struct {
|
|||
|
||||
getDeletedMetricIDs func() *uint64set.Set
|
||||
retentionMsecs int64
|
||||
isReadOnly *uint32
|
||||
|
||||
ptws []*partitionWrapper
|
||||
ptwsLock sync.Mutex
|
||||
|
@ -83,7 +84,7 @@ func (ptw *partitionWrapper) scheduleToDrop() {
|
|||
// The table is created if it doesn't exist.
|
||||
//
|
||||
// Data older than the retentionMsecs may be dropped at any time.
|
||||
func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*table, error) {
|
||||
func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
|
||||
path = filepath.Clean(path)
|
||||
|
||||
// Create a directory for the table if it doesn't exist yet.
|
||||
|
@ -116,7 +117,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
|
|||
}
|
||||
|
||||
// Open partitions.
|
||||
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs)
|
||||
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
|
||||
}
|
||||
|
@ -127,6 +128,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
|
|||
bigPartitionsPath: bigPartitionsPath,
|
||||
getDeletedMetricIDs: getDeletedMetricIDs,
|
||||
retentionMsecs: retentionMsecs,
|
||||
isReadOnly: isReadOnly,
|
||||
|
||||
flockF: flockF,
|
||||
|
||||
|
@ -359,7 +361,7 @@ func (tb *table) AddRows(rows []rawRow) error {
|
|||
continue
|
||||
}
|
||||
|
||||
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs)
|
||||
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs, tb.isReadOnly)
|
||||
if err != nil {
|
||||
// Return only the first error, since it has no sense in returning all errors.
|
||||
tb.ptwsLock.Unlock()
|
||||
|
@ -497,7 +499,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
|
|||
}
|
||||
}
|
||||
|
||||
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) ([]*partition, error) {
|
||||
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
|
||||
// Certain partition directories in either `big` or `small` dir may be missing
|
||||
// after restoring from backup. So populate partition names from both dirs.
|
||||
ptNames := make(map[string]bool)
|
||||
|
@ -511,7 +513,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
|
|||
for ptName := range ptNames {
|
||||
smallPartsPath := smallPartitionsPath + "/" + ptName
|
||||
bigPartsPath := bigPartitionsPath + "/" + ptName
|
||||
pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
|
||||
pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
|
||||
if err != nil {
|
||||
mustClosePartitions(pts)
|
||||
return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)
|
||||
|
|
|
@ -181,7 +181,8 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
|
|||
})
|
||||
|
||||
// Create a table from rowss and test search on it.
|
||||
tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create table: %s", err)
|
||||
}
|
||||
|
@ -202,7 +203,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
|
|||
tb.MustClose()
|
||||
|
||||
// Open the created table and test search on it.
|
||||
tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
|
||||
tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
|
|
|
@ -48,7 +48,8 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
|
|||
createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
|
||||
createdBenchTables[path] = true
|
||||
}
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cnanot open table %q: %s", path, err)
|
||||
}
|
||||
|
@ -71,7 +72,8 @@ var createdBenchTables = make(map[string]bool)
|
|||
func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
|
||||
b.Helper()
|
||||
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open table %q: %s", path, err)
|
||||
}
|
||||
|
|
|
@ -17,7 +17,8 @@ func TestTableOpenClose(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Create a new table
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new table: %s", err)
|
||||
}
|
||||
|
@ -27,7 +28,7 @@ func TestTableOpenClose(t *testing.T) {
|
|||
|
||||
// Re-open created table multiple times.
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
|
||||
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open created table: %s", err)
|
||||
}
|
||||
|
@ -43,14 +44,15 @@ func TestTableOpenMultipleTimes(t *testing.T) {
|
|||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
|
||||
var isReadOnly uint32
|
||||
tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table the first time: %s", err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
|
||||
tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
|
||||
if err == nil {
|
||||
tb2.MustClose()
|
||||
t.Fatalf("expecting non-nil error when opening already opened table")
|
||||
|
|
|
@ -46,7 +46,8 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
|
|||
b.SetBytes(int64(rowsCountExpected))
|
||||
tablePath := "./benchmarkTableAddRows"
|
||||
for i := 0; i < b.N; i++ {
|
||||
tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
|
||||
var isReadOnly uint32
|
||||
tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open table %q: %s", tablePath, err)
|
||||
}
|
||||
|
@ -94,7 +95,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
|
|||
tb.MustClose()
|
||||
|
||||
// Open the table from files and verify the rows count on it
|
||||
tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
|
||||
tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open table %q: %s", tablePath, err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue