Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
Address some edge cases in OpenTSDB importer and speed it up (#2019)
* Simplify queries to OpenTSDB (and make them properly appear in OpenTSDB query stats) and also tweak defaults a bit
* Convert seconds to milliseconds before writing to VictoriaMetrics and increase subquery size

Signed-off-by: John Seekins <jseekins@datto.com>
parent 276cccb888
commit edd2db1b10
4 changed files with 68 additions and 25 deletions
@@ -145,7 +145,7 @@ func (op *otsdbProcessor) run(silent, verbose bool) error {
 func (op *otsdbProcessor) do(s queryObj) error {
 	start := s.StartTime - s.Tr.Start
 	end := s.StartTime - s.Tr.End
-	data, err := op.oc.GetData(s.Series, s.Rt, start, end)
+	data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
 	if err != nil {
 		return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
 	}

@@ -46,6 +46,7 @@ type Client struct {
 	Filters   []string
 	Normalize bool
 	HardTS    int64
+	MsecsTime bool
 }

 // Config contains fields required
@@ -82,9 +83,9 @@ type MetaResults struct {
 // Meta A meta object about a metric
 // only contain the tags/etc. and no data
 type Meta struct {
-	//tsuid string
 	Metric string            `json:"metric"`
 	Tags   map[string]string `json:"tags"`
+	//tsuid string
 }

 // OtsdbMetric is a single series in OpenTSDB's returned format

@@ -152,7 +153,7 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {

 // GetData actually retrieves data for a series at a specified time range
 // e.g. /api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1}
-func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) {
+func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, mSecs bool) (Metric, error) {
 	/*
 	First, build our tag string.
 	It's literally just key=value,key=value,...

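As an aside, the example URL in the comment above captures the whole query shape: first-order aggregator, downsampling window plus second-order aggregator and fill policy, the metric name, and a tag string that is literally key=value,key=value,... . Below is a standalone sketch of assembling such a URL; the buildQueryURL helper and the literal values are illustrative assumptions, not code from this commit.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildQueryURL is a hypothetical helper mirroring the URL shape from the comment above:
// /api/query?start=<start>&end=<end>&m=<firstOrder>:<aggTime>-<secondOrder>-none:<metric>{k=v,...}
func buildQueryURL(addr, metric string, tags map[string]string, firstOrder, aggTime, secondOrder string, start, end int64) string {
	// The tag string is literally just key=value,key=value,...
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic ordering for readability
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf("%s=%s", k, tags[k]))
	}
	return fmt.Sprintf("%s/api/query?start=%d&end=%d&m=%s:%s-%s-none:%s{%s}",
		addr, start, end, firstOrder, aggTime, secondOrder, metric, strings.Join(pairs, ","))
}

func main() {
	u := buildQueryURL("http://opentsdb:4242", "system.load5",
		map[string]string{"host": "host1"}, "sum", "1m", "avg", 1, 200)
	fmt.Println(u)
	// http://opentsdb:4242/api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1}
}
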
@@ -195,7 +196,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
 	3. bad format of response body
 	*/
 	if resp.StatusCode != 200 {
-		log.Println(fmt.Sprintf("bad response code from OpenTSDB query %v...skipping", resp.StatusCode))
+		log.Println(fmt.Sprintf("bad response code from OpenTSDB query %v for %q...skipping", resp.StatusCode, q))
 		return Metric{}, nil
 	}
 	defer func() { _ = resp.Body.Close() }()

@@ -272,7 +273,11 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
 	then convert the timestamp back to something reasonable.
 	*/
 	for ts, val := range output[0].Dps {
-		data.Timestamps = append(data.Timestamps, ts)
+		if !mSecs {
+			data.Timestamps = append(data.Timestamps, ts*1000)
+		} else {
+			data.Timestamps = append(data.Timestamps, ts)
+		}
 		data.Values = append(data.Values, val)
 	}
 	return data, nil

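The intent of the new branch: when the OpenTSDB server reports timestamps in seconds (mSecs is false), scale them up so that what gets written to VictoriaMetrics is in milliseconds, which is the precision VictoriaMetrics expects on import. A self-contained sketch of the same conversion with made-up sample data:

package main

import "fmt"

func main() {
	// Data points keyed by timestamp, like the "dps" map in an OpenTSDB response.
	dps := map[int64]float64{1577836800: 0.42} // second-resolution timestamp (example value)
	mSecs := false                             // the server is running with second resolution

	var timestamps []int64
	var values []float64
	for ts, val := range dps {
		if !mSecs {
			ts = ts * 1000 // seconds -> milliseconds
		}
		timestamps = append(timestamps, ts)
		values = append(values, val)
	}
	fmt.Println(timestamps, values) // [1577836800000] [0.42]
}
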
@@ -283,6 +288,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (
 func NewClient(cfg Config) (*Client, error) {
 	var retentions []Retention
 	offsetPrint := int64(time.Now().Unix())
+	// convert a number of days to seconds
 	offsetSecs := cfg.Offset * 24 * 60 * 60
 	if cfg.MsecsTime {
 		// 1000000 == Nanoseconds -> Milliseconds difference

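The hunk is cut off here, but the visible lines set up the import offset: cfg.Offset is a number of days, converted to seconds, and when the server keeps millisecond time the "now" reference has to come from nanoseconds (hence the 1000000 divisor mentioned in the comment). The sketch below shows that arithmetic; the millisecond branch is an assumption about the truncated code, not a copy of it.

package main

import (
	"fmt"
	"time"
)

func main() {
	offsetDays := int64(1) // how many days back from "now" the import should start
	msecsTime := true      // whether the OpenTSDB server tracks time in milliseconds

	// convert a number of days to seconds
	offsetSecs := offsetDays * 24 * 60 * 60

	var offsetPrint int64
	if msecsTime {
		// 1000000 == Nanoseconds -> Milliseconds difference
		offsetPrint = time.Now().UnixNano()/1000000 - offsetSecs*1000
	} else {
		offsetPrint = time.Now().Unix() - offsetSecs
	}
	fmt.Println(offsetPrint) // "now minus offset" in the server's native resolution
}
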
@@ -318,6 +324,7 @@ func NewClient(cfg Config) (*Client, error) {
 		Filters:   cfg.Filters,
 		Normalize: cfg.Normalize,
 		HardTS:    cfg.HardTS,
+		MsecsTime: cfg.MsecsTime,
 	}
 	return client, nil
 }

@@ -87,6 +87,34 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
 	if len(chunks) != 3 {
 		return Retention{}, fmt.Errorf("invalid retention string: %q", retention)
 	}
+	queryLengthDuration, err := convertDuration(chunks[2])
+	if err != nil {
+		return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
+	}
+	// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
+	queryLength := queryLengthDuration.Milliseconds()
+	if !msecTime {
+		queryLength = queryLength / 1000
+	}
+	queryRange := queryLength
+	// bump by the offset so we don't look at empty ranges any time offset > ttl
+	queryLength += offset
+
+	// first/second order aggregations for queries defined in chunk 0...
+	aggregates := strings.Split(chunks[0], "-")
+	if len(aggregates) != 3 {
+		return Retention{}, fmt.Errorf("invalid aggregation string: %q", chunks[0])
+	}
+
+	aggTimeDuration, err := convertDuration(aggregates[1])
+	if err != nil {
+		return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %s", aggregates[1], err)
+	}
+	aggTime := aggTimeDuration.Milliseconds()
+	if !msecTime {
+		aggTime = aggTime / 1000
+	}
+
 	rowLengthDuration, err := convertDuration(chunks[1])
 	if err != nil {
 		return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %s", chunks[1], err)

@@ -96,27 +124,35 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
 	if !msecTime {
 		rowLength = rowLength / 1000
 	}
-	ttlDuration, err := convertDuration(chunks[2])
-	if err != nil {
-		return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
-	}
+
+	var querySize int64
+	/*
+		The idea here is to ensure each individual query sent to OpenTSDB is *at least*
+		large enough to ensure no single query requests essentially 0 data.
+	*/
+	if rowLength > aggTime {
+		/*
+			We'll look at 2x the row size for each query we perform
+			This is a strange function, but the logic works like this:
+			1. we discover the "number" of ranges we should split the time range into
+			   This is found with queryRange / (rowLength * 4)...kind of a percentage query
+			2. we discover the actual size of each "chunk"
+			   This is second division step
+		*/
+		querySize = int64(queryRange / (queryRange / (rowLength * 4)))
+	} else {
+		/*
+			Unless the aggTime (how long a range of data we're requesting per individual point)
+			is greater than the row size. Then we'll need to use that to determine
+			how big each individual query should be
+		*/
+		querySize = int64(queryRange / (queryRange / (aggTime * 4)))
+	}
-	// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
-	ttl := ttlDuration.Milliseconds()
-	if !msecTime {
-		ttl = ttl / 1000
-	}
-	// bump by the offset so we don't look at empty ranges any time offset > ttl
-	ttl += offset

 	var timeChunks []TimeRange
 	var i int64
-	for i = offset; i <= ttl; i = i + rowLength {
-		timeChunks = append(timeChunks, TimeRange{Start: i + rowLength, End: i})
-	}
-	// first/second order aggregations for queries defined in chunk 0...
-	aggregates := strings.Split(chunks[0], "-")
-	if len(aggregates) != 3 {
-		return Retention{}, fmt.Errorf("invalid aggregation string: %q", chunks[0])
-	}
+	for i = offset; i <= queryLength; i = i + querySize {
+		timeChunks = append(timeChunks, TimeRange{Start: i + querySize, End: i})
+	}

 	ret := Retention{FirstOrder: aggregates[0],

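A worked example of the sizing logic above, using the numbers from the test file below (30 days of retention, 1-hour rows, 1-minute aggregation, no offset, second-resolution time). It only reproduces the arithmetic and is not the importer's code:

package main

import "fmt"

func main() {
	// All values in seconds (msecTime == false), matching the test expectations below.
	var (
		queryLength int64 = 30 * 24 * 60 * 60 // ttl: 2592000
		rowLength   int64 = 60 * 60           // row size: 3600
		aggTime     int64 = 60                // aggregation window: 60
		offset      int64 = 0
	)
	queryRange := queryLength
	queryLength += offset

	var querySize int64
	if rowLength > aggTime {
		// number of ranges: 2592000 / (3600 * 4) = 180; size of each: 2592000 / 180 = 14400
		querySize = queryRange / (queryRange / (rowLength * 4))
	} else {
		querySize = queryRange / (queryRange / (aggTime * 4))
	}

	chunks := 0
	for i := offset; i <= queryLength; i += querySize {
		chunks++
	}
	fmt.Println(querySize, chunks) // 14400 181 -- the "180 ranges, plus one" from the test comment
}
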
@@ -8,7 +8,7 @@ func TestConvertRetention(t *testing.T) {
 	/*
 		2592000 seconds in 30 days
 		3600 in one hour
-		2592000 / 3600 = 720 individual query "ranges" should exist, plus one because time ranges can be weird
+		2592000 / 14400 = 180 individual query "ranges" should exist, plus one because time ranges can be weird
 		First order should == "sum"
 		Second order should == "avg"
 		AggTime should == "1m"

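For reference, those expectations imply a retention string of the shape <firstOrder>-<aggTime>-<secondOrder>:<rowLength>:<ttl>, which is what the ":" and "-" splits in convertRetention suggest. The string below is an illustrative guess that matches the expected values, not necessarily the one used in the test:

package main

import (
	"fmt"
	"strings"
)

func main() {
	retention := "sum-1m-avg:1h:30d" // assumed example, not taken from the test file

	chunks := strings.Split(retention, ":")     // [sum-1m-avg 1h 30d]
	aggregates := strings.Split(chunks[0], "-") // [sum 1m avg]
	fmt.Println(aggregates[0], aggregates[1], aggregates[2], chunks[1], chunks[2])
	// sum 1m avg 1h 30d
}
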
@@ -17,8 +17,8 @@ func TestConvertRetention(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Error parsing valid retention string: %v", err)
 	}
-	if len(res.QueryRanges) != 721 {
-		t.Fatalf("Found %v query ranges. Should have found 720", len(res.QueryRanges))
+	if len(res.QueryRanges) != 181 {
+		t.Fatalf("Found %v query ranges. Should have found 181", len(res.QueryRanges))
 	}
 	if res.FirstOrder != "sum" {
 		t.Fatalf("Incorrect first order aggregation %q. Should have been 'sum'", res.FirstOrder)