mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: enhancements for snapshots process (#3873)
* lib/{fs,mergeset,storage}: skip `.must-remove.` dirs when creating snapshot (#3858) * lib/{mergeset,storage}: add timeout configuration for snapshots creation, remove incomplete snapshots from storage * docs: fix formatting * app/vmstorage: add metrics to track status of snapshots * app/vmstorage: use `vm_http_requests_total` metric for snapshot endpoints metrics, rename new flag to make name more clear Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update flag name in docs Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: reflect new metrics names change in docs Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
1db010797e
commit
26682e369e
9 changed files with 84 additions and 22 deletions
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/servers"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
|
@ -29,13 +30,14 @@ var (
|
|||
httpListenAddr = flag.String("httpListenAddr", ":8482", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
|
||||
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
|
||||
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
|
||||
storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data")
|
||||
vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services")
|
||||
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
|
||||
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
|
||||
forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages")
|
||||
forceFlushAuthKey = flag.String("forceFlushAuthKey", "", "authKey, which must be passed in query string to /internal/force_flush pages")
|
||||
snapshotsMaxAge = flagutil.NewDuration("snapshotsMaxAge", "0", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted")
|
||||
storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data")
|
||||
vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services")
|
||||
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
|
||||
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
|
||||
forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages")
|
||||
forceFlushAuthKey = flag.String("forceFlushAuthKey", "", "authKey, which must be passed in query string to /internal/force_flush pages")
|
||||
snapshotsMaxAge = flagutil.NewDuration("snapshotsMaxAge", "0", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted")
|
||||
snapshotCreateTimeout = flagutil.NewDuration("snapshotCreateTimeout", "0", "Defines timeout value for process of creating new snapshot if it is set to non-zero duration. If set, make sure that timeout is lower than backup period.")
|
||||
|
||||
finalMergeDelay = flag.Duration("finalMergeDelay", 0, "The delay before starting final merge for per-month partition after no new data is ingested into it. "+
|
||||
"Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. "+
|
||||
|
@ -208,21 +210,29 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
|
|||
|
||||
switch path {
|
||||
case "/create":
|
||||
snapshotsCreateTotal.Inc()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
snapshotPath, err := strg.CreateSnapshot()
|
||||
deadline := uint64(0)
|
||||
if snapshotCreateTimeout.Msecs > 0 {
|
||||
deadline = fasttime.UnixTimestamp() + uint64(snapshotCreateTimeout.Msecs/1e3)
|
||||
}
|
||||
snapshotPath, err := strg.CreateSnapshot(deadline)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot create snapshot: %w", err)
|
||||
jsonResponseError(w, err)
|
||||
snapshotsCreateErrorsTotal.Inc()
|
||||
return true
|
||||
}
|
||||
fmt.Fprintf(w, `{"status":"ok","snapshot":%q}`, snapshotPath)
|
||||
return true
|
||||
case "/list":
|
||||
snapshotsListTotal.Inc()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
snapshots, err := strg.ListSnapshots()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot list snapshots: %w", err)
|
||||
jsonResponseError(w, err)
|
||||
snapshotsListErrorsTotal.Inc()
|
||||
return true
|
||||
}
|
||||
fmt.Fprintf(w, `{"status":"ok","snapshots":[`)
|
||||
|
@ -235,6 +245,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
|
|||
fmt.Fprintf(w, `]}`)
|
||||
return true
|
||||
case "/delete":
|
||||
snapshotsDeleteTotal.Inc()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
snapshotName := r.FormValue("snapshot")
|
||||
|
||||
|
@ -242,6 +253,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
|
|||
if err != nil {
|
||||
err = fmt.Errorf("cannot list snapshots: %w", err)
|
||||
jsonResponseError(w, err)
|
||||
snapshotsDeleteErrorsTotal.Inc()
|
||||
return true
|
||||
}
|
||||
for _, snName := range snapshots {
|
||||
|
@ -249,6 +261,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
|
|||
if err := strg.DeleteSnapshot(snName); err != nil {
|
||||
err = fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
|
||||
jsonResponseError(w, err)
|
||||
snapshotsDeleteErrorsTotal.Inc()
|
||||
return true
|
||||
}
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
|
@ -260,17 +273,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
|
|||
jsonResponseError(w, err)
|
||||
return true
|
||||
case "/delete_all":
|
||||
snapshotsDeleteAllTotal.Inc()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
snapshots, err := strg.ListSnapshots()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot list snapshots: %w", err)
|
||||
jsonResponseError(w, err)
|
||||
snapshotsDeleteAllErrorsTotal.Inc()
|
||||
return true
|
||||
}
|
||||
for _, snapshotName := range snapshots {
|
||||
if err := strg.DeleteSnapshot(snapshotName); err != nil {
|
||||
err = fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
|
||||
jsonResponseError(w, err)
|
||||
snapshotsDeleteAllErrorsTotal.Inc()
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -316,7 +332,20 @@ var (
|
|||
staleSnapshotsRemoverWG sync.WaitGroup
|
||||
)
|
||||
|
||||
var activeForceMerges = metrics.NewCounter("vm_active_force_merges")
|
||||
var (
|
||||
activeForceMerges = metrics.NewCounter("vm_active_force_merges")
|
||||
snapshotsCreateTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/create"}`)
|
||||
snapshotsCreateErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/create"}`)
|
||||
|
||||
snapshotsListTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/list"}`)
|
||||
snapshotsListErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/list"}`)
|
||||
|
||||
snapshotsDeleteTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/delete"}`)
|
||||
snapshotsDeleteErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete"}`)
|
||||
|
||||
snapshotsDeleteAllTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/delete_all"}`)
|
||||
snapshotsDeleteAllErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete_all"}`)
|
||||
)
|
||||
|
||||
func registerStorageMetrics(strg *storage.Storage) {
|
||||
mCache := &storage.Metrics{}
|
||||
|
|
|
@ -15,12 +15,15 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
|
||||
## tip
|
||||
|
||||
* FEATURE: add `-snapshotCreateTimeout` flag to allow configuring timeout for snapshot process. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
||||
* FEATURE: expose `vm_http_requests_total` and `vm_http_request_errors_total` metrics for `snapshot/*` paths at [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) `vmstorage` and [VictoriaMetrics Single](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
||||
* FEATURE: [vmgateway](https://docs.victoriametrics.com/vmgateway.html): add the ability to discover keys for JWT verification via [OpenID discovery endpoint](https://openid.net/specs/openid-connect-discovery-1_0.html). See [these docs](https://docs.victoriametrics.com/vmgateway.html#using-openid-discovery-endpoint-for-jwt-signature-verification).
|
||||
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix panic when executing the query `aggr_func(rollup*(some_value))`. The panic has been introduced in [v1.88.0](https://docs.victoriametrics.com/CHANGELOG.html#v1880).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): use the provided `-remoteWrite.*` auth options when determining whether the remote storage supports [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). Previously the auth options were ignored. This was preventing from automatic switch to VictoriaMetrics remote write protocol.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not register `vm_promscrape_config_*` metrics if `-promscrape.config` flag is not used. Previously those metrics were registered and never updated, which was confusing and could trigger false-positive alerts.
|
||||
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): skip measurements with no fields when migrating data from influxdb. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3837).
|
||||
* BUGFIX: delete failed snapshot contents from disk when creating snapshot fails. Previously failed snapshot contents could remain on disk in incomplete state. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3858)
|
||||
|
||||
## [v1.88.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.0)
|
||||
|
||||
|
@ -47,7 +50,6 @@ Released at 2023-02-24
|
|||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_zscore(z, q)` function for dropping outliers located farther than `z*range_stddev(q)` from `range_avg(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706).
|
||||
* FEATURE: add `-search.maxSeriesPerAggrFunc` command-line flag, which can be used for limiting the number of time series [MetricsQL aggregate functions](https://docs.victoriametrics.com/MetricsQL.html#aggregate-functions) can return in a single query. This flag can be useful for preventing OOMs when [count_values](https://docs.victoriametrics.com/MetricsQL.html#count_values) function is improperly used.
|
||||
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): small UX improvements for mobile view. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3707) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3848).
|
||||
* FEATURE: add `-search.logQueryMemoryUsage` command-line flag for logging queries, which need more memory than specified by this command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3553). Thanks to @michal-kralik for the idea and the intial implementation.
|
||||
* FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`.
|
||||
|
|
|
@ -1361,6 +1361,9 @@ Below is the output for `/path/to/vmstorage -help`:
|
|||
The maximum number of CPU cores to use for small merges. Default value is used if set to 0
|
||||
-snapshotAuthKey string
|
||||
authKey, which must be passed in query string to /snapshot* pages
|
||||
-snapshotCreateTimeout value
|
||||
Defines timeout value for process of creating new snapshot if it is set to non-zero duration. If set, make sure that timeout is lower than backup period.
|
||||
The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0)
|
||||
-snapshotsMaxAge value
|
||||
Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted
|
||||
The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0)
|
||||
|
|
|
@ -2485,6 +2485,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
The maximum number of CPU cores to use for small merges. Default value is used if set to 0
|
||||
-snapshotAuthKey string
|
||||
authKey, which must be passed in query string to /snapshot* pages
|
||||
-snapshotCreateTimeout value
|
||||
Defines timeout value for process of creating new snapshot if it is set to non-zero duration. If set, make sure that timeout is lower than backup period.
|
||||
The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0)
|
||||
-snapshotsMaxAge value
|
||||
Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted
|
||||
The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0)
|
||||
|
|
|
@ -1503,7 +1503,10 @@ func mustCloseParts(pws []*partWrapper) {
|
|||
//
|
||||
// Snapshot is created using linux hard links, so it is usually created
|
||||
// very quickly.
|
||||
func (tb *Table) CreateSnapshotAt(dstDir string) error {
|
||||
//
|
||||
// If deadline is reached before snapshot is created error is returned.
|
||||
// If any error occurs during snapshot created data is not removed.
|
||||
func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
|
||||
logger.Infof("creating Table snapshot of %q...", tb.path)
|
||||
startTime := time.Now()
|
||||
|
||||
|
@ -1543,7 +1546,14 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("cannot read directory: %w", err)
|
||||
}
|
||||
for _, fi := range fis {
|
||||
|
||||
for i, fi := range fis {
|
||||
if deadline > 0 && i%5 == 0 {
|
||||
if fasttime.UnixTimestamp() > deadline {
|
||||
return fmt.Errorf("cannot create snapshot for %q in time: timeout exceeded", tb.path)
|
||||
}
|
||||
}
|
||||
|
||||
fn := fi.Name()
|
||||
if !fs.IsDirOrSymlink(fi) {
|
||||
// Skip non-directories.
|
||||
|
|
|
@ -150,11 +150,11 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
|||
|
||||
// Create multiple snapshots.
|
||||
snapshot1 := path + "-test-snapshot1"
|
||||
if err := tb.CreateSnapshotAt(snapshot1); err != nil {
|
||||
if err := tb.CreateSnapshotAt(snapshot1, 0); err != nil {
|
||||
t.Fatalf("cannot create snapshot1: %s", err)
|
||||
}
|
||||
snapshot2 := path + "-test-snapshot2"
|
||||
if err := tb.CreateSnapshotAt(snapshot2); err != nil {
|
||||
if err := tb.CreateSnapshotAt(snapshot2, 0); err != nil {
|
||||
t.Fatalf("cannot create snapshot2: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
|
|
|
@ -321,7 +321,7 @@ func (s *Storage) DebugFlush() {
|
|||
}
|
||||
|
||||
// CreateSnapshot creates snapshot for s and returns the snapshot name.
|
||||
func (s *Storage) CreateSnapshot() (string, error) {
|
||||
func (s *Storage) CreateSnapshot(deadline uint64) (string, error) {
|
||||
logger.Infof("creating Storage snapshot for %q...", s.path)
|
||||
startTime := time.Now()
|
||||
|
||||
|
@ -339,16 +339,19 @@ func (s *Storage) CreateSnapshot() (string, error) {
|
|||
return "", fmt.Errorf("cannot create dir %q: %w", dstDataDir, err)
|
||||
}
|
||||
|
||||
smallDir, bigDir, err := s.tb.CreateSnapshot(snapshotName)
|
||||
smallDir, bigDir, err := s.tb.CreateSnapshot(snapshotName, deadline)
|
||||
if err != nil {
|
||||
fs.MustRemoveAll(dstDir)
|
||||
return "", fmt.Errorf("cannot create table snapshot: %w", err)
|
||||
}
|
||||
dstSmallDir := dstDataDir + "/small"
|
||||
if err := fs.SymlinkRelative(smallDir, dstSmallDir); err != nil {
|
||||
fs.MustRemoveAll(dstDir)
|
||||
return "", fmt.Errorf("cannot create symlink from %q to %q: %w", smallDir, dstSmallDir, err)
|
||||
}
|
||||
dstBigDir := dstDataDir + "/big"
|
||||
if err := fs.SymlinkRelative(bigDir, dstBigDir); err != nil {
|
||||
fs.MustRemoveAll(dstDir)
|
||||
return "", fmt.Errorf("cannot create symlink from %q to %q: %w", bigDir, dstBigDir, err)
|
||||
}
|
||||
fs.MustSyncPath(dstDataDir)
|
||||
|
@ -356,24 +359,28 @@ func (s *Storage) CreateSnapshot() (string, error) {
|
|||
idbSnapshot := fmt.Sprintf("%s/indexdb/snapshots/%s", srcDir, snapshotName)
|
||||
idb := s.idb()
|
||||
currSnapshot := idbSnapshot + "/" + idb.name
|
||||
if err := idb.tb.CreateSnapshotAt(currSnapshot); err != nil {
|
||||
if err := idb.tb.CreateSnapshotAt(currSnapshot, deadline); err != nil {
|
||||
fs.MustRemoveAll(dstDir)
|
||||
return "", fmt.Errorf("cannot create curr indexDB snapshot: %w", err)
|
||||
}
|
||||
ok := idb.doExtDB(func(extDB *indexDB) {
|
||||
prevSnapshot := idbSnapshot + "/" + extDB.name
|
||||
err = extDB.tb.CreateSnapshotAt(prevSnapshot)
|
||||
err = extDB.tb.CreateSnapshotAt(prevSnapshot, deadline)
|
||||
})
|
||||
if ok && err != nil {
|
||||
fs.MustRemoveAll(dstDir)
|
||||
return "", fmt.Errorf("cannot create prev indexDB snapshot: %w", err)
|
||||
}
|
||||
dstIdbDir := dstDir + "/indexdb"
|
||||
if err := fs.SymlinkRelative(idbSnapshot, dstIdbDir); err != nil {
|
||||
fs.MustRemoveAll(dstDir)
|
||||
return "", fmt.Errorf("cannot create symlink from %q to %q: %w", idbSnapshot, dstIdbDir, err)
|
||||
}
|
||||
|
||||
srcMetadataDir := srcDir + "/metadata"
|
||||
dstMetadataDir := dstDir + "/metadata"
|
||||
if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil {
|
||||
fs.MustRemoveAll(dstDir)
|
||||
return "", fmt.Errorf("cannot copy metadata: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1165,7 +1165,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
|||
}
|
||||
|
||||
// Try creating a snapshot from the storage.
|
||||
snapshotName, err := s.CreateSnapshot()
|
||||
snapshotName, err := s.CreateSnapshot(0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create snapshot from the storage: %w", err)
|
||||
}
|
||||
|
@ -1332,7 +1332,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
|
|||
}
|
||||
}
|
||||
// Try creating a snapshot from the storage.
|
||||
snapshotName, err := s.CreateSnapshot()
|
||||
snapshotName, err := s.CreateSnapshot(0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create snapshot from the storage: %s", err)
|
||||
}
|
||||
|
|
|
@ -142,7 +142,9 @@ func openTable(path string, s *Storage) (*table, error) {
|
|||
}
|
||||
|
||||
// CreateSnapshot creates tb snapshot and returns paths to small and big parts of it.
|
||||
func (tb *table) CreateSnapshot(snapshotName string) (string, string, error) {
|
||||
// If deadline is reached before snapshot is created error is returned.
|
||||
// If any error occurs during snapshot created data is not removed.
|
||||
func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, string, error) {
|
||||
logger.Infof("creating table snapshot of %q...", tb.path)
|
||||
startTime := time.Now()
|
||||
|
||||
|
@ -158,7 +160,13 @@ func (tb *table) CreateSnapshot(snapshotName string) (string, string, error) {
|
|||
return "", "", fmt.Errorf("cannot create dir %q: %w", dstBigDir, err)
|
||||
}
|
||||
|
||||
for _, ptw := range ptws {
|
||||
for i, ptw := range ptws {
|
||||
if deadline > 0 && i%5 == 0 {
|
||||
if fasttime.UnixTimestamp() > deadline {
|
||||
return "", "", fmt.Errorf("cannot create snapshot for %q in %q in time: timeout exceeded", tb.path, snapshotName)
|
||||
}
|
||||
}
|
||||
|
||||
smallPath := dstSmallDir + "/" + ptw.pt.name
|
||||
bigPath := dstBigDir + "/" + ptw.pt.name
|
||||
if err := ptw.pt.CreateSnapshotAt(smallPath, bigPath); err != nil {
|
||||
|
|
Loading…
Reference in a new issue