vmctl/vm: added datapoints collection bar (#2486)

add progress bars to the VM importer

The new progress bars supposed to display the processing speed per each
VM importer worker. This info should help to identify if there is a bottleneck
on the VM side during the import process, without waiting for its finish.
The new progress bars can be disabled by passing `vm-disable-progress-bar` flag.

Plotting multiple progress bars requires using experimental progress bar pool
from github.com/cheggaaa/pb/v3. Switch to progress bar pool required changes
in all import modes.

The openTSDB mode wasn't changed due to its implementation, which implies individual progress
bars per each series. Because of this, using the pool wasn't possible.

Signed-off-by: dmitryk-dk <kozlovdmitriyy@gmail.com>

Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Dmytro Kozlov 2022-05-02 10:06:34 +03:00 committed by GitHub
parent 8688ea8aa8
commit b2294d1cf1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 833 additions and 558 deletions

View file

@ -1,135 +1,135 @@
// Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/notifier/alertmanager_request.qtpl:1
//line notifier/alertmanager_request.qtpl:1
package notifier
//line app/vmalert/notifier/alertmanager_request.qtpl:1
//line notifier/alertmanager_request.qtpl:1
import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
//line app/vmalert/notifier/alertmanager_request.qtpl:8
//line notifier/alertmanager_request.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/notifier/alertmanager_request.qtpl:8
//line notifier/alertmanager_request.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/notifier/alertmanager_request.qtpl:8
//line notifier/alertmanager_request.qtpl:8
func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(Alert) string, relabelCfg *promrelabel.ParsedConfigs) {
//line app/vmalert/notifier/alertmanager_request.qtpl:8
//line notifier/alertmanager_request.qtpl:8
qw422016.N().S(`[`)
//line app/vmalert/notifier/alertmanager_request.qtpl:10
//line notifier/alertmanager_request.qtpl:10
for i, alert := range alerts {
//line app/vmalert/notifier/alertmanager_request.qtpl:10
//line notifier/alertmanager_request.qtpl:10
qw422016.N().S(`{"startsAt":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:12
//line notifier/alertmanager_request.qtpl:12
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
//line app/vmalert/notifier/alertmanager_request.qtpl:12
//line notifier/alertmanager_request.qtpl:12
qw422016.N().S(`,"generatorURL":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:13
//line notifier/alertmanager_request.qtpl:13
qw422016.N().Q(generatorURL(alert))
//line app/vmalert/notifier/alertmanager_request.qtpl:13
//line notifier/alertmanager_request.qtpl:13
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:14
//line notifier/alertmanager_request.qtpl:14
if !alert.End.IsZero() {
//line app/vmalert/notifier/alertmanager_request.qtpl:14
//line notifier/alertmanager_request.qtpl:14
qw422016.N().S(`"endsAt":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:15
//line notifier/alertmanager_request.qtpl:15
qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
//line app/vmalert/notifier/alertmanager_request.qtpl:15
//line notifier/alertmanager_request.qtpl:15
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:16
//line notifier/alertmanager_request.qtpl:16
}
//line app/vmalert/notifier/alertmanager_request.qtpl:16
//line notifier/alertmanager_request.qtpl:16
qw422016.N().S(`"labels": {"alertname":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:18
//line notifier/alertmanager_request.qtpl:18
qw422016.N().Q(alert.Name)
//line app/vmalert/notifier/alertmanager_request.qtpl:19
//line notifier/alertmanager_request.qtpl:19
lbls := alert.toPromLabels(relabelCfg)
//line app/vmalert/notifier/alertmanager_request.qtpl:20
//line notifier/alertmanager_request.qtpl:20
for _, l := range lbls {
//line app/vmalert/notifier/alertmanager_request.qtpl:20
//line notifier/alertmanager_request.qtpl:20
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:21
//line notifier/alertmanager_request.qtpl:21
qw422016.N().Q(l.Name)
//line app/vmalert/notifier/alertmanager_request.qtpl:21
//line notifier/alertmanager_request.qtpl:21
qw422016.N().S(`:`)
//line app/vmalert/notifier/alertmanager_request.qtpl:21
//line notifier/alertmanager_request.qtpl:21
qw422016.N().Q(l.Value)
//line app/vmalert/notifier/alertmanager_request.qtpl:22
//line notifier/alertmanager_request.qtpl:22
}
//line app/vmalert/notifier/alertmanager_request.qtpl:22
//line notifier/alertmanager_request.qtpl:22
qw422016.N().S(`},"annotations": {`)
//line app/vmalert/notifier/alertmanager_request.qtpl:25
//line notifier/alertmanager_request.qtpl:25
c := len(alert.Annotations)
//line app/vmalert/notifier/alertmanager_request.qtpl:26
//line notifier/alertmanager_request.qtpl:26
for k, v := range alert.Annotations {
//line app/vmalert/notifier/alertmanager_request.qtpl:27
//line notifier/alertmanager_request.qtpl:27
c = c - 1
//line app/vmalert/notifier/alertmanager_request.qtpl:28
//line notifier/alertmanager_request.qtpl:28
qw422016.N().Q(k)
//line app/vmalert/notifier/alertmanager_request.qtpl:28
//line notifier/alertmanager_request.qtpl:28
qw422016.N().S(`:`)
//line app/vmalert/notifier/alertmanager_request.qtpl:28
//line notifier/alertmanager_request.qtpl:28
qw422016.N().Q(v)
//line app/vmalert/notifier/alertmanager_request.qtpl:28
//line notifier/alertmanager_request.qtpl:28
if c > 0 {
//line app/vmalert/notifier/alertmanager_request.qtpl:28
//line notifier/alertmanager_request.qtpl:28
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:28
//line notifier/alertmanager_request.qtpl:28
}
//line app/vmalert/notifier/alertmanager_request.qtpl:29
//line notifier/alertmanager_request.qtpl:29
}
//line app/vmalert/notifier/alertmanager_request.qtpl:29
//line notifier/alertmanager_request.qtpl:29
qw422016.N().S(`}}`)
//line app/vmalert/notifier/alertmanager_request.qtpl:32
//line notifier/alertmanager_request.qtpl:32
if i != len(alerts)-1 {
//line app/vmalert/notifier/alertmanager_request.qtpl:32
//line notifier/alertmanager_request.qtpl:32
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:32
//line notifier/alertmanager_request.qtpl:32
}
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
}
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
qw422016.N().S(`]`)
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
}
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(Alert) string, relabelCfg *promrelabel.ParsedConfigs) {
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
streamamRequest(qw422016, alerts, generatorURL, relabelCfg)
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
}
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
func amRequest(alerts []Alert, generatorURL func(Alert) string, relabelCfg *promrelabel.ParsedConfigs) string {
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
writeamRequest(qb422016, alerts, generatorURL, relabelCfg)
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
qs422016 := string(qb422016.B)
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
return qs422016
//line app/vmalert/notifier/alertmanager_request.qtpl:35
//line notifier/alertmanager_request.qtpl:35
}

View file

@ -1,25 +1,25 @@
// Code generated by qtc from "footer.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/tpl/footer.qtpl:1
//line tpl/footer.qtpl:1
package tpl
//line app/vmalert/tpl/footer.qtpl:1
//line tpl/footer.qtpl:1
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/tpl/footer.qtpl:1
//line tpl/footer.qtpl:1
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/tpl/footer.qtpl:1
//line tpl/footer.qtpl:1
func StreamFooter(qw422016 *qt422016.Writer) {
//line app/vmalert/tpl/footer.qtpl:1
//line tpl/footer.qtpl:1
qw422016.N().S(`
</main>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.bundle.min.js" integrity="sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM" crossorigin="anonymous"></script>
@ -56,31 +56,31 @@ func StreamFooter(qw422016 *qt422016.Writer) {
</body>
</html>
`)
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
}
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
func WriteFooter(qq422016 qtio422016.Writer) {
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
StreamFooter(qw422016)
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
}
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
func Footer() string {
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
WriteFooter(qb422016)
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
return qs422016
//line app/vmalert/tpl/footer.qtpl:36
//line tpl/footer.qtpl:36
}

View file

@ -9,11 +9,6 @@
min-height: 75rem;
padding-top: 4.5rem;
}
pre {
overflow: scroll;
max-width: 600px;
min-height: 30px;
}
.group-heading {
cursor: pointer;
padding: 5px;

View file

@ -1,39 +1,39 @@
// Code generated by qtc from "header.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/tpl/header.qtpl:1
//line tpl/header.qtpl:1
package tpl
//line app/vmalert/tpl/header.qtpl:1
//line tpl/header.qtpl:1
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/tpl/header.qtpl:1
//line tpl/header.qtpl:1
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/tpl/header.qtpl:1
//line tpl/header.qtpl:1
func StreamHeader(qw422016 *qt422016.Writer, title string, pages []NavItem) {
//line app/vmalert/tpl/header.qtpl:1
//line tpl/header.qtpl:1
qw422016.N().S(`
<!DOCTYPE html>
<html lang="en">
<head>
<title>vmalert`)
//line app/vmalert/tpl/header.qtpl:5
//line tpl/header.qtpl:5
if title != "" {
//line app/vmalert/tpl/header.qtpl:5
//line tpl/header.qtpl:5
qw422016.N().S(` - `)
//line app/vmalert/tpl/header.qtpl:5
//line tpl/header.qtpl:5
qw422016.E().S(title)
//line app/vmalert/tpl/header.qtpl:5
//line tpl/header.qtpl:5
}
//line app/vmalert/tpl/header.qtpl:5
//line tpl/header.qtpl:5
qw422016.N().S(`</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
<style>
@ -41,11 +41,6 @@ func StreamHeader(qw422016 *qt422016.Writer, title string, pages []NavItem) {
min-height: 75rem;
padding-top: 4.5rem;
}
pre {
overflow: scroll;
max-width: 600px;
min-height: 30px;
}
.group-heading {
cursor: pointer;
padding: 5px;
@ -71,37 +66,37 @@ func StreamHeader(qw422016 *qt422016.Writer, title string, pages []NavItem) {
</head>
<body>
`)
//line app/vmalert/tpl/header.qtpl:41
//line tpl/header.qtpl:36
StreamPrintNavItems(qw422016, title, pages)
//line app/vmalert/tpl/header.qtpl:41
//line tpl/header.qtpl:36
qw422016.N().S(`
<main class="px-2">
`)
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
}
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
func WriteHeader(qq422016 qtio422016.Writer, title string, pages []NavItem) {
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
StreamHeader(qw422016, title, pages)
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
}
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
func Header(title string, pages []NavItem) string {
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
WriteHeader(qb422016, title, pages)
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
return qs422016
//line app/vmalert/tpl/header.qtpl:43
//line tpl/header.qtpl:38
}

View file

@ -1,96 +1,96 @@
// Code generated by qtc from "nav.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/tpl/nav.qtpl:1
//line tpl/nav.qtpl:1
package tpl
//line app/vmalert/tpl/nav.qtpl:1
//line tpl/nav.qtpl:1
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/tpl/nav.qtpl:1
//line tpl/nav.qtpl:1
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/tpl/nav.qtpl:2
//line tpl/nav.qtpl:2
type NavItem struct {
Name string
Url string
}
//line app/vmalert/tpl/nav.qtpl:8
//line tpl/nav.qtpl:8
func StreamPrintNavItems(qw422016 *qt422016.Writer, current string, items []NavItem) {
//line app/vmalert/tpl/nav.qtpl:8
//line tpl/nav.qtpl:8
qw422016.N().S(`
<nav class="navbar navbar-expand-md navbar-dark fixed-top bg-dark">
<div class="container-fluid">
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav me-auto mb-2 mb-md-0">
`)
//line app/vmalert/tpl/nav.qtpl:13
//line tpl/nav.qtpl:13
for _, item := range items {
//line app/vmalert/tpl/nav.qtpl:13
//line tpl/nav.qtpl:13
qw422016.N().S(`
<li class="nav-item">
<a class="nav-link`)
//line app/vmalert/tpl/nav.qtpl:15
//line tpl/nav.qtpl:15
if current == item.Name {
//line app/vmalert/tpl/nav.qtpl:15
//line tpl/nav.qtpl:15
qw422016.N().S(` active`)
//line app/vmalert/tpl/nav.qtpl:15
//line tpl/nav.qtpl:15
}
//line app/vmalert/tpl/nav.qtpl:15
//line tpl/nav.qtpl:15
qw422016.N().S(`" href="`)
//line app/vmalert/tpl/nav.qtpl:15
//line tpl/nav.qtpl:15
qw422016.E().S(item.Url)
//line app/vmalert/tpl/nav.qtpl:15
//line tpl/nav.qtpl:15
qw422016.N().S(`">
`)
//line app/vmalert/tpl/nav.qtpl:16
//line tpl/nav.qtpl:16
qw422016.E().S(item.Name)
//line app/vmalert/tpl/nav.qtpl:16
//line tpl/nav.qtpl:16
qw422016.N().S(`
</a>
</li>
`)
//line app/vmalert/tpl/nav.qtpl:19
//line tpl/nav.qtpl:19
}
//line app/vmalert/tpl/nav.qtpl:19
//line tpl/nav.qtpl:19
qw422016.N().S(`
</ul>
</div>
</nav>
`)
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
}
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
func WritePrintNavItems(qq422016 qtio422016.Writer, current string, items []NavItem) {
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
StreamPrintNavItems(qw422016, current, items)
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
}
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
func PrintNavItems(current string, items []NavItem) string {
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
WritePrintNavItems(qb422016, current, items)
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
return qs422016
//line app/vmalert/tpl/nav.qtpl:23
//line tpl/nav.qtpl:23
}

View file

@ -77,7 +77,7 @@
<b>record:</b> {%s r.Name %}
{% endif %}
<br>
<code><pre>{%s r.Query %}</pre></code><br>
<code><pre class="text-wrap">{%s r.Query %}</pre></code><br>
{% if len(r.Labels) > 0 %} <b>Labels:</b>{% endif %}
{% for k, v := range r.Labels %}
<span class="ms-1 badge bg-primary">{%s k %}={%s v %}</span>

File diff suppressed because it is too large Load diff

26
app/vmctl/barpool/pool.go Normal file
View file

@ -0,0 +1,26 @@
// Package barpool provides access to the global
// pool of progress bars, so they could be rendered
// altogether.
package barpool
import "github.com/cheggaaa/pb/v3"
var pool = pb.NewPool()
// Add adds bar to the global pool
func Add(bar *pb.ProgressBar) { pool.Add(bar) }
// Start starts the global pool
// Must be called after all progress bars were added
func Start() error { return pool.Start() }
// Stop stops the global pool
func Stop() { _ = pool.Stop() }
// AddWithTemplate adds bar with the given template
// to the global pool
func AddWithTemplate(format string, total int) *pb.ProgressBar {
bar := pb.ProgressBarTemplate(format).New(total)
Add(bar)
return bar
}

View file

@ -36,6 +36,7 @@ const (
vmBatchSize = "vm-batch-size"
vmSignificantFigures = "vm-significant-figures"
vmRoundDigits = "vm-round-digits"
vmDisableProgressBar = "vm-disable-progress-bar"
// also used in vm-native
vmExtraLabel = "vm-extra-label"
@ -109,6 +110,10 @@ var (
Usage: "Optional data transfer rate limit in bytes per second.\n" +
"By default the rate limit is disabled. It can be useful for limiting load on configured via '--vmAddr' destination.",
},
&cli.BoolFlag{
Name: vmDisableProgressBar,
Usage: "Whether to disable progress bar per each worker during the import.",
},
}
)

View file

@ -6,9 +6,9 @@ import (
"log"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
)
type influxProcessor struct {
@ -44,7 +44,11 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
return nil
}
bar := pb.StartNew(len(series))
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing series"), len(series))
if err := barpool.Start(); err != nil {
return err
}
seriesCh := make(chan *influx.Series)
errCh := make(chan error)
ip.im.ResetStats()
@ -84,7 +88,7 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
}
}
bar.Finish()
barpool.Stop()
log.Println("Import finished!")
log.Print(ip.im.Stats())
return nil

