diff --git a/app/vmctl/native/client.go b/app/vmctl/native/client.go index bd22bc6b7..2ed491ec0 100644 --- a/app/vmctl/native/client.go +++ b/app/vmctl/native/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth" ) @@ -34,7 +35,7 @@ type Response struct { } // Explore finds metric names by provided filter from api/v1/label/__name__/values -func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]string, error) { +func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start, end time.Time) ([]string, error) { url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr) if tenantID != "" { url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr) @@ -45,12 +46,8 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]stri } params := req.URL.Query() - if f.TimeStart != "" { - params.Set("start", f.TimeStart) - } - if f.TimeEnd != "" { - params.Set("end", f.TimeEnd) - } + params.Set("start", start.Format(time.RFC3339)) + params.Set("end", end.Format(time.RFC3339)) params.Set("match[]", f.Match) req.URL.RawQuery = params.Encode() @@ -63,11 +60,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]stri if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { return nil, fmt.Errorf("cannot decode series response: %s", err) } - - if err := resp.Body.Close(); err != nil { - return nil, fmt.Errorf("cannot close series response body: %s", err) - } - return response.MetricNames, nil + return response.MetricNames, resp.Body.Close() } // ImportPipe uses pipe reader in request to process data diff --git a/app/vmctl/time.go b/app/vmctl/utils/time.go similarity index 79% rename from app/vmctl/time.go rename to app/vmctl/utils/time.go index e4c5f03ff..069dfb054 100644 --- a/app/vmctl/time.go +++ b/app/vmctl/utils/time.go @@ -1,4 +1,4 @@ -package main +package utils import ( "fmt" @@ 
-13,7 +13,9 @@ const ( maxTimeMsecs = int64(1<<63-1) / 1e6 ) -func parseTime(s string) (time.Time, error) { +// ParseTime parses the time string s and returns the corresponding time.Time, +// or an error if s cannot be parsed. +func ParseTime(s string) (time.Time, error) { msecs, err := promutils.ParseTimeMsec(s) if err != nil { return time.Time{}, fmt.Errorf("cannot parse %s: %w", s, err) diff --git a/app/vmctl/time_test.go b/app/vmctl/utils/time_test.go similarity index 98% rename from app/vmctl/time_test.go rename to app/vmctl/utils/time_test.go index 989162078..95f5fbdf4 100644 --- a/app/vmctl/time_test.go +++ b/app/vmctl/utils/time_test.go @@ -1,4 +1,4 @@ -package main +package utils import ( "testing" @@ -165,7 +165,7 @@ func TestGetTime(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseTime(tt.s) + got, err := ParseTime(tt.s) if (err != nil) != tt.wantErr { t.Errorf("ParseTime() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index 31375ae3d..bbdbe5f63 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -53,14 +54,14 @@ func (p *vmNativeProcessor) run(ctx context.Context) error { startTime: time.Now(), } - start, err := parseTime(p.filter.TimeStart) + start, err := utils.ParseTime(p.filter.TimeStart) if err != nil { return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err) } end := time.Now().In(start.Location()) if p.filter.TimeEnd != 
"" { - end, err = parseTime(p.filter.TimeEnd) + end, err = utils.ParseTime(p.filter.TimeEnd) if err != nil { return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err) } @@ -174,28 +175,29 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, dstURL = fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.Addr, tenantID, importAddr) } - barPrefix := "Requests to make" initMessage := "Initing import process from %q to %q with filter %s" initParams := []interface{}{srcURL, dstURL, p.filter.String()} if p.interCluster { - barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID) initMessage = "Initing import process from %q to %q with filter %s for tenant %s" initParams = []interface{}{srcURL, dstURL, p.filter.String(), tenantID} } fmt.Println("") // extra line for better output formatting log.Printf(initMessage, initParams...) + if len(ranges) > 1 { + log.Printf("Selected time range will be split into %d ranges according to %q step", len(ranges), p.filter.Chunk) + } var foundSeriesMsg string - - metrics := []string{p.filter.Match} + var requestsToMake int + var metrics = map[string][][]time.Time{ + "": ranges, + } if !p.disablePerMetricRequests { - log.Printf("Exploring metrics...") - metrics, err = p.src.Explore(ctx, p.filter, tenantID) + metrics, err = p.explore(ctx, p.src, tenantID, ranges, silent) if err != nil { - return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err) + return fmt.Errorf("failed to explore metric names: %s", err) } - if len(metrics) == 0 { errMsg := "no metrics found" if tenantID != "" { @@ -204,7 +206,10 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, log.Println(errMsg) return nil } - foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics)) + for _, m := range metrics { + requestsToMake += len(m) + } + foundSeriesMsg = fmt.Sprintf("Found %d unique metric names to import. 
Total import/export requests to make %d", len(metrics), requestsToMake) } if !p.interCluster { @@ -218,15 +223,13 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, log.Print(foundSeriesMsg) } - processingMsg := fmt.Sprintf("Requests to make: %d", len(metrics)*len(ranges)) - if len(ranges) > 1 { - processingMsg = fmt.Sprintf("Selected time range will be split into %d ranges according to %q step. %s", len(ranges), p.filter.Chunk, processingMsg) - } - log.Print(processingMsg) - var bar *pb.ProgressBar + barPrefix := "Requests to make" + if p.interCluster { + barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID) + } if !silent { - bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), len(metrics)*len(ranges)) + bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake) if p.disablePerMetricRequests { bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0) } @@ -262,20 +265,19 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, } // any error breaks the import - for _, s := range metrics { - - match, err := buildMatchWithFilter(p.filter.Match, s) + for mName, mRanges := range metrics { + match, err := buildMatchWithFilter(p.filter.Match, mName) if err != nil { - logger.Errorf("failed to build export filters: %s", err) + logger.Errorf("failed to build filter %q for metric name %q: %s", p.filter.Match, mName, err) continue } - for _, times := range ranges { + for _, times := range mRanges { select { case <-ctx.Done(): return fmt.Errorf("context canceled") case infErr := <-errCh: - return fmt.Errorf("native error: %s", infErr) + return fmt.Errorf("export/import error: %s", infErr) case filterCh <- native.Filter{ Match: match, TimeStart: times[0].Format(time.RFC3339), @@ -296,6 +298,32 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, return nil } +func (p *vmNativeProcessor) explore(ctx 
context.Context, src *native.Client, tenantID string, ranges [][]time.Time, silent bool) (map[string][][]time.Time, error) { + log.Printf("Exploring metrics...") + + var bar *pb.ProgressBar + if !silent { + bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges)) + bar.Start() + defer bar.Finish() + } + + metrics := make(map[string][][]time.Time) + for _, r := range ranges { + ms, err := src.Explore(ctx, p.filter, tenantID, r[0], r[1]) + if err != nil { + return nil, fmt.Errorf("cannot get metrics from %s on interval %v-%v: %w", src.Addr, r[0], r[1], err) + } + for i := range ms { + metrics[ms[i]] = append(metrics[ms[i]], r) + } + if bar != nil { + bar.Increment() + } + } + return metrics, nil +} + // stats represents client statistic // when processing data type stats struct { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 50434078c..039bb1139 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -55,6 +55,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: optimize [`/api/v1/labels`](https://docs.victoriametrics.com/url-examples/#apiv1labels) and [`/api/v1/label/.../values`](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) when `match[]` filters contains metric name. For example, `/api/v1/label/instance/values?match[]=up` now works much faster than before. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5055). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [native protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5824). 
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes. +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): split [explore phase](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics) in `vm-native` mode by time intervals when [--vm-native-step-interval](https://docs.victoriametrics.com/vmctl/#using-time-based-chunking-of-migration) is specified. This should reduce the probability of exceeding complexity limits for the number of selected series during the explore phase. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369). * BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). 
Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970). diff --git a/docs/vmctl.md b/docs/vmctl.md index 885635c87..79e4c948f 100644 --- a/docs/vmctl.md +++ b/docs/vmctl.md @@ -942,7 +942,8 @@ To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-nativ --vm-native-filter-match='{__name__="vm_app_uptime_seconds"}' \ --vm-native-filter-time-start='2023-02-01T00:00:00Z' \ --vm-native-step-interval=day \ ---vm-intercluster + --vm-intercluster + VictoriaMetrics Native import mode 2023/02/28 10:41:42 Discovering tenants... 2023/02/28 10:41:42 The following tenants were discovered: [0:0 1:0 2:0 3:0 4:0]