diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index eae82bdcb..d1d1d6acb 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -7,7 +7,8 @@ import ( ) const ( - globalSilent = "s" + globalSilent = "s" + globalVerbose = "verbose" ) var ( @@ -17,6 +18,11 @@ var ( Value: false, Usage: "Whether to run in silent mode. If set to true no confirmation prompts will appear.", }, + &cli.BoolFlag{ + Name: globalVerbose, + Value: false, + Usage: "Whether to enable verbosity in logs output.", + }, } ) diff --git a/app/vmctl/influx.go b/app/vmctl/influx.go index ececc59c6..1118ba606 100644 --- a/app/vmctl/influx.go +++ b/app/vmctl/influx.go @@ -30,7 +30,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st } } -func (ip *influxProcessor) run(silent bool) error { +func (ip *influxProcessor) run(silent, verbose bool) error { series, err := ip.ic.Explore() if err != nil { return fmt.Errorf("explore query failed: %s", err) @@ -70,7 +70,7 @@ func (ip *influxProcessor) run(silent bool) error { case infErr := <-errCh: return fmt.Errorf("influx error: %s", infErr) case vmErr := <-ip.im.Errors(): - return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) case seriesCh <- s: } } @@ -80,7 +80,9 @@ func (ip *influxProcessor) run(silent bool) error { ip.im.Close() // drain import errors channel for vmErr := range ip.im.Errors() { - return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + if vmErr.Err != nil { + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + } } bar.Finish() log.Println("Import finished!") diff --git a/app/vmctl/main.go b/app/vmctl/main.go index de146e3db..299656163 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -18,6 +18,11 @@ import ( ) func main() { + var ( + err error + importer *vm.Importer + ) + start := time.Now() app := &cli.App{ Name: "vmctl", @@ -53,7 +58,7 @@ func main() { } otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency)) - return otsdbProcessor.run(c.Bool(globalSilent)) + return otsdbProcessor.run(c.Bool(globalSilent), c.Bool(globalVerbose)) }, }, { @@ -82,14 +87,14 @@ func main() { } vmCfg := initConfigVM(c) - importer, err := vm.NewImporter(vmCfg) + importer, err = vm.NewImporter(vmCfg) if err != nil { return fmt.Errorf("failed to create VM importer: %s", err) } processor := newInfluxProcessor(influxClient, importer, c.Int(influxConcurrency), c.String(influxMeasurementFieldSeparator)) - return processor.run(c.Bool(globalSilent)) + return processor.run(c.Bool(globalSilent), c.Bool(globalVerbose)) }, }, { @@ -100,7 +105,7 @@ func main() { fmt.Println("Prometheus import mode") vmCfg := initConfigVM(c) - importer, err := vm.NewImporter(vmCfg) + importer, err = vm.NewImporter(vmCfg) if err != nil { return fmt.Errorf("failed to create VM importer: %s", err) } @@ -123,7 +128,7 @@ func main() { im: importer, cc: c.Int(promConcurrency), } - return pp.run(c.Bool(globalSilent)) + return pp.run(c.Bool(globalSilent), c.Bool(globalVerbose)) }, }, { @@ -166,12 +171,14 @@ func main() { go func() { <-c fmt.Println("\r- Execution cancelled") - os.Exit(0) + if importer != nil { + importer.Close() + } }() - err := app.Run(os.Args) + err = app.Run(os.Args) if err != nil { - log.Fatal(err) + log.Println(err) } log.Printf("Total time: %v", time.Since(start)) } diff --git a/app/vmctl/opentsdb.go b/app/vmctl/opentsdb.go index 95faa490d..2018c9a63 100644 --- a/app/vmctl/opentsdb.go +++ b/app/vmctl/opentsdb.go @@ -35,7 +35,7 @@ func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdb } } -func (op *otsdbProcessor) run(silent bool) error { +func (op *otsdbProcessor) run(silent, verbose bool) error { log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters) var metrics []string for _, filter := range op.oc.Filters { @@ -111,7 +111,7 @@ func (op *otsdbProcessor) run(silent bool) error { case otsdbErr := <-errCh: return fmt.Errorf("opentsdb error: %s", otsdbErr) case vmErr := <-op.im.Errors(): - return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) case seriesCh <- queryObj{ Tr: tr, StartTime: startTime, Series: series, Rt: opentsdb.RetentionMeta{ @@ -133,7 +133,9 @@ func (op *otsdbProcessor) run(silent bool) error { } op.im.Close() for vmErr := range op.im.Errors() { - return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + if vmErr.Err != nil { + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + } } log.Println("Import finished!") log.Print(op.im.Stats()) diff --git a/app/vmctl/prometheus.go b/app/vmctl/prometheus.go index cb0c02b82..c417204e5 100644 --- a/app/vmctl/prometheus.go +++ b/app/vmctl/prometheus.go @@ -25,7 +25,7 @@ type prometheusProcessor struct { cc int } -func (pp *prometheusProcessor) run(silent bool) error { +func (pp *prometheusProcessor) run(silent, verbose bool) error { blocks, err := pp.cl.Explore() if err != nil { return fmt.Errorf("explore failed: %s", err) @@ -66,7 +66,7 @@ func (pp *prometheusProcessor) run(silent bool) error { return fmt.Errorf("prometheus error: %s", promErr) case vmErr := <-pp.im.Errors(): close(blockReadersCh) - return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) case blockReadersCh <- br: } } @@ -77,7 +77,9 @@ func (pp *prometheusProcessor) run(silent bool) error { pp.im.Close() // drain import errors channel for vmErr := range pp.im.Errors() { - return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + if vmErr.Err != nil { + return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) + } } bar.Finish() log.Println("Import finished!") diff --git a/app/vmctl/utils.go b/app/vmctl/utils.go index 678e70635..522169227 100644 --- a/app/vmctl/utils.go +++ b/app/vmctl/utils.go @@ -23,11 +23,29 @@ func prompt(question string) bool { return false } -func wrapErr(vmErr *vm.ImportError) error { +func wrapErr(vmErr *vm.ImportError, verbose bool) error { var errTS string + var maxTS, minTS int64 for _, ts := range vmErr.Batch { - errTS += fmt.Sprintf("%s for timestamps range %d - %d\n", - ts.String(), ts.Timestamps[0], ts.Timestamps[len(ts.Timestamps)-1]) + if minTS < ts.Timestamps[0] || minTS == 0 { + minTS = ts.Timestamps[0] + } + if maxTS < ts.Timestamps[len(ts.Timestamps)-1] { + maxTS = ts.Timestamps[len(ts.Timestamps)-1] + } + if verbose { + errTS += fmt.Sprintf("%s for timestamps range %d - %d\n", + ts.String(), ts.Timestamps[0], ts.Timestamps[len(ts.Timestamps)-1]) + } } - return fmt.Errorf("%s with error: %s", errTS, vmErr.Err) + var verboseMsg string + if !verbose { + verboseMsg = "(enable `--verbose` output to get more details)" + } + if vmErr.Err == nil { + return fmt.Errorf("%s\n\tLatest delivered batch for timestamps range %d - %d %s\n%s", + vmErr.Err, minTS, maxTS, verboseMsg, errTS) + } + return fmt.Errorf("%s\n\tImporting batch failed for timestamps range %d - %d %s\n%s", + vmErr.Err, minTS, maxTS, verboseMsg, errTS) } diff --git a/app/vmctl/vm/vm.go b/app/vmctl/vm/vm.go index 90ec23697..ff15e776c 100644 --- a/app/vmctl/vm/vm.go +++ b/app/vmctl/vm/vm.go @@ -149,9 +149,11 @@ func NewImporter(cfg Config) (*Importer, error) { // ImportError is type of error generated // in case of unsuccessful import request type ImportError struct { - // The batch of timeseries that failed + // The batch of timeseries processed by importer at the moment Batch []*TimeSeries // The error that appeared during insert + // If err is nil - no error happened and Batch + // Is the latest delivered Batch. Err error } @@ -180,12 +182,13 @@ func (im *Importer) startWorker(batchSize, significantFigures, roundDigits int) for { select { case <-im.close: - if err := im.Import(batch); err != nil { - im.errors <- &ImportError{ - Batch: batch, - Err: err, - } + exitErr := &ImportError{ + Batch: batch, } + if err := im.Import(batch); err != nil { + exitErr.Err = err + } + im.errors <- exitErr return case ts := <-im.input: // init waitForBatch when first