View file

@ -55,6 +55,9 @@ func main() {
}
vmCfg := initConfigVM(c)
// disable progress bars since openTSDB implementation
// does not use progress bar pool
vmCfg.DisableProgressBar = true
importer, err := vm.NewImporter(vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
@ -233,5 +236,6 @@ func initConfigVM(c *cli.Context) vm.Config {
RoundDigits: c.Int(vmRoundDigits),
ExtraLabels: c.StringSlice(vmExtraLabel),
RateLimit: c.Int64(vmRateLimit),
DisableProgressBar: c.Bool(vmDisableProgressBar),
}
}

View file

@ -5,9 +5,9 @@ import (
"log"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
"github.com/prometheus/prometheus/tsdb"
)
@ -38,7 +38,12 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
return nil
}
bar := pb.StartNew(len(blocks))
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks))
if err := barpool.Start(); err != nil {
return err
}
blockReadersCh := make(chan tsdb.BlockReader)
errCh := make(chan error, pp.cc)
pp.im.ResetStats()
@ -81,7 +86,7 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
}
}
bar.Finish()
barpool.Stop()
log.Println("Import finished!")
log.Print(pp.im.Stats())
return nil

View file

@ -9,6 +9,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
)
const barTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
func prompt(question string) bool {
reader := bufio.NewReader(os.Stdin)
fmt.Print(question, " [Y/n] ")

View file

@ -13,6 +13,9 @@ import (
"sync"
"time"
"github.com/cheggaaa/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
@ -51,6 +54,8 @@ type Config struct {
// RateLimit defines a data transfer speed in bytes per second.
// Is applied to each worker (see Concurrency) independently.
RateLimit int64
// Whether to disable progress bar per VM worker
DisableProgressBar bool
}
// Importer performs insertion of timeseries
@ -144,15 +149,22 @@ func NewImporter(cfg Config) (*Importer, error) {
im.wg.Add(int(cfg.Concurrency))
for i := 0; i < int(cfg.Concurrency); i++ {
go func() {
var bar *pb.ProgressBar
if !cfg.DisableProgressBar {
pbPrefix := fmt.Sprintf(`{{ green "VM worker %d:" }}`, i)
bar = barpool.AddWithTemplate(pbPrefix+pbTpl, 0)
}
go func(bar *pb.ProgressBar) {
defer im.wg.Done()
im.startWorker(cfg.BatchSize, cfg.SignificantFigures, cfg.RoundDigits)
}()
im.startWorker(bar, cfg.BatchSize, cfg.SignificantFigures, cfg.RoundDigits)
}(bar)
}
im.ResetStats()
return im, nil
}
const pbTpl = `{{ (cycle . "←" "↖" "↑" "↗" "→" "↘" "↓" "↙" ) }} {{speed . "%s samples/s"}}`
// ImportError is type of error generated
// in case of unsuccessful import request
type ImportError struct {
@ -182,7 +194,7 @@ func (im *Importer) Close() {
})
}
func (im *Importer) startWorker(batchSize, significantFigures, roundDigits int) {
func (im *Importer) startWorker(bar *pb.ProgressBar, batchSize, significantFigures, roundDigits int) {
var batch []*TimeSeries
var dataPoints int
var waitForBatch time.Time
@ -217,6 +229,11 @@ func (im *Importer) startWorker(batchSize, significantFigures, roundDigits int)
batch = append(batch, ts)
dataPoints += len(ts.Values)
if bar != nil {
bar.Add(len(ts.Values))
}
if dataPoints < batchSize {
continue
}
@ -232,8 +249,8 @@ func (im *Importer) startWorker(batchSize, significantFigures, roundDigits int)
// make a new batch, since old one was referenced as err
batch = make([]*TimeSeries, len(batch))
}
batch = batch[:0]
dataPoints = 0
batch = batch[:0]
waitForBatch = time.Now()
}
}

