package opentsdb

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
	"time"
)

// Retention objects contain meta data about what to query for our run
type Retention struct {
	/*
		OpenTSDB has two levels of aggregation,
		First, we aggregate any un-mentioned tags into the last result
		Second, we aggregate into buckets over time
		To simulate this with config, we have
		FirstOrder (e.g. sum/avg/max/etc.)
		SecondOrder (e.g. sum/avg/max/etc.)
		AggTime (e.g. 1m/10m/1d/etc.)
		This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none:<metric>
		Or an example: m=sum:1m-avg-none
	*/
	FirstOrder  string
	SecondOrder string
	AggTime     string
	// The actual ranges we will attempt to query (as offsets from now)
	QueryRanges []TimeRange
}

// RetentionMeta objects exist to pass smaller subsets (only one retention range)
// of a full Retention object around. It carries the same three aggregation
// fields as Retention, without the QueryRanges.
type RetentionMeta struct {
	FirstOrder  string
	SecondOrder string
	AggTime     string
}

// Client object holds general config about how queries should be performed
type Client struct {
	// Addr is the base address of OpenTSDB; the "/api/..." paths are appended
	// to it when building query URLs. NewClient trims "/" from both ends.
	Addr string
	// The meta query limit for series returned
	// (sent as the "limit" parameter of /api/search/lookup)
	Limit int
	// Retentions holds the parsed form of Config.Retentions
	Retentions []Retention
	// Filters is copied verbatim from Config.Filters
	Filters []string
	// Normalize is handed to modifyData for every series fetched by GetData
	Normalize bool
	// HardTS is copied verbatim from Config.HardTS
	HardTS int64
}

// Config contains fields required
// for Client configuration
type Config struct {
	// Addr is the OpenTSDB address; surrounding "/" characters are trimmed
	Addr string
	// Limit is copied to Client.Limit
	Limit int
	// Offset is a number of days; NewClient converts it to seconds
	// (or milliseconds when MsecsTime is set) to step back from "now"
	Offset int64
	// HardTS, when > 0, is reported as the starting timestamp instead of
	// the one derived from Offset
	HardTS int64
	// Retentions are retention strings, each parsed by convertRetention
	Retentions []string
	// Filters is copied to Client.Filters
	Filters []string
	// Normalize is copied to Client.Normalize
	Normalize bool
	// MsecsTime switches timestamp arithmetic in NewClient from seconds
	// to milliseconds
	MsecsTime bool
}

// TimeRange contains data about time ranges to query
// NOTE(review): Start/End are presumably offsets from now, as stated on
// Retention.QueryRanges; confirm against convertRetention.
type TimeRange struct {
	Start int64
	End   int64
}

// MetaResults contains return data from search series lookup queries
// Only the "type" and "results" keys of the reply are decoded; the
// commented-out fields below are not read.
type MetaResults struct {
	Type    string `json:"type"`
	Results []Meta `json:"results"`
	//metric       string
	//tags         interface{}
	//limit        int
	//time         int
	//startIndex   int
	//totalResults int
}

