app/vmselect: initial implementation of Graphite Metrics API

See https://graphite-api.readthedocs.io/en/latest/api.html#the-metrics-api
This commit is contained in:
Aliaksandr Valialkin 2020-09-11 00:29:26 +03:00
parent 87f916a2fb
commit f307e6f432
20 changed files with 2018 additions and 334 deletions

View file

@ -180,7 +180,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
* URLs for querying: `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
* URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
- `<accountID>` is an arbitrary number identifying data namespace for the query (aka tenant)
- `<suffix>` may have the following values:
- `api/v1/query` - performs [PromQL instant query](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries).
@ -194,6 +194,13 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.
* URLs for [Graphite Metrics API](https://graphite-api.readthedocs.io/en/latest/api.html#the-metrics-api): `http://<vmselect>:8481/select/<accountID>/graphite/<suffix>`, where:
- `<accountID>` is an arbitrary number identifying data namespace for query (aka tenant)
- `<suffix>` may have the following values:
- `metrics/find` - searches Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find).
- `metrics/expand` - expands Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand).
- `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json).
* URL for time series deletion: `http://<vmselect>:8481/delete/<accountID>/prometheus/api/v1/admin/tsdb/delete_series?match[]=<timeseries_selector_for_delete>`.
Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't
be used on a regular basis, since it carries non-zero overhead.

View file

@ -0,0 +1,396 @@
package graphite
import (
"fmt"
"net/http"
"regexp"
"sort"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// MetricsFindHandler implements /metrics/find handler.
//
// It searches Graphite metric paths matching the `query` arg within the
// [from..until] time range and writes them in "treejson" or "completer" format.
//
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
func MetricsFindHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
	deadline := searchutils.GetDeadlineForQuery(r, startTime)
	if err := r.ParseForm(); err != nil {
		return fmt.Errorf("cannot parse form values: %w", err)
	}
	// Response format: "treejson" (default) or "completer".
	format := r.FormValue("format")
	if format == "" {
		format = "treejson"
	}
	switch format {
	case "treejson", "completer":
	default:
		return fmt.Errorf(`unexpected "format" query arg: %q; expecting "treejson" or "completer"`, format)
	}
	query := r.FormValue("query")
	if len(query) == 0 {
		return fmt.Errorf("expecting non-empty `query` arg")
	}
	// delimiter separates metric path components; it must be a single char ("." by default).
	delimiter := r.FormValue("delimiter")
	if delimiter == "" {
		delimiter = "."
	}
	if len(delimiter) > 1 {
		return fmt.Errorf("`delimiter` query arg must contain only a single char")
	}
	if searchutils.GetBool(r, "automatic_variants") {
		// Wrap comma-separated path components into {...} variants.
		// See https://github.com/graphite-project/graphite-web/blob/bb9feb0e6815faa73f538af6ed35adea0fb273fd/webapp/graphite/metrics/views.py#L152
		query = addAutomaticVariants(query, delimiter)
	}
	if format == "completer" {
		// Completer queries get an implicit trailing wildcard.
		// See https://github.com/graphite-project/graphite-web/blob/bb9feb0e6815faa73f538af6ed35adea0fb273fd/webapp/graphite/metrics/views.py#L148
		query = strings.ReplaceAll(query, "..", ".*")
		if !strings.HasSuffix(query, "*") {
			query += "*"
		}
	}
	leavesOnly := searchutils.GetBool(r, "leavesOnly")
	wildcards := searchutils.GetBool(r, "wildcards")
	// label="__name__" is an alias for the default (metric name) label.
	label := r.FormValue("label")
	if label == "__name__" {
		label = ""
	}
	jsonp := r.FormValue("jsonp")
	from, err := searchutils.GetTime(r, "from", 0)
	if err != nil {
		return err
	}
	// ct is the request start time in milliseconds - the default value for `until`.
	ct := startTime.UnixNano() / 1e6
	until, err := searchutils.GetTime(r, "until", ct)
	if err != nil {
		return err
	}
	tr := storage.TimeRange{
		MinTimestamp: from,
		MaxTimestamp: until,
	}
	paths, isPartial, err := metricsFind(at, tr, label, query, delimiter[0], deadline)
	if err != nil {
		return err
	}
	if isPartial && searchutils.GetDenyPartialResponse(r) {
		return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
	}
	if leavesOnly {
		// Drop non-leaf nodes (paths with a trailing delimiter).
		paths = filterLeaves(paths, delimiter)
	}
	sortPaths(paths, delimiter)
	contentType := "application/json"
	if jsonp != "" {
		// JSONP responses are evaluated as a script by the caller.
		contentType = "text/javascript"
	}
	w.Header().Set("Content-Type", contentType)
	WriteMetricsFindResponse(w, paths, delimiter, format, wildcards, jsonp)
	metricsFindDuration.UpdateDuration(startTime)
	return nil
}
// MetricsExpandHandler implements /metrics/expand handler.
//
// It expands every passed `query` arg into matching metric paths and returns
// either a per-query mapping (groupByExpr=1) or a flat deduplicated list.
//
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand
func MetricsExpandHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
	deadline := searchutils.GetDeadlineForQuery(r, startTime)
	if err := r.ParseForm(); err != nil {
		return fmt.Errorf("cannot parse form values: %w", err)
	}
	// Multiple `query` args may be passed; each one is expanded independently.
	queries := r.Form["query"]
	if len(queries) == 0 {
		return fmt.Errorf("missing `query` arg")
	}
	groupByExpr := searchutils.GetBool(r, "groupByExpr")
	leavesOnly := searchutils.GetBool(r, "leavesOnly")
	// label="__name__" is an alias for the default (metric name) label.
	label := r.FormValue("label")
	if label == "__name__" {
		label = ""
	}
	// delimiter separates metric path components; it must be a single char ("." by default).
	delimiter := r.FormValue("delimiter")
	if delimiter == "" {
		delimiter = "."
	}
	if len(delimiter) > 1 {
		return fmt.Errorf("`delimiter` query arg must contain only a single char")
	}
	jsonp := r.FormValue("jsonp")
	from, err := searchutils.GetTime(r, "from", 0)
	if err != nil {
		return err
	}
	// ct is the request start time in milliseconds - the default value for `until`.
	ct := startTime.UnixNano() / 1e6
	until, err := searchutils.GetTime(r, "until", ct)
	if err != nil {
		return err
	}
	tr := storage.TimeRange{
		MinTimestamp: from,
		MaxTimestamp: until,
	}
	// m maps every input query to its expanded paths.
	m := make(map[string][]string, len(queries))
	for _, query := range queries {
		paths, isPartial, err := metricsFind(at, tr, label, query, delimiter[0], deadline)
		if err != nil {
			return err
		}
		if isPartial && searchutils.GetDenyPartialResponse(r) {
			return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
		}
		if leavesOnly {
			paths = filterLeaves(paths, delimiter)
		}
		m[query] = paths
	}
	contentType := "application/json"
	if jsonp != "" {
		// JSONP responses are evaluated as a script by the caller.
		contentType = "text/javascript"
	}
	w.Header().Set("Content-Type", contentType)
	if groupByExpr {
		// Grouped response: one sorted path list per input query.
		// NOTE(review): this branch returns before metricsExpandDuration is
		// updated, so groupByExpr=1 requests aren't counted in the summary - confirm intended.
		for _, paths := range m {
			sortPaths(paths, delimiter)
		}
		WriteMetricsExpandResponseByQuery(w, m, jsonp)
		return nil
	}
	// Flat response: merge and deduplicate the paths from all the queries.
	paths := m[queries[0]]
	if len(m) > 1 {
		pathsSet := make(map[string]struct{})
		for _, paths := range m {
			for _, path := range paths {
				pathsSet[path] = struct{}{}
			}
		}
		paths = make([]string, 0, len(pathsSet))
		for path := range pathsSet {
			paths = append(paths, path)
		}
	}
	sortPaths(paths, delimiter)
	WriteMetricsExpandResponseFlat(w, paths, jsonp)
	metricsExpandDuration.UpdateDuration(startTime)
	return nil
}
// MetricsIndexHandler implements /metrics/index.json handler.
//
// It returns the full list of metric names known to the storage,
// optionally wrapped into a jsonp callback.
//
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json
func MetricsIndexHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
	deadline := searchutils.GetDeadlineForQuery(r, startTime)
	if err := r.ParseForm(); err != nil {
		return fmt.Errorf("cannot parse form values: %w", err)
	}
	// Metric names are the values of the reserved `__name__` label.
	names, isPartial, err := netstorage.GetLabelValues(at, "__name__", deadline)
	if err != nil {
		return fmt.Errorf(`cannot obtain metric names: %w`, err)
	}
	if isPartial && searchutils.GetDenyPartialResponse(r) {
		return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
	}
	jsonp := r.FormValue("jsonp")
	// JSONP responses are evaluated as a script by the caller.
	contentType := "text/javascript"
	if jsonp == "" {
		contentType = "application/json"
	}
	w.Header().Set("Content-Type", contentType)
	WriteMetricsIndexResponse(w, names, jsonp)
	metricsIndexDuration.UpdateDuration(startTime)
	return nil
}
// metricsFind searches for label values that match the given query.
//
// The query may contain Graphite-style wildcards: `*`, `{a,b}` and `[...]`.
// delimiter separates metric path components. The returned bool reports
// whether the result is partial (i.e. some vmstorage nodes were unavailable).
func metricsFind(at *auth.Token, tr storage.TimeRange, label, query string, delimiter byte, deadline netstorage.Deadline) ([]string, bool, error) {
	// expandTail means the query ends with `*`, i.e. all the suffixes
	// following the fixed prefix must be returned.
	expandTail := strings.HasSuffix(query, "*")
	for strings.HasSuffix(query, "*") {
		query = query[:len(query)-1]
	}
	var results []string
	n := strings.IndexAny(query, "*{[")
	if n < 0 {
		// The query contains no wildcards - resolve it directly via suffix search.
		suffixes, isPartial, err := netstorage.GetTagValueSuffixes(at, tr, label, query, delimiter, deadline)
		if err != nil {
			return nil, false, err
		}
		if expandTail {
			for _, suffix := range suffixes {
				results = append(results, query+suffix)
			}
		} else if isFullMatch(query, suffixes, delimiter) {
			// The query itself is a complete metric path.
			results = append(results, query)
		}
		return results, isPartial, nil
	}
	// The query contains a wildcard at position n. Expand everything up to
	// the wildcard first, filter the expansions with a regexp built from the
	// query up to the next delimiter, then recurse with the remaining tail.
	subquery := query[:n] + "*"
	paths, isPartial, err := metricsFind(at, tr, label, subquery, delimiter, deadline)
	if err != nil {
		return nil, false, err
	}
	tail := ""
	suffix := query[n:]
	if m := strings.IndexByte(suffix, delimiter); m >= 0 {
		// Split the query at the delimiter following the wildcard:
		// `suffix` stays in the regexp, `tail` is resolved recursively.
		tail = suffix[m+1:]
		suffix = suffix[:m+1]
	}
	q := query[:n] + suffix
	re, err := getRegexpForQuery(q, delimiter)
	if err != nil {
		return nil, false, fmt.Errorf("cannot convert query %q to regexp: %w", q, err)
	}
	if expandTail {
		tail += "*"
	}
	for _, path := range paths {
		if !re.MatchString(path) {
			continue
		}
		// Recurse into every matching path with the remaining query tail.
		subquery := path + tail
		tmp, isPartialLocal, err := metricsFind(at, tr, label, subquery, delimiter, deadline)
		if err != nil {
			return nil, false, err
		}
		if isPartialLocal {
			isPartial = true
		}
		results = append(results, tmp...)
	}
	return results, isPartial, nil
}
var (
	// Summaries tracking the duration of each Graphite Metrics API handler.
	metricsFindDuration   = metrics.NewSummary(`vm_request_duration_seconds{path="/select/{}/graphite/metrics/find"}`)
	metricsExpandDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/select/{}/graphite/metrics/expand"}`)
	// Fix: this summary previously re-used the `/metrics/expand` path label
	// (a copy-paste defect), which made /metrics/index.json request durations
	// indistinguishable from /metrics/expand durations in the exported metrics.
	metricsIndexDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/select/{}/graphite/metrics/index.json"}`)
)
// isFullMatch reports whether tagValuePrefix denotes a complete metric path
// given the suffixes returned for it by the storage.
//
// It is a full match when the prefix is empty or ends with the delimiter,
// or when one of the returned suffixes is empty (the prefix is a stored value itself).
func isFullMatch(tagValuePrefix string, suffixes []string, delimiter byte) bool {
	if len(suffixes) == 0 {
		// Nothing is stored under this prefix.
		return false
	}
	// An empty prefix or a prefix ending in the delimiter always matches fully.
	if n := len(tagValuePrefix); n == 0 || tagValuePrefix[n-1] == delimiter {
		return true
	}
	for _, s := range suffixes {
		if len(s) == 0 {
			return true
		}
	}
	return false
}
// addAutomaticVariants wraps comma-containing path components into {...}
// variants, unless the component already contains a `{`.
//
// See https://github.com/graphite-project/graphite-web/blob/bb9feb0e6815faa73f538af6ed35adea0fb273fd/webapp/graphite/metrics/views.py#L152
func addAutomaticVariants(query, delimiter string) string {
	var b strings.Builder
	for i, part := range strings.Split(query, delimiter) {
		if i > 0 {
			b.WriteString(delimiter)
		}
		if strings.Contains(part, ",") && !strings.Contains(part, "{") {
			// Turn `a,b,c` into the `{a,b,c}` variants list.
			b.WriteByte('{')
			b.WriteString(part)
			b.WriteByte('}')
		} else {
			b.WriteString(part)
		}
	}
	return b.String()
}
// filterLeaves returns only leaf paths, i.e. paths without a trailing delimiter.
//
// The input slice's backing array is reused for the result.
func filterLeaves(paths []string, delimiter string) []string {
	result := paths[:0]
	for _, p := range paths {
		if strings.HasSuffix(p, delimiter) {
			// A trailing delimiter marks a non-leaf node - skip it.
			continue
		}
		result = append(result, p)
	}
	return result
}
// sortPaths sorts paths in place so that non-leaf nodes (paths with a trailing
// delimiter) come before leaves, lexicographically inside each group.
func sortPaths(paths []string, delimiter string) {
	sort.Slice(paths, func(i, j int) bool {
		x, y := paths[i], paths[j]
		xIsNode := strings.HasSuffix(x, delimiter)
		yIsNode := strings.HasSuffix(y, delimiter)
		switch {
		case xIsNode && !yIsNode:
			return true
		case !xIsNode && yIsNode:
			return false
		default:
			// Same kind - order lexicographically.
			return x < y
		}
	})
}
// getRegexpForQuery converts the given Graphite query into a regexp.
//
// Supported Graphite wildcards: `*` (any chars up to the next delimiter),
// `{a,b}` (alternatives) and `[...]` (character classes, passed through as-is).
// Compiled regexps (and compile errors) are cached in regexpCache.
func getRegexpForQuery(query string, delimiter byte) (*regexp.Regexp, error) {
	regexpCacheLock.Lock()
	defer regexpCacheLock.Unlock()
	k := regexpCacheKey{
		query:     query,
		delimiter: delimiter,
	}
	if re := regexpCache[k]; re != nil {
		return re.re, re.err
	}
	a := make([]string, 0, len(query))
	// `*` must match any chars except the delimiter.
	tillNextDelimiter := "[^" + regexp.QuoteMeta(string([]byte{delimiter})) + "]*"
	for i := 0; i < len(query); i++ {
		switch query[i] {
		case '*':
			a = append(a, tillNextDelimiter)
		case '{':
			tmp := query[i+1:]
			if n := strings.IndexByte(tmp, '}'); n < 0 {
				// Unclosed `{` - treat the rest of the query literally.
				a = append(a, regexp.QuoteMeta(query[i:]))
				i = len(query)
			} else {
				// Convert `{a,b}` into a non-capturing group `(?:a|b)`.
				a = append(a, "(?:")
				opts := strings.Split(tmp[:n], ",")
				for j, opt := range opts {
					opts[j] = regexp.QuoteMeta(opt)
				}
				a = append(a, strings.Join(opts, "|"))
				a = append(a, ")")
				i += n + 1
			}
		case '[':
			tmp := query[i:]
			if n := strings.IndexByte(tmp, ']'); n < 0 {
				// Unclosed `[` - treat the rest of the query literally.
				a = append(a, regexp.QuoteMeta(query[i:]))
				i = len(query)
			} else {
				// Pass the `[...]` character class to the regexp as-is.
				a = append(a, tmp[:n+1])
				i += n
			}
		default:
			a = append(a, regexp.QuoteMeta(query[i:i+1]))
		}
	}
	s := strings.Join(a, "")
	re, err := regexp.Compile(s)
	// Cache both successful compilations and compile errors.
	regexpCache[k] = &regexpCacheEntry{
		re:  re,
		err: err,
	}
	if len(regexpCache) >= maxRegexpCacheSize {
		// Evict arbitrary entries (map iteration order is random)
		// in order to limit the cache size.
		for k := range regexpCache {
			if len(regexpCache) < maxRegexpCacheSize {
				break
			}
			delete(regexpCache, k)
		}
	}
	return re, err
}
// regexpCacheEntry holds the compiled regexp (or the compile error) for a query.
type regexpCacheEntry struct {
	re  *regexp.Regexp
	err error
}

// regexpCacheKey uniquely identifies a cached regexp by query text and delimiter.
type regexpCacheKey struct {
	query     string
	delimiter byte
}

// regexpCache caches compiled regexps for getRegexpForQuery.
// It must be accessed only under regexpCacheLock.
var regexpCache = make(map[regexpCacheKey]*regexpCacheEntry)
var regexpCacheLock sync.Mutex

// maxRegexpCacheSize limits regexpCache growth; arbitrary entries are
// evicted once the cache reaches this size.
const maxRegexpCacheSize = 10000

View file

@ -0,0 +1,71 @@
package graphite
import (
"reflect"
"testing"
)
// TestGetRegexpForQuery verifies the Graphite query -> regexp conversion for
// literal queries, `*` wildcards, `{...}` alternatives and `[...]` classes
// with different delimiters.
func TestGetRegexpForQuery(t *testing.T) {
	// f asserts that getRegexpForQuery(query, delimiter) compiles to reExpected.
	f := func(query string, delimiter byte, reExpected string) {
		t.Helper()
		re, err := getRegexpForQuery(query, delimiter)
		if err != nil {
			t.Fatalf("unexpected error in getRegexpForQuery(%q): %s", query, err)
		}
		reStr := re.String()
		if reStr != reExpected {
			t.Fatalf("unexpected regexp for query=%q, delimiter=%c; got %s; want %s", query, delimiter, reStr, reExpected)
		}
	}
	f("", '.', "")
	f("foobar", '.', "foobar")
	f("*", '.', `[^\.]*`)
	f("*", '_', `[^_]*`)
	f("foo.*.bar", '.', `foo\.[^\.]*\.bar`)
	f("fo*b{ar,aaa}[a-z]xx*.d", '.', `fo[^\.]*b(?:ar|aaa)[a-z]xx[^\.]*\.d`)
	f("fo*b{ar,aaa}[a-z]xx*_d", '_', `fo[^_]*b(?:ar|aaa)[a-z]xx[^_]*_d`)
}
// TestSortPaths verifies that sortPaths orders non-leaf nodes (paths with a
// trailing delimiter) before leaves, lexicographically inside each group.
func TestSortPaths(t *testing.T) {
	f := func(paths []string, delimiter string, pathsSortedExpected []string) {
		t.Helper()
		sortPaths(paths, delimiter)
		if !reflect.DeepEqual(paths, pathsSortedExpected) {
			t.Fatalf("unexpected sortPaths result;\ngot\n%q\nwant\n%q", paths, pathsSortedExpected)
		}
	}
	f([]string{"foo", "bar"}, ".", []string{"bar", "foo"})
	f([]string{"foo.", "bar", "aa", "ab."}, ".", []string{"ab.", "foo.", "aa", "bar"})
	f([]string{"foo.", "bar", "aa", "ab."}, "_", []string{"aa", "ab.", "bar", "foo."})
}
// TestFilterLeaves verifies that filterLeaves drops paths with a trailing
// delimiter (non-leaf nodes) and keeps the rest intact.
func TestFilterLeaves(t *testing.T) {
	f := func(paths []string, delimiter string, leavesExpected []string) {
		t.Helper()
		leaves := filterLeaves(paths, delimiter)
		if !reflect.DeepEqual(leaves, leavesExpected) {
			t.Fatalf("unexpected leaves; got\n%q\nwant\n%q", leaves, leavesExpected)
		}
	}
	f([]string{"foo", "bar"}, ".", []string{"foo", "bar"})
	f([]string{"a.", ".", "bc"}, ".", []string{"bc"})
	f([]string{"a.", ".", "bc"}, "_", []string{"a.", ".", "bc"})
	f([]string{"a_", "_", "bc"}, "_", []string{"bc"})
	f([]string{"foo.", "bar."}, ".", []string{})
}
// TestAddAutomaticVariants verifies that comma-separated path components are
// wrapped into {...} variants unless they already contain a brace.
func TestAddAutomaticVariants(t *testing.T) {
	f := func(query, delimiter, resultExpected string) {
		t.Helper()
		result := addAutomaticVariants(query, delimiter)
		if result != resultExpected {
			t.Fatalf("unexpected result for addAutomaticVariants(%q, delimiter=%q); got %q; want %q", query, delimiter, result, resultExpected)
		}
	}
	f("", ".", "")
	f("foobar", ".", "foobar")
	f("foo,bar.baz", ".", "{foo,bar}.baz")
	f("foo,bar.baz", "_", "{foo,bar.baz}")
	f("foo,bar_baz*", "_", "{foo,bar}_baz*")
	f("foo.bar,baz,aa.bb,cc", ".", "foo.{bar,baz,aa}.{bb,cc}")
}

View file

