mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/storage: serialize snapshot creation process with mutex
This guarantees that the snapshot contains all the recently added data from inmemory buffers when multiple concurrent calls to Storage.CreateSnapshot are performed.
This commit is contained in:
parent
97fb0edd07
commit
f3e0c55ea1
2 changed files with 40 additions and 23 deletions
|
@ -80,6 +80,12 @@ type Storage struct {
|
|||
currHourMetricIDsUpdaterWG sync.WaitGroup
|
||||
retentionWatcherWG sync.WaitGroup
|
||||
prefetchedMetricIDsCleanerWG sync.WaitGroup
|
||||
|
||||
// The snapshotLock prevents from concurrent creation of snapshots,
|
||||
// since this may result in snapshots without recently added data,
|
||||
// which may be in the process of flushing to disk by concurrently running
|
||||
// snapshot process.
|
||||
snapshotLock sync.Mutex
|
||||
}
|
||||
|
||||
// OpenStorage opens storage on the given path with the given number of retention months.
|
||||
|
@ -178,6 +184,9 @@ func (s *Storage) CreateSnapshot() (string, error) {
|
|||
logger.Infof("creating Storage snapshot for %q...", s.path)
|
||||
startTime := time.Now()
|
||||
|
||||
s.snapshotLock.Lock()
|
||||
defer s.snapshotLock.Unlock()
|
||||
|
||||
snapshotName := fmt.Sprintf("%s-%08X", time.Now().UTC().Format("20060102150405"), nextSnapshotIdx())
|
||||
srcDir := s.path
|
||||
dstDir := fmt.Sprintf("%s/snapshots/%s", srcDir, snapshotName)
|
||||
|
|
|
@ -675,35 +675,43 @@ func checkTagKeys(tks []string, tksExpected map[string]bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func TestStorageAddRows(t *testing.T) {
|
||||
path := "TestStorageAddRows"
|
||||
func TestStorageAddRowsSerial(t *testing.T) {
|
||||
path := "TestStorageAddRowsSerial"
|
||||
s, err := OpenStorage(path, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open storage: %s", err)
|
||||
}
|
||||
t.Run("serial", func(t *testing.T) {
|
||||
if err := testStorageAddRows(s); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
t.Run("concurrent", func(t *testing.T) {
|
||||
ch := make(chan error, 3)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
go func() {
|
||||
ch <- testStorageAddRows(s)
|
||||
}()
|
||||
}
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
if err := testStorageAddRows(s); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageAddRowsConcurrent(t *testing.T) {
|
||||
path := "TestStorageAddRowsConcurrent"
|
||||
s, err := OpenStorage(path, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open storage: %s", err)
|
||||
}
|
||||
ch := make(chan error, 3)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
go func() {
|
||||
ch <- testStorageAddRows(s)
|
||||
}()
|
||||
}
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
})
|
||||
}
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
|
|
Loading…
Reference in a new issue