lib/storage: return marshaled metric names from SearchMetricNames

Previously SearchMetricNames was returning unmarshaled metric names.
This wasn't great for vmstorage, which had to spend additional CPU time
on marshaling the metric names before sending them to vmselect.

While at it, remove possible duplicate metric names, which could occur when
multiple samples for new time series are ingested via concurrent requests.

Also sort the metric names before returning them to the client.
This simplifies debugging of the returned metric names across repeated requests to /api/v1/series
This commit is contained in:
Aliaksandr Valialkin 2022-06-28 17:36:27 +03:00
parent eefa1e24f8
commit a350d1e81c
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
10 changed files with 127 additions and 67 deletions

View file

@ -192,7 +192,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, w http.ResponseWriter, r
if err != nil {
return err
}
mns, err := netstorage.SearchMetricNames(nil, sq, deadline)
metricNames, err := netstorage.SearchMetricNames(nil, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
}
@ -200,7 +200,11 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, w http.ResponseWriter, r
if tag == "name" {
tag = "__name__"
}
for _, mn := range mns {
var mn storage.MetricName
for _, metricName := range metricNames {
if err := mn.UnmarshalString(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
}
tagValue := mn.GetTagValue(tag)
if len(tagValue) == 0 {
continue
@ -275,12 +279,16 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, w http.ResponseWriter, r *
if err != nil {
return err
}
mns, err := netstorage.SearchMetricNames(nil, sq, deadline)
metricNames, err := netstorage.SearchMetricNames(nil, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
}
m := make(map[string]struct{})
for _, mn := range mns {
var mn storage.MetricName
for _, metricName := range metricNames {
if err := mn.UnmarshalString(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
}
m["name"] = struct{}{}
for _, tag := range mn.Tags {
m[string(tag.Key)] = struct{}{}
@ -339,11 +347,14 @@ func TagsFindSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.R
if err != nil {
return err
}
mns, err := netstorage.SearchMetricNames(nil, sq, deadline)
metricNames, err := netstorage.SearchMetricNames(nil, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
}
paths := getCanonicalPaths(mns)
paths, err := getCanonicalPaths(metricNames)
if err != nil {
return fmt.Errorf("cannot obtain canonical paths: %w", err)
}
if limit > 0 && limit < len(paths) {
paths = paths[:limit]
}
@ -359,14 +370,18 @@ func TagsFindSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.R
return nil
}
func getCanonicalPaths(mns []storage.MetricName) []string {
paths := make([]string, 0, len(mns))
for _, mn := range mns {
func getCanonicalPaths(metricNames []string) ([]string, error) {
paths := make([]string, 0, len(metricNames))
var mn storage.MetricName
for _, metricName := range metricNames {
if err := mn.UnmarshalString(metricName); err != nil {
return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
}
path := getCanonicalPath(&mn)
paths = append(paths, path)
}
sort.Strings(paths)
return paths
return paths, nil
}
func getCanonicalPath(mn *storage.MetricName) string {

View file

@ -931,7 +931,9 @@ var exportWorkPool = &sync.Pool{
}
// SearchMetricNames returns all the metric names matching sq until the given deadline.
func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]storage.MetricName, error) {
//
// The returned metric names must be unmarshaled via storage.MetricName.UnmarshalString().
func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]string, error) {
qt = qt.NewChild("fetch metric names: %s", sq)
defer qt.Done()
if deadline.Exceeded() {
@ -951,11 +953,13 @@ func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline
return nil, err
}
mns, err := vmstorage.SearchMetricNames(qt, tfss, tr, sq.MaxMetrics, deadline.Deadline())
metricNames, err := vmstorage.SearchMetricNames(qt, tfss, tr, sq.MaxMetrics, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("cannot find metric names: %w", err)
}
return mns, nil
sort.Strings(metricNames)
qt.Printf("sort %d metric names", len(metricNames))
return metricNames, nil
}
// ProcessSearchQuery performs sq until the given deadline.

View file

@ -610,7 +610,7 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
cp.start = cp.end - defaultStep
}
sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxSeriesLimit)
mns, err := netstorage.SearchMetricNames(qt, sq, cp.deadline)
metricNames, err := netstorage.SearchMetricNames(qt, sq, cp.deadline)
if err != nil {
return fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
}
@ -620,7 +620,7 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
qtDone := func() {
qt.Donef("start=%d, end=%d", cp.start, cp.end)
}
WriteSeriesResponse(bw, mns, qt, qtDone)
WriteSeriesResponse(bw, metricNames, qt, qtDone)
if err := bw.Flush(); err != nil {
return err
}

View file

@ -6,17 +6,23 @@
{% stripspace %}
SeriesResponse generates response for /api/v1/series.
See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
{% func SeriesResponse(mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) %}
{% func SeriesResponse(metricNames []string, qt *querytracer.Tracer, qtDone func()) %}
{
"status":"success",
"data":[
{% for i := range mns %}
{%= metricNameObject(&mns[i]) %}
{% if i+1 < len(mns) %},{% endif %}
{% code var mn storage.MetricName %}
{% for i, metricName := range metricNames %}
{% code err := mn.UnmarshalString(metricName) %}
{% if err != nil %}
{%q= err.Error() %}
{% else %}
{%= metricNameObject(&mn) %}
{% endif %}
{% if i+1 < len(metricNames) %},{% endif %}
{% endfor %}
]
{% code
qt.Printf("generate response: series=%d", len(mns))
qt.Printf("generate response: series=%d", len(metricNames))
qtDone()
%}
{%= dumpQueryTrace(qt) %}

View file

@ -26,56 +26,70 @@ var (
)
//line app/vmselect/prometheus/series_response.qtpl:9
func StreamSeriesResponse(qw422016 *qt422016.Writer, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) {
func StreamSeriesResponse(qw422016 *qt422016.Writer, metricNames []string, qt *querytracer.Tracer, qtDone func()) {
//line app/vmselect/prometheus/series_response.qtpl:9
qw422016.N().S(`{"status":"success","data":[`)
//line app/vmselect/prometheus/series_response.qtpl:13
for i := range mns {
var mn storage.MetricName
//line app/vmselect/prometheus/series_response.qtpl:14
streammetricNameObject(qw422016, &mns[i])
for i, metricName := range metricNames {
//line app/vmselect/prometheus/series_response.qtpl:15
if i+1 < len(mns) {
//line app/vmselect/prometheus/series_response.qtpl:15
qw422016.N().S(`,`)
//line app/vmselect/prometheus/series_response.qtpl:15
}
err := mn.UnmarshalString(metricName)
//line app/vmselect/prometheus/series_response.qtpl:16
}
//line app/vmselect/prometheus/series_response.qtpl:16
qw422016.N().S(`]`)
if err != nil {
//line app/vmselect/prometheus/series_response.qtpl:17
qw422016.N().Q(err.Error())
//line app/vmselect/prometheus/series_response.qtpl:18
} else {
//line app/vmselect/prometheus/series_response.qtpl:19
qt.Printf("generate response: series=%d", len(mns))
streammetricNameObject(qw422016, &mn)
//line app/vmselect/prometheus/series_response.qtpl:20
}
//line app/vmselect/prometheus/series_response.qtpl:21
if i+1 < len(metricNames) {
//line app/vmselect/prometheus/series_response.qtpl:21
qw422016.N().S(`,`)
//line app/vmselect/prometheus/series_response.qtpl:21
}
//line app/vmselect/prometheus/series_response.qtpl:22
}
//line app/vmselect/prometheus/series_response.qtpl:22
qw422016.N().S(`]`)
//line app/vmselect/prometheus/series_response.qtpl:25
qt.Printf("generate response: series=%d", len(metricNames))
qtDone()
//line app/vmselect/prometheus/series_response.qtpl:22
//line app/vmselect/prometheus/series_response.qtpl:28
streamdumpQueryTrace(qw422016, qt)
//line app/vmselect/prometheus/series_response.qtpl:22
//line app/vmselect/prometheus/series_response.qtpl:28
qw422016.N().S(`}`)
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
}
//line app/vmselect/prometheus/series_response.qtpl:24
func WriteSeriesResponse(qq422016 qtio422016.Writer, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) {
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
func WriteSeriesResponse(qq422016 qtio422016.Writer, metricNames []string, qt *querytracer.Tracer, qtDone func()) {
//line app/vmselect/prometheus/series_response.qtpl:30
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/series_response.qtpl:24
StreamSeriesResponse(qw422016, mns, qt, qtDone)
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
StreamSeriesResponse(qw422016, metricNames, qt, qtDone)
//line app/vmselect/prometheus/series_response.qtpl:30
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
}
//line app/vmselect/prometheus/series_response.qtpl:24
func SeriesResponse(mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) string {
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
func SeriesResponse(metricNames []string, qt *querytracer.Tracer, qtDone func()) string {
//line app/vmselect/prometheus/series_response.qtpl:30
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/series_response.qtpl:24
WriteSeriesResponse(qb422016, mns, qt, qtDone)
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
WriteSeriesResponse(qb422016, metricNames, qt, qtDone)
//line app/vmselect/prometheus/series_response.qtpl:30
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
return qs422016
//line app/vmselect/prometheus/series_response.qtpl:24
//line app/vmselect/prometheus/series_response.qtpl:30
}

View file

@ -173,11 +173,11 @@ func DeleteMetrics(qt *querytracer.Tracer, tfss []*storage.TagFilters) (int, err
}
// SearchMetricNames returns metric names for the given tfss on the given tr.
func SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]storage.MetricName, error) {
func SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
WG.Add(1)
mns, err := Storage.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
metricNames, err := Storage.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
WG.Done()
return mns, err
return metricNames, err
}
// SearchLabelNamesWithFiltersOnTimeRange searches for tag keys matching the given tfss on tr.

View file

@ -33,6 +33,7 @@ scrape_configs:
```
* FEATURE: [query tracing](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#query-tracing): show timestamps in query traces in human-readable format (aka `RFC3339` in UTC timezone) instead of milliseconds since Unix epoch. For example, `2022-06-27T10:32:54.506Z` instead of `1656325974506`.
* FEATURE: improve performance of [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) requests, which return big number of time series.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): allow using `__name__` label (aka [metric name](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)) in alerting annotations. For example `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}"`.
* BUGFIX: limit max memory occupied by the cache, which stores parsed regular expressions. Previously too long regular expressions passed in [MetricsQL queries](https://docs.victoriametrics.com/MetricsQL.html) could result in big amounts of used memory (e.g. multiple of gigabytes). Now the max cache size for parsed regexps is limited to a few megabytes.

View file

@ -3,6 +3,7 @@ package storage
import (
"bytes"
"fmt"
"runtime"
"sort"
"strconv"
"strings"
@ -384,6 +385,14 @@ func (mn *MetricName) Marshal(dst []byte) []byte {
return dst
}
// UnmarshalString unmarshals mn from the marshaled representation stored in s.
func (mn *MetricName) UnmarshalString(s string) error {
	err := mn.Unmarshal(bytesutil.ToUnsafeBytes(s))
	// Keep s reachable until Unmarshal completes, since the unsafe byte view
	// above aliases the memory backing s.
	runtime.KeepAlive(s)
	return err
}
// Unmarshal unmarshals mn from src.
func (mn *MetricName) Unmarshal(src []byte) error {
// Unmarshal MetricGroup.

View file

@ -1081,8 +1081,10 @@ func nextRetentionDuration(retentionMsecs int64) time.Duration {
return time.Duration(deadline-t) * time.Millisecond
}
// SearchMetricNames returns metric names matching the given tfss on the given tr.
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]MetricName, error) {
// SearchMetricNames returns marshaled metric names matching the given tfss on the given tr.
//
// The marshaled metric names must be unmarshaled via MetricName.UnmarshalString().
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
@ -1096,7 +1098,8 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
return nil, err
}
idb := s.idb()
mns := make([]MetricName, 0, len(tsids))
metricNames := make([]string, 0, len(tsids))
metricNamesSeen := make(map[string]struct{}, len(tsids))
var metricName []byte
for i := range tsids {
if i&paceLimiterSlowIterationsMask == 0 {
@ -1115,14 +1118,15 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
}
return nil, fmt.Errorf("error when searching metricName for metricID=%d: %w", metricID, err)
}
mns = mns[:len(mns)+1]
mn := &mns[len(mns)-1]
if err = mn.Unmarshal(metricName); err != nil {
return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
if _, ok := metricNamesSeen[string(metricName)]; ok {
// The given metric name was already seen; skip it
continue
}
metricNames = append(metricNames, string(metricName))
metricNamesSeen[metricNames[len(metricNames)-1]] = struct{}{}
}
qt.Printf("loaded %d metric names", len(mns))
return mns, nil
qt.Printf("loaded %d metric names", len(metricNames))
return metricNames, nil
}
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.

View file

@ -854,14 +854,21 @@ func testStorageRegisterMetricNames(s *Storage) error {
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
}
mns, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchMetricNames: %w", err)
}
if len(mns) < metricsPerAdd {
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(mns), int(metricsPerAdd))
if err != nil {
return fmt.Errorf("cannot unmarshal metric names: %w", err)
}
for i, mn := range mns {
if len(metricNames) < metricsPerAdd {
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(metricNames), int(metricsPerAdd))
}
var mn MetricName
for i, metricName := range metricNames {
if err := mn.UnmarshalString(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
}
addID := mn.GetTagValue("add_id")
if string(addID) != "0" {
return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")