diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 807a1de88..64c6fbc4e 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/servers" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -29,13 +30,14 @@ var ( httpListenAddr = flag.String("httpListenAddr", ":8482", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol") useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") - storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data") - vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services") - vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") - snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") - forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages") - forceFlushAuthKey = flag.String("forceFlushAuthKey", "", "authKey, which must be passed in query string to /internal/force_flush pages") - snapshotsMaxAge = flagutil.NewDuration("snapshotsMaxAge", "0", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted") + storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data") + vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services") + vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") + snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") + forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages") + forceFlushAuthKey = flag.String("forceFlushAuthKey", "", "authKey, which must be passed in query string to /internal/force_flush pages") + snapshotsMaxAge = flagutil.NewDuration("snapshotsMaxAge", "0", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted") + snapshotCreateTimeout = flagutil.NewDuration("snapshotCreateTimeout", "0", "Defines timeout value for process of creating new snapshot if it is set to non-zero duration. If set, make sure that timeout is lower than backup period.") finalMergeDelay = flag.Duration("finalMergeDelay", 0, "The delay before starting final merge for per-month partition after no new data is ingested into it. "+ "Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. "+ @@ -208,21 +210,29 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag switch path { case "/create": + snapshotsCreateTotal.Inc() w.Header().Set("Content-Type", "application/json") - snapshotPath, err := strg.CreateSnapshot() + deadline := uint64(0) + if snapshotCreateTimeout.Msecs > 0 { + deadline = fasttime.UnixTimestamp() + uint64(snapshotCreateTimeout.Msecs/1e3) + } + snapshotPath, err := strg.CreateSnapshot(deadline) if err != nil { err = fmt.Errorf("cannot create snapshot: %w", err) jsonResponseError(w, err) + snapshotsCreateErrorsTotal.Inc() return true } fmt.Fprintf(w, `{"status":"ok","snapshot":%q}`, snapshotPath) return true case "/list": + snapshotsListTotal.Inc() w.Header().Set("Content-Type", "application/json") snapshots, err := strg.ListSnapshots() if err != nil { err = fmt.Errorf("cannot list snapshots: %w", err) jsonResponseError(w, err) + snapshotsListErrorsTotal.Inc() return true } fmt.Fprintf(w, `{"status":"ok","snapshots":[`) @@ -235,6 +245,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag fmt.Fprintf(w, `]}`) return true case "/delete": + snapshotsDeleteTotal.Inc() w.Header().Set("Content-Type", "application/json") snapshotName := r.FormValue("snapshot") @@ -242,6 +253,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag if err != nil { err = fmt.Errorf("cannot list snapshots: %w", err) jsonResponseError(w, err) + snapshotsDeleteErrorsTotal.Inc() return true } for _, snName := range snapshots { @@ -249,6 +261,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag if err := strg.DeleteSnapshot(snName); err != nil { err = fmt.Errorf("cannot delete snapshot %q: %w", snName, err) jsonResponseError(w, err) + snapshotsDeleteErrorsTotal.Inc() return true } fmt.Fprintf(w, `{"status":"ok"}`) @@ -260,17 +273,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag jsonResponseError(w, err) return true case "/delete_all": + snapshotsDeleteAllTotal.Inc() w.Header().Set("Content-Type", "application/json") snapshots, err := strg.ListSnapshots() if err != nil { err = fmt.Errorf("cannot list snapshots: %w", err) jsonResponseError(w, err) + snapshotsDeleteAllErrorsTotal.Inc() return true } for _, snapshotName := range snapshots { if err := strg.DeleteSnapshot(snapshotName); err != nil { err = fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err) jsonResponseError(w, err) + snapshotsDeleteAllErrorsTotal.Inc() return true } } @@ -316,7 +332,20 @@ var ( staleSnapshotsRemoverWG sync.WaitGroup ) -var activeForceMerges = metrics.NewCounter("vm_active_force_merges") +var ( + activeForceMerges = metrics.NewCounter("vm_active_force_merges") + snapshotsCreateTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/create"}`) + snapshotsCreateErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/create"}`) + + snapshotsListTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/list"}`) + snapshotsListErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/list"}`) + + snapshotsDeleteTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/delete"}`) + snapshotsDeleteErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete"}`) + + snapshotsDeleteAllTotal = metrics.NewCounter(`vm_http_requests_total{path="/snapshot/delete_all"}`) + snapshotsDeleteAllErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete_all"}`) +) func registerStorageMetrics(strg *storage.Storage) { mCache := &storage.Metrics{} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ce1c622c8..433cb0fdf 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,12 +15,15 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* FEATURE: add `-snapshotCreateTimeout` flag to allow configuring timeout for snapshot process. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). +* FEATURE: expose `vm_http_requests_total` and `vm_http_request_errors_total` metrics for `snapshot/*` paths at [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) `vmstorage` and [VictoriaMetrics Single](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). * FEATURE: [vmgateway](https://docs.victoriametrics.com/vmgateway.html): add the ability to discover keys for JWT verification via [OpenID discovery endpoint](https://openid.net/specs/openid-connect-discovery-1_0.html). See [these docs](https://docs.victoriametrics.com/vmgateway.html#using-openid-discovery-endpoint-for-jwt-signature-verification). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix panic when executing the query `aggr_func(rollup*(some_value))`. The panic has been introduced in [v1.88.0](https://docs.victoriametrics.com/CHANGELOG.html#v1880). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): use the provided `-remoteWrite.*` auth options when determining whether the remote storage supports [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). Previously the auth options were ignored. This was preventing from automatic switch to VictoriaMetrics remote write protocol. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not register `vm_promscrape_config_*` metrics if `-promscrape.config` flag is not used. Previously those metrics were registered and never updated, which was confusing and could trigger false-positive alerts. * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): skip measurements with no fields when migrating data from influxdb. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3837). +* BUGFIX: delete failed snapshot contents from disk when creating snapshot fails. Previously failed snapshot contents could remain on disk in incomplete state. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3858) ## [v1.88.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.0) @@ -47,7 +50,6 @@ Released at 2023-02-24 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_zscore(z, q)` function for dropping outliers located farther than `z*range_stddev(q)` from `range_avg(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706). * FEATURE: add `-search.maxSeriesPerAggrFunc` command-line flag, which can be used for limiting the number of time series [MetricsQL aggregate functions](https://docs.victoriametrics.com/MetricsQL.html#aggregate-functions) can return in a single query. This flag can be useful for preventing OOMs when [count_values](https://docs.victoriametrics.com/MetricsQL.html#count_values) function is improperly used. - * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): small UX improvements for mobile view. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3707) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3848). * FEATURE: add `-search.logQueryMemoryUsage` command-line flag for logging queries, which need more memory than specified by this command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3553). Thanks to @michal-kralik for the idea and the intial implementation. * FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`. diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 74f2735b5..30c4abcf2 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -1361,6 +1361,9 @@ Below is the output for `/path/to/vmstorage -help`: The maximum number of CPU cores to use for small merges. Default value is used if set to 0 -snapshotAuthKey string authKey, which must be passed in query string to /snapshot* pages + -snapshotCreateTimeout value + Defines timeout value for process of creating new snapshot if it is set to non-zero duration. If set, make sure that timeout is lower than backup period. + The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0) -snapshotsMaxAge value Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 893a58a64..565e22cb4 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -2485,6 +2485,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum number of CPU cores to use for small merges. Default value is used if set to 0 -snapshotAuthKey string authKey, which must be passed in query string to /snapshot* pages + -snapshotCreateTimeout value + Defines timeout value for process of creating new snapshot if it is set to non-zero duration. If set, make sure that timeout is lower than backup period. + The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0) -snapshotsMaxAge value Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted The following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 7c7d200a1..23158caa4 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1503,7 +1503,10 @@ func mustCloseParts(pws []*partWrapper) { // // Snapshot is created using linux hard links, so it is usually created // very quickly. -func (tb *Table) CreateSnapshotAt(dstDir string) error { +// +// If deadline is reached before snapshot is created error is returned. +// If any error occurs during snapshot created data is not removed. +func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error { logger.Infof("creating Table snapshot of %q...", tb.path) startTime := time.Now() @@ -1543,7 +1546,14 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error { if err != nil { return fmt.Errorf("cannot read directory: %w", err) } - for _, fi := range fis { + + for i, fi := range fis { + if deadline > 0 && i%5 == 0 { + if fasttime.UnixTimestamp() > deadline { + return fmt.Errorf("cannot create snapshot for %q in time: timeout exceeded", tb.path) + } + } + fn := fi.Name() if !fs.IsDirOrSymlink(fi) { // Skip non-directories. diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index 6f3de5dee..d9154786d 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -150,11 +150,11 @@ func TestTableCreateSnapshotAt(t *testing.T) { // Create multiple snapshots. snapshot1 := path + "-test-snapshot1" - if err := tb.CreateSnapshotAt(snapshot1); err != nil { + if err := tb.CreateSnapshotAt(snapshot1, 0); err != nil { t.Fatalf("cannot create snapshot1: %s", err) } snapshot2 := path + "-test-snapshot2" - if err := tb.CreateSnapshotAt(snapshot2); err != nil { + if err := tb.CreateSnapshotAt(snapshot2, 0); err != nil { t.Fatalf("cannot create snapshot2: %s", err) } defer func() { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index d3026856a..0c0f3db56 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -321,7 +321,7 @@ func (s *Storage) DebugFlush() { } // CreateSnapshot creates snapshot for s and returns the snapshot name. -func (s *Storage) CreateSnapshot() (string, error) { +func (s *Storage) CreateSnapshot(deadline uint64) (string, error) { logger.Infof("creating Storage snapshot for %q...", s.path) startTime := time.Now() @@ -339,16 +339,19 @@ func (s *Storage) CreateSnapshot() (string, error) { return "", fmt.Errorf("cannot create dir %q: %w", dstDataDir, err) } - smallDir, bigDir, err := s.tb.CreateSnapshot(snapshotName) + smallDir, bigDir, err := s.tb.CreateSnapshot(snapshotName, deadline) if err != nil { + fs.MustRemoveAll(dstDir) return "", fmt.Errorf("cannot create table snapshot: %w", err) } dstSmallDir := dstDataDir + "/small" if err := fs.SymlinkRelative(smallDir, dstSmallDir); err != nil { + fs.MustRemoveAll(dstDir) return "", fmt.Errorf("cannot create symlink from %q to %q: %w", smallDir, dstSmallDir, err) } dstBigDir := dstDataDir + "/big" if err := fs.SymlinkRelative(bigDir, dstBigDir); err != nil { + fs.MustRemoveAll(dstDir) return "", fmt.Errorf("cannot create symlink from %q to %q: %w", bigDir, dstBigDir, err) } fs.MustSyncPath(dstDataDir) @@ -356,24 +359,28 @@ func (s *Storage) CreateSnapshot() (string, error) { idbSnapshot := fmt.Sprintf("%s/indexdb/snapshots/%s", srcDir, snapshotName) idb := s.idb() currSnapshot := idbSnapshot + "/" + idb.name - if err := idb.tb.CreateSnapshotAt(currSnapshot); err != nil { + if err := idb.tb.CreateSnapshotAt(currSnapshot, deadline); err != nil { + fs.MustRemoveAll(dstDir) return "", fmt.Errorf("cannot create curr indexDB snapshot: %w", err) } ok := idb.doExtDB(func(extDB *indexDB) { prevSnapshot := idbSnapshot + "/" + extDB.name - err = extDB.tb.CreateSnapshotAt(prevSnapshot) + err = extDB.tb.CreateSnapshotAt(prevSnapshot, deadline) }) if ok && err != nil { + fs.MustRemoveAll(dstDir) return "", fmt.Errorf("cannot create prev indexDB snapshot: %w", err) } dstIdbDir := dstDir + "/indexdb" if err := fs.SymlinkRelative(idbSnapshot, dstIdbDir); err != nil { + fs.MustRemoveAll(dstDir) return "", fmt.Errorf("cannot create symlink from %q to %q: %w", idbSnapshot, dstIdbDir, err) } srcMetadataDir := srcDir + "/metadata" dstMetadataDir := dstDir + "/metadata" if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil { + fs.MustRemoveAll(dstDir) return "", fmt.Errorf("cannot copy metadata: %w", err) } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index e22d9cad4..2068b0744 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -1165,7 +1165,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error { } // Try creating a snapshot from the storage. - snapshotName, err := s.CreateSnapshot() + snapshotName, err := s.CreateSnapshot(0) if err != nil { return fmt.Errorf("cannot create snapshot from the storage: %w", err) } @@ -1332,7 +1332,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) { } } // Try creating a snapshot from the storage. - snapshotName, err := s.CreateSnapshot() + snapshotName, err := s.CreateSnapshot(0) if err != nil { t.Fatalf("cannot create snapshot from the storage: %s", err) } diff --git a/lib/storage/table.go b/lib/storage/table.go index 855cc4100..508644bae 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -142,7 +142,9 @@ func openTable(path string, s *Storage) (*table, error) { } // CreateSnapshot creates tb snapshot and returns paths to small and big parts of it. -func (tb *table) CreateSnapshot(snapshotName string) (string, string, error) { +// If deadline is reached before snapshot is created error is returned. +// If any error occurs during snapshot created data is not removed. +func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, string, error) { logger.Infof("creating table snapshot of %q...", tb.path) startTime := time.Now() @@ -158,7 +160,13 @@ func (tb *table) CreateSnapshot(snapshotName string) (string, string, error) { return "", "", fmt.Errorf("cannot create dir %q: %w", dstBigDir, err) } - for _, ptw := range ptws { + for i, ptw := range ptws { + if deadline > 0 && i%5 == 0 { + if fasttime.UnixTimestamp() > deadline { + return "", "", fmt.Errorf("cannot create snapshot for %q in %q in time: timeout exceeded", tb.path, snapshotName) + } + } + smallPath := dstSmallDir + "/" + ptw.pt.name bigPath := dstBigDir + "/" + ptw.pt.name if err := ptw.pt.CreateSnapshotAt(smallPath, bigPath); err != nil {