vmctl: add option to rate limit data transfer speed

The new flag `vm-rate-limit` defines the data transfer speed limit
in bytes per second. Rate limiting is not applied if the flag is omitted.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1405

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
hagen1778 2021-12-24 12:18:07 +02:00 committed by Nikolay
parent dd1b789c15
commit b8369e2f3e
7 changed files with 137 additions and 7 deletions

View file

@ -560,6 +560,15 @@ results such as `average`, `rate`, etc.
If multiple labels need to be added, set the flag for each label, for example: `--vm-extra-label label1=value1 --vm-extra-label label2=value2`.
If a timeseries already has a label that is also added via the `--vm-extra-label` flag, the flag has priority and will override the label value from the timeseries.
### Rate limiting
Limiting the rate of data transfer could help to reduce pressure on disk or on destination database.
The rate limit may be set in bytes-per-second via `--vm-rate-limit` flag.
Please note, you can also use [vmagent](https://docs.victoriametrics.com/vmagent.html)
as a proxy between `vmctl` and destination with `-remoteWrite.rateLimit` flag enabled.
## How to build
It is recommended to use [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) - `vmctl` is located in `vmutils-*` archives there.

View file

@ -36,7 +36,10 @@ const (
vmBatchSize = "vm-batch-size"
vmSignificantFigures = "vm-significant-figures"
vmRoundDigits = "vm-round-digits"
vmExtraLabel = "vm-extra-label"
// also used in vm-native
vmExtraLabel = "vm-extra-label"
vmRateLimit = "vm-rate-limit"
)
var (
@ -101,6 +104,11 @@ var (
Usage: "Extra labels, that will be added to imported timeseries. In case of collision, label value defined by flag" +
"will have priority. Flag can be set multiple times, to add few additional labels.",
},
&cli.Int64Flag{
Name: vmRateLimit,
Usage: "Optional data transfer rate limit in bytes per second.\n" +
"By default the rate limit is disabled. It can be useful for limiting load on configured via '--vmAddr' destination.",
},
}
)
@ -360,6 +368,11 @@ var (
Usage: "Extra labels, that will be added to imported timeseries. In case of collision, label value defined by flag" +
"will have priority. Flag can be set multiple times, to add few additional labels.",
},
&cli.Int64Flag{
Name: vmRateLimit,
Usage: "Optional data transfer rate limit in bytes per second.\n" +
"By default the rate limit is disabled. It can be useful for limiting load on source or destination databases.",
},
}
)

View file

