diff --git a/app/vmctl/influx.go b/app/vmctl/influx.go index b795d12e96..bcefc947d3 100644 --- a/app/vmctl/influx.go +++ b/app/vmctl/influx.go @@ -82,12 +82,16 @@ func (ip *influxProcessor) run(silent, verbose bool) error { close(seriesCh) wg.Wait() ip.im.Close() + close(errCh) // drain import errors channel for vmErr := range ip.im.Errors() { if vmErr.Err != nil { return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) } } + for err := range errCh { + return fmt.Errorf("import process failed: %s", err) + } barpool.Stop() log.Println("Import finished!") log.Print(ip.im.Stats()) @@ -142,11 +146,14 @@ func (ip *influxProcessor) do(s *influx.Series) error { if len(time) < 1 { continue } - ip.im.Input() <- &vm.TimeSeries{ + ts := vm.TimeSeries{ Name: name, LabelPairs: labels, Timestamps: time, Values: values, } + if err := ip.im.Input(&ts); err != nil { + return err + } } } diff --git a/app/vmctl/main.go b/app/vmctl/main.go index 9b021371d1..eefe0f6bd2 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "log" "os" @@ -26,6 +27,7 @@ func main() { importer *vm.Importer ) + ctx, cancelCtx := context.WithCancel(context.Background()) start := time.Now() app := &cli.App{ Name: "vmctl", @@ -98,8 +100,11 @@ func main() { return fmt.Errorf("failed to create VM importer: %s", err) } - processor := newInfluxProcessor(influxClient, importer, - c.Int(influxConcurrency), c.String(influxMeasurementFieldSeparator)) + processor := newInfluxProcessor( + influxClient, + importer, + c.Int(influxConcurrency), + c.String(influxMeasurementFieldSeparator)) return processor.run(c.Bool(globalSilent), c.Bool(globalVerbose)) }, }, @@ -167,7 +172,7 @@ func main() { extraLabels: c.StringSlice(vmExtraLabel), }, } - return p.run() + return p.run(ctx) }, }, { @@ -214,6 +219,7 @@ func main() { if importer != nil { importer.Close() } + cancelCtx() }() err = app.Run(os.Args) diff --git a/app/vmctl/opentsdb.go b/app/vmctl/opentsdb.go index f3532a4ca9..e31de67f0f 100644 --- a/app/vmctl/opentsdb.go +++ b/app/vmctl/opentsdb.go @@ -120,6 +120,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error { } } } + // Drain channels per metric close(seriesCh) wg.Wait() @@ -156,11 +157,14 @@ func (op *otsdbProcessor) do(s queryObj) error { for k, v := range data.Tags { labels = append(labels, vm.LabelPair{Name: k, Value: v}) } - op.im.Input() <- &vm.TimeSeries{ + ts := vm.TimeSeries{ Name: data.Metric, LabelPairs: labels, Timestamps: data.Timestamps, Values: data.Values, } + if err := op.im.Input(&ts); err != nil { + return err + } return nil } diff --git a/app/vmctl/prometheus.go b/app/vmctl/prometheus.go index 325e53eda6..b5e018c4a2 100644 --- a/app/vmctl/prometheus.go +++ b/app/vmctl/prometheus.go @@ -62,7 +62,6 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error { } }() } - // any error breaks the import for _, br := range blocks { select { @@ -80,12 +79,16 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error { wg.Wait() // wait for all buffers to flush pp.im.Close() + close(errCh) // drain import errors channel for vmErr := range pp.im.Errors() { if vmErr.Err != nil { return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) } } + for err := range errCh { + return fmt.Errorf("import process failed: %s", err) + } barpool.Stop() log.Println("Import finished!") log.Print(pp.im.Stats()) @@ -127,12 +130,15 @@ func (pp *prometheusProcessor) do(b tsdb.BlockReader) error { if err := it.Err(); err != nil { return err } - pp.im.Input() <- &vm.TimeSeries{ + ts := vm.TimeSeries{ Name: name, LabelPairs: labels, Timestamps: timestamps, Values: values, } + if err := pp.im.Input(&ts); err != nil { + return err + } } return ss.Err() } diff --git a/app/vmctl/prometheus_test.go b/app/vmctl/prometheus_test.go new file mode 100644 index 0000000000..cf515ee108 --- /dev/null +++ b/app/vmctl/prometheus_test.go @@ -0,0 +1,164 @@ +package main + +import ( + "os" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" +) + +// If you want to run this test: +// 1. provide test snapshot path in const testSnapshot +// 2. define httpAddr const with your victoriametrics address +// 3. run victoria metrics with defined address +// 4. remove t.Skip() from Test_prometheusProcessor_run +// 5. run tests one by one not all at one time + +const ( + httpAddr = "http://127.0.0.1:8428/" + testSnapshot = "./testdata/20220427T130947Z-70ba49b1093fd0bf" +) + +// This test simulates close process if user abort it +func Test_prometheusProcessor_run(t *testing.T) { + t.Skip() + type fields struct { + cfg prometheus.Config + vmCfg vm.Config + cl func(prometheus.Config) *prometheus.Client + im func(vm.Config) *vm.Importer + closer func(importer *vm.Importer) + cc int + } + type args struct { + silent bool + verbose bool + } + + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "simulate syscall.SIGINT", + fields: fields{ + cfg: prometheus.Config{ + Snapshot: testSnapshot, + Filter: prometheus.Filter{}, + }, + cl: func(cfg prometheus.Config) *prometheus.Client { + client, err := prometheus.NewClient(cfg) + if err != nil { + t.Fatalf("error init prometeus client: %s", err) + } + return client + }, + im: func(vmCfg vm.Config) *vm.Importer { + importer, err := vm.NewImporter(vmCfg) + if err != nil { + t.Fatalf("error init importer: %s", err) + } + return importer + }, + closer: func(importer *vm.Importer) { + // simulate syscall.SIGINT + time.Sleep(time.Second * 5) + if importer != nil { + importer.Close() + } + }, + vmCfg: vm.Config{Addr: httpAddr, Concurrency: 1}, + cc: 2, + }, + args: args{ + silent: false, + verbose: false, + }, + wantErr: true, + }, + { + name: "simulate correct work", + fields: fields{ + cfg: prometheus.Config{ + Snapshot: testSnapshot, + Filter: prometheus.Filter{}, + }, + cl: func(cfg prometheus.Config) *prometheus.Client { + client, err := prometheus.NewClient(cfg) + if err != nil { + t.Fatalf("error init prometeus client: %s", err) + } + return client + }, + im: func(vmCfg vm.Config) *vm.Importer { + importer, err := vm.NewImporter(vmCfg) + if err != nil { + t.Fatalf("error init importer: %s", err) + } + return importer + }, + closer: nil, + vmCfg: vm.Config{Addr: httpAddr, Concurrency: 5}, + cc: 2, + }, + args: args{ + silent: true, + verbose: false, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := tt.fields.cl(tt.fields.cfg) + importer := tt.fields.im(tt.fields.vmCfg) + + pp := &prometheusProcessor{ + cl: client, + im: importer, + cc: tt.fields.cc, + } + + // we should answer on prompt + if !tt.args.silent { + input := []byte("Y\n") + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + + _, err = w.Write(input) + if err != nil { + t.Error(err) + } + err = w.Close() + if err != nil { + t.Error(err) + } + + stdin := os.Stdin + // Restore stdin right after the test. + defer func() { + os.Stdin = stdin + _ = r.Close() + _ = w.Close() + }() + os.Stdin = r + } + + // simulate close if needed + if tt.fields.closer != nil { + go tt.fields.closer(importer) + } + + if err := pp.run(tt.args.silent, tt.args.verbose); (err != nil) != tt.wantErr { + t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/app/vmctl/vm/vm.go b/app/vmctl/vm/vm.go index 3df1bed97f..8809db7a23 100644 --- a/app/vmctl/vm/vm.go +++ b/app/vmctl/vm/vm.go @@ -182,7 +182,17 @@ func (im *Importer) Errors() chan *ImportError { return im.errors } // Input returns a channel for sending timeseries // that need to be imported -func (im *Importer) Input() chan<- *TimeSeries { return im.input } +func (im *Importer) Input(ts *TimeSeries) error { + select { + case im.input <- ts: + return nil + case err := <-im.errors: + if err != nil && err.Err != nil { + return err.Err + } + return fmt.Errorf("process aborted") + } +} // Close sends signal to all goroutines to exit // and waits until they are finished diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go index b04e8bd7a1..88d687e3a4 100644 --- a/app/vmctl/vm_native.go +++ b/app/vmctl/vm_native.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "io" "io/ioutil" @@ -51,25 +52,25 @@ const ( nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}` ) -func (p *vmNativeProcessor) run() error { +func (p *vmNativeProcessor) run(ctx context.Context) error { pr, pw := io.Pipe() fmt.Printf("Initing export pipe from %q with filters: %s\n", p.src.addr, p.filter) - exportReader, err := p.exportPipe() + exportReader, err := p.exportPipe(ctx) if err != nil { return fmt.Errorf("failed to init export pipe: %s", err) } - sync := make(chan struct{}) nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels) if err != nil { return err } + sync := make(chan struct{}) go func() { defer func() { close(sync) }() u := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr) - req, err := http.NewRequest("POST", u, pr) + req, err := http.NewRequestWithContext(ctx, "POST", u, pr) if err != nil { log.Fatalf("cannot create import request to %q: %s", p.dst.addr, err) } @@ -95,22 +96,25 @@ func (p *vmNativeProcessor) run() error { rl := limiter.NewLimiter(p.rateLimit) w = limiter.NewWriteLimiter(pw, rl) } + _, err = io.Copy(w, barReader) if err != nil { return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err) } + if err := pw.Close(); err != nil { return err } <-sync barpool.Stop() + log.Println("Import finished!") return nil } -func (p *vmNativeProcessor) exportPipe() (io.ReadCloser, error) { +func (p *vmNativeProcessor) exportPipe(ctx context.Context) (io.ReadCloser, error) { u := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr) - req, err := http.NewRequest("GET", u, nil) + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) if err != nil { return nil, fmt.Errorf("cannot create request to %q: %s", p.src.addr, err) } diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go new file mode 100644 index 0000000000..3e229bcbcd --- /dev/null +++ b/app/vmctl/vm_native_test.go @@ -0,0 +1,95 @@ +package main + +import ( + "context" + "testing" + "time" +) + +// If you want to run this test: +// 1. run two instances of victoriametrics and define -httpListenAddr for both or just for second instance +// 2. define srcAddr and dstAddr const with your victoriametrics addresses +// 3. define matchFilter const with your importing data +// 4. define timeStartFilter +// 5. run each test one by one + +const ( + matchFilter = `{job="avalanche"}` + timeStartFilter = "2020-01-01T20:07:00Z" + srcAddr = "http://127.0.0.1:8428" + dstAddr = "http://127.0.0.1:8528" +) + +// This test simulates close process if user abort it +func Test_vmNativeProcessor_run(t *testing.T) { + t.Skip() + type fields struct { + filter filter + rateLimit int64 + dst *vmNativeClient + src *vmNativeClient + } + tests := []struct { + name string + fields fields + closer func(cancelFunc context.CancelFunc) + wantErr bool + }{ + { + name: "simulate syscall.SIGINT", + fields: fields{ + filter: filter{ + match: matchFilter, + timeStart: timeStartFilter, + }, + rateLimit: 0, + dst: &vmNativeClient{ + addr: dstAddr, + }, + src: &vmNativeClient{ + addr: srcAddr, + }, + }, + closer: func(cancelFunc context.CancelFunc) { + time.Sleep(time.Second * 5) + cancelFunc() + }, + wantErr: true, + }, + { + name: "simulate correct work", + fields: fields{ + filter: filter{ + match: matchFilter, + timeStart: timeStartFilter, + }, + rateLimit: 0, + dst: &vmNativeClient{ + addr: dstAddr, + }, + src: &vmNativeClient{ + addr: srcAddr, + }, + }, + closer: func(cancelFunc context.CancelFunc) {}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancelFn := context.WithCancel(context.Background()) + p := &vmNativeProcessor{ + filter: tt.fields.filter, + rateLimit: tt.fields.rateLimit, + dst: tt.fields.dst, + src: tt.fields.src, + } + + tt.closer(cancelFn) + + if err := p.run(ctx); (err != nil) != tt.wantErr { + t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +}