app/vmctl: add retry backoff policy (#3844)

app/vmctl: move retries logic into a separate pkg
This commit is contained in:
Dmytro Kozlov 2023-02-22 14:06:55 +02:00 committed by GitHub
parent 57801660ab
commit 110c3896e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 188 additions and 40 deletions

View file

@ -0,0 +1,60 @@
package backoff
import (
"context"
"errors"
"fmt"
"math"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
const (
backoffRetries = 5
backoffFactor = 1.7
backoffMinDuration = time.Second
)
// retryableFunc describes call back which will repeat on errors
type retryableFunc func() error
var ErrBadRequest = errors.New("bad request")
// Backoff describes object with backoff policy params
type Backoff struct {
retries int
factor float64
minDuration time.Duration
}
// New initialize backoff object
func New() *Backoff {
return &Backoff{
retries: backoffRetries,
factor: backoffFactor,
minDuration: backoffMinDuration,
}
}
// Retry process retries until all attempts are completed
func (b *Backoff) Retry(ctx context.Context, cb retryableFunc) (uint64, error) {
var attempt uint64
for i := 0; i < b.retries; i++ {
// @TODO we should use context to cancel retries
err := cb()
if err == nil {
return attempt, nil
}
if errors.Is(err, ErrBadRequest) {
logger.Errorf("unrecoverable error: %s", err)
return attempt, err // fail fast if not recoverable
}
attempt++
backoff := float64(b.minDuration) * math.Pow(b.factor, float64(i))
dur := time.Duration(backoff)
logger.Errorf("got error: %s on attempt: %d; will retry in %v", err, attempt, dur)
time.Sleep(time.Duration(backoff))
}
return attempt, fmt.Errorf("execution failed after %d retry attempts", b.retries)
}

View file

@ -0,0 +1,96 @@
package backoff
import (
"context"
"fmt"
"testing"
"time"
)
func TestRetry_Do(t *testing.T) {
counter := 0
tests := []struct {
name string
backoffRetries int
backoffFactor float64
backoffMinDuration time.Duration
retryableFunc retryableFunc
ctx context.Context
withCancel bool
want uint64
wantErr bool
}{
{
name: "return bad request",
retryableFunc: func() error {
return ErrBadRequest
},
ctx: context.Background(),
want: 0,
wantErr: true,
},
{
name: "empty retries values",
retryableFunc: func() error {
time.Sleep(time.Millisecond * 100)
return nil
},
ctx: context.Background(),
want: 0,
wantErr: false,
},
{
name: "only one retry test",
backoffRetries: 5,
backoffFactor: 1.7,
backoffMinDuration: time.Millisecond * 10,
retryableFunc: func() error {
t := time.NewTicker(time.Millisecond * 5)
defer t.Stop()
for range t.C {
counter++
if counter%2 == 0 {
return fmt.Errorf("got some error")
}
if counter%3 == 0 {
return nil
}
}
return nil
},
ctx: context.Background(),
want: 1,
wantErr: false,
},
{
name: "all retries failed test",
backoffRetries: 5,
backoffFactor: 0.1,
backoffMinDuration: time.Millisecond * 10,
retryableFunc: func() error {
t := time.NewTicker(time.Millisecond * 5)
defer t.Stop()
for range t.C {
return fmt.Errorf("got some error")
}
return nil
},
ctx: context.Background(),
want: 5,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := New()
got, err := r.Retry(tt.ctx, tt.retryableFunc)
if (err != nil) != tt.wantErr {
t.Errorf("Retry() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("Retry() got = %v, want %v", got, tt.want)
}
})
}
}

View file

@ -65,7 +65,7 @@ func main() {
// disable progress bars since openTSDB implementation
// does not use progress bar pool
vmCfg.DisableProgressBar = true
importer, err := vm.NewImporter(vmCfg)
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
}
@ -100,7 +100,7 @@ func main() {
}
vmCfg := initConfigVM(c)
importer, err = vm.NewImporter(vmCfg)
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
}
@ -137,7 +137,7 @@ func main() {
vmCfg := initConfigVM(c)
importer, err := vm.NewImporter(vmCfg)
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
}
@ -163,7 +163,7 @@ func main() {
fmt.Println("Prometheus import mode")
vmCfg := initConfigVM(c)
importer, err = vm.NewImporter(vmCfg)
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
}

View file

@ -1,6 +1,7 @@
package main
import (
"context"
"os"
"testing"
"time"
@ -58,7 +59,7 @@ func Test_prometheusProcessor_run(t *testing.T) {
return client
},
im: func(vmCfg vm.Config) *vm.Importer {
importer, err := vm.NewImporter(vmCfg)
importer, err := vm.NewImporter(context.Background(), vmCfg)
if err != nil {
t.Fatalf("error init importer: %s", err)
}
@ -95,7 +96,7 @@ func Test_prometheusProcessor_run(t *testing.T) {
return client
},
im: func(vmCfg vm.Config) *vm.Importer {
importer, err := vm.NewImporter(vmCfg)
importer, err := vm.NewImporter(context.Background(), vmCfg)
if err != nil {
t.Fatalf("error init importer: %s", err)
}

View file

@ -110,6 +110,7 @@ func TestRemoteRead(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
defer remoteReadServer.Close()
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
@ -139,7 +140,7 @@ func TestRemoteRead(t *testing.T) {
tt.vmCfg.Addr = remoteWriteServer.URL()
importer, err := vm.NewImporter(tt.vmCfg)
importer, err := vm.NewImporter(ctx, tt.vmCfg)
if err != nil {
t.Fatalf("failed to create VM importer: %s", err)
}
@ -156,7 +157,6 @@ func TestRemoteRead(t *testing.T) {
cc: 1,
}
ctx := context.Background()
err = rmp.run(ctx, true, false)
if err != nil {
t.Fatalf("failed to run remote read processor: %s", err)
@ -263,6 +263,7 @@ func TestSteamRemoteRead(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
defer remoteReadServer.Close()
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
@ -292,7 +293,7 @@ func TestSteamRemoteRead(t *testing.T) {
tt.vmCfg.Addr = remoteWriteServer.URL()
importer, err := vm.NewImporter(tt.vmCfg)
importer, err := vm.NewImporter(ctx, tt.vmCfg)
if err != nil {
t.Fatalf("failed to create VM importer: %s", err)
}
@ -309,7 +310,6 @@ func TestSteamRemoteRead(t *testing.T) {
cc: 1,
}
ctx := context.Background()
err = rmp.run(ctx, true, false)
if err != nil {
t.Fatalf("failed to run remote read processor: %s", err)

View file

@ -3,15 +3,16 @@ package vm
import (
"bufio"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"math"
"net/http"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
@ -75,7 +76,8 @@ type Importer struct {
wg sync.WaitGroup
once sync.Once
s *stats
s *stats
backoff *backoff.Backoff
}
// ResetStats resets im stats.
@ -107,7 +109,7 @@ func AddExtraLabelsToImportPath(path string, extraLabels []string) (string, erro
}
// NewImporter creates new Importer for the given cfg.
func NewImporter(cfg Config) (*Importer, error) {
func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
if cfg.Concurrency < 1 {
return nil, fmt.Errorf("concurrency can't be lower than 1")
}
@ -136,6 +138,7 @@ func NewImporter(cfg Config) (*Importer, error) {
close: make(chan struct{}),
input: make(chan *TimeSeries, cfg.Concurrency*4),
errors: make(chan *ImportError, cfg.Concurrency),
backoff: backoff.New(),
}
if err := im.Ping(); err != nil {
return nil, fmt.Errorf("ping to %q failed: %s", addr, err)
@ -154,7 +157,7 @@ func NewImporter(cfg Config) (*Importer, error) {
}
go func(bar *pb.ProgressBar) {
defer im.wg.Done()
im.startWorker(bar, cfg.BatchSize, cfg.SignificantFigures, cfg.RoundDigits)
im.startWorker(ctx, bar, cfg.BatchSize, cfg.SignificantFigures, cfg.RoundDigits)
}(bar)
}
im.ResetStats()
@ -205,7 +208,7 @@ func (im *Importer) Close() {
})
}
func (im *Importer) startWorker(bar *pb.ProgressBar, batchSize, significantFigures, roundDigits int) {
func (im *Importer) startWorker(ctx context.Context, bar *pb.ProgressBar, batchSize, significantFigures, roundDigits int) {
var batch []*TimeSeries
var dataPoints int
var waitForBatch time.Time
@ -219,7 +222,9 @@ func (im *Importer) startWorker(bar *pb.ProgressBar, batchSize, significantFigur
exitErr := &ImportError{
Batch: batch,
}
if err := im.Import(batch); err != nil {
retryableFunc := func() error { return im.Import(batch) }
_, err := im.backoff.Retry(ctx, retryableFunc)
if err != nil {
exitErr.Err = err
}
im.errors <- exitErr
@ -249,7 +254,7 @@ func (im *Importer) startWorker(bar *pb.ProgressBar, batchSize, significantFigur
im.s.idleDuration += time.Since(waitForBatch)
im.s.Unlock()
if err := im.flush(batch); err != nil {
if err := im.flush(ctx, batch); err != nil {
im.errors <- &ImportError{
Batch: batch,
Err: err,
@ -264,30 +269,16 @@ func (im *Importer) startWorker(bar *pb.ProgressBar, batchSize, significantFigur
}
}
const (
// TODO: make configurable
backoffRetries = 5
backoffFactor = 1.7
backoffMinDuration = time.Second
)
func (im *Importer) flush(b []*TimeSeries) error {
var err error
for i := 0; i < backoffRetries; i++ {
err = im.Import(b)
if err == nil {
return nil
}
if errors.Is(err, ErrBadRequest) {
return err // fail fast if not recoverable
}
im.s.Lock()
im.s.retries++
im.s.Unlock()
backoff := float64(backoffMinDuration) * math.Pow(backoffFactor, float64(i))
time.Sleep(time.Duration(backoff))
func (im *Importer) flush(ctx context.Context, b []*TimeSeries) error {
retryableFunc := func() error { return im.Import(b) }
attempts, err := im.backoff.Retry(ctx, retryableFunc)
if err != nil {
return fmt.Errorf("import failed with %d retries: %s", attempts, err)
}
return fmt.Errorf("import failed with %d retries: %s", backoffRetries, err)
im.s.Lock()
im.s.retries = attempts
im.s.Unlock()
return nil
}
// Ping sends a ping to im.addr.