app/vmselect: implement /tags/tagSeries and /tags/tagMultiSeries` in order to be consistent with single-node VictoriaMetrics

This commit is contained in:
Aliaksandr Valialkin 2020-11-23 12:33:17 +02:00
parent 7987129baa
commit 433ae806ac
8 changed files with 371 additions and 34 deletions

View file

@ -205,6 +205,8 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `metrics/find` - searches Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find).
- `metrics/expand` - expands Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand).
- `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json).
- `tags/tagSeries` - registers time series. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb).
- `tags/tagMultiSeries` - register multiple time series. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb).
- `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/<tag_name>` - returns tag values for the given `<tag_name>`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/findSeries` - returns series matching the given `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).

View file

@ -13,10 +13,100 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// TagsTagSeriesHandler implements /tags/tagSeries handler.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
func TagsTagSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
return registerMetrics(startTime, at, w, r, false)
}
// TagsTagMultiSeriesHandler implements /tags/tagMultiSeries handler.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
func TagsTagMultiSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
return registerMetrics(startTime, at, w, r, true)
}
func registerMetrics(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request, isJSONResponse bool) error {
deadline := searchutils.GetDeadlineForQuery(r, startTime)
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
paths := r.Form["path"]
var row graphiteparser.Row
var labels []prompb.Label
var b []byte
var tagsPool []graphiteparser.Tag
mrs := make([]storage.MetricRow, len(paths))
ct := time.Now().UnixNano() / 1e6
canonicalPaths := make([]string, len(paths))
for i, path := range paths {
var err error
tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0])
if err != nil {
return fmt.Errorf("cannot parse path=%q: %w", path, err)
}
// Construct canonical path according to https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
sort.Slice(row.Tags, func(i, j int) bool {
return row.Tags[i].Key < row.Tags[j].Key
})
b = append(b[:0], row.Metric...)
for _, tag := range row.Tags {
b = append(b, ';')
b = append(b, tag.Key...)
b = append(b, '=')
b = append(b, tag.Value...)
}
canonicalPaths[i] = string(b)
// Convert parsed metric and tags to labels.
labels = append(labels[:0], prompb.Label{
Name: []byte("__name__"),
Value: []byte(row.Metric),
})
for _, tag := range row.Tags {
labels = append(labels, prompb.Label{
Name: []byte(tag.Key),
Value: []byte(tag.Value),
})
}
// Put labels with the current timestamp to MetricRow
mr := &mrs[i]
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], at.AccountID, at.ProjectID, labels)
mr.Timestamp = ct
}
if err := netstorage.RegisterMetricNames(at, mrs, deadline); err != nil {
return fmt.Errorf("cannot register paths: %w", err)
}
// Return response
contentType := "text/plain; charset=utf-8"
if isJSONResponse {
contentType = "application/json; charset=utf-8"
}
w.Header().Set("Content-Type", contentType)
WriteTagsTagMultiSeriesResponse(w, canonicalPaths, isJSONResponse)
if isJSONResponse {
tagsTagMultiSeriesDuration.UpdateDuration(startTime)
} else {
tagsTagSeriesDuration.UpdateDuration(startTime)
}
return nil
}
var (
tagsTagSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagSeries"}`)
tagsTagMultiSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagMultiSeries"}`)
)
// TagsAutoCompleteValuesHandler implements /tags/autoComplete/values endpoint from Graphite Tags API.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support

View file

