diff --git a/app/vmctl/backoff/backoff.go b/app/vmctl/backoff/backoff.go index bedde6cdc..397b9b2ec 100644 --- a/app/vmctl/backoff/backoff.go +++ b/app/vmctl/backoff/backoff.go @@ -10,12 +10,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -const ( - backoffRetries = 10 - backoffFactor = 1.8 - backoffMinDuration = time.Second * 2 -) - // retryableFunc describes call back which will repeat on errors type retryableFunc func() error @@ -30,12 +24,22 @@ type Backoff struct { } // New initialize backoff object -func New() *Backoff { - return &Backoff{ - retries: backoffRetries, - factor: backoffFactor, - minDuration: backoffMinDuration, +func New(retries int, factor float64, minDuration time.Duration) (*Backoff, error) { + if retries <= 0 { + return nil, fmt.Errorf("number of backoff retries must be greater than 0") } + if factor <= 1 { + return nil, fmt.Errorf("backoff retry factor must be greater than 1") + } + if minDuration <= 0 { + return nil, fmt.Errorf("backoff retry minimum duration must be greater than 0") + } + + return &Backoff{ + retries: retries, + factor: factor, + minDuration: minDuration, + }, nil } // Retry process retries until all attempts are completed diff --git a/app/vmctl/backoff/backoff_test.go b/app/vmctl/backoff/backoff_test.go index 2205b0e34..a5580c648 100644 --- a/app/vmctl/backoff/backoff_test.go +++ b/app/vmctl/backoff/backoff_test.go @@ -3,6 +3,7 @@ package backoff import ( "context" "fmt" + "strings" "testing" "time" ) @@ -110,3 +111,32 @@ func TestBackoffRetry_Success(t *testing.T) { resultExpected := 1 f(retryFunc, resultExpected) } + +func TestBackoff_New(t *testing.T) { + f := func(retries int, factor float64, minDuration time.Duration, errExpected string) { + t.Helper() + + _, err := New(retries, factor, minDuration) + if err == nil { + if errExpected != "" { + t.Fatalf("expecting non-nil error") + } + return + } + if !strings.Contains(err.Error(), errExpected) { + t.Fatalf("unexpected error: got %q; want %q", err.Error(), errExpected) + } + } + + // empty retries + f(0, 1.1, time.Millisecond*10, "retries must be greater than 0") + + // empty factor + f(1, 0, time.Millisecond*10, "factor must be greater than 1") + + // empty minDuration + f(1, 1.1, 0, "minimum duration must be greater than 0") + + // no errors + f(1, 1.1, time.Millisecond*10, "") +} diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index e5ae52e54..055704e08 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -56,6 +56,10 @@ const ( vmRateLimit = "vm-rate-limit" vmInterCluster = "vm-intercluster" + + vmBackoffRetries = "vm-backoff-retries" + vmBackoffFactor = "vm-backoff-factor" + vmBackoffMinDuration = "vm-backoff-min-duration" ) var ( @@ -146,6 +150,21 @@ var ( Usage: "Whether to skip tls verification when connecting to '--vmAddr'", Value: false, }, + &cli.IntFlag{ + Name: vmBackoffRetries, + Value: 10, + Usage: "How many import retries to perform before giving up.", + }, + &cli.Float64Flag{ + Name: vmBackoffFactor, + Value: 1.8, + Usage: "Factor to multiply the base duration after each failed import retry. Must be greater than 1.0", + }, + &cli.DurationFlag{ + Name: vmBackoffMinDuration, + Value: time.Second * 2, + Usage: "Minimum duration to wait before the first import retry. Each subsequent import retry will be multiplied by the '--vm-backoff-factor'.", + }, } ) @@ -430,6 +449,10 @@ const ( vmNativeDstCAFile = "vm-native-dst-ca-file" vmNativeDstServerName = "vm-native-dst-server-name" vmNativeDstInsecureSkipVerify = "vm-native-dst-insecure-skip-verify" + + vmNativeBackoffRetries = "vm-native-backoff-retries" + vmNativeBackoffFactor = "vm-native-backoff-factor" + vmNativeBackoffMinDuration = "vm-native-backoff-min-duration" ) var ( @@ -599,6 +622,21 @@ var ( "Non-binary export/import API is less efficient, but supports deduplication if it is configured on vm-native-src-addr side.", Value: false, }, + &cli.IntFlag{ + Name: vmNativeBackoffRetries, + Value: 10, + Usage: "How many export/import retries to perform before giving up.", + }, + &cli.Float64Flag{ + Name: vmNativeBackoffFactor, + Value: 1.8, + Usage: "Factor to multiply the base duration after each failed export/import retry. Must be greater than 1.0", + }, + &cli.DurationFlag{ + Name: vmNativeBackoffMinDuration, + Value: time.Second * 2, + Usage: "Minimum duration to wait before the first export/import retry. Each subsequent export/import retry will be multiplied by the '--vm-native-backoff-factor'.", + }, } ) diff --git a/app/vmctl/main.go b/app/vmctl/main.go index 136c186a9..3a07a6247 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -90,6 +90,7 @@ func main() { if err != nil { return fmt.Errorf("failed to init VM configuration: %s", err) } + importer, err := vm.NewImporter(ctx, vmCfg) if err != nil { return fmt.Errorf("failed to create VM importer: %s", err) @@ -143,6 +144,7 @@ func main() { if err != nil { return fmt.Errorf("failed to init VM configuration: %s", err) } + importer, err = vm.NewImporter(ctx, vmCfg) if err != nil { return fmt.Errorf("failed to create VM importer: %s", err) @@ -201,6 +203,7 @@ func main() { if err != nil { return fmt.Errorf("failed to init VM configuration: %s", err) } + importer, err := vm.NewImporter(ctx, vmCfg) if err != nil { return fmt.Errorf("failed to create VM importer: %s", err) @@ -233,6 +236,7 @@ func main() { if err != nil { return fmt.Errorf("failed to init VM configuration: %s", err) } + importer, err = vm.NewImporter(ctx, vmCfg) if err != nil { return fmt.Errorf("failed to create VM importer: %s", err) @@ -272,6 +276,14 @@ func main() { return fmt.Errorf("flag %q can't be empty", vmNativeFilterMatch) } + bfRetries := c.Int(vmNativeBackoffRetries) + bfFactor := c.Float64(vmNativeBackoffFactor) + bfMinDuration := c.Duration(vmNativeBackoffMinDuration) + bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration) + if err != nil { + return fmt.Errorf("failed to create backoff object: %s", err) + } + disableKeepAlive := c.Bool(vmNativeDisableHTTPKeepAlive) var srcExtraLabels []string @@ -350,7 +362,7 @@ func main() { ExtraLabels: dstExtraLabels, HTTPClient: dstHTTPClient, }, - backoff: backoff.New(), + backoff: bf, cc: c.Int(vmConcurrency), disablePerMetricRequests: c.Bool(vmNativeDisablePerMetricMigration), isNative: !c.Bool(vmNativeDisableBinaryProtocol), @@ -429,6 +441,14 @@ func initConfigVM(c *cli.Context) (vm.Config, error) { return vm.Config{}, fmt.Errorf("failed to create Transport: %s", err) } + bfRetries := c.Int(vmBackoffRetries) + bfFactor := c.Float64(vmBackoffFactor) + bfMinDuration := c.Duration(vmBackoffMinDuration) + bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration) + if err != nil { + return vm.Config{}, fmt.Errorf("failed to create backoff object: %s", err) + } + return vm.Config{ Addr: addr, Transport: tr, @@ -442,5 +462,6 @@ func initConfigVM(c *cli.Context) (vm.Config, error) { RoundDigits: c.Int(vmRoundDigits), ExtraLabels: c.StringSlice(vmExtraLabel), RateLimit: c.Int64(vmRateLimit), + Backoff: bf, }, nil } diff --git a/app/vmctl/remote_read_test.go b/app/vmctl/remote_read_test.go index c5a69a61c..023b63c8c 100644 --- a/app/vmctl/remote_read_test.go +++ b/app/vmctl/remote_read_test.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" @@ -149,6 +150,12 @@ func TestRemoteRead(t *testing.T) { tt.vmCfg.Addr = remoteWriteServer.URL() + b, err := backoff.New(10, 1.8, time.Second*2) + if err != nil { + t.Fatalf("failed to create backoff: %s", err) + } + tt.vmCfg.Backoff = b + importer, err := vm.NewImporter(ctx, tt.vmCfg) if err != nil { t.Fatalf("failed to create VM importer: %s", err) @@ -308,6 +315,12 @@ func TestSteamRemoteRead(t *testing.T) { tt.vmCfg.Addr = remoteWriteServer.URL() + b, err := backoff.New(10, 1.8, time.Second*2) + if err != nil { + t.Fatalf("failed to create backoff: %s", err) + } + + tt.vmCfg.Backoff = b importer, err := vm.NewImporter(ctx, tt.vmCfg) if err != nil { t.Fatalf("failed to create VM importer: %s", err) diff --git a/app/vmctl/vm/vm.go b/app/vmctl/vm/vm.go index 03183e867..c022574fc 100644 --- a/app/vmctl/vm/vm.go +++ b/app/vmctl/vm/vm.go @@ -54,6 +54,8 @@ type Config struct { // RateLimit defines a data transfer speed in bytes per second. // Is applied to each worker (see Concurrency) independently. RateLimit int64 + // Backoff defines backoff policy for retries + Backoff *backoff.Backoff } // Importer performs insertion of timeseries @@ -144,7 +146,7 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) { close: make(chan struct{}), input: make(chan *TimeSeries, cfg.Concurrency*4), errors: make(chan *ImportError, cfg.Concurrency), - backoff: backoff.New(), + backoff: cfg.Backoff, } if err := im.Ping(); err != nil { return nil, fmt.Errorf("ping to %q failed: %s", addr, err) diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go index 6060d79c4..874f8774f 100644 --- a/app/vmctl/vm_native_test.go +++ b/app/vmctl/vm_native_test.go @@ -81,11 +81,16 @@ func TestVMNativeProcessorRun(t *testing.T) { isSilent = true defer func() { isSilent = false }() + bf, err := backoff.New(10, 1.8, time.Second*2) + if err != nil { + t.Fatalf("cannot create backoff: %s", err) + } + p := &vmNativeProcessor{ filter: filter, dst: dstClient, src: srcClient, - backoff: backoff.New(), + backoff: bf, cc: 1, isNative: true, } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2af12fbf6..7afa1c3ee 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl/): add `--vm-backoff-retries`, `--vm-backoff-factor`, `--vm-backoff-min-duration` and `--vm-native-backoff-retries`, `--vm-native-backoff-factor`, `--vm-native-backoff-min-duration` command-line flags. These flags allow to change backoff policy config for import requests to VictoriaMetrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6622). + ## [v1.102.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.1) Released at 2024-08-01