From 5f8b91186a0326df5bc7cf19798fa8d07b41b2db Mon Sep 17 00:00:00 2001 From: Dmytro Kozlov Date: Mon, 18 Mar 2024 12:18:32 +0100 Subject: [PATCH] app/vmctl: break explore phase in `vm-native` mode by time intervals When `--vm-native-step-interval` is specified, the explore phase will be executed within the specified intervals. Discovered metric names will be associated with the time intervals in which they were discovered. This is supposed to reduce the number of requests vmctl makes per metric name, since it will skip time intervals in which the metric name didn't exist. This should also reduce the probability of exceeding complexity limits for the number of selected series in one request during the explore phase. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369 Signed-off-by: hagen1778 --- app/vmctl/native/client.go | 17 ++----- app/vmctl/{ => utils}/time.go | 6 ++- app/vmctl/{ => utils}/time_test.go | 4 +- app/vmctl/vm_native.go | 76 ++++++++++++++++++++---------- docs/CHANGELOG.md | 1 + docs/vmctl.md | 3 +- 6 files changed, 66 insertions(+), 41 deletions(-) rename app/vmctl/{ => utils}/time.go (79%) rename app/vmctl/{ => utils}/time_test.go (98%) diff --git a/app/vmctl/native/client.go b/app/vmctl/native/client.go index bd22bc6b7..2ed491ec0 100644 --- a/app/vmctl/native/client.go +++ b/app/vmctl/native/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth" ) @@ -34,7 +35,7 @@ type Response struct { } // Explore finds metric names by provided filter from api/v1/label/__name__/values -func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]string, error) { +func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start, end time.Time) ([]string, error) { url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr) if tenantID != "" { url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr) @@ -45,12 +46,8 @@ func (c *Client) Explore(ctx context.Context, f 
Filter, tenantID string) ([]stri } params := req.URL.Query() - if f.TimeStart != "" { - params.Set("start", f.TimeStart) - } - if f.TimeEnd != "" { - params.Set("end", f.TimeEnd) - } + params.Set("start", start.Format(time.RFC3339)) + params.Set("end", end.Format(time.RFC3339)) params.Set("match[]", f.Match) req.URL.RawQuery = params.Encode() @@ -63,11 +60,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]stri if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { return nil, fmt.Errorf("cannot decode series response: %s", err) } - - if err := resp.Body.Close(); err != nil { - return nil, fmt.Errorf("cannot close series response body: %s", err) - } - return response.MetricNames, nil + return response.MetricNames, resp.Body.Close() } // ImportPipe uses pipe reader in request to process data diff --git a/app/vmctl/time.go b/app/vmctl/utils/time.go similarity index 79% rename from app/vmctl/time.go rename to app/vmctl/utils/time.go index e4c5f03ff..069dfb054 100644 --- a/app/vmctl/time.go +++ b/app/vmctl/utils/time.go @@ -1,4 +1,4 @@ -package main +package utils import ( "fmt" @@ -13,7 +13,9 @@ const ( maxTimeMsecs = int64(1<<63-1) / 1e6 ) -func parseTime(s string) (time.Time, error) { +// ParseTime parses time in s string and returns time.Time object +// if parse correctly or error if not +func ParseTime(s string) (time.Time, error) { msecs, err := promutils.ParseTimeMsec(s) if err != nil { return time.Time{}, fmt.Errorf("cannot parse %s: %w", s, err) diff --git a/app/vmctl/time_test.go b/app/vmctl/utils/time_test.go similarity index 98% rename from app/vmctl/time_test.go rename to app/vmctl/utils/time_test.go index 989162078..95f5fbdf4 100644 --- a/app/vmctl/time_test.go +++ b/app/vmctl/utils/time_test.go @@ -1,4 +1,4 @@ -package main +package utils import ( "testing" @@ -165,7 +165,7 @@ func TestGetTime(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseTime(tt.s) + got, 
err := ParseTime(tt.s) if (err != nil) != tt.wantErr { t.Errorf("ParseTime() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index 31375ae3d..bbdbe5f63 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -53,14 +54,14 @@ func (p *vmNativeProcessor) run(ctx context.Context) error { startTime: time.Now(), } - start, err := parseTime(p.filter.TimeStart) + start, err := utils.ParseTime(p.filter.TimeStart) if err != nil { return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err) } end := time.Now().In(start.Location()) if p.filter.TimeEnd != "" { - end, err = parseTime(p.filter.TimeEnd) + end, err = utils.ParseTime(p.filter.TimeEnd) if err != nil { return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err) } @@ -174,28 +175,29 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, dstURL = fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.Addr, tenantID, importAddr) } - barPrefix := "Requests to make" initMessage := "Initing import process from %q to %q with filter %s" initParams := []interface{}{srcURL, dstURL, p.filter.String()} if p.interCluster { - barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID) initMessage = "Initing import process from %q to %q with filter %s for tenant %s" initParams = []interface{}{srcURL, dstURL, p.filter.String(), tenantID} } fmt.Println("") // extra line for 
better output formatting log.Printf(initMessage, initParams...) + if len(ranges) > 1 { + log.Printf("Selected time range will be split into %d ranges according to %q step", len(ranges), p.filter.Chunk) + } var foundSeriesMsg string - - metrics := []string{p.filter.Match} + var requestsToMake int + var metrics = map[string][][]time.Time{ + "": ranges, + } if !p.disablePerMetricRequests { - log.Printf("Exploring metrics...") - metrics, err = p.src.Explore(ctx, p.filter, tenantID) + metrics, err = p.explore(ctx, p.src, tenantID, ranges, silent) if err != nil { - return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err) + return fmt.Errorf("failed to explore metric names: %s", err) } - if len(metrics) == 0 { errMsg := "no metrics found" if tenantID != "" { @@ -204,7 +206,10 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, log.Println(errMsg) return nil } - foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics)) + for _, m := range metrics { + requestsToMake += len(m) + } + foundSeriesMsg = fmt.Sprintf("Found %d unique metric names to import. Total import/export requests to make %d", len(metrics), requestsToMake) } if !p.interCluster { @@ -218,15 +223,13 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, log.Print(foundSeriesMsg) } - processingMsg := fmt.Sprintf("Requests to make: %d", len(metrics)*len(ranges)) - if len(ranges) > 1 { - processingMsg = fmt.Sprintf("Selected time range will be split into %d ranges according to %q step. 
%s", len(ranges), p.filter.Chunk, processingMsg) - } - log.Print(processingMsg) - var bar *pb.ProgressBar + barPrefix := "Requests to make" + if p.interCluster { + barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID) + } if !silent { - bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), len(metrics)*len(ranges)) + bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake) if p.disablePerMetricRequests { bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0) } @@ -262,20 +265,19 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, } // any error breaks the import - for _, s := range metrics { - - match, err := buildMatchWithFilter(p.filter.Match, s) + for mName, mRanges := range metrics { + match, err := buildMatchWithFilter(p.filter.Match, mName) if err != nil { - logger.Errorf("failed to build export filters: %s", err) + logger.Errorf("failed to build filter %q for metric name %q: %s", p.filter.Match, mName, err) continue } - for _, times := range ranges { + for _, times := range mRanges { select { case <-ctx.Done(): return fmt.Errorf("context canceled") case infErr := <-errCh: - return fmt.Errorf("native error: %s", infErr) + return fmt.Errorf("export/import error: %s", infErr) case filterCh <- native.Filter{ Match: match, TimeStart: times[0].Format(time.RFC3339), @@ -296,6 +298,32 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, return nil } +func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time, silent bool) (map[string][][]time.Time, error) { + log.Printf("Exploring metrics...") + + var bar *pb.ProgressBar + if !silent { + bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges)) + bar.Start() + defer bar.Finish() + } + + metrics := make(map[string][][]time.Time) + for _, r := range ranges { + ms, err := 
src.Explore(ctx, p.filter, tenantID, r[0], r[1]) + if err != nil { + return nil, fmt.Errorf("cannot get metrics from %s on interval %v-%v: %w", src.Addr, r[0], r[1], err) + } + for i := range ms { + metrics[ms[i]] = append(metrics[ms[i]], r) + } + if bar != nil { + bar.Increment() + } + } + return metrics, nil +} + // stats represents client statistic // when processing data type stats struct { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 50434078c..039bb1139 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -55,6 +55,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: optimize [`/api/v1/labels`](https://docs.victoriametrics.com/url-examples/#apiv1labels) and [`/api/v1/label/.../values`](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) when `match[]` filters contains metric name. For example, `/api/v1/label/instance/values?match[]=up` now works much faster than before. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5055). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [native protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5824). 
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes. +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): split [explore phase](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics) in `vm-native` mode by time intervals when [--vm-native-step-interval](https://docs.victoriametrics.com/vmctl/#using-time-based-chunking-of-migration) is specified. This should reduce probability of exceeding complexity limits for number of selected series during explore phase. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369). * BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). 
Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970). diff --git a/docs/vmctl.md b/docs/vmctl.md index 885635c87..79e4c948f 100644 --- a/docs/vmctl.md +++ b/docs/vmctl.md @@ -942,7 +942,8 @@ To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-nativ --vm-native-filter-match='{__name__="vm_app_uptime_seconds"}' \ --vm-native-filter-time-start='2023-02-01T00:00:00Z' \ --vm-native-step-interval=day \ ---vm-intercluster + --vm-intercluster + VictoriaMetrics Native import mode 2023/02/28 10:41:42 Discovering tenants... 2023/02/28 10:41:42 The following tenants were discovered: [0:0 1:0 2:0 3:0 4:0]