app/vmctl: add verbose output for docker installations or when TTY isn't available (#4333)

* app/vmctl: add verbose output for docker installations or when TTY isn't available

* app/vmctl: fix tests

* app/vmctl: make vmctl interactive if no tty

* app/vmctl: cleanup

* app/vmctl: add comment

---------

Co-authored-by: Nikolay <nik@victoriametrics.com>

(cherry picked from commit fc5292d8ed)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Dmytro Kozlov 2023-06-02 14:57:08 +02:00 committed by hagen1778
parent c5debee3f4
commit dd89fb2e12
No known key found for this signature in database
GPG key ID: 3BF75F3741CA9640
8 changed files with 82 additions and 45 deletions

View file

@ -3,7 +3,13 @@
// altogether. // altogether.
package barpool package barpool
import "github.com/cheggaaa/pb/v3" import (
"fmt"
"os"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
"github.com/cheggaaa/pb/v3"
)
var pool = pb.NewPool() var pool = pb.NewPool()
@ -20,7 +26,22 @@ func Stop() { _ = pool.Stop() }
// AddWithTemplate adds bar with the given template // AddWithTemplate adds bar with the given template
// to the global pool // to the global pool
func AddWithTemplate(format string, total int) *pb.ProgressBar { func AddWithTemplate(format string, total int) *pb.ProgressBar {
bar := pb.ProgressBarTemplate(format).New(total) tpl := getTemplate(format)
bar := pb.ProgressBarTemplate(tpl).New(total)
Add(bar) Add(bar)
return bar return bar
} }
// NewSingleProgress returns progress bar with given template
func NewSingleProgress(format string, total int) *pb.ProgressBar {
tpl := getTemplate(format)
return pb.ProgressBarTemplate(tpl).New(total)
}
func getTemplate(format string) string {
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
if !isTerminal {
format = fmt.Sprintf("%s\n", format)
}
return format
}

View file

@ -18,9 +18,11 @@ type influxProcessor struct {
separator string separator string
skipDbLabel bool skipDbLabel bool
promMode bool promMode bool
isSilent bool
isVerbose bool
} }
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel bool, promMode bool) *influxProcessor { func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel, promMode, silent, verbose bool) *influxProcessor {
if cc < 1 { if cc < 1 {
cc = 1 cc = 1
} }
@ -31,10 +33,12 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
separator: separator, separator: separator,
skipDbLabel: skipDbLabel, skipDbLabel: skipDbLabel,
promMode: promMode, promMode: promMode,
isSilent: silent,
isVerbose: verbose,
} }
} }
func (ip *influxProcessor) run(silent, verbose bool) error { func (ip *influxProcessor) run() error {
series, err := ip.ic.Explore() series, err := ip.ic.Explore()
if err != nil { if err != nil {
return fmt.Errorf("explore query failed: %s", err) return fmt.Errorf("explore query failed: %s", err)
@ -44,7 +48,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
} }
question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series)) question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series))
if !silent && !prompt(question) { if !ip.isSilent && !prompt(question) {
return nil return nil
} }
@ -79,7 +83,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
case infErr := <-errCh: case infErr := <-errCh:
return fmt.Errorf("influx error: %s", infErr) return fmt.Errorf("influx error: %s", infErr)
case vmErr := <-ip.im.Errors(): case vmErr := <-ip.im.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
case seriesCh <- s: case seriesCh <- s:
} }
} }
@ -91,7 +95,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
// drain import errors channel // drain import errors channel
for vmErr := range ip.im.Errors() { for vmErr := range ip.im.Errors() {
if vmErr.Err != nil { if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
} }
} }
for err := range errCh { for err := range errCh {

View file

@ -16,7 +16,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
@ -72,8 +71,8 @@ func main() {
return fmt.Errorf("failed to create VM importer: %s", err) return fmt.Errorf("failed to create VM importer: %s", err)
} }
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency)) otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalSilent), c.Bool(globalVerbose))
return otsdbProcessor.run(isNonInteractive(c), c.Bool(globalVerbose)) return otsdbProcessor.run()
}, },
}, },
{ {
@ -113,8 +112,10 @@ func main() {
c.Int(influxConcurrency), c.Int(influxConcurrency),
c.String(influxMeasurementFieldSeparator), c.String(influxMeasurementFieldSeparator),
c.Bool(influxSkipDatabaseLabel), c.Bool(influxSkipDatabaseLabel),
c.Bool(influxPrometheusMode)) c.Bool(influxPrometheusMode),
return processor.run(isNonInteractive(c), c.Bool(globalVerbose)) c.Bool(globalSilent),
c.Bool(globalVerbose))
return processor.run()
}, },
}, },
{ {
@ -152,9 +153,11 @@ func main() {
timeEnd: c.Timestamp(remoteReadFilterTimeEnd), timeEnd: c.Timestamp(remoteReadFilterTimeEnd),
chunk: c.String(remoteReadStepInterval), chunk: c.String(remoteReadStepInterval),
}, },
cc: c.Int(remoteReadConcurrency), cc: c.Int(remoteReadConcurrency),
isSilent: c.Bool(globalSilent),
isVerbose: c.Bool(globalVerbose),
} }
return rmp.run(ctx, isNonInteractive(c), c.Bool(globalVerbose)) return rmp.run(ctx)
}, },
}, },
{ {
@ -188,7 +191,7 @@ func main() {
im: importer, im: importer,
cc: c.Int(promConcurrency), cc: c.Int(promConcurrency),
} }
return pp.run(isNonInteractive(c), c.Bool(globalVerbose)) return pp.run(c.Bool(globalSilent), c.Bool(globalVerbose))
}, },
}, },
{ {
@ -250,8 +253,9 @@ func main() {
backoff: backoff.New(), backoff: backoff.New(),
cc: c.Int(vmConcurrency), cc: c.Int(vmConcurrency),
disableRetries: c.Bool(vmNativeDisableRetries), disableRetries: c.Bool(vmNativeDisableRetries),
isSilent: c.Bool(globalSilent),
} }
return p.run(ctx, isNonInteractive(c)) return p.run(ctx)
}, },
}, },
{ {
@ -324,8 +328,3 @@ func initConfigVM(c *cli.Context) vm.Config {
DisableProgressBar: c.Bool(vmDisableProgressBar), DisableProgressBar: c.Bool(vmDisableProgressBar),
} }
} }
func isNonInteractive(c *cli.Context) bool {
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
return c.Bool(globalSilent) || !isTerminal
}