@ -0,0 +1,14 @@
{% stripspace %}
TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries .
See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
{% func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) %}
{% if isJSONResponse %}[{% endif %}
{% for i, path := range canonicalPaths %}
{%q= path %}
{% if i+1 < len(canonicalPaths) %},{% endif %}
{% endfor %}
{% if isJSONResponse %}]{% endif %}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,75 @@
// Code generated by qtc from "tags_tag_multi_series_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries .See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
package graphite
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
func StreamTagsTagMultiSeriesResponse(qw422016 *qt422016.Writer, canonicalPaths []string, isJSONResponse bool) {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:6
if isJSONResponse {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:6
qw422016.N().S(`[`)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:6
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:7
for i, path := range canonicalPaths {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:8
qw422016.N().Q(path)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:9
if i+1 < len(canonicalPaths) {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:9
qw422016.N().S(`,`)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:9
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:10
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:11
if isJSONResponse {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:11
qw422016.N().S(`]`)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:11
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
func WriteTagsTagMultiSeriesResponse(qq422016 qtio422016.Writer, canonicalPaths []string, isJSONResponse bool) {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
StreamTagsTagMultiSeriesResponse(qw422016, canonicalPaths, isJSONResponse)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) string {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
WriteTagsTagMultiSeriesResponse(qb422016, canonicalPaths, isJSONResponse)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qs422016 := string(qb422016.B)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
return qs422016
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
}

View file

@ -346,6 +346,22 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
return true
}
return true
case "graphite/tags/tagSeries":
graphiteTagsTagSeriesRequests.Inc()
if err := graphite.TagsTagSeriesHandler(startTime, at, w, r); err != nil {
graphiteTagsTagSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "graphite/tags/tagMultiSeries":
graphiteTagsTagMultiSeriesRequests.Inc()
if err := graphite.TagsTagMultiSeriesHandler(startTime, at, w, r); err != nil {
graphiteTagsTagMultiSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "graphite/tags":
graphiteTagsRequests.Inc()
if err := graphite.TagsHandler(startTime, at, w, r); err != nil {
@ -495,6 +511,12 @@ var (
graphiteMetricsIndexRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/metrics/index.json"}`)
graphiteMetricsIndexErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/metrics/index.json"}`)
graphiteTagsTagSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags/tagSeries"}`)
graphiteTagsTagSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags/tagSeries"}`)
graphiteTagsTagMultiSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags/tagMultiSeries"}`)
graphiteTagsTagMultiSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags/tagMultiSeries"}`)
graphiteTagsRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags"}`)
graphiteTagsErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags"}`)

View file

@ -27,6 +27,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/metrics"
xxhash "github.com/cespare/xxhash/v2"
)
var replicationFactor = flag.Int("replicationFactor", 1, "How many copies of every time series is available on vmstorage nodes. "+
@ -451,6 +452,42 @@ func (sbh *sortBlocksHeap) Pop() interface{} {
return v
}
// RegisterMetricNames registers metric names from mrs in the storage.
func RegisterMetricNames(at *auth.Token, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
// Split mrs among available vmstorage nodes.
mrsPerNode := make([][]storage.MetricRow, len(storageNodes))
for _, mr := range mrs {
idx := 0
if len(storageNodes) > 1 {
// There is no need in using the same hash as for time series distribution in vminsert,
// since RegisterMetricNames is used only in Graphite Tags API.
h := xxhash.Sum64(mr.MetricNameRaw)
idx = int(h % uint64(len(storageNodes)))
}
mrsPerNode[idx] = append(mrsPerNode[idx], mr)
}
// Push mrs to storage nodes in parallel.
snr := startStorageNodesRequest(true, func(idx int, sn *storageNode) interface{} {
sn.registerMetricNamesRequests.Inc()
err := sn.registerMetricNames(mrsPerNode[idx], deadline)
if err != nil {
sn.registerMetricNamesRequestErrors.Inc()
}
return &err
})
// Collect results
err := snr.collectAllResults(func(result interface{}) error {
errP := result.(*error)
return *errP
})
if err != nil {
return fmt.Errorf("cannot register series on all the vmstorage nodes: %w", err)
}
return nil
}
// DeleteSeries deletes time series matching the given sq.
func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
requestData := sq.Marshal(nil)
@ -460,7 +497,7 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
deletedCount int
err error
}
snr := startStorageNodesRequest(true, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(true, func(idx int, sn *storageNode) interface{} {
sn.deleteSeriesRequests.Inc()
deletedCount, err := sn.deleteMetrics(requestData, deadline)
if err != nil {
@ -474,7 +511,7 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
// Collect results
deletedTotal := 0
err := snr.collectAllResults(partialDeleteSeriesResults, func(result interface{}) error {
err := snr.collectAllResults(func(result interface{}) error {
nr := result.(*nodeResult)
if nr.err != nil {
return nr.err
@ -498,7 +535,7 @@ func GetLabelsOnTimeRange(at *auth.Token, denyPartialResponse bool, tr storage.T
labels []string
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.labelsOnTimeRangeRequests.Inc()
labels, err := sn.getLabelsOnTimeRange(at.AccountID, at.ProjectID, tr, deadline)
if err != nil {
@ -577,7 +614,7 @@ func GetLabels(at *auth.Token, denyPartialResponse bool, deadline searchutils.De
labels []string
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.labelsRequests.Inc()
labels, err := sn.getLabels(at.AccountID, at.ProjectID, deadline)
if err != nil {
@ -632,7 +669,7 @@ func GetLabelValuesOnTimeRange(at *auth.Token, denyPartialResponse bool, labelNa
labelValues []string
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.labelValuesOnTimeRangeRequests.Inc()
labelValues, err := sn.getLabelValuesOnTimeRange(at.AccountID, at.ProjectID, labelName, tr, deadline)
if err != nil {
@ -705,7 +742,7 @@ func GetLabelValues(at *auth.Token, denyPartialResponse bool, labelName string,
labelValues []string
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.labelValuesRequests.Inc()
labelValues, err := sn.getLabelValues(at.AccountID, at.ProjectID, labelName, deadline)
if err != nil {
@ -752,7 +789,7 @@ func GetTagValueSuffixes(at *auth.Token, denyPartialResponse bool, tr storage.Ti
suffixes []string
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.tagValueSuffixesRequests.Inc()
suffixes, err := sn.getTagValueSuffixes(at.AccountID, at.ProjectID, tr, tagKey, tagValuePrefix, delimiter, deadline)
if err != nil {
@ -799,7 +836,7 @@ func GetLabelEntries(at *auth.Token, denyPartialResponse bool, deadline searchut
labelEntries []storage.TagEntry
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.labelEntriesRequests.Inc()
labelEntries, err := sn.getLabelEntries(at.AccountID, at.ProjectID, deadline)
if err != nil {
@ -889,7 +926,7 @@ func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline sea
status *storage.TSDBStatus
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.tsdbStatusRequests.Inc()
status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline)
if err != nil {
@ -976,7 +1013,7 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti
n uint64
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.seriesCountRequests.Inc()
n, err := sn.getSeriesCount(at.AccountID, at.ProjectID, deadline)
if err != nil {
@ -1110,7 +1147,7 @@ func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.Sea
metricNames [][]byte
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.searchMetricNamesRequests.Inc()
metricNames, err := sn.processSearchMetricNames(requestData, deadline)
if err != nil {
@ -1221,7 +1258,7 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel.
snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.searchRequests.Inc()
err := sn.processSearchQuery(requestData, fetchData, processBlock, deadline)
if err != nil {
@ -1247,13 +1284,13 @@ type storageNodesRequest struct {
resultsCh chan interface{}
}
func startStorageNodesRequest(denyPartialResponse bool, f func(sn *storageNode) interface{}) *storageNodesRequest {
func startStorageNodesRequest(denyPartialResponse bool, f func(idx int, sn *storageNode) interface{}) *storageNodesRequest {
resultsCh := make(chan interface{}, len(storageNodes))
for _, sn := range storageNodes {
go func(sn *storageNode) {
result := f(sn)
for idx, sn := range storageNodes {
go func(idx int, sn *storageNode) {
result := f(idx, sn)
resultsCh <- result
}(sn)
}(idx, sn)
}
return &storageNodesRequest{
denyPartialResponse: denyPartialResponse,
@ -1261,7 +1298,7 @@ func startStorageNodesRequest(denyPartialResponse bool, f func(sn *storageNode)
}
}
func (snr *storageNodesRequest) collectAllResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) error {
func (snr *storageNodesRequest) collectAllResults(f func(result interface{}) error) error {
var errors []error
for i := 0; i < len(storageNodes); i++ {
result := <-snr.resultsCh
@ -1280,8 +1317,8 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
var errors []error
resultsCollected := 0
for i := 0; i < len(storageNodes); i++ {
// There is no need in timer here, since all the goroutines executing
// the sn.process* function must be finished until the deadline.
// There is no need in timer here, since all the goroutines executing the f function
// passed to startStorageNodesRequest must be finished until the deadline.
result := <-snr.resultsCh
if err := f(result); err != nil {
errors = append(errors, err)
@ -1326,6 +1363,12 @@ type storageNode struct {
// The channel for limiting the maximum number of concurrent queries to storageNode.
concurrentQueriesCh chan struct{}
// The number of RegisterMetricNames requests to storageNode.
registerMetricNamesRequests *metrics.Counter
// The number of RegisterMetricNames request errors to storageNode.
registerMetricNamesRequestErrors *metrics.Counter
// The number of DeleteSeries requests to storageNode.
deleteSeriesRequests *metrics.Counter
@ -1399,6 +1442,22 @@ type storageNode struct {
metricRowsRead *metrics.Counter
}
func (sn *storageNode) registerMetricNames(mrs []storage.MetricRow, deadline searchutils.Deadline) error {
if len(mrs) == 0 {
return nil
}
f := func(bc *handshake.BufferedConn) error {
return sn.registerMetricNamesOnConn(bc, mrs)
}
if err := sn.execOnConn("registerMetricNames_v1", f, deadline); err != nil {
// Try again before giving up.
if err = sn.execOnConn("registerMetricNames_v1", f, deadline); err != nil {
return err
}
}
return nil
}
func (sn *storageNode) deleteMetrics(requestData []byte, deadline searchutils.Deadline) (int, error) {
var deletedCount int
f := func(bc *handshake.BufferedConn) error {
@ -1714,6 +1773,34 @@ func newErrRemote(buf []byte) error {
}
}
func (sn *storageNode) registerMetricNamesOnConn(bc *handshake.BufferedConn, mrs []storage.MetricRow) error {
// Send the request to sn.
if err := writeUint64(bc, uint64(len(mrs))); err != nil {
return fmt.Errorf("cannot send metricsCount to conn: %w", err)
}
for i, mr := range mrs {
if err := writeBytes(bc, mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot send MetricNameRaw #%d to conn: %w", i+1, err)
}
if err := writeUint64(bc, uint64(mr.Timestamp)); err != nil {
return fmt.Errorf("cannot send Timestamp #%d to conn: %w", i+1, err)
}
}
if err := bc.Flush(); err != nil {
return fmt.Errorf("cannot flush registerMetricNames request to conn: %w", err)
}
// Read response error.
buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil {
return fmt.Errorf("cannot read error message: %w", err)
}
if len(buf) > 0 {
return newErrRemote(buf)
}
return nil
}
func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestData []byte) (int, error) {
// Send the request to sn
if err := writeBytes(bc, requestData); err != nil {
@ -2269,6 +2356,8 @@ func InitStorageNodes(addrs []string) {
concurrentQueriesCh: make(chan struct{}, maxConcurrentQueriesPerStorageNode),
registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
registerMetricNamesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelsOnTimeRangeRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelsOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)),
@ -2307,7 +2396,6 @@ func Stop() {
}
var (
partialDeleteSeriesResults = metrics.NewCounter(`vm_partial_results_total{type="delete_series", name="vmselect"}`)
partialLabelsOnTimeRangeResults = metrics.NewCounter(`vm_partial_results_total{type="labels_on_time_range", name="vmselect"}`)
partialLabelsResults = metrics.NewCounter(`vm_partial_results_total{type="labels", name="vmselect"}`)
partialLabelValuesOnTimeRangeResults = metrics.NewCounter(`vm_partial_results_total{type="label_values_on_time_range", name="vmselect"}`)

View file

@ -698,11 +698,53 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
return s.processVMSelectTSDBStatus(ctx)
case "deleteMetrics_v3":
return s.processVMSelectDeleteMetrics(ctx)
case "registerMetricNames_v1":
return s.processVMSelectRegisterMetricNames(ctx)
default:
return fmt.Errorf("unsupported rpcName: %q", ctx.dataBuf)
}
}
const maxMetricNameRawSize = 1024 * 1024
const maxMetricNamesPerRequest = 1024 * 1024
func (s *Server) processVMSelectRegisterMetricNames(ctx *vmselectRequestCtx) error {
vmselectRegisterMetricNamesRequests.Inc()
// Read request
metricsCount, err := ctx.readUint64()
if err != nil {
return fmt.Errorf("cannot read metricsCount: %w", err)
}
if metricsCount > maxMetricNamesPerRequest {
return fmt.Errorf("too many metric names in a single request; got %d; mustn't exceed %d", metricsCount, maxMetricNamesPerRequest)
}
mrs := make([]storage.MetricRow, metricsCount)
for i := 0; i < int(metricsCount); i++ {
if err := ctx.readDataBufBytes(maxMetricNameRawSize); err != nil {
return fmt.Errorf("cannot read metricNameRaw: %w", err)
}
mr := &mrs[i]
mr.MetricNameRaw = append(mr.MetricNameRaw[:0], ctx.dataBuf...)
n, err := ctx.readUint64()
if err != nil {
return fmt.Errorf("cannot read timestamp: %w", err)
}
mr.Timestamp = int64(n)
}
// Register metric names from mrs.
if err := s.storage.RegisterMetricNames(mrs); err != nil {
return ctx.writeErrorMessage(err)
}
// Send an empty error message to vmselect.
if err := ctx.writeString(""); err != nil {
return fmt.Errorf("cannot send empty error message: %w", err)
}
return nil
}
const maxTagFiltersSize = 64 * 1024
func (s *Server) processVMSelectDeleteMetrics(ctx *vmselectRequestCtx) error {
@ -1193,19 +1235,21 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
}
var (
vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total")
vmselectLabelsOnTimeRangeRequests = metrics.NewCounter("vm_vmselect_labels_on_time_range_requests_total")
vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total")
vmselectLabelValuesOnTimeRangeRequests = metrics.NewCounter("vm_vmselect_label_values_on_time_range_requests_total")
vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total")
vmselectTagValueSuffixesRequests = metrics.NewCounter("vm_vmselect_tag_value_suffixes_requests_total")
vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total")
vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total")
vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total")
vmselectSearchMetricNamesRequests = metrics.NewCounter("vm_vmselect_search_metric_names_requests_total")
vmselectSearchRequests = metrics.NewCounter("vm_vmselect_search_requests_total")
vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total")
vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total")
vmselectRegisterMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="register_metric_names"}`)
vmselectDeleteMetricsRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="delete_metrics"}`)
vmselectLabelsOnTimeRangeRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="labels_on_time_range"}`)
vmselectLabelsRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="labels"}`)
vmselectLabelValuesOnTimeRangeRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_values_on_time_range"}`)
vmselectLabelValuesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_values"}`)
vmselectTagValueSuffixesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tag_value_suffixes"}`)
vmselectLabelEntriesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_entries"}`)
vmselectSeriesCountRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="series_count"}`)
vmselectTSDBStatusRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status"}`)
vmselectSearchMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search_metric_names"}`)
vmselectSearchRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search"}`)
vmselectMetricBlocksRead = metrics.NewCounter(`vm_vmselect_metric_blocks_read_total`)
vmselectMetricRowsRead = metrics.NewCounter(`vm_vmselect_metric_rows_read_total`)
)
func (ctx *vmselectRequestCtx) setupTfss() error {

View file

@ -205,6 +205,8 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `metrics/find` - searches Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find).
- `metrics/expand` - expands Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand).
- `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json).
- `tags/tagSeries` - registers time series. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb).
- `tags/tagMultiSeries` - register multiple time series. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb).
- `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/<tag_name>` - returns tag values for the given `<tag_name>`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/findSeries` - returns series matching the given `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).