mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
b2294d1cf1
add progress bars to the VM importer The new progress bars are supposed to display the processing speed of each VM importer worker. This info should help to identify whether there is a bottleneck on the VM side during the import process, without waiting for it to finish. The new progress bars can be disabled by passing the `vm-disable-progress-bar` flag. Plotting multiple progress bars requires using the experimental progress bar pool from github.com/cheggaaa/pb/v3. Switching to the progress bar pool required changes in all import modes. The openTSDB mode wasn't changed because its implementation uses an individual progress bar for each series, which made using the pool impossible. Signed-off-by: dmitryk-dk <kozlovdmitriyy@gmail.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
105 lines
1.8 KiB
Go
105 lines
1.8 KiB
Go
// +build linux darwin freebsd netbsd openbsd solaris dragonfly windows plan9 aix
|
|
|
|
package pb
|
|
|
|
import (
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cheggaaa/pb/v3/termutil"
|
|
)
|
|
|
|
// Create and start new pool with given bars
|
|
// You need call pool.Stop() after work
|
|
func StartPool(pbs ...*ProgressBar) (pool *Pool, err error) {
|
|
pool = new(Pool)
|
|
if err = pool.Start(); err != nil {
|
|
return
|
|
}
|
|
pool.Add(pbs...)
|
|
return
|
|
}
|
|
|
|
// NewPool initialises a pool with progress bars, but
|
|
// doesn't start it. You need to call Start manually
|
|
func NewPool(pbs ...*ProgressBar) (pool *Pool) {
|
|
pool = new(Pool)
|
|
pool.Add(pbs...)
|
|
return
|
|
}
|
|
|
|
type Pool struct {
|
|
Output io.Writer
|
|
RefreshRate time.Duration
|
|
bars []*ProgressBar
|
|
lastBarsCount int
|
|
shutdownCh chan struct{}
|
|
workerCh chan struct{}
|
|
m sync.Mutex
|
|
finishOnce sync.Once
|
|
}
|
|
|
|
// Add progress bars.
|
|
func (p *Pool) Add(pbs ...*ProgressBar) {
|
|
p.m.Lock()
|
|
defer p.m.Unlock()
|
|
for _, bar := range pbs {
|
|
bar.Set(Static, true)
|
|
bar.Start()
|
|
p.bars = append(p.bars, bar)
|
|
}
|
|
}
|
|
|
|
func (p *Pool) Start() (err error) {
|
|
p.RefreshRate = defaultRefreshRate
|
|
p.shutdownCh, err = termutil.RawModeOn()
|
|
if err != nil {
|
|
return
|
|
}
|
|
p.workerCh = make(chan struct{})
|
|
go p.writer()
|
|
return
|
|
}
|
|
|
|
func (p *Pool) writer() {
|
|
var first = true
|
|
defer func() {
|
|
if first == false {
|
|
p.print(false)
|
|
} else {
|
|
p.print(true)
|
|
p.print(false)
|
|
}
|
|
close(p.workerCh)
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(p.RefreshRate):
|
|
if p.print(first) {
|
|
p.print(false)
|
|
return
|
|
}
|
|
first = false
|
|
case <-p.shutdownCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Restore terminal state and close pool
|
|
func (p *Pool) Stop() error {
|
|
p.finishOnce.Do(func() {
|
|
if p.shutdownCh != nil {
|
|
close(p.shutdownCh)
|
|
}
|
|
})
|
|
|
|
// Wait for the worker to complete
|
|
select {
|
|
case <-p.workerCh:
|
|
}
|
|
|
|
return termutil.RawModeOff()
|
|
}
|