app/vmctl: fix performance degradation, add flag to disable backoff policy (#4097)

* app/vmctl: change api for getting metric names

* app/vmctl: fix tests

* app/vmctl: add flag to enable backoff policy, fix test, performance improvements

* app/vmctl: use one http client

* app/vmctl: made linter happy

* app/vmctl: updated documentation and CHANGELOG.md

* app/vmctl: cleanup

* app/vmctl: rename flag

* app/vmctl: cleanup

* app/vmctl: fix comments

* app/vmctl: fix metrics parser problem, improve tests
This commit is contained in:
Dmytro Kozlov 2023-04-14 09:34:54 +03:00 committed by Aliaksandr Valialkin
parent 7fb2b14ca0
commit 10351a9dbb
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
7 changed files with 217 additions and 76 deletions

View file

@ -738,7 +738,7 @@ or higher.
See `./vmctl vm-native --help` for details and full list of flags.
Migration in `vm-native` mode takes two steps:
1. Explore the list of the metrics to migrate via `/api/v1/series` API;
1. Explore the list of the metrics to migrate via `api/v1/label/__name__/values` API;
2. Migrate explored metrics one-by-one.
```
@ -765,6 +765,57 @@ Requests to make: 9 / 9 [██████████████████
requests retries: 0;
2023/03/02 09:22:06 Total time: 3.633127625s
```
`vmctl` uses retries with backoff policy by default.
The benefits of this retry backoff policy include:
1. Improved success rates:
With each retry attempt, the migration process has a higher chance of success.
By increasing the delay between retries, the system can avoid overwhelming the service with too many requests at once.
2. Reduced load on the system:
By increasing the delay between retries, the system can reduce the load on the service by limiting the number of
requests made in a short amount of time.
3. Can help to migrate a big amount of data
However, there are also some potential penalties associated with using a backoff retry policy, including:
1. Increased migration process latency:
`vmctl` need to make additional call to the `api/v1/label/__name__/values` with defined `--vm-native-filter-match` flag,
and after process all metric names with additional filters.
In case when retries with backoff policy is unneeded `--vm-native-disable-retries` command line flag can be used.
When this flag is set to `true`, `vmctl` skips additional call to the `api/v1/label/__name__/values` API and starts
migration process by making calls to the `/api/v1/export` and `api/v1/import`. If some errors happen `vmctl` immediately
stops the migration process.
```
./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \
--vm-native-dst-addr=http://127.0.0.1:8428 \
--vm-native-filter-match='{__name__!=""}' \
--vm-native-filter-time-start='2023-04-08T11:30:30Z' \
--vm-native-disable-retries=true
VictoriaMetrics Native import mode
2023/04/11 10:17:14 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter
filter: match[]={__name__!=""}
start: 2023-04-08T11:30:30Z
. Continue? [Y/n]
2023/04/11 10:17:15 Requests to make: 1
2023/04/11 10:17:15 number of workers decreased to 1, because vmctl calculated requests to make 1
Total: 0 ↙ Speed: ? p/s Continue import process with filter
filter: match[]={__name__!=""}
start: 2023-04-08T11:30:30Z
end: 2023-04-11T07:17:14Z:
Total: 1.64 GiB ↖ Speed: 11.20 MiB p/s
2023/04/11 10:19:45 Import finished!
2023/04/11 10:19:45 VictoriaMetrics importer stats:
time spent while importing: 2m30.813841541s;
total bytes: 1.8 GB;
bytes/s: 11.7 MB;
requests: 1;
requests retries: 0;
2023/04/11 10:19:45 Total time: 2m30.814721125s
```
Importing tips:

View file

@ -326,6 +326,7 @@ const (
vmNativeStepInterval = "vm-native-step-interval"
vmNativeDisableHTTPKeepAlive = "vm-native-disable-http-keep-alive"
vmNativeDisableRetries = "vm-native-disable-retries"
vmNativeSrcAddr = "vm-native-src-addr"
vmNativeSrcUser = "vm-native-src-user"
@ -443,6 +444,11 @@ var (
Usage: "Number of workers concurrently performing import requests to VM",
Value: 2,
},
&cli.BoolFlag{
Name: vmNativeDisableRetries,
Usage: "Defines whether to disable retries with backoff policy for migration process",
Value: false,
},
}
)

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strings"
@ -201,6 +202,8 @@ func main() {
return fmt.Errorf("flag %q can't be empty", vmNativeFilterMatch)
}
disableKeepAlive := c.Bool(vmNativeDisableHTTPKeepAlive)
var srcExtraLabels []string
srcAddr := strings.Trim(c.String(vmNativeSrcAddr), "/")
srcAuthConfig, err := auth.Generate(
@ -210,6 +213,7 @@ func main() {
if err != nil {
return fmt.Errorf("error initilize auth config for source: %s", srcAddr)
}
srcHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}}
dstAddr := strings.Trim(c.String(vmNativeDstAddr), "/")
dstExtraLabels := c.StringSlice(vmExtraLabel)
@ -220,6 +224,7 @@ func main() {
if err != nil {
return fmt.Errorf("error initilize auth config for destination: %s", dstAddr)
}
dstHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}}
p := vmNativeProcessor{
rateLimit: c.Int64(vmRateLimit),
@ -231,19 +236,20 @@ func main() {
Chunk: c.String(vmNativeStepInterval),
},
src: &native.Client{
AuthCfg: srcAuthConfig,
Addr: srcAddr,
ExtraLabels: srcExtraLabels,
DisableHTTPKeepAlive: c.Bool(vmNativeDisableHTTPKeepAlive),
AuthCfg: srcAuthConfig,
Addr: srcAddr,
ExtraLabels: srcExtraLabels,
HTTPClient: srcHTTPClient,
},
dst: &native.Client{
AuthCfg: dstAuthConfig,
Addr: dstAddr,
ExtraLabels: dstExtraLabels,
DisableHTTPKeepAlive: c.Bool(vmNativeDisableHTTPKeepAlive),
AuthCfg: dstAuthConfig,
Addr: dstAddr,
ExtraLabels: dstExtraLabels,
HTTPClient: dstHTTPClient,
},
backoff: backoff.New(),
cc: c.Int(vmConcurrency),
backoff: backoff.New(),
cc: c.Int(vmConcurrency),
disableRetries: c.Bool(vmNativeDisableRetries),
}
return p.run(ctx, isNonInteractive(c))
},

