mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
b2294d1cf1
add progress bars to the VM importer The new progress bars supposed to display the processing speed per each VM importer worker. This info should help to identify if there is a bottleneck on the VM side during the import process, without waiting for its finish. The new progress bars can be disabled by passing `vm-disable-progress-bar` flag. Plotting multiple progress bars requires using experimental progress bar pool from github.com/cheggaaa/pb/v3. Switch to progress bar pool required changes in all import modes. The openTSDB mode wasn't changed due to its implementation, which implies individual progress bars per each series. Because of this, using the pool wasn't possible. Signed-off-by: dmitryk-dk <kozlovdmitriyy@gmail.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
138 lines
3.1 KiB
Go
138 lines
3.1 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
|
"github.com/prometheus/prometheus/tsdb"
|
|
)
|
|
|
|
type prometheusProcessor struct {
|
|
// prometheus client fetches and reads
|
|
// snapshot blocks
|
|
cl *prometheus.Client
|
|
// importer performs import requests
|
|
// for timeseries data returned from
|
|
// snapshot blocks
|
|
im *vm.Importer
|
|
// cc stands for concurrency
|
|
// and defines number of concurrently
|
|
// running snapshot block readers
|
|
cc int
|
|
}
|
|
|
|
func (pp *prometheusProcessor) run(silent, verbose bool) error {
|
|
blocks, err := pp.cl.Explore()
|
|
if err != nil {
|
|
return fmt.Errorf("explore failed: %s", err)
|
|
}
|
|
if len(blocks) < 1 {
|
|
return fmt.Errorf("found no blocks to import")
|
|
}
|
|
question := fmt.Sprintf("Found %d blocks to import. Continue?", len(blocks))
|
|
if !silent && !prompt(question) {
|
|
return nil
|
|
}
|
|
|
|
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks))
|
|
|
|
if err := barpool.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
blockReadersCh := make(chan tsdb.BlockReader)
|
|
errCh := make(chan error, pp.cc)
|
|
pp.im.ResetStats()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(pp.cc)
|
|
for i := 0; i < pp.cc; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for br := range blockReadersCh {
|
|
if err := pp.do(br); err != nil {
|
|
errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
|
|
return
|
|
}
|
|
bar.Increment()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// any error breaks the import
|
|
for _, br := range blocks {
|
|
select {
|
|
case promErr := <-errCh:
|
|
close(blockReadersCh)
|
|
return fmt.Errorf("prometheus error: %s", promErr)
|
|
case vmErr := <-pp.im.Errors():
|
|
close(blockReadersCh)
|
|
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
|
|
case blockReadersCh <- br:
|
|
}
|
|
}
|
|
|
|
close(blockReadersCh)
|
|
wg.Wait()
|
|
// wait for all buffers to flush
|
|
pp.im.Close()
|
|
// drain import errors channel
|
|
for vmErr := range pp.im.Errors() {
|
|
if vmErr.Err != nil {
|
|
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
|
|
}
|
|
}
|
|
barpool.Stop()
|
|
log.Println("Import finished!")
|
|
log.Print(pp.im.Stats())
|
|
return nil
|
|
}
|
|
|
|
func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
|
|
ss, err := pp.cl.Read(b)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read block: %s", err)
|
|
}
|
|
for ss.Next() {
|
|
var name string
|
|
var labels []vm.LabelPair
|
|
series := ss.At()
|
|
|
|
for _, label := range series.Labels() {
|
|
if label.Name == "__name__" {
|
|
name = label.Value
|
|
continue
|
|
}
|
|
labels = append(labels, vm.LabelPair{
|
|
Name: label.Name,
|
|
Value: label.Value,
|
|
})
|
|
}
|
|
if name == "" {
|
|
return fmt.Errorf("failed to find `__name__` label in labelset for block %v", b.Meta().ULID)
|
|
}
|
|
|
|
var timestamps []int64
|
|
var values []float64
|
|
it := series.Iterator()
|
|
for it.Next() {
|
|
t, v := it.At()
|
|
timestamps = append(timestamps, t)
|
|
values = append(values, v)
|
|
}
|
|
if err := it.Err(); err != nil {
|
|
return err
|
|
}
|
|
pp.im.Input() <- &vm.TimeSeries{
|
|
Name: name,
|
|
LabelPairs: labels,
|
|
Timestamps: timestamps,
|
|
Values: values,
|
|
}
|
|
}
|
|
return ss.Err()
|
|
}
|