diff --git a/app/vmctl/README.md b/app/vmctl/README.md index d2a81ed5a..b463b8f9a 100644 --- a/app/vmctl/README.md +++ b/app/vmctl/README.md @@ -738,7 +738,7 @@ or higher. See `./vmctl vm-native --help` for details and full list of flags. Migration in `vm-native` mode takes two steps: -1. Explore the list of the metrics to migrate via `/api/v1/series` API; +1. Explore the list of the metrics to migrate via `api/v1/label/__name__/values` API; 2. Migrate explored metrics one-by-one. ``` @@ -765,6 +765,57 @@ Requests to make: 9 / 9 [██████████████████ requests retries: 0; 2023/03/02 09:22:06 Total time: 3.633127625s ``` +`vmctl` uses retries with a backoff policy by default. + +The benefits of this retry backoff policy include: +1. Improved success rates: + With each retry attempt, the migration process has a higher chance of success. + By increasing the delay between retries, the system can avoid overwhelming the service with too many requests at once. + +2. Reduced load on the system: + By increasing the delay between retries, the system can reduce the load on the service by limiting the number of + requests made in a short amount of time. +3. Can help to migrate large amounts of data + +However, there are also some potential penalties associated with using a backoff retry policy, including: +1. Increased migration process latency: + `vmctl` needs to make an additional call to the `api/v1/label/__name__/values` API with the defined `--vm-native-filter-match` flag, + and then process all metric names with additional filters. + +If retries with backoff policy are not needed, the `--vm-native-disable-retries` command line flag can be used. +When this flag is set to `true`, `vmctl` skips the additional call to the `api/v1/label/__name__/values` API and starts +the migration process by making calls to the `api/v1/export/native` and `api/v1/import/native` APIs. If any error happens, `vmctl` immediately +stops the migration process. 
+ +``` +./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \ + --vm-native-dst-addr=http://127.0.0.1:8428 \ + --vm-native-filter-match='{__name__!=""}' \ + --vm-native-filter-time-start='2023-04-08T11:30:30Z' \ + --vm-native-disable-retries=true + +VictoriaMetrics Native import mode + +2023/04/11 10:17:14 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter + filter: match[]={__name__!=""} + start: 2023-04-08T11:30:30Z +. Continue? [Y/n] +2023/04/11 10:17:15 Requests to make: 1 +2023/04/11 10:17:15 number of workers decreased to 1, because vmctl calculated requests to make 1 +Total: 0 ↙ Speed: ? p/s Continue import process with filter + filter: match[]={__name__!=""} + start: 2023-04-08T11:30:30Z + end: 2023-04-11T07:17:14Z: +Total: 1.64 GiB ↖ Speed: 11.20 MiB p/s +2023/04/11 10:19:45 Import finished! +2023/04/11 10:19:45 VictoriaMetrics importer stats: + time spent while importing: 2m30.813841541s; + total bytes: 1.8 GB; + bytes/s: 11.7 MB; + requests: 1; + requests retries: 0; +2023/04/11 10:19:45 Total time: 2m30.814721125s +``` Importing tips: diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index 5eae96c9c..86e4dca07 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -326,6 +326,7 @@ const ( vmNativeStepInterval = "vm-native-step-interval" vmNativeDisableHTTPKeepAlive = "vm-native-disable-http-keep-alive" + vmNativeDisableRetries = "vm-native-disable-retries" vmNativeSrcAddr = "vm-native-src-addr" vmNativeSrcUser = "vm-native-src-user" @@ -443,6 +444,11 @@ var ( Usage: "Number of workers concurrently performing import requests to VM", Value: 2, }, + &cli.BoolFlag{ + Name: vmNativeDisableRetries, + Usage: "Defines whether to disable retries with backoff policy for migration process", + Value: false, + }, } ) diff --git a/app/vmctl/main.go b/app/vmctl/main.go index a4efd2b6d..be75ceb11 100644 --- a/app/vmctl/main.go +++ 
b/app/vmctl/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/http" "os" "os/signal" "strings" @@ -201,6 +202,8 @@ func main() { return fmt.Errorf("flag %q can't be empty", vmNativeFilterMatch) } + disableKeepAlive := c.Bool(vmNativeDisableHTTPKeepAlive) + var srcExtraLabels []string srcAddr := strings.Trim(c.String(vmNativeSrcAddr), "/") srcAuthConfig, err := auth.Generate( @@ -210,6 +213,7 @@ func main() { if err != nil { return fmt.Errorf("error initilize auth config for source: %s", srcAddr) } + srcHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}} dstAddr := strings.Trim(c.String(vmNativeDstAddr), "/") dstExtraLabels := c.StringSlice(vmExtraLabel) @@ -220,6 +224,7 @@ func main() { if err != nil { return fmt.Errorf("error initilize auth config for destination: %s", dstAddr) } + dstHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}} p := vmNativeProcessor{ rateLimit: c.Int64(vmRateLimit), @@ -231,19 +236,20 @@ func main() { Chunk: c.String(vmNativeStepInterval), }, src: &native.Client{ - AuthCfg: srcAuthConfig, - Addr: srcAddr, - ExtraLabels: srcExtraLabels, - DisableHTTPKeepAlive: c.Bool(vmNativeDisableHTTPKeepAlive), + AuthCfg: srcAuthConfig, + Addr: srcAddr, + ExtraLabels: srcExtraLabels, + HTTPClient: srcHTTPClient, }, dst: &native.Client{ - AuthCfg: dstAuthConfig, - Addr: dstAddr, - ExtraLabels: dstExtraLabels, - DisableHTTPKeepAlive: c.Bool(vmNativeDisableHTTPKeepAlive), + AuthCfg: dstAuthConfig, + Addr: dstAddr, + ExtraLabels: dstExtraLabels, + HTTPClient: dstHTTPClient, }, - backoff: backoff.New(), - cc: c.Int(vmConcurrency), + backoff: backoff.New(), + cc: c.Int(vmConcurrency), + disableRetries: c.Bool(vmNativeDisableRetries), } return p.run(ctx, isNonInteractive(c)) }, diff --git a/app/vmctl/native/client.go b/app/vmctl/native/client.go index 1b9f779a7..bd22bc6b7 100644 --- a/app/vmctl/native/client.go +++ b/app/vmctl/native/client.go @@ -11,34 +11,33 @@ import 
( ) const ( - nativeTenantsAddr = "admin/tenants" - nativeSeriesAddr = "api/v1/series" - nameLabel = "__name__" + nativeTenantsAddr = "admin/tenants" + nativeMetricNamesAddr = "api/v1/label/__name__/values" ) // Client is an HTTP client for exporting and importing // time series via native protocol. type Client struct { - AuthCfg *auth.Config - Addr string - ExtraLabels []string - DisableHTTPKeepAlive bool + AuthCfg *auth.Config + Addr string + ExtraLabels []string + HTTPClient *http.Client } // LabelValues represents series from api/v1/series response type LabelValues map[string]string -// Response represents response from api/v1/series +// Response represents response from api/v1/label/__name__/values type Response struct { - Status string `json:"status"` - Series []LabelValues `json:"data"` + Status string `json:"status"` + MetricNames []string `json:"data"` } -// Explore finds series by provided filter from api/v1/series -func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) (map[string]struct{}, error) { - url := fmt.Sprintf("%s/%s", c.Addr, nativeSeriesAddr) +// Explore finds metric names by provided filter from api/v1/label/__name__/values +func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]string, error) { + url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr) if tenantID != "" { - url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeSeriesAddr) + url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr) } req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -68,21 +67,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) (map[st if err := resp.Body.Close(); err != nil { return nil, fmt.Errorf("cannot close series response body: %s", err) } - names := make(map[string]struct{}) - for _, series := range response.Series { - // TODO: consider tweaking /api/v1/series API to return metric names only - // 
this could make explore response much lighter. - for key, value := range series { - if key != nameLabel { - continue - } - if _, ok := names[value]; ok { - continue - } - names[value] = struct{}{} - } - } - return names, nil + return response.MetricNames, nil } // ImportPipe uses pipe reader in request to process data @@ -169,8 +154,8 @@ func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) { if c.AuthCfg != nil { c.AuthCfg.SetHeaders(req, true) } - var httpClient = &http.Client{Transport: &http.Transport{DisableKeepAlives: c.DisableHTTPKeepAlive}} - resp, err := httpClient.Do(req) + + resp, err := c.HTTPClient.Do(req) if err != nil { return nil, fmt.Errorf("unexpected error when performing request: %w", err) } diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index da76d4a05..440f2f21d 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "strings" "sync" "time" @@ -13,8 +14,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/cheggaaa/pb/v3" ) @@ -25,16 +26,18 @@ type vmNativeProcessor struct { src *native.Client backoff *backoff.Backoff - s *stats - rateLimit int64 - interCluster bool - cc int + s *stats + rateLimit int64 + interCluster bool + cc int + disableRetries bool } const ( - nativeExportAddr = "api/v1/export/native" - nativeImportAddr = "api/v1/import/native" - nativeBarTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}` + nativeExportAddr = "api/v1/export/native" + nativeImportAddr = "api/v1/import/native" + nativeWithBackoffTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . 
"[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}` + nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}` ) func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error { @@ -94,9 +97,9 @@ func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error { return nil } -func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string) error { +func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error { - retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL) } + retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL, bar) } attempts, err := p.backoff.Retry(ctx, retryableFunc) p.s.Lock() p.s.retries += attempts @@ -108,13 +111,18 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst return nil } -func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string) error { +func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error { - exportReader, err := p.src.ExportPipe(ctx, srcURL, f) + reader, err := p.src.ExportPipe(ctx, srcURL, f) if err != nil { return fmt.Errorf("failed to init export pipe: %w", err) } + if p.disableRetries && bar != nil { + fmt.Printf("Continue import process with filter %s:\n", f.String()) + reader = bar.NewProxyReader(reader) + } + pr, pw := io.Pipe() done := make(chan struct{}) go func() { @@ -131,7 +139,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU w = limiter.NewWriteLimiter(pw, rl) } - written, err := io.Copy(w, exportReader) + written, err := io.Copy(w, reader) if err != nil { return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err) } @@ -176,17 +184,22 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, fmt.Println("") // 
extra line for better output formatting log.Printf(initMessage, initParams...) - log.Printf("Exploring metrics...") - metrics, err := p.src.Explore(ctx, p.filter, tenantID) - if err != nil { - return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err) + var foundSeriesMsg string + + metrics := []string{p.filter.Match} + if !p.disableRetries { + log.Printf("Exploring metrics...") + metrics, err = p.src.Explore(ctx, p.filter, tenantID) + if err != nil { + return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err) + } + + if len(metrics) == 0 { + return fmt.Errorf("no metrics found") + } + foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics)) } - if len(metrics) == 0 { - return fmt.Errorf("no metrics found") - } - - foundSeriesMsg := fmt.Sprintf("Found %d metrics to import", len(metrics)) if !p.interCluster { // do not prompt for intercluster because there could be many tenants, // and we don't want to interrupt the process when moving to the next tenant. 
@@ -206,7 +219,10 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, var bar *pb.ProgressBar if !silent { - bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeBarTpl, barPrefix)).New(len(metrics) * len(ranges)) + bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeWithBackoffTpl, barPrefix)).New(len(metrics) * len(ranges)) + if p.disableRetries { + bar = pb.ProgressBarTemplate(nativeSingleProcessTpl).New(0) + } bar.Start() defer bar.Finish() } @@ -220,19 +236,26 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, go func() { defer wg.Done() for f := range filterCh { - if err := p.do(ctx, f, srcURL, dstURL); err != nil { - errCh <- err - return - } - if bar != nil { - bar.Increment() + if !p.disableRetries { + if err := p.do(ctx, f, srcURL, dstURL, nil); err != nil { + errCh <- err + return + } + if bar != nil { + bar.Increment() + } + } else { + if err := p.runSingle(ctx, f, srcURL, dstURL, bar); err != nil { + errCh <- err + return + } } } }() } // any error breaks the import - for s := range metrics { + for _, s := range metrics { match, err := buildMatchWithFilter(p.filter.Match, s) if err != nil { @@ -313,11 +336,26 @@ func byteCountSI(b int64) string { } func buildMatchWithFilter(filter string, metricName string) (string, error) { - labels, err := promutils.NewLabelsFromString(filter) + if filter == metricName { + return filter, nil + } + + labels, err := searchutils.ParseMetricSelector(filter) if err != nil { return "", err } - labels.Set("__name__", metricName) - return labels.String(), nil + str := make([]string, 0, len(labels)) + for _, label := range labels { + if len(label.Key) == 0 { + continue + } + str = append(str, label.String()) + } + + nameFilter := fmt.Sprintf("__name__=%q", metricName) + str = append(str, nameFilter) + + match := fmt.Sprintf("{%s}", strings.Join(str, ",")) + return match, nil } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e6c8cf524..f462d289a 100644 --- 
a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -13,6 +13,8 @@ The following tip changes can be tested by building VictoriaMetrics components f * [How to build vmauth](https://docs.victoriametrics.com/vmauth.html#how-to-build-from-sources) * [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build) +* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix performance issue when using `vmctl vm-native`. Added flag to disable backoff policy `--vm-native-disable-retries`. Changed API call to explore the list of the metric names from `/api/v1/series` to `api/v1/label/__name__/values`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4092). + ## tip * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): store backup creation and completion time in `backup_complete.ignore` file of backup contents. This is useful to determine point in time when backup was created and completed. diff --git a/docs/vmctl.md b/docs/vmctl.md index c4a4d0ecb..34b9628b5 100644 --- a/docs/vmctl.md +++ b/docs/vmctl.md @@ -742,7 +742,7 @@ or higher. See `./vmctl vm-native --help` for details and full list of flags. Migration in `vm-native` mode takes two steps: -1. Explore the list of the metrics to migrate via `/api/v1/series` API; +1. Explore the list of the metrics to migrate via `api/v1/label/__name__/values` API; 2. Migrate explored metrics one-by-one. ``` @@ -770,6 +770,59 @@ Requests to make: 9 / 9 [██████████████████ 2023/03/02 09:22:06 Total time: 3.633127625s ``` + +`vmctl` uses retries with backoff policy by default. + +The benefits of this retry backoff policy include: +1. Improved success rates: + With each retry attempt, the migration process has a higher chance of success. + By increasing the delay between retries, the system can avoid overwhelming the service with too many requests at once. + +2. 
Reduced load on the system: + By increasing the delay between retries, the system can reduce the load on the service by limiting the number of + requests made in a short amount of time. +3. Can help to migrate large amounts of data + +However, there are also some potential penalties associated with using a backoff retry policy, including: +1. Increased migration process latency: +`vmctl` needs to make an additional call to the `api/v1/label/__name__/values` API with the defined `--vm-native-filter-match` flag, +and then process all metric names with additional filters. + +If retries with backoff policy are not needed, the `--vm-native-disable-retries` command line flag can be used. +When this flag is set to `true`, `vmctl` skips the additional call to the `api/v1/label/__name__/values` API and starts +the migration process by making calls to the `api/v1/export/native` and `api/v1/import/native` APIs. If any error happens, `vmctl` immediately +stops the migration process. + +``` +./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \ + --vm-native-dst-addr=http://127.0.0.1:8428 \ + --vm-native-filter-match='{__name__!=""}' \ + --vm-native-filter-time-start='2023-04-08T11:30:30Z' \ + --vm-native-disable-retries=true + +VictoriaMetrics Native import mode + +2023/04/11 10:17:14 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter + filter: match[]={__name__!=""} + start: 2023-04-08T11:30:30Z +. Continue? [Y/n] +2023/04/11 10:17:15 Requests to make: 1 +2023/04/11 10:17:15 number of workers decreased to 1, because vmctl calculated requests to make 1 +Total: 0 ↙ Speed: ? p/s Continue import process with filter + filter: match[]={__name__!=""} + start: 2023-04-08T11:30:30Z + end: 2023-04-11T07:17:14Z: +Total: 1.64 GiB ↖ Speed: 11.20 MiB p/s +2023/04/11 10:19:45 Import finished! 
+2023/04/11 10:19:45 VictoriaMetrics importer stats: + time spent while importing: 2m30.813841541s; + total bytes: 1.8 GB; + bytes/s: 11.7 MB; + requests: 1; + requests retries: 0; +2023/04/11 10:19:45 Total time: 2m30.814721125s +``` + Importing tips: 1. Migrating big volumes of data may result in reaching the safety limits on `src` side.