app/vminsert: add /tags/tagSeries and /tags/tagMultiSeries handlers from Graphite Tags API

See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
This commit is contained in:
Aliaksandr Valialkin 2020-11-16 00:42:27 +02:00
parent 9ec964bff8
commit 4be5b5733a
5 changed files with 298 additions and 19 deletions

View file

@ -107,6 +107,7 @@ Click on a link in order to read the corresponding case study
* [Prometheus querying API usage](#prometheus-querying-api-usage)
* [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
* [Graphite Metrics API usage](#graphite-metrics-api-usage)
* [Graphite Tags API usage](#graphite-tags-api-usage)
* [How to build from sources](#how-to-build-from-sources)
* [Development build](#development-build)
* [Production build](#production-build)
@ -412,6 +413,7 @@ Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via t
* [Prometheus querying API](#prometheus-querying-api-usage)
* Metric names can be explored via [Graphite metrics API](#graphite-metrics-api-usage)
* Tags can be explored via [Graphite tags API](#graphite-tags-api-usage)
* [go-graphite/carbonapi](https://github.com/go-graphite/carbonapi/blob/master/cmd/carbonapi/carbonapi.example.prometheus.yaml)
### How to send data from OpenTSDB-compatible agents
@ -540,6 +542,14 @@ VictoriaMetrics accepts the following additional query args at `/metrics/find` a
that start with `node_`. By default `delimiter=.`.
### Graphite Tags API usage
VictoriaMetrics supports the following handlers from [Graphite Tags API](https://graphite.readthedocs.io/en/stable/tags.html):
* [/tags/tagSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
### How to build from sources
We recommend using either [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) or

View file

@ -55,6 +55,33 @@ func (r *Row) reset() {
r.Timestamp = 0
}
// UnmarshalMetricAndTags unmarshals metric and optional tags from s.
//
// The accepted format is `metric` or `metric;tag1=value1;...;tagN=valueN`
// (Graphite tagged-series notation). Parsed tags are appended to tagsPool
// and the extended pool is returned, so callers can re-use its backing
// array across many rows.
func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
	if strings.Contains(s, " ") {
		// Whitespace is not allowed inside the metric name or tags.
		return tagsPool, fmt.Errorf("unexpected whitespace found in %q", s)
	}
	n := strings.IndexByte(s, ';')
	if n < 0 {
		// No tags
		r.Metric = s
	} else {
		// Tags found
		r.Metric = s[:n]
		tagsStart := len(tagsPool)
		var err error
		tagsPool, err = unmarshalTags(tagsPool, s[n+1:])
		if err != nil {
			return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
		}
		tags := tagsPool[tagsStart:]
		// Full slice expression caps r.Tags so subsequent appends to
		// tagsPool cannot overwrite it.
		r.Tags = tags[:len(tags):len(tags)]
	}
	if len(r.Metric) == 0 {
		return tagsPool, fmt.Errorf("metric cannot be empty")
	}
	return tagsPool, nil
}
func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
r.reset()
n := strings.IndexByte(s, ' ')
@ -64,24 +91,9 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
metricAndTags := s[:n]
tail := s[n+1:]
n = strings.IndexByte(metricAndTags, ';')
if n < 0 {
// No tags
r.Metric = metricAndTags
} else {
// Tags found
r.Metric = metricAndTags[:n]
tagsStart := len(tagsPool)
var err error
tagsPool, err = unmarshalTags(tagsPool, metricAndTags[n+1:])
if err != nil {
return tagsPool, fmt.Errorf("cannot umarshal tags: %w", err)
}
tags := tagsPool[tagsStart:]
r.Tags = tags[:len(tags):len(tags)]
}
if len(r.Metric) == 0 {
return tagsPool, fmt.Errorf("metric cannot be empty")
tagsPool, err := r.UnmarshalMetricAndTags(metricAndTags, tagsPool)
if err != nil {
return tagsPool, err
}
n = strings.IndexByte(tail, ' ')

View file

@ -7,6 +7,57 @@ import (
"testing"
)
// TestUnmarshalMetricAndTagsFailure verifies that UnmarshalMetricAndTags
// rejects malformed input: empty metric names, tags without values and
// whitespace anywhere in the string.
func TestUnmarshalMetricAndTagsFailure(t *testing.T) {
	mustFail := func(s string) {
		t.Helper()
		var r Row
		if _, err := r.UnmarshalMetricAndTags(s, nil); err == nil {
			t.Fatalf("expecting non-nil error for UnmarshalMetricAndTags(%q)", s)
		}
	}
	for _, s := range []string{
		"",
		";foo=bar",
		" ",
		"foo;bar",
		"foo ;bar=baz",
		"f oo;bar=baz",
		"foo;bar=baz ",
		"foo;bar= baz",
		"foo;bar=b az",
		"foo;b ar=baz",
	} {
		mustFail(s)
	}
}
// TestUnmarshalMetricAndTagsSuccess verifies that UnmarshalMetricAndTags
// parses a bare metric name and a metric with multiple tags.
func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
	check := func(s string, want *Row) {
		t.Helper()
		var got Row
		if _, err := got.UnmarshalMetricAndTags(s, nil); err != nil {
			t.Fatalf("unexpected error in UnmarshalMetricAndTags(%q): %s", s, err)
		}
		if !reflect.DeepEqual(&got, want) {
			t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &got, want)
		}
	}
	check("foo", &Row{
		Metric: "foo",
	})
	check("foo;bar=123;baz=aabb", &Row{
		Metric: "foo",
		Tags: []Tag{
			{
				Key:   "bar",
				Value: "123",
			},
			{
				Key:   "baz",
				Value: "aabb",
			},
		},
	})
}
func TestRowsUnmarshalFailure(t *testing.T) {
f := func(s string) {
t.Helper()

View file

@ -1146,7 +1146,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU trashing when too many
@ -1183,6 +1182,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
<-addRowsConcurrencyCh
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
return err
}
@ -1194,6 +1194,69 @@ var (
addRowsTimeout = 30 * time.Second
)
// RegisterMetricNames registers all the metric names from mrs in the indexdb, so they can be queried later.
//
// The MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
// The MetricRow.Value field is ignored.
func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
	var (
		tsid       TSID
		mn         MetricName
		metricName []byte
	)
	idb := s.idb()
	// A single index search is re-used for all rows; its accountID/projectID
	// fields are adjusted per-row below and reset before the next iteration.
	is := idb.getIndexSearch(0, 0, noDeadline)
	defer idb.putIndexSearch(is)
	for i := range mrs {
		mr := &mrs[i]
		if s.getTSIDFromCache(&tsid, mr.MetricNameRaw) {
			// Fast path - mr.MetricNameRaw has been already registered.
			continue
		}
		// Slow path - register mr.MetricNameRaw.
		if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
			return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
		}
		// Tags must be sorted so the canonical metricName form is cached
		// and indexed consistently.
		mn.sortTags()
		metricName = mn.Marshal(metricName[:0])
		if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil {
			return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err)
		}
		s.putTSIDToCache(&tsid, mr.MetricNameRaw)
		// Register the metric in per-day inverted index.
		date := uint64(mr.Timestamp) / msecPerDay
		metricID := tsid.MetricID
		if s.dateMetricIDCache.Has(date, metricID) {
			// Fast path: the metric has been already registered in per-day inverted index
			continue
		}
		// Slow path: actually register the metric in per-day inverted index.
		is.accountID = mn.AccountID
		is.projectID = mn.ProjectID
		ok, err := is.hasDateMetricID(date, metricID)
		if err != nil {
			return fmt.Errorf("cannot register the metric in per-date inverted index because of error when locating (date=%d, metricID=%d) in database: %w",
				date, metricID, err)
		}
		if !ok {
			// The (date, metricID) entry is missing in the indexDB. Add it there.
			if err := is.storeDateMetricID(date, metricID); err != nil {
				return fmt.Errorf("cannot register the metric in per-date inverted index because of error when storing (date=%d, metricID=%d) in database: %w",
					date, metricID, err)
			}
		}
		is.accountID = 0
		is.projectID = 0
		// The metric must be added to cache only after it has been successfully added to indexDB.
		s.dateMetricIDCache.Set(date, metricID)
	}
	return nil
}
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
idb := s.idb()
rowsLen := len(rows)

View file

@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"reflect"
"sort"
"strings"
"testing"
"testing/quick"
@ -704,6 +705,148 @@ func checkTagKeys(tks []string, tksExpected map[string]bool) error {
return nil
}
// TestStorageRegisterMetricNamesSerial exercises RegisterMetricNames from a
// single goroutine against a dedicated storage instance.
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
	path := "TestStorageRegisterMetricNamesSerial"
	s, err := OpenStorage(path, 0)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	err = testStorageRegisterMetricNames(s)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	s.MustClose()
	if err = os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}
// TestStorageRegisterMetricNamesConcurrent exercises RegisterMetricNames from
// several goroutines sharing a single storage instance.
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
	path := "TestStorageRegisterMetricNamesConcurrent"
	s, err := OpenStorage(path, 0)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	const workers = 3
	resultCh := make(chan error, workers)
	for i := 0; i < workers; i++ {
		go func() {
			resultCh <- testStorageRegisterMetricNames(s)
		}()
	}
	for i := 0; i < workers; i++ {
		select {
		case err := <-resultCh:
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		case <-time.After(10 * time.Second):
			t.Fatalf("timeout")
		}
	}
	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}
// testStorageRegisterMetricNames registers a batch of metric names via
// s.RegisterMetricNames and then verifies they are visible through the
// tag-key and tag-value search APIs, both with and without a time range.
// It returns a non-nil error on the first mismatch so it can be run
// concurrently from multiple goroutines.
func testStorageRegisterMetricNames(s *Storage) error {
	const metricsPerAdd = 1e3
	const addsCount = 10
	const accountID = 123
	const projectID = 421

	addIDsMap := make(map[string]struct{})
	for i := 0; i < addsCount; i++ {
		var mrs []MetricRow
		var mn MetricName
		// Each batch gets a distinct add_id tag value so the tag-value
		// searches below can verify that every batch was registered.
		addID := fmt.Sprintf("%d", i)
		addIDsMap[addID] = struct{}{}
		mn.AccountID = accountID
		mn.ProjectID = projectID
		mn.Tags = []Tag{
			{[]byte("job"), []byte("webservice")},
			{[]byte("instance"), []byte("1.2.3.4")},
			{[]byte("add_id"), []byte(addID)},
		}
		now := timestampFromTime(time.Now())
		for j := 0; j < metricsPerAdd; j++ {
			mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rand.Intn(100)))
			metricNameRaw := mn.marshalRaw(nil)
			mr := MetricRow{
				MetricNameRaw: metricNameRaw,
				Timestamp:     now,
			}
			mrs = append(mrs, mr)
		}
		if err := s.RegisterMetricNames(mrs); err != nil {
			return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err)
		}
	}
	var addIDsExpected []string
	for k := range addIDsMap {
		addIDsExpected = append(addIDsExpected, k)
	}
	sort.Strings(addIDsExpected)

	// Verify the storage contains the added metric names.
	s.DebugFlush()

	// Verify that SearchTagKeys returns correct result.
	tksExpected := []string{
		"",
		"add_id",
		"instance",
		"job",
	}
	tks, err := s.SearchTagKeys(accountID, projectID, 100, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchTagKeys: %w", err)
	}
	sort.Strings(tks)
	if !reflect.DeepEqual(tks, tksExpected) {
		return fmt.Errorf("unexpected tag keys returned from SearchTagKeys;\ngot\n%q\nwant\n%q", tks, tksExpected)
	}

	// Verify that SearchTagKeysOnTimeRange returns correct result.
	now := timestampFromTime(time.Now())
	start := now - msecPerDay
	end := now + 60*1000
	tr := TimeRange{
		MinTimestamp: start,
		MaxTimestamp: end,
	}
	tks, err = s.SearchTagKeysOnTimeRange(accountID, projectID, tr, 100, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchTagKeysOnTimeRange: %w", err)
	}
	sort.Strings(tks)
	if !reflect.DeepEqual(tks, tksExpected) {
		return fmt.Errorf("unexpected tag keys returned from SearchTagKeysOnTimeRange;\ngot\n%q\nwant\n%q", tks, tksExpected)
	}

	// Verify that SearchTagValues returns correct result.
	addIDs, err := s.SearchTagValues(accountID, projectID, []byte("add_id"), addsCount+100, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchTagValues: %w", err)
	}
	sort.Strings(addIDs)
	if !reflect.DeepEqual(addIDs, addIDsExpected) {
		return fmt.Errorf("unexpected tag values returned from SearchTagValues;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
	}

	// Verify that SearchTagValuesOnTimeRange returns correct result.
	addIDs, err = s.SearchTagValuesOnTimeRange(accountID, projectID, []byte("add_id"), tr, addsCount+100, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchTagValuesOnTimeRange: %w", err)
	}
	sort.Strings(addIDs)
	if !reflect.DeepEqual(addIDs, addIDsExpected) {
		return fmt.Errorf("unexpected tag values returned from SearchTagValuesOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
	}
	return nil
}
func TestStorageAddRowsSerial(t *testing.T) {
path := "TestStorageAddRowsSerial"
s, err := OpenStorage(path, 0)