View file

@ -7,8 +7,7 @@ import (
"log"
"net/http"
"github.com/cheggaaa/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
)
@ -49,7 +48,7 @@ const (
nativeExportAddr = "api/v1/export/native"
nativeImportAddr = "api/v1/import/native"
barTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
nativeBarTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
)
func (p *vmNativeProcessor) run() error {
@ -84,8 +83,12 @@ func (p *vmNativeProcessor) run() error {
}()
fmt.Printf("Initing import process to %q:\n", p.dst.addr)
bar := pb.ProgressBarTemplate(barTpl).Start64(0)
bar := barpool.AddWithTemplate(nativeBarTpl, 0)
barReader := bar.NewProxyReader(exportReader)
if err := barpool.Start(); err != nil {
log.Printf("error start process bars pool: %s", err)
return err
}
w := io.Writer(pw)
if p.rateLimit > 0 {
@ -101,7 +104,7 @@ func (p *vmNativeProcessor) run() error {
}
<-sync
bar.Finish()
barpool.Stop()
return nil
}

2
go.mod
View file

@ -13,7 +13,7 @@ require (
github.com/VictoriaMetrics/metricsql v0.43.0
github.com/aws/aws-sdk-go v1.44.0
github.com/cespare/xxhash/v2 v2.1.2
github.com/cheggaaa/pb/v3 v3.0.8
github.com/cheggaaa/pb/v3 v3.0.9-0.20211222075416-90c02fa07ea4
github.com/golang/snappy v0.0.4
github.com/influxdata/influxdb v1.9.6
github.com/klauspost/compress v1.15.1

4
go.sum
View file

@ -198,8 +198,8 @@ github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tj
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/pb/v3 v3.0.8 h1:bC8oemdChbke2FHIIGy9mn4DPJ2caZYQnfbRqwmdCoA=
github.com/cheggaaa/pb/v3 v3.0.8/go.mod h1:UICbiLec/XO6Hw6k+BHEtHeQFzzBH4i2/qk/ow1EJTA=
github.com/cheggaaa/pb/v3 v3.0.9-0.20211222075416-90c02fa07ea4 h1:sbHAiGddrdLsd3i9/RYsm0OKOEh+UDFOONxai8YMMcw=
github.com/cheggaaa/pb/v3 v3.0.9-0.20211222075416-90c02fa07ea4/go.mod h1:UICbiLec/XO6Hw6k+BHEtHeQFzzBH4i2/qk/ow1EJTA=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=

View file

@ -408,6 +408,13 @@ func (pb *ProgressBar) IsStarted() bool {
return pb.finish != nil
}
// IsFinished indicates progress bar is finished
func (pb *ProgressBar) IsFinished() bool {
pb.mu.RLock()
defer pb.mu.RUnlock()
return pb.finished
}
// SetTemplateString sets ProgressBar tempate string and parse it
func (pb *ProgressBar) SetTemplateString(tmpl string) *ProgressBar {
pb.mu.Lock()

105
vendor/github.com/cheggaaa/pb/v3/pool.go generated vendored Normal file
View file

@ -0,0 +1,105 @@
// +build linux darwin freebsd netbsd openbsd solaris dragonfly windows plan9 aix
package pb
import (
"io"
"sync"
"time"
"github.com/cheggaaa/pb/v3/termutil"
)
// Create and start new pool with given bars
// You need call pool.Stop() after work
func StartPool(pbs ...*ProgressBar) (pool *Pool, err error) {
pool = new(Pool)
if err = pool.Start(); err != nil {
return
}
pool.Add(pbs...)
return
}
// NewPool initialises a pool with progress bars, but
// doesn't start it. You need to call Start manually
func NewPool(pbs ...*ProgressBar) (pool *Pool) {
pool = new(Pool)
pool.Add(pbs...)
return
}
type Pool struct {
Output io.Writer
RefreshRate time.Duration
bars []*ProgressBar
lastBarsCount int
shutdownCh chan struct{}
workerCh chan struct{}
m sync.Mutex
finishOnce sync.Once
}
// Add progress bars.
func (p *Pool) Add(pbs ...*ProgressBar) {
p.m.Lock()
defer p.m.Unlock()
for _, bar := range pbs {
bar.Set(Static, true)
bar.Start()
p.bars = append(p.bars, bar)
}
}
func (p *Pool) Start() (err error) {
p.RefreshRate = defaultRefreshRate
p.shutdownCh, err = termutil.RawModeOn()
if err != nil {
return
}
p.workerCh = make(chan struct{})
go p.writer()
return
}
func (p *Pool) writer() {
var first = true
defer func() {
if first == false {
p.print(false)
} else {
p.print(true)
p.print(false)
}
close(p.workerCh)
}()
for {
select {
case <-time.After(p.RefreshRate):
if p.print(first) {
p.print(false)
return
}
first = false
case <-p.shutdownCh:
return
}
}
}
// Restore terminal state and close pool
func (p *Pool) Stop() error {
p.finishOnce.Do(func() {
if p.shutdownCh != nil {
close(p.shutdownCh)
}
})
// Wait for the worker to complete
select {
case <-p.workerCh:
}
return termutil.RawModeOff()
}

46
vendor/github.com/cheggaaa/pb/v3/pool_win.go generated vendored Normal file
View file

@ -0,0 +1,46 @@
// +build windows
package pb
import (
"fmt"
"log"
"github.com/cheggaaa/pb/v3/termutil"
)
func (p *Pool) print(first bool) bool {
p.m.Lock()
defer p.m.Unlock()
var out string
if !first {
coords, err := termutil.GetCursorPos()
if err != nil {
log.Panic(err)
}
coords.Y -= int16(p.lastBarsCount)
if coords.Y < 0 {
coords.Y = 0
}
coords.X = 0
err = termutil.SetCursorPos(coords)
if err != nil {
log.Panic(err)
}
}
isFinished := true
for _, bar := range p.bars {
if !bar.IsFinished() {
isFinished = false
}
out += fmt.Sprintf("\r%s\n", bar.String())
}
if p.Output != nil {
fmt.Fprint(p.Output, out)
} else {
fmt.Print(out)
}
p.lastBarsCount = len(p.bars)
return isFinished
}

43
vendor/github.com/cheggaaa/pb/v3/pool_x.go generated vendored Normal file
View file

@ -0,0 +1,43 @@
// +build linux darwin freebsd netbsd openbsd solaris dragonfly plan9 aix
package pb
import (
"fmt"
"os"
"github.com/cheggaaa/pb/v3/termutil"
)
func (p *Pool) print(first bool) bool {
p.m.Lock()
defer p.m.Unlock()
var out string
if !first {
out = fmt.Sprintf("\033[%dA", p.lastBarsCount)
}
isFinished := true
bars := p.bars
rows, cols, err := termutil.TerminalSize()
if err != nil {
cols = defaultBarWidth
}
if rows > 0 && len(bars) > rows {
// we need to hide bars that overflow terminal height
bars = bars[len(bars)-rows:]
}
for _, bar := range bars {
if !bar.IsFinished() {
isFinished = false
}
bar.SetWidth(cols)
out += fmt.Sprintf("\r%s\n", bar.String())
}
if p.Output != nil {
fmt.Fprint(p.Output, out)
} else {
fmt.Fprint(os.Stderr, out)
}
p.lastBarsCount = len(bars)
return isFinished
}

View file

@ -10,6 +10,16 @@ import (
var echoLocked bool
var echoLockMutex sync.Mutex
var errLocked = errors.New("terminal locked")
var autoTerminate = true
// AutoTerminate enables or disables automatic terminate signal catching.
// It's needed to restore the terminal state after the pool was used.
// By default, it's enabled.
func AutoTerminate(enable bool) {
echoLockMutex.Lock()
defer echoLockMutex.Unlock()
autoTerminate = enable
}
// RawModeOn switches terminal to raw mode
func RawModeOn() (quit chan struct{}, err error) {
@ -45,8 +55,10 @@ func RawModeOff() (err error) {
// listen exit signals and restore terminal state
func catchTerminate(quit chan struct{}) {
sig := make(chan os.Signal, 1)
if autoTerminate {
signal.Notify(sig, unlockSignals...)
defer signal.Stop(sig)
}
select {
case <-quit:
RawModeOff()

View file

@ -111,7 +111,7 @@ func termWidthTPut() (width int, err error) {
return strconv.Atoi(string(res))
}
func getCursorPos() (pos coordinates, err error) {
func GetCursorPos() (pos coordinates, err error) {
var info consoleScreenBufferInfo
_, _, e := syscall.Syscall(procGetConsoleScreenBufferInfo.Addr(), 2, uintptr(syscall.Stdout), uintptr(unsafe.Pointer(&info)), 0)
if e != 0 {
@ -120,7 +120,7 @@ func getCursorPos() (pos coordinates, err error) {
return info.dwCursorPosition, nil
}
func setCursorPos(pos coordinates) error {
func SetCursorPos(pos coordinates) error {
_, _, e := syscall.Syscall(setConsoleCursorPosition.Addr(), 2, uintptr(syscall.Stdout), uintptr(uint32(uint16(pos.Y))<<16|uint32(uint16(pos.X))), 0)
if e != 0 {
return error(e)

View file

@ -35,6 +35,12 @@ func init() {
// TerminalWidth returns width of the terminal.
func TerminalWidth() (int, error) {
_, c, err := TerminalSize()
return c, err
}
// TerminalSize returns size of the terminal.
func TerminalSize() (rows, cols int, err error) {
w := new(window)
res, _, err := syscall.Syscall(sysIoctl,
tty.Fd(),
@ -42,9 +48,9 @@ func TerminalWidth() (int, error) {
uintptr(unsafe.Pointer(w)),
)
if int(res) == -1 {
return 0, err
return 0, 0, err
}
return int(w.Col), nil
return int(w.Row), int(w.Col), nil
}
var oldState syscall.Termios

2
vendor/modules.txt vendored
View file

@ -93,7 +93,7 @@ github.com/beorn7/perks/quantile
# github.com/cespare/xxhash/v2 v2.1.2
## explicit; go 1.11
github.com/cespare/xxhash/v2
# github.com/cheggaaa/pb/v3 v3.0.8
# github.com/cheggaaa/pb/v3 v3.0.9-0.20211222075416-90c02fa07ea4
## explicit; go 1.12
github.com/cheggaaa/pb/v3
github.com/cheggaaa/pb/v3/termutil