mirror of
synced 2025-03-21 15:45:01 +00:00

It is better developing vmctl tool in VictoriaMetrics repository, so it could be released together with the rest of vmutils tools such as vmalert, vmagent, vmbackup, vmrestore and vmauth.
369 lines
8.8 KiB
369 lines
8.8 KiB
package vm
import (
// Config contains list of params to configure
// the Importer
type Config struct {
// VictoriaMetrics address to perform import requests
// --httpListenAddr value for single node version
// --httpListenAddr value of VMSelect component for cluster version
Addr string
// Concurrency defines number of worker
// performing the import requests concurrently
Concurrency uint8
// Whether to apply gzip compression
Compress bool
// AccountID for cluster version.
// Empty value assumes it is a single node version
AccountID string
// BatchSize defines how many samples
// importer collects before sending the import request
BatchSize int
// User name for basic auth
User string
// Password for basic auth
Password string
// SignificantFigures defines the number of significant figures to leave
// in metric values before importing.
// Zero value saves all the significant decimal places
SignificantFigures int
// ExtraLabels that will be added to all imported series. Must be in label=value format.
ExtraLabels []string
// Importer performs insertion of timeseries
// via VictoriaMetrics import protocol
// see https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master#how-to-import-time-series-data
type Importer struct {
addr string
importPath string
compress bool
user string
password string
close chan struct{}
input chan *TimeSeries
errors chan *ImportError
wg sync.WaitGroup
once sync.Once
s *stats
func (im *Importer) ResetStats() {
im.s = &stats{
startTime: time.Now(),
func (im *Importer) Stats() string {
return im.s.String()
// AddExtraLabelsToImportPath - adds extra labels query params to given url path.
func AddExtraLabelsToImportPath(path string, extraLabels []string) (string, error) {
dst := path
separator := "?"
for _, extraLabel := range extraLabels {
if !strings.Contains(extraLabel, "=") {
return path, fmt.Errorf("bad format for extra_label flag, it must be `key=value`, got: %q", extraLabel)
if strings.Contains(dst, "?") {
separator = "&"
dst += fmt.Sprintf("%sextra_label=%s", separator, extraLabel)
return dst, nil
func NewImporter(cfg Config) (*Importer, error) {
if cfg.Concurrency < 1 {
return nil, fmt.Errorf("concurrency can't be lower than 1")
addr := strings.TrimRight(cfg.Addr, "/")
// if single version
// see https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master#how-to-import-time-series-data
importPath := addr + "/api/v1/import"
if cfg.AccountID != "" {
// if cluster version
// see https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster#url-format
importPath = fmt.Sprintf("%s/insert/%s/prometheus/api/v1/import", addr, cfg.AccountID)
importPath, err := AddExtraLabelsToImportPath(importPath, cfg.ExtraLabels)
if err != nil {
return nil, err
im := &Importer{
addr: addr,
importPath: importPath,
compress: cfg.Compress,
user: cfg.User,
password: cfg.Password,
close: make(chan struct{}),
input: make(chan *TimeSeries, cfg.Concurrency*4),
errors: make(chan *ImportError, cfg.Concurrency),
if err := im.Ping(); err != nil {
return nil, fmt.Errorf("ping to %q failed: %s", addr, err)
if cfg.BatchSize < 1 {
cfg.BatchSize = 1e5
for i := 0; i < int(cfg.Concurrency); i++ {
go func() {
defer im.wg.Done()
im.startWorker(cfg.BatchSize, cfg.SignificantFigures)
return im, nil
// ImportError is type of error generated
// in case of unsuccessful import request
type ImportError struct {
// The batch of timeseries that failed
Batch []*TimeSeries
// The error that appeared during insert
Err error
// Errors returns a channel for receiving
// import errors if any
func (im *Importer) Errors() chan *ImportError { return im.errors }
// Input returns a channel for sending timeseries
// that need to be imported
func (im *Importer) Input() chan<- *TimeSeries { return im.input }
// Close sends signal to all goroutines to exit
// and waits until they are finished
func (im *Importer) Close() {
im.once.Do(func() {
func (im *Importer) startWorker(batchSize, significantFigures int) {
var batch []*TimeSeries
var dataPoints int
var waitForBatch time.Time
for {
select {
case <-im.close:
if err := im.Import(batch); err != nil {
im.errors <- &ImportError{
Batch: batch,
Err: err,
case ts := <-im.input:
// init waitForBatch when first
// value was received
if waitForBatch.IsZero() {
waitForBatch = time.Now()
if significantFigures > 0 {
// Round values according to significantFigures
for i, v := range ts.Values {
ts.Values[i] = decimal.Round(v, significantFigures)
batch = append(batch, ts)
dataPoints += len(ts.Values)
if dataPoints < batchSize {
im.s.idleDuration += time.Since(waitForBatch)
if err := im.flush(batch); err != nil {
im.errors <- &ImportError{
Batch: batch,
Err: err,
// make a new batch, since old one was referenced as err
batch = make([]*TimeSeries, len(batch))
batch = batch[:0]
dataPoints = 0
waitForBatch = time.Now()
const (
// TODO: make configurable
backoffRetries = 5
backoffFactor = 1.7
backoffMinDuration = time.Second
func (im *Importer) flush(b []*TimeSeries) error {
var err error
for i := 0; i < backoffRetries; i++ {
err = im.Import(b)
if err == nil {
return nil
if errors.Is(err, ErrBadRequest) {
return err // fail fast if not recoverable
backoff := float64(backoffMinDuration) * math.Pow(backoffFactor, float64(i))
return fmt.Errorf("import failed with %d retries: %s", backoffRetries, err)
func (im *Importer) Ping() error {
url := fmt.Sprintf("%s/health", im.addr)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
return nil
func (im *Importer) Import(tsBatch []*TimeSeries) error {
if len(tsBatch) < 1 {
return nil
pr, pw := io.Pipe()
req, err := http.NewRequest("POST", im.importPath, pr)
if err != nil {
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
if im.compress {
req.Header.Set("Content-Encoding", "gzip")
errCh := make(chan error)
go func() {
errCh <- do(req)
w := io.Writer(pw)
if im.compress {
zw, err := gzip.NewWriterLevel(pw, 1)
if err != nil {
return fmt.Errorf("unexpected error when creating gzip writer: %s", err)
w = zw
bw := bufio.NewWriterSize(w, 16*1024)
var totalSamples, totalBytes int
for _, ts := range tsBatch {
n, err := ts.write(bw)
if err != nil {
return fmt.Errorf("write err: %w", err)
totalBytes += n
totalSamples += len(ts.Values)
if err := bw.Flush(); err != nil {
return err
if im.compress {
err := w.(*gzip.Writer).Close()
if err != nil {
return err
if err := pw.Close(); err != nil {
return err
requestErr := <-errCh
if requestErr != nil {
return fmt.Errorf("import request error for %q: %w", im.addr, requestErr)
im.s.bytes += uint64(totalBytes)
im.s.samples += uint64(totalSamples)
return nil
var ErrBadRequest = errors.New("bad request")
func do(req *http.Request) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("unexpected error when performing request: %s", err)
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err)
if resp.StatusCode == http.StatusBadRequest {
return fmt.Errorf("%w: unexpected response code %d: %s", ErrBadRequest, resp.StatusCode, string(body))
return fmt.Errorf("unexpected response code %d: %s", resp.StatusCode, string(body))
return nil
func byteCountSI(b int64) string {
const unit = 1000
if b < unit {
return fmt.Sprintf("%d B", b)
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
return fmt.Sprintf("%.1f %cB",
float64(b)/float64(div), "kMGTPE"[exp])