diff --git a/app/vmctl/barpool/pool.go b/app/vmctl/barpool/pool.go index 4f095d8ac..05fe8b418 100644 --- a/app/vmctl/barpool/pool.go +++ b/app/vmctl/barpool/pool.go @@ -3,7 +3,13 @@ // altogether. package barpool -import "github.com/cheggaaa/pb/v3" +import ( + "fmt" + "os" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal" + "github.com/cheggaaa/pb/v3" +) var pool = pb.NewPool() @@ -20,7 +26,22 @@ func Stop() { _ = pool.Stop() } // AddWithTemplate adds bar with the given template // to the global pool func AddWithTemplate(format string, total int) *pb.ProgressBar { - bar := pb.ProgressBarTemplate(format).New(total) + tpl := getTemplate(format) + bar := pb.ProgressBarTemplate(tpl).New(total) Add(bar) return bar } + +// NewSingleProgress returns progress bar with given template +func NewSingleProgress(format string, total int) *pb.ProgressBar { + tpl := getTemplate(format) + return pb.ProgressBarTemplate(tpl).New(total) +} + +func getTemplate(format string) string { + isTerminal := terminal.IsTerminal(int(os.Stdout.Fd())) + if !isTerminal { + format = fmt.Sprintf("%s\n", format) + } + return format +} diff --git a/app/vmctl/influx.go b/app/vmctl/influx.go index 8e11d91be..41cf74083 100644 --- a/app/vmctl/influx.go +++ b/app/vmctl/influx.go @@ -18,9 +18,11 @@ type influxProcessor struct { separator string skipDbLabel bool promMode bool + isSilent bool + isVerbose bool } -func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel bool, promMode bool) *influxProcessor { +func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel, promMode, silent, verbose bool) *influxProcessor { if cc < 1 { cc = 1 } @@ -31,10 +33,12 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st separator: separator, skipDbLabel: skipDbLabel, promMode: promMode, + isSilent: silent, + isVerbose: verbose, } } -func (ip *influxProcessor) run(silent, verbose bool) error { +func (ip *influxProcessor) run() error { series, err := ip.ic.Explore() if err != nil { return fmt.Errorf("explore query failed: %s", err) @@ -44,7 +48,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error { } question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series)) - if !silent && !prompt(question) { + if !ip.isSilent && !prompt(question) { return nil } @@ -79,7 +83,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error { case infErr := <-errCh: return fmt.Errorf("influx error: %s", infErr) case vmErr := <-ip.im.Errors(): - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose)) case seriesCh <- s: } } @@ -91,7 +95,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error { // drain import errors channel for vmErr := range ip.im.Errors() { if vmErr.Err != nil { - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose)) } } for err := range errCh { diff --git a/app/vmctl/main.go b/app/vmctl/main.go index be75ceb11..e81746a76 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -16,7 +16,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal" "github.com/urfave/cli/v2" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx" @@ -72,8 +71,8 @@ func main() { return fmt.Errorf("failed to create VM importer: %s", err) } - otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency)) - return otsdbProcessor.run(isNonInteractive(c), c.Bool(globalVerbose)) + otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalSilent), c.Bool(globalVerbose)) + return otsdbProcessor.run() }, }, { @@ -113,8 +112,10 @@ func main() { c.Int(influxConcurrency), c.String(influxMeasurementFieldSeparator), c.Bool(influxSkipDatabaseLabel), - c.Bool(influxPrometheusMode)) - return processor.run(isNonInteractive(c), c.Bool(globalVerbose)) + c.Bool(influxPrometheusMode), + c.Bool(globalSilent), + c.Bool(globalVerbose)) + return processor.run() }, }, { @@ -152,9 +153,11 @@ func main() { timeEnd: c.Timestamp(remoteReadFilterTimeEnd), chunk: c.String(remoteReadStepInterval), }, - cc: c.Int(remoteReadConcurrency), + cc: c.Int(remoteReadConcurrency), + isSilent: c.Bool(globalSilent), + isVerbose: c.Bool(globalVerbose), } - return rmp.run(ctx, isNonInteractive(c), c.Bool(globalVerbose)) + return rmp.run(ctx) }, }, { @@ -188,7 +191,7 @@ func main() { im: importer, cc: c.Int(promConcurrency), } - return pp.run(isNonInteractive(c), c.Bool(globalVerbose)) + return pp.run(c.Bool(globalSilent), c.Bool(globalVerbose)) }, }, { @@ -250,8 +253,9 @@ func main() { backoff: backoff.New(), cc: c.Int(vmConcurrency), disableRetries: c.Bool(vmNativeDisableRetries), + isSilent: c.Bool(globalSilent), } - return p.run(ctx, isNonInteractive(c)) + return p.run(ctx) }, }, { @@ -324,8 +328,3 @@ func initConfigVM(c *cli.Context) vm.Config { DisableProgressBar: c.Bool(vmDisableProgressBar), } } - -func isNonInteractive(c *cli.Context) bool { - isTerminal := terminal.IsTerminal(int(os.Stdout.Fd())) - return c.Bool(globalSilent) || !isTerminal -} diff --git a/app/vmctl/opentsdb.go b/app/vmctl/opentsdb.go index b956f0592..a20ee007f 100644 --- a/app/vmctl/opentsdb.go +++ b/app/vmctl/opentsdb.go @@ -12,9 +12,11 @@ import ( ) type otsdbProcessor struct { - oc *opentsdb.Client - im *vm.Importer - otsdbcc int + oc *opentsdb.Client + im *vm.Importer + otsdbcc int + isSilent bool + isVerbose bool } type queryObj struct { @@ -24,18 +26,20 @@ type queryObj struct { StartTime int64 } -func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdbProcessor { +func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, silent, verbose bool) *otsdbProcessor { if otsdbcc < 1 { otsdbcc = 1 } return &otsdbProcessor{ - oc: oc, - im: im, - otsdbcc: otsdbcc, + oc: oc, + im: im, + otsdbcc: otsdbcc, + isSilent: silent, + isVerbose: verbose, } } -func (op *otsdbProcessor) run(silent, verbose bool) error { +func (op *otsdbProcessor) run() error { log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters) var metrics []string for _, filter := range op.oc.Filters { @@ -51,7 +55,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error { } question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics)) - if !silent && !prompt(question) { + if !op.isSilent && !prompt(question) { return nil } op.im.ResetStats() @@ -114,7 +118,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error { case otsdbErr := <-errCh: return fmt.Errorf("opentsdb error: %s", otsdbErr) case vmErr := <-op.im.Errors(): - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose)) case seriesCh <- queryObj{ Tr: tr, StartTime: startTime, Series: series, Rt: opentsdb.RetentionMeta{ @@ -138,7 +142,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error { op.im.Close() for vmErr := range op.im.Errors() { if vmErr.Err != nil { - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose)) } } log.Println("Import finished!") diff --git a/app/vmctl/remoteread.go b/app/vmctl/remoteread.go index 6539268f4..8531c6cd6 100644 --- a/app/vmctl/remoteread.go +++ b/app/vmctl/remoteread.go @@ -20,7 +20,9 @@ type remoteReadProcessor struct { dst *vm.Importer src *remoteread.Client - cc int + cc int + isSilent bool + isVerbose bool } type remoteReadFilter struct { @@ -29,7 +31,7 @@ type remoteReadFilter struct { chunk string } -func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) error { +func (rrp *remoteReadProcessor) run(ctx context.Context) error { rrp.dst.ResetStats() if rrp.filter.timeEnd == nil { t := time.Now().In(rrp.filter.timeStart.Location()) @@ -46,19 +48,19 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?", rrp.filter.timeStart.String(), rrp.filter.timeEnd.String(), len(ranges), rrp.filter.chunk) - if !silent && !prompt(question) { + if !rrp.isSilent && !prompt(question) { return nil } var bar *pb.ProgressBar - if !silent { + if !rrp.isSilent { bar = barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges)) if err := barpool.Start(); err != nil { return err } } defer func() { - if !silent { + if !rrp.isSilent { barpool.Stop() } log.Println("Import finished!") @@ -90,7 +92,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e case infErr := <-errCh: return fmt.Errorf("remote read error: %s", infErr) case vmErr := <-rrp.dst.Errors(): - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose)) case rangeC <- &remoteread.Filter{ StartTimestampMs: r[0].UnixMilli(), EndTimestampMs: r[1].UnixMilli(), @@ -105,7 +107,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e // drain import errors channel for vmErr := range rrp.dst.Errors() { if vmErr.Err != nil { - return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose)) } } for err := range errCh { diff --git a/app/vmctl/utils.go b/app/vmctl/utils.go index de1fde0c2..c53b95554 100644 --- a/app/vmctl/utils.go +++ b/app/vmctl/utils.go @@ -6,12 +6,17 @@ import ( "os" "strings" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" ) const barTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}` func prompt(question string) bool { + isTerminal := terminal.IsTerminal(int(os.Stdout.Fd())) + if !isTerminal { + return true + } reader := bufio.NewReader(os.Stdin) fmt.Print(question, " [Y/n] ") answer, err := reader.ReadString('\n') diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index 64b2801eb..683ad6150 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" @@ -32,6 +33,7 @@ type vmNativeProcessor struct { interCluster bool cc int disableRetries bool + isSilent bool } const ( @@ -41,7 +43,7 @@ const ( nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}` ) -func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error { +func (p *vmNativeProcessor) run(ctx context.Context) error { if p.cc == 0 { p.cc = 1 } @@ -78,13 +80,13 @@ func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error { return fmt.Errorf("failed to get tenants: %w", err) } question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants) - if !silent && !prompt(question) { + if !p.isSilent && !prompt(question) { return nil } } for _, tenantID := range tenants { - err := p.runBackfilling(ctx, tenantID, ranges, silent) + err := p.runBackfilling(ctx, tenantID, ranges, p.isSilent) if err != nil { return fmt.Errorf("migration failed: %s", err) } @@ -111,7 +113,6 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst } func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error { - reader, err := p.src.ExportPipe(ctx, srcURL, f) if err != nil { return fmt.Errorf("failed to init export pipe: %w", err) @@ -218,9 +219,9 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, var bar *pb.ProgressBar if !silent { - bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeWithBackoffTpl, barPrefix)).New(len(metrics) * len(ranges)) + bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), len(metrics)*len(ranges)) if p.disableRetries { - bar = pb.ProgressBarTemplate(nativeSingleProcessTpl).New(0) + bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0) } bar.Start() defer bar.Finish() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7ff9628a1..4b2390b66 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,7 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix nil map assignment panic in runtime introduced in this [change](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4341). * BUGFIX: add the following command-line flags, which can be used for limiting Graphite API calls: `--search.maxGraphiteTagKeys` for limiting the number of tag keys returned from Graphite `/tags`, `/tags/autoComplete/*`, `/tags/findSeries` API.