diff --git a/README.md b/README.md index 3c9e493fd..b3524a3c4 100644 --- a/README.md +++ b/README.md @@ -207,6 +207,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr - `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json). - `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). - `tags/` - returns tag values for the given ``. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). + - `tags/findSeries` - returns series matching the given `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). * URL for time series deletion: `http://:8481/delete//prometheus/api/v1/admin/tsdb/delete_series?match[]=`. Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't diff --git a/app/vmselect/graphite/tags_api.go b/app/vmselect/graphite/tags_api.go index 7185a6b9a..6137fd9ae 100644 --- a/app/vmselect/graphite/tags_api.go +++ b/app/vmselect/graphite/tags_api.go @@ -3,16 +3,98 @@ package graphite import ( "fmt" "net/http" + "sort" "strconv" + "strings" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" ) +// TagsFindSeriesHandler implements /tags/findSeries endpoint from Graphite Tags API. +// +// See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags +func TagsFindSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { + deadline := searchutils.GetDeadlineForQuery(r, startTime) + if err := r.ParseForm(); err != nil { + return fmt.Errorf("cannot parse form values: %w", err) + } + limit, err := getInt(r, "limit") + if err != nil { + return err + } + exprs := r.Form["expr"] + if len(exprs) == 0 { + return fmt.Errorf("expecting at least one `expr` query arg") + } + + // Convert exprs to []storage.TagFilter + tfs := make([]storage.TagFilter, 0, len(exprs)) + for _, expr := range exprs { + tf, err := parseFilterExpr(expr) + if err != nil { + return fmt.Errorf("cannot parse `expr` query arg: %w", err) + } + tfs = append(tfs, *tf) + } + + // Send the request to storage + ct := time.Now().UnixNano() / 1e6 + sq := &storage.SearchQuery{ + MinTimestamp: 0, + MaxTimestamp: ct, + TagFilterss: [][]storage.TagFilter{tfs}, + } + denyPartialResponse := searchutils.GetDenyPartialResponse(r) + mns, isPartial, err := netstorage.SearchMetricNames(at, denyPartialResponse, sq, deadline) + if err != nil { + return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err) + } + paths := getCanonicalPaths(mns) + if limit > 0 && limit < len(paths) { + paths = paths[:limit] + } + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + WriteTagsFindSeriesResponse(bw, isPartial, paths) + if err := bw.Flush(); err != nil { + return err + } + tagsFindSeriesDuration.UpdateDuration(startTime) + return nil +} + +func getCanonicalPaths(mns []storage.MetricName) []string { + paths := make([]string, 0, len(mns)) + var b []byte + var tags []storage.Tag + for _, mn := range mns { + b = append(b[:0], mn.MetricGroup...) + tags = append(tags[:0], mn.Tags...) + sort.Slice(tags, func(i, j int) bool { + return string(tags[i].Key) < string(tags[j].Key) + }) + for _, tag := range tags { + b = append(b, ';') + b = append(b, tag.Key...) + b = append(b, '=') + b = append(b, tag.Value...) + } + paths = append(paths, string(b)) + } + sort.Strings(paths) + return paths +} + +var tagsFindSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/findSeries"}`) + // TagValuesHandler implements /tags/ endpoint from Graphite Tags API. // // See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags @@ -88,3 +170,31 @@ func getInt(r *http.Request, argName string) (int, error) { } return n, nil } + +func parseFilterExpr(s string) (*storage.TagFilter, error) { + n := strings.Index(s, "=") + if n < 0 { + return nil, fmt.Errorf("missing tag value in filter expression %q", s) + } + tagName := s[:n] + tagValue := s[n+1:] + isNegative := false + if strings.HasSuffix(tagName, "!") { + isNegative = true + tagName = tagName[:len(tagName)-1] + } + if tagName == "name" { + tagName = "" + } + isRegexp := false + if strings.HasPrefix(tagValue, "~") { + isRegexp = true + tagValue = "^(?:" + tagValue[1:] + ").*" + } + return &storage.TagFilter{ + Key: []byte(tagName), + Value: []byte(tagValue), + IsNegative: isNegative, + IsRegexp: isRegexp, + }, nil +} diff --git a/app/vmselect/graphite/tags_find_series_response.qtpl b/app/vmselect/graphite/tags_find_series_response.qtpl new file mode 100644 index 000000000..7a25d2fd8 --- /dev/null +++ b/app/vmselect/graphite/tags_find_series_response.qtpl @@ -0,0 +1,12 @@ +{% stripspace %} + +{% func TagsFindSeriesResponse(isPartial bool, paths []string) %} +[ + {% for i, path := range paths %} + {%q= path %} + {% if i+1 < len(paths) %},{% endif %} + {% endfor %} +] +{% endfunc %} + +{% endstripspace %} diff --git a/app/vmselect/graphite/tags_find_series_response.qtpl.go b/app/vmselect/graphite/tags_find_series_response.qtpl.go new file mode 100644 index 000000000..335c8a661 --- /dev/null +++ b/app/vmselect/graphite/tags_find_series_response.qtpl.go @@ -0,0 +1,65 @@ +// Code generated by qtc from "tags_find_series_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +package graphite + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +func StreamTagsFindSeriesResponse(qw422016 *qt422016.Writer, isPartial bool, paths []string) { +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 + qw422016.N().S(`[`) +//line app/vmselect/graphite/tags_find_series_response.qtpl:5 + for i, path := range paths { +//line app/vmselect/graphite/tags_find_series_response.qtpl:6 + qw422016.N().Q(path) +//line app/vmselect/graphite/tags_find_series_response.qtpl:7 + if i+1 < len(paths) { +//line app/vmselect/graphite/tags_find_series_response.qtpl:7 + qw422016.N().S(`,`) +//line app/vmselect/graphite/tags_find_series_response.qtpl:7 + } +//line app/vmselect/graphite/tags_find_series_response.qtpl:8 + } +//line app/vmselect/graphite/tags_find_series_response.qtpl:8 + qw422016.N().S(`]`) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +} + +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +func WriteTagsFindSeriesResponse(qq422016 qtio422016.Writer, isPartial bool, paths []string) { +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + StreamTagsFindSeriesResponse(qw422016, isPartial, paths) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qt422016.ReleaseWriter(qw422016) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +} + +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +func TagsFindSeriesResponse(isPartial bool, paths []string) string { +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + WriteTagsFindSeriesResponse(qb422016, isPartial, paths) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qs422016 := string(qb422016.B) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + return qs422016 +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +} diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 46cc83371..626cb6712 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -209,7 +209,7 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, return true } } - if strings.HasPrefix(p.Suffix, "graphite/tags/") { + if strings.HasPrefix(p.Suffix, "graphite/tags/") && p.Suffix != "graphite/tags/findSeries" { tagName := p.Suffix[len("graphite/tags/"):] graphiteTagValuesRequests.Inc() if err := graphite.TagValuesHandler(startTime, at, tagName, w, r); err != nil { @@ -354,6 +354,14 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, return true } return true + case "graphite/tags/findSeries": + graphiteTagsFindSeriesRequests.Inc() + if err := graphite.TagsFindSeriesHandler(startTime, at, w, r); err != nil { + graphiteTagsFindSeriesErrors.Inc() + httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err) + return true + } + return true case "prometheus/api/v1/rules": // Return dumb placeholder rulesRequests.Inc() @@ -463,6 +471,9 @@ var ( graphiteTagValuesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags/"}`) graphiteTagValuesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags/"}`) + graphiteTagsFindSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags/findSeries"}`) + graphiteTagsFindSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags/findSeries"}`) + rulesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/rules"}`) alertsRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/alerts"}`) metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/metadata"}`) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 4587ec1d3..6d141c0e6 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1151,7 +1151,6 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti } isPartial = true } - return n, isPartial, nil } @@ -1237,6 +1236,81 @@ func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils. return nil } +// SearchMetricNames returns all the metric names matching sq until the given deadline. +func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]storage.MetricName, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting to search metric names: %s", deadline.String()) + } + requestData := sq.Marshal(nil) + + // Send the query to all the storage nodes in parallel. + type nodeResult struct { + metricNames [][]byte + err error + } + resultsCh := make(chan nodeResult, len(storageNodes)) + for _, sn := range storageNodes { + go func(sn *storageNode) { + sn.searchMetricNamesRequests.Inc() + metricNames, err := sn.processSearchMetricNames(requestData, deadline) + if err != nil { + sn.searchMetricNamesRequestErrors.Inc() + err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err) + } + resultsCh <- nodeResult{ + metricNames: metricNames, + err: err, + } + }(sn) + } + + // Collect results. + var errors []error + metricNames := make(map[string]struct{}) + for i := 0; i < len(storageNodes); i++ { + // There is no need in timer here, since all the goroutines executing + // sn.processSearchQuery must be finished until the deadline. + nr := <-resultsCh + if nr.err != nil { + errors = append(errors, nr.err) + continue + } + for _, metricName := range nr.metricNames { + metricNames[string(metricName)] = struct{}{} + } + } + isPartial := false + if len(errors) > 0 { + if len(errors) == len(storageNodes) { + // Return only the first error, since it has no sense in returning all errors. + return nil, false, errors[0] + } + + // Just return partial results. + // This allows gracefully degrade vmselect in the case + // if certain storageNodes are temporarily unavailable. + // Do not return the error, since it may spam logs on busy vmselect + // serving high amounts of requests. + partialSearchResults.Inc() + if denyPartialResponse { + return nil, true, errors[0] + } + isPartial = true + } + + // Unmarshal metricNames + mns := make([]storage.MetricName, len(metricNames)) + i := 0 + for metricName := range metricNames { + mn := &mns[i] + if err := mn.Unmarshal(bytesutil.ToUnsafeBytes(metricName)); err != nil { + return nil, false, fmt.Errorf("cannot unmarshal metric name obtained from vmstorage: %w; metricName=%q", err, metricName) + } + i++ + } + return mns, isPartial, nil +} + // ProcessSearchQuery performs sq until the given deadline. // // Results.RunParallel or Results.Cancel must be called on the returned Results. @@ -1399,9 +1473,15 @@ type storageNode struct { // The number of errors during requests to seriesCount. seriesCountRequestErrors *metrics.Counter + // The number of 'search metric names' requests to storageNode. + searchMetricNamesRequests *metrics.Counter + // The number of search requests to storageNode. searchRequests *metrics.Counter + // The number of 'search metric names' errors to storageNode. + searchMetricNamesRequestErrors *metrics.Counter + // The number of search request errors to storageNode. searchRequestErrors *metrics.Counter @@ -1593,6 +1673,25 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline sear return n, nil } +func (sn *storageNode) processSearchMetricNames(requestData []byte, deadline searchutils.Deadline) ([][]byte, error) { + var metricNames [][]byte + f := func(bc *handshake.BufferedConn) error { + mns, err := sn.processSearchMetricNamesOnConn(bc, requestData) + if err != nil { + return err + } + metricNames = mns + return nil + } + if err := sn.execOnConn("searchMetricNames_v1", f, deadline); err != nil { + // Try again before giving up. + if err = sn.execOnConn("searchMetricNames_v1", f, deadline); err != nil { + return nil, err + } + } + return metricNames, nil +} + func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error { var blocksRead int f := func(bc *handshake.BufferedConn) error { @@ -2078,6 +2177,42 @@ const maxMetricBlockSize = 1024 * 1024 // from vmstorage. const maxErrorMessageSize = 64 * 1024 +func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn, requestData []byte) ([][]byte, error) { + // Send the requst to sn. + if err := writeBytes(bc, requestData); err != nil { + return nil, fmt.Errorf("cannot write requestData: %w", err) + } + if err := bc.Flush(); err != nil { + return nil, fmt.Errorf("cannot flush requestData to conn: %w", err) + } + + // Read response error. + buf, err := readBytes(nil, bc, maxErrorMessageSize) + if err != nil { + return nil, fmt.Errorf("cannot read error message: %w", err) + } + if len(buf) > 0 { + return nil, newErrRemote(buf) + } + + // Read metricNames from response. + metricNamesCount, err := readUint64(bc) + if err != nil { + return nil, fmt.Errorf("cannot read metricNamesCount: %w", err) + } + metricNames := make([][]byte, 0, metricNamesCount) + for i := int64(0); i < int64(metricNamesCount); i++ { + buf, err = readBytes(buf[:0], bc, maxMetricNameSize) + if err != nil { + return nil, fmt.Errorf("cannot read metricName #%d: %w", i+1, err) + } + metricNames[i] = append(metricNames[i][:0], buf...) + } + return metricNames, nil +} + +const maxMetricNameSize = 64 * 1024 + func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) (int, error) { // Send the request to sn. if err := writeBytes(bc, requestData); err != nil { @@ -2090,11 +2225,8 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ return 0, fmt.Errorf("cannot flush requestData to conn: %w", err) } - var err error - var buf []byte - // Read response error. - buf, err = readBytes(buf[:0], bc, maxErrorMessageSize) + buf, err := readBytes(nil, bc, maxErrorMessageSize) if err != nil { return 0, fmt.Errorf("cannot read error message: %w", err) } @@ -2248,7 +2380,9 @@ func InitStorageNodes(addrs []string) { tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), + searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)), searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), + searchMetricNamesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)), searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)), metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)), diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 3513920be..111231b92 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -952,6 +952,34 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r TagFilterss: tagFilterss, } denyPartialResponse := searchutils.GetDenyPartialResponse(r) + if end-start > 24*3600*1000 { + // It is cheaper to call SearchMetricNames on time ranges exceeding a day. + mns, isPartial, err := netstorage.SearchMetricNames(at, denyPartialResponse, sq, deadline) + if err != nil { + return fmt.Errorf("cannot fetch time series for %q: %w", sq, err) + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + resultsCh := make(chan *quicktemplate.ByteBuffer) + doneCh := make(chan struct{}) + go func() { + for i := range mns { + bb := quicktemplate.AcquireByteBuffer() + writemetricNameObject(bb, &mns[i]) + resultsCh <- bb + } + close(doneCh) + }() + // WriteSeriesResponse must consume all the data from resultsCh. + WriteSeriesResponse(bw, isPartial, resultsCh) + if err := bw.Flush(); err != nil { + return err + } + <-doneCh + seriesDuration.UpdateDuration(startTime) + return nil + } rss, isPartial, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, false, deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %w", sq, err) diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 41711634d..845bcd564 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -538,6 +538,20 @@ func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) return accountID, projectID, nil } +func (ctx *vmselectRequestCtx) readSearchQuery() error { + if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil { + return fmt.Errorf("cannot read searchQuery: %w", err) + } + tail, err := ctx.sq.Unmarshal(ctx.dataBuf) + if err != nil { + return fmt.Errorf("cannot unmarshal SearchQuery: %w", err) + } + if len(tail) > 0 { + return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail) + } + return nil +} + func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 8) if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { @@ -663,7 +677,9 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error { switch rpcName { case "search_v4": - return s.processVMSelectSearchQuery(ctx) + return s.processVMSelectSearch(ctx) + case "searchMetricNames_v1": + return s.processVMSelectSearchMetricNames(ctx) case "labelValuesOnTimeRange_v1": return s.processVMSelectLabelValuesOnTimeRange(ctx) case "labelValues_v2": @@ -1061,19 +1077,52 @@ func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) erro // maxSearchQuerySize is the maximum size of SearchQuery packet in bytes. const maxSearchQuerySize = 1024 * 1024 -func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error { - vmselectSearchQueryRequests.Inc() +func (s *Server) processVMSelectSearchMetricNames(ctx *vmselectRequestCtx) error { + vmselectSearchMetricNamesRequests.Inc() - // Read search query. - if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil { - return fmt.Errorf("cannot read searchQuery: %w", err) + // Read request. + if err := ctx.readSearchQuery(); err != nil { + return err } - tail, err := ctx.sq.Unmarshal(ctx.dataBuf) + + // Search metric names. + if err := ctx.setupTfss(); err != nil { + return ctx.writeErrorMessage(err) + } + tr := storage.TimeRange{ + MinTimestamp: ctx.sq.MinTimestamp, + MaxTimestamp: ctx.sq.MaxTimestamp, + } + mns, err := s.storage.SearchMetricNames(ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, tr, *maxMetricsPerSearch, ctx.deadline) if err != nil { - return fmt.Errorf("cannot unmarshal SearchQuery: %w", err) + return ctx.writeErrorMessage(err) } - if len(tail) > 0 { - return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail) + + // Send empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send response. + metricNamesCount := len(mns) + if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil { + return fmt.Errorf("cannot send metricNamesCount: %w", err) + } + for i, mn := range mns { + ctx.dataBuf = mn.Marshal(ctx.dataBuf[:0]) + if err := ctx.writeDataBufBytes(); err != nil { + return fmt.Errorf("cannot send metricName #%d: %w", i+1, err) + } + } + return nil +} + +func (s *Server) processVMSelectSearch(ctx *vmselectRequestCtx) error { + vmselectSearchRequests.Inc() + + // Read request. + if err := ctx.readSearchQuery(); err != nil { + return err } fetchData, err := ctx.readBool() if err != nil { @@ -1153,7 +1202,8 @@ var ( vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total") vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total") vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total") - vmselectSearchQueryRequests = metrics.NewCounter("vm_vmselect_search_query_requests_total") + vmselectSearchMetricNamesRequests = metrics.NewCounter("vm_vmselect_search_metric_names_requests_total") + vmselectSearchRequests = metrics.NewCounter("vm_vmselect_search_requests_total") vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total") vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total") ) diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 3c9e493fd..b3524a3c4 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -207,6 +207,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr - `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json). - `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). - `tags/` - returns tag values for the given ``. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). + - `tags/findSeries` - returns series matching the given `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). * URL for time series deletion: `http://:8481/delete//prometheus/api/v1/admin/tsdb/delete_series?match[]=`. Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 859d44967..0c722fc4f 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -550,6 +550,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/ * [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb) * [/tags](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) * [/tags/tag_name](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) +* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) ### How to build from sources diff --git a/lib/storage/storage.go b/lib/storage/storage.go index a89b4c85b..a4ca0c234 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -859,6 +859,44 @@ func nextRetentionDuration(retentionMonths int) time.Duration { return deadline.Sub(t) } +// SearchMetricNames returns metric names matching the given tfss on the given tr. +func (s *Storage) SearchMetricNames(accountID, projectID uint32, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]MetricName, error) { + tsids, err := s.searchTSIDs(tfss, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + if len(tsids) == 0 { + return nil, nil + } + if err = s.prefetchMetricNames(tsids, deadline); err != nil { + return nil, err + } + idb := s.idb() + is := idb.getIndexSearch(accountID, projectID, deadline) + defer idb.putIndexSearch(is) + mns := make([]MetricName, 0, len(tsids)) + var metricName []byte + for i := range tsids { + metricID := tsids[i].MetricID + var err error + metricName, err = is.searchMetricName(metricName[:0], metricID) + if err != nil { + if err == io.EOF { + // Skip missing metricName for metricID. + // It should be automatically fixed. See indexDB.searchMetricName for details. + continue + } + return nil, fmt.Errorf("error when searching metricName for metricID=%d: %w", metricID, err) + } + mns = mns[:len(mns)+1] + mn := &mns[len(mns)-1] + if err = mn.Unmarshal(metricName); err != nil { + return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err) + } + } + return mns, nil +} + // searchTSIDs returns sorted TSIDs for the given tfss and the given tr. func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { // Do not cache tfss -> tsids here, since the caching is performed