vmctl: fixed blocking when aborting import process (#2509)

vmctl: fix vmctl blocking on process interrupt

This change prevents vmctl from blocking indefinitely on
receiving the interrupt signal. The update touches all
import modes and is supposed to improve tool reliability.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2491
This commit is contained in:
Dmytro Kozlov 2022-05-03 08:03:41 +03:00 committed by Aliaksandr Valialkin
parent d384997657
commit b9ff61811d
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
8 changed files with 310 additions and 14 deletions

View file

@ -82,12 +82,16 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
close(seriesCh) close(seriesCh)
wg.Wait() wg.Wait()
ip.im.Close() ip.im.Close()
close(errCh)
// drain import errors channel // drain import errors channel
for vmErr := range ip.im.Errors() { for vmErr := range ip.im.Errors() {
if vmErr.Err != nil { if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
} }
} }
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
}
barpool.Stop() barpool.Stop()
log.Println("Import finished!") log.Println("Import finished!")
log.Print(ip.im.Stats()) log.Print(ip.im.Stats())
@ -142,11 +146,14 @@ func (ip *influxProcessor) do(s *influx.Series) error {
if len(time) < 1 { if len(time) < 1 {
continue continue
} }
ip.im.Input() <- &vm.TimeSeries{ ts := vm.TimeSeries{
Name: name, Name: name,
LabelPairs: labels, LabelPairs: labels,
Timestamps: time, Timestamps: time,
Values: values, Values: values,
} }
if err := ip.im.Input(&ts); err != nil {
return err
}
} }
} }

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"os" "os"
@ -26,6 +27,7 @@ func main() {
importer *vm.Importer importer *vm.Importer
) )
ctx, cancelCtx := context.WithCancel(context.Background())
start := time.Now() start := time.Now()
app := &cli.App{ app := &cli.App{
Name: "vmctl", Name: "vmctl",
@ -98,8 +100,11 @@ func main() {
return fmt.Errorf("failed to create VM importer: %s", err) return fmt.Errorf("failed to create VM importer: %s", err)
} }
processor := newInfluxProcessor(influxClient, importer, processor := newInfluxProcessor(
c.Int(influxConcurrency), c.String(influxMeasurementFieldSeparator)) influxClient,
importer,
c.Int(influxConcurrency),
c.String(influxMeasurementFieldSeparator))
return processor.run(c.Bool(globalSilent), c.Bool(globalVerbose)) return processor.run(c.Bool(globalSilent), c.Bool(globalVerbose))
}, },
}, },
@ -167,7 +172,7 @@ func main() {
extraLabels: c.StringSlice(vmExtraLabel), extraLabels: c.StringSlice(vmExtraLabel),
}, },
} }
return p.run() return p.run(ctx)
}, },
}, },
{ {
@ -214,6 +219,7 @@ func main() {
if importer != nil { if importer != nil {
importer.Close() importer.Close()
} }
cancelCtx()
}() }()
err = app.Run(os.Args) err = app.Run(os.Args)

View file

@ -120,6 +120,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
} }
} }
} }
// Drain channels per metric // Drain channels per metric
close(seriesCh) close(seriesCh)
wg.Wait() wg.Wait()
@ -156,11 +157,14 @@ func (op *otsdbProcessor) do(s queryObj) error {
for k, v := range data.Tags { for k, v := range data.Tags {
labels = append(labels, vm.LabelPair{Name: k, Value: v}) labels = append(labels, vm.LabelPair{Name: k, Value: v})
} }
op.im.Input() <- &vm.TimeSeries{ ts := vm.TimeSeries{
Name: data.Metric, Name: data.Metric,
LabelPairs: labels, LabelPairs: labels,
Timestamps: data.Timestamps, Timestamps: data.Timestamps,
Values: data.Values, Values: data.Values,
} }
if err := op.im.Input(&ts); err != nil {
return err
}
return nil return nil
} }

View file

