OpenTSDB migration to VictoriaMetrics (#1089)

This commit is contained in:
John Seekins 2021-04-08 13:58:06 -06:00 committed by Aliaksandr Valialkin
parent c4f6b79d76
commit 97fafce028
9 changed files with 1517 additions and 0 deletions

View file

@ -7,8 +7,37 @@ Features:
- [x] Thanos: migrate data from Thanos to VictoriaMetrics
- [ ] ~~Prometheus: migrate data from Prometheus to VictoriaMetrics by query~~(discarded)
- [x] InfluxDB: migrate data from InfluxDB to VictoriaMetrics
- [x] OpenTSDB: migrate data from OpenTSDB to VictoriaMetrics
- [ ] Storage Management: data re-balancing between nodes
# Table of contents
* [Articles](#articles)
* [How to build](#how-to-build)
* [Migrating data from OpenTSDB](#migrating-data-from-opentsdb)
* [Migrating data from InfluxDB 1.x](#migrating-data-from-influxdb-1x)
* [Data mapping](#data-mapping)
* [Configuration](#configuration)
* [Filtering](#filtering)
* [Migrating data from InfluxDB 2.x](#migrating-data-from-influxdb-2x)
* [Migrating data from Prometheus](#migrating-data-from-prometheus)
* [Data mapping](#data-mapping-1)
* [Configuration](#configuration-1)
* [Filtering](#filtering-1)
* [Migrating data from Thanos](#migrating-data-from-thanos)
* [Current data](#current-data)
* [Historical data](#historical-data)
* [Migrating data from VictoriaMetrics](#migrating-data-from-victoriametrics)
* [Native protocol](#native-protocol)
* [Tuning](#tuning)
* [Influx mode](#influx-mode)
* [Prometheus mode](#prometheus-mode)
* [VictoriaMetrics importer](#victoriametrics-importer)
* [Importer stats](#importer-stats)
* [Significant figures](#significant-figures)
* [Adding extra labels](#adding-extra-labels)
## Articles
* [How to migrate data from Prometheus](https://medium.com/@romanhavronenko/victoriametrics-how-to-migrate-data-from-prometheus-d44a6728f043)
@ -60,6 +89,33 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b
2. Run `make vmctl-arm-prod` or `make vmctl-arm64-prod` from the root folder of [the repository](https://github.com/VictoriaMetrics/VictoriaMetrics).
It builds `vmctl-arm-prod` or `vmctl-arm64-prod` binary respectively and puts it into the `bin` folder.
## Migrating data from OpenTSDB
`vmctl` supports the `opentsdb` mode to migrate data from OpenTSDB to VictoriaMetrics time-series database.
See `./vmctl opentsdb --help` for details and full list of flags.
*OpenTSDB migration is not possible without a functioning [meta](http://opentsdb.net/docs/build/html/user_guide/metadata.html) table to search for metrics/series.*
OpenTSDB migration works like so:
1. Find metrics based on selected filters (or the default filter set ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z'])
* e.g. `curl -Ss "http://opentsdb:4242/api/suggest?type=metrics&q=sys"`
2. Find series associated with each returned metric
* e.g. `curl -Ss "http://opentsdb:4242/api/search/lookup?m=system.load5&limit=1000000"`
3. Download data for each series in chunks defined in the CLI switches
* e.g. `-retention=sum-1m-avg:1h:90d` ==
* `curl -Ss "http://opentsdb:4242/api/query?start=1h-ago&end=now&m=sum:1m-avg-none:system.load5\{host=host1\}"`
* `curl -Ss "http://opentsdb:4242/api/query?start=2h-ago&end=1h-ago&m=sum:1m-avg-none:system.load5\{host=host1\}"`
* `curl -Ss "http://opentsdb:4242/api/query?start=3h-ago&end=2h-ago&m=sum:1m-avg-none:system.load5\{host=host1\}"`
* ...
* `curl -Ss "http://opentsdb:4242/api/query?start=2160h-ago&end=2159h-ago&m=sum:1m-avg-none:system.load5\{host=host1\}"`
This means that we must stream data from OpenTSDB to VictoriaMetrics in chunks. This is where concurrency for OpenTSDB comes in. We can query multiple chunks at once, but we shouldn't perform too many chunks at a time to avoid overloading the OpenTSDB cluster.
### Restarting OpenTSDB migrations
One important note for OpenTSDB migration: Queries/HBase scans can "get stuck" within OpenTSDB itself. This can cause instability and performance issues within an OpenTSDB cluster, so stopping the migrator to deal with it may be necessary. Because of this, we provide the timstamp we started collecting data from at thebeginning of the run. You can stop and restart the importer using this "hard timestamp" to ensure you collect data from the same time range over multiple runs.
## Migrating data from InfluxDB (1.x)

View file

@ -96,6 +96,78 @@ var (
}
)
const (
otsdbAddr = "otsdb-addr"
otsdbConcurrency = "otsdb-concurrency"
otsdbQueryLimit = "otsdb-query-limit"
otsdbOffsetDays = "otsdb-offset-days"
otsdbHardTSStart = "otsdb-hard-ts-start"
otsdbRetentions = "otsdb-retentions"
otsdbFilters = "otsdb-filters"
otsdbNormalize = "otsdb-normalize"
otsdbMsecsTime = "otsdb-msecstime"
)
var (
otsdbFlags = []cli.Flag{
&cli.StringFlag{
Name: otsdbAddr,
Value: "http://localhost:4242",
Required: true,
Usage: "OpenTSDB server addr",
},
&cli.IntFlag{
Name: otsdbConcurrency,
Usage: "Number of concurrently running fetch queries to OpenTSDB per metric",
Value: 1,
},
&cli.StringSliceFlag{
Name: otsdbRetentions,
Value: nil,
Required: true,
Usage: "Retentions patterns to collect on. Each pattern should describe the aggregation performed " +
"for the query, the row size (in HBase) that will define how long each individual query is, " +
"and the time range to query for. e.g. sum-1m-avg:1h:3d. " +
"The first time range defined should be a multiple of the row size in HBase. " +
"e.g. if the row size is 2 hours, 4h is good, 5h less so. We want each query to land on unique rows.",
},
&cli.StringSliceFlag{
Name: otsdbFilters,
Value: cli.NewStringSlice("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"),
Usage: "Filters to process for discovering metrics in OpenTSDB",
},
&cli.Int64Flag{
Name: otsdbOffsetDays,
Usage: "Days to offset our 'starting' point for collecting data from OpenTSDB",
Value: 0,
},
&cli.Int64Flag{
Name: otsdbHardTSStart,
Usage: "A specific timestamp to start from, will override using an offset",
Value: 0,
},
/*
because the defaults are set *extremely* low in OpenTSDB (10-25 results), we will
set a larger default limit, but still allow a user to increase/decrease it
*/
&cli.IntFlag{
Name: otsdbQueryLimit,
Usage: "Result limit on meta queries to OpenTSDB (affects both metric name and tag value queries, recommended to use a value exceeding your largest series)",
Value: 100e3,
},
&cli.BoolFlag{
Name: otsdbMsecsTime,
Value: false,
Usage: "Whether OpenTSDB is writing values in milliseconds or seconds",
},
&cli.BoolFlag{
Name: otsdbNormalize,
Value: false,
Usage: "Whether to normalize all data received to lower case before forwarding to VictoriaMetrics",
},
}
)
const (
influxAddr = "influx-addr"
influxUser = "influx-user"

View file

@ -10,6 +10,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
@ -23,6 +24,38 @@ func main() {
Usage: "Victoria metrics command-line tool",
Version: buildinfo.Version,
Commands: []*cli.Command{
{
Name: "opentsdb",
Usage: "Migrate timeseries from OpenTSDB",
Flags: mergeFlags(globalFlags, otsdbFlags, vmFlags),
Action: func(c *cli.Context) error {
fmt.Println("OpenTSDB import mode")
oCfg := opentsdb.Config{
Addr: c.String(otsdbAddr),
Limit: c.Int(otsdbQueryLimit),
Offset: c.Int64(otsdbOffsetDays),
HardTS: c.Int64(otsdbHardTSStart),
Retentions: c.StringSlice(otsdbRetentions),
Filters: c.StringSlice(otsdbFilters),
Normalize: c.Bool(otsdbNormalize),
MsecsTime: c.Bool(otsdbMsecsTime),
}
otsdbClient, err := opentsdb.NewClient(oCfg)
if err != nil {
return fmt.Errorf("failed to create opentsdb client: %s", err)
}
vmCfg := initConfigVM(c)
importer, err := vm.NewImporter(vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
}
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency))
return otsdbProcessor.run(c.Bool(globalSilent))
},
},
{
Name: "influx",
Usage: "Migrate timeseries from InfluxDB",

159
app/vmctl/opentsdb.go Normal file
View file

@ -0,0 +1,159 @@
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
)
type otsdbProcessor struct {
oc *opentsdb.Client
im *vm.Importer
otsdbcc int
}
type queryObj struct {
Series opentsdb.Meta
Rt opentsdb.RetentionMeta
Tr opentsdb.TimeRange
StartTime int64
}
func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdbProcessor {
if otsdbcc < 1 {
otsdbcc = 1
}
return &otsdbProcessor{
oc: oc,
im: im,
otsdbcc: otsdbcc,
}
}
func (op *otsdbProcessor) run(silent bool) error {
log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters)
var metrics []string
for _, filter := range op.oc.Filters {
q := fmt.Sprintf("%s/api/suggest?type=metrics&q=%s&max=%d", op.oc.Addr, filter, op.oc.Limit)
m, err := op.oc.FindMetrics(q)
if err != nil {
return fmt.Errorf("metric discovery failed for %q: %s", q, err)
}
metrics = append(metrics, m...)
}
if len(metrics) < 1 {
return fmt.Errorf("found no timeseries to import with filters %q", op.oc.Filters)
}
question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics))
if !silent && !prompt(question) {
return nil
}
op.im.ResetStats()
startTime := time.Now().Unix()
queryRanges := 0
// pre-calculate the number of query ranges we'll be processing
for _, rt := range op.oc.Retentions {
queryRanges += len(rt.QueryRanges)
}
for _, metric := range metrics {
log.Println(fmt.Sprintf("Starting work on %s", metric))
serieslist, err := op.oc.FindSeries(metric)
if err != nil {
return fmt.Errorf("couldn't retrieve series list for %s : %s", metric, err)
}
/*
Create channels for collecting/processing series and errors
We'll create them per metric to reduce pressure against OpenTSDB
Limit the size of seriesCh so we can't get too far ahead of actual processing
*/
seriesCh := make(chan queryObj, op.otsdbcc)
errCh := make(chan error)
// we're going to make serieslist * queryRanges queries, so we should represent that in the progress bar
bar := pb.StartNew(len(serieslist) * queryRanges)
var wg sync.WaitGroup
wg.Add(op.otsdbcc)
for i := 0; i < op.otsdbcc; i++ {
go func() {
defer wg.Done()
for s := range seriesCh {
if err := op.do(s); err != nil {
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
return
}
bar.Increment()
}
}()
}
/*
Loop through all series for this metric, processing all retentions and time ranges
requested. This loop is our primary "collect data from OpenTSDB loop" and should
be async, sending data to VictoriaMetrics over time.
The idea with having the select at the inner-most loop is to ensure quick
short-circuiting on error.
*/
for _, series := range serieslist {
for _, rt := range op.oc.Retentions {
for _, tr := range rt.QueryRanges {
select {
case otsdbErr := <-errCh:
return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors():
return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{
FirstOrder: rt.FirstOrder, SecondOrder: rt.SecondOrder, AggTime: rt.AggTime}}:
}
}
}
}
// Drain channels per metric
close(seriesCh)
wg.Wait()
close(errCh)
// check for any lingering errors on the query side
for otsdbErr := range errCh {
return fmt.Errorf("Import process failed: \n%s", otsdbErr)
}
bar.Finish()
log.Print(op.im.Stats())
}
op.im.Close()
for vmErr := range op.im.Errors() {
return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
}
log.Println("Import finished!")
log.Print(op.im.Stats())
return nil
}
func (op *otsdbProcessor) do(s queryObj) error {
start := s.StartTime - s.Tr.Start
end := s.StartTime - s.Tr.End
data, err := op.oc.GetData(s.Series, s.Rt, start, end)
if err != nil {
return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
}
if len(data.Timestamps) < 1 || len(data.Values) < 1 {
return nil
}
labels := make([]vm.LabelPair, len(data.Tags))
for k, v := range data.Tags {
labels = append(labels, vm.LabelPair{Name: k, Value: v})
}
op.im.Input() <- &vm.TimeSeries{
Name: data.Metric,
LabelPairs: labels,
Timestamps: data.Timestamps,
Values: data.Values,
}
return nil
}

View file

@ -0,0 +1,347 @@
package opentsdb
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
)
// Retention objects contain meta data about what to query for our run
type Retention struct {
/*
OpenTSDB has two levels of aggregation,
First, we aggregate any un-mentioned tags into the last result
Second, we aggregate into buckets over time
To simulate this with config, we have
FirstOrder (e.g. sum/avg/max/etc.)
SecondOrder (e.g. sum/avg/max/etc.)
AggTime (e.g. 1m/10m/1d/etc.)
This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none:
Or an example: m=sum:1m-avg-none
*/
FirstOrder string
SecondOrder string
AggTime string
// The actual ranges will will attempt to query (as offsets from now)
QueryRanges []TimeRange
}
// RetentionMeta objects exist to pass smaller subsets (only one retention range) of a full Retention object around
type RetentionMeta struct {
FirstOrder string
SecondOrder string
AggTime string
}
// Client object holds general config about how queries should be performed
type Client struct {
Addr string
// The meta query limit for series returned
Limit int
Retentions []Retention
Filters []string
Normalize bool
}
// Config contains fields required
// for Client configuration
type Config struct {
Addr string
Limit int
Offset int64
HardTS int64
Retentions []string
Filters []string
Normalize bool
MsecsTime bool
}
// TimeRange contains data about time ranges to query
type TimeRange struct {
Start int64
End int64
}
// MetaResults contains return data from search series lookup queries
type MetaResults struct {
Type string `json:"type"`
Results []Meta `json:"results"`
//metric string
//tags interface{}
//limit int
//time int
//startIndex int
//totalResults int
}
// Meta A meta object about a metric
// only contain the tags/etc. and no data
type Meta struct {
//tsuid string
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
}
// Metric holds the time series data
type Metric struct {
Metric string
Tags map[string]string
Timestamps []int64
Values []float64
}
// ExpressionOutput contains results from actual data queries
type ExpressionOutput struct {
Outputs []qoObj `json:"outputs"`
Query interface{} `json:"query"`
}
// QoObj contains actual timeseries data from the returned data query
type qoObj struct {
ID string `json:"id"`
Alias string `json:"alias"`
Dps [][]float64 `json:"dps"`
//dpsMeta interface{}
//meta interface{}
}
// Expression objects format our data queries
/*
All of the following structs are to build a OpenTSDB expression object
*/
type Expression struct {
Time timeObj `json:"time"`
Filters []filterObj `json:"filters"`
Metrics []metricObj `json:"metrics"`
// this just needs to be an empty object, so the value doesn't matter
Expressions []int `json:"expressions"`
Outputs []outputObj `json:"outputs"`
}
type timeObj struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Aggregator string `json:"aggregator"`
Downsampler dSObj `json:"downsampler"`
}
type dSObj struct {
Interval string `json:"interval"`
Aggregator string `json:"aggregator"`
FillPolicy fillObj `json:"fillPolicy"`
}
type fillObj struct {
// we'll always hard-code to NaN here, so we don't need value
Policy string `json:"policy"`
}
type filterObj struct {
Tags []tagObj `json:"tags"`
ID string `json:"id"`
}
type tagObj struct {
Type string `json:"type"`
Tagk string `json:"tagk"`
Filter string `json:"filter"`
GroupBy bool `json:"groupBy"`
}
type metricObj struct {
ID string `json:"id"`
Metric string `json:"metric"`
Filter string `json:"filter"`
FillPolicy fillObj `json:"fillPolicy"`
}
type outputObj struct {
ID string `json:"id"`
Alias string `json:"alias"`
}
/* End expression object structs */
var (
exprOutput = outputObj{ID: "a", Alias: "query"}
exprFillPolicy = fillObj{Policy: "nan"}
)
// FindMetrics discovers all metrics that OpenTSDB knows about (given a filter)
// e.g. /api/suggest?type=metrics&q=system&max=100000
func (c Client) FindMetrics(q string) ([]string, error) {
resp, err := http.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err)
}
var metriclist []string
err = json.Unmarshal(body, &metriclist)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return metriclist, nil
}
// FindSeries discovers all series associated with a metric
// e.g. /api/search/lookup?m=system.load5&limit=1000000
func (c Client) FindSeries(metric string) ([]Meta, error) {
q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit)
resp, err := http.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to set GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var results MetaResults
err = json.Unmarshal(body, &results)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return results.Results, nil
}
// GetData actually retrieves data for a series at a specified time range
func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) {
/*
Here we build the actual exp query we'll send to OpenTSDB
This is comprised of a number of different settings. We hard-code
a few to simplify the JSON object creation.
There are examples queries available, so not too much detail here...
*/
expr := Expression{}
expr.Outputs = []outputObj{exprOutput}
expr.Metrics = append(expr.Metrics, metricObj{ID: "a", Metric: series.Metric,
Filter: "f1", FillPolicy: exprFillPolicy})
expr.Time = timeObj{Start: start, End: end, Aggregator: rt.FirstOrder,
Downsampler: dSObj{Interval: rt.AggTime,
Aggregator: rt.SecondOrder,
FillPolicy: exprFillPolicy}}
var TagList []tagObj
for k, v := range series.Tags {
/*
every tag should be a literal_or because that's the closest to a full "==" that
this endpoint allows for
*/
TagList = append(TagList, tagObj{Type: "literal_or", Tagk: k,
Filter: v, GroupBy: true})
}
expr.Filters = append(expr.Filters, filterObj{ID: "f1", Tags: TagList})
// "expressions" is required in the query object or we get a 5xx, so force it to exist
expr.Expressions = make([]int, 0)
inputData, err := json.Marshal(expr)
if err != nil {
return Metric{}, fmt.Errorf("failed to marshal query JSON %s", err)
}
q := fmt.Sprintf("%s/api/query/exp", c.Addr)
resp, err := http.Post(q, "application/json", bytes.NewBuffer(inputData))
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return Metric{}, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return Metric{}, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var output ExpressionOutput
err = json.Unmarshal(body, &output)
if err != nil {
return Metric{}, fmt.Errorf("failed to unmarshal response from %q: %s", q, err)
}
if len(output.Outputs) < 1 {
// no results returned...return an empty object without error
return Metric{}, nil
}
data := Metric{}
data.Metric = series.Metric
data.Tags = series.Tags
/*
We evaluate data for correctness before formatting the actual values
to skip a little bit of time if the series has invalid formatting
First step is to enforce Prometheus' data model
*/
data, err = modifyData(data, c.Normalize)
if err != nil {
return Metric{}, fmt.Errorf("invalid series data from %q: %s", q, err)
}
/*
Convert data from OpenTSDB's output format ([[ts,val],[ts,val]...])
to VictoriaMetrics format: {"timestamps": [ts,ts,ts...], "values": [val,val,val...]}
The nasty part here is that because an object in each array
can be a float64, we have to initially cast _all_ objects that way
then convert the timestamp back to something reasonable.
*/
for _, tsobj := range output.Outputs[0].Dps {
data.Timestamps = append(data.Timestamps, int64(tsobj[0]))
data.Values = append(data.Values, tsobj[1])
}
return data, nil
}
// NewClient creates and returns OpenTSDB client
// configured with passed Config
func NewClient(cfg Config) (*Client, error) {
var retentions []Retention
offsetPrint := int64(time.Now().Unix())
if cfg.MsecsTime {
// 1000000 == Nanoseconds -> Milliseconds difference
offsetPrint = int64(time.Now().UnixNano() / 1000000)
}
if cfg.HardTS > 0 {
/*
"Hard" offsets are specific timestamps, rather than
a relative number of days. To use them effectively
we should subtract them from our default offset (Now)
*/
offsetPrint = offsetPrint - cfg.HardTS
} else if cfg.Offset > 0 {
/*
Our "offset" is the number of days we should step
back before starting to scan for data
*/
if cfg.MsecsTime {
offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60 * 1000)
} else {
offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60)
}
}
log.Println(fmt.Sprintf("Will collect data starting at TS %v", offsetPrint))
for _, r := range cfg.Retentions {
ret, err := convertRetention(r, offsetPrint, cfg.MsecsTime)
if err != nil {
return &Client{}, fmt.Errorf("Couldn't parse retention %q :: %v", r, err)
}
retentions = append(retentions, ret)
}
client := &Client{
Addr: strings.Trim(cfg.Addr, "/"),
Retentions: retentions,
Limit: cfg.Limit,
Filters: cfg.Filters,
Normalize: cfg.Normalize,
}
return client, nil
}

View file

@ -0,0 +1,173 @@
package opentsdb
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
)
var (
allowedNames = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9_:]*$")
allowedFirstChar = regexp.MustCompile("^[a-zA-Z]")
replaceChars = regexp.MustCompile("[^a-zA-Z0-9_:]")
allowedTagKeys = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9_]*$")
)
func convertDuration(duration string) (time.Duration, error) {
/*
Golang's time library doesn't support many different
string formats (year, month, week, day) because they
aren't consistent ranges. But Java's library _does_.
Consequently, we'll need to handle all the custom
time ranges, and, to make the internal API call consistent,
we'll need to allow for durations that Go supports, too.
The nice thing is all the "broken" time ranges are > 1 hour,
so we can just make assumptions to convert them to a range in hours.
They aren't *good* assumptions, but they're reasonable
for this function.
*/
var actualDuration time.Duration
var err error
var timeValue int
if strings.HasSuffix(duration, "y") {
timeValue, err = strconv.Atoi(strings.Trim(duration, "y"))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
timeValue = timeValue * 365 * 24
actualDuration, err = time.ParseDuration(fmt.Sprintf("%vh", timeValue))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
} else if strings.HasSuffix(duration, "w") {
timeValue, err = strconv.Atoi(strings.Trim(duration, "w"))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
timeValue = timeValue * 7 * 24
actualDuration, err = time.ParseDuration(fmt.Sprintf("%vh", timeValue))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
} else if strings.HasSuffix(duration, "d") {
timeValue, err = strconv.Atoi(strings.Trim(duration, "d"))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
timeValue = timeValue * 24
actualDuration, err = time.ParseDuration(fmt.Sprintf("%vh", timeValue))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
} else if strings.HasSuffix(duration, "h") || strings.HasSuffix(duration, "m") || strings.HasSuffix(duration, "s") || strings.HasSuffix(duration, "ms") {
actualDuration, err = time.ParseDuration(duration)
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
} else {
return 0, fmt.Errorf("invalid time duration string: %q", duration)
}
return actualDuration, nil
}
// Convert an incoming retention "string" into the component parts
func convertRetention(retention string, offset int64, msecTime bool) (Retention, error) {
/*
A retention string coming in looks like
sum-1m-avg:1h:30d
So we:
1. split on the :
2. split on the - in slice 0
3. create the time ranges we actually need
*/
chunks := strings.Split(retention, ":")
if len(chunks) != 3 {
return Retention{}, fmt.Errorf("invalid retention string: %q", retention)
}
rowLengthDuration, err := convertDuration(chunks[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %s", chunks[1], err)
}
// set length of each row in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
rowLength := rowLengthDuration.Milliseconds()
if !msecTime {
rowLength = rowLength / 1000
}
ttlDuration, err := convertDuration(chunks[2])
if err != nil {
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
}
// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
ttl := ttlDuration.Milliseconds()
if !msecTime {
ttl = ttl / 1000
}
// bump by the offset so we don't look at empty ranges any time offset > ttl
ttl += offset
var timeChunks []TimeRange
var i int64
for i = offset; i <= ttl; i = i + rowLength {
timeChunks = append(timeChunks, TimeRange{Start: i + rowLength, End: i})
}
// first/second order aggregations for queries defined in chunk 0...
aggregates := strings.Split(chunks[0], "-")
if len(aggregates) != 3 {
return Retention{}, fmt.Errorf("invalid aggregation string: %q", chunks[0])
}
ret := Retention{FirstOrder: aggregates[0],
SecondOrder: aggregates[2],
AggTime: aggregates[1],
QueryRanges: timeChunks}
return ret, nil
}
// This ensures any incoming data from OpenTSDB matches the Prometheus data model
// https://prometheus.io/docs/concepts/data_model
func modifyData(msg Metric, normalize bool) (Metric, error) {
finalMsg := Metric{
Metric: "", Tags: make(map[string]string),
Timestamps: msg.Timestamps, Values: msg.Values,
}
// if the metric name has invalid characters, the data model says to drop it
if !allowedFirstChar.MatchString(msg.Metric) {
return Metric{}, fmt.Errorf("%s has a bad first character", msg.Metric)
}
name := msg.Metric
// if normalization requested, lowercase the name
if normalize {
name = strings.ToLower(name)
}
/*
replace bad characters in metric name with _ per the data model
only replace if needed to reduce string processing time
*/
if !allowedNames.MatchString(name) {
finalMsg.Metric = replaceChars.ReplaceAllString(name, "_")
} else {
finalMsg.Metric = name
}
// replace bad characters in tag keys with _ per the data model
for key, value := range msg.Tags {
// if normalization requested, lowercase the key and value
if normalize {
key = strings.ToLower(key)
value = strings.ToLower(value)
}
/*
replace all explicitly bad characters with _
only replace if needed to reduce string processing time
*/
if !allowedTagKeys.MatchString(key) {
key = replaceChars.ReplaceAllString(key, "_")
}
// tags that start with __ are considered custom stats for internal prometheus stuff, we should drop them
if !strings.HasPrefix(key, "__") {
finalMsg.Tags[key] = value
}
}
return finalMsg, nil
}

View file

@ -0,0 +1,217 @@
package opentsdb
import (
"testing"
)
func TestConvertRetention(t *testing.T) {
/*
2592000 seconds in 30 days
3600 in one hour
2592000 / 3600 = 720 individual query "ranges" should exist, plus one because time ranges can be weird
First order should == "sum"
Second order should == "avg"
AggTime should == "1m"
*/
res, err := convertRetention("sum-1m-avg:1h:30d", 0, false)
if err != nil {
t.Fatalf("Error parsing valid retention string: %v", err)
}
if len(res.QueryRanges) != 721 {
t.Fatalf("Found %v query ranges. Should have found 720", len(res.QueryRanges))
}
if res.FirstOrder != "sum" {
t.Fatalf("Incorrect first order aggregation %q. Should have been 'sum'", res.FirstOrder)
}
if res.SecondOrder != "avg" {
t.Fatalf("Incorrect second order aggregation %q. Should have been 'avg'", res.SecondOrder)
}
if res.AggTime != "1m" {
t.Fatalf("Incorrect aggregation time length %q. Should have been '1m'", res.AggTime)
}
/*
Invalid retention string
*/
res, err = convertRetention("sum-1m-avg:30d", 0, false)
if err == nil {
t.Fatalf("Bad retention string (sum-1m-avg:30d) didn't fail: %v", res)
}
/*
Invalid aggregation string
*/
res, err = convertRetention("sum-1m:1h:30d", 0, false)
if err == nil {
t.Fatalf("Bad aggregation string (sum-1m:1h:30d) didn't fail: %v", res)
}
}
func TestModifyData(t *testing.T) {
/*
Good metric metadata
*/
m := Metric{
Metric: "cpu",
Tags: map[string]string{
"core": "0",
},
Values: []float64{
0,
},
Timestamps: []int64{
0,
},
}
res, err := modifyData(m, false)
if err != nil {
t.Fatalf("Valid metric %v failed to parse: %v", m, err)
}
if res.Metric != "cpu" {
t.Fatalf("Valid metric name %q was converted: %q", m.Metric, res.Metric)
}
found := false
for k := range res.Tags {
if k == "core" {
found = true
break
}
}
if !found {
t.Fatalf("Valid metric tag name 'core' missing: %v", res.Tags)
}
/*
Bad first character in metric name
metric names cannot start with _, so this
metric should fail entirely
*/
m = Metric{
Metric: "_cpu",
Tags: map[string]string{
"core": "0",
},
Values: []float64{
0,
},
Timestamps: []int64{
0,
},
}
res, err = modifyData(m, false)
if err == nil {
t.Fatalf("Invalid metric %v parsed?", m)
}
/*
Bad character in metric name
metric names cannot have `.`, so this
should be converted to `_`
*/
m = Metric{
Metric: "cpu.name",
Tags: map[string]string{
"core": "0",
},
Values: []float64{
0,
},
Timestamps: []int64{
0,
},
}
res, err = modifyData(m, false)
if err != nil {
t.Fatalf("Valid metric failed to parse? %v", err)
}
if res.Metric != "cpu_name" {
t.Fatalf("Metric name not correctly converted from 'cpu.name' to 'cpu_name': %q", res.Metric)
}
/*
bad tag prefix (__)
Prometheus considers tags beginning with __
to be internal use only. They should not show up in incoming data.
this tag should be dropped from the result
*/
m = Metric{
Metric: "cpu",
Tags: map[string]string{
"__core": "0",
},
Values: []float64{
0,
},
Timestamps: []int64{
0,
},
}
res, err = modifyData(m, false)
if err != nil {
t.Fatalf("Valid metric failed to parse? %v", err)
}
found = false
for k := range res.Tags {
if k == "__core" {
found = true
break
}
}
if found {
t.Fatalf("Bad tag key prefix (__) found")
}
/*
bad tag key
tag keys cannot contain `.`, this should be
replaced with `_`
*/
m = Metric{
Metric: "cpu",
Tags: map[string]string{
"core.name": "0",
},
Values: []float64{
0,
},
Timestamps: []int64{
0,
},
}
res, err = modifyData(m, false)
if err != nil {
t.Fatalf("Valid metric failed to parse? %v", err)
}
found = false
for k := range res.Tags {
if k == "core.name" {
found = true
break
}
}
if found {
t.Fatalf("Bad tag key 'core.name' not converted")
}
/*
test normalize
All characters should be returned lowercase
*/
m = Metric{
Metric: "CPU",
Tags: map[string]string{
"core": "0",
},
Values: []float64{
0,
},
Timestamps: []int64{
0,
},
}
res, err = modifyData(m, true)
if err != nil {
t.Fatalf("Valid metric failed to parse? %v", err)
}
if res.Metric != "cpu" {
t.Fatalf("Normalization of metric name didn't happen!")
}
}

View file

@ -0,0 +1,398 @@
{
"outputs": [
{
"id": "a",
"alias": "query",
"dps": [
[
1614099600000,
0.28
],
[
1614099660000,
0.22
],
[
1614099720000,
0.18
],
[
1614099780000,
0.14
],
[
1614099840000,
0.24
],
[
1614099900000,
0.19
],
[
1614099960000,
0.22
],
[
1614100020000,
0.2
],
[
1614100080000,
0.18
],
[
1614100140000,
0.22
],
[
1614100200000,
0.17
],
[
1614100260000,
0.16
],
[
1614100320000,
0.22
],
[
1614100380000,
0.3
],
[
1614100440000,
0.28
],
[
1614100500000,
0.27
],
[
1614100560000,
0.26
],
[
1614100620000,
0.23
],
[
1614100680000,
0.18
],
[
1614100740000,
0.3
],
[
1614100800000,
0.24
],
[
1614100860000,
0.19
],
[
1614100920000,
0.16
],
[
1614100980000,
0.19
],
[
1614101040000,
0.23
],
[
1614101100000,
0.18
],
[
1614101160000,
0.15
],
[
1614101220000,
0.12
],
[
1614101280000,
0.1
],
[
1614101340000,
0.24
],
[
1614101400000,
0.19
],
[
1614101460000,
0.16
],
[
1614101520000,
0.14
],
[
1614101580000,
0.12
],
[
1614101640000,
0.14
],
[
1614101700000,
0.12
],
[
1614101760000,
0.13
],
[
1614101820000,
0.12
],
[
1614101880000,
0.11
],
[
1614101940000,
0.36
],
[
1614102000000,
0.35
],
[
1614102060000,
0.3
],
[
1614102120000,
0.32
],
[
1614102180000,
0.27
],
[
1614102240000,
0.26
],
[
1614102300000,
0.21
],
[
1614102360000,
0.18
],
[
1614102420000,
0.15
],
[
1614102480000,
0.12
],
[
1614102540000,
0.24
],
[
1614102600000,
0.2
],
[
1614102660000,
0.17
],
[
1614102720000,
0.18
],
[
1614102780000,
0.14
],
[
1614102840000,
0.39
],
[
1614102900000,
0.31
],
[
1614102960000,
0.3
],
[
1614103020000,
0.24
],
[
1614103080000,
0.26
],
[
1614103140000,
0.21
],
[
1614103200000,
0.17
],
[
1614103260000,
0.15
],
[
1614103320000,
0.2
],
[
1614103380000,
0.2
],
[
1614103440000,
0.22
],
[
1614103500000,
0.19
],
[
1614103560000,
0.22
],
[
1614103620000,
0.29
],
[
1614103680000,
0.31
],
[
1614103740000,
0.28
],
[
1614103800000,
0.23
]
],
"dpsMeta": {
"firstTimestamp": 1614099600000,
"lastTimestamp": 1614103800000,
"setCount": 71,
"series": 1
},
"meta": [
{
"index": 0,
"metrics": [
"timestamp"
]
},
{
"index": 1,
"metrics": [
"system.load5"
],
"commonTags": {
"rack": "undef",
"host": "use1-mon-metrics-1",
"row": "undef",
"dc": "us-east-1",
"group": "monitoring"
},
"aggregatedTags": []
}
]
}
],
"query": {
"name": null,
"time": {
"start": "1h-ago",
"end": null,
"timezone": null,
"downsampler": {
"interval": "1m",
"aggregator": "avg",
"fillPolicy": {
"policy": "nan",
"value": "NaN"
}
},
"aggregator": "sum",
"rate": false
},
"filters": [
{
"id": "f1",
"tags": [
{
"tagk": "host",
"filter": "use1-mon-metrics-1",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "group",
"filter": "monitoring",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "dc",
"filter": "us-east-1",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "rack",
"filter": "undef",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "row",
"filter": "undef",
"group_by": true,
"type": "literal_or"
}
],
"explicitTags": false
}
],
"metrics": [
{
"metric": "system.load5",
"id": "a",
"filter": "f1",
"aggregator": null,
"timeOffset": null,
"fillPolicy": {
"policy": "nan",
"value": "NaN"
}
}
],
"expressions": [],
"outputs": [
{
"id": "a",
"alias": "query"
}
]
}
}

View file

@ -0,0 +1,62 @@
{
"time": {
"start": "1h-ago",
"aggregator":"sum",
"downsampler": {
"interval": "1m",
"aggregator": "avg",
"fillPolicy": {
"policy": "nan"
}
}
},
"filters": [
{
"tags": [
{
"type": "literal_or",
"tagk": "host",
"filter": "use1-mon-metrics-1",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "group",
"filter": "monitoring",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "dc",
"filter": "us-east-1",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "rack",
"filter": "undef",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "row",
"filter": "undef",
"groupBy": true
}
],
"id": "f1"
}
],
"metrics": [
{
"id": "a",
"metric": "system.load5",
"filter": "f1",
"fillPolicy":{"policy":"nan"}
}
],
"expressions": [],
"outputs":[
{"id":"a", "alias":"query"}
]
}