diff --git a/app/vmctl/barpool/pool.go b/app/vmctl/barpool/pool.go index 05fe8b418..deb20b4fb 100644 --- a/app/vmctl/barpool/pool.go +++ b/app/vmctl/barpool/pool.go @@ -5,37 +5,88 @@ package barpool import ( "fmt" + "io" "os" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal" "github.com/cheggaaa/pb/v3" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal" ) +var isDisabled bool + +// Disable sets progress bar to be no-op if v==true +func Disable(v bool) { + isDisabled = v +} + var pool = pb.NewPool() -// Add adds bar to the global pool -func Add(bar *pb.ProgressBar) { pool.Add(bar) } +// Bar is an interface for progress bar +type Bar interface { + Add(value int) + Increment() + Start() + Finish() + NewProxyReader(r io.Reader) *pb.Reader +} + +type progressBar struct { + *pb.ProgressBar +} + +func (pb *progressBar) Finish() { pb.ProgressBar.Finish() } +func (pb *progressBar) Start() { pb.ProgressBar.Start() } +func (pb *progressBar) Add(value int) { pb.ProgressBar.Add(value) } +func (pb *progressBar) Increment() { pb.ProgressBar.Increment() } +func (pb *progressBar) NewProxyReader(r io.Reader) *pb.Reader { + return pb.ProgressBar.NewProxyReader(r) +} + +type progressBarNoOp struct{} + +func (pbno *progressBarNoOp) Finish() {} +func (pbno *progressBarNoOp) Start() {} +func (pbno *progressBarNoOp) Add(int) {} +func (pbno *progressBarNoOp) Increment() {} +func (pbno *progressBarNoOp) NewProxyReader(_ io.Reader) *pb.Reader { return nil } // Start starts the global pool // Must be called after all progress bars were added -func Start() error { return pool.Start() } +func Start() error { + if isDisabled { + return nil + } + return pool.Start() +} // Stop stops the global pool -func Stop() { _ = pool.Stop() } +func Stop() { + if isDisabled { + return + } + _ = pool.Stop() +} // AddWithTemplate adds bar with the given template // to the global pool -func AddWithTemplate(format string, total int) *pb.ProgressBar { +func AddWithTemplate(format string, total int) Bar { + if isDisabled { + return &progressBarNoOp{} + } tpl := getTemplate(format) bar := pb.ProgressBarTemplate(tpl).New(total) - Add(bar) - return bar + pool.Add(bar) + return &progressBar{bar} } // NewSingleProgress returns progress bar with given template -func NewSingleProgress(format string, total int) *pb.ProgressBar { +func NewSingleProgress(format string, total int) Bar { + if isDisabled { + return &progressBarNoOp{} + } tpl := getTemplate(format) - return pb.ProgressBarTemplate(tpl).New(total) + return &progressBar{pb.ProgressBarTemplate(tpl).New(total)} } func getTemplate(format string) string { diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index 51146880e..a9987fcb1 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -10,8 +10,9 @@ import ( ) const ( - globalSilent = "s" - globalVerbose = "verbose" + globalSilent = "s" + globalVerbose = "verbose" + globalDisableProgressBar = "disable-progress-bar" ) var ( @@ -26,6 +27,11 @@ var ( Value: false, Usage: "Whether to enable verbosity in logs output.", }, + &cli.BoolFlag{ + Name: globalDisableProgressBar, + Value: false, + Usage: "Whether to disable progress bar during the import.", + }, } ) @@ -122,7 +128,7 @@ var ( }, &cli.BoolFlag{ Name: vmDisableProgressBar, - Usage: "Whether to disable progress bar per each worker during the import.", + Usage: "Whether to disable progress bar per each worker during the import. This flag is deprecated. Use global flag instead.", }, &cli.StringFlag{ Name: vmCertFile, diff --git a/app/vmctl/influx.go b/app/vmctl/influx.go index 41cf74083..eeb91e820 100644 --- a/app/vmctl/influx.go +++ b/app/vmctl/influx.go @@ -18,14 +18,14 @@ type influxProcessor struct { separator string skipDbLabel bool promMode bool - isSilent bool isVerbose bool } -func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel, promMode, silent, verbose bool) *influxProcessor { +func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel, promMode, verbose bool) *influxProcessor { if cc < 1 { cc = 1 } + return &influxProcessor{ ic: ic, im: im, @@ -33,7 +33,6 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st separator: separator, skipDbLabel: skipDbLabel, promMode: promMode, - isSilent: silent, isVerbose: verbose, } } @@ -48,7 +47,7 @@ func (ip *influxProcessor) run() error { } question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series)) - if !ip.isSilent && !prompt(question) { + if !prompt(question) { return nil } diff --git a/app/vmctl/main.go b/app/vmctl/main.go index 10ca56d35..136c186a9 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" @@ -37,15 +38,23 @@ func main() { ctx, cancelCtx := context.WithCancel(context.Background()) start := time.Now() + beforeFn := func(c *cli.Context) error { + isSilent = c.Bool(globalSilent) + if c.Bool(globalDisableProgressBar) { + barpool.Disable(true) + } + return nil + } app := &cli.App{ Name: "vmctl", Usage: "VictoriaMetrics command-line tool", Version: buildinfo.Version, Commands: []*cli.Command{ { - Name: "opentsdb", - Usage: "Migrate time series from OpenTSDB", - Flags: mergeFlags(globalFlags, otsdbFlags, vmFlags), + Name: "opentsdb", + Usage: "Migrate time series from OpenTSDB", + Flags: mergeFlags(globalFlags, otsdbFlags, vmFlags), + Before: beforeFn, Action: func(c *cli.Context) error { fmt.Println("OpenTSDB import mode") @@ -81,22 +90,20 @@ func main() { if err != nil { return fmt.Errorf("failed to init VM configuration: %s", err) } - // disable progress bars since openTSDB implementation - // does not use progress bar pool - vmCfg.DisableProgressBar = true importer, err := vm.NewImporter(ctx, vmCfg) if err != nil { return fmt.Errorf("failed to create VM importer: %s", err) } - otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalSilent), c.Bool(globalVerbose)) + otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalVerbose)) return otsdbProcessor.run() }, }, { - Name: "influx", - Usage: "Migrate time series from InfluxDB", - Flags: mergeFlags(globalFlags, influxFlags, vmFlags), + Name: "influx", + Usage: "Migrate time series from InfluxDB", + Flags: mergeFlags(globalFlags, influxFlags, vmFlags), + Before: beforeFn, Action: func(c *cli.Context) error { fmt.Println("InfluxDB import mode") @@ -148,15 +155,15 @@ func main() { c.String(influxMeasurementFieldSeparator), c.Bool(influxSkipDatabaseLabel), c.Bool(influxPrometheusMode), - c.Bool(globalSilent), c.Bool(globalVerbose)) return processor.run() }, }, { - Name: "remote-read", - Usage: "Migrate time series via Prometheus remote-read protocol", - Flags: mergeFlags(globalFlags, remoteReadFlags, vmFlags), + Name: "remote-read", + Usage: "Migrate time series via Prometheus remote-read protocol", + Flags: mergeFlags(globalFlags, remoteReadFlags, vmFlags), + Before: beforeFn, Action: func(c *cli.Context) error { fmt.Println("Remote-read import mode") @@ -209,16 +216,16 @@ func main() { timeReverse: c.Bool(remoteReadFilterTimeReverse), }, cc: c.Int(remoteReadConcurrency), - isSilent: c.Bool(globalSilent), isVerbose: c.Bool(globalVerbose), } return rmp.run(ctx) }, }, { - Name: "prometheus", - Usage: "Migrate time series from Prometheus", - Flags: mergeFlags(globalFlags, promFlags, vmFlags), + Name: "prometheus", + Usage: "Migrate time series from Prometheus", + Flags: mergeFlags(globalFlags, promFlags, vmFlags), + Before: beforeFn, Action: func(c *cli.Context) error { fmt.Println("Prometheus import mode") @@ -245,17 +252,19 @@ func main() { return fmt.Errorf("failed to create prometheus client: %s", err) } pp := prometheusProcessor{ - cl: cl, - im: importer, - cc: c.Int(promConcurrency), + cl: cl, + im: importer, + cc: c.Int(promConcurrency), + isVerbose: c.Bool(globalVerbose), } - return pp.run(c.Bool(globalSilent), c.Bool(globalVerbose)) + return pp.run() }, }, { - Name: "vm-native", - Usage: "Migrate time series between VictoriaMetrics installations via native binary format", - Flags: mergeFlags(globalFlags, vmNativeFlags), + Name: "vm-native", + Usage: "Migrate time series between VictoriaMetrics installations via native binary format", + Flags: mergeFlags(globalFlags, vmNativeFlags), + Before: beforeFn, Action: func(c *cli.Context) error { fmt.Println("VictoriaMetrics Native import mode") @@ -344,7 +353,6 @@ func main() { backoff: backoff.New(), cc: c.Int(vmConcurrency), disablePerMetricRequests: c.Bool(vmNativeDisablePerMetricMigration), - isSilent: c.Bool(globalSilent), isNative: !c.Bool(vmNativeDisableBinaryProtocol), } return p.run(ctx) @@ -360,6 +368,7 @@ func main() { Value: false, }, }, + Before: beforeFn, Action: func(c *cli.Context) error { common.StartUnmarshalWorkers() blockPath := c.Args().First() @@ -433,6 +442,5 @@ func initConfigVM(c *cli.Context) (vm.Config, error) { RoundDigits: c.Int(vmRoundDigits), ExtraLabels: c.StringSlice(vmExtraLabel), RateLimit: c.Int64(vmRateLimit), - DisableProgressBar: c.Bool(vmDisableProgressBar), }, nil } diff --git a/app/vmctl/opentsdb.go b/app/vmctl/opentsdb.go index 07fc3a6d5..437da9e69 100644 --- a/app/vmctl/opentsdb.go +++ b/app/vmctl/opentsdb.go @@ -15,7 +15,6 @@ type otsdbProcessor struct { oc *opentsdb.Client im *vm.Importer otsdbcc int - isSilent bool isVerbose bool } @@ -26,7 +25,7 @@ type queryObj struct { StartTime int64 } -func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, silent, verbose bool) *otsdbProcessor { +func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, verbose bool) *otsdbProcessor { if otsdbcc < 1 { otsdbcc = 1 } @@ -34,7 +33,6 @@ func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, silent oc: oc, im: im, otsdbcc: otsdbcc, - isSilent: silent, isVerbose: verbose, } } @@ -55,7 +53,7 @@ func (op *otsdbProcessor) run() error { } question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics)) - if !op.isSilent && !prompt(question) { + if !prompt(question) { return nil } op.im.ResetStats() diff --git a/app/vmctl/prometheus.go b/app/vmctl/prometheus.go index 4b18bff27..eba010070 100644 --- a/app/vmctl/prometheus.go +++ b/app/vmctl/prometheus.go @@ -5,11 +5,12 @@ import ( "log" "sync" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" ) type prometheusProcessor struct { @@ -24,9 +25,12 @@ type prometheusProcessor struct { // and defines number of concurrently // running snapshot block readers cc int + + // isVerbose enables verbose output + isVerbose bool } -func (pp *prometheusProcessor) run(silent, verbose bool) error { +func (pp *prometheusProcessor) run() error { blocks, err := pp.cl.Explore() if err != nil { return fmt.Errorf("explore failed: %s", err) @@ -35,12 +39,11 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error { return fmt.Errorf("found no blocks to import") } question := fmt.Sprintf("Found %d blocks to import. Continue?", len(blocks)) - if !silent && !prompt(question) { + if !prompt(question) { return nil } bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks)) - if err := barpool.Start(); err != nil { return err } @@ -72,7 +75,7 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error { return fmt.Errorf("prometheus error: %s", promErr) case vmErr := <-pp.im.Errors(): close(blockReadersCh) - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose)) case blockReadersCh <- br: } } @@ -85,7 +88,7 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error { // drain import errors channel for vmErr := range pp.im.Errors() { if vmErr.Err != nil { - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose)) } } for err := range errCh { diff --git a/app/vmctl/prometheus_test.go b/app/vmctl/prometheus_test.go index dbf74eda5..13281d025 100644 --- a/app/vmctl/prometheus_test.go +++ b/app/vmctl/prometheus_test.go @@ -25,6 +25,9 @@ const ( // This test simulates close process if user abort it func Test_prometheusProcessor_run(t *testing.T) { t.Skip() + + defer func() { isSilent = false }() + type fields struct { cfg prometheus.Config vmCfg vm.Config @@ -117,11 +120,12 @@ func Test_prometheusProcessor_run(t *testing.T) { t.Run(tt.name, func(t *testing.T) { client := tt.fields.cl(tt.fields.cfg) importer := tt.fields.im(tt.fields.vmCfg) - + isSilent = tt.args.silent pp := &prometheusProcessor{ - cl: client, - im: importer, - cc: tt.fields.cc, + cl: client, + im: importer, + cc: tt.fields.cc, + isVerbose: tt.args.verbose, } // we should answer on prompt @@ -157,7 +161,7 @@ func Test_prometheusProcessor_run(t *testing.T) { go tt.fields.closer(importer) } - if err := pp.run(tt.args.silent, tt.args.verbose); (err != nil) != tt.wantErr { + if err := pp.run(); (err != nil) != tt.wantErr { t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/app/vmctl/remote_read_test.go b/app/vmctl/remote_read_test.go index 759331317..c5a69a61c 100644 --- a/app/vmctl/remote_read_test.go +++ b/app/vmctl/remote_read_test.go @@ -6,14 +6,21 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/prompb" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" - "github.com/prometheus/prometheus/prompb" ) func TestRemoteRead(t *testing.T) { + barpool.Disable(true) + defer func() { + barpool.Disable(false) + }() + defer func() { isSilent = false }() var testCases = []struct { name string @@ -31,7 +38,7 @@ func TestRemoteRead(t *testing.T) { { name: "step minute on minute time range", remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, + vmCfg: vm.Config{Addr: "", Concurrency: 1}, start: "2022-11-26T11:23:05+02:00", end: "2022-11-26T11:24:05+02:00", numOfSamples: 2, @@ -62,7 +69,7 @@ func TestRemoteRead(t *testing.T) { { name: "step month on month time range", remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true, + vmCfg: vm.Config{Addr: "", Concurrency: 1, Transport: http.DefaultTransport.(*http.Transport)}, start: "2022-09-26T11:23:05+02:00", end: "2022-11-26T11:24:05+02:00", @@ -157,7 +164,6 @@ func TestRemoteRead(t *testing.T) { chunk: tt.chunk, }, cc: 1, - isSilent: true, isVerbose: false, } @@ -170,6 +176,11 @@ func TestRemoteRead(t *testing.T) { } func TestSteamRemoteRead(t *testing.T) { + barpool.Disable(true) + defer func() { + barpool.Disable(false) + }() + defer func() { isSilent = false }() var testCases = []struct { name string @@ -187,7 +198,7 @@ func TestSteamRemoteRead(t *testing.T) { { name: "step minute on minute time range", remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, + vmCfg: vm.Config{Addr: "", Concurrency: 1}, start: "2022-11-26T11:23:05+02:00", end: "2022-11-26T11:24:05+02:00", numOfSamples: 2, @@ -218,7 +229,7 @@ func TestSteamRemoteRead(t *testing.T) { { name: "step month on month time range", remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, + vmCfg: vm.Config{Addr: "", Concurrency: 1}, start: "2022-09-26T11:23:05+02:00", end: "2022-11-26T11:24:05+02:00", numOfSamples: 2, @@ -312,7 +323,6 @@ func TestSteamRemoteRead(t *testing.T) { chunk: tt.chunk, }, cc: 1, - isSilent: true, isVerbose: false, } diff --git a/app/vmctl/remoteread.go b/app/vmctl/remoteread.go index 8632369c6..09ac7f052 100644 --- a/app/vmctl/remoteread.go +++ b/app/vmctl/remoteread.go @@ -7,8 +7,6 @@ import ( "sync" "time" - "github.com/cheggaaa/pb/v3" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" @@ -22,7 +20,6 @@ type remoteReadProcessor struct { src *remoteread.Client cc int - isSilent bool isVerbose bool } @@ -50,21 +47,17 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error { question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?", rrp.filter.timeStart.String(), rrp.filter.timeEnd.String(), len(ranges), rrp.filter.chunk) - if !rrp.isSilent && !prompt(question) { + if !prompt(question) { return nil } - var bar *pb.ProgressBar - if !rrp.isSilent { - bar = barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges)) - if err := barpool.Start(); err != nil { - return err - } + bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges)) + if err := barpool.Start(); err != nil { + return err } + defer func() { - if !rrp.isSilent { - barpool.Stop() - } + barpool.Stop() log.Println("Import finished!") log.Print(rrp.dst.Stats()) }() @@ -82,9 +75,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error { errCh <- fmt.Errorf("request failed for: %s", err) return } - if bar != nil { - bar.Increment() - } + bar.Increment() } }() } diff --git a/app/vmctl/utils.go b/app/vmctl/utils.go index c53b95554..3fa384d63 100644 --- a/app/vmctl/utils.go +++ b/app/vmctl/utils.go @@ -12,7 +12,13 @@ import ( const barTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}` +// isSilent should be inited in main +var isSilent bool + func prompt(question string) bool { + if isSilent { + return true + } isTerminal := terminal.IsTerminal(int(os.Stdout.Fd())) if !isTerminal { return true diff --git a/app/vmctl/vm/vm.go b/app/vmctl/vm/vm.go index 539976770..03183e867 100644 --- a/app/vmctl/vm/vm.go +++ b/app/vmctl/vm/vm.go @@ -16,7 +16,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" - "github.com/cheggaaa/pb/v3" ) // Config contains list of params to configure @@ -55,8 +54,6 @@ type Config struct { // RateLimit defines a data transfer speed in bytes per second. // Is applied to each worker (see Concurrency) independently. RateLimit int64 - // Whether to disable progress bar per VM worker - DisableProgressBar bool } // Importer performs insertion of timeseries @@ -159,12 +156,10 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) { im.wg.Add(int(cfg.Concurrency)) for i := 0; i < int(cfg.Concurrency); i++ { - var bar *pb.ProgressBar - if !cfg.DisableProgressBar { - pbPrefix := fmt.Sprintf(`{{ green "VM worker %d:" }}`, i) - bar = barpool.AddWithTemplate(pbPrefix+pbTpl, 0) - } - go func(bar *pb.ProgressBar) { + pbPrefix := fmt.Sprintf(`{{ green "VM worker %d:" }}`, i) + bar := barpool.AddWithTemplate(pbPrefix+pbTpl, 0) + + go func(bar barpool.Bar) { defer im.wg.Done() im.startWorker(ctx, bar, cfg.BatchSize, cfg.SignificantFigures, cfg.RoundDigits) }(bar) @@ -217,7 +212,7 @@ func (im *Importer) Close() { }) } -func (im *Importer) startWorker(ctx context.Context, bar *pb.ProgressBar, batchSize, significantFigures, roundDigits int) { +func (im *Importer) startWorker(ctx context.Context, bar barpool.Bar, batchSize, significantFigures, roundDigits int) { var batch []*TimeSeries var dataPoints int var waitForBatch time.Time @@ -252,9 +247,7 @@ func (im *Importer) startWorker(ctx context.Context, bar *pb.ProgressBar, batchS batch = append(batch, ts) dataPoints += len(ts.Values) - if bar != nil { - bar.Add(len(ts.Values)) - } + bar.Add(len(ts.Values)) if dataPoints < batchSize { continue diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index bbdbe5f63..dea0fdc75 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -9,8 +9,6 @@ import ( "sync" "time" - "github.com/cheggaaa/pb/v3" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" @@ -33,7 +31,6 @@ type vmNativeProcessor struct { rateLimit int64 interCluster bool cc int - isSilent bool isNative bool disablePerMetricRequests bool @@ -82,13 +79,13 @@ func (p *vmNativeProcessor) run(ctx context.Context) error { return fmt.Errorf("failed to get tenants: %w", err) } question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants) - if !p.isSilent && !prompt(question) { + if !prompt(question) { return nil } } for _, tenantID := range tenants { - err := p.runBackfilling(ctx, tenantID, ranges, p.isSilent) + err := p.runBackfilling(ctx, tenantID, ranges) if err != nil { return fmt.Errorf("migration failed: %s", err) } @@ -100,7 +97,7 @@ func (p *vmNativeProcessor) run(ctx context.Context) error { return nil } -func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error { +func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar barpool.Bar) error { retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL, bar) } attempts, err := p.backoff.Retry(ctx, retryableFunc) @@ -114,15 +111,18 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst return nil } -func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error { +func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar barpool.Bar) error { reader, err := p.src.ExportPipe(ctx, srcURL, f) if err != nil { return fmt.Errorf("failed to init export pipe: %w", err) } - if p.disablePerMetricRequests && bar != nil { - fmt.Printf("Continue import process with filter %s:\n", f.String()) - reader = bar.NewProxyReader(reader) + if p.disablePerMetricRequests { + pr := bar.NewProxyReader(reader) + if pr != nil { + reader = bar.NewProxyReader(reader) + fmt.Printf("Continue import process with filter %s:\n", f.String()) + } } pr, pw := io.Pipe() @@ -155,7 +155,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU return <-importCh } -func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time, silent bool) error { +func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time) error { exportAddr := nativeExportAddr importAddr := nativeImportAddr if p.isNative { @@ -194,7 +194,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, "": ranges, } if !p.disablePerMetricRequests { - metrics, err = p.explore(ctx, p.src, tenantID, ranges, silent) + metrics, err = p.explore(ctx, p.src, tenantID, ranges) if err != nil { return fmt.Errorf("failed to explore metric names: %s", err) } @@ -216,26 +216,24 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, // do not prompt for intercluster because there could be many tenants, // and we don't want to interrupt the process when moving to the next tenant. question := foundSeriesMsg + ". Continue?" - if !silent && !prompt(question) { + if !prompt(question) { return nil } } else { log.Print(foundSeriesMsg) } - var bar *pb.ProgressBar barPrefix := "Requests to make" if p.interCluster { barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID) } - if !silent { - bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake) - if p.disablePerMetricRequests { - bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0) - } - bar.Start() - defer bar.Finish() + + bar := barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake) + if p.disablePerMetricRequests { + bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0) } + bar.Start() + defer bar.Finish() filterCh := make(chan native.Filter) errCh := make(chan error, p.cc) @@ -251,9 +249,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, errCh <- err return } - if bar != nil { - bar.Increment() - } + bar.Increment() } else { if err := p.runSingle(ctx, f, srcURL, dstURL, bar); err != nil { errCh <- err @@ -298,15 +294,12 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, return nil } -func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time, silent bool) (map[string][][]time.Time, error) { +func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time) (map[string][][]time.Time, error) { log.Printf("Exploring metrics...") - var bar *pb.ProgressBar - if !silent { - bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges)) - bar.Start() - defer bar.Finish() - } + bar := barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges)) + bar.Start() + defer bar.Finish() metrics := make(map[string][][]time.Time) for _, r := range ranges { @@ -317,9 +310,7 @@ func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, ten for i := range ms { metrics[ms[i]] = append(metrics[ms[i]], r) } - if bar != nil { - bar.Increment() - } + bar.Increment() } return metrics, nil } diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go index 78c35612c..1f24dad56 100644 --- a/app/vmctl/vm_native_test.go +++ b/app/vmctl/vm_native_test.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "log" "net/http" "os" @@ -37,6 +38,12 @@ func Test_vmNativeProcessor_run(t *testing.T) { } }() + barpool.Disable(true) + defer func() { + barpool.Disable(false) + }() + defer func() { isSilent = false }() + type fields struct { filter native.Filter dst *native.Client @@ -218,6 +225,7 @@ func Test_vmNativeProcessor_run(t *testing.T) { HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: false}}, } + isSilent = tt.args.silent p := &vmNativeProcessor{ filter: tt.fields.filter, dst: tt.fields.dst, @@ -227,7 +235,6 @@ func Test_vmNativeProcessor_run(t *testing.T) { rateLimit: tt.fields.rateLimit, interCluster: tt.fields.interCluster, cc: tt.fields.cc, - isSilent: tt.args.silent, isNative: true, } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 5ed1ebbfa..87d26420d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,10 +30,13 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +**Update note 1: the `--vm-disable-progress-bar` command-line flag at `vmctl` is deprecated and will be removed in the future releases. Use `--disable-progress-bar` instead.** + * FEATURE: [alerts-vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmagent.yml): add new alerting rules `StreamAggrFlushTimeout` and `StreamAggrDedupFlushTimeout` to notify about issues during stream aggregation. * FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add row `Streaming aggregation` with panels related to [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/) process. * BUGFIX: all VictoriaMetrics components: prioritize `-configAuthKey` and `-reloadAuthKey` over `-httpAuth.*` settings. This change aligns behavior of mentioned flags with other auth flags like `-metricsAuthKey`, `-flagsAuthKey`, `-pprofAuthKey`. Check [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6329). +* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): add `--disable-progress-bar` global command-line flag. It can be used for disabling dynamic progress bar for all migration modes. `--vm-disable-progress-bar` command-line flag is deprecated and will be removed in the future releases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6367). ## [v1.102.0-rc1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc1)