mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/storage: serialize snapshot creation process with mutex
This guarantees that the snapshot contains all the recently added data from inmemory buffers when multiple concurrent calls to Storage.CreateSnapshot are performed.
This commit is contained in:
parent
c31b956355
commit
7cdac6634c
2 changed files with 40 additions and 23 deletions
|
@ -80,6 +80,12 @@ type Storage struct {
|
|||
currHourMetricIDsUpdaterWG sync.WaitGroup
|
||||
retentionWatcherWG sync.WaitGroup
|
||||
prefetchedMetricIDsCleanerWG sync.WaitGroup
|
||||
|
||||
// The snapshotLock prevents from concurrent creation of snapshots,
|
||||
// since this may result in snapshots without recently added data,
|
||||
// which may be in the process of flushing to disk by concurrently running
|
||||
// snapshot process.
|
||||
snapshotLock sync.Mutex
|
||||
}
|
||||
|
||||
type pendingHourMetricIDEntry struct {
|
||||
|
@ -188,6 +194,9 @@ func (s *Storage) CreateSnapshot() (string, error) {
|
|||
logger.Infof("creating Storage snapshot for %q...", s.path)
|
||||
startTime := time.Now()
|
||||
|
||||
s.snapshotLock.Lock()
|
||||
defer s.snapshotLock.Unlock()
|
||||
|
||||
snapshotName := fmt.Sprintf("%s-%08X", time.Now().UTC().Format("20060102150405"), nextSnapshotIdx())
|
||||
srcDir := s.path
|
||||
dstDir := fmt.Sprintf("%s/snapshots/%s", srcDir, snapshotName)
|
||||
|
|
|
@ -715,35 +715,43 @@ func checkTagKeys(tks []string, tksExpected map[string]bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func TestStorageAddRows(t *testing.T) {
|
||||
path := "TestStorageAddRows"
|
||||
func TestStorageAddRowsSerial(t *testing.T) {
|
||||
path := "TestStorageAddRowsSerial"
|
||||
s, err := OpenStorage(path, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open storage: %s", err)
|
||||
}
|
||||
t.Run("serial", func(t *testing.T) {
|
||||
if err := testStorageAddRows(s); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
t.Run("concurrent", func(t *testing.T) {
|
||||
ch := make(chan error, 3)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
go func() {
|
||||
ch <- testStorageAddRows(s)
|
||||
}()
|
||||
}
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
if err := testStorageAddRows(s); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageAddRowsConcurrent(t *testing.T) {
|
||||
path := "TestStorageAddRowsConcurrent"
|
||||
s, err := OpenStorage(path, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open storage: %s", err)
|
||||
}
|
||||
ch := make(chan error, 3)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
go func() {
|
||||
ch <- testStorageAddRows(s)
|
||||
}()
|
||||
}
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
})
|
||||
}
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
|
|
Loading…
Reference in a new issue