@ -0,0 +1,38 @@
{% stripspace %}
MetricsExpandResponseByQuery generates response for /metrics/expand?groupByExpr=1 .
See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand
The response is a JSON object mapping every input query to the JSON array of
its expanded metric paths, optionally wrapped into a jsonp callback.
{% func MetricsExpandResponseByQuery(m map[string][]string, jsonp string) %}
{% if jsonp != "" %}{%s= jsonp %}({% endif %}
{
"results":{
{% code i := 0 %}
{% for query, paths := range m %}
{%q= query %}:{%= metricPaths(paths) %}
{% code i++ %}
{% if i < len(m) %},{% endif %}
{% endfor %}
}
}
{% if jsonp != "" %}){% endif %}
{% endfunc %}
MetricsExpandResponseFlat generates response for /metrics/expand?groupByExpr=0 .
See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand
The response is a single JSON array with the (already merged) paths.
{% func MetricsExpandResponseFlat(paths []string, jsonp string) %}
{% if jsonp != "" %}{%s= jsonp %}({% endif %}
{%= metricPaths(paths) %}
{% if jsonp != "" %}){% endif %}
{% endfunc %}
metricPaths emits the given paths as a JSON array of quoted strings.
{% func metricPaths(paths []string) %}
[
{% for i, path := range paths %}
{%q= path %}
{% if i+1 < len(paths) %},{% endif %}
{% endfor %}
]
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,187 @@
// Code generated by qtc from "metrics_expand_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// MetricsExpandResponseByQuery generates response for /metrics/expand?groupByExpr=1 .See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand
//line app/vmselect/graphite/metrics_expand_response.qtpl:5
package graphite
//line app/vmselect/graphite/metrics_expand_response.qtpl:5
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/graphite/metrics_expand_response.qtpl:5
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/graphite/metrics_expand_response.qtpl:5
// StreamMetricsExpandResponseByQuery writes the /metrics/expand?groupByExpr=1
// response (a JSON object mapping each query to its matching paths) to qw422016.
// Code generated by qtc from metrics_expand_response.qtpl; edit the .qtpl file instead.
func StreamMetricsExpandResponseByQuery(qw422016 *qt422016.Writer, m map[string][]string, jsonp string) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:6
	if jsonp != "" {
//line app/vmselect/graphite/metrics_expand_response.qtpl:6
		qw422016.N().S(jsonp)
//line app/vmselect/graphite/metrics_expand_response.qtpl:6
		qw422016.N().S(`(`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:6
	}
//line app/vmselect/graphite/metrics_expand_response.qtpl:6
	qw422016.N().S(`{"results":{`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:9
	i := 0
//line app/vmselect/graphite/metrics_expand_response.qtpl:10
	for query, paths := range m {
//line app/vmselect/graphite/metrics_expand_response.qtpl:11
		qw422016.N().Q(query)
//line app/vmselect/graphite/metrics_expand_response.qtpl:11
		qw422016.N().S(`:`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:11
		streammetricPaths(qw422016, paths)
//line app/vmselect/graphite/metrics_expand_response.qtpl:12
		i++
//line app/vmselect/graphite/metrics_expand_response.qtpl:13
		if i < len(m) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:13
			qw422016.N().S(`,`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:13
		}
//line app/vmselect/graphite/metrics_expand_response.qtpl:14
	}
//line app/vmselect/graphite/metrics_expand_response.qtpl:14
	qw422016.N().S(`}}`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:17
	if jsonp != "" {
//line app/vmselect/graphite/metrics_expand_response.qtpl:17
		qw422016.N().S(`)`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:17
	}
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
}

// WriteMetricsExpandResponseByQuery writes the grouped /metrics/expand response to qq422016.
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
func WriteMetricsExpandResponseByQuery(qq422016 qtio422016.Writer, m map[string][]string, jsonp string) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	StreamMetricsExpandResponseByQuery(qw422016, m, jsonp)
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
}

// MetricsExpandResponseByQuery returns the grouped /metrics/expand response as a string.
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
func MetricsExpandResponseByQuery(m map[string][]string, jsonp string) string {
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	WriteMetricsExpandResponseByQuery(qb422016, m, jsonp)
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
	return qs422016
//line app/vmselect/graphite/metrics_expand_response.qtpl:18
}
// MetricsExpandResponseFlat generates response for /metrics/expand?groupByExpr=0 .
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand
// Code generated by qtc from metrics_expand_response.qtpl; edit the .qtpl file instead.
//line app/vmselect/graphite/metrics_expand_response.qtpl:23
func StreamMetricsExpandResponseFlat(qw422016 *qt422016.Writer, paths []string, jsonp string) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:24
	if jsonp != "" {
//line app/vmselect/graphite/metrics_expand_response.qtpl:24
		qw422016.N().S(jsonp)
//line app/vmselect/graphite/metrics_expand_response.qtpl:24
		qw422016.N().S(`(`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:24
	}
//line app/vmselect/graphite/metrics_expand_response.qtpl:25
	streammetricPaths(qw422016, paths)
//line app/vmselect/graphite/metrics_expand_response.qtpl:26
	if jsonp != "" {
//line app/vmselect/graphite/metrics_expand_response.qtpl:26
		qw422016.N().S(`)`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:26
	}
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
}

// WriteMetricsExpandResponseFlat writes the flat /metrics/expand response to qq422016.
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
func WriteMetricsExpandResponseFlat(qq422016 qtio422016.Writer, paths []string, jsonp string) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	StreamMetricsExpandResponseFlat(qw422016, paths, jsonp)
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
}

// MetricsExpandResponseFlat returns the flat /metrics/expand response as a string.
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
func MetricsExpandResponseFlat(paths []string, jsonp string) string {
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	WriteMetricsExpandResponseFlat(qb422016, paths, jsonp)
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
	return qs422016
//line app/vmselect/graphite/metrics_expand_response.qtpl:27
}
// streammetricPaths writes the given paths as a JSON array of quoted strings.
// Code generated by qtc from metrics_expand_response.qtpl; edit the .qtpl file instead.
//line app/vmselect/graphite/metrics_expand_response.qtpl:29
func streammetricPaths(qw422016 *qt422016.Writer, paths []string) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:29
	qw422016.N().S(`[`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:31
	for i, path := range paths {
//line app/vmselect/graphite/metrics_expand_response.qtpl:32
		qw422016.N().Q(path)
//line app/vmselect/graphite/metrics_expand_response.qtpl:33
		if i+1 < len(paths) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:33
			qw422016.N().S(`,`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:33
		}
//line app/vmselect/graphite/metrics_expand_response.qtpl:34
	}
//line app/vmselect/graphite/metrics_expand_response.qtpl:34
	qw422016.N().S(`]`)
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
}

// writemetricPaths writes the JSON array of paths to qq422016.
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
func writemetricPaths(qq422016 qtio422016.Writer, paths []string) {
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	streammetricPaths(qw422016, paths)
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
}

// metricPaths returns the JSON array of paths as a string.
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
func metricPaths(paths []string) string {
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	writemetricPaths(qb422016, paths)
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
	return qs422016
//line app/vmselect/graphite/metrics_expand_response.qtpl:36
}

View file

@ -0,0 +1,110 @@
{% import (
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) %}
{% stripspace %}
MetricsFindResponse generates response for /metrics/find .
See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
The format arg selects between the "completer" and "treejson" layouts;
jsonp optionally wraps the response into a callback invocation.
{% func MetricsFindResponse(paths []string, delimiter, format string, addWildcards bool, jsonp string) %}
{% if jsonp != "" %}{%s= jsonp %}({% endif %}
{% switch format %}
{% case "completer" %}
{%= metricsFindResponseCompleter(paths, delimiter, addWildcards) %}
{% case "treejson" %}
{%= metricsFindResponseTreeJSON(paths, delimiter, addWildcards) %}
{% default %}
{% code logger.Panicf("BUG: unexpected format=%q", format) %}
{% endswitch %}
{% if jsonp != "" %}){% endif %}
{% endfunc %}
metricsFindResponseCompleter emits the "completer" layout: a {"metrics":[...]}
object whose entries carry the full path, the short name and an is_leaf flag.
A path with a trailing delimiter denotes a non-leaf node.
{% func metricsFindResponseCompleter(paths []string, delimiter string, addWildcards bool) %}
{
"metrics":[
{% for i, path := range paths %}
{
"path": {%q= path %},
"name": {%= metricPathName(path, delimiter) %},
"is_leaf": {% if strings.HasSuffix(path, delimiter) %}0{% else %}1{% endif %}
}
{% if i+1 < len(paths) %},{% endif %}
{% endfor %}
{% if addWildcards && len(paths) > 1 %}
,{
"name": "*"
}
{% endif %}
]
}
{% endfunc %}
metricsFindResponseTreeJSON emits the "treejson" layout consumed by the
Graphite web UI tree browser.
{% func metricsFindResponseTreeJSON(paths []string, delimiter string, addWildcards bool) %}
[
{% for i, path := range paths %}
{
{% code
// A trailing delimiter marks an expandable (non-leaf) node.
allowChildren := "0"
isLeaf := "1"
if strings.HasSuffix(path, delimiter) {
allowChildren = "1"
isLeaf = "0"
}
%}
"id": {%q= path %},
"text": {%= metricPathName(path, delimiter) %},
"allowChildren": {%s= allowChildren %},
"expandable": {%s= allowChildren %},
"leaf": {%s= isLeaf %}
}
{% if i+1 < len(paths) %},{% endif %}
{% endfor %}
{% if addWildcards && len(paths) > 1 %}
,{
{% code
// Synthesize a `<parent>.*` wildcard entry based on the first path;
// it is expandable if any of the paths is a non-leaf node.
path := paths[0]
for strings.HasSuffix(path, delimiter) {
path = path[:len(path)-1]
}
id := ""
if n := strings.LastIndexByte(path, delimiter[0]); n >= 0 {
id = path[:n+1]
}
id += "*"
allowChildren := "0"
isLeaf := "1"
for _, path := range paths {
if strings.HasSuffix(path, delimiter) {
allowChildren = "1"
isLeaf = "0"
break
}
}
%}
"id": {%q= id %},
"text": "*",
"allowChildren": {%s= allowChildren %},
"expandable": {%s= allowChildren %},
"leaf": {%s= isLeaf %}
}
{% endif %}
]
{% endfunc %}
metricPathName emits the last path component (without trailing delimiters)
as a quoted JSON string.
{% func metricPathName(path, delimiter string) %}
{% code
// Strip trailing delimiters, then keep everything after the last delimiter.
name := path
for strings.HasSuffix(name, delimiter) {
name = name[:len(name)-1]
}
if n := strings.LastIndexByte(name, delimiter[0]); n >= 0 {
name = name[n+1:]
}
%}
{%q= name %}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,326 @@
// Code generated by qtc from "metrics_find_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/graphite/metrics_find_response.qtpl:1
package graphite
//line app/vmselect/graphite/metrics_find_response.qtpl:1
import (
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// MetricsFindResponse generates response for /metrics/find .See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
//line app/vmselect/graphite/metrics_find_response.qtpl:11
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/graphite/metrics_find_response.qtpl:11
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/graphite/metrics_find_response.qtpl:11
// StreamMetricsFindResponse writes the /metrics/find response for paths to
// qw422016 in the requested format ("completer" or "treejson"), optionally
// wrapped into a jsonp callback.
// Code generated by qtc from metrics_find_response.qtpl; edit the .qtpl file instead.
func StreamMetricsFindResponse(qw422016 *qt422016.Writer, paths []string, delimiter, format string, addWildcards bool, jsonp string) {
//line app/vmselect/graphite/metrics_find_response.qtpl:12
	if jsonp != "" {
//line app/vmselect/graphite/metrics_find_response.qtpl:12
		qw422016.N().S(jsonp)
//line app/vmselect/graphite/metrics_find_response.qtpl:12
		qw422016.N().S(`(`)
//line app/vmselect/graphite/metrics_find_response.qtpl:12
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:13
	switch format {
//line app/vmselect/graphite/metrics_find_response.qtpl:14
	case "completer":
//line app/vmselect/graphite/metrics_find_response.qtpl:15
		streammetricsFindResponseCompleter(qw422016, paths, delimiter, addWildcards)
//line app/vmselect/graphite/metrics_find_response.qtpl:16
	case "treejson":
//line app/vmselect/graphite/metrics_find_response.qtpl:17
		streammetricsFindResponseTreeJSON(qw422016, paths, delimiter, addWildcards)
//line app/vmselect/graphite/metrics_find_response.qtpl:18
	default:
//line app/vmselect/graphite/metrics_find_response.qtpl:19
		// Unreachable for formats validated by MetricsFindHandler.
		logger.Panicf("BUG: unexpected format=%q", format)
//line app/vmselect/graphite/metrics_find_response.qtpl:20
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:21
	if jsonp != "" {
//line app/vmselect/graphite/metrics_find_response.qtpl:21
		qw422016.N().S(`)`)
//line app/vmselect/graphite/metrics_find_response.qtpl:21
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:22
}

// WriteMetricsFindResponse writes the /metrics/find response to qq422016.
//line app/vmselect/graphite/metrics_find_response.qtpl:22
func WriteMetricsFindResponse(qq422016 qtio422016.Writer, paths []string, delimiter, format string, addWildcards bool, jsonp string) {
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	StreamMetricsFindResponse(qw422016, paths, delimiter, format, addWildcards, jsonp)
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:22
}

// MetricsFindResponse returns the /metrics/find response as a string.
//line app/vmselect/graphite/metrics_find_response.qtpl:22
func MetricsFindResponse(paths []string, delimiter, format string, addWildcards bool, jsonp string) string {
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	WriteMetricsFindResponse(qb422016, paths, delimiter, format, addWildcards, jsonp)
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:22
	return qs422016
//line app/vmselect/graphite/metrics_find_response.qtpl:22
}
//line app/vmselect/graphite/metrics_find_response.qtpl:24
// streammetricsFindResponseCompleter writes the "completer"-format body for
// /metrics/find: {"metrics":[{"path":…,"name":…,"is_leaf":0|1},…]}.
// A path ending with delimiter is treated as a non-leaf (expandable) node.
//
// Code generated by qtc; edit metrics_find_response.qtpl instead.
func streammetricsFindResponseCompleter(qw422016 *qt422016.Writer, paths []string, delimiter string, addWildcards bool) {
//line app/vmselect/graphite/metrics_find_response.qtpl:24
	qw422016.N().S(`{"metrics":[`)
//line app/vmselect/graphite/metrics_find_response.qtpl:27
	for i, path := range paths {
//line app/vmselect/graphite/metrics_find_response.qtpl:27
		qw422016.N().S(`{"path":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:29
		// Q() emits the path as a JSON-quoted string.
		qw422016.N().Q(path)
//line app/vmselect/graphite/metrics_find_response.qtpl:29
		qw422016.N().S(`,"name":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:30
		// name is the last path segment without the trailing delimiter.
		streammetricPathName(qw422016, path, delimiter)
//line app/vmselect/graphite/metrics_find_response.qtpl:30
		qw422016.N().S(`,"is_leaf":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:31
		if strings.HasSuffix(path, delimiter) {
//line app/vmselect/graphite/metrics_find_response.qtpl:31
			qw422016.N().S(`0`)
//line app/vmselect/graphite/metrics_find_response.qtpl:31
		} else {
//line app/vmselect/graphite/metrics_find_response.qtpl:31
			qw422016.N().S(`1`)
//line app/vmselect/graphite/metrics_find_response.qtpl:31
		}
//line app/vmselect/graphite/metrics_find_response.qtpl:31
		qw422016.N().S(`}`)
//line app/vmselect/graphite/metrics_find_response.qtpl:33
		// Comma between entries, not after the last one.
		if i+1 < len(paths) {
//line app/vmselect/graphite/metrics_find_response.qtpl:33
			qw422016.N().S(`,`)
//line app/vmselect/graphite/metrics_find_response.qtpl:33
		}
//line app/vmselect/graphite/metrics_find_response.qtpl:34
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:35
	// Append a synthetic "*" entry when wildcards are requested and more than
	// one path matched (presumably mirroring graphite-web's wildcards=1
	// behavior — TODO confirm against graphite-api docs).
	if addWildcards && len(paths) > 1 {
//line app/vmselect/graphite/metrics_find_response.qtpl:35
		qw422016.N().S(`,{"name": "*"}`)
//line app/vmselect/graphite/metrics_find_response.qtpl:39
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:39
	qw422016.N().S(`]}`)
//line app/vmselect/graphite/metrics_find_response.qtpl:42
}
//line app/vmselect/graphite/metrics_find_response.qtpl:42
// writemetricsFindResponseCompleter streams the completer-format response to
// qq422016 through a pooled quicktemplate writer.
func writemetricsFindResponseCompleter(qq422016 qtio422016.Writer, paths []string, delimiter string, addWildcards bool) {
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	streammetricsFindResponseCompleter(qw422016, paths, delimiter, addWildcards)
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:42
}
//line app/vmselect/graphite/metrics_find_response.qtpl:42
// metricsFindResponseCompleter returns the completer-format response as a string.
func metricsFindResponseCompleter(paths []string, delimiter string, addWildcards bool) string {
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	// Render into a pooled byte buffer; the string() conversion copies the
	// bytes out before the buffer is returned to the pool.
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	writemetricsFindResponseCompleter(qb422016, paths, delimiter, addWildcards)
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:42
	return qs422016
//line app/vmselect/graphite/metrics_find_response.qtpl:42
}
//line app/vmselect/graphite/metrics_find_response.qtpl:44
// streammetricsFindResponseTreeJSON writes the "treejson"-format body for
// /metrics/find: a JSON array of node objects with id, text, allowChildren,
// expandable and leaf fields. A path ending with delimiter is a non-leaf
// (expandable) node; otherwise it is a leaf.
//
// Code generated by qtc; edit metrics_find_response.qtpl instead.
func streammetricsFindResponseTreeJSON(qw422016 *qt422016.Writer, paths []string, delimiter string, addWildcards bool) {
//line app/vmselect/graphite/metrics_find_response.qtpl:44
	qw422016.N().S(`[`)
//line app/vmselect/graphite/metrics_find_response.qtpl:46
	for i, path := range paths {
//line app/vmselect/graphite/metrics_find_response.qtpl:46
		qw422016.N().S(`{`)
//line app/vmselect/graphite/metrics_find_response.qtpl:49
		// Nodes whose path ends with delimiter may have children.
		allowChildren := "0"
		isLeaf := "1"
		if strings.HasSuffix(path, delimiter) {
			allowChildren = "1"
			isLeaf = "0"
		}
//line app/vmselect/graphite/metrics_find_response.qtpl:55
		qw422016.N().S(`"id":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:56
		qw422016.N().Q(path)
//line app/vmselect/graphite/metrics_find_response.qtpl:56
		qw422016.N().S(`,"text":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:57
		// text is the last path segment without the trailing delimiter.
		streammetricPathName(qw422016, path, delimiter)
//line app/vmselect/graphite/metrics_find_response.qtpl:57
		qw422016.N().S(`,"allowChildren":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:58
		qw422016.N().S(allowChildren)
//line app/vmselect/graphite/metrics_find_response.qtpl:58
		qw422016.N().S(`,"expandable":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:59
		qw422016.N().S(allowChildren)
//line app/vmselect/graphite/metrics_find_response.qtpl:59
		qw422016.N().S(`,"leaf":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:60
		qw422016.N().S(isLeaf)
//line app/vmselect/graphite/metrics_find_response.qtpl:60
		qw422016.N().S(`}`)
//line app/vmselect/graphite/metrics_find_response.qtpl:62
		// Comma between entries, not after the last one.
		if i+1 < len(paths) {
//line app/vmselect/graphite/metrics_find_response.qtpl:62
			qw422016.N().S(`,`)
//line app/vmselect/graphite/metrics_find_response.qtpl:62
		}
//line app/vmselect/graphite/metrics_find_response.qtpl:63
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:64
	// Optionally append a synthetic "*" node covering all matched paths.
	if addWildcards && len(paths) > 1 {
//line app/vmselect/graphite/metrics_find_response.qtpl:64
		qw422016.N().S(`,{`)
//line app/vmselect/graphite/metrics_find_response.qtpl:67
		// Build the wildcard id from the first path: strip trailing
		// delimiters, keep everything up to and including the last delimiter
		// byte, then append "*".
		// NOTE(review): assumes a single-byte delimiter (delimiter[0]).
		path := paths[0]
		for strings.HasSuffix(path, delimiter) {
			path = path[:len(path)-1]
		}
		id := ""
		if n := strings.LastIndexByte(path, delimiter[0]); n >= 0 {
			id = path[:n+1]
		}
		id += "*"
		// The wildcard node is expandable if at least one matched path is non-leaf.
		allowChildren := "0"
		isLeaf := "1"
		for _, path := range paths {
			if strings.HasSuffix(path, delimiter) {
				allowChildren = "1"
				isLeaf = "0"
				break
			}
		}
//line app/vmselect/graphite/metrics_find_response.qtpl:86
		qw422016.N().S(`"id":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:87
		qw422016.N().Q(id)
//line app/vmselect/graphite/metrics_find_response.qtpl:87
		qw422016.N().S(`,"text": "*","allowChildren":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:89
		qw422016.N().S(allowChildren)
//line app/vmselect/graphite/metrics_find_response.qtpl:89
		qw422016.N().S(`,"expandable":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:90
		qw422016.N().S(allowChildren)
//line app/vmselect/graphite/metrics_find_response.qtpl:90
		qw422016.N().S(`,"leaf":`)
//line app/vmselect/graphite/metrics_find_response.qtpl:91
		qw422016.N().S(isLeaf)
//line app/vmselect/graphite/metrics_find_response.qtpl:91
		qw422016.N().S(`}`)
//line app/vmselect/graphite/metrics_find_response.qtpl:93
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:93
	qw422016.N().S(`]`)
//line app/vmselect/graphite/metrics_find_response.qtpl:95
}
//line app/vmselect/graphite/metrics_find_response.qtpl:95
// writemetricsFindResponseTreeJSON streams the treejson-format response to
// qq422016 through a pooled quicktemplate writer.
func writemetricsFindResponseTreeJSON(qq422016 qtio422016.Writer, paths []string, delimiter string, addWildcards bool) {
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	streammetricsFindResponseTreeJSON(qw422016, paths, delimiter, addWildcards)
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:95
}
//line app/vmselect/graphite/metrics_find_response.qtpl:95
// metricsFindResponseTreeJSON returns the treejson-format response as a string.
func metricsFindResponseTreeJSON(paths []string, delimiter string, addWildcards bool) string {
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	// Render into a pooled byte buffer; string() copies before release.
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	writemetricsFindResponseTreeJSON(qb422016, paths, delimiter, addWildcards)
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:95
	return qs422016
//line app/vmselect/graphite/metrics_find_response.qtpl:95
}
//line app/vmselect/graphite/metrics_find_response.qtpl:97
// streammetricPathName writes the JSON-quoted last segment of path: trailing
// delimiters are stripped, then everything up to and including the final
// delimiter byte is dropped.
func streammetricPathName(qw422016 *qt422016.Writer, path, delimiter string) {
//line app/vmselect/graphite/metrics_find_response.qtpl:99
	name := path
	// Strip trailing delimiters (non-leaf paths end with the delimiter).
	for strings.HasSuffix(name, delimiter) {
		name = name[:len(name)-1]
	}
	// Keep only the part after the last delimiter byte.
	// NOTE(review): only delimiter[0] is inspected here, so a multi-byte (or
	// empty) delimiter would misbehave — presumably callers always pass a
	// single-byte delimiter; confirm at the call sites.
	if n := strings.LastIndexByte(name, delimiter[0]); n >= 0 {
		name = name[n+1:]
	}
//line app/vmselect/graphite/metrics_find_response.qtpl:107
	qw422016.N().Q(name)
//line app/vmselect/graphite/metrics_find_response.qtpl:108
}
//line app/vmselect/graphite/metrics_find_response.qtpl:108
// writemetricPathName streams the JSON-quoted last path segment to qq422016
// through a pooled quicktemplate writer.
func writemetricPathName(qq422016 qtio422016.Writer, path, delimiter string) {
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	streammetricPathName(qw422016, path, delimiter)
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:108
}
//line app/vmselect/graphite/metrics_find_response.qtpl:108
// metricPathName returns the JSON-quoted last path segment as a string.
func metricPathName(path, delimiter string) string {
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	// Render into a pooled byte buffer; string() copies before release.
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	writemetricPathName(qb422016, path, delimiter)
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_find_response.qtpl:108
	return qs422016
//line app/vmselect/graphite/metrics_find_response.qtpl:108
}

View file

@ -0,0 +1,11 @@
{% stripspace %}
MetricsIndexResponse generates response for /metrics/index.json .
See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json
The output is the metricPaths JSON array, optionally wrapped in a JSONP
callback when the jsonp arg is non-empty.
{% func MetricsIndexResponse(metricNames []string, jsonp string) %}
{% if jsonp != "" %}{%s= jsonp %}({% endif %}
{%= metricPaths(metricNames) %}
{% if jsonp != "" %}){% endif %}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,67 @@
// Code generated by qtc from "metrics_index_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// MetricsIndexResponse generates response for /metrics/index.json .See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json
//line app/vmselect/graphite/metrics_index_response.qtpl:5
package graphite
//line app/vmselect/graphite/metrics_index_response.qtpl:5
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/graphite/metrics_index_response.qtpl:5
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/graphite/metrics_index_response.qtpl:5
// StreamMetricsIndexResponse writes the /metrics/index.json response for
// metricNames to qw422016, wrapping it in a JSONP callback when jsonp is
// non-empty.
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json
//
// Code generated by qtc; edit metrics_index_response.qtpl instead.
func StreamMetricsIndexResponse(qw422016 *qt422016.Writer, metricNames []string, jsonp string) {
//line app/vmselect/graphite/metrics_index_response.qtpl:6
	if jsonp != "" {
//line app/vmselect/graphite/metrics_index_response.qtpl:6
		qw422016.N().S(jsonp)
//line app/vmselect/graphite/metrics_index_response.qtpl:6
		qw422016.N().S(`(`)
//line app/vmselect/graphite/metrics_index_response.qtpl:6
	}
//line app/vmselect/graphite/metrics_index_response.qtpl:7
	// streammetricPaths renders the JSON array of metric names; it is
	// generated from a sibling template in this package.
	streammetricPaths(qw422016, metricNames)
//line app/vmselect/graphite/metrics_index_response.qtpl:8
	if jsonp != "" {
//line app/vmselect/graphite/metrics_index_response.qtpl:8
		qw422016.N().S(`)`)
//line app/vmselect/graphite/metrics_index_response.qtpl:8
	}
//line app/vmselect/graphite/metrics_index_response.qtpl:9
}
//line app/vmselect/graphite/metrics_index_response.qtpl:9
// WriteMetricsIndexResponse streams the /metrics/index.json response to
// qq422016 through a pooled quicktemplate writer.
func WriteMetricsIndexResponse(qq422016 qtio422016.Writer, metricNames []string, jsonp string) {
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	StreamMetricsIndexResponse(qw422016, metricNames, jsonp)
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/metrics_index_response.qtpl:9
}
//line app/vmselect/graphite/metrics_index_response.qtpl:9
// MetricsIndexResponse returns the /metrics/index.json response as a string.
func MetricsIndexResponse(metricNames []string, jsonp string) string {
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	// Render into a pooled byte buffer; string() copies before release.
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	WriteMetricsIndexResponse(qb422016, metricNames, jsonp)
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	qs422016 := string(qb422016.B)
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/metrics_index_response.qtpl:9
	return qs422016
//line app/vmselect/graphite/metrics_index_response.qtpl:9
}

View file

@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
@ -281,6 +282,33 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
return true
}
return true
case "graphite/metrics/find", "graphite/metrics/find/":
graphiteMetricsFindRequests.Inc()
httpserver.EnableCORS(w, r)
if err := graphite.MetricsFindHandler(startTime, at, w, r); err != nil {
graphiteMetricsFindErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "graphite/metrics/expand", "graphite/metrics/expand/":
graphiteMetricsExpandRequests.Inc()
httpserver.EnableCORS(w, r)
if err := graphite.MetricsExpandHandler(startTime, at, w, r); err != nil {
graphiteMetricsExpandErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "graphite/metrics/index.json", "graphite/metrics/index.json/":
graphiteMetricsIndexRequests.Inc()
httpserver.EnableCORS(w, r)
if err := graphite.MetricsIndexHandler(startTime, at, w, r); err != nil {
graphiteMetricsIndexErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "prometheus/api/v1/rules":
// Return dumb placeholder
rulesRequests.Inc()
@ -369,6 +397,15 @@ var (
federateRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/federate"}`)
federateErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/federate"}`)
graphiteMetricsFindRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/metrics/find"}`)
graphiteMetricsFindErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/metrics/find"}`)
graphiteMetricsExpandRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/metrics/expand"}`)
graphiteMetricsExpandErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/metrics/expand"}`)
graphiteMetricsIndexRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/metrics/index.json"}`)
graphiteMetricsIndexErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/metrics/index.json"}`)
rulesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/rules"}`)
alertsRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/alerts"}`)
metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/metadata"}`)

View file

@ -640,6 +640,73 @@ func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]stri
return labelValues, isPartialResult, nil
}
// GetTagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix
// on the given time range tr, querying all the configured vmstorage nodes in parallel
// and merging (deduplicating) their results.
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
//
// The second return value is true when the result is partial, i.e. some of the
// storage nodes failed and their data is missing from the result. An error is
// returned only when every storage node failed.
func GetTagValueSuffixes(at *auth.Token, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline Deadline) ([]string, bool, error) {
	if deadline.Exceeded() {
		return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
	}

	// Send the query to all the storage nodes in parallel.
	type nodeResult struct {
		suffixes []string
		err      error
	}
	resultsCh := make(chan nodeResult, len(storageNodes))
	for _, sn := range storageNodes {
		go func(sn *storageNode) {
			sn.tagValueSuffixesRequests.Inc()
			suffixes, err := sn.getTagValueSuffixes(at.AccountID, at.ProjectID, tr, tagKey, tagValuePrefix, delimiter, deadline)
			if err != nil {
				sn.tagValueSuffixesRequestErrors.Inc()
				err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
					tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err)
			}
			resultsCh <- nodeResult{
				suffixes: suffixes,
				err:      err,
			}
		}(sn)
	}

	// Collect results. Deduplicate suffixes across storage nodes via a set.
	m := make(map[string]struct{})
	var errors []error
	for i := 0; i < len(storageNodes); i++ {
		// There is no need in timer here, since all the goroutines executing
		// sn.getTagValueSuffixes must be finished until the deadline.
		nr := <-resultsCh
		if nr.err != nil {
			errors = append(errors, nr.err)
			continue
		}
		for _, suffix := range nr.suffixes {
			m[suffix] = struct{}{}
		}
	}
	isPartialResult := false
	if len(errors) > 0 {
		if len(errors) == len(storageNodes) {
			// All the nodes failed.
			// Return only the first error, since it has no sense in returning all errors.
			return nil, true, fmt.Errorf("error occurred during fetching tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c: %w",
				tr.String(), tagKey, tagValuePrefix, delimiter, errors[0])
		}
		// Just log errors and return partial results.
		// This allows gracefully degrade vmselect in the case
		// if certain storageNodes are temporarily unavailable.
		// NOTE(review): this reuses the labelEntries partial-results counter;
		// consider a dedicated partialTagValueSuffixesResults counter.
		partialLabelEntriesResults.Inc()
		// Log only the first error, since it has no sense in returning all errors.
		logger.Errorf("certain storageNodes are unhealthy when fetching tag value suffixes: %s", errors[0])
		isPartialResult = true
	}

	suffixes := make([]string, 0, len(m))
	for suffix := range m {
		suffixes = append(suffixes, suffix)
	}
	return suffixes, isPartialResult, nil
}
// GetLabelEntries returns all the label entries for at until the given deadline.
func GetLabelEntries(at *auth.Token, deadline Deadline) ([]storage.TagEntry, bool, error) {
if deadline.Exceeded() {
@ -1048,6 +1115,12 @@ type storageNode struct {
// The number of errors during requests to labelEntries.
labelEntriesRequestErrors *metrics.Counter
// The number of requests to tagValueSuffixes.
tagValueSuffixesRequests *metrics.Counter
// The number of errors during requests to tagValueSuffixes.
tagValueSuffixesRequestErrors *metrics.Counter
// The number of requests to tsdb status.
tsdbStatusRequests *metrics.Counter
@ -1133,6 +1206,26 @@ func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName str
return labelValues, nil
}
// getTagValueSuffixes fetches tag value suffixes for (tagKey, tagValuePrefix)
// on the time range tr from a single vmstorage node via the
// "tagValueSuffixes_v1" rpc.
//
// The request is retried once on failure before giving up (presumably to work
// around transient connection errors — confirm against execOnConn semantics).
func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline Deadline) ([]string, error) {
	var suffixes []string
	f := func(bc *handshake.BufferedConn) error {
		ss, err := sn.getTagValueSuffixesOnConn(bc, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter)
		if err != nil {
			return err
		}
		suffixes = ss
		return nil
	}
	if err := sn.execOnConn("tagValueSuffixes_v1", f, deadline); err != nil {
		// Try again before giving up.
		// Drop any partial result from the failed attempt first.
		suffixes = nil
		if err = sn.execOnConn("tagValueSuffixes_v1", f, deadline); err != nil {
			return nil, err
		}
	}
	return suffixes, nil
}
func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Deadline) ([]storage.TagEntry, error) {
var tagEntries []storage.TagEntry
f := func(bc *handshake.BufferedConn) error {
@ -1325,11 +1418,8 @@ const maxLabelSize = 16 * 1024 * 1024
func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]string, error) {
// Send the request to sn.
if err := writeUint32(bc, accountID); err != nil {
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
}
if err := writeUint32(bc, projectID); err != nil {
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
return nil, err
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
@ -1363,11 +1453,8 @@ const maxLabelValueSize = 16 * 1024 * 1024
func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, labelName string) ([]string, error) {
// Send the request to sn.
if err := writeUint32(bc, accountID); err != nil {
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
}
if err := writeUint32(bc, projectID); err != nil {
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
return nil, err
}
if err := writeBytes(bc, []byte(labelName)); err != nil {
return nil, fmt.Errorf("cannot send labelName=%q to conn: %w", labelName, err)
@ -1409,13 +1496,26 @@ func readLabelValues(buf []byte, bc *handshake.BufferedConn) ([]string, []byte,
}
}
func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]storage.TagEntry, error) {
func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32,
tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte) ([]string, error) {
// Send the request to sn.
if err := writeUint32(bc, accountID); err != nil {
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
return nil, err
}
if err := writeUint32(bc, projectID); err != nil {
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
if err := writeUint64(bc, uint64(tr.MinTimestamp)); err != nil {
return nil, fmt.Errorf("cannot send minTimestamp=%d to conn: %w", tr.MinTimestamp, err)
}
if err := writeUint64(bc, uint64(tr.MaxTimestamp)); err != nil {
return nil, fmt.Errorf("cannot send maxTimestamp=%d to conn: %w", tr.MaxTimestamp, err)
}
if err := writeBytes(bc, []byte(tagKey)); err != nil {
return nil, fmt.Errorf("cannot send tagKey=%q to conn: %w", tagKey, err)
}
if err := writeBytes(bc, []byte(tagValuePrefix)); err != nil {
return nil, fmt.Errorf("cannot send tagValuePrefix=%q to conn: %w", tagValuePrefix, err)
}
if err := writeByte(bc, delimiter); err != nil {
return nil, fmt.Errorf("cannot send delimiter=%c to conn: %w", delimiter, err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
@ -1430,7 +1530,42 @@ func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, account
return nil, newErrRemote(buf)
}
// Read response
// Read response.
// The response may contain empty suffix, so it is prepended with the number of the following suffixes.
suffixesCount, err := readUint64(bc)
if err != nil {
return nil, fmt.Errorf("cannot read the number of tag value suffixes: %w", err)
}
suffixes := make([]string, 0, suffixesCount)
for i := 0; i < int(suffixesCount); i++ {
buf, err = readBytes(buf[:0], bc, maxLabelValueSize)
if err != nil {
return nil, fmt.Errorf("cannot read tag value suffix #%d: %w", i+1, err)
}
suffixes = append(suffixes, string(buf))
}
return suffixes, nil
}
func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]storage.TagEntry, error) {
// Send the request to sn.
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
return nil, err
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
}
// Read response error.
buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil {
return nil, fmt.Errorf("cannot read error message: %w", err)
}
if len(buf) > 0 {
return nil, newErrRemote(buf)
}
// Read response.
var labelEntries []storage.TagEntry
for {
buf, err = readBytes(buf[:0], bc, maxLabelSize)
@ -1456,11 +1591,8 @@ func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, account
func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, date uint64, topN int) (*storage.TSDBStatus, error) {
// Send the request to sn.
if err := writeUint32(bc, accountID); err != nil {
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
}
if err := writeUint32(bc, projectID); err != nil {
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
return nil, err
}
// date shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32(bc, uint32(date)); err != nil {
@ -1530,11 +1662,8 @@ func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, err
func (sn *storageNode) getSeriesCountOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) (uint64, error) {
// Send the request to sn.
if err := writeUint32(bc, accountID); err != nil {
return 0, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
}
if err := writeUint32(bc, projectID); err != nil {
return 0, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
return 0, err
}
if err := bc.Flush(); err != nil {
return 0, fmt.Errorf("cannot flush seriesCount args to conn: %w", err)
@ -1621,18 +1750,20 @@ func writeBytes(bc *handshake.BufferedConn, buf []byte) error {
if _, err := bc.Write(sizeBuf); err != nil {
return err
}
if _, err := bc.Write(buf); err != nil {
return err
}
return nil
_, err := bc.Write(buf)
return err
}
func writeUint32(bc *handshake.BufferedConn, n uint32) error {
buf := encoding.MarshalUint32(nil, n)
if _, err := bc.Write(buf); err != nil {
return err
}
return nil
_, err := bc.Write(buf)
return err
}
// writeUint64 sends n to bc using the encoding.MarshalUint64 wire encoding.
func writeUint64(bc *handshake.BufferedConn, n uint64) error {
	if _, err := bc.Write(encoding.MarshalUint64(nil, n)); err != nil {
		return err
	}
	return nil
}
func writeBool(bc *handshake.BufferedConn, b bool) error {
@ -1640,8 +1771,23 @@ func writeBool(bc *handshake.BufferedConn, b bool) error {
if b {
buf[0] = 1
}
if _, err := bc.Write(buf[:]); err != nil {
return err
_, err := bc.Write(buf[:])
return err
}
// writeByte sends the single byte b to bc.
func writeByte(bc *handshake.BufferedConn, b byte) error {
	buf := [1]byte{b}
	if _, err := bc.Write(buf[:]); err != nil {
		return err
	}
	return nil
}
// sendAccountIDProjectID writes the (accountID, projectID) tenant pair to bc.
// It is the common request prefix shared by the per-tenant vmstorage rpcs.
// The caller is responsible for flushing bc.
func sendAccountIDProjectID(bc *handshake.BufferedConn, accountID, projectID uint32) error {
	if err := writeUint32(bc, accountID); err != nil {
		return fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
	}
	if err := writeUint32(bc, projectID); err != nil {
		return fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
	}
	return nil
}
@ -1689,22 +1835,24 @@ func InitStorageNodes(addrs []string) {
concurrentQueriesCh: make(chan struct{}, maxConcurrentQueriesPerStorageNode),
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelsRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelsRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelsRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelsRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
}
metrics.NewGauge(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr), func() float64 {
return float64(len(sn.concurrentQueriesCh))

View file

@ -8,12 +8,12 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -29,18 +29,15 @@ import (
var (
latencyOffset = flag.Duration("search.latencyOffset", time.Second*30, "The time when data points become visible in query results after the collection. "+
"Too small value can result in incomplete last points for query results")
maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for search query execution")
maxQueryLen = flagutil.NewBytes("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to -search.lookback-delta from Prometheus. "+
maxQueryLen = flagutil.NewBytes("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to -search.lookback-delta from Prometheus. "+
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+
"See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons")
maxStalenessInterval = flag.Duration("search.maxStalenessInterval", 0, "The maximum interval for staleness calculations. "+
"By default it is automatically calculated from the median interval between samples. This flag could be useful for tuning "+
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. "+
"See also '-search.maxLookback' flag, which has the same meanining due to historical reasons")
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability")
selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
)
// Default step used if not set.
@ -63,15 +60,15 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
if lookbackDelta <= 0 {
lookbackDelta = defaultStep
}
start, err := getTime(r, "start", ct-lookbackDelta)
start, err := searchutils.GetTime(r, "start", ct-lookbackDelta)
if err != nil {
return err
}
end, err := getTime(r, "end", ct)
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
if start >= end {
start = end - defaultStep
}
@ -90,7 +87,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
rss.Cancel()
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
@ -138,17 +135,17 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
}
matches = []string{match}
}
start, err := getTime(r, "start", 0)
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
}
end, err := getTime(r, "end", ct)
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
format := r.FormValue("format")
maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
deadline := getDeadlineForExport(r, startTime)
deadline := searchutils.GetDeadlineForExport(r, startTime)
if start >= end {
end = start + defaultStep
}
@ -230,7 +227,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
rss.Cancel()
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
@ -274,7 +271,7 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error {
if len(matches) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
@ -336,7 +333,7 @@ var httpClient = &http.Client{
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error {
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
@ -358,11 +355,11 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
matches = []string{fmt.Sprintf("{%s!=''}", labelName)}
}
ct := startTime.UnixNano() / 1e6
end, err := getTime(r, "end", ct)
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
start, err := getTime(r, "start", end-defaultStep)
start, err := searchutils.GetTime(r, "start", end-defaultStep)
if err != nil {
return err
}
@ -371,7 +368,7 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
return fmt.Errorf("cannot obtain label values for %q, match[]=%q, start=%d, end=%d: %w", labelName, matches, start, end, err)
}
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
@ -444,12 +441,12 @@ var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="
// LabelsCountHandler processes /api/v1/labels/count request.
func LabelsCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline)
if err != nil {
return fmt.Errorf(`cannot obtain label entries: %w`, err)
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
w.Header().Set("Content-Type", "application/json")
@ -466,7 +463,7 @@ const secsPerDay = 3600 * 24
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
@ -498,7 +495,7 @@ func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
if err != nil {
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
w.Header().Set("Content-Type", "application/json")
@ -513,7 +510,7 @@ var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
@ -533,11 +530,11 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
matches = []string{"{__name__!=''}"}
}
ct := startTime.UnixNano() / 1e6
end, err := getTime(r, "end", ct)
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
start, err := getTime(r, "start", end-defaultStep)
start, err := searchutils.GetTime(r, "start", end-defaultStep)
if err != nil {
return err
}
@ -546,7 +543,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
return fmt.Errorf("cannot obtain labels for match[]=%q, start=%d, end=%d: %w", matches, start, end, err)
}
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
@ -607,12 +604,12 @@ var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/
// SeriesCountHandler processes /api/v1/series/count request.
func SeriesCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
n, isPartial, err := netstorage.GetSeriesCount(at, deadline)
if err != nil {
return fmt.Errorf("cannot obtain series count: %w", err)
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
@ -636,20 +633,20 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if len(matches) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
end, err := getTime(r, "end", ct)
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
// Do not set start to minTimeMsecs by default as Prometheus does,
// Do not set start to searchutils.minTimeMsecs by default as Prometheus does,
// since this leads to fetching and scanning all the data from the storage,
// which can take a lot of time for big storages.
// It is better setting start as end-defaultStep by default.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
start, err := getTime(r, "start", end-defaultStep)
start, err := searchutils.GetTime(r, "start", end-defaultStep)
if err != nil {
return err
}
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
@ -669,7 +666,7 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
if isPartial && getDenyPartialResponse(r) {
if isPartial && searchutils.GetDenyPartialResponse(r) {
rss.Cancel()
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
@ -713,7 +710,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if len(query) == 0 {
return fmt.Errorf("missing `query` arg")
}
start, err := getTime(r, "time", ct)
start, err := searchutils.GetTime(r, "time", ct)
if err != nil {
return err
}
@ -721,20 +718,20 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if err != nil {
return err
}
step, err := getDuration(r, "step", lookbackDelta)
step, err := searchutils.GetDuration(r, "step", lookbackDelta)
if err != nil {
return err
}
if step <= 0 {
step = defaultStep
}
deadline := getDeadlineForQuery(r, startTime)
deadline := searchutils.GetDeadlineForQuery(r, startTime)
if len(query) > maxQueryLen.N {
return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
}
queryOffset := getLatencyOffsetMilliseconds()
if !getBool(r, "nocache") && ct-start < queryOffset {
if !searchutils.GetBool(r, "nocache") && ct-start < queryOffset {
// Adjust start time only if `nocache` arg isn't set.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/241
start = ct - queryOffset
@ -792,7 +789,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
Deadline: deadline,
LookbackDelta: lookbackDelta,
DenyPartialResponse: getDenyPartialResponse(r),
DenyPartialResponse: searchutils.GetDenyPartialResponse(r),
}
result, err := promql.Exec(&ec, query, true)
if err != nil {
@ -830,15 +827,15 @@ func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
if len(query) == 0 {
return fmt.Errorf("missing `query` arg")
}
start, err := getTime(r, "start", ct-defaultStep)
start, err := searchutils.GetTime(r, "start", ct-defaultStep)
if err != nil {
return err
}
end, err := getTime(r, "end", ct)
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
step, err := getDuration(r, "step", defaultStep)
step, err := searchutils.GetDuration(r, "step", defaultStep)
if err != nil {
return err
}
@ -850,8 +847,8 @@ func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
}
func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error {
deadline := getDeadlineForQuery(r, startTime)
mayCache := !getBool(r, "nocache")
deadline := searchutils.GetDeadlineForQuery(r, startTime)
mayCache := !searchutils.GetBool(r, "nocache")
lookbackDelta, err := getMaxLookback(r)
if err != nil {
return err
@ -881,7 +878,7 @@ func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
MayCache: mayCache,
LookbackDelta: lookbackDelta,
DenyPartialResponse: getDenyPartialResponse(r),
DenyPartialResponse: searchutils.GetDenyPartialResponse(r),
}
result, err := promql.Exec(&ec, query, false)
if err != nil {
@ -974,120 +971,12 @@ func adjustLastPoints(tss []netstorage.Result, start, end int64) []netstorage.Re
return tss
}
func getTime(r *http.Request, argKey string, defaultValue int64) (int64, error) {
argValue := r.FormValue(argKey)
if len(argValue) == 0 {
return defaultValue, nil
}
secs, err := strconv.ParseFloat(argValue, 64)
if err != nil {
// Try parsing string format
t, err := time.Parse(time.RFC3339, argValue)
if err != nil {
// Handle Prometheus'-provided minTime and maxTime.
// See https://github.com/prometheus/client_golang/issues/614
switch argValue {
case prometheusMinTimeFormatted:
return minTimeMsecs, nil
case prometheusMaxTimeFormatted:
return maxTimeMsecs, nil
}
// Try parsing duration relative to the current time
d, err1 := metricsql.DurationValue(argValue, 0)
if err1 != nil {
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
}
if d > 0 {
d = -d
}
t = time.Now().Add(time.Duration(d) * time.Millisecond)
}
secs = float64(t.UnixNano()) / 1e9
}
msecs := int64(secs * 1e3)
if msecs < minTimeMsecs {
msecs = 0
}
if msecs > maxTimeMsecs {
msecs = maxTimeMsecs
}
return msecs, nil
}
var (
// These constants were obtained from https://github.com/prometheus/prometheus/blob/91d7175eaac18b00e370965f3a8186cc40bf9f55/web/api/v1/api.go#L442
// See https://github.com/prometheus/client_golang/issues/614 for details.
prometheusMinTimeFormatted = time.Unix(math.MinInt64/1000+62135596801, 0).UTC().Format(time.RFC3339Nano)
prometheusMaxTimeFormatted = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC().Format(time.RFC3339Nano)
)
const (
// These values prevent from overflow when storing msec-precision time in int64.
minTimeMsecs = 0 // use 0 instead of `int64(-1<<63) / 1e6` because the storage engine doesn't actually support negative time
maxTimeMsecs = int64(1<<63-1) / 1e6
)
func getDuration(r *http.Request, argKey string, defaultValue int64) (int64, error) {
argValue := r.FormValue(argKey)
if len(argValue) == 0 {
return defaultValue, nil
}
secs, err := strconv.ParseFloat(argValue, 64)
if err != nil {
// Try parsing string format
d, err := metricsql.DurationValue(argValue, 0)
if err != nil {
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
}
secs = float64(d) / 1000
}
msecs := int64(secs * 1e3)
if msecs <= 0 || msecs > maxDurationMsecs {
return 0, fmt.Errorf("%q=%dms is out of allowed range [%d ... %d]", argKey, msecs, 0, int64(maxDurationMsecs))
}
return msecs, nil
}
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
func getMaxLookback(r *http.Request) (int64, error) {
d := maxLookback.Milliseconds()
if d == 0 {
d = maxStalenessInterval.Milliseconds()
}
return getDuration(r, "max_lookback", d)
}
func getDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline {
dMax := maxQueryDuration.Milliseconds()
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration")
}
func getDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline {
dMax := maxExportDuration.Milliseconds()
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration")
}
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline {
d, err := getDuration(r, "timeout", 0)
if err != nil {
d = 0
}
if d <= 0 || d > dMax {
d = dMax
}
timeout := time.Duration(d) * time.Millisecond
return netstorage.NewDeadline(startTime, timeout, flagHint)
}
func getBool(r *http.Request, argKey string) bool {
argValue := r.FormValue(argKey)
switch strings.ToLower(argValue) {
case "", "0", "f", "false", "no":
return false
default:
return true
}
return searchutils.GetDuration(r, "max_lookback", d)
}
func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) {
@ -1109,10 +998,3 @@ func getLatencyOffsetMilliseconds() int64 {
}
return d
}
func getDenyPartialResponse(r *http.Request) bool {
if *denyPartialResponse {
return true
}
return getBool(r, "deny_partial_response")
}

View file

@ -1,10 +1,7 @@
package prometheus
import (
"fmt"
"math"
"net/http"
"net/url"
"reflect"
"testing"
@ -50,76 +47,6 @@ func TestRemoveEmptyValuesAndTimeseries(t *testing.T) {
})
}
func TestGetTimeSuccess(t *testing.T) {
f := func(s string, timestampExpected int64) {
t.Helper()
urlStr := fmt.Sprintf("http://foo.bar/baz?s=%s", url.QueryEscape(s))
r, err := http.NewRequest("GET", urlStr, nil)
if err != nil {
t.Fatalf("unexpected error in NewRequest: %s", err)
}
// Verify defaultValue
ts, err := getTime(r, "foo", 123)
if err != nil {
t.Fatalf("unexpected error when obtaining default time from getTime(%q): %s", s, err)
}
if ts != 123 {
t.Fatalf("unexpected default value for getTime(%q); got %d; want %d", s, ts, 123)
}
// Verify timestampExpected
ts, err = getTime(r, "s", 123)
if err != nil {
t.Fatalf("unexpected error in getTime(%q): %s", s, err)
}
if ts != timestampExpected {
t.Fatalf("unexpected timestamp for getTime(%q); got %d; want %d", s, ts, timestampExpected)
}
}
f("2019-07-07T20:01:02Z", 1562529662000)
f("2019-07-07T20:47:40+03:00", 1562521660000)
f("-292273086-05-16T16:47:06Z", minTimeMsecs)
f("292277025-08-18T07:12:54.999999999Z", maxTimeMsecs)
f("1562529662.324", 1562529662324)
f("-9223372036.854", minTimeMsecs)
f("-9223372036.855", minTimeMsecs)
f("9223372036.855", maxTimeMsecs)
}
func TestGetTimeError(t *testing.T) {
f := func(s string) {
t.Helper()
urlStr := fmt.Sprintf("http://foo.bar/baz?s=%s", url.QueryEscape(s))
r, err := http.NewRequest("GET", urlStr, nil)
if err != nil {
t.Fatalf("unexpected error in NewRequest: %s", err)
}
// Verify defaultValue
ts, err := getTime(r, "foo", 123)
if err != nil {
t.Fatalf("unexpected error when obtaining default time from getTime(%q): %s", s, err)
}
if ts != 123 {
t.Fatalf("unexpected default value for getTime(%q); got %d; want %d", s, ts, 123)
}
// Verify timestampExpected
_, err = getTime(r, "s", 123)
if err == nil {
t.Fatalf("expecting non-nil error in getTime(%q)", s)
}
}
f("foo")
f("2019-07-07T20:01:02Zisdf")
f("2019-07-07T20:47:40+03:00123")
f("-292273086-05-16T16:47:07Z")
f("292277025-08-18T07:12:54.999999998Z")
}
func TestAdjustLastPoints(t *testing.T) {
f := func(tss []netstorage.Result, start, end int64, tssExpected []netstorage.Result) {
t.Helper()

View file

@ -0,0 +1,141 @@
package searchutils
import (
"flag"
"fmt"
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/metricsql"
)
// Command-line flags shared by the HTTP query handlers which use this package.
var (
	maxExportDuration   = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
	maxQueryDuration    = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for search query execution")
	denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability")
)
// GetTime returns time in milliseconds from the given argKey query arg.
//
// The arg may be a float unix timestamp in seconds, an RFC3339 time string,
// one of the special Prometheus-provided min/max time strings, or a duration
// relative to the current time. defaultValue is returned when the arg is missing.
func GetTime(r *http.Request, argKey string, defaultValue int64) (int64, error) {
	argValue := r.FormValue(argKey)
	if argValue == "" {
		return defaultValue, nil
	}
	secs, errFloat := strconv.ParseFloat(argValue, 64)
	if errFloat != nil {
		// The arg isn't a numeric timestamp - try the RFC3339 string format.
		t, errParse := time.Parse(time.RFC3339, argValue)
		if errParse != nil {
			// Handle Prometheus'-provided minTime and maxTime.
			// See https://github.com/prometheus/client_golang/issues/614
			if argValue == prometheusMinTimeFormatted {
				return minTimeMsecs, nil
			}
			if argValue == prometheusMaxTimeFormatted {
				return maxTimeMsecs, nil
			}
			// Try parsing duration relative to the current time.
			d, errDuration := metricsql.DurationValue(argValue, 0)
			if errDuration != nil {
				return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, errParse)
			}
			if d > 0 {
				// Durations are always interpreted as offsets into the past.
				d = -d
			}
			t = time.Now().Add(time.Duration(d) * time.Millisecond)
		}
		secs = float64(t.UnixNano()) / 1e9
	}
	msecs := int64(secs * 1e3)
	switch {
	case msecs < minTimeMsecs:
		// The storage engine doesn't support negative time.
		msecs = 0
	case msecs > maxTimeMsecs:
		msecs = maxTimeMsecs
	}
	return msecs, nil
}
var (
	// prometheusMinTimeFormatted and prometheusMaxTimeFormatted are the string forms
	// of the special min/max timestamps the Prometheus client sends when no explicit
	// time range is given; GetTime maps them to minTimeMsecs/maxTimeMsecs.
	//
	// These constants were obtained from https://github.com/prometheus/prometheus/blob/91d7175eaac18b00e370965f3a8186cc40bf9f55/web/api/v1/api.go#L442
	// See https://github.com/prometheus/client_golang/issues/614 for details.
	prometheusMinTimeFormatted = time.Unix(math.MinInt64/1000+62135596801, 0).UTC().Format(time.RFC3339Nano)
	prometheusMaxTimeFormatted = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC().Format(time.RFC3339Nano)
)
const (
	// These values prevent from overflow when storing msec-precision time in int64.
	minTimeMsecs = 0 // use 0 instead of `int64(-1<<63) / 1e6` because the storage engine doesn't actually support negative time
	maxTimeMsecs = int64(1<<63-1) / 1e6
)
// GetDuration returns duration in milliseconds from the given argKey query arg.
//
// The arg may be either a float number of seconds or a MetricsQL duration string.
// defaultValue is returned when the arg is missing. An error is returned for
// non-positive durations and for durations exceeding maxDurationMsecs.
func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, error) {
	argValue := r.FormValue(argKey)
	if argValue == "" {
		return defaultValue, nil
	}
	secs, errFloat := strconv.ParseFloat(argValue, 64)
	if errFloat != nil {
		// The arg isn't a plain number of seconds - try the duration string format.
		d, errDuration := metricsql.DurationValue(argValue, 0)
		if errDuration != nil {
			return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, errDuration)
		}
		secs = float64(d) / 1000
	}
	msecs := int64(secs * 1e3)
	if msecs <= 0 || msecs > maxDurationMsecs {
		return 0, fmt.Errorf("%q=%dms is out of allowed range [%d ... %d]", argKey, msecs, 0, int64(maxDurationMsecs))
	}
	return msecs, nil
}
// maxDurationMsecs limits user-supplied durations to 100 years in order to prevent int64 overflow in callers.
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
// GetDeadlineForQuery returns deadline for the given query r.
//
// The deadline is capped by the -search.maxQueryDuration command-line flag.
func GetDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline {
	return getDeadlineWithMaxDuration(r, startTime, maxQueryDuration.Milliseconds(), "-search.maxQueryDuration")
}
// GetDeadlineForExport returns deadline for the given request to /api/v1/export.
//
// The deadline is capped by the -search.maxExportDuration command-line flag.
func GetDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline {
	return getDeadlineWithMaxDuration(r, startTime, maxExportDuration.Milliseconds(), "-search.maxExportDuration")
}
// getDeadlineWithMaxDuration builds a netstorage.Deadline from the optional
// `timeout` query arg, clamping it to the (0 .. dMax] milliseconds range.
// flagHint names the command-line flag defining dMax; it is reported to the
// user when the deadline is exceeded.
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline {
	timeoutMsecs, err := GetDuration(r, "timeout", 0)
	if err != nil {
		// An invalid `timeout` arg falls back to the maximum allowed duration.
		timeoutMsecs = 0
	}
	if timeoutMsecs <= 0 || timeoutMsecs > dMax {
		timeoutMsecs = dMax
	}
	return netstorage.NewDeadline(startTime, time.Duration(timeoutMsecs)*time.Millisecond, flagHint)
}
// GetBool returns boolean value from the given argKey query arg.
func GetBool(r *http.Request, argKey string) bool {
argValue := r.FormValue(argKey)
switch strings.ToLower(argValue) {
case "", "0", "f", "false", "no":
return false
default:
return true
}
}
// GetDenyPartialResponse returns whether partial responses are denied.
//
// Partial responses are denied either globally via the
// -search.denyPartialResponse command-line flag or per-request
// via the `deny_partial_response` query arg.
func GetDenyPartialResponse(r *http.Request) bool {
	return *denyPartialResponse || GetBool(r, "deny_partial_response")
}

View file

@ -0,0 +1,78 @@
package searchutils
import (
"fmt"
"net/http"
"net/url"
"testing"
)
func TestGetTimeSuccess(t *testing.T) {
f := func(s string, timestampExpected int64) {
t.Helper()
urlStr := fmt.Sprintf("http://foo.bar/baz?s=%s", url.QueryEscape(s))
r, err := http.NewRequest("GET", urlStr, nil)
if err != nil {
t.Fatalf("unexpected error in NewRequest: %s", err)
}
// Verify defaultValue
ts, err := GetTime(r, "foo", 123)
if err != nil {
t.Fatalf("unexpected error when obtaining default time from GetTime(%q): %s", s, err)
}
if ts != 123 {
t.Fatalf("unexpected default value for GetTime(%q); got %d; want %d", s, ts, 123)
}
// Verify timestampExpected
ts, err = GetTime(r, "s", 123)
if err != nil {
t.Fatalf("unexpected error in GetTime(%q): %s", s, err)
}
if ts != timestampExpected {
t.Fatalf("unexpected timestamp for GetTime(%q); got %d; want %d", s, ts, timestampExpected)
}
}
f("2019-07-07T20:01:02Z", 1562529662000)
f("2019-07-07T20:47:40+03:00", 1562521660000)
f("-292273086-05-16T16:47:06Z", minTimeMsecs)
f("292277025-08-18T07:12:54.999999999Z", maxTimeMsecs)
f("1562529662.324", 1562529662324)
f("-9223372036.854", minTimeMsecs)
f("-9223372036.855", minTimeMsecs)
f("9223372036.855", maxTimeMsecs)
}
func TestGetTimeError(t *testing.T) {
f := func(s string) {
t.Helper()
urlStr := fmt.Sprintf("http://foo.bar/baz?s=%s", url.QueryEscape(s))
r, err := http.NewRequest("GET", urlStr, nil)
if err != nil {
t.Fatalf("unexpected error in NewRequest: %s", err)
}
// Verify defaultValue
ts, err := GetTime(r, "foo", 123)
if err != nil {
t.Fatalf("unexpected error when obtaining default time from GetTime(%q): %s", s, err)
}
if ts != 123 {
t.Fatalf("unexpected default value for GetTime(%q); got %d; want %d", s, ts, 123)
}
// Verify timestampExpected
_, err = GetTime(r, "s", 123)
if err == nil {
t.Fatalf("expecting non-nil error in GetTime(%q)", s)
}
}
f("foo")
f("2019-07-07T20:01:02Zisdf")
f("2019-07-07T20:47:40+03:00123")
f("-292273086-05-16T16:47:07Z")
f("292277025-08-18T07:12:54.999999998Z")
}

View file

@ -24,9 +24,10 @@ import (
)
var (
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search")
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search")
maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series each search can scan")
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search")
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search")
maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series each search can scan")
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
@ -422,6 +423,30 @@ func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
return n, nil
}
// readUint64 reads the next 8 bytes from the connection and decodes them into uint64.
//
// io.EOF is returned as is, so the caller can distinguish a clean
// connection close from a truncated read.
func (ctx *vmselectRequestCtx) readUint64() (uint64, error) {
	ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 8)
	_, err := io.ReadFull(ctx.bc, ctx.sizeBuf)
	if err == nil {
		return encoding.UnmarshalUint64(ctx.sizeBuf), nil
	}
	if err == io.EOF {
		return 0, err
	}
	return 0, fmt.Errorf("cannot read uint64: %w", err)
}
// readAccountIDProjectID reads the (accountID, projectID) pair identifying
// the tenant the request belongs to.
func (ctx *vmselectRequestCtx) readAccountIDProjectID() (accountID, projectID uint32, err error) {
	if accountID, err = ctx.readUint32(); err != nil {
		return 0, 0, fmt.Errorf("cannot read accountID: %w", err)
	}
	if projectID, err = ctx.readUint32(); err != nil {
		return 0, 0, fmt.Errorf("cannot read projectID: %w", err)
	}
	return accountID, projectID, nil
}
func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error {
ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 8)
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
@ -456,6 +481,18 @@ func (ctx *vmselectRequestCtx) readBool() (bool, error) {
return v, nil
}
// readByte reads a single byte from the connection.
//
// io.EOF is returned as is, so the caller can detect a closed connection.
func (ctx *vmselectRequestCtx) readByte() (byte, error) {
	ctx.dataBuf = bytesutil.Resize(ctx.dataBuf, 1)
	_, err := io.ReadFull(ctx.bc, ctx.dataBuf)
	if err != nil {
		if err != io.EOF {
			err = fmt.Errorf("cannot read byte: %w", err)
		}
		return 0, err
	}
	return ctx.dataBuf[0], nil
}
func (ctx *vmselectRequestCtx) writeDataBufBytes() error {
if err := ctx.writeUint64(uint64(len(ctx.dataBuf))); err != nil {
return fmt.Errorf("cannot write data size: %w", err)
@ -538,6 +575,8 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
return s.processVMSelectSearchQuery(ctx)
case "labelValues_v2":
return s.processVMSelectLabelValues(ctx)
case "tagValueSuffixes_v1":
return s.processVMSelectTagValueSuffixes(ctx)
case "labelEntries_v2":
return s.processVMSelectLabelEntries(ctx)
case "labels_v2":
@ -596,13 +635,9 @@ func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error {
vmselectLabelsRequests.Inc()
// Read request
accountID, err := ctx.readUint32()
accountID, projectID, err := ctx.readAccountIDProjectID()
if err != nil {
return fmt.Errorf("cannot read accountID: %w", err)
}
projectID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read projectID: %w", err)
return err
}
// Search for tag keys
@ -640,13 +675,9 @@ func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error {
vmselectLabelValuesRequests.Inc()
// Read request
accountID, err := ctx.readUint32()
accountID, projectID, err := ctx.readAccountIDProjectID()
if err != nil {
return fmt.Errorf("cannot read accountID: %w", err)
}
projectID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read projectID: %w", err)
return err
}
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
return fmt.Errorf("cannot read labelName: %w", err)
@ -667,6 +698,63 @@ func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error {
return writeLabelValues(ctx, labelValues)
}
// processVMSelectTagValueSuffixes handles the "tagValueSuffixes_v1" rpc request from vmselect.
//
// It reads (accountID, projectID, minTimestamp, maxTimestamp, tagKey, tagValuePrefix, delimiter)
// from the connection, then responds with an error message (empty on success),
// the number of found suffixes and the suffixes themselves.
func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error {
	vmselectTagValueSuffixesRequests.Inc()

	// Read request.
	accountID, projectID, err := ctx.readAccountIDProjectID()
	if err != nil {
		return err
	}
	minTimestamp, err := ctx.readUint64()
	if err != nil {
		return fmt.Errorf("cannot read minTimestamp: %w", err)
	}
	maxTimestamp, err := ctx.readUint64()
	if err != nil {
		return fmt.Errorf("cannot read maxTimestamp: %w", err)
	}
	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
		return fmt.Errorf("cannot read tagKey: %w", err)
	}
	// Copy out of ctx.dataBuf, since it is reused by the next read below.
	tagKey := append([]byte{}, ctx.dataBuf...)
	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
		return fmt.Errorf("cannot read tagValuePrefix: %w", err)
	}
	tagValuePrefix := append([]byte{}, ctx.dataBuf...)
	delimiter, err := ctx.readByte()
	if err != nil {
		// Use %w (not %s) for consistency with the other read errors above,
		// so callers can unwrap the underlying error (e.g. io.EOF).
		return fmt.Errorf("cannot read delimiter: %w", err)
	}

	// Search for tag value suffixes.
	tr := storage.TimeRange{
		MinTimestamp: int64(minTimestamp),
		MaxTimestamp: int64(maxTimestamp),
	}
	suffixes, err := s.storage.SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, *maxTagValueSuffixesPerSearch, ctx.deadline)
	if err != nil {
		// Report the search error to vmselect instead of tearing down the connection.
		return ctx.writeErrorMessage(err)
	}

	// Send an empty error message to vmselect to signal success.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send suffixes to vmselect.
	// Suffixes may contain empty string, so prepend suffixes with suffixCount
	// instead of relying on an empty string as the end-of-stream marker.
	if err := ctx.writeUint64(uint64(len(suffixes))); err != nil {
		return fmt.Errorf("cannot write suffixesCount: %w", err)
	}
	for i, suffix := range suffixes {
		if err := ctx.writeString(suffix); err != nil {
			return fmt.Errorf("cannot write suffix #%d: %w", i+1, err)
		}
	}
	return nil
}
func writeLabelValues(ctx *vmselectRequestCtx, labelValues []string) error {
for _, labelValue := range labelValues {
if len(labelValue) == 0 {
@ -688,13 +776,9 @@ func (s *Server) processVMSelectLabelEntries(ctx *vmselectRequestCtx) error {
vmselectLabelEntriesRequests.Inc()
// Read request
accountID, err := ctx.readUint32()
accountID, projectID, err := ctx.readAccountIDProjectID()
if err != nil {
return fmt.Errorf("cannot read accountID: %w", err)
}
projectID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read projectID: %w", err)
return err
}
// Perform the request
@ -735,13 +819,9 @@ func (s *Server) processVMSelectSeriesCount(ctx *vmselectRequestCtx) error {
vmselectSeriesCountRequests.Inc()
// Read request
accountID, err := ctx.readUint32()
accountID, projectID, err := ctx.readAccountIDProjectID()
if err != nil {
return fmt.Errorf("cannot read accountID: %w", err)
}
projectID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read projectID: %w", err)
return err
}
// Execute the request
@ -766,13 +846,9 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error {
vmselectTSDBStatusRequests.Inc()
// Read request
accountID, err := ctx.readUint32()
accountID, projectID, err := ctx.readAccountIDProjectID()
if err != nil {
return fmt.Errorf("cannot read accountID: %w", err)
}
projectID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read projectID: %w", err)
return err
}
date, err := ctx.readUint32()
if err != nil {
@ -907,15 +983,16 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
}
var (
vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total")
vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total")
vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total")
vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total")
vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total")
vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total")
vmselectSearchQueryRequests = metrics.NewCounter("vm_vmselect_search_query_requests_total")
vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total")
vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total")
vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total")
vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total")
vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total")
vmselectTagValueSuffixesRequests = metrics.NewCounter("vm_vmselect_tag_value_suffixes_requests_total")
vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total")
vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total")
vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total")
vmselectSearchQueryRequests = metrics.NewCounter("vm_vmselect_search_query_requests_total")
vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total")
vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total")
)
func (ctx *vmselectRequestCtx) setupTfss() error {

View file

@ -180,7 +180,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
* URLs for querying: `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
* URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
- `<accountID>` is an arbitrary number identifying data namespace for the query (aka tenant)
- `<suffix>` may have the following values:
- `api/v1/query` - performs [PromQL instant query](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries).
@ -194,6 +194,13 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.
* URLs for [Graphite Metrics API](https://graphite-api.readthedocs.io/en/latest/api.html#the-metrics-api): `http://<vmselect>:8481/select/<accountID>/graphite/<suffix>`, where:
- `<accountID>` is an arbitrary number identifying data namespace for the query (aka tenant)
- `<suffix>` may have the following values:
- `metrics/find` - searches Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find).
- `metrics/expand` - expands Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand).
- `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json).
* URL for time series deletion: `http://<vmselect>:8481/delete/<accountID>/prometheus/api/v1/admin/tsdb/delete_series?match[]=<timeseries_selector_for_delete>`.
Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't
be used on a regular basis, since it carries non-zero overhead.

View file

@ -103,6 +103,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
* [How to import CSV data](#how-to-import-csv-data)
* [Prometheus querying API usage](#prometheus-querying-api-usage)
* [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
* [Graphite Metrics API usage](#graphite-metrics-api-usage)
* [How to build from sources](#how-to-build-from-sources)
* [Development build](#development-build)
* [Production build](#production-build)
@ -392,9 +394,11 @@ The `/api/v1/export` endpoint should return the following response:
### Querying Graphite data
Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read either via
[Prometheus querying API](#prometheus-querying-api-usage)
or via [go-graphite/carbonapi](https://github.com/go-graphite/carbonapi/blob/master/cmd/carbonapi/carbonapi.example.prometheus.yaml).
Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via the following APIs:
* [Prometheus querying API](#prometheus-querying-api-usage)
* Metric names can be explored via [Graphite metrics API](#graphite-metrics-api-usage)
* [go-graphite/carbonapi](https://github.com/go-graphite/carbonapi/blob/master/cmd/carbonapi/carbonapi.example.prometheus.yaml)
### How to send data from OpenTSDB-compatible agents
@ -585,6 +589,21 @@ Additionally VictoriaMetrics provides the following handlers:
* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - it returns a list of currently running queries.
### Graphite Metrics API usage
VictoriaMetrics supports the following handlers from [Graphite Metrics API](https://graphite-api.readthedocs.io/en/latest/api.html#the-metrics-api):
* [/metrics/find](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find)
* [/metrics/expand](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand)
* [/metrics/index.json](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json)
VictoriaMetrics accepts the following additional query args at `/metrics/find` and `/metrics/expand`:
* `label` - for selecting arbitrary label values. By default `label=__name__`, i.e. metric names are selected.
* `delimiter` - for using different delimiters in metric name hierarchy. For example, `/metrics/find?delimiter=_&query=node_*` would return all the metric name prefixes
that start with `node_`. By default `delimiter=.`.
### How to build from sources
We recommend using either [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) or

View file

@ -923,6 +923,153 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
return nil
}
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
//
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
//
// The returned suffixes are in arbitrary order; vmselect is responsible for sorting them.
func (db *indexDB) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
	delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
	// TODO: cache results?

	tvss := make(map[string]struct{})

	// Collect suffixes from the current indexDB.
	is := db.getIndexSearch(accountID, projectID, deadline)
	err := is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
	db.putIndexSearch(is)
	if err != nil {
		return nil, err
	}

	// Collect suffixes from the previous-generation indexDB, if it exists.
	ok := db.doExtDB(func(extDB *indexDB) {
		isExt := extDB.getIndexSearch(accountID, projectID, deadline)
		err = isExt.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
		extDB.putIndexSearch(isExt)
	})
	if ok && err != nil {
		return nil, err
	}

	suffixes := make([]string, 0, len(tvss))
	for suffix := range tvss {
		// Do not skip empty suffixes, since they may represent leaf tag values.
		suffixes = append(suffixes, suffix)
	}
	// Do not sort suffixes, since they must be sorted by vmselect.
	return suffixes, nil
}
// searchTagValueSuffixesForTimeRange collects tag value suffixes for tr into tvss.
//
// Wide time ranges fall back to a single scan over the global index; narrow ranges
// are searched per-day, with one goroutine per day.
func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct{}, tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
	minDate := uint64(tr.MinTimestamp) / msecPerDay
	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
	if maxDate-minDate > maxDaysForDateMetricIDs {
		// The range spans too many days for per-date lookups - scan the global index instead.
		return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
	}

	// Query over multiple days in parallel.
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // protects tvss and errGlobal from concurrent access below.
		errGlobal error
	)
	for d := minDate; d <= maxDate; d++ {
		wg.Add(1)
		go func(date uint64) {
			defer wg.Done()
			tvssLocal := make(map[string]struct{})
			isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
			defer is.db.putIndexSearch(isLocal)
			err := isLocal.searchTagValueSuffixesForDate(tvssLocal, date, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)

			mu.Lock()
			defer mu.Unlock()
			if errGlobal != nil {
				// Another day's search already failed - discard this result.
				return
			}
			if err != nil {
				errGlobal = err
				return
			}
			for suffix := range tvssLocal {
				tvss[suffix] = struct{}{}
			}
		}(d)
	}
	wg.Wait()
	return errGlobal
}
// searchTagValueSuffixesAll collects tag value suffixes from the global
// (date-less) tag->metricIDs index into tvss.
func (is *indexSearch) searchTagValueSuffixesAll(tvss map[string]struct{}, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
	nsPrefix := byte(nsPrefixTagToMetricIDs)
	kb := &is.kb
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefix)
	kb.B = marshalTagValue(kb.B, tagKey)
	kb.B = marshalTagValue(kb.B, tagValuePrefix)
	// Drop the trailing tagSeparatorChar, so the prefix matches every tag value
	// starting with tagValuePrefix rather than the exact value only.
	kb.B = kb.B[:len(kb.B)-1]
	prefix := append([]byte(nil), kb.B...)
	return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes)
}
// searchTagValueSuffixesForDate collects tag value suffixes for a single day
// from the per-date (date,tag)->metricIDs index into tvss.
func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, date uint64, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
	nsPrefix := byte(nsPrefixDateTagToMetricIDs)
	kb := &is.kb
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefix)
	kb.B = encoding.MarshalUint64(kb.B, date)
	kb.B = marshalTagValue(kb.B, tagKey)
	kb.B = marshalTagValue(kb.B, tagValuePrefix)
	// Drop the trailing tagSeparatorChar, so the prefix matches every tag value
	// starting with tagValuePrefix rather than the exact value only.
	kb.B = kb.B[:len(kb.B)-1]
	prefix := append([]byte(nil), kb.B...)
	return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes)
}
// searchTagValueSuffixesForPrefix scans index rows starting at prefix and collects
// into tvss the parts of tag values that follow tagValuePrefix.
//
// If the remainder contains the delimiter, only the chunk up to and including the
// first delimiter is stored (a non-leaf suffix) and the scan seeks past all rows
// sharing that chunk. Otherwise the whole remainder is stored (a leaf suffix,
// possibly empty). The scan stops after maxTagValueSuffixes entries are collected.
func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
	kb := &is.kb
	ts := &is.ts
	mp := &is.mp
	mp.Reset()
	dmis := is.db.getDeletedMetricIDs()
	loopsPaceLimiter := 0
	ts.Seek(prefix)
	for len(tvss) < maxTagValueSuffixes && ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
			}
		}
		loopsPaceLimiter++
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			break
		}
		if err := mp.Init(item, nsPrefix); err != nil {
			return err
		}
		if mp.IsDeletedTag(dmis) {
			continue
		}
		tagValue := mp.Tag.Value
		if !bytes.HasPrefix(tagValue, tagValuePrefix) {
			continue
		}
		suffix := tagValue[len(tagValuePrefix):]
		n := bytes.IndexByte(suffix, delimiter)
		if n < 0 {
			// Found leaf tag value that doesn't have delimiters after the given tagValuePrefix.
			tvss[string(suffix)] = struct{}{}
			continue
		}
		// Found non-leaf tag value. Extract suffix that ends with the given delimiter.
		suffix = suffix[:n+1]
		tvss[string(suffix)] = struct{}{}
		if suffix[len(suffix)-1] == 255 {
			// Cannot increment 0xff below - fall back to the linear scan.
			continue
		}
		// Skip all the rows sharing this non-leaf suffix by seeking directly
		// to the first row with the next possible suffix.
		suffix[len(suffix)-1]++
		kb.B = append(kb.B[:0], prefix...)
		kb.B = marshalTagValue(kb.B, suffix)
		kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar
		ts.Seek(kb.B)
	}
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error when searching for tag value suffixes for prefix %q: %w", prefix, err)
	}
	return nil
}
// GetSeriesCount returns the approximate number of unique timeseries for the given (accountID, projectID).
//
// It includes the deleted series too and may count the same series

View file

@ -999,6 +999,14 @@ func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, ma
return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues, deadline)
}
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
//
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
func (s *Storage) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
	delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
	// Delegate to the currently active indexDB.
	idb := s.idb()
	return idb.SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
}
// SearchTagEntries returns a list of (tagName -> tagValues) for (accountID, projectID).
func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) {
idb := s.idb()