View file

@ -12,9 +12,11 @@ import (
) )
type otsdbProcessor struct { type otsdbProcessor struct {
oc *opentsdb.Client oc *opentsdb.Client
im *vm.Importer im *vm.Importer
otsdbcc int otsdbcc int
isSilent bool
isVerbose bool
} }
type queryObj struct { type queryObj struct {
@ -24,18 +26,20 @@ type queryObj struct {
StartTime int64 StartTime int64
} }
func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdbProcessor { func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, silent, verbose bool) *otsdbProcessor {
if otsdbcc < 1 { if otsdbcc < 1 {
otsdbcc = 1 otsdbcc = 1
} }
return &otsdbProcessor{ return &otsdbProcessor{
oc: oc, oc: oc,
im: im, im: im,
otsdbcc: otsdbcc, otsdbcc: otsdbcc,
isSilent: silent,
isVerbose: verbose,
} }
} }
func (op *otsdbProcessor) run(silent, verbose bool) error { func (op *otsdbProcessor) run() error {
log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters) log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters)
var metrics []string var metrics []string
for _, filter := range op.oc.Filters { for _, filter := range op.oc.Filters {
@ -51,7 +55,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
} }
question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics)) question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics))
if !silent && !prompt(question) { if !op.isSilent && !prompt(question) {
return nil return nil
} }
op.im.ResetStats() op.im.ResetStats()
@ -114,7 +118,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
case otsdbErr := <-errCh: case otsdbErr := <-errCh:
return fmt.Errorf("opentsdb error: %s", otsdbErr) return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors(): case vmErr := <-op.im.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{ case seriesCh <- queryObj{
Tr: tr, StartTime: startTime, Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{ Series: series, Rt: opentsdb.RetentionMeta{
@ -138,7 +142,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
op.im.Close() op.im.Close()
for vmErr := range op.im.Errors() { for vmErr := range op.im.Errors() {
if vmErr.Err != nil { if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
} }
} }
log.Println("Import finished!") log.Println("Import finished!")

View file

@ -20,7 +20,9 @@ type remoteReadProcessor struct {
dst *vm.Importer dst *vm.Importer
src *remoteread.Client src *remoteread.Client
cc int cc int
isSilent bool
isVerbose bool
} }
type remoteReadFilter struct { type remoteReadFilter struct {
@ -29,7 +31,7 @@ type remoteReadFilter struct {
chunk string chunk string
} }
func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) error { func (rrp *remoteReadProcessor) run(ctx context.Context) error {
rrp.dst.ResetStats() rrp.dst.ResetStats()
if rrp.filter.timeEnd == nil { if rrp.filter.timeEnd == nil {
t := time.Now().In(rrp.filter.timeStart.Location()) t := time.Now().In(rrp.filter.timeStart.Location())
@ -46,19 +48,19 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e
question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?", question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?",
rrp.filter.timeStart.String(), rrp.filter.timeEnd.String(), len(ranges), rrp.filter.chunk) rrp.filter.timeStart.String(), rrp.filter.timeEnd.String(), len(ranges), rrp.filter.chunk)
if !silent && !prompt(question) { if !rrp.isSilent && !prompt(question) {
return nil return nil
} }
var bar *pb.ProgressBar var bar *pb.ProgressBar
if !silent { if !rrp.isSilent {
bar = barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges)) bar = barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges))
if err := barpool.Start(); err != nil { if err := barpool.Start(); err != nil {
return err return err
} }
} }
defer func() { defer func() {
if !silent { if !rrp.isSilent {
barpool.Stop() barpool.Stop()
} }
log.Println("Import finished!") log.Println("Import finished!")
@ -90,7 +92,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e
case infErr := <-errCh: case infErr := <-errCh:
return fmt.Errorf("remote read error: %s", infErr) return fmt.Errorf("remote read error: %s", infErr)
case vmErr := <-rrp.dst.Errors(): case vmErr := <-rrp.dst.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
case rangeC <- &remoteread.Filter{ case rangeC <- &remoteread.Filter{
StartTimestampMs: r[0].UnixMilli(), StartTimestampMs: r[0].UnixMilli(),
EndTimestampMs: r[1].UnixMilli(), EndTimestampMs: r[1].UnixMilli(),
@ -105,7 +107,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e
// drain import errors channel // drain import errors channel
for vmErr := range rrp.dst.Errors() { for vmErr := range rrp.dst.Errors() {
if vmErr.Err != nil { if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
} }
} }
for err := range errCh { for err := range errCh {

View file

@ -6,12 +6,17 @@ import (
"os" "os"
"strings" "strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
) )
const barTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}` const barTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
func prompt(question string) bool { func prompt(question string) bool {
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
if !isTerminal {
return true
}
reader := bufio.NewReader(os.Stdin) reader := bufio.NewReader(os.Stdin)
fmt.Print(question, " [Y/n] ") fmt.Print(question, " [Y/n] ")
answer, err := reader.ReadString('\n') answer, err := reader.ReadString('\n')

View file

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
@ -32,6 +33,7 @@ type vmNativeProcessor struct {
interCluster bool interCluster bool
cc int cc int
disableRetries bool disableRetries bool
isSilent bool
} }
const ( const (
@ -41,7 +43,7 @@ const (
nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}` nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
) )
func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error { func (p *vmNativeProcessor) run(ctx context.Context) error {
if p.cc == 0 { if p.cc == 0 {
p.cc = 1 p.cc = 1
} }
@ -78,13 +80,13 @@ func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
return fmt.Errorf("failed to get tenants: %w", err) return fmt.Errorf("failed to get tenants: %w", err)
} }
question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants) question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants)
if !silent && !prompt(question) { if !p.isSilent && !prompt(question) {
return nil return nil
} }
} }
for _, tenantID := range tenants { for _, tenantID := range tenants {
err := p.runBackfilling(ctx, tenantID, ranges, silent) err := p.runBackfilling(ctx, tenantID, ranges, p.isSilent)
if err != nil { if err != nil {
return fmt.Errorf("migration failed: %s", err) return fmt.Errorf("migration failed: %s", err)
} }
@ -111,7 +113,6 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst
} }
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error { func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
reader, err := p.src.ExportPipe(ctx, srcURL, f) reader, err := p.src.ExportPipe(ctx, srcURL, f)
if err != nil { if err != nil {
return fmt.Errorf("failed to init export pipe: %w", err) return fmt.Errorf("failed to init export pipe: %w", err)
@ -218,9 +219,9 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
var bar *pb.ProgressBar var bar *pb.ProgressBar
if !silent { if !silent {
bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeWithBackoffTpl, barPrefix)).New(len(metrics) * len(ranges)) bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), len(metrics)*len(ranges))
if p.disableRetries { if p.disableRetries {
bar = pb.ProgressBarTemplate(nativeSingleProcessTpl).New(0) bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0)
} }
bar.Start() bar.Start()
defer bar.Finish() defer bar.Finish()

View file

@ -24,6 +24,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip ## tip
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix nil map assignment panic in runtime introduced in this [change](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4341). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix nil map assignment panic in runtime introduced in this [change](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4341).
* BUGFIX: add the following command-line flags, which can be used for limiting Graphite API calls: * BUGFIX: add the following command-line flags, which can be used for limiting Graphite API calls:
`--search.maxGraphiteTagKeys` for limiting the number of tag keys returned from Graphite `/tags`, `/tags/autoComplete/*`, `/tags/findSeries` API. `--search.maxGraphiteTagKeys` for limiting the number of tag keys returned from Graphite `/tags`, `/tags/autoComplete/*`, `/tags/findSeries` API.