app/vmctl: split explore phase in vm-native mode by time intervals

When `--vm-native-step-interval` is specified, the explore phase is executed
per specified time interval. Discovered metric names are associated with the
time intervals in which they were discovered. This is supposed to reduce the
number of requests vmctl makes per metric name, since time intervals where
the metric name didn't exist are skipped.

This should also reduce the probability of exceeding complexity limits
on the number of selected series in a single request during the explore phase.
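
As an illustration, here is a minimal standalone sketch (not the vmctl code itself; all names and values below are made up) of the bookkeeping described above: metric names are keyed by the time ranges in which they were discovered, and export/import requests are later issued only for those ranges.

```go
package main

import (
	"fmt"
	"time"
)

// exploreResult maps a metric name to the time ranges in which it was
// discovered; a name absent from a range gets no entry for that range,
// so no export/import request is issued for it there.
type exploreResult map[string][][]time.Time

func main() {
	day := 24 * time.Hour
	start := time.Date(2023, 2, 1, 0, 0, 0, 0, time.UTC)
	ranges := [][]time.Time{
		{start, start.Add(day)},
		{start.Add(day), start.Add(2 * day)},
	}

	// Pretend the first day exposes two metric names and the second day only one.
	discovered := [][]string{
		{"vm_app_uptime_seconds", "vm_http_requests_total"},
		{"vm_app_uptime_seconds"},
	}

	metrics := make(exploreResult)
	for i, r := range ranges {
		for _, name := range discovered[i] {
			metrics[name] = append(metrics[name], r)
		}
	}

	requests := 0
	for name, rs := range metrics {
		requests += len(rs)
		fmt.Printf("%s: discovered in %d range(s)\n", name, len(rs))
	}
	// Prints 3 requests instead of 2 metrics * 2 ranges = 4 without the association.
	fmt.Printf("total export/import requests to make: %d\n", requests)
}
```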

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369
Signed-off-by: hagen1778 <roman@victoriametrics.com>
Dmytro Kozlov 2024-03-18 12:18:32 +01:00 committed by GitHub
parent e6dd52b04c
commit 5f8b91186a
6 changed files with 66 additions and 41 deletions


@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
)
@@ -34,7 +35,7 @@ type Response struct {
}
// Explore finds metric names by provided filter from api/v1/label/__name__/values
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]string, error) {
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start, end time.Time) ([]string, error) {
url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr)
if tenantID != "" {
url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr)
@@ -45,12 +46,8 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]stri
}
params := req.URL.Query()
if f.TimeStart != "" {
params.Set("start", f.TimeStart)
}
if f.TimeEnd != "" {
params.Set("end", f.TimeEnd)
}
params.Set("start", start.Format(time.RFC3339))
params.Set("end", end.Format(time.RFC3339))
params.Set("match[]", f.Match)
req.URL.RawQuery = params.Encode()
@@ -63,11 +60,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]stri
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, fmt.Errorf("cannot decode series response: %s", err)
}
if err := resp.Body.Close(); err != nil {
return nil, fmt.Errorf("cannot close series response body: %s", err)
}
return response.MetricNames, nil
return response.MetricNames, resp.Body.Close()
}
// ImportPipe uses pipe reader in request to process data
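
For reference, a rough sketch of the query the updated Explore issues per time range (the source address and match filter below are placeholders, not values from the diff): explicit `start`/`end` parameters in RFC3339 form replace the optional filter fields.

```go
package main

import (
	"fmt"
	"net/url"
	"time"
)

func main() {
	start := time.Date(2023, 2, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(24 * time.Hour)

	// start/end are now always set explicitly per range, formatted as RFC3339.
	params := url.Values{}
	params.Set("start", start.Format(time.RFC3339))
	params.Set("end", end.Format(time.RFC3339))
	params.Set("match[]", `{__name__!=""}`)

	fmt.Println("http://src-victoriametrics:8428/api/v1/label/__name__/values?" + params.Encode())
}
```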


@@ -1,4 +1,4 @@
package main
package utils
import (
"fmt"
@@ -13,7 +13,9 @@ const (
maxTimeMsecs = int64(1<<63-1) / 1e6
)
func parseTime(s string) (time.Time, error) {
// ParseTime parses time in s string and returns time.Time object
// if parse correctly or error if not
func ParseTime(s string) (time.Time, error) {
msecs, err := promutils.ParseTimeMsec(s)
if err != nil {
return time.Time{}, fmt.Errorf("cannot parse %s: %w", s, err)
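
A hypothetical usage sketch of the now-exported helper, assuming the vmctl module is importable from your code and that the inputs follow the formats the `--vm-native-filter-time-*` flags accept (RFC3339 or Unix timestamps):

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils"
)

func main() {
	// Both inputs are expected to resolve to 2023-02-01 00:00:00 UTC.
	for _, s := range []string{"2023-02-01T00:00:00Z", "1675209600"} {
		t, err := utils.ParseTime(s)
		if err != nil {
			log.Fatalf("cannot parse %q: %s", s, err)
		}
		fmt.Printf("%s -> %s\n", s, t.UTC().Format(time.RFC3339))
	}
}
```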


@@ -1,4 +1,4 @@
package main
package utils
import (
"testing"
@@ -165,7 +165,7 @@ func TestGetTime(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseTime(tt.s)
got, err := ParseTime(tt.s)
if (err != nil) != tt.wantErr {
t.Errorf("ParseTime() error = %v, wantErr %v", err, tt.wantErr)
return


@@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -53,14 +54,14 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
startTime: time.Now(),
}
start, err := parseTime(p.filter.TimeStart)
start, err := utils.ParseTime(p.filter.TimeStart)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
}
end := time.Now().In(start.Location())
if p.filter.TimeEnd != "" {
end, err = parseTime(p.filter.TimeEnd)
end, err = utils.ParseTime(p.filter.TimeEnd)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
}
@@ -174,28 +175,29 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
dstURL = fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.Addr, tenantID, importAddr)
}
barPrefix := "Requests to make"
initMessage := "Initing import process from %q to %q with filter %s"
initParams := []interface{}{srcURL, dstURL, p.filter.String()}
if p.interCluster {
barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID)
initMessage = "Initing import process from %q to %q with filter %s for tenant %s"
initParams = []interface{}{srcURL, dstURL, p.filter.String(), tenantID}
}
fmt.Println("") // extra line for better output formatting
log.Printf(initMessage, initParams...)
var foundSeriesMsg string
metrics := []string{p.filter.Match}
if !p.disablePerMetricRequests {
log.Printf("Exploring metrics...")
metrics, err = p.src.Explore(ctx, p.filter, tenantID)
if err != nil {
return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err)
if len(ranges) > 1 {
log.Printf("Selected time range will be split into %d ranges according to %q step", len(ranges), p.filter.Chunk)
}
var foundSeriesMsg string
var requestsToMake int
var metrics = map[string][][]time.Time{
"": ranges,
}
if !p.disablePerMetricRequests {
metrics, err = p.explore(ctx, p.src, tenantID, ranges, silent)
if err != nil {
return fmt.Errorf("failed to explore metric names: %s", err)
}
if len(metrics) == 0 {
errMsg := "no metrics found"
if tenantID != "" {
@@ -204,7 +206,10 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
log.Println(errMsg)
return nil
}
foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics))
for _, m := range metrics {
requestsToMake += len(m)
}
foundSeriesMsg = fmt.Sprintf("Found %d unique metric names to import. Total import/export requests to make %d", len(metrics), requestsToMake)
}
if !p.interCluster {
@@ -218,15 +223,13 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
log.Print(foundSeriesMsg)
}
processingMsg := fmt.Sprintf("Requests to make: %d", len(metrics)*len(ranges))
if len(ranges) > 1 {
processingMsg = fmt.Sprintf("Selected time range will be split into %d ranges according to %q step. %s", len(ranges), p.filter.Chunk, processingMsg)
}
log.Print(processingMsg)
var bar *pb.ProgressBar
barPrefix := "Requests to make"
if p.interCluster {
barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID)
}
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), len(metrics)*len(ranges))
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake)
if p.disablePerMetricRequests {
bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0)
}
@@ -262,20 +265,19 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
}
// any error breaks the import
for _, s := range metrics {
match, err := buildMatchWithFilter(p.filter.Match, s)
for mName, mRanges := range metrics {
match, err := buildMatchWithFilter(p.filter.Match, mName)
if err != nil {
logger.Errorf("failed to build export filters: %s", err)
logger.Errorf("failed to build filter %q for metric name %q: %s", p.filter.Match, mName, err)
continue
}
for _, times := range ranges {
for _, times := range mRanges {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
case infErr := <-errCh:
return fmt.Errorf("native error: %s", infErr)
return fmt.Errorf("export/import error: %s", infErr)
case filterCh <- native.Filter{
Match: match,
TimeStart: times[0].Format(time.RFC3339),
@@ -296,6 +298,32 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
return nil
}
func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time, silent bool) (map[string][][]time.Time, error) {
log.Printf("Exploring metrics...")
var bar *pb.ProgressBar
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges))
bar.Start()
defer bar.Finish()
}
metrics := make(map[string][][]time.Time)
for _, r := range ranges {
ms, err := src.Explore(ctx, p.filter, tenantID, r[0], r[1])
if err != nil {
return nil, fmt.Errorf("cannot get metrics from %s on interval %v-%v: %w", src.Addr, r[0], r[1], err)
}
for i := range ms {
metrics[ms[i]] = append(metrics[ms[i]], r)
}
if bar != nil {
bar.Increment()
}
}
return metrics, nil
}
// stats represents client statistic
// when processing data
type stats struct {
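
The `ranges` handed to the explore helper above come from the existing time-based chunking controlled by `--vm-native-step-interval`. Below is a simplified standalone sketch (not the actual stepper implementation) of day-sized chunking that produces the same `[][]time.Time` shape explore iterates over, one request per range:

```go
package main

import (
	"fmt"
	"time"
)

// splitByDay cuts [start, end) into consecutive 24h ranges;
// the last range is truncated at end.
func splitByDay(start, end time.Time) [][]time.Time {
	var ranges [][]time.Time
	for s := start; s.Before(end); {
		e := s.Add(24 * time.Hour)
		if e.After(end) {
			e = end
		}
		ranges = append(ranges, []time.Time{s, e})
		s = e
	}
	return ranges
}

func main() {
	start := time.Date(2023, 2, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2023, 2, 3, 12, 0, 0, 0, time.UTC)
	for _, r := range splitByDay(start, end) {
		fmt.Printf("%s -> %s\n", r[0].Format(time.RFC3339), r[1].Format(time.RFC3339))
	}
}
```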


@@ -55,6 +55,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: optimize [`/api/v1/labels`](https://docs.victoriametrics.com/url-examples/#apiv1labels) and [`/api/v1/label/.../values`](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) when `match[]` filters contains metric name. For example, `/api/v1/label/instance/values?match[]=up` now works much faster than before. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5055).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [native protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5824).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): split [explore phase](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics) in `vm-native` mode by time intervals when [--vm-native-step-interval](https://docs.victoriametrics.com/vmctl/#using-time-based-chunking-of-migration) is specified. This should reduce the probability of exceeding complexity limits on the number of selected series during the explore phase. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369).
* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).


@@ -942,7 +942,8 @@ To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-nativ
--vm-native-filter-match='{__name__="vm_app_uptime_seconds"}' \
--vm-native-filter-time-start='2023-02-01T00:00:00Z' \
--vm-native-step-interval=day \
--vm-intercluster
--vm-intercluster
VictoriaMetrics Native import mode
2023/02/28 10:41:42 Discovering tenants...
2023/02/28 10:41:42 The following tenants were discovered: [0:0 1:0 2:0 3:0 4:0]