diff --git a/app/vmctl/README.md b/app/vmctl/README.md index baafab5cd..30a394d29 100644 --- a/app/vmctl/README.md +++ b/app/vmctl/README.md @@ -560,6 +560,15 @@ results such as `average`, `rate`, etc. If multiple labels needs to be added, set flag for each label, for example, `--vm-extra-label label1=value1 --vm-extra-label label2=value2`. If timeseries already have label, that must be added with `--vm-extra-label` flag, flag has priority and will override label value from timeseries. +### Rate limiting + +Limiting the rate of data transfer could help to reduce pressure on disk or on destination database. +The rate limit may be set in bytes-per-second via `--vm-rate-limit` flag. + +Please note, you can also use [vmagent](https://docs.victoriametrics.com/vmagent.html) +as a proxy between `vmctl` and destination with `-remoteWrite.rateLimit` flag enabled. + + ## How to build It is recommended using [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) - `vmctl` is located in `vmutils-*` archives there. diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index d1d1d6acb..97765314a 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -36,7 +36,10 @@ const ( vmBatchSize = "vm-batch-size" vmSignificantFigures = "vm-significant-figures" vmRoundDigits = "vm-round-digits" - vmExtraLabel = "vm-extra-label" + + // also used in vm-native + vmExtraLabel = "vm-extra-label" + vmRateLimit = "vm-rate-limit" ) var ( @@ -101,6 +104,11 @@ var ( Usage: "Extra labels, that will be added to imported timeseries. In case of collision, label value defined by flag" + "will have priority. Flag can be set multiple times, to add few additional labels.", }, + &cli.Int64Flag{ + Name: vmRateLimit, + Usage: "Optional data transfer rate limit in bytes per second.\n" + + "By default the rate limit is disabled. It can be useful for limiting load on configured via '--vmAddr' destination.", + }, } ) @@ -360,6 +368,11 @@ var ( Usage: "Extra labels, that will be added to imported timeseries. In case of collision, label value defined by flag" + "will have priority. Flag can be set multiple times, to add few additional labels.", }, + &cli.Int64Flag{ + Name: vmRateLimit, + Usage: "Optional data transfer rate limit in bytes per second.\n" + + "By default the rate limit is disabled. It can be useful for limiting load on source or destination databases.", + }, } ) diff --git a/app/vmctl/limiter/limiter.go b/app/vmctl/limiter/limiter.go new file mode 100644 index 000000000..ace01f715 --- /dev/null +++ b/app/vmctl/limiter/limiter.go @@ -0,0 +1,53 @@ +package limiter + +import ( + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" +) + +// NewLimiter creates a Limiter object +// for the given perSecondLimit +func NewLimiter(perSecondLimit int64) *Limiter { + return &Limiter{perSecondLimit: perSecondLimit} +} + +// Limiter controls the amount of budget +// that can be spent according to configured perSecondLimit +type Limiter struct { + perSecondLimit int64 + + // mu protects budget and deadline from concurrent access. + mu sync.Mutex + + // The current budget. It is increased by perSecondLimit every second. + budget int64 + + // The next deadline for increasing the budget by perSecondLimit + deadline time.Time +} + +// Register blocks for amount of time +// needed to process the given dataLen according +// to the configured perSecondLimit. +func (l *Limiter) Register(dataLen int) { + limit := l.perSecondLimit + if limit <= 0 { + return + } + + l.mu.Lock() + defer l.mu.Unlock() + + for l.budget <= 0 { + if d := time.Until(l.deadline); d > 0 { + t := timerpool.Get(d) + <-t.C + timerpool.Put(t) + } + l.budget += limit + l.deadline = time.Now().Add(time.Second) + } + l.budget -= int64(dataLen) +} diff --git a/app/vmctl/limiter/writer.go b/app/vmctl/limiter/writer.go new file mode 100644 index 000000000..134a8208e --- /dev/null +++ b/app/vmctl/limiter/writer.go @@ -0,0 +1,37 @@ +package limiter + +import ( + "io" +) + +// NewWriteLimiter creates a new WriteLimiter object +// for the give writer and Limiter. +func NewWriteLimiter(w io.Writer, limiter *Limiter) *WriteLimiter { + return &WriteLimiter{ + writer: w, + limiter: limiter, + } +} + +// WriteLimiter limits the amount of bytes written +// per second via Write() method. +// Must be created via NewWriteLimiter. +type WriteLimiter struct { + writer io.Writer + limiter *Limiter +} + +// Close implements io.Closer +// also calls Close for wrapped io.WriteCloser +func (wl *WriteLimiter) Close() error { + if c, ok := wl.writer.(io.Closer); ok { + return c.Close() + } + return nil +} + +// Write implements io.Writer +func (wl *WriteLimiter) Write(p []byte) (n int, err error) { + wl.limiter.Register(len(p)) + return wl.writer.Write(p) +} diff --git a/app/vmctl/main.go b/app/vmctl/main.go index 299656163..223da90f7 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -143,6 +143,7 @@ func main() { } p := vmNativeProcessor{ + rateLimit: c.Int64(vmRateLimit), filter: filter{ match: c.String(vmNativeFilterMatch), timeStart: c.String(vmNativeFilterTimeStart), @@ -195,5 +196,6 @@ func initConfigVM(c *cli.Context) vm.Config { SignificantFigures: c.Int(vmSignificantFigures), RoundDigits: c.Int(vmRoundDigits), ExtraLabels: c.StringSlice(vmExtraLabel), + RateLimit: c.Int64(vmRateLimit), } } diff --git a/app/vmctl/vm/vm.go b/app/vmctl/vm/vm.go index ff15e776c..d37b040ff 100644 --- a/app/vmctl/vm/vm.go +++ b/app/vmctl/vm/vm.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" ) @@ -47,6 +48,9 @@ type Config struct { RoundDigits int // ExtraLabels that will be added to all imported series. Must be in label=value format. ExtraLabels []string + // RateLimit defines a data transfer speed in bytes per second. + // Is applied to each worker (see Concurrency) independently. + RateLimit int64 } // Importer performs insertion of timeseries @@ -63,6 +67,8 @@ type Importer struct { input chan *TimeSeries errors chan *ImportError + rl *limiter.Limiter + wg sync.WaitGroup once sync.Once @@ -123,6 +129,7 @@ func NewImporter(cfg Config) (*Importer, error) { compress: cfg.Compress, user: cfg.User, password: cfg.Password, + rl: limiter.NewLimiter(cfg.RateLimit), close: make(chan struct{}), input: make(chan *TimeSeries, cfg.Concurrency*4), errors: make(chan *ImportError, cfg.Concurrency), @@ -304,12 +311,13 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error { w := io.Writer(pw) if im.compress { - zw, err := gzip.NewWriterLevel(pw, 1) + zw, err := gzip.NewWriterLevel(w, 1) if err != nil { return fmt.Errorf("unexpected error when creating gzip writer: %s", err) } w = zw } + w = limiter.NewWriteLimiter(w, im.rl) bw := bufio.NewWriterSize(w, 16*1024) var totalSamples, totalBytes int @@ -324,8 +332,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error { if err := bw.Flush(); err != nil { return err } - if im.compress { - err := w.(*gzip.Writer).Close() + if closer, ok := w.(io.Closer); ok { + err := closer.Close() if err != nil { return err } diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index 02a9ffd0d..60cec652f 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -7,12 +7,15 @@ import ( "log" "net/http" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/cheggaaa/pb/v3" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" ) type vmNativeProcessor struct { - filter filter + filter filter + rateLimit int64 dst *vmNativeClient src *vmNativeClient @@ -84,7 +87,12 @@ func (p *vmNativeProcessor) run() error { bar := pb.ProgressBarTemplate(barTpl).Start64(0) barReader := bar.NewProxyReader(exportReader) - _, err = io.Copy(pw, barReader) + w := io.Writer(pw) + if p.rateLimit > 0 { + rl := limiter.NewLimiter(p.rateLimit) + w = limiter.NewWriteLimiter(pw, rl) + } + _, err = io.Copy(w, barReader) if err != nil { return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err) }