View file

@ -11,34 +11,33 @@ import (
)
const (
nativeTenantsAddr = "admin/tenants"
nativeSeriesAddr = "api/v1/series"
nameLabel = "__name__"
nativeTenantsAddr = "admin/tenants"
nativeMetricNamesAddr = "api/v1/label/__name__/values"
)
// Client is an HTTP client for exporting and importing
// time series via native protocol.
type Client struct {
AuthCfg *auth.Config
Addr string
ExtraLabels []string
DisableHTTPKeepAlive bool
AuthCfg *auth.Config
Addr string
ExtraLabels []string
HTTPClient *http.Client
}
// LabelValues represents series from api/v1/series response
type LabelValues map[string]string
// Response represents response from api/v1/series
// Response represents response from api/v1/label/__name__/values
type Response struct {
Status string `json:"status"`
Series []LabelValues `json:"data"`
Status string `json:"status"`
MetricNames []string `json:"data"`
}
// Explore finds series by provided filter from api/v1/series
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) (map[string]struct{}, error) {
url := fmt.Sprintf("%s/%s", c.Addr, nativeSeriesAddr)
// Explore finds metric names by provided filter from api/v1/label/__name__/values
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]string, error) {
url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr)
if tenantID != "" {
url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeSeriesAddr)
url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr)
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@ -68,21 +67,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) (map[st
if err := resp.Body.Close(); err != nil {
return nil, fmt.Errorf("cannot close series response body: %s", err)
}
names := make(map[string]struct{})
for _, series := range response.Series {
// TODO: consider tweaking /api/v1/series API to return metric names only
// this could make explore response much lighter.
for key, value := range series {
if key != nameLabel {
continue
}
if _, ok := names[value]; ok {
continue
}
names[value] = struct{}{}
}
}
return names, nil
return response.MetricNames, nil
}
// ImportPipe uses pipe reader in request to process data
@ -169,8 +154,8 @@ func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) {
if c.AuthCfg != nil {
c.AuthCfg.SetHeaders(req, true)
}
var httpClient = &http.Client{Transport: &http.Transport{DisableKeepAlives: c.DisableHTTPKeepAlive}}
resp, err := httpClient.Do(req)
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("unexpected error when performing request: %w", err)
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"strings"
"sync"
"time"
@ -13,8 +14,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/cheggaaa/pb/v3"
)
@ -25,16 +26,18 @@ type vmNativeProcessor struct {
src *native.Client
backoff *backoff.Backoff
s *stats
rateLimit int64
interCluster bool
cc int
s *stats
rateLimit int64
interCluster bool
cc int
disableRetries bool
}
const (
nativeExportAddr = "api/v1/export/native"
nativeImportAddr = "api/v1/import/native"
nativeBarTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
nativeExportAddr = "api/v1/export/native"
nativeImportAddr = "api/v1/import/native"
nativeWithBackoffTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
)
func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
@ -94,9 +97,9 @@ func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
return nil
}
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string) error {
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL) }
retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL, bar) }
attempts, err := p.backoff.Retry(ctx, retryableFunc)
p.s.Lock()
p.s.retries += attempts
@ -108,13 +111,18 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst
return nil
}
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string) error {
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
exportReader, err := p.src.ExportPipe(ctx, srcURL, f)
reader, err := p.src.ExportPipe(ctx, srcURL, f)
if err != nil {
return fmt.Errorf("failed to init export pipe: %w", err)
}
if p.disableRetries && bar != nil {
fmt.Printf("Continue import process with filter %s:\n", f.String())
reader = bar.NewProxyReader(reader)
}
pr, pw := io.Pipe()
done := make(chan struct{})
go func() {
@ -131,7 +139,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
w = limiter.NewWriteLimiter(pw, rl)
}
written, err := io.Copy(w, exportReader)
written, err := io.Copy(w, reader)
if err != nil {
return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err)
}
@ -176,17 +184,22 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
fmt.Println("") // extra line for better output formatting
log.Printf(initMessage, initParams...)
log.Printf("Exploring metrics...")
metrics, err := p.src.Explore(ctx, p.filter, tenantID)
if err != nil {
return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err)
var foundSeriesMsg string
metrics := []string{p.filter.Match}
if !p.disableRetries {
log.Printf("Exploring metrics...")
metrics, err = p.src.Explore(ctx, p.filter, tenantID)
if err != nil {
return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err)
}
if len(metrics) == 0 {
return fmt.Errorf("no metrics found")
}
foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics))
}
if len(metrics) == 0 {
return fmt.Errorf("no metrics found")
}
foundSeriesMsg := fmt.Sprintf("Found %d metrics to import", len(metrics))
if !p.interCluster {
// do not prompt for intercluster because there could be many tenants,
// and we don't want to interrupt the process when moving to the next tenant.
@ -206,7 +219,10 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
var bar *pb.ProgressBar
if !silent {
bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeBarTpl, barPrefix)).New(len(metrics) * len(ranges))
bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeWithBackoffTpl, barPrefix)).New(len(metrics) * len(ranges))
if p.disableRetries {
bar = pb.ProgressBarTemplate(nativeSingleProcessTpl).New(0)
}
bar.Start()
defer bar.Finish()
}
@ -220,19 +236,26 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
go func() {
defer wg.Done()
for f := range filterCh {
if err := p.do(ctx, f, srcURL, dstURL); err != nil {
errCh <- err
return
}
if bar != nil {
bar.Increment()
if !p.disableRetries {
if err := p.do(ctx, f, srcURL, dstURL, nil); err != nil {
errCh <- err
return
}
if bar != nil {
bar.Increment()
}
} else {
if err := p.runSingle(ctx, f, srcURL, dstURL, bar); err != nil {
errCh <- err
return
}
}
}
}()
}
// any error breaks the import
for s := range metrics {
for _, s := range metrics {
match, err := buildMatchWithFilter(p.filter.Match, s)
if err != nil {
@ -313,11 +336,26 @@ func byteCountSI(b int64) string {
}
func buildMatchWithFilter(filter string, metricName string) (string, error) {
labels, err := promutils.NewLabelsFromString(filter)
if filter == metricName {
return filter, nil
}
labels, err := searchutils.ParseMetricSelector(filter)
if err != nil {
return "", err
}
labels.Set("__name__", metricName)
return labels.String(), nil
str := make([]string, 0, len(labels))
for _, label := range labels {
if len(label.Key) == 0 {
continue
}
str = append(str, label.String())
}
nameFilter := fmt.Sprintf("__name__=%q", metricName)
str = append(str, nameFilter)
match := fmt.Sprintf("{%s}", strings.Join(str, ","))
return match, nil
}

View file

@ -13,6 +13,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* [How to build vmauth](https://docs.victoriametrics.com/vmauth.html#how-to-build-from-sources)
* [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build)
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix performance issue when using `vmctl vm-native`. Added flag to disable backoff policy `--vm-native-disable-retries`. Changed API call to explore the list of the metric names from `/api/v1/series` to `api/v1/label/__name__/values`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4092).
## tip
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): store backup creation and completion time in `backup_complete.ignore` file of backup contents. This is useful to determine point in time when backup was created and completed.

View file

@ -742,7 +742,7 @@ or higher.
See `./vmctl vm-native --help` for details and full list of flags.
Migration in `vm-native` mode takes two steps:
1. Explore the list of the metrics to migrate via `/api/v1/series` API;
1. Explore the list of the metrics to migrate via `api/v1/label/__name__/values` API;
2. Migrate explored metrics one-by-one.
```
@ -770,6 +770,59 @@ Requests to make: 9 / 9 [██████████████████
2023/03/02 09:22:06 Total time: 3.633127625s
```
`vmctl` uses retries with backoff policy by default.
The benefits of this retry backoff policy include:
1. Improved success rates:
With each retry attempt, the migration process has a higher chance of success.
By increasing the delay between retries, the system can avoid overwhelming the service with too many requests at once.
2. Reduced load on the system:
By increasing the delay between retries, the system can reduce the load on the service by limiting the number of
requests made in a short amount of time.
3. Can help to migrate a big amount of data
However, there are also some potential penalties associated with using a backoff retry policy, including:
1. Increased migration process latency:
`vmctl` need to make additional call to the `api/v1/label/__name__/values` with defined `--vm-native-filter-match` flag,
and after process all metric names with additional filters.
In case when retries with backoff policy is unneeded `--vm-native-disable-retries` command line flag can be used.
When this flag is set to `true`, `vmctl` skips additional call to the `api/v1/label/__name__/values` API and starts
migration process by making calls to the `/api/v1/export` and `api/v1/import`. If some errors happen `vmctl` immediately
stops the migration process.
```
./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \
--vm-native-dst-addr=http://127.0.0.1:8428 \
--vm-native-filter-match='{__name__!=""}' \
--vm-native-filter-time-start='2023-04-08T11:30:30Z' \
--vm-native-disable-retries=true
VictoriaMetrics Native import mode
2023/04/11 10:17:14 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter
filter: match[]={__name__!=""}
start: 2023-04-08T11:30:30Z
. Continue? [Y/n]
2023/04/11 10:17:15 Requests to make: 1
2023/04/11 10:17:15 number of workers decreased to 1, because vmctl calculated requests to make 1
Total: 0 ↙ Speed: ? p/s Continue import process with filter
filter: match[]={__name__!=""}
start: 2023-04-08T11:30:30Z
end: 2023-04-11T07:17:14Z:
Total: 1.64 GiB ↖ Speed: 11.20 MiB p/s
2023/04/11 10:19:45 Import finished!
2023/04/11 10:19:45 VictoriaMetrics importer stats:
time spent while importing: 2m30.813841541s;
total bytes: 1.8 GB;
bytes/s: 11.7 MB;
requests: 1;
requests retries: 0;
2023/04/11 10:19:45 Total time: 2m30.814721125s
```
Importing tips:
1. Migrating big volumes of data may result in reaching the safety limits on `src` side.