OpenTSDB Migration Fix (#1946)

* Simplify queries to OpenTSDB (and make them properly appear in OpenTSDB query stats) and also tweak defaults a bit

Signed-off-by: John Seekins <jseekins@datto.com>
This commit is contained in:
John Seekins 2021-12-20 00:35:51 -07:00 committed by Aliaksandr Valialkin
parent c446858d08
commit fcb759ebc3
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -187,18 +187,28 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
/*
There are three potential failures here, none of which should kill the entire
migration run:
1. bad response code
2. failure to read response body
3. bad format of response body
*/
if resp.StatusCode != 200 {
return Metric{}, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
log.Println(fmt.Sprintf("bad response code from OpenTSDB query %v...skipping", resp.StatusCode))
return Metric{}, nil
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return Metric{}, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
log.Println("couldn't read response body from OpenTSDB query...skipping")
return Metric{}, nil
}
var output []OtsdbMetric
err = json.Unmarshal(body, &output)
if err != nil {
return Metric{}, fmt.Errorf("failed to unmarshal response from %q [%v]: %s", q, body, err)
log.Println(fmt.Sprintf("couldn't marshall response body from OpenTSDB query (%s)...skipping", body))
return Metric{}, nil
}
/*
We expect results to look like:
@ -227,6 +237,8 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
An empty array doesn't cast to a OtsdbMetric struct well, and there's no reason to try, so we should just skip it
Because we're trying to migrate data without transformations, seeing aggregate tags could mean
we're dropping series on the floor.
In all "bad" cases, we don't end the migration, we just don't process that particular message
*/
if len(output) < 1 {
// no results returned...return an empty object without error
@ -234,11 +246,11 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
}
if len(output) > 1 {
// multiple series returned for a single query. We can't process this right, so...
return Metric{}, fmt.Errorf("Query returned multiple results: %v", output)
return Metric{}, nil
}
if len(output[0].AggregateTags) > 0 {
// This failure means we've suppressed potential series somehow...
return Metric{}, fmt.Errorf("Query somehow has aggregate tags: %v", output[0].AggregateTags)
return Metric{}, nil
}
data := Metric{}
data.Metric = output[0].Metric
@ -249,7 +261,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
*/
data, err = modifyData(data, c.Normalize)
if err != nil {
return Metric{}, fmt.Errorf("invalid series data from %q: %s", q, err)
return Metric{}, nil
}
/*