mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
vmctl: add option to rate limit data transfer speed
The new flag `vm-rate-limit` defines data transfer speed limit in bytes per second. Rate limiting is not applied if flag is omitted. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1405 Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
c74a6e8714
commit
e405b29a83
7 changed files with 137 additions and 7 deletions
|
@ -560,6 +560,15 @@ results such as `average`, `rate`, etc.
|
|||
If multiple labels needs to be added, set flag for each label, for example, `--vm-extra-label label1=value1 --vm-extra-label label2=value2`.
|
||||
If timeseries already have label, that must be added with `--vm-extra-label` flag, flag has priority and will override label value from timeseries.
|
||||
|
||||
### Rate limiting
|
||||
|
||||
Limiting the rate of data transfer could help to reduce pressure on disk or on destination database.
|
||||
The rate limit may be set in bytes-per-second via `--vm-rate-limit` flag.
|
||||
|
||||
Please note, you can also use [vmagent](https://docs.victoriametrics.com/vmagent.html)
|
||||
as a proxy between `vmctl` and destination with `-remoteWrite.rateLimit` flag enabled.
|
||||
|
||||
|
||||
## How to build
|
||||
|
||||
It is recommended using [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) - `vmctl` is located in `vmutils-*` archives there.
|
||||
|
|
|
@ -36,7 +36,10 @@ const (
|
|||
vmBatchSize = "vm-batch-size"
|
||||
vmSignificantFigures = "vm-significant-figures"
|
||||
vmRoundDigits = "vm-round-digits"
|
||||
vmExtraLabel = "vm-extra-label"
|
||||
|
||||
// also used in vm-native
|
||||
vmExtraLabel = "vm-extra-label"
|
||||
vmRateLimit = "vm-rate-limit"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -101,6 +104,11 @@ var (
|
|||
Usage: "Extra labels, that will be added to imported timeseries. In case of collision, label value defined by flag" +
|
||||
"will have priority. Flag can be set multiple times, to add few additional labels.",
|
||||
},
|
||||
&cli.Int64Flag{
|
||||
Name: vmRateLimit,
|
||||
Usage: "Optional data transfer rate limit in bytes per second.\n" +
|
||||
"By default the rate limit is disabled. It can be useful for limiting load on configured via '--vmAddr' destination.",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -360,6 +368,11 @@ var (
|
|||
Usage: "Extra labels, that will be added to imported timeseries. In case of collision, label value defined by flag" +
|
||||
"will have priority. Flag can be set multiple times, to add few additional labels.",
|
||||
},
|
||||
&cli.Int64Flag{
|
||||
Name: vmRateLimit,
|
||||
Usage: "Optional data transfer rate limit in bytes per second.\n" +
|
||||
"By default the rate limit is disabled. It can be useful for limiting load on source or destination databases.",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
|
53
app/vmctl/limiter/limiter.go
Normal file
53
app/vmctl/limiter/limiter.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package limiter
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
)
|
||||
|
||||
// NewLimiter creates a Limiter object
|
||||
// for the given perSecondLimit
|
||||
func NewLimiter(perSecondLimit int64) *Limiter {
|
||||
return &Limiter{perSecondLimit: perSecondLimit}
|
||||
}
|
||||
|
||||
// Limiter controls the amount of budget
|
||||
// that can be spent according to configured perSecondLimit
|
||||
type Limiter struct {
|
||||
perSecondLimit int64
|
||||
|
||||
// mu protects budget and deadline from concurrent access.
|
||||
mu sync.Mutex
|
||||
|
||||
// The current budget. It is increased by perSecondLimit every second.
|
||||
budget int64
|
||||
|
||||
// The next deadline for increasing the budget by perSecondLimit
|
||||
deadline time.Time
|
||||
}
|
||||
|
||||
// Register blocks for amount of time
|
||||
// needed to process the given dataLen according
|
||||
// to the configured perSecondLimit.
|
||||
func (l *Limiter) Register(dataLen int) {
|
||||
limit := l.perSecondLimit
|
||||
if limit <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
for l.budget <= 0 {
|
||||
if d := time.Until(l.deadline); d > 0 {
|
||||
t := timerpool.Get(d)
|
||||
<-t.C
|
||||
timerpool.Put(t)
|
||||
}
|
||||
l.budget += limit
|
||||
l.deadline = time.Now().Add(time.Second)
|
||||
}
|
||||
l.budget -= int64(dataLen)
|
||||
}
|
37
app/vmctl/limiter/writer.go
Normal file
37
app/vmctl/limiter/writer.go
Normal file
|
@ -0,0 +1,37 @@
|
|||
package limiter
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// NewWriteLimiter creates a new WriteLimiter object
|
||||
// for the give writer and Limiter.
|
||||
func NewWriteLimiter(w io.Writer, limiter *Limiter) *WriteLimiter {
|
||||
return &WriteLimiter{
|
||||
writer: w,
|
||||
limiter: limiter,
|
||||
}
|
||||
}
|
||||
|
||||
// WriteLimiter limits the amount of bytes written
|
||||
// per second via Write() method.
|
||||
// Must be created via NewWriteLimiter.
|
||||
type WriteLimiter struct {
|
||||
writer io.Writer
|
||||
limiter *Limiter
|
||||
}
|
||||
|
||||
// Close implements io.Closer
|
||||
// also calls Close for wrapped io.WriteCloser
|
||||
func (wl *WriteLimiter) Close() error {
|
||||
if c, ok := wl.writer.(io.Closer); ok {
|
||||
return c.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write implements io.Writer
|
||||
func (wl *WriteLimiter) Write(p []byte) (n int, err error) {
|
||||
wl.limiter.Register(len(p))
|
||||
return wl.writer.Write(p)
|
||||
}
|
|
@ -143,6 +143,7 @@ func main() {
|
|||
}
|
||||
|
||||
p := vmNativeProcessor{
|
||||
rateLimit: c.Int64(vmRateLimit),
|
||||
filter: filter{
|
||||
match: c.String(vmNativeFilterMatch),
|
||||
timeStart: c.String(vmNativeFilterTimeStart),
|
||||
|
@ -195,5 +196,6 @@ func initConfigVM(c *cli.Context) vm.Config {
|
|||
SignificantFigures: c.Int(vmSignificantFigures),
|
||||
RoundDigits: c.Int(vmRoundDigits),
|
||||
ExtraLabels: c.StringSlice(vmExtraLabel),
|
||||
RateLimit: c.Int64(vmRateLimit),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
)
|
||||
|
||||
|
@ -47,6 +48,9 @@ type Config struct {
|
|||
RoundDigits int
|
||||
// ExtraLabels that will be added to all imported series. Must be in label=value format.
|
||||
ExtraLabels []string
|
||||
// RateLimit defines a data transfer speed in bytes per second.
|
||||
// Is applied to each worker (see Concurrency) independently.
|
||||
RateLimit int64
|
||||
}
|
||||
|
||||
// Importer performs insertion of timeseries
|
||||
|
@ -63,6 +67,8 @@ type Importer struct {
|
|||
input chan *TimeSeries
|
||||
errors chan *ImportError
|
||||
|
||||
rl *limiter.Limiter
|
||||
|
||||
wg sync.WaitGroup
|
||||
once sync.Once
|
||||
|
||||
|
@ -123,6 +129,7 @@ func NewImporter(cfg Config) (*Importer, error) {
|
|||
compress: cfg.Compress,
|
||||
user: cfg.User,
|
||||
password: cfg.Password,
|
||||
rl: limiter.NewLimiter(cfg.RateLimit),
|
||||
close: make(chan struct{}),
|
||||
input: make(chan *TimeSeries, cfg.Concurrency*4),
|
||||
errors: make(chan *ImportError, cfg.Concurrency),
|
||||
|
@ -304,12 +311,13 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
|
|||
|
||||
w := io.Writer(pw)
|
||||
if im.compress {
|
||||
zw, err := gzip.NewWriterLevel(pw, 1)
|
||||
zw, err := gzip.NewWriterLevel(w, 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unexpected error when creating gzip writer: %s", err)
|
||||
}
|
||||
w = zw
|
||||
}
|
||||
w = limiter.NewWriteLimiter(w, im.rl)
|
||||
bw := bufio.NewWriterSize(w, 16*1024)
|
||||
|
||||
var totalSamples, totalBytes int
|
||||
|
@ -324,8 +332,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
|
|||
if err := bw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
if im.compress {
|
||||
err := w.(*gzip.Writer).Close()
|
||||
if closer, ok := w.(io.Closer); ok {
|
||||
err := closer.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -7,12 +7,15 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
"github.com/cheggaaa/pb/v3"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
)
|
||||
|
||||
type vmNativeProcessor struct {
|
||||
filter filter
|
||||
filter filter
|
||||
rateLimit int64
|
||||
|
||||
dst *vmNativeClient
|
||||
src *vmNativeClient
|
||||
|
@ -84,7 +87,12 @@ func (p *vmNativeProcessor) run() error {
|
|||
bar := pb.ProgressBarTemplate(barTpl).Start64(0)
|
||||
barReader := bar.NewProxyReader(exportReader)
|
||||
|
||||
_, err = io.Copy(pw, barReader)
|
||||
w := io.Writer(pw)
|
||||
if p.rateLimit > 0 {
|
||||
rl := limiter.NewLimiter(p.rateLimit)
|
||||
w = limiter.NewWriteLimiter(pw, rl)
|
||||
}
|
||||
_, err = io.Copy(w, barReader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue