VictoriaMetrics/app/vmctl/opentsdb/opentsdb.go
John Seekins edd2db1b10
Address some edge cases in OpenTSDB importer and speed it up (#2019)
* Simplify queries to OpenTSDB (and make them properly appear in OpenTSDB query stats) and also tweak defaults a bit

* Convert seconds to milliseconds before writing to VictoriaMetrics and increase subquery size

Signed-off-by: John Seekins <jseekins@datto.com>
2022-01-04 11:48:23 +02:00

330 lines
9.5 KiB
Go

package opentsdb
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
)
// Retention objects contain meta data about what to query for our run
type Retention struct {
/*
OpenTSDB has two levels of aggregation,
First, we aggregate any un-mentioned tags into the last result
Second, we aggregate into buckets over time
To simulate this with config, we have
FirstOrder (e.g. sum/avg/max/etc.)
SecondOrder (e.g. sum/avg/max/etc.)
AggTime (e.g. 1m/10m/1d/etc.)
This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none:
Or an example: m=sum:1m-avg-none
*/
FirstOrder string
SecondOrder string
AggTime string
// The actual ranges will will attempt to query (as offsets from now)
QueryRanges []TimeRange
}
// RetentionMeta objects exist to pass smaller subsets (only one retention range) of a full Retention object around
type RetentionMeta struct {
FirstOrder string
SecondOrder string
AggTime string
}
// Client object holds general config about how queries should be performed
type Client struct {
Addr string
// The meta query limit for series returned
Limit int
Retentions []Retention
Filters []string
Normalize bool
HardTS int64
MsecsTime bool
}
// Config contains fields required
// for Client configuration
type Config struct {
Addr string
Limit int
Offset int64
HardTS int64
Retentions []string
Filters []string
Normalize bool
MsecsTime bool
}
// TimeRange contains data about time ranges to query
type TimeRange struct {
Start int64
End int64
}
// MetaResults contains return data from search series lookup queries
type MetaResults struct {
Type string `json:"type"`
Results []Meta `json:"results"`
//metric string
//tags interface{}
//limit int
//time int
//startIndex int
//totalResults int
}
// Meta A meta object about a metric
// only contain the tags/etc. and no data
type Meta struct {
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
//tsuid string
}
// OtsdbMetric is a single series in OpenTSDB's returned format
type OtsdbMetric struct {
Metric string
Tags map[string]string
AggregateTags []string
Dps map[int64]float64
}
// Metric holds the time series data in VictoriaMetrics format
type Metric struct {
Metric string
Tags map[string]string
Timestamps []int64
Values []float64
}
// FindMetrics discovers all metrics that OpenTSDB knows about (given a filter)
// e.g. /api/suggest?type=metrics&q=system&max=100000
func (c Client) FindMetrics(q string) ([]string, error) {
resp, err := http.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err)
}
var metriclist []string
err = json.Unmarshal(body, &metriclist)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return metriclist, nil
}
// FindSeries discovers all series associated with a metric
// e.g. /api/search/lookup?m=system.load5&limit=1000000
func (c Client) FindSeries(metric string) ([]Meta, error) {
q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit)
resp, err := http.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to set GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var results MetaResults
err = json.Unmarshal(body, &results)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return results.Results, nil
}
// GetData actually retrieves data for a series at a specified time range
// e.g. /api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1}
func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, mSecs bool) (Metric, error) {
/*
First, build our tag string.
It's literally just key=value,key=value,...
*/
tagStr := ""
for k, v := range series.Tags {
tagStr += fmt.Sprintf("%s=%s,", k, v)
}
// obviously we don't want trailing commas...
tagStr = strings.Trim(tagStr, ",")
/*
The aggregation policy should already be somewhat formatted:
FirstOrder (e.g. sum/avg/max/etc.)
SecondOrder (e.g. sum/avg/max/etc.)
AggTime (e.g. 1m/10m/1d/etc.)
This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none:
Or an example: m=sum:1m-avg-none
*/
aggPol := fmt.Sprintf("%s:%s-%s-none", rt.FirstOrder, rt.AggTime, rt.SecondOrder)
/*
Our actual query string:
Start and End are just timestamps
We then add the aggregation policy, the metric, and the tag set
*/
queryStr := fmt.Sprintf("start=%v&end=%v&m=%s:%s{%s}", start, end, aggPol,
series.Metric, tagStr)
q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr)
resp, err := http.Get(q)
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
/*
There are three potential failures here, none of which should kill the entire
migration run:
1. bad response code
2. failure to read response body
3. bad format of response body
*/
if resp.StatusCode != 200 {
log.Println(fmt.Sprintf("bad response code from OpenTSDB query %v for %q...skipping", resp.StatusCode, q))
return Metric{}, nil
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("couldn't read response body from OpenTSDB query...skipping")
return Metric{}, nil
}
var output []OtsdbMetric
err = json.Unmarshal(body, &output)
if err != nil {
log.Println(fmt.Sprintf("couldn't marshall response body from OpenTSDB query (%s)...skipping", body))
return Metric{}, nil
}
/*
We expect results to look like:
[
{
"metric": "zfs_filesystem.available",
"tags": {
"rack": "6",
"replica": "1",
"host": "c7-bfyii-115",
"pool": "dattoarray",
"row": "c",
"dc": "us-west-3",
"group": "legonode"
},
"aggregateTags": [],
"dps": {
"1626019200": 32490602877610.668,
"1626033600": 32486439014058.668
}
}
]
There are two things that could be bad here:
1. There are no actual stats returned (an empty array -> [])
2. There are aggregate tags in the results
An empty array doesn't cast to a OtsdbMetric struct well, and there's no reason to try, so we should just skip it
Because we're trying to migrate data without transformations, seeing aggregate tags could mean
we're dropping series on the floor.
In all "bad" cases, we don't end the migration, we just don't process that particular message
*/
if len(output) < 1 {
// no results returned...return an empty object without error
return Metric{}, nil
}
if len(output) > 1 {
// multiple series returned for a single query. We can't process this right, so...
return Metric{}, nil
}
if len(output[0].AggregateTags) > 0 {
// This failure means we've suppressed potential series somehow...
return Metric{}, nil
}
data := Metric{}
data.Metric = output[0].Metric
data.Tags = output[0].Tags
/*
We evaluate data for correctness before formatting the actual values
to skip a little bit of time if the series has invalid formatting
*/
data, err = modifyData(data, c.Normalize)
if err != nil {
return Metric{}, nil
}
/*
Convert data from OpenTSDB's output format ([[ts,val],[ts,val]...])
to VictoriaMetrics format: {"timestamps": [ts,ts,ts...], "values": [val,val,val...]}
The nasty part here is that because an object in each array
can be a float64, we have to initially cast _all_ objects that way
then convert the timestamp back to something reasonable.
*/
for ts, val := range output[0].Dps {
if !mSecs {
data.Timestamps = append(data.Timestamps, ts*1000)
} else {
data.Timestamps = append(data.Timestamps, ts)
}
data.Values = append(data.Values, val)
}
return data, nil
}
// NewClient creates and returns OpenTSDB client
// configured with passed Config
func NewClient(cfg Config) (*Client, error) {
var retentions []Retention
offsetPrint := int64(time.Now().Unix())
// convert a number of days to seconds
offsetSecs := cfg.Offset * 24 * 60 * 60
if cfg.MsecsTime {
// 1000000 == Nanoseconds -> Milliseconds difference
offsetPrint = int64(time.Now().UnixNano() / 1000000)
// also bump offsetSecs to milliseconds
offsetSecs = offsetSecs * 1000
}
if cfg.HardTS > 0 {
/*
HardTS is a specific timestamp we'll be starting at.
Just present that if it is defined
*/
offsetPrint = cfg.HardTS
} else if offsetSecs > 0 {
/*
Our "offset" is the number of days (in seconds) we should step
back before starting to scan for data
*/
offsetPrint = offsetPrint - offsetSecs
}
log.Println(fmt.Sprintf("Will collect data starting at TS %v", offsetPrint))
for _, r := range cfg.Retentions {
ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime)
if err != nil {
return &Client{}, fmt.Errorf("Couldn't parse retention %q :: %v", r, err)
}
retentions = append(retentions, ret)
}
client := &Client{
Addr: strings.Trim(cfg.Addr, "/"),
Retentions: retentions,
Limit: cfg.Limit,
Filters: cfg.Filters,
Normalize: cfg.Normalize,
HardTS: cfg.HardTS,
MsecsTime: cfg.MsecsTime,
}
return client, nil
}