app/vmctl: add verbose output for docker installations or when TTY isn't available (#4333)

* app/vmctl: add verbose output for docker installations or when TTY isn't available

* app/vmctl: fix tests

* app/vmctl: make vmctl interactive if no tty

* app/vmctl: cleanup

* app/vmctl: add comment

---------

Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
Dmytro Kozlov 2023-06-02 15:57:08 +03:00 committed by GitHub
parent c7884f8686
commit fc5292d8ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 92 additions and 50 deletions

View file

@ -3,7 +3,13 @@
// altogether.
package barpool
import "github.com/cheggaaa/pb/v3"
import (
"fmt"
"os"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
"github.com/cheggaaa/pb/v3"
)
var pool = pb.NewPool()
@ -20,7 +26,22 @@ func Stop() { _ = pool.Stop() }
// AddWithTemplate adds bar with the given template
// to the global pool
func AddWithTemplate(format string, total int) *pb.ProgressBar {
bar := pb.ProgressBarTemplate(format).New(total)
tpl := getTemplate(format)
bar := pb.ProgressBarTemplate(tpl).New(total)
Add(bar)
return bar
}
// NewSingleProgress returns progress bar with given template
func NewSingleProgress(format string, total int) *pb.ProgressBar {
tpl := getTemplate(format)
return pb.ProgressBarTemplate(tpl).New(total)
}
func getTemplate(format string) string {
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
if !isTerminal {
format = fmt.Sprintf("%s\n", format)
}
return format
}

View file

@ -18,9 +18,11 @@ type influxProcessor struct {
separator string
skipDbLabel bool
promMode bool
isSilent bool
isVerbose bool
}
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel bool, promMode bool) *influxProcessor {
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel, promMode, silent, verbose bool) *influxProcessor {
if cc < 1 {
cc = 1
}
@ -31,10 +33,12 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
separator: separator,
skipDbLabel: skipDbLabel,
promMode: promMode,
isSilent: silent,
isVerbose: verbose,
}
}
func (ip *influxProcessor) run(silent, verbose bool) error {
func (ip *influxProcessor) run() error {
series, err := ip.ic.Explore()
if err != nil {
return fmt.Errorf("explore query failed: %s", err)
@ -44,7 +48,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
}
question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series))
if !silent && !prompt(question) {
if !ip.isSilent && !prompt(question) {
return nil
}
@ -79,7 +83,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
case infErr := <-errCh:
return fmt.Errorf("influx error: %s", infErr)
case vmErr := <-ip.im.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
case seriesCh <- s:
}
}
@ -91,7 +95,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
// drain import errors channel
for vmErr := range ip.im.Errors() {
if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
}
}
for err := range errCh {

View file

@ -16,7 +16,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
"github.com/urfave/cli/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
@ -72,8 +71,8 @@ func main() {
return fmt.Errorf("failed to create VM importer: %s", err)
}
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency))
return otsdbProcessor.run(isNonInteractive(c), c.Bool(globalVerbose))
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalSilent), c.Bool(globalVerbose))
return otsdbProcessor.run()
},
},
{
@ -113,8 +112,10 @@ func main() {
c.Int(influxConcurrency),
c.String(influxMeasurementFieldSeparator),
c.Bool(influxSkipDatabaseLabel),
c.Bool(influxPrometheusMode))
return processor.run(isNonInteractive(c), c.Bool(globalVerbose))
c.Bool(influxPrometheusMode),
c.Bool(globalSilent),
c.Bool(globalVerbose))
return processor.run()
},
},
{
@ -152,9 +153,11 @@ func main() {
timeEnd: c.Timestamp(remoteReadFilterTimeEnd),
chunk: c.String(remoteReadStepInterval),
},
cc: c.Int(remoteReadConcurrency),
cc: c.Int(remoteReadConcurrency),
isSilent: c.Bool(globalSilent),
isVerbose: c.Bool(globalVerbose),
}
return rmp.run(ctx, isNonInteractive(c), c.Bool(globalVerbose))
return rmp.run(ctx)
},
},
{
@ -188,7 +191,7 @@ func main() {
im: importer,
cc: c.Int(promConcurrency),
}
return pp.run(isNonInteractive(c), c.Bool(globalVerbose))
return pp.run(c.Bool(globalSilent), c.Bool(globalVerbose))
},
},
{
@ -250,8 +253,9 @@ func main() {
backoff: backoff.New(),
cc: c.Int(vmConcurrency),
disableRetries: c.Bool(vmNativeDisableRetries),
isSilent: c.Bool(globalSilent),
}
return p.run(ctx, isNonInteractive(c))
return p.run(ctx)
},
},
{
@ -324,8 +328,3 @@ func initConfigVM(c *cli.Context) vm.Config {
DisableProgressBar: c.Bool(vmDisableProgressBar),
}
}
func isNonInteractive(c *cli.Context) bool {
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
return c.Bool(globalSilent) || !isTerminal
}

View file

