Address some edge cases in OpenTSDB importer and speed it up (#2019)

* Simplify queries to OpenTSDB (and make them properly appear in OpenTSDB query stats) and also tweak defaults a bit

* Convert seconds to milliseconds before writing to VictoriaMetrics and increase subquery size

Signed-off-by: John Seekins <jseekins@datto.com>
This commit is contained in:
John Seekins 2022-01-03 23:51:23 -07:00 committed by GitHub
parent 5ce94e1dd3
commit 60266078ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 25 deletions

View file

@ -145,7 +145,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
func (op *otsdbProcessor) do(s queryObj) error {
start := s.StartTime - s.Tr.Start
end := s.StartTime - s.Tr.End
data, err := op.oc.GetData(s.Series, s.Rt, start, end)
data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
if err != nil {
return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
}

View file

@ -46,6 +46,7 @@ type Client struct {
Filters []string
Normalize bool
HardTS int64
MsecsTime bool
}
// Config contains fields required
@ -82,9 +83,9 @@ type MetaResults struct {
// Meta A meta object about a metric
// only contain the tags/etc. and no data
type Meta struct {
//tsuid string
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
//tsuid string
}
// OtsdbMetric is a single series in OpenTSDB's returned format
@ -152,7 +153,7 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
// GetData actually retrieves data for a series at a specified time range
// e.g. /api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1}
func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) {
func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, mSecs bool) (Metric, error) {
/*
First, build our tag string.
It's literally just key=value,key=value,...
@ -195,7 +196,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
3. bad format of response body
*/
if resp.StatusCode != 200 {
log.Println(fmt.Sprintf("bad response code from OpenTSDB query %v...skipping", resp.StatusCode))
log.Println(fmt.Sprintf("bad response code from OpenTSDB query %v for %q...skipping", resp.StatusCode, q))
return Metric{}, nil
}
defer func() { _ = resp.Body.Close() }()
@ -272,7 +273,11 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
then convert the timestamp back to something reasonable.
*/
for ts, val := range output[0].Dps {
data.Timestamps = append(data.Timestamps, ts)
if !mSecs {
data.Timestamps = append(data.Timestamps, ts*1000)
} else {
data.Timestamps = append(data.Timestamps, ts)
}
data.Values = append(data.Values, val)
}
return data, nil
@ -283,6 +288,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
func NewClient(cfg Config) (*Client, error) {
var retentions []Retention
offsetPrint := int64(time.Now().Unix())
// convert a number of days to seconds
offsetSecs := cfg.Offset * 24 * 60 * 60
if cfg.MsecsTime {
// 1000000 == Nanoseconds -> Milliseconds difference
@ -318,6 +324,7 @@ func NewClient(cfg Config) (*Client, error) {
Filters: cfg.Filters,
Normalize: cfg.Normalize,
HardTS: cfg.HardTS,
MsecsTime: cfg.MsecsTime,
}
return client, nil
}

View file

@ -87,6 +87,34 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
if len(chunks) != 3 {
return Retention{}, fmt.Errorf("invalid retention string: %q", retention)
}
queryLengthDuration, err := convertDuration(chunks[2])
if err != nil {
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
}
// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
queryLength := queryLengthDuration.Milliseconds()
if !msecTime {
queryLength = queryLength / 1000
}
queryRange := queryLength
// bump by the offset so we don't look at empty ranges any time offset > ttl
queryLength += offset
// first/second order aggregations for queries defined in chunk 0...
aggregates := strings.Split(chunks[0], "-")
if len(aggregates) != 3 {
return Retention{}, fmt.Errorf("invalid aggregation string: %q", chunks[0])
}
aggTimeDuration, err := convertDuration(aggregates[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %s", aggregates[1], err)
}
aggTime := aggTimeDuration.Milliseconds()
if !msecTime {
aggTime = aggTime / 1000
}
rowLengthDuration, err := convertDuration(chunks[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %s", chunks[1], err)
@ -96,27 +124,35 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
if !msecTime {
rowLength = rowLength / 1000
}
ttlDuration, err := convertDuration(chunks[2])
if err != nil {
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
var querySize int64
/*
The idea here is to ensure each individual query sent to OpenTSDB is *at least*
large enough to ensure no single query requests essentially 0 data.
*/
if rowLength > aggTime {
/*
We'll look at 4x the row size for each query we perform
This is a strange function, but the logic works like this:
1. we discover the "number" of ranges we should split the time range into
This is found with queryRange / (rowLength * 4)...kind of a percentage query
2. we discover the actual size of each "chunk"
This is the second division step
*/
querySize = int64(queryRange / (queryRange / (rowLength * 4)))
} else {
/*
Unless the aggTime (how long a range of data we're requesting per individual point)
is greater than the row size. Then we'll need to use that to determine
how big each individual query should be
*/
querySize = int64(queryRange / (queryRange / (aggTime * 4)))
}
// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
ttl := ttlDuration.Milliseconds()
if !msecTime {
ttl = ttl / 1000
}
// bump by the offset so we don't look at empty ranges any time offset > ttl
ttl += offset
var timeChunks []TimeRange
var i int64
for i = offset; i <= ttl; i = i + rowLength {
timeChunks = append(timeChunks, TimeRange{Start: i + rowLength, End: i})
}
// first/second order aggregations for queries defined in chunk 0...
aggregates := strings.Split(chunks[0], "-")
if len(aggregates) != 3 {
return Retention{}, fmt.Errorf("invalid aggregation string: %q", chunks[0])
for i = offset; i <= queryLength; i = i + querySize {
timeChunks = append(timeChunks, TimeRange{Start: i + querySize, End: i})
}
ret := Retention{FirstOrder: aggregates[0],

View file

@ -8,7 +8,7 @@ func TestConvertRetention(t *testing.T) {
/*
2592000 seconds in 30 days
3600 in one hour
2592000 / 3600 = 720 individual query "ranges" should exist, plus one because time ranges can be weird
2592000 / 14400 = 180 individual query "ranges" should exist, plus one because time ranges can be weird
First order should == "sum"
Second order should == "avg"
AggTime should == "1m"
@ -17,8 +17,8 @@ func TestConvertRetention(t *testing.T) {
if err != nil {
t.Fatalf("Error parsing valid retention string: %v", err)
}
if len(res.QueryRanges) != 721 {
t.Fatalf("Found %v query ranges. Should have found 720", len(res.QueryRanges))
if len(res.QueryRanges) != 181 {
t.Fatalf("Found %v query ranges. Should have found 181", len(res.QueryRanges))
}
if res.FirstOrder != "sum" {
t.Fatalf("Incorrect first order aggregation %q. Should have been 'sum'", res.FirstOrder)