From e313e57090b82b4475d458b88b53770103016dbd Mon Sep 17 00:00:00 2001
From: Roman Khavronenko <hagen1778@gmail.com>
Date: Mon, 3 Jan 2022 21:12:01 +0200
Subject: [PATCH] vmctl: improve logging during import cancels/errors (#2006)

On import process interruption `vmctl` now prints the max and min timestamps of:
* last failed batch if import ended with error;
* last sent batch if import was cancelled by user.

To get more details for each timeseries in batch user needs to specify `--verbose` flag.

The change does not relate to `vm-native` mode, since `vmctl` has no control over
transferred data in this mode.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1236
Signed-off-by: hagen1778 <roman@victoriametrics.com>
---
 app/vmctl/flags.go      |  8 +++++++-
 app/vmctl/influx.go     |  8 +++++---
 app/vmctl/main.go       | 23 +++++++++++++++--------
 app/vmctl/opentsdb.go   |  8 +++++---
 app/vmctl/prometheus.go |  8 +++++---
 app/vmctl/utils.go      | 26 ++++++++++++++++++++++----
 app/vmctl/vm/vm.go      | 15 +++++++++------
 7 files changed, 68 insertions(+), 28 deletions(-)

diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go
index eae82bdcb6..d1d1d6acbb 100644
--- a/app/vmctl/flags.go
+++ b/app/vmctl/flags.go
@@ -7,7 +7,8 @@ import (
 )
 
 const (
-	globalSilent = "s"
+	globalSilent  = "s"
+	globalVerbose = "verbose"
 )
 
 var (
@@ -17,6 +18,11 @@ var (
 			Value: false,
 			Usage: "Whether to run in silent mode. If set to true no confirmation prompts will appear.",
 		},
+		&cli.BoolFlag{
+			Name:  globalVerbose,
+			Value: false,
+			Usage: "Whether to enable verbosity in logs output.",
+		},
 	}
 )
 
diff --git a/app/vmctl/influx.go b/app/vmctl/influx.go
index ececc59c6d..1118ba6063 100644
--- a/app/vmctl/influx.go
+++ b/app/vmctl/influx.go
@@ -30,7 +30,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
 	}
 }
 
-func (ip *influxProcessor) run(silent bool) error {
+func (ip *influxProcessor) run(silent, verbose bool) error {
 	series, err := ip.ic.Explore()
 	if err != nil {
 		return fmt.Errorf("explore query failed: %s", err)
@@ -70,7 +70,7 @@ func (ip *influxProcessor) run(silent bool) error {
 		case infErr := <-errCh:
 			return fmt.Errorf("influx error: %s", infErr)
 		case vmErr := <-ip.im.Errors():
-			return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
+			return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
 		case seriesCh <- s:
 		}
 	}
@@ -80,7 +80,9 @@ func (ip *influxProcessor) run(silent bool) error {
 	ip.im.Close()
 	// drain import errors channel
 	for vmErr := range ip.im.Errors() {
-		return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
+		if vmErr.Err != nil {
+			return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
+		}
 	}
 	bar.Finish()
 	log.Println("Import finished!")
diff --git a/app/vmctl/main.go b/app/vmctl/main.go
index de146e3db6..2996561634 100644
--- a/app/vmctl/main.go
+++ b/app/vmctl/main.go
@@ -18,6 +18,11 @@ import (
 )
 
 func main() {
+	var (
+		err      error
+		importer *vm.Importer
+	)
+
 	start := time.Now()
 	app := &cli.App{
 		Name:    "vmctl",
@@ -53,7 +58,7 @@ func main() {
 					}
 
 					otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency))
-					return otsdbProcessor.run(c.Bool(globalSilent))
+					return otsdbProcessor.run(c.Bool(globalSilent), c.Bool(globalVerbose))
 				},
 			},
 			{
@@ -82,14 +87,14 @@ func main() {
 					}
 
 					vmCfg := initConfigVM(c)
-					importer, err := vm.NewImporter(vmCfg)
+					importer, err = vm.NewImporter(vmCfg)
 					if err != nil {
 						return fmt.Errorf("failed to create VM importer: %s", err)
 					}
 
 					processor := newInfluxProcessor(influxClient, importer,
 						c.Int(influxConcurrency), c.String(influxMeasurementFieldSeparator))
-					return processor.run(c.Bool(globalSilent))
+					return processor.run(c.Bool(globalSilent), c.Bool(globalVerbose))
 				},
 			},
 			{
@@ -100,7 +105,7 @@ func main() {
 					fmt.Println("Prometheus import mode")
 
 					vmCfg := initConfigVM(c)
-					importer, err := vm.NewImporter(vmCfg)
+					importer, err = vm.NewImporter(vmCfg)
 					if err != nil {
 						return fmt.Errorf("failed to create VM importer: %s", err)
 					}
@@ -123,7 +128,7 @@ func main() {
 						im: importer,
 						cc: c.Int(promConcurrency),
 					}
-					return pp.run(c.Bool(globalSilent))
+					return pp.run(c.Bool(globalSilent), c.Bool(globalVerbose))
 				},
 			},
 			{
@@ -166,12 +171,14 @@ func main() {
 	go func() {
 		<-c
 		fmt.Println("\r- Execution cancelled")
-		os.Exit(0)
+		if importer != nil {
+			importer.Close()
+		}
 	}()
 
-	err := app.Run(os.Args)
+	err = app.Run(os.Args)
 	if err != nil {
-		log.Fatal(err)
+		log.Println(err)
 	}
 	log.Printf("Total time: %v", time.Since(start))
 }
diff --git a/app/vmctl/opentsdb.go b/app/vmctl/opentsdb.go
index 95faa490d7..2018c9a63b 100644
--- a/app/vmctl/opentsdb.go
+++ b/app/vmctl/opentsdb.go
@@ -35,7 +35,7 @@ func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdb
 	}
 }
 
