package main import ( "fmt" "log" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/prometheus/prometheus/tsdb" ) type prometheusProcessor struct { // prometheus client fetches and reads // snapshot blocks cl *prometheus.Client // importer performs import requests // for timeseries data returned from // snapshot blocks im *vm.Importer // cc stands for concurrency // and defines number of concurrently // running snapshot block readers cc int } func (pp *prometheusProcessor) run(silent, verbose bool) error { blocks, err := pp.cl.Explore() if err != nil { return fmt.Errorf("explore failed: %s", err) } if len(blocks) < 1 { return fmt.Errorf("found no blocks to import") } question := fmt.Sprintf("Found %d blocks to import. Continue?", len(blocks)) if !silent && !prompt(question) { return nil } bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks)) if err := barpool.Start(); err != nil { return err } blockReadersCh := make(chan tsdb.BlockReader) errCh := make(chan error, pp.cc) pp.im.ResetStats() var wg sync.WaitGroup wg.Add(pp.cc) for i := 0; i < pp.cc; i++ { go func() { defer wg.Done() for br := range blockReadersCh { if err := pp.do(br); err != nil { errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err) return } bar.Increment() } }() } // any error breaks the import for _, br := range blocks { select { case promErr := <-errCh: close(blockReadersCh) return fmt.Errorf("prometheus error: %s", promErr) case vmErr := <-pp.im.Errors(): close(blockReadersCh) return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) case blockReadersCh <- br: } } close(blockReadersCh) wg.Wait() // wait for all buffers to flush pp.im.Close() close(errCh) // drain import errors channel for vmErr := range pp.im.Errors() { if vmErr.Err != nil { return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose)) } } for err := range errCh { return fmt.Errorf("import process failed: %s", err) } barpool.Stop() log.Println("Import finished!") log.Print(pp.im.Stats()) return nil } func (pp *prometheusProcessor) do(b tsdb.BlockReader) error { ss, err := pp.cl.Read(b) if err != nil { return fmt.Errorf("failed to read block: %s", err) } for ss.Next() { var name string var labels []vm.LabelPair series := ss.At() for _, label := range series.Labels() { if label.Name == "__name__" { name = label.Value continue } labels = append(labels, vm.LabelPair{ Name: label.Name, Value: label.Value, }) } if name == "" { return fmt.Errorf("failed to find `__name__` label in labelset for block %v", b.Meta().ULID) } var timestamps []int64 var values []float64 it := series.Iterator() for it.Next() { t, v := it.At() timestamps = append(timestamps, t) values = append(values, v) } if err := it.Err(); err != nil { return err } ts := vm.TimeSeries{ Name: name, LabelPairs: labels, Timestamps: timestamps, Values: values, } if err := pp.im.Input(&ts); err != nil { return err } } return ss.Err() }