VictoriaMetrics/app/vmalert/datasource/vm_prom_api.go

276 lines
8.5 KiB
Go
Raw Permalink Normal View History

package datasource
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/valyala/fastjson"
)
var (
disablePathAppend = flag.Bool("remoteRead.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/query' path "+
"to the configured -datasource.url and -remoteRead.url")
disableStepParam = flag.Bool("datasource.disableStepParam", false, "Whether to disable adding 'step' param to the issued instant queries. "+
"This might be useful when using vmalert with datasources that do not support 'step' param for instant queries, like Google Managed Prometheus. "+
"It is not recommended to enable this flag if you use vmalert with VictoriaMetrics.")
)
type promResponse struct {
Status string `json:"status"`
ErrorType string `json:"errorType"`
Error string `json:"error"`
Data struct {
ResultType string `json:"resultType"`
Result json.RawMessage `json:"result"`
} `json:"data"`
// Stats supported by VictoriaMetrics since v1.90
Stats struct {
SeriesFetched *string `json:"seriesFetched,omitempty"`
} `json:"stats,omitempty"`
}
// see https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
type promInstant struct {
// ms is populated after Unmarshal call
ms []Metric
}
// metrics returned parsed Metric slice
// Must be called only after Unmarshal
func (pi *promInstant) metrics() ([]Metric, error) {
return pi.ms, nil
}
var jsonParserPool fastjson.ParserPool
// Unmarshal unmarshals the given byte slice into promInstant
// It is using fastjson to reduce number of allocations compared to
// standard json.Unmarshal function.
// Response example:
//
// [{"metric":{"__name__":"up","job":"prometheus"},value": [ 1435781451.781,"1"]},
// {"metric":{"__name__":"up","job":"node"},value": [ 1435781451.781,"0"]}]
func (pi *promInstant) Unmarshal(b []byte) error {
app/vmalert: update parsing for instant responses (#6859) This change is made in attempt to reduce memory usage by vmalert when parsing big instant responses from VM/Prometheus. In https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a5c427bac4102c53d1ea2aefe0b0e6872120ba9c vmalert switched from std json lib to fastjson lib in order to reduce amount of allocations, as according to highloaded profiles of vmalert the CPU is mostly spent on GC. But switching to fastjson resulted into excessive memory usage for cases when vmalert has to parse long json lines, which usually happens when instant response contains many `metric` objects. In this change we do a mixed parsing: 1. Slice of `metric` objects is parsed with std lib to keep mem low 2. Each `metric` object is parsed with fastjson to reduce allocs The benchmark results are the following: ``` pkg: github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource BenchmarkParsePrometheusResponse/Instant_std+fastjson-10 1760 668959 ns/op 280147 B/op 5781 allocs/op MBs allocated at heap: 493.078392 mallocs: 18655472 BenchmarkParsePrometheusResponse/Instant_fastjson-10 6109 198258 ns/op 172839 B/op 5548 allocs/op MBs allocated at heap: 1056.384464 mallocs: 34457184 BenchmarkParsePrometheusResponse/Instant_std-10 1287 950987 ns/op 451677 B/op 9619 allocs/op MBs allocated at heap: 580.802976 mallocs: 13351636 ``` The benchmark function code with mem measurement is available here https://gist.github.com/hagen1778/b9c3ca7f8ca7d6b21aec9777112c5810 The benchmark contains 3 results: 1. Instant_std+fastjson is the implementation in this change 2. Instant_fastjson-10 is the implementation from https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a5c427bac4102c53d1ea2aefe0b0e6872120ba9c 3. BenchmarkParsePrometheusResponse/Instant_std-10 is implementation before https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a5c427bac4102c53d1ea2aefe0b0e6872120ba9c According to these results, this new implementation is slower than previous, but faster than before switching to fastjson. It also has lower number of allocations and roughly the same memory allocation on heap with GC turned off. --------- Other changes: 1. rm BenchmarkMetrics as it doesn't measure anything 2. simplify BenchmarkParsePrometheusResponse into BenchmarkPromInstantUnmarshal ### Describe Your Changes Please provide a brief description of the changes you made. Be as specific as possible to help others understand the purpose and impact of your modifications. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). Signed-off-by: hagen1778 <roman@victoriametrics.com>
2024-08-22 15:36:11 +00:00
var metrics []json.RawMessage
// metrics slice could be large, so parsing it with fastjson could consume a lot of memory.
// We parse the slice with standard lib to keep mem usage low.
// And each metric object will be parsed with fastjson to reduce allocations.
if err := json.Unmarshal(b, &metrics); err != nil {
return fmt.Errorf("cannot unmarshal metrics: %w", err)
}
p := jsonParserPool.Get()
defer jsonParserPool.Put(p)
app/vmalert: update parsing for instant responses (#6859) This change is made in attempt to reduce memory usage by vmalert when parsing big instant responses from VM/Prometheus. In https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a5c427bac4102c53d1ea2aefe0b0e6872120ba9c vmalert switched from std json lib to fastjson lib in order to reduce amount of allocations, as according to highloaded profiles of vmalert the CPU is mostly spent on GC. But switching to fastjson resulted into excessive memory usage for cases when vmalert has to parse long json lines, which usually happens when instant response contains many `metric` objects. In this change we do a mixed parsing: 1. Slice of `metric` objects is parsed with std lib to keep mem low 2. Each `metric` object is parsed with fastjson to reduce allocs The benchmark results are the following: ``` pkg: github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource BenchmarkParsePrometheusResponse/Instant_std+fastjson-10 1760 668959 ns/op 280147 B/op 5781 allocs/op MBs allocated at heap: 493.078392 mallocs: 18655472 BenchmarkParsePrometheusResponse/Instant_fastjson-10 6109 198258 ns/op 172839 B/op 5548 allocs/op MBs allocated at heap: 1056.384464 mallocs: 34457184 BenchmarkParsePrometheusResponse/Instant_std-10 1287 950987 ns/op 451677 B/op 9619 allocs/op MBs allocated at heap: 580.802976 mallocs: 13351636 ``` The benchmark function code with mem measurement is available here https://gist.github.com/hagen1778/b9c3ca7f8ca7d6b21aec9777112c5810 The benchmark contains 3 results: 1. Instant_std+fastjson is the implementation in this change 2. Instant_fastjson-10 is the implementation from https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a5c427bac4102c53d1ea2aefe0b0e6872120ba9c 3. BenchmarkParsePrometheusResponse/Instant_std-10 is implementation before https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a5c427bac4102c53d1ea2aefe0b0e6872120ba9c According to these results, this new implementation is slower than previous, but faster than before switching to fastjson. It also has lower number of allocations and roughly the same memory allocation on heap with GC turned off. --------- Other changes: 1. rm BenchmarkMetrics as it doesn't measure anything 2. simplify BenchmarkParsePrometheusResponse into BenchmarkPromInstantUnmarshal ### Describe Your Changes Please provide a brief description of the changes you made. Be as specific as possible to help others understand the purpose and impact of your modifications. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). Signed-off-by: hagen1778 <roman@victoriametrics.com>
2024-08-22 15:36:11 +00:00
pi.ms = make([]Metric, len(metrics))
for i, data := range metrics {
row, err := p.ParseBytes(data)
if err != nil {
return fmt.Errorf("cannot parse metric object: %w", err)
}
metric := row.Get("metric")
if metric == nil {
return fmt.Errorf("can't find `metric` object in %q", row)
}
labels := metric.GetObject()
r := &pi.ms[i]
r.Labels = make([]Label, 0, labels.Len())
labels.Visit(func(key []byte, v *fastjson.Value) {
lv, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal)
return
}
r.Labels = append(r.Labels, Label{
Name: string(key),
Value: string(lv),
})
})
if err != nil {
return fmt.Errorf("error when parsing `metric` object in %q: %w", row, err)
}
value := row.Get("value")
if value == nil {
return fmt.Errorf("can't find `value` object in %q", row)
}
sample := value.GetArray()
if len(sample) != 2 {
return fmt.Errorf("object `value` in %q should contain 2 values, but contains %d instead", row, len(sample))
}
r.Timestamps = []int64{sample[0].GetInt64()}
val, err := sample[1].StringBytes()
if err != nil {
return fmt.Errorf("error when parsing `value` object %q: %s", sample[1], err)
}
f, err := strconv.ParseFloat(bytesutil.ToUnsafeString(val), 64)
if err != nil {
return fmt.Errorf("error when parsing float64 from %s in %q: %w", sample[1], row, err)
}
r.Values = []float64{f}
}
return nil
}
type promRange struct {
Result []struct {
Labels map[string]string `json:"metric"`
TVs [][2]any `json:"values"`
} `json:"result"`
}
func (r promRange) metrics() ([]Metric, error) {
var result []Metric
for i, res := range r.Result {
var m Metric
for _, tv := range res.TVs {
f, err := strconv.ParseFloat(tv[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, tv[1], err)
}
m.Values = append(m.Values, f)
m.Timestamps = append(m.Timestamps, int64(tv[0].(float64)))
}
if len(m.Values) < 1 || len(m.Timestamps) < 1 {
return nil, fmt.Errorf("metric %v contains no values", res)
}
m.Labels = nil
for k, v := range r.Result[i].Labels {
m.AddLabel(k, v)
}
result = append(result, m)
}
return result, nil
}
type promScalar [2]any
func (r promScalar) metrics() ([]Metric, error) {
var m Metric
f, err := strconv.ParseFloat(r[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", r, r[1], err)
}
m.Values = append(m.Values, f)
m.Timestamps = append(m.Timestamps, int64(r[0].(float64)))
return []Metric{m}, nil
}
const (
statusSuccess, statusError = "success", "error"
rtVector, rtMatrix, rScalar = "vector", "matrix", "scalar"
)
func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result, err error) {
r := &promResponse{}
if err = json.NewDecoder(resp.Body).Decode(r); err != nil {
return res, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL.Redacted(), err)
}
if r.Status == statusError {
return res, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL.Redacted(), r.ErrorType, r.Error)
}
if r.Status != statusSuccess {
return res, fmt.Errorf("unknown status: %s, Expected success or error", r.Status)
}
var parseFn func() ([]Metric, error)
switch r.Data.ResultType {
case rtVector:
var pi promInstant
if err := pi.Unmarshal(r.Data.Result); err != nil {
return res, fmt.Errorf("unmarshal err %w; \n %#v", err, string(r.Data.Result))
}
parseFn = pi.metrics
case rtMatrix:
var pr promRange
if err := json.Unmarshal(r.Data.Result, &pr.Result); err != nil {
return res, err
}
parseFn = pr.metrics
case rScalar:
var ps promScalar
if err := json.Unmarshal(r.Data.Result, &ps); err != nil {
return res, err
}
parseFn = ps.metrics
default:
return res, fmt.Errorf("unknown result type %q", r.Data.ResultType)
}
ms, err := parseFn()
if err != nil {
return res, err
}
res = Result{Data: ms}
if r.Stats.SeriesFetched != nil {
intV, err := strconv.Atoi(*r.Stats.SeriesFetched)
if err != nil {
return res, fmt.Errorf("failed to convert stats.seriesFetched to int: %w", err)
}
res.SeriesFetched = &intV
}
return res, nil
}
func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
if s.appendTypePrefix {
r.URL.Path += "/prometheus"
}
if !*disablePathAppend {
r.URL.Path += "/api/v1/query"
}
q := r.URL.Query()
q.Set("time", timestamp.Format(time.RFC3339))
if !*disableStepParam && s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
if !*disableStepParam && s.queryStep > 0 { // override step with user-specified value
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
}
func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) {
if s.appendTypePrefix {
r.URL.Path += "/prometheus"
}
if !*disablePathAppend {
r.URL.Path += "/api/v1/query_range"
}
q := r.URL.Query()
q.Add("start", start.Format(time.RFC3339))
q.Add("end", end.Format(time.RFC3339))
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
}
func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
q := r.URL.Query()
for k, vs := range s.extraParams {
if q.Has(k) { // extraParams are prior to params in URL
q.Del(k)
}
for _, v := range vs {
q.Add(k, v)
}
}
q.Set("query", query)
r.URL.RawQuery = q.Encode()
}