-func (op *otsdbProcessor) run(silent bool) error {
+func (op *otsdbProcessor) run(silent, verbose bool) error {
 	log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters)
 	var metrics []string
 	for _, filter := range op.oc.Filters {
@@ -111,7 +111,7 @@ func (op *otsdbProcessor) run(silent bool) error {
 					case otsdbErr := <-errCh:
 						return fmt.Errorf("opentsdb error: %s", otsdbErr)
 					case vmErr := <-op.im.Errors():
-						return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
+						return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
 					case seriesCh <- queryObj{
 						Tr: tr, StartTime: startTime,
 						Series: series, Rt: opentsdb.RetentionMeta{
@@ -133,7 +133,9 @@ func (op *otsdbProcessor) run(silent bool) error {
 	}
 	op.im.Close()
 	for vmErr := range op.im.Errors() {
-		return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
+		if vmErr.Err != nil {
+			return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
+		}
 	}
 	log.Println("Import finished!")
 	log.Print(op.im.Stats())
diff --git a/app/vmctl/prometheus.go b/app/vmctl/prometheus.go
index cb0c02b828..c417204e53 100644
--- a/app/vmctl/prometheus.go
+++ b/app/vmctl/prometheus.go
@@ -25,7 +25,7 @@ type prometheusProcessor struct {
 	cc int
 }
 
-func (pp *prometheusProcessor) run(silent bool) error {
+func (pp *prometheusProcessor) run(silent, verbose bool) error {
 	blocks, err := pp.cl.Explore()
 	if err != nil {
 		return fmt.Errorf("explore failed: %s", err)
@@ -66,7 +66,7 @@ func (pp *prometheusProcessor) run(silent bool) error {
 			return fmt.Errorf("prometheus error: %s", promErr)
 		case vmErr := <-pp.im.Errors():
 			close(blockReadersCh)
-			return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
+			return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
 		case blockReadersCh <- br:
 		}
 	}
@@ -77,7 +77,9 @@ func (pp *prometheusProcessor) run(silent bool) error {
 	pp.im.Close()
 	// drain import errors channel
 	for vmErr := range pp.im.Errors() {
-		return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
+		if vmErr.Err != nil {
+			return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
+		}
 	}
 	bar.Finish()
 	log.Println("Import finished!")
diff --git a/app/vmctl/utils.go b/app/vmctl/utils.go
index 678e70635c..522169227d 100644
--- a/app/vmctl/utils.go
+++ b/app/vmctl/utils.go
@@ -23,11 +23,29 @@ func prompt(question string) bool {
 	return false
 }
 
-func wrapErr(vmErr *vm.ImportError) error {
+func wrapErr(vmErr *vm.ImportError, verbose bool) error {
 	var errTS string
+	var maxTS, minTS int64
 	for _, ts := range vmErr.Batch {
-		errTS += fmt.Sprintf("%s for timestamps range %d - %d\n",
-			ts.String(), ts.Timestamps[0], ts.Timestamps[len(ts.Timestamps)-1])
+		if minTS < ts.Timestamps[0] || minTS == 0 {
+			minTS = ts.Timestamps[0]
+		}
+		if maxTS < ts.Timestamps[len(ts.Timestamps)-1] {
+			maxTS = ts.Timestamps[len(ts.Timestamps)-1]
+		}
+		if verbose {
+			errTS += fmt.Sprintf("%s for timestamps range %d - %d\n",
+				ts.String(), ts.Timestamps[0], ts.Timestamps[len(ts.Timestamps)-1])
+		}
 	}
-	return fmt.Errorf("%s with error: %s", errTS, vmErr.Err)
+	var verboseMsg string
+	if !verbose {
+		verboseMsg = "(enable `--verbose` output to get more details)"
+	}
+	if vmErr.Err == nil {
+		return fmt.Errorf("%s\n\tLatest delivered batch for timestamps range %d - %d %s\n%s",
+			vmErr.Err, minTS, maxTS, verboseMsg, errTS)
+	}
+	return fmt.Errorf("%s\n\tImporting batch failed for timestamps range %d - %d %s\n%s",
+		vmErr.Err, minTS, maxTS, verboseMsg, errTS)
 }
diff --git a/app/vmctl/vm/vm.go b/app/vmctl/vm/vm.go
index 90ec236977..ff15e776c6 100644
--- a/app/vmctl/vm/vm.go
+++ b/app/vmctl/vm/vm.go
@@ -149,9 +149,11 @@ func NewImporter(cfg Config) (*Importer, error) {
 // ImportError is type of error generated
 // in case of unsuccessful import request
 type ImportError struct {
-	// The batch of timeseries that failed
+	// The batch of timeseries processed by importer at the moment
 	Batch []*TimeSeries
 	// The error that appeared during insert
+	// If err is nil - no error happened and Batch
+	// Is the latest delivered Batch.
 	Err error
 }
 
@@ -180,12 +182,13 @@ func (im *Importer) startWorker(batchSize, significantFigures, roundDigits int)
 	for {
 		select {
 		case <-im.close:
-			if err := im.Import(batch); err != nil {
-				im.errors <- &ImportError{
-					Batch: batch,
-					Err:   err,
-				}
+			exitErr := &ImportError{
+				Batch: batch,
 			}
+			if err := im.Import(batch); err != nil {
+				exitErr.Err = err
+			}
+			im.errors <- exitErr
 			return
 		case ts := <-im.input:
 			// init waitForBatch when first