// Meta A meta object about a metric
// only contain the tags/etc.
and no data type Meta struct { //tsuid string Metric string `json:"metric"` Tags map[string]string `json:"tags"` } // OtsdbMetric is a single series in OpenTSDB's returned format type OtsdbMetric struct { Metric string Tags map[string]string AggregateTags []string Dps map[int64]float64 } // Metric holds the time series data in VictoriaMetrics format type Metric struct { Metric string Tags map[string]string Timestamps []int64 Values []float64 } // FindMetrics discovers all metrics that OpenTSDB knows about (given a filter) // e.g. /api/suggest?type=metrics&q=system&max=100000 func (c Client) FindMetrics(q string) ([]string, error) { resp, err := http.Get(q) if err != nil { return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err) } if resp.StatusCode != 200 { return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp) } defer func() { _ = resp.Body.Close() }() body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err) } var metriclist []string err = json.Unmarshal(body, &metriclist) if err != nil { return nil, fmt.Errorf("failed to read response from %q: %s", q, err) } return metriclist, nil } // FindSeries discovers all series associated with a metric // e.g. 
/api/search/lookup?m=system.load5&limit=1000000 func (c Client) FindSeries(metric string) ([]Meta, error) { q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit) resp, err := http.Get(q) if err != nil { return nil, fmt.Errorf("failed to set GET request to %q: %s", q, err) } if resp.StatusCode != 200 { return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp) } defer func() { _ = resp.Body.Close() }() body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err) } var results MetaResults err = json.Unmarshal(body, &results) if err != nil { return nil, fmt.Errorf("failed to read response from %q: %s", q, err) } return results.Results, nil } // GetData actually retrieves data for a series at a specified time range // e.g. /api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1} func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) { /* First, build our tag string. It's literally just key=value,key=value,... */ tagStr := "" for k, v := range series.Tags { tagStr += fmt.Sprintf("%s=%s,", k, v) } // obviously we don't want trailing commas... tagStr = strings.Trim(tagStr, ",") /* The aggregation policy should already be somewhat formatted: FirstOrder (e.g. sum/avg/max/etc.) SecondOrder (e.g. sum/avg/max/etc.) AggTime (e.g. 1m/10m/1d/etc.) 
This will build into m=:--none: Or an example: m=sum:1m-avg-none */ aggPol := fmt.Sprintf("%s:%s-%s-none", rt.FirstOrder, rt.AggTime, rt.SecondOrder) /* Our actual query string: Start and End are just timestamps We then add the aggregation policy, the metric, and the tag set */ queryStr := fmt.Sprintf("start=%v&end=%v&m=%s:%s{%s}", start, end, aggPol, series.Metric, tagStr) q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr) resp, err := http.Get(q) if err != nil { return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err) } if resp.StatusCode != 200 { return Metric{}, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp) } defer func() { _ = resp.Body.Close() }() body, err := ioutil.ReadAll(resp.Body) if err != nil { return Metric{}, fmt.Errorf("could not retrieve series data from %q: %s", q, err) } var output []OtsdbMetric err = json.Unmarshal(body, &output) if err != nil { return Metric{}, fmt.Errorf("failed to unmarshal response from %q [%v]: %s", q, body, err) } /* We expect results to look like: [ { "metric": "zfs_filesystem.available", "tags": { "rack": "6", "replica": "1", "host": "c7-bfyii-115", "pool": "dattoarray", "row": "c", "dc": "us-west-3", "group": "legonode" }, "aggregateTags": [], "dps": { "1626019200": 32490602877610.668, "1626033600": 32486439014058.668 } } ] There are two things that could be bad here: 1. There are no actual stats returned (an empty array -> []) 2. There are aggregate tags in the results An empty array doesn't cast to a OtsdbMetric struct well, and there's no reason to try, so we should just skip it Because we're trying to migrate data without transformations, seeing aggregate tags could mean we're dropping series on the floor. */ if len(output) < 1 { // no results returned...return an empty object without error return Metric{}, nil } if len(output) > 1 { // multiple series returned for a single query. We can't process this right, so... 
return Metric{}, fmt.Errorf("Query returned multiple results: %v", output) } if len(output[0].AggregateTags) > 0 { // This failure means we've suppressed potential series somehow... return Metric{}, fmt.Errorf("Query somehow has aggregate tags: %v", output[0].AggregateTags) } data := Metric{} data.Metric = output[0].Metric data.Tags = output[0].Tags /* We evaluate data for correctness before formatting the actual values to skip a little bit of time if the series has invalid formatting */ data, err = modifyData(data, c.Normalize) if err != nil { return Metric{}, fmt.Errorf("invalid series data from %q: %s", q, err) } /* Convert data from OpenTSDB's output format ([[ts,val],[ts,val]...]) to VictoriaMetrics format: {"timestamps": [ts,ts,ts...], "values": [val,val,val...]} The nasty part here is that because an object in each array can be a float64, we have to initially cast _all_ objects that way then convert the timestamp back to something reasonable. */ for ts, val := range output[0].Dps { data.Timestamps = append(data.Timestamps, ts) data.Values = append(data.Values, val) } return data, nil } // NewClient creates and returns OpenTSDB client // configured with passed Config func NewClient(cfg Config) (*Client, error) { var retentions []Retention offsetPrint := int64(time.Now().Unix()) offsetSecs := cfg.Offset * 24 * 60 * 60 if cfg.MsecsTime { // 1000000 == Nanoseconds -> Milliseconds difference offsetPrint = int64(time.Now().UnixNano() / 1000000) // also bump offsetSecs to milliseconds offsetSecs = offsetSecs * 1000 } if cfg.HardTS > 0 { /* HardTS is a specific timestamp we'll be starting at. 
Just present that if it is defined */ offsetPrint = cfg.HardTS } else if offsetSecs > 0 { /* Our "offset" is the number of days (in seconds) we should step back before starting to scan for data */ offsetPrint = offsetPrint - offsetSecs } log.Println(fmt.Sprintf("Will collect data starting at TS %v", offsetPrint)) for _, r := range cfg.Retentions { ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime) if err != nil { return &Client{}, fmt.Errorf("Couldn't parse retention %q :: %v", r, err) } retentions = append(retentions, ret) } client := &Client{ Addr: strings.Trim(cfg.Addr, "/"), Retentions: retentions, Limit: cfg.Limit, Filters: cfg.Filters, Normalize: cfg.Normalize, HardTS: cfg.HardTS, } return client, nil }