mirror of
synced 2025-03-01 15:33:35 +00:00

When `--vm-native-step-interval` is specified, explore phase will be executed
within specified intervals. Discovered metric names will be associated with
time intervals at which they were discovered. This suppose to reduce number
of requests vmctl makes per metric name since it will skip time intervals
when metric name didn't exist.
This should also reduce probability of exceeding complexity limits
for number of selected series in one request during explore phase.
Signed-off-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 5f8b91186a
399 lines
10 KiB
399 lines
10 KiB
package main
import (
type vmNativeProcessor struct {
filter native.Filter
dst *native.Client
src *native.Client
backoff *backoff.Backoff
s *stats
rateLimit int64
interCluster bool
cc int
isSilent bool
isNative bool
disablePerMetricRequests bool
const (
nativeExportAddr = "api/v1/export"
nativeImportAddr = "api/v1/import"
nativeWithBackoffTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
func (p *vmNativeProcessor) run(ctx context.Context) error {
if p.cc == 0 {
p.cc = 1
p.s = &stats{
startTime: time.Now(),
start, err := utils.ParseTime(p.filter.TimeStart)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
end := time.Now().In(start.Location())
if p.filter.TimeEnd != "" {
end, err = utils.ParseTime(p.filter.TimeEnd)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
ranges := [][]time.Time{{start, end}}
if p.filter.Chunk != "" {
ranges, err = stepper.SplitDateRange(start, end, p.filter.Chunk, p.filter.TimeReverse)
if err != nil {
return fmt.Errorf("failed to create date ranges for the given time filters: %w", err)
tenants := []string{""}
if p.interCluster {
log.Printf("Discovering tenants...")
tenants, err = p.src.GetSourceTenants(ctx, p.filter)
if err != nil {
return fmt.Errorf("failed to get tenants: %w", err)
question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants)
if !p.isSilent && !prompt(question) {
return nil
for _, tenantID := range tenants {
err := p.runBackfilling(ctx, tenantID, ranges, p.isSilent)
if err != nil {
return fmt.Errorf("migration failed: %s", err)
log.Println("Import finished!")
return nil
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL, bar) }
attempts, err := p.backoff.Retry(ctx, retryableFunc)
p.s.retries += attempts
if err != nil {
return fmt.Errorf("failed to migrate from %s to %s (retry attempts: %d): %w\nwith filter %s", srcURL, dstURL, attempts, err, f)
return nil
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
reader, err := p.src.ExportPipe(ctx, srcURL, f)
if err != nil {
return fmt.Errorf("failed to init export pipe: %w", err)
if p.disablePerMetricRequests && bar != nil {
fmt.Printf("Continue import process with filter %s:\n", f.String())
reader = bar.NewProxyReader(reader)
pr, pw := io.Pipe()
importCh := make(chan error)
go func() {
importCh <- p.dst.ImportPipe(ctx, dstURL, pr)
w := io.Writer(pw)
if p.rateLimit > 0 {
rl := limiter.NewLimiter(p.rateLimit)
w = limiter.NewWriteLimiter(pw, rl)
written, err := io.Copy(w, reader)
if err != nil {
return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err)
p.s.bytes += uint64(written)
if err := pw.Close(); err != nil {
return err
return <-importCh
func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time, silent bool) error {
exportAddr := nativeExportAddr
importAddr := nativeImportAddr
if p.isNative {
exportAddr += "/native"
importAddr += "/native"
srcURL := fmt.Sprintf("%s/%s", p.src.Addr, exportAddr)
importAddr, err := vm.AddExtraLabelsToImportPath(importAddr, p.dst.ExtraLabels)
if err != nil {
return fmt.Errorf("failed to add labels to import path: %s", err)
dstURL := fmt.Sprintf("%s/%s", p.dst.Addr, importAddr)
if p.interCluster {
srcURL = fmt.Sprintf("%s/select/%s/prometheus/%s", p.src.Addr, tenantID, exportAddr)
dstURL = fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.Addr, tenantID, importAddr)
initMessage := "Initing import process from %q to %q with filter %s"
initParams := []interface{}{srcURL, dstURL, p.filter.String()}
if p.interCluster {
initMessage = "Initing import process from %q to %q with filter %s for tenant %s"
initParams = []interface{}{srcURL, dstURL, p.filter.String(), tenantID}
fmt.Println("") // extra line for better output formatting
log.Printf(initMessage, initParams...)
if len(ranges) > 1 {
log.Printf("Selected time range will be split into %d ranges according to %q step", len(ranges), p.filter.Chunk)
var foundSeriesMsg string
var requestsToMake int
var metrics = map[string][][]time.Time{
"": ranges,
if !p.disablePerMetricRequests {
metrics, err = p.explore(ctx, p.src, tenantID, ranges, silent)
if err != nil {
return fmt.Errorf("failed to explore metric names: %s", err)
if len(metrics) == 0 {
errMsg := "no metrics found"
if tenantID != "" {
errMsg = fmt.Sprintf("%s for tenant id: %s", errMsg, tenantID)
return nil
for _, m := range metrics {
requestsToMake += len(m)
foundSeriesMsg = fmt.Sprintf("Found %d unique metric names to import. Total import/export requests to make %d", len(metrics), requestsToMake)
if !p.interCluster {
// do not prompt for intercluster because there could be many tenants,
// and we don't want to interrupt the process when moving to the next tenant.
question := foundSeriesMsg + ". Continue?"
if !silent && !prompt(question) {
return nil
} else {
var bar *pb.ProgressBar
barPrefix := "Requests to make"
if p.interCluster {
barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID)
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake)
if p.disablePerMetricRequests {
bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0)
defer bar.Finish()
filterCh := make(chan native.Filter)
errCh := make(chan error, p.cc)
var wg sync.WaitGroup
for i := 0; i < p.cc; i++ {
go func() {
defer wg.Done()
for f := range filterCh {
if !p.disablePerMetricRequests {
if err := p.do(ctx, f, srcURL, dstURL, nil); err != nil {
errCh <- err
if bar != nil {
} else {
if err := p.runSingle(ctx, f, srcURL, dstURL, bar); err != nil {
errCh <- err
// any error breaks the import
for mName, mRanges := range metrics {
match, err := buildMatchWithFilter(p.filter.Match, mName)
if err != nil {
logger.Errorf("failed to build filter %q for metric name %q: %s", p.filter.Match, mName, err)
for _, times := range mRanges {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
case infErr := <-errCh:
return fmt.Errorf("export/import error: %s", infErr)
case filterCh <- native.Filter{
Match: match,
TimeStart: times[0].Format(time.RFC3339),
TimeEnd: times[1].Format(time.RFC3339),
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
return nil
func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time, silent bool) (map[string][][]time.Time, error) {
log.Printf("Exploring metrics...")
var bar *pb.ProgressBar
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges))
defer bar.Finish()
metrics := make(map[string][][]time.Time)
for _, r := range ranges {
ms, err := src.Explore(ctx, p.filter, tenantID, r[0], r[1])
if err != nil {
return nil, fmt.Errorf("cannot get metrics from %s on interval %v-%v: %w", src.Addr, r[0], r[1], err)
for i := range ms {
metrics[ms[i]] = append(metrics[ms[i]], r)
if bar != nil {
return metrics, nil
// stats represents client statistic
// when processing data
type stats struct {
startTime time.Time
bytes uint64
requests uint64
retries uint64
func (s *stats) String() string {
defer s.Unlock()
totalImportDuration := time.Since(s.startTime)
totalImportDurationS := totalImportDuration.Seconds()
bytesPerS := byteCountSI(0)
if s.bytes > 0 && totalImportDurationS > 0 {
bytesPerS = byteCountSI(int64(float64(s.bytes) / totalImportDurationS))
return fmt.Sprintf("VictoriaMetrics importer stats:\n"+
" time spent while importing: %v;\n"+
" total bytes: %s;\n"+
" bytes/s: %s;\n"+
" requests: %d;\n"+
" requests retries: %d;",
byteCountSI(int64(s.bytes)), bytesPerS,
s.requests, s.retries)
func byteCountSI(b int64) string {
const unit = 1000
if b < unit {
return fmt.Sprintf("%d B", b)
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
return fmt.Sprintf("%.1f %cB",
float64(b)/float64(div), "kMGTPE"[exp])
func buildMatchWithFilter(filter string, metricName string) (string, error) {
if filter == metricName {
return filter, nil
nameFilter := fmt.Sprintf("__name__=%q", metricName)
tfss, err := searchutils.ParseMetricSelector(filter)
if err != nil {
return "", err
var filters []string
for _, tfs := range tfss {
var a []string
for _, tf := range tfs {
if len(tf.Key) == 0 {
a = append(a, tf.String())
a = append(a, nameFilter)
filters = append(filters, strings.Join(a, ","))
match := "{" + strings.Join(filters, " or ") + "}"
return match, nil