@ -12,9 +12,11 @@ import (
)
type otsdbProcessor struct {
oc *opentsdb.Client
im *vm.Importer
otsdbcc int
oc *opentsdb.Client
im *vm.Importer
otsdbcc int
isSilent bool
isVerbose bool
}
type queryObj struct {
@ -24,18 +26,20 @@ type queryObj struct {
StartTime int64
}
func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdbProcessor {
func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, silent, verbose bool) *otsdbProcessor {
if otsdbcc < 1 {
otsdbcc = 1
}
return &otsdbProcessor{
oc: oc,
im: im,
otsdbcc: otsdbcc,
oc: oc,
im: im,
otsdbcc: otsdbcc,
isSilent: silent,
isVerbose: verbose,
}
}
func (op *otsdbProcessor) run(silent, verbose bool) error {
func (op *otsdbProcessor) run() error {
log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters)
var metrics []string
for _, filter := range op.oc.Filters {
@ -51,7 +55,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
}
question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics))
if !silent && !prompt(question) {
if !op.isSilent && !prompt(question) {
return nil
}
op.im.ResetStats()
@ -114,7 +118,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
case otsdbErr := <-errCh:
return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{
@ -138,7 +142,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
op.im.Close()
for vmErr := range op.im.Errors() {
if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
}
}
log.Println("Import finished!")

View file

@ -154,10 +154,12 @@ func TestRemoteRead(t *testing.T) {
timeEnd: &end,
chunk: tt.chunk,
},
cc: 1,
cc: 1,
isSilent: true,
isVerbose: false,
}
err = rmp.run(ctx, true, false)
err = rmp.run(ctx)
if err != nil {
t.Fatalf("failed to run remote read processor: %s", err)
}
@ -307,10 +309,12 @@ func TestSteamRemoteRead(t *testing.T) {
timeEnd: &end,
chunk: tt.chunk,
},
cc: 1,
cc: 1,
isSilent: true,
isVerbose: false,
}
err = rmp.run(ctx, true, false)
err = rmp.run(ctx)
if err != nil {
t.Fatalf("failed to run remote read processor: %s", err)
}

View file

@ -20,7 +20,9 @@ type remoteReadProcessor struct {
dst *vm.Importer
src *remoteread.Client
cc int
cc int
isSilent bool
isVerbose bool
}
type remoteReadFilter struct {
@ -29,7 +31,7 @@ type remoteReadFilter struct {
chunk string
}
func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) error {
func (rrp *remoteReadProcessor) run(ctx context.Context) error {
rrp.dst.ResetStats()
if rrp.filter.timeEnd == nil {
t := time.Now().In(rrp.filter.timeStart.Location())
@ -46,19 +48,19 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e
question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?",
rrp.filter.timeStart.String(), rrp.filter.timeEnd.String(), len(ranges), rrp.filter.chunk)
if !silent && !prompt(question) {
if !rrp.isSilent && !prompt(question) {
return nil
}
var bar *pb.ProgressBar
if !silent {
if !rrp.isSilent {
bar = barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges))
if err := barpool.Start(); err != nil {
return err
}
}
defer func() {
if !silent {
if !rrp.isSilent {
barpool.Stop()
}
log.Println("Import finished!")
@ -90,7 +92,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e
case infErr := <-errCh:
return fmt.Errorf("remote read error: %s", infErr)
case vmErr := <-rrp.dst.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
case rangeC <- &remoteread.Filter{
StartTimestampMs: r[0].UnixMilli(),
EndTimestampMs: r[1].UnixMilli(),
@ -105,7 +107,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context, silent, verbose bool) e
// drain import errors channel
for vmErr := range rrp.dst.Errors() {
if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
}
}
for err := range errCh {

View file

@ -6,12 +6,17 @@ import (
"os"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
)
const barTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
func prompt(question string) bool {
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
if !isTerminal {
return true
}
reader := bufio.NewReader(os.Stdin)
fmt.Print(question, " [Y/n] ")
answer, err := reader.ReadString('\n')

View file

@ -10,6 +10,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
@ -32,6 +33,7 @@ type vmNativeProcessor struct {
interCluster bool
cc int
disableRetries bool
isSilent bool
}
const (
@ -41,7 +43,7 @@ const (
nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
)
func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
func (p *vmNativeProcessor) run(ctx context.Context) error {
if p.cc == 0 {
p.cc = 1
}
@ -78,13 +80,13 @@ func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
return fmt.Errorf("failed to get tenants: %w", err)
}
question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants)
if !silent && !prompt(question) {
if !p.isSilent && !prompt(question) {
return nil
}
}
for _, tenantID := range tenants {
err := p.runBackfilling(ctx, tenantID, ranges, silent)
err := p.runBackfilling(ctx, tenantID, ranges, p.isSilent)
if err != nil {
return fmt.Errorf("migration failed: %s", err)
}
@ -111,7 +113,6 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst
}
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
reader, err := p.src.ExportPipe(ctx, srcURL, f)
if err != nil {
return fmt.Errorf("failed to init export pipe: %w", err)
@ -218,9 +219,9 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
var bar *pb.ProgressBar
if !silent {
bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeWithBackoffTpl, barPrefix)).New(len(metrics) * len(ranges))
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), len(metrics)*len(ranges))
if p.disableRetries {
bar = pb.ProgressBarTemplate(nativeSingleProcessTpl).New(0)
bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0)
}
bar.Start()
defer bar.Finish()

View file

@ -227,9 +227,10 @@ func Test_vmNativeProcessor_run(t *testing.T) {
rateLimit: tt.fields.rateLimit,
interCluster: tt.fields.interCluster,
cc: tt.fields.cc,
isSilent: tt.args.silent,
}
if err := p.run(tt.args.ctx, tt.args.silent); (err != nil) != tt.wantErr {
if err := p.run(tt.args.ctx); (err != nil) != tt.wantErr {
t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
}
deleted, err := deleteSeries(tt.fields.matchName, tt.fields.matchValue)

View file

@ -24,6 +24,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix nil map assignment panic in runtime introduced in this [change](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4341).
* BUGFIX: add the following command-line flags, which can be used for limiting Graphite API calls:
`--search.maxGraphiteTagKeys` for limiting the number of tag keys returned from Graphite `/tags`, `/tags/autoComplete/*`, `/tags/findSeries` API.