From ff0e63ef0df138b745a6173e447e86a433db9f3f Mon Sep 17 00:00:00 2001 From: Dmytro Kozlov Date: Thu, 2 Mar 2023 14:19:45 +0200 Subject: [PATCH] app/vmctl: add backoff retries to native protocol (#3859) app/vmctl: vm-native - split migration on per-metric basis `vm-native` mode now splits the migration process on per-metric basis. This allows to migrate metrics one-by-one according to the specified filter. This change allows to retry export/import requests for a specific metric and provides a better understanding of the migration progress. --------- Signed-off-by: hagen1778 Co-authored-by: hagen1778 --- app/vmctl/README.md | 225 ++++++++----------- app/vmctl/backoff/backoff.go | 2 +- app/vmctl/flags.go | 10 +- app/vmctl/main.go | 36 +-- app/vmctl/native/client.go | 181 +++++++++++++++ app/vmctl/native/filter.go | 22 ++ app/vmctl/vm_native.go | 417 +++++++++++++++++++---------------- app/vmctl/vm_native_test.go | 55 ++--- docs/CHANGELOG.md | 1 + docs/vmctl.md | 225 ++++++++----------- 10 files changed, 680 insertions(+), 494 deletions(-) create mode 100644 app/vmctl/native/client.go create mode 100644 app/vmctl/native/filter.go diff --git a/app/vmctl/README.md b/app/vmctl/README.md index fc08cdb77..373708e18 100644 --- a/app/vmctl/README.md +++ b/app/vmctl/README.md @@ -737,21 +737,33 @@ or higher. See `./vmctl vm-native --help` for details and full list of flags. -In this mode `vmctl` acts as a proxy between two VM instances, where time series filtering is done by "source" (`src`) -and processing is done by "destination" (`dst`). Because of that, `vmctl` doesn't actually know how much data will be -processed and can't show the progress bar. It will show the current processing speed and total number of processed bytes: +Migration in `vm-native` mode takes two steps: +1. Explore the list of the metrics to migrate via `/api/v1/series` API; +2. Migrate explored metrics one-by-one. ``` -./vmctl vm-native --vm-native-src-addr=http://localhost:8528 \ - --vm-native-dst-addr=http://localhost:8428 \ - --vm-native-filter-match='{job="vmagent"}' \ - --vm-native-filter-time-start='2020-01-01T20:07:00Z' +./vmctl vm-native \ + --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \ + --vm-native-dst-addr=http://localhost:8428 \ + --vm-native-filter-time-start='2022-11-20T00:00:00Z' \ + --vm-native-filter-match='{__name__=~"vm_cache_.*"}' VictoriaMetrics Native import mode -Initing export pipe from "http://localhost:8528" with filters: - filter: match[]={job="vmagent"} -Initing import process to "http://localhost:8428": -Total: 336.75 KiB ↖ Speed: 454.46 KiB p/s -2020/10/13 17:04:59 Total time: 952.143376ms + +2023/03/02 09:22:02 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter + filter: match[]={__name__=~"vm_cache_.*"} + start: 2022-11-20T00:00:00Z +2023/03/02 09:22:02 Exploring metrics... +Found 9 metrics to import. Continue? [Y/n] +2023/03/02 09:22:04 Requests to make: 9 +Requests to make: 9 / 9 [███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% +2023/03/02 09:22:06 Import finished! +2023/03/02 09:22:06 VictoriaMetrics importer stats: + time spent while importing: 3.632638875s; + total bytes: 7.8 MB; + bytes/s: 2.1 MB; + requests: 9; + requests retries: 0; +2023/03/02 09:22:06 Total time: 3.633127625s ``` Importing tips: @@ -759,6 +771,7 @@ Importing tips: 1. Migrating big volumes of data may result in reaching the safety limits on `src` side. Please verify that `-search.maxExportDuration` and `-search.maxExportSeries` were set with proper values for `src`. If hitting the limits, follow the recommendations [here](https://docs.victoriametrics.com/#how-to-export-data-in-native-format). +If hitting `the number of matching timeseries exceeds...` error, adjust filters to match less time series or update `-search.maxSeries` command-line flag on vmselect/vmsingle; 2. Migrating all the metrics from one VM to another may collide with existing application metrics (prefixed with `vm_`) at destination and lead to confusion when using [official Grafana dashboards](https://grafana.com/orgs/victoriametrics/dashboards). @@ -770,71 +783,57 @@ Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/ 5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and specify `accountID` param. 6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps. +7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results. +Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according +to allocated CPU resources of your VictoriaMetrics installation. + +In this mode `vmctl` acts as a proxy between two VM instances, where time series filtering is done by "source" (`src`) +and processing is done by "destination" (`dst`). So no extra memory or CPU resources required on `vmctl` side. Only +`src` and `dst` resource matter. #### Using time-based chunking of migration -It is possible split migration process into set of smaller batches based on time. This is especially useful when migrating large volumes of data as this adds indication of progress and ability to restore process from certain point in case of failure. +It is possible split migration process into set of smaller batches based on time. This is especially useful when +migrating large volumes of data as this adds indication of progress and ability to restore process from certain point +in case of failure. To use this you need to specify `--vm-native-step-interval` flag. Supported values are: `month`, `day`, `hour`. -Note that in order to use this it is required `--vm-native-filter-time-start` to be set to calculate time ranges for export process. +Note that in order to use this it is required `--vm-native-filter-time-start` to be set to calculate time ranges for +export process. Every range is being processed independently, which means that: - after range processing is finished all data within range is migrated -- if process fails on one of stages it is guaranteed that data of prior stages is already written, so it is possible to restart process starting from failed range +- if process fails on one of stages it is guaranteed that data of prior stages is already written, +so it is possible to restart process starting from failed range. -It is recommended using the `month` step when migrating the data over multiple months, since the migration with `day` and `hour` steps may take longer time to complete -because of additional overhead. +It is recommended using the `month` step when migrating the data over multiple months, +since the migration with `day` and `hour` steps may take longer time to complete because of additional overhead. Usage example: ```console -./vmctl vm-native - --vm-native-filter-time-start 2022-06-17T00:07:00Z \ - --vm-native-filter-time-end 2022-10-03T00:07:00Z \ - --vm-native-src-addr http://localhost:8428 \ - --vm-native-dst-addr http://localhost:8528 \ - --vm-native-step-interval=month +./vmctl vm-native \ + --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \ + --vm-native-dst-addr=http://localhost:8428 \ + --vm-native-filter-time-start='2022-11-20T00:00:00Z' \ + --vm-native-step-interval=month \ + --vm-native-filter-match='{__name__=~"vm_cache_.*"}' VictoriaMetrics Native import mode -2022/08/30 19:48:24 Processing range 1/5: 2022-06-17T00:07:00Z - 2022-06-30T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-06-17T00:07:00Z - end: 2022-06-30T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 28.89 KiB p/s -2022/08/30 19:48:24 Processing range 2/5: 2022-07-01T00:00:00Z - 2022-07-31T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-07-01T00:00:00Z - end: 2022-07-31T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 164.35 KiB p/s -2022/08/30 19:48:24 Processing range 3/5: 2022-08-01T00:00:00Z - 2022-08-31T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-08-01T00:00:00Z - end: 2022-08-31T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 191.42 KiB p/s -2022/08/30 19:48:24 Processing range 4/5: 2022-09-01T00:00:00Z - 2022-09-30T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-09-01T00:00:00Z - end: 2022-09-30T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 141.04 KiB p/s -2022/08/30 19:48:24 Processing range 5/5: 2022-10-01T00:00:00Z - 2022-10-03T00:07:00Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-10-01T00:00:00Z - end: 2022-10-03T00:07:00Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 186.32 KiB p/s -2022/08/30 19:48:24 Total time: 12.680582ms + +2023/03/02 09:18:05 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter + filter: match[]={__name__=~"vm_cache_.*"} + start: 2022-11-20T00:00:00Z +2023/03/02 09:18:05 Exploring metrics... +Found 9 metrics to import. Continue? [Y/n] +2023/03/02 09:18:07 Selected time range will be split into 5 ranges according to "month" step. Requests to make: 45. +Requests to make: 45 / 45 [█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% +2023/03/02 09:18:12 Import finished! +2023/03/02 09:18:12 VictoriaMetrics importer stats: + time spent while importing: 7.111870667s; + total bytes: 7.7 MB; + bytes/s: 1.1 MB; + requests: 45; + requests retries: 0; +2023/03/02 09:18:12 Total time: 7.112405875s ``` #### Cluster-to-cluster migration mode @@ -846,70 +845,41 @@ Cluster-to-cluster uses `/admin/tenants` endpoint (available starting from [v1.8 To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-native-src-addr` flag to 'http://vmselect:8481/' and `--vm-native-dst-addr` value to http://vminsert:8480/: ```console -./bin/vmctl vm-native --vm-intercluster=true --vm-native-src-addr=http://localhost:8481/ --vm-native-dst-addr=http://172.17.0.3:8480/ + ./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/ \ + --vm-native-dst-addr=http://127.0.0.1:8480/ \ + --vm-native-filter-match='{__name__="vm_app_uptime_seconds"}' \ + --vm-native-filter-time-start='2023-02-01T00:00:00Z' \ + --vm-native-step-interval=day \ +--vm-intercluster VictoriaMetrics Native import mode -2022/12/05 21:20:06 Discovered tenants: [123:1 12812919:1 1289198:1 1289:1283 12:1 1:0 1:1 1:1231231 1:1271727 1:12819 1:281 812891298:1] -2022/12/05 21:20:06 Initing export pipe from "http://localhost:8481/select/123:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/123:1/prometheus/api/v1/import/native": -Total: 61.13 MiB ↖ Speed: 2.05 MiB p/s -Total: 61.13 MiB ↗ Speed: 2.30 MiB p/s -2022/12/05 21:20:33 Initing export pipe from "http://localhost:8481/select/12812919:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/12812919:1/prometheus/api/v1/import/native": -Total: 43.14 MiB ↘ Speed: 1.86 MiB p/s -Total: 43.14 MiB ↙ Speed: 2.36 MiB p/s -2022/12/05 21:20:51 Initing export pipe from "http://localhost:8481/select/1289198:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1289198:1/prometheus/api/v1/import/native": -Total: 16.64 MiB ↗ Speed: 2.66 MiB p/s -Total: 16.64 MiB ↘ Speed: 2.19 MiB p/s -2022/12/05 21:20:59 Initing export pipe from "http://localhost:8481/select/1289:1283/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1289:1283/prometheus/api/v1/import/native": -Total: 43.33 MiB ↙ Speed: 1.94 MiB p/s -Total: 43.33 MiB ↖ Speed: 2.35 MiB p/s -2022/12/05 21:21:18 Initing export pipe from "http://localhost:8481/select/12:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/12:1/prometheus/api/v1/import/native": -Total: 63.78 MiB ↙ Speed: 1.96 MiB p/s -Total: 63.78 MiB ↖ Speed: 2.28 MiB p/s -2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:0/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:0/prometheus/api/v1/import/native": -2022/12/05 21:21:46 Import finished! -Total: 330 B ↗ Speed: 3.53 MiB p/s -2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:1/prometheus/api/v1/import/native": -Total: 63.81 MiB ↙ Speed: 1.96 MiB p/s -Total: 63.81 MiB ↖ Speed: 2.28 MiB p/s -2022/12/05 21:22:14 Initing export pipe from "http://localhost:8481/select/1:1231231/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:1231231/prometheus/api/v1/import/native": -Total: 63.84 MiB ↙ Speed: 1.93 MiB p/s -Total: 63.84 MiB ↖ Speed: 2.29 MiB p/s -2022/12/05 21:22:42 Initing export pipe from "http://localhost:8481/select/1:1271727/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:1271727/prometheus/api/v1/import/native": -Total: 54.37 MiB ↘ Speed: 1.90 MiB p/s -Total: 54.37 MiB ↙ Speed: 2.37 MiB p/s -2022/12/05 21:23:05 Initing export pipe from "http://localhost:8481/select/1:12819/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:12819/prometheus/api/v1/import/native": -Total: 17.01 MiB ↙ Speed: 1.75 MiB p/s -Total: 17.01 MiB ↖ Speed: 2.15 MiB p/s -2022/12/05 21:23:13 Initing export pipe from "http://localhost:8481/select/1:281/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:281/prometheus/api/v1/import/native": -Total: 63.89 MiB ↘ Speed: 1.90 MiB p/s -Total: 63.89 MiB ↙ Speed: 2.29 MiB p/s -2022/12/05 21:23:42 Initing export pipe from "http://localhost:8481/select/812891298:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/812891298:1/prometheus/api/v1/import/native": -Total: 63.84 MiB ↖ Speed: 1.99 MiB p/s -Total: 63.84 MiB ↗ Speed: 2.26 MiB p/s -2022/12/05 21:24:10 Total time: 4m4.1466565s +2023/02/28 10:41:42 Discovering tenants... +2023/02/28 10:41:42 The following tenants were discovered: [0:0 1:0 2:0 3:0 4:0] +2023/02/28 10:41:42 Initing import process from "http://127.0.0.1:8481/select/0:0/prometheus/api/v1/export/native" to "http://127.0.0.1:8480/insert/0:0/prometheus/api/v1/import/native" with filter + filter: match[]={__name__="vm_app_uptime_seconds"} + start: 2023-02-01T00:00:00Z for tenant 0:0 +2023/02/28 10:41:42 Exploring metrics... +2023/02/28 10:41:42 Found 1 metrics to import +2023/02/28 10:41:42 Selected time range will be split into 28 ranges according to "day" step. +Requests to make for tenant 0:0: 28 / 28 [███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% + +2023/02/28 10:41:45 Initing import process from "http://127.0.0.1:8481/select/1:0/prometheus/api/v1/export/native" to "http://127.0.0.1:8480/insert/1:0/prometheus/api/v1/import/native" with filter + filter: match[]={__name__="vm_app_uptime_seconds"} + start: 2023-02-01T00:00:00Z for tenant 1:0 +2023/02/28 10:41:45 Exploring metrics... +2023/02/28 10:41:45 Found 1 metrics to import +2023/02/28 10:41:45 Selected time range will be split into 28 ranges according to "day" step. Requests to make: 28 +Requests to make for tenant 1:0: 28 / 28 [████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% + +... + +2023/02/28 10:42:49 Import finished! +2023/02/28 10:42:49 VictoriaMetrics importer stats: + time spent while importing: 1m6.714210417s; + total bytes: 39.7 MB; + bytes/s: 594.4 kB; + requests: 140; + requests retries: 0; +2023/02/28 10:42:49 Total time: 1m7.147971417s ``` ## Verifying exported blocks from VictoriaMetrics @@ -976,6 +946,7 @@ a sign of network issues or VM being overloaded. See the logs during import for By default `vmctl` waits confirmation from user before starting the import. If this is unwanted behavior and no user interaction required - pass `-s` flag to enable "silence" mode: +See below the example of `vm-native` migration process: ``` -s Whether to run in silent mode. If set to true no confirmation prompts will appear. (default: false) ``` diff --git a/app/vmctl/backoff/backoff.go b/app/vmctl/backoff/backoff.go index beb14304d..431d6a0c4 100644 --- a/app/vmctl/backoff/backoff.go +++ b/app/vmctl/backoff/backoff.go @@ -47,7 +47,7 @@ func (b *Backoff) Retry(ctx context.Context, cb retryableFunc) (uint64, error) { if err == nil { return attempt, nil } - if errors.Is(err, ErrBadRequest) { + if errors.Is(err, ErrBadRequest) || errors.Is(err, context.Canceled) { logger.Errorf("unrecoverable error: %s", err) return attempt, err // fail fast if not recoverable } diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index d20808b9b..0f2b6da1d 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -344,8 +344,9 @@ var ( Value: `{__name__!=""}`, }, &cli.StringFlag{ - Name: vmNativeFilterTimeStart, - Usage: "The time filter may contain either unix timestamp in seconds or RFC3339 values. E.g. '2020-01-01T20:07:00Z'", + Name: vmNativeFilterTimeStart, + Usage: "The time filter may contain either unix timestamp in seconds or RFC3339 values. E.g. '2020-01-01T20:07:00Z'", + Required: true, }, &cli.StringFlag{ Name: vmNativeFilterTimeEnd, @@ -406,6 +407,11 @@ var ( fmt.Sprintf(" In this mode --%s flag format is: 'http://vmselect:8481/'. --%s flag format is: http://vminsert:8480/. \n", vmNativeSrcAddr, vmNativeDstAddr) + " TenantID will be appended automatically after discovering tenants from src.", }, + &cli.UintFlag{ + Name: vmConcurrency, + Usage: "Number of workers concurrently performing import requests to VM", + Value: 2, + }, } ) diff --git a/app/vmctl/main.go b/app/vmctl/main.go index dbfc332f1..384d918ff 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -11,6 +11,8 @@ import ( "syscall" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" "github.com/urfave/cli/v2" @@ -189,7 +191,7 @@ func main() { { Name: "vm-native", Usage: "Migrate time series between VictoriaMetrics installations via native binary format", - Flags: vmNativeFlags, + Flags: mergeFlags(globalFlags, vmNativeFlags), Action: func(c *cli.Context) error { fmt.Println("VictoriaMetrics Native import mode") @@ -200,25 +202,27 @@ func main() { p := vmNativeProcessor{ rateLimit: c.Int64(vmRateLimit), interCluster: c.Bool(vmInterCluster), - filter: filter{ - match: c.String(vmNativeFilterMatch), - timeStart: c.String(vmNativeFilterTimeStart), - timeEnd: c.String(vmNativeFilterTimeEnd), - chunk: c.String(vmNativeStepInterval), + filter: native.Filter{ + Match: c.String(vmNativeFilterMatch), + TimeStart: c.String(vmNativeFilterTimeStart), + TimeEnd: c.String(vmNativeFilterTimeEnd), + Chunk: c.String(vmNativeStepInterval), }, - src: &vmNativeClient{ - addr: strings.Trim(c.String(vmNativeSrcAddr), "/"), - user: c.String(vmNativeSrcUser), - password: c.String(vmNativeSrcPassword), + src: &native.Client{ + Addr: strings.Trim(c.String(vmNativeSrcAddr), "/"), + User: c.String(vmNativeSrcUser), + Password: c.String(vmNativeSrcPassword), }, - dst: &vmNativeClient{ - addr: strings.Trim(c.String(vmNativeDstAddr), "/"), - user: c.String(vmNativeDstUser), - password: c.String(vmNativeDstPassword), - extraLabels: c.StringSlice(vmExtraLabel), + dst: &native.Client{ + Addr: strings.Trim(c.String(vmNativeDstAddr), "/"), + User: c.String(vmNativeDstUser), + Password: c.String(vmNativeDstPassword), + ExtraLabels: c.StringSlice(vmExtraLabel), }, + backoff: backoff.New(), + cc: c.Int(vmConcurrency), } - return p.run(ctx) + return p.run(ctx, c.Bool(globalSilent)) }, }, { diff --git a/app/vmctl/native/client.go b/app/vmctl/native/client.go new file mode 100644 index 000000000..908251b9a --- /dev/null +++ b/app/vmctl/native/client.go @@ -0,0 +1,181 @@ +package native + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +const ( + nativeTenantsAddr = "admin/tenants" + nativeSeriesAddr = "api/v1/series" + nameLabel = "__name__" +) + +// Client is an HTTP client for exporting and importing +// time series via native protocol. +type Client struct { + Addr string + User string + Password string + ExtraLabels []string +} + +// LabelValues represents series from api/v1/series response +type LabelValues map[string]string + +// Response represents response from api/v1/series +type Response struct { + Status string `json:"status"` + Series []LabelValues `json:"data"` +} + +// Explore finds series by provided filter from api/v1/series +func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) (map[string]struct{}, error) { + url := fmt.Sprintf("%s/%s", c.Addr, nativeSeriesAddr) + if tenantID != "" { + url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeSeriesAddr) + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request to %q: %s", url, err) + } + + params := req.URL.Query() + if f.TimeStart != "" { + params.Set("start", f.TimeStart) + } + if f.TimeEnd != "" { + params.Set("end", f.TimeEnd) + } + params.Set("match[]", f.Match) + req.URL.RawQuery = params.Encode() + + resp, err := c.do(req, http.StatusOK) + if err != nil { + return nil, fmt.Errorf("series request failed: %s", err) + } + + var response Response + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("cannot decode series response: %s", err) + } + + if err := resp.Body.Close(); err != nil { + return nil, fmt.Errorf("cannot close series response body: %s", err) + } + names := make(map[string]struct{}) + for _, series := range response.Series { + // TODO: consider tweaking /api/v1/series API to return metric names only + // this could make explore response much lighter. + for key, value := range series { + if key != nameLabel { + continue + } + if _, ok := names[value]; ok { + continue + } + names[value] = struct{}{} + } + } + return names, nil +} + +// ImportPipe uses pipe reader in request to process data +func (c *Client) ImportPipe(ctx context.Context, dstURL string, pr *io.PipeReader) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, dstURL, pr) + if err != nil { + return fmt.Errorf("cannot create import request to %q: %s", c.Addr, err) + } + importResp, err := c.do(req, http.StatusNoContent) + if err != nil { + return fmt.Errorf("import request failed: %s", err) + } + if err := importResp.Body.Close(); err != nil { + return fmt.Errorf("cannot close import response body: %s", err) + } + return nil +} + +// ExportPipe makes request by provided filter and return io.ReadCloser which can be used to get data +func (c *Client) ExportPipe(ctx context.Context, url string, f Filter) (io.ReadCloser, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request to %q: %s", c.Addr, err) + } + + params := req.URL.Query() + params.Set("match[]", f.Match) + if f.TimeStart != "" { + params.Set("start", f.TimeStart) + } + if f.TimeEnd != "" { + params.Set("end", f.TimeEnd) + } + req.URL.RawQuery = params.Encode() + + // disable compression since it is meaningless for native format + req.Header.Set("Accept-Encoding", "identity") + resp, err := c.do(req, http.StatusOK) + if err != nil { + return nil, fmt.Errorf("export request failed: %w", err) + } + return resp.Body, nil +} + +// GetSourceTenants discovers tenants by provided filter +func (c *Client) GetSourceTenants(ctx context.Context, f Filter) ([]string, error) { + u := fmt.Sprintf("%s/%s", c.Addr, nativeTenantsAddr) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request to %q: %s", u, err) + } + + params := req.URL.Query() + if f.TimeStart != "" { + params.Set("start", f.TimeStart) + } + if f.TimeEnd != "" { + params.Set("end", f.TimeEnd) + } + req.URL.RawQuery = params.Encode() + + resp, err := c.do(req, http.StatusOK) + if err != nil { + return nil, fmt.Errorf("tenants request failed: %s", err) + } + + var r struct { + Tenants []string `json:"data"` + } + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return nil, fmt.Errorf("cannot decode tenants response: %s", err) + } + + if err := resp.Body.Close(); err != nil { + return nil, fmt.Errorf("cannot close tenants response body: %s", err) + } + + return r.Tenants, nil +} + +func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) { + if c.User != "" { + req.SetBasicAuth(c.User, c.Password) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("unexpected error when performing request: %w", err) + } + + if resp.StatusCode != expSC { + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err) + } + return nil, fmt.Errorf("unexpected response code %d: %s", resp.StatusCode, string(body)) + } + return resp, err +} diff --git a/app/vmctl/native/filter.go b/app/vmctl/native/filter.go new file mode 100644 index 000000000..a038ba3ca --- /dev/null +++ b/app/vmctl/native/filter.go @@ -0,0 +1,22 @@ +package native + +import "fmt" + +// Filter represents request filter +type Filter struct { + Match string + TimeStart string + TimeEnd string + Chunk string +} + +func (f Filter) String() string { + s := fmt.Sprintf("\n\tfilter: match[]=%s", f.Match) + if f.TimeStart != "" { + s += fmt.Sprintf("\n\tstart: %s", f.TimeStart) + } + if f.TimeEnd != "" { + s += fmt.Sprintf("\n\tend: %s", f.TimeEnd) + } + return s +} diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index b86dc81f7..999fd3b63 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -2,177 +2,125 @@ package main import ( "context" - "encoding/json" "fmt" "io" "log" - "net/http" + "sync" "time" - "github.com/cheggaaa/pb/v3" - + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/cheggaaa/pb/v3" ) type vmNativeProcessor struct { - filter filter - rateLimit int64 + filter native.Filter - dst *vmNativeClient - src *vmNativeClient + dst *native.Client + src *native.Client + backoff *backoff.Backoff + + s *stats + rateLimit int64 interCluster bool -} - -type vmNativeClient struct { - addr string - user string - password string - extraLabels []string -} - -type filter struct { - match string - timeStart string - timeEnd string - chunk string -} - -func (f filter) String() string { - s := fmt.Sprintf("\n\tfilter: match[]=%s", f.match) - if f.timeStart != "" { - s += fmt.Sprintf("\n\tstart: %s", f.timeStart) - } - if f.timeEnd != "" { - s += fmt.Sprintf("\n\tend: %s", f.timeEnd) - } - return s + cc int } const ( - nativeExportAddr = "api/v1/export/native" - nativeImportAddr = "api/v1/import/native" - nativeTenantsAddr = "admin/tenants" - - nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}` + nativeExportAddr = "api/v1/export/native" + nativeImportAddr = "api/v1/import/native" + nativeBarTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}` ) -func (p *vmNativeProcessor) run(ctx context.Context) error { - if p.filter.chunk == "" { - return p.runWithFilter(ctx, p.filter) +func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error { + if p.cc == 0 { + p.cc = 1 + } + p.s = &stats{ + startTime: time.Now(), } - startOfRange, err := time.Parse(time.RFC3339, p.filter.timeStart) + start, err := time.Parse(time.RFC3339, p.filter.TimeStart) if err != nil { - return fmt.Errorf("failed to parse %s, provided: %s, expected format: %s, error: %v", vmNativeFilterTimeStart, p.filter.timeStart, time.RFC3339, err) + return fmt.Errorf("failed to parse %s, provided: %s, expected format: %s, error: %w", + vmNativeFilterTimeStart, p.filter.TimeStart, time.RFC3339, err) } - var endOfRange time.Time - if p.filter.timeEnd != "" { - endOfRange, err = time.Parse(time.RFC3339, p.filter.timeEnd) + end := time.Now().In(start.Location()) + if p.filter.TimeEnd != "" { + end, err = time.Parse(time.RFC3339, p.filter.TimeEnd) if err != nil { - return fmt.Errorf("failed to parse %s, provided: %s, expected format: %s, error: %v", vmNativeFilterTimeEnd, p.filter.timeEnd, time.RFC3339, err) + return fmt.Errorf("failed to parse %s, provided: %s, expected format: %s, error: %w", + vmNativeFilterTimeEnd, p.filter.TimeEnd, time.RFC3339, err) } - } else { - endOfRange = time.Now() } - ranges, err := stepper.SplitDateRange(startOfRange, endOfRange, p.filter.chunk) - if err != nil { - return fmt.Errorf("failed to create date ranges for the given time filters: %v", err) - } - - for rangeIdx, r := range ranges { - formattedStartTime := r[0].Format(time.RFC3339) - formattedEndTime := r[1].Format(time.RFC3339) - log.Printf("Processing range %d/%d: %s - %s \n", rangeIdx+1, len(ranges), formattedStartTime, formattedEndTime) - f := filter{ - match: p.filter.match, - timeStart: formattedStartTime, - timeEnd: formattedEndTime, - } - err := p.runWithFilter(ctx, f) - + ranges := [][]time.Time{{start, end}} + if p.filter.Chunk != "" { + ranges, err = stepper.SplitDateRange(start, end, p.filter.Chunk) if err != nil { - log.Printf("processing failed for range %d/%d: %s - %s \n", rangeIdx+1, len(ranges), formattedStartTime, formattedEndTime) - return err + return fmt.Errorf("failed to create date ranges for the given time filters: %w", err) } } + + tenants := []string{""} + if p.interCluster { + log.Printf("Discovering tenants...") + tenants, err = p.src.GetSourceTenants(ctx, p.filter) + if err != nil { + return fmt.Errorf("failed to get tenants: %w", err) + } + question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants) + if !silent && !prompt(question) { + return nil + } + } + + for _, tenantID := range tenants { + err := p.runBackfilling(ctx, tenantID, ranges, silent) + if err != nil { + return fmt.Errorf("migration failed: %s", err) + } + } + + log.Println("Import finished!") + log.Print(p.s) + return nil } -func (p *vmNativeProcessor) runWithFilter(ctx context.Context, f filter) error { - nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels) +func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string) error { + retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL) } + attempts, err := p.backoff.Retry(ctx, retryableFunc) + p.s.Lock() + p.s.retries += attempts + p.s.Unlock() if err != nil { - return fmt.Errorf("failed to add labels to import path: %s", err) - } - - if !p.interCluster { - srcURL := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr) - dstURL := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr) - - return p.runSingle(ctx, f, srcURL, dstURL) - } - - tenants, err := p.getSourceTenants(ctx, f) - if err != nil { - return fmt.Errorf("failed to get source tenants: %s", err) - } - - log.Printf("Discovered tenants: %v", tenants) - for _, tenant := range tenants { - // src and dst expected formats: http://vminsert:8480/ and http://vmselect:8481/ - srcURL := fmt.Sprintf("%s/select/%s/prometheus/%s", p.src.addr, tenant, nativeExportAddr) - dstURL := fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.addr, tenant, nativeImportAddr) - - if err := p.runSingle(ctx, f, srcURL, dstURL); err != nil { - return fmt.Errorf("failed to migrate data for tenant %q: %s", tenant, err) - } + return fmt.Errorf("failed to migrate from %s to %s (retry attempts: %d): %w\nwith fileter %s", srcURL, dstURL, attempts, err, f) } return nil } -func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter, srcURL, dstURL string) error { - log.Printf("Initing export pipe from %q with filters: %s\n", srcURL, f) +func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string) error { - exportReader, err := p.exportPipe(ctx, srcURL, f) + exportReader, err := p.src.ExportPipe(ctx, srcURL, f) if err != nil { - return fmt.Errorf("failed to init export pipe: %s", err) + return fmt.Errorf("failed to init export pipe: %w", err) } pr, pw := io.Pipe() - sync := make(chan struct{}) + done := make(chan struct{}) go func() { - defer func() { close(sync) }() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, dstURL, pr) - if err != nil { - log.Fatalf("cannot create import request to %q: %s", p.dst.addr, err) - } - importResp, err := p.dst.do(req, http.StatusNoContent) - if err != nil { - log.Fatalf("import request failed: %s", err) - } - if err := importResp.Body.Close(); err != nil { - log.Fatalf("cannot close import response body: %s", err) - } - }() - - fmt.Printf("Initing import process to %q:\n", dstURL) - pool := pb.NewPool() - bar := pb.ProgressBarTemplate(nativeBarTpl).New(0) - pool.Add(bar) - barReader := bar.NewProxyReader(exportReader) - if err := pool.Start(); err != nil { - log.Printf("error start process bars pool: %s", err) - return err - } - defer func() { - bar.Finish() - if err := pool.Stop(); err != nil { - fmt.Printf("failed to stop barpool: %+v\n", err) + defer func() { close(done) }() + if err := p.dst.ImportPipe(ctx, dstURL, pr); err != nil { + logger.Errorf("error initialize import pipe: %s", err) + return } }() @@ -182,95 +130,176 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter, srcURL, dst w = limiter.NewWriteLimiter(pw, rl) } - _, err = io.Copy(w, barReader) + written, err := io.Copy(w, exportReader) if err != nil { - return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err) + return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err) } + p.s.Lock() + p.s.bytes += uint64(written) + p.s.requests++ + p.s.Unlock() + if err := pw.Close(); err != nil { return err } - <-sync + <-done - log.Println("Import finished!") return nil } -func (p *vmNativeProcessor) getSourceTenants(ctx context.Context, f filter) ([]string, error) { - u := fmt.Sprintf("%s/%s", p.src.addr, nativeTenantsAddr) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) +func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time, silent bool) error { + exportAddr := nativeExportAddr + srcURL := fmt.Sprintf("%s/%s", p.src.Addr, exportAddr) + + importAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.ExtraLabels) if err != nil { - return nil, fmt.Errorf("cannot create request to %q: %s", u, err) + return fmt.Errorf("failed to add labels to import path: %s", err) + } + dstURL := fmt.Sprintf("%s/%s", p.dst.Addr, importAddr) + + if p.interCluster { + srcURL = fmt.Sprintf("%s/select/%s/prometheus/%s", p.src.Addr, tenantID, exportAddr) + dstURL = fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.Addr, tenantID, importAddr) } - params := req.URL.Query() - if f.timeStart != "" { - params.Set("start", f.timeStart) + barPrefix := "Requests to make" + initMessage := "Initing import process from %q to %q with filter %s" + initParams := []interface{}{srcURL, dstURL, p.filter.String()} + if p.interCluster { + barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID) + initMessage = "Initing import process from %q to %q with filter %s for tenant %s" + initParams = []interface{}{srcURL, dstURL, p.filter.String(), tenantID} } - if f.timeEnd != "" { - params.Set("end", f.timeEnd) - } - req.URL.RawQuery = params.Encode() - resp, err := p.src.do(req, http.StatusOK) + fmt.Println("") // extra line for better output formatting + log.Printf(initMessage, initParams...) + + log.Printf("Exploring metrics...") + metrics, err := p.src.Explore(ctx, p.filter, tenantID) if err != nil { - return nil, fmt.Errorf("tenants request failed: %s", err) + return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err) } - var r struct { - Tenants []string `json:"data"` - } - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, fmt.Errorf("cannot decode tenants response: %s", err) + if len(metrics) == 0 { + return fmt.Errorf("no metrics found") } - if err := resp.Body.Close(); err != nil { - return nil, fmt.Errorf("cannot close tenants response body: %s", err) - } - - return r.Tenants, nil -} - -func (p *vmNativeProcessor) exportPipe(ctx context.Context, url string, f filter) (io.ReadCloser, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, fmt.Errorf("cannot create request to %q: %s", p.src.addr, err) - } - - params := req.URL.Query() - params.Set("match[]", f.match) - if f.timeStart != "" { - params.Set("start", f.timeStart) - } - if f.timeEnd != "" { - params.Set("end", f.timeEnd) - } - req.URL.RawQuery = params.Encode() - - // disable compression since it is meaningless for native format - req.Header.Set("Accept-Encoding", "identity") - resp, err := p.src.do(req, http.StatusOK) - if err != nil { - return nil, fmt.Errorf("export request failed: %s", err) - } - return resp.Body, nil -} - -func (c *vmNativeClient) do(req *http.Request, expSC int) (*http.Response, error) { - if c.user != "" { - req.SetBasicAuth(c.user, c.password) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, fmt.Errorf("unexpected error when performing request: %s", err) - } - - if resp.StatusCode != expSC { - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err) + foundSeriesMsg := fmt.Sprintf("Found %d metrics to import", len(metrics)) + if !p.interCluster { + // do not prompt for intercluster because there could be many tenants, + // and we don't want to interrupt the process when moving to the next tenant. + question := foundSeriesMsg + ". Continue?" + if !silent && !prompt(question) { + return nil } - return nil, fmt.Errorf("unexpected response code %d: %s", resp.StatusCode, string(body)) + } else { + log.Print(foundSeriesMsg) } - return resp, err + + processingMsg := fmt.Sprintf("Requests to make: %d", len(metrics)*len(ranges)) + if len(ranges) > 1 { + processingMsg = fmt.Sprintf("Selected time range will be split into %d ranges according to %q step. %s", len(ranges), p.filter.Chunk, processingMsg) + } + log.Print(processingMsg) + + var bar *pb.ProgressBar + if !silent { + bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeBarTpl, barPrefix)).New(len(metrics) * len(ranges)) + bar.Start() + defer bar.Finish() + } + + filterCh := make(chan native.Filter) + errCh := make(chan error, p.cc) + + var wg sync.WaitGroup + for i := 0; i < p.cc; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for f := range filterCh { + if err := p.do(ctx, f, srcURL, dstURL); err != nil { + errCh <- err + return + } + if bar != nil { + bar.Increment() + } + } + }() + } + + // any error breaks the import + for s := range metrics { + for _, times := range ranges { + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled") + case infErr := <-errCh: + return fmt.Errorf("native error: %s", infErr) + case filterCh <- native.Filter{ + Match: fmt.Sprintf("{%s=%q}", nameLabel, s), + TimeStart: times[0].Format(time.RFC3339), + TimeEnd: times[1].Format(time.RFC3339), + }: + } + } + } + + close(filterCh) + wg.Wait() + close(errCh) + + for err := range errCh { + return fmt.Errorf("import process failed: %s", err) + } + + return nil +} + +// stats represents client statistic +// when processing data +type stats struct { + sync.Mutex + startTime time.Time + bytes uint64 + requests uint64 + retries uint64 +} + +func (s *stats) String() string { + s.Lock() + defer s.Unlock() + + totalImportDuration := time.Since(s.startTime) + totalImportDurationS := totalImportDuration.Seconds() + bytesPerS := byteCountSI(0) + if s.bytes > 0 && totalImportDurationS > 0 { + bytesPerS = byteCountSI(int64(float64(s.bytes) / totalImportDurationS)) + } + + return fmt.Sprintf("VictoriaMetrics importer stats:\n"+ + " time spent while importing: %v;\n"+ + " total bytes: %s;\n"+ + " bytes/s: %s;\n"+ + " requests: %d;\n"+ + " requests retries: %d;", + totalImportDuration, + byteCountSI(int64(s.bytes)), bytesPerS, + s.requests, s.retries) +} + +func byteCountSI(b int64) string { + const unit = 1000 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %cB", + float64(b)/float64(div), "kMGTPE"[exp]) } diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go index b254dc33c..79329d8f5 100644 --- a/app/vmctl/vm_native_test.go +++ b/app/vmctl/vm_native_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" ) @@ -27,10 +28,10 @@ const ( func Test_vmNativeProcessor_run(t *testing.T) { t.Skip() type fields struct { - filter filter + filter native.Filter rateLimit int64 - dst *vmNativeClient - src *vmNativeClient + dst *native.Client + src *native.Client } tests := []struct { name string @@ -41,16 +42,16 @@ func Test_vmNativeProcessor_run(t *testing.T) { { name: "simulate syscall.SIGINT", fields: fields{ - filter: filter{ - match: matchFilter, - timeStart: timeStartFilter, + filter: native.Filter{ + Match: matchFilter, + TimeStart: timeStartFilter, }, rateLimit: 0, - dst: &vmNativeClient{ - addr: dstAddr, + dst: &native.Client{ + Addr: dstAddr, }, - src: &vmNativeClient{ - addr: srcAddr, + src: &native.Client{ + Addr: srcAddr, }, }, closer: func(cancelFunc context.CancelFunc) { @@ -62,16 +63,16 @@ func Test_vmNativeProcessor_run(t *testing.T) { { name: "simulate correct work", fields: fields{ - filter: filter{ - match: matchFilter, - timeStart: timeStartFilter, + filter: native.Filter{ + Match: matchFilter, + TimeStart: timeStartFilter, }, rateLimit: 0, - dst: &vmNativeClient{ - addr: dstAddr, + dst: &native.Client{ + Addr: dstAddr, }, - src: &vmNativeClient{ - addr: srcAddr, + src: &native.Client{ + Addr: srcAddr, }, }, closer: func(cancelFunc context.CancelFunc) {}, @@ -80,18 +81,18 @@ func Test_vmNativeProcessor_run(t *testing.T) { { name: "simulate correct work with chunking", fields: fields{ - filter: filter{ - match: matchFilter, - timeStart: timeStartFilter, - timeEnd: timeEndFilter, - chunk: stepper.StepMonth, + filter: native.Filter{ + Match: matchFilter, + TimeStart: timeStartFilter, + TimeEnd: timeEndFilter, + Chunk: stepper.StepMonth, }, rateLimit: 0, - dst: &vmNativeClient{ - addr: dstAddr, + dst: &native.Client{ + Addr: dstAddr, }, - src: &vmNativeClient{ - addr: srcAddr, + src: &native.Client{ + Addr: srcAddr, }, }, closer: func(cancelFunc context.CancelFunc) {}, @@ -110,7 +111,7 @@ func Test_vmNativeProcessor_run(t *testing.T) { tt.closer(cancelFn) - if err := p.run(ctx); (err != nil) != tt.wantErr { + if err := p.run(ctx, true); (err != nil) != tt.wantErr { t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 81d3dd4a0..a2f78d593 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): `vmctl` `vm-native` mode now splits the migration process on per-metric basis. This allows to migrate metrics one-by-one according to the specified filter. This change allows to retry export/import requests for a specific metric and provides a better understanding of the migration progress. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3600). * BUGFIX: prevent from possible panic during [background merge process](https://docs.victoriametrics.com/#storage). It may occur in rare case and was introduced at [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). diff --git a/docs/vmctl.md b/docs/vmctl.md index df6dc5de0..4f7d2ccef 100644 --- a/docs/vmctl.md +++ b/docs/vmctl.md @@ -741,21 +741,33 @@ or higher. See `./vmctl vm-native --help` for details and full list of flags. -In this mode `vmctl` acts as a proxy between two VM instances, where time series filtering is done by "source" (`src`) -and processing is done by "destination" (`dst`). Because of that, `vmctl` doesn't actually know how much data will be -processed and can't show the progress bar. It will show the current processing speed and total number of processed bytes: +Migration in `vm-native` mode takes two steps: +1. Explore the list of the metrics to migrate via `/api/v1/series` API; +2. Migrate explored metrics one-by-one. ``` -./vmctl vm-native --vm-native-src-addr=http://localhost:8528 \ - --vm-native-dst-addr=http://localhost:8428 \ - --vm-native-filter-match='{job="vmagent"}' \ - --vm-native-filter-time-start='2020-01-01T20:07:00Z' +./vmctl vm-native \ + --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \ + --vm-native-dst-addr=http://localhost:8428 \ + --vm-native-filter-time-start='2022-11-20T00:00:00Z' \ + --vm-native-filter-match='{__name__=~"vm_cache_.*"}' VictoriaMetrics Native import mode -Initing export pipe from "http://localhost:8528" with filters: - filter: match[]={job="vmagent"} -Initing import process to "http://localhost:8428": -Total: 336.75 KiB ↖ Speed: 454.46 KiB p/s -2020/10/13 17:04:59 Total time: 952.143376ms + +2023/03/02 09:22:02 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter + filter: match[]={__name__=~"vm_cache_.*"} + start: 2022-11-20T00:00:00Z +2023/03/02 09:22:02 Exploring metrics... +Found 9 metrics to import. Continue? [Y/n] +2023/03/02 09:22:04 Requests to make: 9 +Requests to make: 9 / 9 [███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% +2023/03/02 09:22:06 Import finished! +2023/03/02 09:22:06 VictoriaMetrics importer stats: + time spent while importing: 3.632638875s; + total bytes: 7.8 MB; + bytes/s: 2.1 MB; + requests: 9; + requests retries: 0; +2023/03/02 09:22:06 Total time: 3.633127625s ``` Importing tips: @@ -763,6 +775,7 @@ Importing tips: 1. Migrating big volumes of data may result in reaching the safety limits on `src` side. Please verify that `-search.maxExportDuration` and `-search.maxExportSeries` were set with proper values for `src`. If hitting the limits, follow the recommendations [here](https://docs.victoriametrics.com/#how-to-export-data-in-native-format). +If hitting `the number of matching timeseries exceeds...` error, adjust filters to match less time series or update `-search.maxSeries` command-line flag on vmselect/vmsingle; 2. Migrating all the metrics from one VM to another may collide with existing application metrics (prefixed with `vm_`) at destination and lead to confusion when using [official Grafana dashboards](https://grafana.com/orgs/victoriametrics/dashboards). @@ -774,71 +787,57 @@ Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/ 5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and specify `accountID` param. 6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps. +7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results. +Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according +to allocated CPU resources of your VictoriaMetrics installation. + +In this mode `vmctl` acts as a proxy between two VM instances, where time series filtering is done by "source" (`src`) +and processing is done by "destination" (`dst`). So no extra memory or CPU resources required on `vmctl` side. Only +`src` and `dst` resource matter. #### Using time-based chunking of migration -It is possible split migration process into set of smaller batches based on time. This is especially useful when migrating large volumes of data as this adds indication of progress and ability to restore process from certain point in case of failure. +It is possible split migration process into set of smaller batches based on time. This is especially useful when +migrating large volumes of data as this adds indication of progress and ability to restore process from certain point +in case of failure. To use this you need to specify `--vm-native-step-interval` flag. Supported values are: `month`, `day`, `hour`. -Note that in order to use this it is required `--vm-native-filter-time-start` to be set to calculate time ranges for export process. +Note that in order to use this it is required `--vm-native-filter-time-start` to be set to calculate time ranges for +export process. Every range is being processed independently, which means that: - after range processing is finished all data within range is migrated -- if process fails on one of stages it is guaranteed that data of prior stages is already written, so it is possible to restart process starting from failed range +- if process fails on one of stages it is guaranteed that data of prior stages is already written, +so it is possible to restart process starting from failed range. -It is recommended using the `month` step when migrating the data over multiple months, since the migration with `day` and `hour` steps may take longer time to complete -because of additional overhead. +It is recommended using the `month` step when migrating the data over multiple months, +since the migration with `day` and `hour` steps may take longer time to complete because of additional overhead. Usage example: ```console -./vmctl vm-native - --vm-native-filter-time-start 2022-06-17T00:07:00Z \ - --vm-native-filter-time-end 2022-10-03T00:07:00Z \ - --vm-native-src-addr http://localhost:8428 \ - --vm-native-dst-addr http://localhost:8528 \ - --vm-native-step-interval=month +./vmctl vm-native \ + --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \ + --vm-native-dst-addr=http://localhost:8428 \ + --vm-native-filter-time-start='2022-11-20T00:00:00Z' \ + --vm-native-step-interval=month \ + --vm-native-filter-match='{__name__=~"vm_cache_.*"}' VictoriaMetrics Native import mode -2022/08/30 19:48:24 Processing range 1/5: 2022-06-17T00:07:00Z - 2022-06-30T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-06-17T00:07:00Z - end: 2022-06-30T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 28.89 KiB p/s -2022/08/30 19:48:24 Processing range 2/5: 2022-07-01T00:00:00Z - 2022-07-31T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-07-01T00:00:00Z - end: 2022-07-31T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 164.35 KiB p/s -2022/08/30 19:48:24 Processing range 3/5: 2022-08-01T00:00:00Z - 2022-08-31T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-08-01T00:00:00Z - end: 2022-08-31T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 191.42 KiB p/s -2022/08/30 19:48:24 Processing range 4/5: 2022-09-01T00:00:00Z - 2022-09-30T23:59:59Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-09-01T00:00:00Z - end: 2022-09-30T23:59:59Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 141.04 KiB p/s -2022/08/30 19:48:24 Processing range 5/5: 2022-10-01T00:00:00Z - 2022-10-03T00:07:00Z -2022/08/30 19:48:24 Initing export pipe from "http://localhost:8428" with filters: - filter: match[]={__name__!=""} - start: 2022-10-01T00:00:00Z - end: 2022-10-03T00:07:00Z -Initing import process to "http://localhost:8428": -2022/08/30 19:48:24 Import finished! -Total: 16 B ↗ Speed: 186.32 KiB p/s -2022/08/30 19:48:24 Total time: 12.680582ms + +2023/03/02 09:18:05 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter + filter: match[]={__name__=~"vm_cache_.*"} + start: 2022-11-20T00:00:00Z +2023/03/02 09:18:05 Exploring metrics... +Found 9 metrics to import. Continue? [Y/n] +2023/03/02 09:18:07 Selected time range will be split into 5 ranges according to "month" step. Requests to make: 45. +Requests to make: 45 / 45 [█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% +2023/03/02 09:18:12 Import finished! +2023/03/02 09:18:12 VictoriaMetrics importer stats: + time spent while importing: 7.111870667s; + total bytes: 7.7 MB; + bytes/s: 1.1 MB; + requests: 45; + requests retries: 0; +2023/03/02 09:18:12 Total time: 7.112405875s ``` #### Cluster-to-cluster migration mode @@ -850,70 +849,41 @@ Cluster-to-cluster uses `/admin/tenants` endpoint (available starting from [v1.8 To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-native-src-addr` flag to 'http://vmselect:8481/' and `--vm-native-dst-addr` value to http://vminsert:8480/: ```console -./bin/vmctl vm-native --vm-intercluster=true --vm-native-src-addr=http://localhost:8481/ --vm-native-dst-addr=http://172.17.0.3:8480/ + ./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/ \ + --vm-native-dst-addr=http://127.0.0.1:8480/ \ + --vm-native-filter-match='{__name__="vm_app_uptime_seconds"}' \ + --vm-native-filter-time-start='2023-02-01T00:00:00Z' \ + --vm-native-step-interval=day \ +--vm-intercluster VictoriaMetrics Native import mode -2022/12/05 21:20:06 Discovered tenants: [123:1 12812919:1 1289198:1 1289:1283 12:1 1:0 1:1 1:1231231 1:1271727 1:12819 1:281 812891298:1] -2022/12/05 21:20:06 Initing export pipe from "http://localhost:8481/select/123:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/123:1/prometheus/api/v1/import/native": -Total: 61.13 MiB ↖ Speed: 2.05 MiB p/s -Total: 61.13 MiB ↗ Speed: 2.30 MiB p/s -2022/12/05 21:20:33 Initing export pipe from "http://localhost:8481/select/12812919:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/12812919:1/prometheus/api/v1/import/native": -Total: 43.14 MiB ↘ Speed: 1.86 MiB p/s -Total: 43.14 MiB ↙ Speed: 2.36 MiB p/s -2022/12/05 21:20:51 Initing export pipe from "http://localhost:8481/select/1289198:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1289198:1/prometheus/api/v1/import/native": -Total: 16.64 MiB ↗ Speed: 2.66 MiB p/s -Total: 16.64 MiB ↘ Speed: 2.19 MiB p/s -2022/12/05 21:20:59 Initing export pipe from "http://localhost:8481/select/1289:1283/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1289:1283/prometheus/api/v1/import/native": -Total: 43.33 MiB ↙ Speed: 1.94 MiB p/s -Total: 43.33 MiB ↖ Speed: 2.35 MiB p/s -2022/12/05 21:21:18 Initing export pipe from "http://localhost:8481/select/12:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/12:1/prometheus/api/v1/import/native": -Total: 63.78 MiB ↙ Speed: 1.96 MiB p/s -Total: 63.78 MiB ↖ Speed: 2.28 MiB p/s -2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:0/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:0/prometheus/api/v1/import/native": -2022/12/05 21:21:46 Import finished! -Total: 330 B ↗ Speed: 3.53 MiB p/s -2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:1/prometheus/api/v1/import/native": -Total: 63.81 MiB ↙ Speed: 1.96 MiB p/s -Total: 63.81 MiB ↖ Speed: 2.28 MiB p/s -2022/12/05 21:22:14 Initing export pipe from "http://localhost:8481/select/1:1231231/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:1231231/prometheus/api/v1/import/native": -Total: 63.84 MiB ↙ Speed: 1.93 MiB p/s -Total: 63.84 MiB ↖ Speed: 2.29 MiB p/s -2022/12/05 21:22:42 Initing export pipe from "http://localhost:8481/select/1:1271727/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:1271727/prometheus/api/v1/import/native": -Total: 54.37 MiB ↘ Speed: 1.90 MiB p/s -Total: 54.37 MiB ↙ Speed: 2.37 MiB p/s -2022/12/05 21:23:05 Initing export pipe from "http://localhost:8481/select/1:12819/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:12819/prometheus/api/v1/import/native": -Total: 17.01 MiB ↙ Speed: 1.75 MiB p/s -Total: 17.01 MiB ↖ Speed: 2.15 MiB p/s -2022/12/05 21:23:13 Initing export pipe from "http://localhost:8481/select/1:281/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/1:281/prometheus/api/v1/import/native": -Total: 63.89 MiB ↘ Speed: 1.90 MiB p/s -Total: 63.89 MiB ↙ Speed: 2.29 MiB p/s -2022/12/05 21:23:42 Initing export pipe from "http://localhost:8481/select/812891298:1/prometheus/api/v1/export/native" with filters: - filter: match[]={__name__!=""} -Initing import process to "http://172.17.0.3:8480/insert/812891298:1/prometheus/api/v1/import/native": -Total: 63.84 MiB ↖ Speed: 1.99 MiB p/s -Total: 63.84 MiB ↗ Speed: 2.26 MiB p/s -2022/12/05 21:24:10 Total time: 4m4.1466565s +2023/02/28 10:41:42 Discovering tenants... +2023/02/28 10:41:42 The following tenants were discovered: [0:0 1:0 2:0 3:0 4:0] +2023/02/28 10:41:42 Initing import process from "http://127.0.0.1:8481/select/0:0/prometheus/api/v1/export/native" to "http://127.0.0.1:8480/insert/0:0/prometheus/api/v1/import/native" with filter + filter: match[]={__name__="vm_app_uptime_seconds"} + start: 2023-02-01T00:00:00Z for tenant 0:0 +2023/02/28 10:41:42 Exploring metrics... +2023/02/28 10:41:42 Found 1 metrics to import +2023/02/28 10:41:42 Selected time range will be split into 28 ranges according to "day" step. +Requests to make for tenant 0:0: 28 / 28 [███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% + +2023/02/28 10:41:45 Initing import process from "http://127.0.0.1:8481/select/1:0/prometheus/api/v1/export/native" to "http://127.0.0.1:8480/insert/1:0/prometheus/api/v1/import/native" with filter + filter: match[]={__name__="vm_app_uptime_seconds"} + start: 2023-02-01T00:00:00Z for tenant 1:0 +2023/02/28 10:41:45 Exploring metrics... +2023/02/28 10:41:45 Found 1 metrics to import +2023/02/28 10:41:45 Selected time range will be split into 28 ranges according to "day" step. Requests to make: 28 +Requests to make for tenant 1:0: 28 / 28 [████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 100.00% + +... + +2023/02/28 10:42:49 Import finished! +2023/02/28 10:42:49 VictoriaMetrics importer stats: + time spent while importing: 1m6.714210417s; + total bytes: 39.7 MB; + bytes/s: 594.4 kB; + requests: 140; + requests retries: 0; +2023/02/28 10:42:49 Total time: 1m7.147971417s ``` ## Verifying exported blocks from VictoriaMetrics @@ -980,6 +950,7 @@ a sign of network issues or VM being overloaded. See the logs during import for By default `vmctl` waits confirmation from user before starting the import. If this is unwanted behavior and no user interaction required - pass `-s` flag to enable "silence" mode: +See below the example of `vm-native` migration process: ``` -s Whether to run in silent mode. If set to true no confirmation prompts will appear. (default: false) ```