diff --git a/app/vmctl/README.md b/app/vmctl/README.md index 21a03138f..38be9a1c7 100644 --- a/app/vmctl/README.md +++ b/app/vmctl/README.md @@ -833,6 +833,80 @@ Total: 16 B ↗ Speed: 186.32 KiB p/s 2022/08/30 19:48:24 Total time: 12.680582ms ``` +#### Cluster-to-cluster migration mode + +Using cluster-to-cluster migration mode helps to migrate all tenants data in a single `vmctl` run. + +Cluster-to-cluster uses `/admin/tenants` endpoint (available starting from [v1.84.0](https://docs.victoriametrics.com/CHANGELOG.html#v1840)) to discover list of tenants from source cluster. + +To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-native-src-addr` flag to 'http://vmselect:8481/' and `--vm-native-dst-addr` value to http://vminsert:8480/: + +```console +./bin/vmctl vm-native --vm-intercluster=true --vm-native-src-addr=http://localhost:8481/ --vm-native-dst-addr=http://172.17.0.3:8480/ +VictoriaMetrics Native import mode +2022/12/05 21:20:06 Discovered tenants: [123:1 12812919:1 1289198:1 1289:1283 12:1 1:0 1:1 1:1231231 1:1271727 1:12819 1:281 812891298:1] +2022/12/05 21:20:06 Initing export pipe from "http://localhost:8481/select/123:1/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/123:1/prometheus/api/v1/import/native": +Total: 61.13 MiB ↖ Speed: 2.05 MiB p/s +Total: 61.13 MiB ↗ Speed: 2.30 MiB p/s +2022/12/05 21:20:33 Initing export pipe from "http://localhost:8481/select/12812919:1/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/12812919:1/prometheus/api/v1/import/native": +Total: 43.14 MiB ↘ Speed: 1.86 MiB p/s +Total: 43.14 MiB ↙ Speed: 2.36 MiB p/s +2022/12/05 21:20:51 Initing export pipe from "http://localhost:8481/select/1289198:1/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1289198:1/prometheus/api/v1/import/native": +Total: 16.64 MiB ↗ Speed: 2.66 MiB p/s +Total: 16.64 MiB ↘ Speed: 2.19 MiB p/s +2022/12/05 21:20:59 Initing export pipe from "http://localhost:8481/select/1289:1283/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1289:1283/prometheus/api/v1/import/native": +Total: 43.33 MiB ↙ Speed: 1.94 MiB p/s +Total: 43.33 MiB ↖ Speed: 2.35 MiB p/s +2022/12/05 21:21:18 Initing export pipe from "http://localhost:8481/select/12:1/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/12:1/prometheus/api/v1/import/native": +Total: 63.78 MiB ↙ Speed: 1.96 MiB p/s +Total: 63.78 MiB ↖ Speed: 2.28 MiB p/s +2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:0/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1:0/prometheus/api/v1/import/native": +2022/12/05 21:21:46 Import finished! +Total: 330 B ↗ Speed: 3.53 MiB p/s +2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:1/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1:1/prometheus/api/v1/import/native": +Total: 63.81 MiB ↙ Speed: 1.96 MiB p/s +Total: 63.81 MiB ↖ Speed: 2.28 MiB p/s +2022/12/05 21:22:14 Initing export pipe from "http://localhost:8481/select/1:1231231/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1:1231231/prometheus/api/v1/import/native": +Total: 63.84 MiB ↙ Speed: 1.93 MiB p/s +Total: 63.84 MiB ↖ Speed: 2.29 MiB p/s +2022/12/05 21:22:42 Initing export pipe from "http://localhost:8481/select/1:1271727/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1:1271727/prometheus/api/v1/import/native": +Total: 54.37 MiB ↘ Speed: 1.90 MiB p/s +Total: 54.37 MiB ↙ Speed: 2.37 MiB p/s +2022/12/05 21:23:05 Initing export pipe from "http://localhost:8481/select/1:12819/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1:12819/prometheus/api/v1/import/native": +Total: 17.01 MiB ↙ Speed: 1.75 MiB p/s +Total: 17.01 MiB ↖ Speed: 2.15 MiB p/s +2022/12/05 21:23:13 Initing export pipe from "http://localhost:8481/select/1:281/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/1:281/prometheus/api/v1/import/native": +Total: 63.89 MiB ↘ Speed: 1.90 MiB p/s +Total: 63.89 MiB ↙ Speed: 2.29 MiB p/s +2022/12/05 21:23:42 Initing export pipe from "http://localhost:8481/select/812891298:1/prometheus/api/v1/export/native" with filters: + filter: match[]={__name__!=""} +Initing import process to "http://172.17.0.3:8480/insert/812891298:1/prometheus/api/v1/import/native": +Total: 63.84 MiB ↖ Speed: 1.99 MiB p/s +Total: 63.84 MiB ↗ Speed: 2.26 MiB p/s +2022/12/05 21:24:10 Total time: 4m4.1466565s +``` ## Verifying exported blocks from VictoriaMetrics diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index b6254351b..dccfb1024 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -44,6 +44,8 @@ const ( // also used in vm-native vmExtraLabel = "vm-extra-label" vmRateLimit = "vm-rate-limit" + + vmInterCluster = "vm-intercluster" ) var ( @@ -398,6 +400,12 @@ var ( Usage: "Optional data transfer rate limit in bytes per second.\n" + "By default the rate limit is disabled. It can be useful for limiting load on source or destination databases.", }, + &cli.BoolFlag{ + Name: vmInterCluster, + Usage: "Enables cluster-to-cluster migration mode with automatic tenants data migration.\n" + + fmt.Sprintf(" In this mode --%s flag format is: 'http://vmselect:8481/'. --%s flag format is: http://vminsert:8480/. \n", vmNativeSrcAddr, vmNativeDstAddr) + + " TenantID will be appended automatically after discovering tenants from src.", + }, } ) diff --git a/app/vmctl/main.go b/app/vmctl/main.go index 19406d511..51ac55c51 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -200,7 +200,8 @@ func main() { } p := vmNativeProcessor{ - rateLimit: c.Int64(vmRateLimit), + rateLimit: c.Int64(vmRateLimit), + interCluster: c.Bool(vmInterCluster), filter: filter{ match: c.String(vmNativeFilterMatch), timeStart: c.String(vmNativeFilterTimeStart), diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index d2f8013bb..dd85d8b74 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "io" "log" @@ -19,8 +20,9 @@ type vmNativeProcessor struct { filter filter rateLimit int64 - dst *vmNativeClient - src *vmNativeClient + dst *vmNativeClient + src *vmNativeClient + interCluster bool } type vmNativeClient struct { @@ -49,15 +51,16 @@ func (f filter) String() string { } const ( - nativeExportAddr = "api/v1/export/native" - nativeImportAddr = "api/v1/import/native" + nativeExportAddr = "api/v1/export/native" + nativeImportAddr = "api/v1/import/native" + nativeTenantsAddr = "admin/tenants" nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}` ) func (p *vmNativeProcessor) run(ctx context.Context) error { if p.filter.chunk == "" { - return p.runSingle(ctx, p.filter) + return p.runWithFilter(ctx, p.filter) } startOfRange, err := time.Parse(time.RFC3339, p.filter.timeStart) @@ -89,7 +92,7 @@ func (p *vmNativeProcessor) run(ctx context.Context) error { timeStart: formattedStartTime, timeEnd: formattedEndTime, } - err := p.runSingle(ctx, f) + err := p.runWithFilter(ctx, f) if err != nil { log.Printf("processing failed for range %d/%d: %s - %s \n", rangeIdx+1, len(ranges), formattedStartTime, formattedEndTime) @@ -99,25 +102,52 @@ func (p *vmNativeProcessor) run(ctx context.Context) error { return nil } -func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter) error { - pr, pw := io.Pipe() +func (p *vmNativeProcessor) runWithFilter(ctx context.Context, f filter) error { + nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels) - log.Printf("Initing export pipe from %q with filters: %s\n", p.src.addr, f) - exportReader, err := p.exportPipe(ctx, f) + if err != nil { + return fmt.Errorf("failed to add labels to import path: %s", err) + } + + if !p.interCluster { + srcURL := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr) + dstURL := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr) + + return p.runSingle(ctx, f, srcURL, dstURL) + } + + tenants, err := p.getSourceTenants(ctx, f) + if err != nil { + return fmt.Errorf("failed to get source tenants: %s", err) + } + + log.Printf("Discovered tenants: %v", tenants) + for _, tenant := range tenants { + // src and dst expected formats: http://vminsert:8480/ and http://vmselect:8481/ + srcURL := fmt.Sprintf("%s/select/%s/prometheus/%s", p.src.addr, tenant, nativeExportAddr) + dstURL := fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.addr, tenant, nativeImportAddr) + + if err := p.runSingle(ctx, f, srcURL, dstURL); err != nil { + return fmt.Errorf("failed to migrate data for tenant %q: %s", tenant, err) + } + } + + return nil +} + +func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter, srcURL, dstURL string) error { + log.Printf("Initing export pipe from %q with filters: %s\n", srcURL, f) + + exportReader, err := p.exportPipe(ctx, srcURL, f) if err != nil { return fmt.Errorf("failed to init export pipe: %s", err) } - nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels) - if err != nil { - return err - } - + pr, pw := io.Pipe() sync := make(chan struct{}) go func() { defer func() { close(sync) }() - u := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr) - req, err := http.NewRequestWithContext(ctx, "POST", u, pr) + req, err := http.NewRequestWithContext(ctx, "POST", dstURL, pr) if err != nil { log.Fatalf("cannot create import request to %q: %s", p.dst.addr, err) } @@ -130,7 +160,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter) error { } }() - fmt.Printf("Initing import process to %q:\n", p.dst.addr) + fmt.Printf("Initing import process to %q:\n", dstURL) pool := pb.NewPool() bar := pb.ProgressBarTemplate(nativeBarTpl).New(0) pool.Add(bar) @@ -166,9 +196,43 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter) error { return nil } -func (p *vmNativeProcessor) exportPipe(ctx context.Context, f filter) (io.ReadCloser, error) { - u := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr) +func (p *vmNativeProcessor) getSourceTenants(ctx context.Context, f filter) ([]string, error) { + u := fmt.Sprintf("%s/%s", p.src.addr, nativeTenantsAddr) req, err := http.NewRequestWithContext(ctx, "GET", u, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request to %q: %s", u, err) + } + + params := req.URL.Query() + if f.timeStart != "" { + params.Set("start", f.timeStart) + } + if f.timeEnd != "" { + params.Set("end", f.timeEnd) + } + req.URL.RawQuery = params.Encode() + + resp, err := p.src.do(req, http.StatusOK) + if err != nil { + return nil, fmt.Errorf("tenants request failed: %s", err) + } + + var r struct { + Tenants []string `json:"data"` + } + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return nil, fmt.Errorf("cannot decode tenants response: %s", err) + } + + if err := resp.Body.Close(); err != nil { + return nil, fmt.Errorf("cannot close tenants response body: %s", err) + } + + return r.Tenants, nil +} + +func (p *vmNativeProcessor) exportPipe(ctx context.Context, url string, f filter) (io.ReadCloser, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, fmt.Errorf("cannot create request to %q: %s", p.src.addr, err) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bdd9c449a..2fb153788 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -50,6 +50,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to copy data from sources via Prometheus `remote_read` protocol. See [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-by-remote-read-protocol). The related issues: [one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3132) and [two](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1101). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-remoteWrite.sendTimeout` command-line flag, which allows configuring timeout for sending data to `-remoteWrite.url`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3408). +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to migrate data between VictoriaMetrics clusters with automatic tenants discovery. See [these docs](https://docs.victoriametrics.com/vmctl.html#cluster-to-cluster-migration-mode) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2930) * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly pass HTTP headers during the alert state restore procedure. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3418). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly specify rule evaluation step during the [replay mode](https://docs.victoriametrics.com/vmalert.html#rules-backfilling). The `step` value was previously overriden by `-datasource.queryStep` command-line flag.