Simplify queries to OpenTSDB for migration (#1809)

* Simplify queries to OpenTSDB (and make them properly appear in OpenTSDB query stats); also tweak defaults a bit

Signed-off-by: John Seekins <jseekins@datto.com>

* remove extraneous printlns

Signed-off-by: John Seekins <jseekins@datto.com>

* remove empty line

Signed-off-by: John Seekins <jseekins@datto.com>

* fix bug in offset calculation and move closer to working with simpler queries

Signed-off-by: John Seekins <jseekins@datto.com>

* fix boolean eval

Signed-off-by: John Seekins <jseekins@datto.com>

* fix casting and check for multiple series

Signed-off-by: John Seekins <jseekins@datto.com>
John Seekins 2021-11-18 10:18:15 -07:00 committed by Aliaksandr Valialkin
parent 84decc4254
commit 50c33b7265
5 changed files with 90 additions and 587 deletions

@@ -155,7 +155,7 @@ var (
&cli.IntFlag{
Name: otsdbQueryLimit,
Usage: "Result limit on meta queries to OpenTSDB (affects both metric name and tag value queries, recommended to use a value exceeding your largest series)",
Value: 100e3,
Value: 100e6,
},
&cli.BoolFlag{
Name: otsdbMsecsTime,
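
For context, the raised limit caps OpenTSDB meta queries via the suggest endpoint (see FindMetrics below). A minimal sketch of the resulting request URL, with the OpenTSDB address and query prefix assumed rather than taken from vmctl's wiring:

package main

import "fmt"

func main() {
	addr := "http://opentsdb:4242" // assumed OpenTSDB address
	limit := int64(100e6)          // the new default from this diff
	// Mirrors the documented meta query shape:
	// /api/suggest?type=metrics&q=system&max=<limit>
	q := fmt.Sprintf("%s/api/suggest?type=metrics&q=%s&max=%d", addr, "system", limit)
	fmt.Println(q)
}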

@@ -1,7 +1,6 @@
package opentsdb
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
@@ -88,7 +87,15 @@ type Meta struct {
Tags map[string]string `json:"tags"`
}
// Metric holds the time series data
// OtsdbMetric is a single series in OpenTSDB's returned format
type OtsdbMetric struct {
Metric string
Tags map[string]string
AggregateTags []string
Dps map[int64]float64
}
// Metric holds the time series data in VictoriaMetrics format
type Metric struct {
Metric string
Tags map[string]string
@@ -96,83 +103,6 @@ type Metric struct {
Values []float64
}
// ExpressionOutput contains results from actual data queries
type ExpressionOutput struct {
Outputs []qoObj `json:"outputs"`
Query interface{} `json:"query"`
}
// QoObj contains actual timeseries data from the returned data query
type qoObj struct {
ID string `json:"id"`
Alias string `json:"alias"`
Dps [][]float64 `json:"dps"`
//dpsMeta interface{}
//meta interface{}
}
// Expression objects format our data queries
/*
All of the following structs are to build an OpenTSDB expression object
*/
type Expression struct {
Time timeObj `json:"time"`
Filters []filterObj `json:"filters"`
Metrics []metricObj `json:"metrics"`
// this just needs to be an empty object, so the value doesn't matter
Expressions []int `json:"expressions"`
Outputs []outputObj `json:"outputs"`
}
type timeObj struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Aggregator string `json:"aggregator"`
Downsampler dSObj `json:"downsampler"`
}
type dSObj struct {
Interval string `json:"interval"`
Aggregator string `json:"aggregator"`
FillPolicy fillObj `json:"fillPolicy"`
}
type fillObj struct {
// we'll always hard-code to NaN here, so we don't need value
Policy string `json:"policy"`
}
type filterObj struct {
Tags []tagObj `json:"tags"`
ID string `json:"id"`
}
type tagObj struct {
Type string `json:"type"`
Tagk string `json:"tagk"`
Filter string `json:"filter"`
GroupBy bool `json:"groupBy"`
}
type metricObj struct {
ID string `json:"id"`
Metric string `json:"metric"`
Filter string `json:"filter"`
FillPolicy fillObj `json:"fillPolicy"`
}
type outputObj struct {
ID string `json:"id"`
Alias string `json:"alias"`
}
/* End expression object structs */
var (
exprOutput = outputObj{ID: "a", Alias: "query"}
exprFillPolicy = fillObj{Policy: "nan"}
)
// FindMetrics discovers all metrics that OpenTSDB knows about (given a filter)
// e.g. /api/suggest?type=metrics&q=system&max=100000
func (c Client) FindMetrics(q string) ([]string, error) {
@@ -221,41 +151,39 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
}
// GetData actually retrieves data for a series at a specified time range
// e.g. /api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1}
func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) {
/*
Here we build the actual exp query we'll send to OpenTSDB
This comprises a number of different settings. We hard-code
a few to simplify the JSON object creation.
There are example queries available, so not too much detail here...
First, build our tag string.
It's literally just key=value,key=value,...
*/
expr := Expression{}
expr.Outputs = []outputObj{exprOutput}
expr.Metrics = append(expr.Metrics, metricObj{ID: "a", Metric: series.Metric,
Filter: "f1", FillPolicy: exprFillPolicy})
expr.Time = timeObj{Start: start, End: end, Aggregator: rt.FirstOrder,
Downsampler: dSObj{Interval: rt.AggTime,
Aggregator: rt.SecondOrder,
FillPolicy: exprFillPolicy}}
var TagList []tagObj
tagStr := ""
for k, v := range series.Tags {
/*
every tag should be a literal_or because that's the closest to a full "==" that
this endpoint allows for
*/
TagList = append(TagList, tagObj{Type: "literal_or", Tagk: k,
Filter: v, GroupBy: true})
}
expr.Filters = append(expr.Filters, filterObj{ID: "f1", Tags: TagList})
// "expressions" is required in the query object or we get a 5xx, so force it to exist
expr.Expressions = make([]int, 0)
inputData, err := json.Marshal(expr)
if err != nil {
return Metric{}, fmt.Errorf("failed to marshal query JSON %s", err)
tagStr += fmt.Sprintf("%s=%s,", k, v)
}
// obviously we don't want trailing commas...
tagStr = strings.Trim(tagStr, ",")
q := fmt.Sprintf("%s/api/query/exp", c.Addr)
resp, err := http.Post(q, "application/json", bytes.NewBuffer(inputData))
/*
The aggregation policy should already be somewhat formatted:
FirstOrder (e.g. sum/avg/max/etc.)
SecondOrder (e.g. sum/avg/max/etc.)
AggTime (e.g. 1m/10m/1d/etc.)
This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none:
Or an example: m=sum:1m-avg-none
*/
aggPol := fmt.Sprintf("%s:%s-%s-none", rt.FirstOrder, rt.AggTime, rt.SecondOrder)
/*
Our actual query string:
Start and End are just timestamps
We then add the aggregation policy, the metric, and the tag set
*/
queryStr := fmt.Sprintf("start=%v&end=%v&m=%s:%s{%s}", start, end, aggPol,
series.Metric, tagStr)
q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr)
resp, err := http.Get(q)
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
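
To make the string assembly above concrete, here is a self-contained sketch that reproduces the example URL from the GetData doc comment (the OpenTSDB address is assumed):

package main

import "fmt"

func main() {
	// RetentionMeta-style values as described in the comment above.
	firstOrder, secondOrder, aggTime := "sum", "avg", "1m"
	aggPol := fmt.Sprintf("%s:%s-%s-none", firstOrder, aggTime, secondOrder)
	tagStr := "host=host1" // key=value pairs joined with commas
	start, end := int64(1), int64(200)
	q := fmt.Sprintf("http://opentsdb:4242/api/query?start=%v&end=%v&m=%s:%s{%s}",
		start, end, aggPol, "system.load5", tagStr)
	// Prints: http://opentsdb:4242/api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1}
	fmt.Println(q)
}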
@@ -267,28 +195,63 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
if err != nil {
return Metric{}, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var output ExpressionOutput
var output []OtsdbMetric
err = json.Unmarshal(body, &output)
if err != nil {
return Metric{}, fmt.Errorf("failed to unmarshal response from %q: %s", q, err)
return Metric{}, fmt.Errorf("failed to unmarshal response from %q [%v]: %s", q, body, err)
}
if len(output.Outputs) < 1 {
/*
We expect results to look like:
[
{
"metric": "zfs_filesystem.available",
"tags": {
"rack": "6",
"replica": "1",
"host": "c7-bfyii-115",
"pool": "dattoarray",
"row": "c",
"dc": "us-west-3",
"group": "legonode"
},
"aggregateTags": [],
"dps": {
"1626019200": 32490602877610.668,
"1626033600": 32486439014058.668
}
}
]
There are two things that could be bad here:
1. There are no actual stats returned (an empty array -> [])
2. There are aggregate tags in the results
An empty array doesn't cast to an OtsdbMetric struct well, and there's no reason to try, so we should just skip it
Because we're trying to migrate data without transformations, seeing aggregate tags could mean
we're dropping series on the floor.
*/
if len(output) < 1 {
// no results returned...return an empty object without error
return Metric{}, nil
}
if len(output) > 1 {
// multiple series returned for a single query. We can't process this correctly, so...
return Metric{}, fmt.Errorf("query returned multiple results: %v", output)
}
if len(output[0].AggregateTags) > 0 {
// This failure means we've suppressed potential series somehow...
return Metric{}, fmt.Errorf("Query somehow has aggregate tags: %v", output[0].AggregateTags)
}
data := Metric{}
data.Metric = series.Metric
data.Tags = series.Tags
data.Metric = output[0].Metric
data.Tags = output[0].Tags
/*
We evaluate data for correctness before formatting the actual values
to save a little time if the series has invalid formatting.
The first step is to enforce Prometheus' data model.
*/
data, err = modifyData(data, c.Normalize)
if err != nil {
return Metric{}, fmt.Errorf("invalid series data from %q: %s", q, err)
}
/*
Convert data from OpenTSDB's output format ([[ts,val],[ts,val]...])
to VictoriaMetrics format: {"timestamps": [ts,ts,ts...], "values": [val,val,val...]}
@@ -296,9 +259,9 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
can be a float64, we have to initially cast _all_ objects that way
then convert the timestamp back to something reasonable.
*/
for _, tsobj := range output.Outputs[0].Dps {
data.Timestamps = append(data.Timestamps, int64(tsobj[0]))
data.Values = append(data.Values, tsobj[1])
for ts, val := range output[0].Dps {
data.Timestamps = append(data.Timestamps, ts)
data.Values = append(data.Values, val)
}
return data, nil
}
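
A hedged, self-contained sketch of the new response handling: unmarshal a trimmed version of the sample response documented above into []OtsdbMetric, then flatten Dps into the VictoriaMetrics layout. Note one addition not present in the code above: Go map iteration order is random, so this sketch sorts the timestamps explicitly.

package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// Struct shapes copied from this diff.
type OtsdbMetric struct {
	Metric        string
	Tags          map[string]string
	AggregateTags []string
	Dps           map[int64]float64
}

type Metric struct {
	Metric     string
	Tags       map[string]string
	Timestamps []int64
	Values     []float64
}

func main() {
	body := []byte(`[{"metric":"zfs_filesystem.available",
		"tags":{"host":"c7-bfyii-115","pool":"dattoarray"},
		"aggregateTags":[],
		"dps":{"1626019200":32490602877610.668,"1626033600":32486439014058.668}}]`)
	var output []OtsdbMetric
	if err := json.Unmarshal(body, &output); err != nil {
		panic(err)
	}
	data := Metric{Metric: output[0].Metric, Tags: output[0].Tags}
	// Sort Dps keys so timestamps come out in order.
	keys := make([]int64, 0, len(output[0].Dps))
	for ts := range output[0].Dps {
		keys = append(keys, ts)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	for _, ts := range keys {
		data.Timestamps = append(data.Timestamps, ts)
		data.Values = append(data.Values, output[0].Dps[ts])
	}
	fmt.Println(data.Timestamps, data.Values)
}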
@@ -308,9 +271,12 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
func NewClient(cfg Config) (*Client, error) {
var retentions []Retention
offsetPrint := int64(time.Now().Unix())
offsetSecs := cfg.Offset * 24 * 60 * 60
if cfg.MsecsTime {
// 1000000 == Nanoseconds -> Milliseconds difference
offsetPrint = int64(time.Now().UnixNano() / 1000000)
// also bump offsetSecs to milliseconds
offsetSecs = offsetSecs * 1000
}
if cfg.HardTS > 0 {
/*
@@ -318,20 +284,16 @@ func NewClient(cfg Config) (*Client, error) {
Just present that if it is defined
*/
offsetPrint = cfg.HardTS
} else if cfg.Offset > 0 {
} else if offsetSecs > 0 {
/*
Our "offset" is the number of days we should step
Our "offset" is the number of days (in seconds) we should step
back before starting to scan for data
*/
if cfg.MsecsTime {
offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60 * 1000)
} else {
offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60)
}
offsetPrint = offsetPrint - offsetSecs
}
log.Println(fmt.Sprintf("Will collect data starting at TS %v", offsetPrint))
for _, r := range cfg.Retentions {
ret, err := convertRetention(r, cfg.Offset, cfg.MsecsTime)
ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime)
if err != nil {
return &Client{}, fmt.Errorf("Couldn't parse retention %q :: %v", r, err)
}
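
The offset arithmetic above is easy to trip over; a worked sketch with an assumed 7-day offset, showing the seconds and milliseconds variants moving together:

package main

import (
	"fmt"
	"time"
)

func main() {
	offsetDays := int64(7) // assumed config value
	offsetSecs := offsetDays * 24 * 60 * 60
	fmt.Println(offsetSecs) // 604800 seconds

	msecsTime := true
	start := time.Now().Unix()
	if msecsTime {
		// Same bump as NewClient above: both the "now" timestamp
		// and the offset move to milliseconds together.
		start = time.Now().UnixNano() / 1000000
		offsetSecs *= 1000 // 604800000 ms
	}
	fmt.Println("will collect data starting at TS", start-offsetSecs)
}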

@@ -107,6 +107,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
}
// bump by the offset so we don't look at empty ranges any time offset > ttl
ttl += offset
var timeChunks []TimeRange
var i int64
for i = offset; i <= ttl; i = i + rowLength {
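
A toy sketch of why the ttl bump matters: with a 2-day offset against a 1-day retention, the un-bumped loop would run zero times (the empty-range bug this hunk fixes), while the bumped ttl still yields query windows. Chunk semantics here are illustrative only:

package main

import "fmt"

func main() {
	var (
		offset    int64 = 2 * 86400 // look back 2 days before scanning
		ttl       int64 = 1 * 86400 // retention covers only 1 day
		rowLength int64 = 86400     // 1-day query chunks
	)
	// Without this bump, offset > ttl means the loop below never runs.
	ttl += offset
	for i := offset; i <= ttl; i += rowLength {
		fmt.Printf("query window: %d..%d seconds before now\n", i+rowLength, i)
	}
}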

@@ -1,398 +0,0 @@
{
"outputs": [
{
"id": "a",
"alias": "query",
"dps": [
[
1614099600000,
0.28
],
[
1614099660000,
0.22
],
[
1614099720000,
0.18
],
[
1614099780000,
0.14
],
[
1614099840000,
0.24
],
[
1614099900000,
0.19
],
[
1614099960000,
0.22
],
[
1614100020000,
0.2
],
[
1614100080000,
0.18
],
[
1614100140000,
0.22
],
[
1614100200000,
0.17
],
[
1614100260000,
0.16
],
[
1614100320000,
0.22
],
[
1614100380000,
0.3
],
[
1614100440000,
0.28
],
[
1614100500000,
0.27
],
[
1614100560000,
0.26
],
[
1614100620000,
0.23
],
[
1614100680000,
0.18
],
[
1614100740000,
0.3
],
[
1614100800000,
0.24
],
[
1614100860000,
0.19
],
[
1614100920000,
0.16
],
[
1614100980000,
0.19
],
[
1614101040000,
0.23
],
[
1614101100000,
0.18
],
[
1614101160000,
0.15
],
[
1614101220000,
0.12
],
[
1614101280000,
0.1
],
[
1614101340000,
0.24
],
[
1614101400000,
0.19
],
[
1614101460000,
0.16
],
[
1614101520000,
0.14
],
[
1614101580000,
0.12
],
[
1614101640000,
0.14
],
[
1614101700000,
0.12
],
[
1614101760000,
0.13
],
[
1614101820000,
0.12
],
[
1614101880000,
0.11
],
[
1614101940000,
0.36
],
[
1614102000000,
0.35
],
[
1614102060000,
0.3
],
[
1614102120000,
0.32
],
[
1614102180000,
0.27
],
[
1614102240000,
0.26
],
[
1614102300000,
0.21
],
[
1614102360000,
0.18
],
[
1614102420000,
0.15
],
[
1614102480000,
0.12
],
[
1614102540000,
0.24
],
[
1614102600000,
0.2
],
[
1614102660000,
0.17
],
[
1614102720000,
0.18
],
[
1614102780000,
0.14
],
[
1614102840000,
0.39
],
[
1614102900000,
0.31
],
[
1614102960000,
0.3
],
[
1614103020000,
0.24
],
[
1614103080000,
0.26
],
[
1614103140000,
0.21
],
[
1614103200000,
0.17
],
[
1614103260000,
0.15
],
[
1614103320000,
0.2
],
[
1614103380000,
0.2
],
[
1614103440000,
0.22
],
[
1614103500000,
0.19
],
[
1614103560000,
0.22
],
[
1614103620000,
0.29
],
[
1614103680000,
0.31
],
[
1614103740000,
0.28
],
[
1614103800000,
0.23
]
],
"dpsMeta": {
"firstTimestamp": 1614099600000,
"lastTimestamp": 1614103800000,
"setCount": 71,
"series": 1
},
"meta": [
{
"index": 0,
"metrics": [
"timestamp"
]
},
{
"index": 1,
"metrics": [
"system.load5"
],
"commonTags": {
"rack": "undef",
"host": "use1-mon-metrics-1",
"row": "undef",
"dc": "us-east-1",
"group": "monitoring"
},
"aggregatedTags": []
}
]
}
],
"query": {
"name": null,
"time": {
"start": "1h-ago",
"end": null,
"timezone": null,
"downsampler": {
"interval": "1m",
"aggregator": "avg",
"fillPolicy": {
"policy": "nan",
"value": "NaN"
}
},
"aggregator": "sum",
"rate": false
},
"filters": [
{
"id": "f1",
"tags": [
{
"tagk": "host",
"filter": "use1-mon-metrics-1",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "group",
"filter": "monitoring",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "dc",
"filter": "us-east-1",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "rack",
"filter": "undef",
"group_by": true,
"type": "literal_or"
},
{
"tagk": "row",
"filter": "undef",
"group_by": true,
"type": "literal_or"
}
],
"explicitTags": false
}
],
"metrics": [
{
"metric": "system.load5",
"id": "a",
"filter": "f1",
"aggregator": null,
"timeOffset": null,
"fillPolicy": {
"policy": "nan",
"value": "NaN"
}
}
],
"expressions": [],
"outputs": [
{
"id": "a",
"alias": "query"
}
]
}
}

@@ -1,62 +0,0 @@
{
"time": {
"start": "1h-ago",
"aggregator":"sum",
"downsampler": {
"interval": "1m",
"aggregator": "avg",
"fillPolicy": {
"policy": "nan"
}
}
},
"filters": [
{
"tags": [
{
"type": "literal_or",
"tagk": "host",
"filter": "use1-mon-metrics-1",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "group",
"filter": "monitoring",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "dc",
"filter": "us-east-1",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "rack",
"filter": "undef",
"groupBy": true
},
{
"type": "literal_or",
"tagk": "row",
"filter": "undef",
"groupBy": true
}
],
"id": "f1"
}
],
"metrics": [
{
"id": "a",
"metric": "system.load5",
"filter": "f1",
"fillPolicy":{"policy":"nan"}
}
],
"expressions": [],
"outputs":[
{"id":"a", "alias":"query"}
]
}