@ -0,0 +1,53 @@
package limiter
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
)
// NewLimiter returns a Limiter that enforces the given
// perSecondLimit (in bytes per second). A non-positive
// limit disables rate limiting entirely.
func NewLimiter(perSecondLimit int64) *Limiter {
	l := &Limiter{
		perSecondLimit: perSecondLimit,
	}
	return l
}
// Limiter controls the amount of budget
// that can be spent according to configured perSecondLimit.
//
// Must be created via NewLimiter. The zero value (or a
// non-positive perSecondLimit) performs no rate limiting.
type Limiter struct {
	// perSecondLimit is the maximum budget (in bytes) that may be
	// spent per second; <= 0 disables limiting (see Register).
	perSecondLimit int64

	// mu protects budget and deadline from concurrent access.
	mu sync.Mutex

	// The current budget. It is increased by perSecondLimit every second.
	// May go negative after a Register call larger than the remaining budget.
	budget int64

	// The next deadline for increasing the budget by perSecondLimit.
	deadline time.Time
}
// Register blocks for amount of time
// needed to process the given dataLen according
// to the configured perSecondLimit.
//
// Register holds the Limiter's mutex while waiting, so concurrent
// callers are serialized and the limit applies to the Limiter as a whole.
func (l *Limiter) Register(dataLen int) {
	limit := l.perSecondLimit
	if limit <= 0 {
		// Rate limiting is disabled — never block.
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	// Replenish the budget by `limit` once per deadline period until it
	// becomes positive. A dataLen larger than limit drives the budget
	// negative, so this loop may wait for multiple seconds.
	for l.budget <= 0 {
		if d := time.Until(l.deadline); d > 0 {
			// Sleep until the next replenish deadline; the timer is
			// pooled to avoid a per-wait allocation.
			t := timerpool.Get(d)
			<-t.C
			timerpool.Put(t)
		}
		l.budget += limit
		l.deadline = time.Now().Add(time.Second)
	}
	// Spend the budget; it may go negative here, which will make
	// subsequent Register calls block to pay off the debt.
	l.budget -= int64(dataLen)
}

View file

@ -0,0 +1,37 @@
package limiter
import (
"io"
)
// NewWriteLimiter wraps w into a WriteLimiter that charges
// every written chunk against the given Limiter.
func NewWriteLimiter(w io.Writer, limiter *Limiter) *WriteLimiter {
	wl := &WriteLimiter{}
	wl.writer = w
	wl.limiter = limiter
	return wl
}
// WriteLimiter limits the amount of bytes written
// per second via Write() method.
// Must be created via NewWriteLimiter.
type WriteLimiter struct {
	// writer is the destination all writes are forwarded to.
	writer io.Writer
	// limiter enforces the bytes-per-second budget before each write.
	limiter *Limiter
}
// Close implements io.Closer. If the wrapped writer also
// implements io.Closer, its Close result is returned;
// otherwise Close is a no-op returning nil.
func (wl *WriteLimiter) Close() error {
	closer, ok := wl.writer.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}
// Write implements io.Writer. It first charges len(p) bytes
// against the limiter (possibly blocking to honor the rate
// limit) and then forwards the write to the wrapped writer.
func (wl *WriteLimiter) Write(p []byte) (n int, err error) {
	wl.limiter.Register(len(p))
	n, err = wl.writer.Write(p)
	return n, err
}

View file

@ -143,6 +143,7 @@ func main() {
}
p := vmNativeProcessor{
rateLimit: c.Int64(vmRateLimit),
filter: filter{
match: c.String(vmNativeFilterMatch),
timeStart: c.String(vmNativeFilterTimeStart),
@ -195,5 +196,6 @@ func initConfigVM(c *cli.Context) vm.Config {
SignificantFigures: c.Int(vmSignificantFigures),
RoundDigits: c.Int(vmRoundDigits),
ExtraLabels: c.StringSlice(vmExtraLabel),
RateLimit: c.Int64(vmRateLimit),
}
}

View file

@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
@ -47,6 +48,9 @@ type Config struct {
RoundDigits int
// ExtraLabels that will be added to all imported series. Must be in label=value format.
ExtraLabels []string
// RateLimit defines a data transfer speed in bytes per second.
// Is applied to each worker (see Concurrency) independently.
RateLimit int64
}
// Importer performs insertion of timeseries
@ -63,6 +67,8 @@ type Importer struct {
input chan *TimeSeries
errors chan *ImportError
rl *limiter.Limiter
wg sync.WaitGroup
once sync.Once
@ -123,6 +129,7 @@ func NewImporter(cfg Config) (*Importer, error) {
compress: cfg.Compress,
user: cfg.User,
password: cfg.Password,
rl: limiter.NewLimiter(cfg.RateLimit),
close: make(chan struct{}),
input: make(chan *TimeSeries, cfg.Concurrency*4),
errors: make(chan *ImportError, cfg.Concurrency),
@ -304,12 +311,13 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
w := io.Writer(pw)
if im.compress {
zw, err := gzip.NewWriterLevel(pw, 1)
zw, err := gzip.NewWriterLevel(w, 1)
if err != nil {
return fmt.Errorf("unexpected error when creating gzip writer: %s", err)
}
w = zw
}
w = limiter.NewWriteLimiter(w, im.rl)
bw := bufio.NewWriterSize(w, 16*1024)
var totalSamples, totalBytes int
@ -324,8 +332,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
if err := bw.Flush(); err != nil {
return err
}
if im.compress {
err := w.(*gzip.Writer).Close()
if closer, ok := w.(io.Closer); ok {
err := closer.Close()
if err != nil {
return err
}

View file

@ -7,12 +7,15 @@ import (
"log"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
)
type vmNativeProcessor struct {
filter filter
filter filter
rateLimit int64
dst *vmNativeClient
src *vmNativeClient
@ -84,7 +87,12 @@ func (p *vmNativeProcessor) run() error {
bar := pb.ProgressBarTemplate(barTpl).Start64(0)
barReader := bar.NewProxyReader(exportReader)
_, err = io.Copy(pw, barReader)
w := io.Writer(pw)
if p.rateLimit > 0 {
rl := limiter.NewLimiter(p.rateLimit)
w = limiter.NewWriteLimiter(pw, rl)
}
_, err = io.Copy(w, barReader)
if err != nil {
return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err)
}