mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vminsert: add /tags/tagSeries
and /tags/tagMultiSeries
handlers from Graphite Tags API
See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
This commit is contained in:
parent
4aaee33860
commit
48d033a198
11 changed files with 522 additions and 21 deletions
10
README.md
10
README.md
|
@ -107,6 +107,7 @@ Click on a link in order to read the corresponding case study
|
|||
* [Prometheus querying API usage](#prometheus-querying-api-usage)
|
||||
* [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
|
||||
* [Graphite Metrics API usage](#graphite-metrics-api-usage)
|
||||
* [Graphite Tags API usage](#graphite-tags-api-usage)
|
||||
* [How to build from sources](#how-to-build-from-sources)
|
||||
* [Development build](#development-build)
|
||||
* [Production build](#production-build)
|
||||
|
@ -412,6 +413,7 @@ Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via t
|
|||
|
||||
* [Prometheus querying API](#prometheus-querying-api-usage)
|
||||
* Metric names can be explored via [Graphite metrics API](#graphite-metrics-api-usage)
|
||||
* Tags can be explored via [Graphite tags API](#graphite-tags-api-usage)
|
||||
* [go-graphite/carbonapi](https://github.com/go-graphite/carbonapi/blob/master/cmd/carbonapi/carbonapi.example.prometheus.yaml)
|
||||
|
||||
### How to send data from OpenTSDB-compatible agents
|
||||
|
@ -540,6 +542,14 @@ VictoriaMetrics accepts the following additional query args at `/metrics/find` a
|
|||
that start with `node_`. By default `delimiter=.`.
|
||||
|
||||
|
||||
### Graphite Tags API usage
|
||||
|
||||
VictoriaMetrics supports the following handlers from [Graphite Tags API](https://graphite.readthedocs.io/en/stable/tags.html):
|
||||
|
||||
* [/tags/tagSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
|
||||
* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
|
||||
|
||||
|
||||
### How to build from sources
|
||||
|
||||
We recommend using either [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) or
|
||||
|
|
102
app/vminsert/graphite/tags.go
Normal file
102
app/vminsert/graphite/tags.go
Normal file
|
@ -0,0 +1,102 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// TagsTagSeriesHandler implements /tags/tagSeries handler.
|
||||
//
|
||||
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
|
||||
func TagsTagSeriesHandler(w http.ResponseWriter, r *http.Request) error {
|
||||
return registerMetrics(w, r, false)
|
||||
}
|
||||
|
||||
// TagsTagMultiSeriesHandler implements /tags/tagMultiSeries handler.
|
||||
//
|
||||
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
|
||||
func TagsTagMultiSeriesHandler(w http.ResponseWriter, r *http.Request) error {
|
||||
return registerMetrics(w, r, true)
|
||||
}
|
||||
|
||||
func registerMetrics(w http.ResponseWriter, r *http.Request, isJSONResponse bool) error {
|
||||
startTime := time.Now()
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return fmt.Errorf("cannot parse form values: %w", err)
|
||||
}
|
||||
paths := r.Form["path"]
|
||||
var row graphiteparser.Row
|
||||
var labels []prompb.Label
|
||||
var b []byte
|
||||
var tagsPool []graphiteparser.Tag
|
||||
mrs := make([]storage.MetricRow, len(paths))
|
||||
ct := time.Now().UnixNano() / 1e6
|
||||
canonicalPaths := make([]string, len(paths))
|
||||
for i, path := range paths {
|
||||
var err error
|
||||
tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0])
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse path=%q: %w", path, err)
|
||||
}
|
||||
|
||||
// Construct canonical path according to https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
|
||||
sort.Slice(row.Tags, func(i, j int) bool {
|
||||
return row.Tags[i].Key < row.Tags[j].Key
|
||||
})
|
||||
b = append(b[:0], row.Metric...)
|
||||
for _, tag := range row.Tags {
|
||||
b = append(b, ';')
|
||||
b = append(b, tag.Key...)
|
||||
b = append(b, '=')
|
||||
b = append(b, tag.Value...)
|
||||
}
|
||||
canonicalPaths[i] = string(b)
|
||||
|
||||
// Convert parsed metric and tags to labels.
|
||||
labels = append(labels[:0], prompb.Label{
|
||||
Name: []byte("__name__"),
|
||||
Value: []byte(row.Metric),
|
||||
})
|
||||
for _, tag := range row.Tags {
|
||||
labels = append(labels, prompb.Label{
|
||||
Name: []byte(tag.Key),
|
||||
Value: []byte(tag.Value),
|
||||
})
|
||||
}
|
||||
|
||||
// Put labels with the current timestamp to MetricRow
|
||||
mr := &mrs[i]
|
||||
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
|
||||
mr.Timestamp = ct
|
||||
}
|
||||
if err := vmstorage.RegisterMetricNames(mrs); err != nil {
|
||||
return fmt.Errorf("cannot register paths: %w", err)
|
||||
}
|
||||
|
||||
// Return response
|
||||
contentType := "text/plain; charset=utf-8"
|
||||
if isJSONResponse {
|
||||
contentType = "application/json; charset=utf-8"
|
||||
}
|
||||
w.Header().Set("Content-Type", contentType)
|
||||
WriteTagsTagMultiSeriesResponse(w, canonicalPaths, isJSONResponse)
|
||||
if isJSONResponse {
|
||||
tagsTagMultiSeriesDuration.UpdateDuration(startTime)
|
||||
} else {
|
||||
tagsTagSeriesDuration.UpdateDuration(startTime)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
tagsTagSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagSeries"}`)
|
||||
tagsTagMultiSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagMultiSeries"}`)
|
||||
)
|
14
app/vminsert/graphite/tags_tag_multi_series_response.qtpl
Normal file
14
app/vminsert/graphite/tags_tag_multi_series_response.qtpl
Normal file
|
@ -0,0 +1,14 @@
|
|||
{% stripspace %}
|
||||
|
||||
TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries .
|
||||
See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
|
||||
{% func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) %}
|
||||
{% if isJSONResponse %}[{% endif %}
|
||||
{% for i, path := range canonicalPaths %}
|
||||
{%q= path %}
|
||||
{% if i+1 < len(canonicalPaths) %},{% endif %}
|
||||
{% endfor %}
|
||||
{% if isJSONResponse %}]{% endif %}
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
75
app/vminsert/graphite/tags_tag_multi_series_response.qtpl.go
Normal file
75
app/vminsert/graphite/tags_tag_multi_series_response.qtpl.go
Normal file
|
@ -0,0 +1,75 @@
|
|||
// Code generated by qtc from "tags_tag_multi_series_response.qtpl". DO NOT EDIT.
|
||||
// See https://github.com/valyala/quicktemplate for details.
|
||||
|
||||
// TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries .See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
|
||||
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
|
||||
package graphite
|
||||
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
|
||||
import (
|
||||
qtio422016 "io"
|
||||
|
||||
qt422016 "github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
|
||||
var (
|
||||
_ = qtio422016.Copy
|
||||
_ = qt422016.AcquireByteBuffer
|
||||
)
|
||||
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
|
||||
func StreamTagsTagMultiSeriesResponse(qw422016 *qt422016.Writer, canonicalPaths []string, isJSONResponse bool) {
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6
|
||||
if isJSONResponse {
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6
|
||||
qw422016.N().S(`[`)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6
|
||||
}
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:7
|
||||
for i, path := range canonicalPaths {
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:8
|
||||
qw422016.N().Q(path)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9
|
||||
if i+1 < len(canonicalPaths) {
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9
|
||||
}
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:10
|
||||
}
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11
|
||||
if isJSONResponse {
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11
|
||||
qw422016.N().S(`]`)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11
|
||||
}
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
}
|
||||
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
func WriteTagsTagMultiSeriesResponse(qq422016 qtio422016.Writer, canonicalPaths []string, isJSONResponse bool) {
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
StreamTagsTagMultiSeriesResponse(qw422016, canonicalPaths, isJSONResponse)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
}
|
||||
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) string {
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
WriteTagsTagMultiSeriesResponse(qb422016, canonicalPaths, isJSONResponse)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
return qs422016
|
||||
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
|
||||
}
|
|
@ -153,6 +153,22 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
influxQueryRequests.Inc()
|
||||
fmt.Fprintf(w, `{"results":[{"series":[{"values":[]}]}]}`)
|
||||
return true
|
||||
case "/tags/tagSeries":
|
||||
graphiteTagsTagSeriesRequests.Inc()
|
||||
if err := graphite.TagsTagSeriesHandler(w, r); err != nil {
|
||||
graphiteTagsTagSeriesErrors.Inc()
|
||||
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "/tags/tagMultiSeries":
|
||||
graphiteTagsTagMultiSeriesRequests.Inc()
|
||||
if err := graphite.TagsTagMultiSeriesHandler(w, r); err != nil {
|
||||
graphiteTagsTagMultiSeriesErrors.Inc()
|
||||
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "/targets":
|
||||
promscrapeTargetsRequests.Inc()
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
|
@ -207,6 +223,12 @@ var (
|
|||
|
||||
influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/query", protocol="influx"}`)
|
||||
|
||||
graphiteTagsTagSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagSeries", protocol="graphite"}`)
|
||||
graphiteTagsTagSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagSeries", protocol="graphite"}`)
|
||||
|
||||
graphiteTagsTagMultiSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagMultiSeries", protocol="graphite"}`)
|
||||
graphiteTagsTagMultiSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagMultiSeries", protocol="graphite"}`)
|
||||
|
||||
promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`)
|
||||
promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/targets"}`)
|
||||
|
||||
|
|
|
@ -113,6 +113,14 @@ func AddRows(mrs []storage.MetricRow) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// RegisterMetricNames registers all the metrics from mrs in the storage.
|
||||
func RegisterMetricNames(mrs []storage.MetricRow) error {
|
||||
WG.Add(1)
|
||||
err := Storage.RegisterMetricNames(mrs)
|
||||
WG.Done()
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteMetrics deletes metrics matching tfss.
|
||||
//
|
||||
// Returns the number of deleted metrics.
|
||||
|
|
|
@ -107,6 +107,7 @@ Click on a link in order to read the corresponding case study
|
|||
* [Prometheus querying API usage](#prometheus-querying-api-usage)
|
||||
* [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
|
||||
* [Graphite Metrics API usage](#graphite-metrics-api-usage)
|
||||
* [Graphite Tags API usage](#graphite-tags-api-usage)
|
||||
* [How to build from sources](#how-to-build-from-sources)
|
||||
* [Development build](#development-build)
|
||||
* [Production build](#production-build)
|
||||
|
@ -412,6 +413,7 @@ Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via t
|
|||
|
||||
* [Prometheus querying API](#prometheus-querying-api-usage)
|
||||
* Metric names can be explored via [Graphite metrics API](#graphite-metrics-api-usage)
|
||||
* Tags can be explored via [Graphite tags API](#graphite-tags-api-usage)
|
||||
* [go-graphite/carbonapi](https://github.com/go-graphite/carbonapi/blob/master/cmd/carbonapi/carbonapi.example.prometheus.yaml)
|
||||
|
||||
### How to send data from OpenTSDB-compatible agents
|
||||
|
@ -540,6 +542,14 @@ VictoriaMetrics accepts the following additional query args at `/metrics/find` a
|
|||
that start with `node_`. By default `delimiter=.`.
|
||||
|
||||
|
||||
### Graphite Tags API usage
|
||||
|
||||
VictoriaMetrics supports the following handlers from [Graphite Tags API](https://graphite.readthedocs.io/en/stable/tags.html):
|
||||
|
||||
* [/tags/tagSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
|
||||
* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
|
||||
|
||||
|
||||
### How to build from sources
|
||||
|
||||
We recommend using either [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) or
|
||||
|
|
|
@ -55,6 +55,33 @@ func (r *Row) reset() {
|
|||
r.Timestamp = 0
|
||||
}
|
||||
|
||||
// UnmarshalMetricAndTags unmarshals metric and optional tags from s.
|
||||
func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
|
||||
if strings.Contains(s, " ") {
|
||||
return tagsPool, fmt.Errorf("unexpected whitespace found in %q", s)
|
||||
}
|
||||
n := strings.IndexByte(s, ';')
|
||||
if n < 0 {
|
||||
// No tags
|
||||
r.Metric = s
|
||||
} else {
|
||||
// Tags found
|
||||
r.Metric = s[:n]
|
||||
tagsStart := len(tagsPool)
|
||||
var err error
|
||||
tagsPool, err = unmarshalTags(tagsPool, s[n+1:])
|
||||
if err != nil {
|
||||
return tagsPool, fmt.Errorf("cannot umarshal tags: %w", err)
|
||||
}
|
||||
tags := tagsPool[tagsStart:]
|
||||
r.Tags = tags[:len(tags):len(tags)]
|
||||
}
|
||||
if len(r.Metric) == 0 {
|
||||
return tagsPool, fmt.Errorf("metric cannot be empty")
|
||||
}
|
||||
return tagsPool, nil
|
||||
}
|
||||
|
||||
func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
|
||||
r.reset()
|
||||
n := strings.IndexByte(s, ' ')
|
||||
|
@ -64,24 +91,9 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
|
|||
metricAndTags := s[:n]
|
||||
tail := s[n+1:]
|
||||
|
||||
n = strings.IndexByte(metricAndTags, ';')
|
||||
if n < 0 {
|
||||
// No tags
|
||||
r.Metric = metricAndTags
|
||||
} else {
|
||||
// Tags found
|
||||
r.Metric = metricAndTags[:n]
|
||||
tagsStart := len(tagsPool)
|
||||
var err error
|
||||
tagsPool, err = unmarshalTags(tagsPool, metricAndTags[n+1:])
|
||||
if err != nil {
|
||||
return tagsPool, fmt.Errorf("cannot umarshal tags: %w", err)
|
||||
}
|
||||
tags := tagsPool[tagsStart:]
|
||||
r.Tags = tags[:len(tags):len(tags)]
|
||||
}
|
||||
if len(r.Metric) == 0 {
|
||||
return tagsPool, fmt.Errorf("metric cannot be empty")
|
||||
tagsPool, err := r.UnmarshalMetricAndTags(metricAndTags, tagsPool)
|
||||
if err != nil {
|
||||
return tagsPool, err
|
||||
}
|
||||
|
||||
n = strings.IndexByte(tail, ' ')
|
||||
|
|
|
@ -7,6 +7,57 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
func TestUnmarshalMetricAndTagsFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
var r Row
|
||||
_, err := r.UnmarshalMetricAndTags(s, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error for UnmarshalMetricAndTags(%q)", s)
|
||||
}
|
||||
}
|
||||
f("")
|
||||
f(";foo=bar")
|
||||
f(" ")
|
||||
f("foo;bar")
|
||||
f("foo ;bar=baz")
|
||||
f("f oo;bar=baz")
|
||||
f("foo;bar=baz ")
|
||||
f("foo;bar= baz")
|
||||
f("foo;bar=b az")
|
||||
f("foo;b ar=baz")
|
||||
}
|
||||
|
||||
func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
|
||||
f := func(s string, rExpected *Row) {
|
||||
t.Helper()
|
||||
var r Row
|
||||
_, err := r.UnmarshalMetricAndTags(s, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in UnmarshalMetricAndTags(%q): %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(&r, rExpected) {
|
||||
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &r, rExpected)
|
||||
}
|
||||
}
|
||||
f("foo", &Row{
|
||||
Metric: "foo",
|
||||
})
|
||||
f("foo;bar=123;baz=aabb", &Row{
|
||||
Metric: "foo",
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "123",
|
||||
},
|
||||
{
|
||||
Key: "baz",
|
||||
Value: "aabb",
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestRowsUnmarshalFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
|
|
|
@ -1070,7 +1070,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
|||
if len(mrs) == 0 {
|
||||
return nil
|
||||
}
|
||||
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
|
||||
|
||||
// Limit the number of concurrent goroutines that may add rows to the storage.
|
||||
// This should prevent from out of memory errors and CPU trashing when too many
|
||||
|
@ -1107,6 +1106,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
|||
|
||||
<-addRowsConcurrencyCh
|
||||
|
||||
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1118,6 +1118,64 @@ var (
|
|||
addRowsTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later.
|
||||
//
|
||||
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
|
||||
// Th MetricRow.Value field is ignored.
|
||||
func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
|
||||
var (
|
||||
tsid TSID
|
||||
mn MetricName
|
||||
metricName []byte
|
||||
)
|
||||
idb := s.idb()
|
||||
is := idb.getIndexSearch(noDeadline)
|
||||
defer idb.putIndexSearch(is)
|
||||
for i := range mrs {
|
||||
mr := &mrs[i]
|
||||
if s.getTSIDFromCache(&tsid, mr.MetricNameRaw) {
|
||||
// Fast path - mr.MetricNameRaw has been already registered.
|
||||
continue
|
||||
}
|
||||
|
||||
// Slow path - register mr.MetricNameRaw.
|
||||
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
|
||||
return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
|
||||
}
|
||||
mn.sortTags()
|
||||
metricName = mn.Marshal(metricName[:0])
|
||||
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil {
|
||||
return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err)
|
||||
}
|
||||
s.putTSIDToCache(&tsid, mr.MetricNameRaw)
|
||||
|
||||
// Register the metric in per-day inverted index.
|
||||
date := uint64(mr.Timestamp) / msecPerDay
|
||||
metricID := tsid.MetricID
|
||||
if s.dateMetricIDCache.Has(date, metricID) {
|
||||
// Fast path: the metric has been already registered in per-day inverted index
|
||||
continue
|
||||
}
|
||||
|
||||
// Slow path: acutally register the metric in per-day inverted index.
|
||||
ok, err := is.hasDateMetricID(date, metricID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot register the metric in per-date inverted index because of error when locating (date=%d, metricID=%d) in database: %w",
|
||||
date, metricID, err)
|
||||
}
|
||||
if !ok {
|
||||
// The (date, metricID) entry is missing in the indexDB. Add it there.
|
||||
if err := is.storeDateMetricID(date, metricID); err != nil {
|
||||
return fmt.Errorf("cannot register the metric in per-date inverted index because of error when storing (date=%d, metricID=%d) in database: %w",
|
||||
date, metricID, err)
|
||||
}
|
||||
}
|
||||
// The metric must be added to cache only after it has been successfully added to indexDB.
|
||||
s.dateMetricIDCache.Set(date, metricID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
|
||||
idb := s.idb()
|
||||
rowsLen := len(rows)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
|
@ -103,7 +104,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
s.pendingHourEntries = &uint64set.Set{}
|
||||
return &s
|
||||
}
|
||||
t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) {
|
||||
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
|
@ -138,7 +139,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
||||
}
|
||||
})
|
||||
t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) {
|
||||
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
|
@ -664,6 +665,144 @@ func checkTagKeys(tks []string, tksExpected map[string]bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
|
||||
path := "TestStorageRegisterMetricNamesSerial"
|
||||
s, err := OpenStorage(path, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open storage: %s", err)
|
||||
}
|
||||
if err := testStorageRegisterMetricNames(s); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
|
||||
path := "TestStorageRegisterMetricNamesConcurrent"
|
||||
s, err := OpenStorage(path, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open storage: %s", err)
|
||||
}
|
||||
ch := make(chan error, 3)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
go func() {
|
||||
ch <- testStorageRegisterMetricNames(s)
|
||||
}()
|
||||
}
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func testStorageRegisterMetricNames(s *Storage) error {
|
||||
const metricsPerAdd = 1e3
|
||||
const addsCount = 10
|
||||
|
||||
addIDsMap := make(map[string]struct{})
|
||||
for i := 0; i < addsCount; i++ {
|
||||
var mrs []MetricRow
|
||||
var mn MetricName
|
||||
addID := fmt.Sprintf("%d", i)
|
||||
addIDsMap[addID] = struct{}{}
|
||||
mn.Tags = []Tag{
|
||||
{[]byte("job"), []byte("webservice")},
|
||||
{[]byte("instance"), []byte("1.2.3.4")},
|
||||
{[]byte("add_id"), []byte(addID)},
|
||||
}
|
||||
now := timestampFromTime(time.Now())
|
||||
for j := 0; j < metricsPerAdd; j++ {
|
||||
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rand.Intn(100)))
|
||||
metricNameRaw := mn.marshalRaw(nil)
|
||||
|
||||
mr := MetricRow{
|
||||
MetricNameRaw: metricNameRaw,
|
||||
Timestamp: now,
|
||||
}
|
||||
mrs = append(mrs, mr)
|
||||
}
|
||||
if err := s.RegisterMetricNames(mrs); err != nil {
|
||||
return fmt.Errorf("unexpected error in AddMetrics: %w", err)
|
||||
}
|
||||
}
|
||||
var addIDsExpected []string
|
||||
for k := range addIDsMap {
|
||||
addIDsExpected = append(addIDsExpected, k)
|
||||
}
|
||||
sort.Strings(addIDsExpected)
|
||||
|
||||
// Verify the storage contains the added metric names.
|
||||
s.DebugFlush()
|
||||
|
||||
// Verify that SearchTagKeys returns correct result.
|
||||
tksExpected := []string{
|
||||
"",
|
||||
"add_id",
|
||||
"instance",
|
||||
"job",
|
||||
}
|
||||
tks, err := s.SearchTagKeys(100, noDeadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in SearchTagKeys: %w", err)
|
||||
}
|
||||
sort.Strings(tks)
|
||||
if !reflect.DeepEqual(tks, tksExpected) {
|
||||
return fmt.Errorf("unexpected tag keys returned from SearchTagKeys;\ngot\n%q\nwant\n%q", tks, tksExpected)
|
||||
}
|
||||
|
||||
// Verify that SearchTagKeysOnTimeRange returns correct result.
|
||||
now := timestampFromTime(time.Now())
|
||||
start := now - msecPerDay
|
||||
end := now + 60*1000
|
||||
tr := TimeRange{
|
||||
MinTimestamp: start,
|
||||
MaxTimestamp: end,
|
||||
}
|
||||
tks, err = s.SearchTagKeysOnTimeRange(tr, 100, noDeadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in SearchTagKeysOnTimeRange: %w", err)
|
||||
}
|
||||
sort.Strings(tks)
|
||||
if !reflect.DeepEqual(tks, tksExpected) {
|
||||
return fmt.Errorf("unexpected tag keys returned from SearchTagKeysOnTimeRange;\ngot\n%q\nwant\n%q", tks, tksExpected)
|
||||
}
|
||||
|
||||
// Verify that SearchTagValues returns correct result.
|
||||
addIDs, err := s.SearchTagValues([]byte("add_id"), addsCount+100, noDeadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in SearchTagValues: %w", err)
|
||||
}
|
||||
sort.Strings(addIDs)
|
||||
if !reflect.DeepEqual(addIDs, addIDsExpected) {
|
||||
return fmt.Errorf("unexpected tag values returned from SearchTagValues;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
|
||||
}
|
||||
|
||||
// Verify that SearchTagValuesOnTimeRange returns correct result.
|
||||
addIDs, err = s.SearchTagValuesOnTimeRange([]byte("add_id"), tr, addsCount+100, noDeadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in SearchTagValuesOnTimeRange: %w", err)
|
||||
}
|
||||
sort.Strings(addIDs)
|
||||
if !reflect.DeepEqual(addIDs, addIDsExpected) {
|
||||
return fmt.Errorf("unexpected tag values returned from SearchTagValuesOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestStorageAddRowsSerial(t *testing.T) {
|
||||
path := "TestStorageAddRowsSerial"
|
||||
s, err := OpenStorage(path, 0)
|
||||
|
|
Loading…
Reference in a new issue