lib/vmselectapi: pass maxSuffixes arg to tagValueSuffixes RPC call

This commit is contained in:
Aliaksandr Valialkin 2022-07-06 00:31:41 +03:00
parent f4df43f7cc
commit 270e555f47
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 24 additions and 11 deletions

View file

@ -1,6 +1,7 @@
package graphite
import (
"flag"
"fmt"
"net/http"
"regexp"
@ -18,6 +19,8 @@ import (
"github.com/VictoriaMetrics/metrics"
)
var maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
// MetricsFindHandler implements /metrics/find handler.
//
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
@ -228,7 +231,7 @@ func metricsFind(at *auth.Token, denyPartialResponse bool, tr storage.TimeRange,
n := strings.IndexAny(qTail, "*{[")
if n < 0 {
query := qHead + qTail
suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at.AccountID, at.ProjectID, denyPartialResponse, tr, label, query, delimiter, deadline)
suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at.AccountID, at.ProjectID, denyPartialResponse, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
if err != nil {
return nil, false, err
}
@ -248,7 +251,7 @@ func metricsFind(at *auth.Token, denyPartialResponse bool, tr storage.TimeRange,
}
if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
query := qHead + qTail[:len(qTail)-1]
suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at.AccountID, at.ProjectID, denyPartialResponse, tr, label, query, delimiter, deadline)
suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at.AccountID, at.ProjectID, denyPartialResponse, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
if err != nil {
return nil, false, err
}

View file

@ -863,8 +863,8 @@ func GraphiteTagValues(qt *querytracer.Tracer, accountID, projectID uint32, deny
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, tr storage.TimeRange, tagKey, tagValuePrefix string,
delimiter byte, deadline searchutils.Deadline) ([]string, bool, error) {
qt = qt.NewChild("get tag value suffixes for tagKey=%s, tagValuePrefix=%s, timeRange=%s", tagKey, tagValuePrefix, &tr)
delimiter byte, maxSuffixes int, deadline searchutils.Deadline) ([]string, bool, error) {
qt = qt.NewChild("get tag value suffixes for tagKey=%s, tagValuePrefix=%s, maxSuffixes=%d, timeRange=%s", tagKey, tagValuePrefix, maxSuffixes, &tr)
defer qt.Done()
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
@ -876,7 +876,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
}
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
sn.tagValueSuffixesRequests.Inc()
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, deadline)
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
sn.tagValueSuffixesErrors.Inc()
err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
@ -1538,17 +1538,17 @@ func (sn *storageNode) getLabelValues(qt *querytracer.Tracer, labelName string,
}
func (sn *storageNode) getTagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string,
delimiter byte, deadline searchutils.Deadline) ([]string, error) {
delimiter byte, maxSuffixes int, deadline searchutils.Deadline) ([]string, error) {
var suffixes []string
f := func(bc *handshake.BufferedConn) error {
ss, err := sn.getTagValueSuffixesOnConn(bc, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter)
ss, err := sn.getTagValueSuffixesOnConn(bc, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes)
if err != nil {
return err
}
suffixes = ss
return nil
}
if err := sn.execOnConnWithPossibleRetry(qt, "tagValueSuffixes_v3", f, deadline); err != nil {
if err := sn.execOnConnWithPossibleRetry(qt, "tagValueSuffixes_v4", f, deadline); err != nil {
return nil, err
}
return suffixes, nil
@ -1891,7 +1891,7 @@ func readLabelValues(buf []byte, bc *handshake.BufferedConn) ([]string, []byte,
}
func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32,
tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte) ([]string, error) {
tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int) ([]string, error) {
// Send the request to sn.
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
return nil, err
@ -1908,6 +1908,9 @@ func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, acc
if err := writeByte(bc, delimiter); err != nil {
return nil, fmt.Errorf("cannot send delimiter=%c to conn: %w", delimiter, err)
}
if err := writeLimit(bc, maxSuffixes); err != nil {
return nil, fmt.Errorf("cannot send maxSuffixes=%d to conn: %w", maxSuffixes, err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
}

View file

@ -480,7 +480,7 @@ func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
return s.processSearchMetricNames(ctx)
case "labelValues_v5":
return s.processLabelValues(ctx)
case "tagValueSuffixes_v3":
case "tagValueSuffixes_v4":
return s.processTagValueSuffixes(ctx)
case "labelNames_v5":
return s.processLabelNames(ctx)
@ -701,9 +701,16 @@ func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
if err != nil {
return fmt.Errorf("cannot read delimiter: %w", err)
}
maxSuffixes, err := ctx.readLimit()
if err != nil {
return fmt.Errorf("cannot read maxTagValueSuffixes: %d", err)
}
if maxSuffixes <= 0 || maxSuffixes > s.limits.MaxTagValueSuffixes {
maxSuffixes = s.limits.MaxTagValueSuffixes
}
// Execute the request
suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, s.limits.MaxTagValueSuffixes, ctx.deadline)
suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}