vmctl: fixed blocking when aborting import process (#2509)

vmctl: fix vmctl blocking on process interrupt

This change prevents vmctl from blocking indefinitely on
receiving the interrupt signal. The update touches all
import modes and is supposed to improve the tool's reliability.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2491
This commit is contained in:
Dmytro Kozlov 2022-05-03 08:03:41 +03:00 committed by GitHub
parent aa02719d86
commit 488c34f5e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 310 additions and 14 deletions

View file

@ -82,12 +82,16 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
close(seriesCh)
wg.Wait()
ip.im.Close()
close(errCh)
// drain import errors channel
for vmErr := range ip.im.Errors() {
if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
}
barpool.Stop()
log.Println("Import finished!")
log.Print(ip.im.Stats())
@ -142,11 +146,14 @@ func (ip *influxProcessor) do(s *influx.Series) error {
if len(time) < 1 {
continue
}
ip.im.Input() <- &vm.TimeSeries{
ts := vm.TimeSeries{
Name: name,
LabelPairs: labels,
Timestamps: time,
Values: values,
}
if err := ip.im.Input(&ts); err != nil {
return err
}
}
}

View file

@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"log"
"os"
@ -26,6 +27,7 @@ func main() {
importer *vm.Importer
)
ctx, cancelCtx := context.WithCancel(context.Background())
start := time.Now()
app := &cli.App{
Name: "vmctl",
@ -98,8 +100,11 @@ func main() {
return fmt.Errorf("failed to create VM importer: %s", err)
}
processor := newInfluxProcessor(influxClient, importer,
c.Int(influxConcurrency), c.String(influxMeasurementFieldSeparator))
processor := newInfluxProcessor(
influxClient,
importer,
c.Int(influxConcurrency),
c.String(influxMeasurementFieldSeparator))
return processor.run(c.Bool(globalSilent), c.Bool(globalVerbose))
},
},
@ -167,7 +172,7 @@ func main() {
extraLabels: c.StringSlice(vmExtraLabel),
},
}
return p.run()
return p.run(ctx)
},
},
{
@ -214,6 +219,7 @@ func main() {
if importer != nil {
importer.Close()
}
cancelCtx()
}()
err = app.Run(os.Args)

View file

@ -120,6 +120,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
}
}
}
// Drain channels per metric
close(seriesCh)
wg.Wait()
@ -156,11 +157,14 @@ func (op *otsdbProcessor) do(s queryObj) error {
for k, v := range data.Tags {
labels = append(labels, vm.LabelPair{Name: k, Value: v})
}
op.im.Input() <- &vm.TimeSeries{
ts := vm.TimeSeries{
Name: data.Metric,
LabelPairs: labels,
Timestamps: data.Timestamps,
Values: data.Values,
}
if err := op.im.Input(&ts); err != nil {
return err
}
return nil
}

View file

@ -62,7 +62,6 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
}
}()
}
// any error breaks the import
for _, br := range blocks {
select {
@ -80,12 +79,16 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
wg.Wait()
// wait for all buffers to flush
pp.im.Close()
close(errCh)
// drain import errors channel
for vmErr := range pp.im.Errors() {
if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
}
barpool.Stop()
log.Println("Import finished!")
log.Print(pp.im.Stats())
@ -127,12 +130,15 @@ func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
if err := it.Err(); err != nil {
return err
}
pp.im.Input() <- &vm.TimeSeries{
ts := vm.TimeSeries{
Name: name,
LabelPairs: labels,
Timestamps: timestamps,
Values: values,
}
if err := pp.im.Input(&ts); err != nil {
return err
}
}
return ss.Err()
}

View file

@ -0,0 +1,164 @@
package main
import (
"os"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
)
// If you want to run this test:
// 1. provide test snapshot path in const testSnapshot
// 2. define httpAddr const with your victoriametrics address
// 3. run victoria metrics with defined address
// 4. remove t.Skip() from Test_prometheusProcessor_run
// 5. run the tests one by one, not all at once
const (
httpAddr = "http://127.0.0.1:8428/"
testSnapshot = "./testdata/20220427T130947Z-70ba49b1093fd0bf"
)
// This test simulates the shutdown process when the user aborts the import.
func Test_prometheusProcessor_run(t *testing.T) {
	// Integration test: requires a running VictoriaMetrics instance at
	// httpAddr and a Prometheus snapshot at testSnapshot (see setup notes
	// above), hence skipped by default.
	t.Skip()
	// fields describes how to assemble the processor under test:
	// constructor funcs for the client/importer, an optional closer that
	// simulates an interrupt, and cc — the worker concurrency.
	type fields struct {
		cfg    prometheus.Config
		vmCfg  vm.Config
		cl     func(prometheus.Config) *prometheus.Client
		im     func(vm.Config) *vm.Importer
		closer func(importer *vm.Importer)
		cc     int
	}
	// args mirrors the parameters of prometheusProcessor.run.
	type args struct {
		silent  bool
		verbose bool
	}
	tests := []struct {
		name    string
		fields  fields
		args    args
		wantErr bool
	}{
		{
			// Close the importer while the import is in flight, mimicking
			// syscall.SIGINT; run must surface an error instead of blocking.
			name: "simulate syscall.SIGINT",
			fields: fields{
				cfg: prometheus.Config{
					Snapshot: testSnapshot,
					Filter:   prometheus.Filter{},
				},
				cl: func(cfg prometheus.Config) *prometheus.Client {
					client, err := prometheus.NewClient(cfg)
					if err != nil {
						t.Fatalf("error init prometeus client: %s", err)
					}
					return client
				},
				im: func(vmCfg vm.Config) *vm.Importer {
					importer, err := vm.NewImporter(vmCfg)
					if err != nil {
						t.Fatalf("error init importer: %s", err)
					}
					return importer
				},
				closer: func(importer *vm.Importer) {
					// simulate syscall.SIGINT: give the import a head start,
					// then close the importer mid-run.
					time.Sleep(time.Second * 5)
					if importer != nil {
						importer.Close()
					}
				},
				vmCfg: vm.Config{Addr: httpAddr, Concurrency: 1},
				cc:    2,
			},
			args: args{
				silent:  false,
				verbose: false,
			},
			wantErr: true,
		},
		{
			// Happy path: no interrupt (closer == nil), import should finish
			// without error.
			name: "simulate correct work",
			fields: fields{
				cfg: prometheus.Config{
					Snapshot: testSnapshot,
					Filter:   prometheus.Filter{},
				},
				cl: func(cfg prometheus.Config) *prometheus.Client {
					client, err := prometheus.NewClient(cfg)
					if err != nil {
						t.Fatalf("error init prometeus client: %s", err)
					}
					return client
				},
				im: func(vmCfg vm.Config) *vm.Importer {
					importer, err := vm.NewImporter(vmCfg)
					if err != nil {
						t.Fatalf("error init importer: %s", err)
					}
					return importer
				},
				closer: nil,
				vmCfg:  vm.Config{Addr: httpAddr, Concurrency: 5},
				cc:     2,
			},
			args: args{
				silent:  true,
				verbose: false,
			},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			client := tt.fields.cl(tt.fields.cfg)
			importer := tt.fields.im(tt.fields.vmCfg)
			pp := &prometheusProcessor{
				cl: client,
				im: importer,
				cc: tt.fields.cc,
			}
			// In non-silent mode run() asks for confirmation on stdin;
			// feed it "Y\n" through a pipe so the test does not hang.
			if !tt.args.silent {
				input := []byte("Y\n")
				r, w, err := os.Pipe()
				if err != nil {
					t.Fatal(err)
				}
				// Write the answer and close the writer up front so the
				// prompt read sees EOF after consuming it.
				_, err = w.Write(input)
				if err != nil {
					t.Error(err)
				}
				err = w.Close()
				if err != nil {
					t.Error(err)
				}
				stdin := os.Stdin
				// Restore stdin right after the test.
				defer func() {
					os.Stdin = stdin
					_ = r.Close()
					_ = w.Close()
				}()
				os.Stdin = r
			}
			// simulate close if needed: the closer runs concurrently so the
			// interrupt fires while run() is in progress.
			if tt.fields.closer != nil {
				go tt.fields.closer(importer)
			}
			if err := pp.run(tt.args.silent, tt.args.verbose); (err != nil) != tt.wantErr {
				t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}

View file

@ -182,7 +182,17 @@ func (im *Importer) Errors() chan *ImportError { return im.errors }
// Input returns a channel for sending timeseries
// that need to be imported
func (im *Importer) Input() chan<- *TimeSeries { return im.input }
// Input schedules the given time series for import. It blocks until either
// the series is accepted by an import worker, or the importer reports a
// failure — in which case the underlying error is returned. A nil error
// received from the errors channel (e.g. the channel was closed during
// shutdown) is reported as an aborted process, so Input never blocks
// forever after the importer stops.
func (im *Importer) Input(ts *TimeSeries) error {
	select {
	case importErr := <-im.errors:
		if importErr == nil || importErr.Err == nil {
			return fmt.Errorf("process aborted")
		}
		return importErr.Err
	case im.input <- ts:
		return nil
	}
}
// Close sends signal to all goroutines to exit
// and waits until they are finished

View file

@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"io"
"io/ioutil"
@ -51,25 +52,25 @@ const (
nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
)
func (p *vmNativeProcessor) run() error {
func (p *vmNativeProcessor) run(ctx context.Context) error {
pr, pw := io.Pipe()
fmt.Printf("Initing export pipe from %q with filters: %s\n", p.src.addr, p.filter)
exportReader, err := p.exportPipe()
exportReader, err := p.exportPipe(ctx)
if err != nil {
return fmt.Errorf("failed to init export pipe: %s", err)
}
sync := make(chan struct{})
nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels)
if err != nil {
return err
}
sync := make(chan struct{})
go func() {
defer func() { close(sync) }()
u := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr)
req, err := http.NewRequest("POST", u, pr)
req, err := http.NewRequestWithContext(ctx, "POST", u, pr)
if err != nil {
log.Fatalf("cannot create import request to %q: %s", p.dst.addr, err)
}
@ -95,22 +96,25 @@ func (p *vmNativeProcessor) run() error {
rl := limiter.NewLimiter(p.rateLimit)
w = limiter.NewWriteLimiter(pw, rl)
}
_, err = io.Copy(w, barReader)
if err != nil {
return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err)
}
if err := pw.Close(); err != nil {
return err
}
<-sync
barpool.Stop()
log.Println("Import finished!")
return nil
}
func (p *vmNativeProcessor) exportPipe() (io.ReadCloser, error) {
func (p *vmNativeProcessor) exportPipe(ctx context.Context) (io.ReadCloser, error) {
u := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr)
req, err := http.NewRequest("GET", u, nil)
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %s", p.src.addr, err)
}

View file

@ -0,0 +1,95 @@
package main
import (
"context"
"testing"
"time"
)
// If you want to run this test:
// 1. run two instances of victoriametrics and define -httpListenAddr for both or just for second instance
// 2. define srcAddr and dstAddr const with your victoriametrics addresses
// 3. define matchFilter const with your importing data
// 4. define timeStartFilter
// 5. run each test one by one
const (
matchFilter = `{job="avalanche"}`
timeStartFilter = "2020-01-01T20:07:00Z"
srcAddr = "http://127.0.0.1:8428"
dstAddr = "http://127.0.0.1:8528"
)
// This test simulates the shutdown process when the user aborts the migration.
// Test_vmNativeProcessor_run exercises vmNativeProcessor.run against two
// live VictoriaMetrics instances (see the setup notes above the consts).
// It is skipped by default because it requires external services.
func Test_vmNativeProcessor_run(t *testing.T) {
	t.Skip()
	type fields struct {
		filter    filter
		rateLimit int64
		dst       *vmNativeClient
		src       *vmNativeClient
	}
	tests := []struct {
		name    string
		fields  fields
		closer  func(cancelFunc context.CancelFunc)
		wantErr bool
	}{
		{
			// Cancel the context while the migration is in flight, mimicking
			// a user pressing Ctrl-C; run must return an error instead of
			// blocking forever.
			name: "simulate syscall.SIGINT",
			fields: fields{
				filter: filter{
					match:     matchFilter,
					timeStart: timeStartFilter,
				},
				rateLimit: 0,
				dst: &vmNativeClient{
					addr: dstAddr,
				},
				src: &vmNativeClient{
					addr: srcAddr,
				},
			},
			closer: func(cancelFunc context.CancelFunc) {
				time.Sleep(time.Second * 5)
				cancelFunc()
			},
			wantErr: true,
		},
		{
			// Happy path: the context is never cancelled, so the migration
			// should complete without an error.
			name: "simulate correct work",
			fields: fields{
				filter: filter{
					match:     matchFilter,
					timeStart: timeStartFilter,
				},
				rateLimit: 0,
				dst: &vmNativeClient{
					addr: dstAddr,
				},
				src: &vmNativeClient{
					addr: srcAddr,
				},
			},
			closer: func(cancelFunc context.CancelFunc) {},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx, cancelFn := context.WithCancel(context.Background())
			p := &vmNativeProcessor{
				filter:    tt.fields.filter,
				rateLimit: tt.fields.rateLimit,
				dst:       tt.fields.dst,
				src:       tt.fields.src,
			}
			// Run the closer concurrently so the cancellation fires while
			// run() is in progress. Calling it synchronously (as before)
			// slept 5s and cancelled the context *before* run() even
			// started, which does not exercise the interrupt-while-running
			// path this test is meant to cover (cf. the prometheus test,
			// which already spawns its closer in a goroutine).
			go tt.closer(cancelFn)
			if err := p.run(ctx); (err != nil) != tt.wantErr {
				t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}