app/vmctl: add option to migrate between clusters with automatic tenants discovery (#3450)

This commit is contained in:
Zakhar Bessarab 2022-12-06 05:18:09 +04:00 committed by GitHub
parent f3e84b4dea
commit 86c31f2955
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 169 additions and 21 deletions

View file

@ -833,6 +833,80 @@ Total: 16 B ↗ Speed: 186.32 KiB p/s
2022/08/30 19:48:24 Total time: 12.680582ms
```
#### Cluster-to-cluster migration mode
Using cluster-to-cluster migration mode helps to migrate all tenants data in a single `vmctl` run.
Cluster-to-cluster uses `/admin/tenants` endpoint (available starting from [v1.84.0](https://docs.victoriametrics.com/CHANGELOG.html#v1840)) to discover list of tenants from source cluster.
To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-native-src-addr` flag to 'http://vmselect:8481/' and `--vm-native-dst-addr` value to http://vminsert:8480/:
```console
./bin/vmctl vm-native --vm-intercluster=true --vm-native-src-addr=http://localhost:8481/ --vm-native-dst-addr=http://172.17.0.3:8480/
VictoriaMetrics Native import mode
2022/12/05 21:20:06 Discovered tenants: [123:1 12812919:1 1289198:1 1289:1283 12:1 1:0 1:1 1:1231231 1:1271727 1:12819 1:281 812891298:1]
2022/12/05 21:20:06 Initing export pipe from "http://localhost:8481/select/123:1/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/123:1/prometheus/api/v1/import/native":
Total: 61.13 MiB ↖ Speed: 2.05 MiB p/s
Total: 61.13 MiB ↗ Speed: 2.30 MiB p/s
2022/12/05 21:20:33 Initing export pipe from "http://localhost:8481/select/12812919:1/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/12812919:1/prometheus/api/v1/import/native":
Total: 43.14 MiB ↘ Speed: 1.86 MiB p/s
Total: 43.14 MiB ↙ Speed: 2.36 MiB p/s
2022/12/05 21:20:51 Initing export pipe from "http://localhost:8481/select/1289198:1/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1289198:1/prometheus/api/v1/import/native":
Total: 16.64 MiB ↗ Speed: 2.66 MiB p/s
Total: 16.64 MiB ↘ Speed: 2.19 MiB p/s
2022/12/05 21:20:59 Initing export pipe from "http://localhost:8481/select/1289:1283/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1289:1283/prometheus/api/v1/import/native":
Total: 43.33 MiB ↙ Speed: 1.94 MiB p/s
Total: 43.33 MiB ↖ Speed: 2.35 MiB p/s
2022/12/05 21:21:18 Initing export pipe from "http://localhost:8481/select/12:1/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/12:1/prometheus/api/v1/import/native":
Total: 63.78 MiB ↙ Speed: 1.96 MiB p/s
Total: 63.78 MiB ↖ Speed: 2.28 MiB p/s
2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:0/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1:0/prometheus/api/v1/import/native":
2022/12/05 21:21:46 Import finished!
Total: 330 B ↗ Speed: 3.53 MiB p/s
2022/12/05 21:21:46 Initing export pipe from "http://localhost:8481/select/1:1/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1:1/prometheus/api/v1/import/native":
Total: 63.81 MiB ↙ Speed: 1.96 MiB p/s
Total: 63.81 MiB ↖ Speed: 2.28 MiB p/s
2022/12/05 21:22:14 Initing export pipe from "http://localhost:8481/select/1:1231231/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1:1231231/prometheus/api/v1/import/native":
Total: 63.84 MiB ↙ Speed: 1.93 MiB p/s
Total: 63.84 MiB ↖ Speed: 2.29 MiB p/s
2022/12/05 21:22:42 Initing export pipe from "http://localhost:8481/select/1:1271727/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1:1271727/prometheus/api/v1/import/native":
Total: 54.37 MiB ↘ Speed: 1.90 MiB p/s
Total: 54.37 MiB ↙ Speed: 2.37 MiB p/s
2022/12/05 21:23:05 Initing export pipe from "http://localhost:8481/select/1:12819/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1:12819/prometheus/api/v1/import/native":
Total: 17.01 MiB ↙ Speed: 1.75 MiB p/s
Total: 17.01 MiB ↖ Speed: 2.15 MiB p/s
2022/12/05 21:23:13 Initing export pipe from "http://localhost:8481/select/1:281/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/1:281/prometheus/api/v1/import/native":
Total: 63.89 MiB ↘ Speed: 1.90 MiB p/s
Total: 63.89 MiB ↙ Speed: 2.29 MiB p/s
2022/12/05 21:23:42 Initing export pipe from "http://localhost:8481/select/812891298:1/prometheus/api/v1/export/native" with filters:
filter: match[]={__name__!=""}
Initing import process to "http://172.17.0.3:8480/insert/812891298:1/prometheus/api/v1/import/native":
Total: 63.84 MiB ↖ Speed: 1.99 MiB p/s
Total: 63.84 MiB ↗ Speed: 2.26 MiB p/s
2022/12/05 21:24:10 Total time: 4m4.1466565s
```
## Verifying exported blocks from VictoriaMetrics

View file

@ -44,6 +44,8 @@ const (
// also used in vm-native
vmExtraLabel = "vm-extra-label"
vmRateLimit = "vm-rate-limit"
vmInterCluster = "vm-intercluster"
)
var (
@ -398,6 +400,12 @@ var (
Usage: "Optional data transfer rate limit in bytes per second.\n" +
"By default the rate limit is disabled. It can be useful for limiting load on source or destination databases.",
},
&cli.BoolFlag{
Name: vmInterCluster,
Usage: "Enables cluster-to-cluster migration mode with automatic tenants data migration.\n" +
fmt.Sprintf(" In this mode --%s flag format is: 'http://vmselect:8481/'. --%s flag format is: http://vminsert:8480/. \n", vmNativeSrcAddr, vmNativeDstAddr) +
" TenantID will be appended automatically after discovering tenants from src.",
},
}
)

View file

@ -200,7 +200,8 @@ func main() {
}
p := vmNativeProcessor{
rateLimit: c.Int64(vmRateLimit),
rateLimit: c.Int64(vmRateLimit),
interCluster: c.Bool(vmInterCluster),
filter: filter{
match: c.String(vmNativeFilterMatch),
timeStart: c.String(vmNativeFilterTimeStart),

View file

@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
@ -19,8 +20,9 @@ type vmNativeProcessor struct {
filter filter
rateLimit int64
dst *vmNativeClient
src *vmNativeClient
dst *vmNativeClient
src *vmNativeClient
interCluster bool
}
type vmNativeClient struct {
@ -49,15 +51,16 @@ func (f filter) String() string {
}
const (
nativeExportAddr = "api/v1/export/native"
nativeImportAddr = "api/v1/import/native"
nativeExportAddr = "api/v1/export/native"
nativeImportAddr = "api/v1/import/native"
nativeTenantsAddr = "admin/tenants"
nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
)
func (p *vmNativeProcessor) run(ctx context.Context) error {
if p.filter.chunk == "" {
return p.runSingle(ctx, p.filter)
return p.runWithFilter(ctx, p.filter)
}
startOfRange, err := time.Parse(time.RFC3339, p.filter.timeStart)
@ -89,7 +92,7 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
timeStart: formattedStartTime,
timeEnd: formattedEndTime,
}
err := p.runSingle(ctx, f)
err := p.runWithFilter(ctx, f)
if err != nil {
log.Printf("processing failed for range %d/%d: %s - %s \n", rangeIdx+1, len(ranges), formattedStartTime, formattedEndTime)
@ -99,25 +102,52 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
return nil
}
func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter) error {
pr, pw := io.Pipe()
func (p *vmNativeProcessor) runWithFilter(ctx context.Context, f filter) error {
nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels)
log.Printf("Initing export pipe from %q with filters: %s\n", p.src.addr, f)
exportReader, err := p.exportPipe(ctx, f)
if err != nil {
return fmt.Errorf("failed to add labels to import path: %s", err)
}
if !p.interCluster {
srcURL := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr)
dstURL := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr)
return p.runSingle(ctx, f, srcURL, dstURL)
}
tenants, err := p.getSourceTenants(ctx, f)
if err != nil {
return fmt.Errorf("failed to get source tenants: %s", err)
}
log.Printf("Discovered tenants: %v", tenants)
for _, tenant := range tenants {
// src and dst expected formats: http://vminsert:8480/ and http://vmselect:8481/
srcURL := fmt.Sprintf("%s/select/%s/prometheus/%s", p.src.addr, tenant, nativeExportAddr)
dstURL := fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.addr, tenant, nativeImportAddr)
if err := p.runSingle(ctx, f, srcURL, dstURL); err != nil {
return fmt.Errorf("failed to migrate data for tenant %q: %s", tenant, err)
}
}
return nil
}
func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter, srcURL, dstURL string) error {
log.Printf("Initing export pipe from %q with filters: %s\n", srcURL, f)
exportReader, err := p.exportPipe(ctx, srcURL, f)
if err != nil {
return fmt.Errorf("failed to init export pipe: %s", err)
}
nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels)
if err != nil {
return err
}
pr, pw := io.Pipe()
sync := make(chan struct{})
go func() {
defer func() { close(sync) }()
u := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr)
req, err := http.NewRequestWithContext(ctx, "POST", u, pr)
req, err := http.NewRequestWithContext(ctx, "POST", dstURL, pr)
if err != nil {
log.Fatalf("cannot create import request to %q: %s", p.dst.addr, err)
}
@ -130,7 +160,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter) error {
}
}()
fmt.Printf("Initing import process to %q:\n", p.dst.addr)
fmt.Printf("Initing import process to %q:\n", dstURL)
pool := pb.NewPool()
bar := pb.ProgressBarTemplate(nativeBarTpl).New(0)
pool.Add(bar)
@ -166,9 +196,43 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f filter) error {
return nil
}
func (p *vmNativeProcessor) exportPipe(ctx context.Context, f filter) (io.ReadCloser, error) {
u := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr)
func (p *vmNativeProcessor) getSourceTenants(ctx context.Context, f filter) ([]string, error) {
u := fmt.Sprintf("%s/%s", p.src.addr, nativeTenantsAddr)
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %s", u, err)
}
params := req.URL.Query()
if f.timeStart != "" {
params.Set("start", f.timeStart)
}
if f.timeEnd != "" {
params.Set("end", f.timeEnd)
}
req.URL.RawQuery = params.Encode()
resp, err := p.src.do(req, http.StatusOK)
if err != nil {
return nil, fmt.Errorf("tenants request failed: %s", err)
}
var r struct {
Tenants []string `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("cannot decode tenants response: %s", err)
}
if err := resp.Body.Close(); err != nil {
return nil, fmt.Errorf("cannot close tenants response body: %s", err)
}
return r.Tenants, nil
}
func (p *vmNativeProcessor) exportPipe(ctx context.Context, url string, f filter) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %s", p.src.addr, err)
}

View file

@ -50,6 +50,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to copy data from sources via Prometheus `remote_read` protocol. See [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-by-remote-read-protocol). The related issues: [one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3132) and [two](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1101).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-remoteWrite.sendTimeout` command-line flag, which allows configuring timeout for sending data to `-remoteWrite.url`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3408).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to migrate data between VictoriaMetrics clusters with automatic tenants discovery. See [these docs](https://docs.victoriametrics.com/vmctl.html#cluster-to-cluster-migration-mode) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2930)
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly pass HTTP headers during the alert state restore procedure. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3418).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly specify rule evaluation step during the [replay mode](https://docs.victoriametrics.com/vmalert.html#rules-backfilling). The `step` value was previously overriden by `-datasource.queryStep` command-line flag.