@ -62,7 +62,6 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
} }
}() }()
} }
// any error breaks the import // any error breaks the import
for _, br := range blocks { for _, br := range blocks {
select { select {
@ -80,12 +79,16 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
wg.Wait() wg.Wait()
// wait for all buffers to flush // wait for all buffers to flush
pp.im.Close() pp.im.Close()
close(errCh)
// drain import errors channel // drain import errors channel
for vmErr := range pp.im.Errors() { for vmErr := range pp.im.Errors() {
if vmErr.Err != nil { if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
} }
} }
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
}
barpool.Stop() barpool.Stop()
log.Println("Import finished!") log.Println("Import finished!")
log.Print(pp.im.Stats()) log.Print(pp.im.Stats())
@ -127,12 +130,15 @@ func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
if err := it.Err(); err != nil { if err := it.Err(); err != nil {
return err return err
} }
pp.im.Input() <- &vm.TimeSeries{ ts := vm.TimeSeries{
Name: name, Name: name,
LabelPairs: labels, LabelPairs: labels,
Timestamps: timestamps, Timestamps: timestamps,
Values: values, Values: values,
} }
if err := pp.im.Input(&ts); err != nil {
return err
}
} }
return ss.Err() return ss.Err()
} }

View file

@ -0,0 +1,164 @@
package main
import (
"os"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
)
// To run this test:
// 1. set the testSnapshot const to the path of a test snapshot
// 2. set the httpAddr const to your VictoriaMetrics address
// 3. run VictoriaMetrics listening on that address
// 4. remove t.Skip() from Test_prometheusProcessor_run
// 5. run the test cases one at a time, not all at once
const (
// httpAddr is used as vm.Config.Addr when the test creates its importer.
httpAddr = "http://127.0.0.1:8428/"
// testSnapshot is used as prometheus.Config.Snapshot in every test case.
testSnapshot = "./testdata/20220427T130947Z-70ba49b1093fd0bf"
)
// Test_prometheusProcessor_run exercises prometheusProcessor.run in two
// scenarios: a regular import, and an import whose importer is closed while
// run is in progress (simulating the user aborting the process).
//
// The test is skipped by default; see the instructions next to the httpAddr
// and testSnapshot constants for how to run it.
func Test_prometheusProcessor_run(t *testing.T) {
	t.Skip()

	type fields struct {
		cfg   prometheus.Config
		vmCfg vm.Config
		// cl and im receive the *testing.T of the running subtest so that a
		// setup failure is reported on the goroutine that owns that T.
		cl     func(*testing.T, prometheus.Config) *prometheus.Client
		im     func(*testing.T, vm.Config) *vm.Importer
		closer func(importer *vm.Importer)
		cc     int
	}
	type args struct {
		silent  bool
		verbose bool
	}

	// newClient builds the Prometheus client or fails the calling test.
	newClient := func(t *testing.T, cfg prometheus.Config) *prometheus.Client {
		t.Helper()
		client, err := prometheus.NewClient(cfg)
		if err != nil {
			t.Fatalf("error init prometeus client: %s", err)
		}
		return client
	}
	// newImporter builds the VictoriaMetrics importer or fails the calling test.
	newImporter := func(t *testing.T, vmCfg vm.Config) *vm.Importer {
		t.Helper()
		importer, err := vm.NewImporter(vmCfg)
		if err != nil {
			t.Fatalf("error init importer: %s", err)
		}
		return importer
	}

	tests := []struct {
		name    string
		fields  fields
		args    args
		wantErr bool
	}{
		{
			name: "simulate syscall.SIGINT",
			fields: fields{
				cfg: prometheus.Config{
					Snapshot: testSnapshot,
					Filter:   prometheus.Filter{},
				},
				cl: newClient,
				im: newImporter,
				closer: func(importer *vm.Importer) {
					// simulate syscall.SIGINT
					time.Sleep(time.Second * 5)
					if importer != nil {
						importer.Close()
					}
				},
				vmCfg: vm.Config{Addr: httpAddr, Concurrency: 1},
				cc:    2,
			},
			args: args{
				silent:  false,
				verbose: false,
			},
			wantErr: true,
		},
		{
			name: "simulate correct work",
			fields: fields{
				cfg: prometheus.Config{
					Snapshot: testSnapshot,
					Filter:   prometheus.Filter{},
				},
				cl:     newClient,
				im:     newImporter,
				closer: nil,
				vmCfg:  vm.Config{Addr: httpAddr, Concurrency: 5},
				cc:     2,
			},
			args: args{
				silent:  true,
				verbose: false,
			},
			wantErr: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			client := tt.fields.cl(t, tt.fields.cfg)
			importer := tt.fields.im(t, tt.fields.vmCfg)

			pp := &prometheusProcessor{
				cl: client,
				im: importer,
				cc: tt.fields.cc,
			}

			// we should answer on prompt
			if !tt.args.silent {
				input := []byte("Y\n")

				r, w, err := os.Pipe()
				if err != nil {
					t.Fatal(err)
				}

				_, err = w.Write(input)
				if err != nil {
					t.Error(err)
				}
				// The write end is closed here, before run starts reading,
				// so it does not need to be closed again on cleanup.
				err = w.Close()
				if err != nil {
					t.Error(err)
				}

				stdin := os.Stdin
				// Restore stdin right after the test.
				defer func() {
					os.Stdin = stdin
					_ = r.Close()
				}()
				os.Stdin = r
			}

			// simulate close if needed
			if tt.fields.closer != nil {
				go tt.fields.closer(importer)
			}

			if err := pp.run(tt.args.silent, tt.args.verbose); (err != nil) != tt.wantErr {
				t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}

View file

@ -182,7 +182,17 @@ func (im *Importer) Errors() chan *ImportError { return im.errors }
// Input returns a channel for sending timeseries // Input returns a channel for sending timeseries
// that need to be imported // that need to be imported
func (im *Importer) Input() chan<- *TimeSeries { return im.input } func (im *Importer) Input(ts *TimeSeries) error {
select {
case im.input <- ts:
return nil
case err := <-im.errors:
if err != nil && err.Err != nil {
return err.Err
}
return fmt.Errorf("process aborted")
}
}
// Close sends signal to all goroutines to exit // Close sends signal to all goroutines to exit
// and waits until they are finished // and waits until they are finished

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -51,25 +52,25 @@ const (
nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}` nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
) )
func (p *vmNativeProcessor) run() error { func (p *vmNativeProcessor) run(ctx context.Context) error {
pr, pw := io.Pipe() pr, pw := io.Pipe()
fmt.Printf("Initing export pipe from %q with filters: %s\n", p.src.addr, p.filter) fmt.Printf("Initing export pipe from %q with filters: %s\n", p.src.addr, p.filter)
exportReader, err := p.exportPipe() exportReader, err := p.exportPipe(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to init export pipe: %s", err) return fmt.Errorf("failed to init export pipe: %s", err)
} }
sync := make(chan struct{})
nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels) nativeImportAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.extraLabels)
if err != nil { if err != nil {
return err return err
} }
sync := make(chan struct{})
go func() { go func() {
defer func() { close(sync) }() defer func() { close(sync) }()
u := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr) u := fmt.Sprintf("%s/%s", p.dst.addr, nativeImportAddr)
req, err := http.NewRequest("POST", u, pr) req, err := http.NewRequestWithContext(ctx, "POST", u, pr)
if err != nil { if err != nil {
log.Fatalf("cannot create import request to %q: %s", p.dst.addr, err) log.Fatalf("cannot create import request to %q: %s", p.dst.addr, err)
} }
@ -95,22 +96,25 @@ func (p *vmNativeProcessor) run() error {
rl := limiter.NewLimiter(p.rateLimit) rl := limiter.NewLimiter(p.rateLimit)
w = limiter.NewWriteLimiter(pw, rl) w = limiter.NewWriteLimiter(pw, rl)
} }
_, err = io.Copy(w, barReader) _, err = io.Copy(w, barReader)
if err != nil { if err != nil {
return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err) return fmt.Errorf("failed to write into %q: %s", p.dst.addr, err)
} }
if err := pw.Close(); err != nil { if err := pw.Close(); err != nil {
return err return err
} }
<-sync <-sync
barpool.Stop() barpool.Stop()
log.Println("Import finished!")
return nil return nil
} }
func (p *vmNativeProcessor) exportPipe() (io.ReadCloser, error) { func (p *vmNativeProcessor) exportPipe(ctx context.Context) (io.ReadCloser, error) {
u := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr) u := fmt.Sprintf("%s/%s", p.src.addr, nativeExportAddr)
req, err := http.NewRequest("GET", u, nil) req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %s", p.src.addr, err) return nil, fmt.Errorf("cannot create request to %q: %s", p.src.addr, err)
} }

View file

@ -0,0 +1,95 @@
package main
import (
"context"
"testing"
"time"
)
// To run this test:
// 1. run two instances of VictoriaMetrics, setting -httpListenAddr for both or just for the second instance
// 2. set the srcAddr and dstAddr consts to your VictoriaMetrics addresses
// 3. set the matchFilter const to match the data you want to import
// 4. set the timeStartFilter const
// 5. remove t.Skip() from Test_vmNativeProcessor_run
// 6. run the test cases one at a time
const (
// matchFilter is used as the filter's match value in every test case.
matchFilter = `{job="avalanche"}`
// timeStartFilter is used as the filter's timeStart value in every test case.
timeStartFilter = "2020-01-01T20:07:00Z"
// srcAddr is the address given to the source vmNativeClient.
srcAddr = "http://127.0.0.1:8428"
// dstAddr is the address given to the destination vmNativeClient.
dstAddr = "http://127.0.0.1:8528"
)
// Test_vmNativeProcessor_run exercises vmNativeProcessor.run in two
// scenarios: a regular import, and an import whose context is cancelled
// while run is in progress (simulating the user aborting the process).
//
// The test is skipped by default because it calls t.Skip().
func Test_vmNativeProcessor_run(t *testing.T) {
	t.Skip()

	type fields struct {
		filter    filter
		rateLimit int64
		dst       *vmNativeClient
		src       *vmNativeClient
	}
	tests := []struct {
		name   string
		fields fields
		// closer is started in its own goroutine right before run is called
		// and may cancel the context passed to run.
		closer  func(cancelFunc context.CancelFunc)
		wantErr bool
	}{
		{
			name: "simulate syscall.SIGINT",
			fields: fields{
				filter: filter{
					match:     matchFilter,
					timeStart: timeStartFilter,
				},
				rateLimit: 0,
				dst: &vmNativeClient{
					addr: dstAddr,
				},
				src: &vmNativeClient{
					addr: srcAddr,
				},
			},
			closer: func(cancelFunc context.CancelFunc) {
				time.Sleep(time.Second * 5)
				cancelFunc()
			},
			wantErr: true,
		},
		{
			name: "simulate correct work",
			fields: fields{
				filter: filter{
					match:     matchFilter,
					timeStart: timeStartFilter,
				},
				rateLimit: 0,
				dst: &vmNativeClient{
					addr: dstAddr,
				},
				src: &vmNativeClient{
					addr: srcAddr,
				},
			},
			closer:  func(cancelFunc context.CancelFunc) {},
			wantErr: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx, cancelFn := context.WithCancel(context.Background())
			// Always release the context, even when closer never cancels it.
			defer cancelFn()

			p := &vmNativeProcessor{
				filter:    tt.fields.filter,
				rateLimit: tt.fields.rateLimit,
				dst:       tt.fields.dst,
				src:       tt.fields.src,
			}

			// Run closer concurrently so that cancellation arrives while run
			// is executing; calling it inline would cancel ctx before run
			// even starts.
			go tt.closer(cancelFn)

			if err := p.run(ctx); (err != nil) != tt.wantErr {
				t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}