mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/fs: replace MkdirAllIfNotExist->MustMkdirIfNotExist and MkdirAllFailIfExist->MustMkdirFailIfExist
Callers of these functions log the returned error and then exit. The returned error already contains the path to directory, which was failed to be created. So let's just log the error together with the call stack inside these functions. This leaves the debuggability of the returned error at the same level while allows simplifying the code at callers' side. While at it, properly use MustMkdirFailIfExist instead of MustMkdirIfNotExist inside inmemoryPart.MustStoreToDisk(). It is expected that the inmemoryPart.MustStoreToDick() must fail if there is already a directory under the given path.
This commit is contained in:
parent
b4c330ea2b
commit
aac3dccfd1
16 changed files with 71 additions and 148 deletions
|
@ -3,6 +3,7 @@ package netstorage
|
|||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -20,11 +21,9 @@ func InitTmpBlocksDir(tmpDirPath string) {
|
|||
if len(tmpDirPath) == 0 {
|
||||
tmpDirPath = os.TempDir()
|
||||
}
|
||||
tmpBlocksDir = tmpDirPath + "/searchResults"
|
||||
tmpBlocksDir = filepath.Join(tmpDirPath, "searchResults")
|
||||
fs.MustRemoveAll(tmpBlocksDir)
|
||||
if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
|
||||
logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(tmpBlocksDir)
|
||||
}
|
||||
|
||||
var tmpBlocksDir string
|
||||
|
|
|
@ -45,9 +45,7 @@ func (r *Restore) Run() error {
|
|||
startTime := time.Now()
|
||||
|
||||
// Make sure VictoriaMetrics doesn't run during the restore process.
|
||||
if err := fs.MkdirAllIfNotExist(r.Dst.Dir); err != nil {
|
||||
return fmt.Errorf("cannot create dir %q: %w", r.Dst.Dir, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(r.Dst.Dir)
|
||||
flockF, err := fs.CreateFlockFile(r.Dst.Dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create lock file in %q; make sure VictoriaMetrics doesn't use the dir; error: %w", r.Dst.Dir, err)
|
||||
|
|
26
lib/fs/fs.go
26
lib/fs/fs.go
|
@ -99,22 +99,26 @@ func IsTemporaryFileName(fn string) bool {
|
|||
// tmpFileNameRe is regexp for temporary file name - see WriteFileAtomically for details.
|
||||
var tmpFileNameRe = regexp.MustCompile(`\.tmp\.\d+$`)
|
||||
|
||||
// MkdirAllIfNotExist creates the given path dir if it isn't exist.
|
||||
func MkdirAllIfNotExist(path string) error {
|
||||
// MustMkdirIfNotExist creates the given path dir if it isn't exist.
|
||||
func MustMkdirIfNotExist(path string) {
|
||||
if IsPathExist(path) {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
if err := mkdirSync(path); err != nil {
|
||||
logger.Panicf("FATAL: cannot create directory: %s", err)
|
||||
}
|
||||
return mkdirSync(path)
|
||||
}
|
||||
|
||||
// MkdirAllFailIfExist creates the given path dir if it isn't exist.
|
||||
// MustMkdirFailIfExist creates the given path dir if it isn't exist.
|
||||
//
|
||||
// Returns error if path already exists.
|
||||
func MkdirAllFailIfExist(path string) error {
|
||||
// If the directory at the given path already exists, then the function logs the error and exits.
|
||||
func MustMkdirFailIfExist(path string) {
|
||||
if IsPathExist(path) {
|
||||
return fmt.Errorf("the %q already exists", path)
|
||||
logger.Panicf("FATAL: the %q already exists", path)
|
||||
}
|
||||
if err := mkdirSync(path); err != nil {
|
||||
logger.Panicf("FATAL: cannot create directory: %s", err)
|
||||
}
|
||||
return mkdirSync(path)
|
||||
}
|
||||
|
||||
func mkdirSync(path string) error {
|
||||
|
@ -310,9 +314,7 @@ func CopyDirectory(srcPath, dstPath string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := MkdirAllIfNotExist(dstPath); err != nil {
|
||||
return err
|
||||
}
|
||||
MustMkdirIfNotExist(dstPath)
|
||||
for _, de := range des {
|
||||
if !de.Type().IsRegular() {
|
||||
// Skip non-files
|
||||
|
|
|
@ -78,9 +78,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
|||
path = filepath.Clean(path)
|
||||
|
||||
// Create the directory
|
||||
if err := fs.MkdirAllFailIfExist(path); err != nil {
|
||||
return fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(path)
|
||||
|
||||
// Create part files in the directory.
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package mergeset
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -32,11 +31,10 @@ func (mp *inmemoryPart) Reset() {
|
|||
mp.lensData.Reset()
|
||||
}
|
||||
|
||||
// StoreToDisk stores mp to the given path on disk.
|
||||
func (mp *inmemoryPart) StoreToDisk(path string) error {
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
// MustStoreToDisk stores mp to the given path on disk.
|
||||
func (mp *inmemoryPart) MustStoreToDisk(path string) {
|
||||
fs.MustMkdirFailIfExist(path)
|
||||
|
||||
metaindexPath := filepath.Join(path, metaindexFilename)
|
||||
fs.MustWriteSync(metaindexPath, mp.metaindexData.B)
|
||||
|
||||
|
@ -53,7 +51,6 @@ func (mp *inmemoryPart) StoreToDisk(path string) error {
|
|||
|
||||
fs.MustSyncPath(path)
|
||||
// Do not sync parent directory - it must be synced by the caller.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init initializes mp from ib.
|
||||
|
|
|
@ -328,9 +328,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
|
|||
startTime := time.Now()
|
||||
|
||||
// Create a directory for the table if it doesn't exist yet.
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
|
||||
// Protect from concurrent opens.
|
||||
flockF, err := fs.CreateFlockFile(path)
|
||||
|
@ -1091,9 +1089,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
|
|||
if isFinal && len(pws) == 1 && pws[0].mp != nil {
|
||||
// Fast path: flush a single in-memory part to disk.
|
||||
mp := pws[0].mp
|
||||
if err := mp.StoreToDisk(dstPartPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err)
|
||||
}
|
||||
mp.MustStoreToDisk(dstPartPath)
|
||||
pwNew := tb.openCreatedPart(pws, nil, dstPartPath)
|
||||
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
||||
return nil
|
||||
|
@ -1373,9 +1369,7 @@ var mergeWorkersLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
|
|||
|
||||
func openParts(path string) ([]*partWrapper, error) {
|
||||
// The path can be missing after restoring from backup, so create it if needed.
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
||||
// Remove txn and tmp directories, which may be left after the upgrade
|
||||
|
@ -1464,9 +1458,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
|
|||
// Flush inmemory items to disk.
|
||||
tb.flushInmemoryItems()
|
||||
|
||||
if err := fs.MkdirAllFailIfExist(dstDir); err != nil {
|
||||
return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(dstDir)
|
||||
|
||||
pws := tb.getParts(nil)
|
||||
defer tb.putParts(pws)
|
||||
|
|
|
@ -183,9 +183,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
}
|
||||
}
|
||||
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
q.flockF = mustCreateFlockFile(path)
|
||||
mustCloseFlockF := true
|
||||
defer func() {
|
||||
|
|
|
@ -97,9 +97,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
|||
path = filepath.Clean(path)
|
||||
|
||||
// Create the directory
|
||||
if err := fs.MkdirAllFailIfExist(path); err != nil {
|
||||
return fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(path)
|
||||
|
||||
// Create part files in the directory.
|
||||
timestampsPath := filepath.Join(path, timestampsFilename)
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -35,11 +34,10 @@ func (mp *inmemoryPart) Reset() {
|
|||
mp.creationTime = 0
|
||||
}
|
||||
|
||||
// StoreToDisk stores the mp to the given path on disk.
|
||||
func (mp *inmemoryPart) StoreToDisk(path string) error {
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
// MustStoreToDisk stores the mp to the given path on disk.
|
||||
func (mp *inmemoryPart) MustStoreToDisk(path string) {
|
||||
fs.MustMkdirFailIfExist(path)
|
||||
|
||||
timestampsPath := filepath.Join(path, timestampsFilename)
|
||||
fs.MustWriteSync(timestampsPath, mp.timestampsData.B)
|
||||
|
||||
|
@ -56,7 +54,6 @@ func (mp *inmemoryPart) StoreToDisk(path string) error {
|
|||
|
||||
fs.MustSyncPath(path)
|
||||
// Do not sync parent directory - it must be synced by the caller.
|
||||
return nil
|
||||
}
|
||||
|
||||
// InitFromRows initializes mp from the given rows.
|
||||
|
|
|
@ -212,20 +212,16 @@ func (pw *partWrapper) decRef() {
|
|||
}
|
||||
}
|
||||
|
||||
// createPartition creates new partition for the given timestamp and the given paths
|
||||
// mustCreatePartition creates new partition for the given timestamp and the given paths
|
||||
// to small and big partitions.
|
||||
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) (*partition, error) {
|
||||
func mustCreatePartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) *partition {
|
||||
name := timestampToPartitionName(timestamp)
|
||||
smallPartsPath := filepath.Join(filepath.Clean(smallPartitionsPath), name)
|
||||
bigPartsPath := filepath.Join(filepath.Clean(bigPartitionsPath), name)
|
||||
logger.Infof("creating a partition %q with smallPartsPath=%q, bigPartsPath=%q", name, smallPartsPath, bigPartsPath)
|
||||
|
||||
if err := fs.MkdirAllFailIfExist(smallPartsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory for small parts %q: %w", smallPartsPath, err)
|
||||
}
|
||||
if err := fs.MkdirAllFailIfExist(bigPartsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory for big parts %q: %w", bigPartsPath, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(smallPartsPath)
|
||||
fs.MustMkdirFailIfExist(bigPartsPath)
|
||||
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
|
||||
pt.tr.fromPartitionTimestamp(timestamp)
|
||||
|
@ -233,7 +229,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
|
|||
|
||||
logger.Infof("partition %q has been created", name)
|
||||
|
||||
return pt, nil
|
||||
return pt
|
||||
}
|
||||
|
||||
func (pt *partition) startBackgroundWorkers() {
|
||||
|
@ -1268,9 +1264,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
if !isDedupEnabled() && isFinal && len(pws) == 1 && pws[0].mp != nil {
|
||||
// Fast path: flush a single in-memory part to disk.
|
||||
mp := pws[0].mp
|
||||
if err := mp.StoreToDisk(dstPartPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err)
|
||||
}
|
||||
mp.MustStoreToDisk(dstPartPath)
|
||||
pwNew := pt.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
|
||||
pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
||||
return nil
|
||||
|
@ -1796,9 +1790,7 @@ func getPartsSize(pws []*partWrapper) uint64 {
|
|||
|
||||
func openParts(path string, partNames []string) ([]*partWrapper, error) {
|
||||
// The path can be missing after restoring from backup, so create it if needed.
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
||||
// Remove txn and tmp directories, which may be left after the upgrade
|
||||
|
@ -1879,12 +1871,8 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
|
|||
pt.PutParts(pwsBig)
|
||||
}()
|
||||
|
||||
if err := fs.MkdirAllFailIfExist(smallPath); err != nil {
|
||||
return fmt.Errorf("cannot create snapshot dir %q: %w", smallPath, err)
|
||||
}
|
||||
if err := fs.MkdirAllFailIfExist(bigPath); err != nil {
|
||||
return fmt.Errorf("cannot create snapshot dir %q: %w", bigPath, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(smallPath)
|
||||
fs.MustMkdirFailIfExist(bigPath)
|
||||
|
||||
// Create a file with part names at smallPath
|
||||
mustWritePartNames(pwsSmall, pwsBig, smallPath)
|
||||
|
|
|
@ -169,10 +169,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
// Create partition from rowss and test search on it.
|
||||
strg := newTestStorage()
|
||||
strg.retentionMsecs = timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
|
||||
pt, err := createPartition(ptt, "small-table", "big-table", strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create partition: %s", err)
|
||||
}
|
||||
pt := mustCreatePartition(ptt, "small-table", "big-table", strg)
|
||||
smallPartsPath := pt.smallPartsPath
|
||||
bigPartsPath := pt.bigPartsPath
|
||||
defer func() {
|
||||
|
@ -194,6 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
pt.MustClose()
|
||||
|
||||
// Open the created partition and test search on it.
|
||||
var err error
|
||||
pt, err = openPartition(smallPartsPath, bigPartsPath, strg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open partition: %s", err)
|
||||
|
|
|
@ -163,9 +163,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
|||
retentionMsecs: retentionMsecs,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot create a directory for the storage at %q: %w", path, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
|
||||
// Check whether the cache directory must be removed
|
||||
// It is removed if it contains resetCacheOnStartupFilename.
|
||||
|
@ -195,9 +193,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
|||
|
||||
// Pre-create snapshots directory if it is missing.
|
||||
snapshotsPath := filepath.Join(path, snapshotsDirname)
|
||||
if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(snapshotsPath)
|
||||
fs.MustRemoveTemporaryDirs(snapshotsPath)
|
||||
|
||||
// Initialize series cardinality limiter.
|
||||
|
@ -231,17 +227,13 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
|||
// Load metadata
|
||||
metadataDir := filepath.Join(path, metadataDirname)
|
||||
isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname))
|
||||
if err := fs.MkdirAllIfNotExist(metadataDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", metadataDir, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(metadataDir)
|
||||
s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB)
|
||||
|
||||
// Load indexdb
|
||||
idbPath := filepath.Join(path, indexdbDirname)
|
||||
idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname)
|
||||
if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(idbSnapshotsPath)
|
||||
fs.MustRemoveTemporaryDirs(idbSnapshotsPath)
|
||||
idbCurr, idbPrev, err := s.openIndexDBTables(idbPath)
|
||||
if err != nil {
|
||||
|
@ -342,9 +334,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) {
|
|||
snapshotName := snapshot.NewName()
|
||||
srcDir := s.path
|
||||
dstDir := filepath.Join(srcDir, snapshotsDirname, snapshotName)
|
||||
if err := fs.MkdirAllFailIfExist(dstDir); err != nil {
|
||||
return "", fmt.Errorf("cannot create dir %q: %w", dstDir, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(dstDir)
|
||||
dirsToRemoveOnError = append(dirsToRemoveOnError, dstDir)
|
||||
|
||||
smallDir, bigDir, err := s.tb.CreateSnapshot(snapshotName, deadline)
|
||||
|
@ -354,9 +344,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) {
|
|||
dirsToRemoveOnError = append(dirsToRemoveOnError, smallDir, bigDir)
|
||||
|
||||
dstDataDir := filepath.Join(dstDir, dataDirname)
|
||||
if err := fs.MkdirAllFailIfExist(dstDataDir); err != nil {
|
||||
return "", fmt.Errorf("cannot create dir %q: %w", dstDataDir, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(dstDataDir)
|
||||
dstSmallDir := filepath.Join(dstDataDir, smallDirname)
|
||||
if err := fs.SymlinkRelative(smallDir, dstSmallDir); err != nil {
|
||||
return "", fmt.Errorf("cannot create symlink from %q to %q: %w", smallDir, dstSmallDir, err)
|
||||
|
@ -1896,10 +1884,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
if err != nil {
|
||||
err = fmt.Errorf("cannot update per-date data: %w", err)
|
||||
} else {
|
||||
err = s.tb.AddRows(rows)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot add rows to table: %w", err)
|
||||
}
|
||||
s.tb.MustAddRows(rows)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("error occurred during rows addition: %w", err)
|
||||
|
@ -2507,9 +2492,7 @@ func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) {
|
|||
}
|
||||
|
||||
func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) {
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
||||
// Search for the two most recent tables - the last one is active,
|
||||
|
|
|
@ -83,9 +83,7 @@ func openTable(path string, s *Storage) (*table, error) {
|
|||
path = filepath.Clean(path)
|
||||
|
||||
// Create a directory for the table if it doesn't exist yet.
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory for table %q: %w", path, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
|
||||
// Protect from concurrent opens.
|
||||
flockF, err := fs.CreateFlockFile(path)
|
||||
|
@ -96,25 +94,19 @@ func openTable(path string, s *Storage) (*table, error) {
|
|||
|
||||
// Create directories for small and big partitions if they don't exist yet.
|
||||
smallPartitionsPath := filepath.Join(path, smallDirname)
|
||||
if err := fs.MkdirAllIfNotExist(smallPartitionsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory for small partitions %q: %w", smallPartitionsPath, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(smallPartitionsPath)
|
||||
fs.MustRemoveTemporaryDirs(smallPartitionsPath)
|
||||
|
||||
smallSnapshotsPath := filepath.Join(smallPartitionsPath, snapshotsDirname)
|
||||
if err := fs.MkdirAllIfNotExist(smallSnapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", smallSnapshotsPath, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(smallSnapshotsPath)
|
||||
fs.MustRemoveTemporaryDirs(smallSnapshotsPath)
|
||||
|
||||
bigPartitionsPath := filepath.Join(path, bigDirname)
|
||||
if err := fs.MkdirAllIfNotExist(bigPartitionsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory for big partitions %q: %w", bigPartitionsPath, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(bigPartitionsPath)
|
||||
fs.MustRemoveTemporaryDirs(bigPartitionsPath)
|
||||
|
||||
bigSnapshotsPath := filepath.Join(bigPartitionsPath, snapshotsDirname)
|
||||
if err := fs.MkdirAllIfNotExist(bigSnapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", bigSnapshotsPath, err)
|
||||
}
|
||||
fs.MustMkdirIfNotExist(bigSnapshotsPath)
|
||||
fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
|
||||
|
||||
// Open partitions.
|
||||
|
@ -152,14 +144,10 @@ func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, s
|
|||
defer tb.PutPartitions(ptws)
|
||||
|
||||
dstSmallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName)
|
||||
if err := fs.MkdirAllFailIfExist(dstSmallDir); err != nil {
|
||||
return "", "", fmt.Errorf("cannot create dir %q: %w", dstSmallDir, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(dstSmallDir)
|
||||
|
||||
dstBigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName)
|
||||
if err := fs.MkdirAllFailIfExist(dstBigDir); err != nil {
|
||||
fs.MustRemoveAll(dstSmallDir)
|
||||
return "", "", fmt.Errorf("cannot create dir %q: %w", dstBigDir, err)
|
||||
}
|
||||
fs.MustMkdirFailIfExist(dstBigDir)
|
||||
|
||||
for _, ptw := range ptws {
|
||||
if deadline > 0 && fasttime.UnixTimestamp() > deadline {
|
||||
|
@ -277,10 +265,10 @@ func (tb *table) ForceMergePartitions(partitionNamePrefix string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// AddRows adds the given rows to the table tb.
|
||||
func (tb *table) AddRows(rows []rawRow) error {
|
||||
// MustAddRows adds the given rows to the table tb.
|
||||
func (tb *table) MustAddRows(rows []rawRow) {
|
||||
if len(rows) == 0 {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// Verify whether all the rows may be added to a single partition.
|
||||
|
@ -317,7 +305,7 @@ func (tb *table) AddRows(rows []rawRow) error {
|
|||
// Fast path - add all the rows into the ptw.
|
||||
ptw.pt.AddRows(rows)
|
||||
tb.PutPartitions(ptws)
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// Slower path - split rows into per-partition buckets.
|
||||
|
@ -343,7 +331,7 @@ func (tb *table) AddRows(rows []rawRow) error {
|
|||
}
|
||||
tb.PutPartitions(ptws)
|
||||
if len(missingRows) == 0 {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// The slowest path - there are rows that don't fit any existing partition.
|
||||
|
@ -372,18 +360,11 @@ func (tb *table) AddRows(rows []rawRow) error {
|
|||
continue
|
||||
}
|
||||
|
||||
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s)
|
||||
if err != nil {
|
||||
// Return only the first error, since it has no sense in returning all errors.
|
||||
tb.ptwsLock.Unlock()
|
||||
return fmt.Errorf("errors while adding rows to table %q: %w", tb.path, err)
|
||||
}
|
||||
pt := mustCreatePartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s)
|
||||
pt.AddRows(missingRows[i : i+1])
|
||||
tb.addPartitionNolock(pt)
|
||||
}
|
||||
tb.ptwsLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tb *table) getMinMaxTimestamps() (int64, int64) {
|
||||
|
|
|
@ -194,9 +194,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange,
|
|||
}
|
||||
}()
|
||||
for _, rows := range rowss {
|
||||
if err := tb.AddRows(rows); err != nil {
|
||||
t.Fatalf("cannot add rows to table: %s", err)
|
||||
}
|
||||
tb.MustAddRows(rows)
|
||||
|
||||
// Flush rows to parts.
|
||||
tb.flushPendingRows()
|
||||
|
|
|
@ -95,9 +95,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
|
|||
r.Timestamp = int64(ts)
|
||||
r.Value = value
|
||||
}
|
||||
if err := tb.AddRows(rows); err != nil {
|
||||
panic(fmt.Errorf("cannot add %d rows: %w", rowsPerInsert, err))
|
||||
}
|
||||
tb.MustAddRows(rows)
|
||||
}
|
||||
wg.Done()
|
||||
}(k)
|
||||
|
|
|
@ -80,9 +80,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
|
|||
r.Value++
|
||||
}
|
||||
// Add updated rowsCopy.
|
||||
if err := tb.AddRows(rowsCopy); err != nil {
|
||||
panic(fmt.Errorf("cannot add rows to table %q: %w", tablePath, err))
|
||||
}
|
||||
tb.MustAddRows(rowsCopy)
|
||||
}
|
||||
|
||||
doneCh <- struct{}{}
|
||||
|
|
Loading…
Reference in a new issue