mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/protoparser/graphite: accept whitespace in metric names and tags according to the specification
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/99 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102 See the specification https://graphite.readthedocs.io/en/latest/tags.html
This commit is contained in:
parent
5983ecf4d1
commit
41f8c2987d
3 changed files with 117 additions and 51 deletions
|
@ -21,6 +21,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
|
||||
* FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105).
|
||||
* FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113).
|
||||
* FEATURE: accept whitespace in metric names and tags ingested via [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) according to [the specs](https://graphite.readthedocs.io/en/latest/tags.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102).
|
||||
* FEATURE: check the correctess of raw sample timestamps stored on disk when reading them. This reduces the probability of possible silent corruption of the data stored on disk. This should help [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011).
|
||||
* FEATURE: atomically delete directories with snapshots, parts and partitions at [storage level](https://docs.victoriametrics.com/#storage). Previously such directories can be left in partially deleted state when the deletion operation was interrupted by unclean shutdown. This may result in `cannot open file ...: no such file or directory` error on the next start. The probability of this error was quite high when NFS or EFS was used as persistent storage for VictoriaMetrics data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038).
|
||||
* FEATURE: set the `start` arg to `end - 5 minutes` if isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3052).
|
||||
|
|
|
@ -61,9 +61,6 @@ func (r *Row) reset() {
|
|||
|
||||
// UnmarshalMetricAndTags unmarshals metric and optional tags from s.
|
||||
func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
|
||||
if strings.Contains(s, " ") {
|
||||
return tagsPool, fmt.Errorf("unexpected whitespace found in %q", s)
|
||||
}
|
||||
n := strings.IndexByte(s, ';')
|
||||
if n < 0 {
|
||||
// No tags
|
||||
|
@ -72,11 +69,7 @@ func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
|
|||
// Tags found
|
||||
r.Metric = s[:n]
|
||||
tagsStart := len(tagsPool)
|
||||
var err error
|
||||
tagsPool, err = unmarshalTags(tagsPool, s[n+1:])
|
||||
if err != nil {
|
||||
return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
|
||||
}
|
||||
tagsPool = unmarshalTags(tagsPool, s[n+1:])
|
||||
tags := tagsPool[tagsStart:]
|
||||
r.Tags = tags[:len(tags):len(tags)]
|
||||
}
|
||||
|
@ -88,40 +81,43 @@ func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
|
|||
|
||||
func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
|
||||
r.reset()
|
||||
n := strings.IndexAny(s, graphiteSeparators)
|
||||
sOrig := s
|
||||
s = stripTrailingWhitespace(s)
|
||||
n := strings.LastIndexAny(s, graphiteSeparators)
|
||||
if n < 0 {
|
||||
return tagsPool, fmt.Errorf("cannot find separator between metric and value in %q", s)
|
||||
return tagsPool, fmt.Errorf("cannot find separator between value and timestamp in %q", s)
|
||||
}
|
||||
metricAndTags := s[:n]
|
||||
tail := stripLeadingWhitespace(s[n+1:])
|
||||
|
||||
timestampStr := s[n+1:]
|
||||
valueStr := ""
|
||||
metricAndTags := ""
|
||||
s = stripTrailingWhitespace(s[:n])
|
||||
n = strings.LastIndexAny(s, graphiteSeparators)
|
||||
if n < 0 {
|
||||
// Missing timestamp
|
||||
metricAndTags = stripLeadingWhitespace(s)
|
||||
valueStr = timestampStr
|
||||
timestampStr = ""
|
||||
} else {
|
||||
metricAndTags = stripLeadingWhitespace(s[:n])
|
||||
valueStr = s[n+1:]
|
||||
}
|
||||
metricAndTags = stripTrailingWhitespace(metricAndTags)
|
||||
tagsPool, err := r.UnmarshalMetricAndTags(metricAndTags, tagsPool)
|
||||
if err != nil {
|
||||
return tagsPool, err
|
||||
return tagsPool, fmt.Errorf("cannot parse metric and tags from %q: %w; original line: %q", metricAndTags, err, sOrig)
|
||||
}
|
||||
|
||||
n = strings.IndexAny(tail, graphiteSeparators)
|
||||
if n < 0 {
|
||||
// There is no timestamp. Use default timestamp instead.
|
||||
v, err := fastfloat.Parse(tail)
|
||||
if len(timestampStr) > 0 {
|
||||
ts, err := fastfloat.Parse(timestampStr)
|
||||
if err != nil {
|
||||
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w", tail, err)
|
||||
return tagsPool, fmt.Errorf("cannot unmarshal timestamp from %q: %w; orignal line: %q", timestampStr, err, sOrig)
|
||||
}
|
||||
r.Value = v
|
||||
return tagsPool, nil
|
||||
r.Timestamp = int64(ts)
|
||||
}
|
||||
v, err := fastfloat.Parse(tail[:n])
|
||||
v, err := fastfloat.Parse(valueStr)
|
||||
if err != nil {
|
||||
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w", tail[:n], err)
|
||||
}
|
||||
tail = stripLeadingWhitespace(tail[n+1:])
|
||||
tail = stripTrailingWhitespace(tail)
|
||||
ts, err := fastfloat.Parse(tail)
|
||||
if err != nil {
|
||||
return tagsPool, fmt.Errorf("cannot unmarshal timestamp from %q: %w", tail, err)
|
||||
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, sOrig)
|
||||
}
|
||||
r.Value = v
|
||||
r.Timestamp = int64(ts)
|
||||
return tagsPool, nil
|
||||
}
|
||||
|
||||
|
@ -142,11 +138,11 @@ func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
|
|||
if len(s) > 0 && s[len(s)-1] == '\r' {
|
||||
s = s[:len(s)-1]
|
||||
}
|
||||
s = stripLeadingWhitespace(s)
|
||||
if len(s) == 0 {
|
||||
// Skip empty line
|
||||
return dst, tagsPool
|
||||
}
|
||||
|
||||
if cap(dst) > len(dst) {
|
||||
dst = dst[:len(dst)+1]
|
||||
} else {
|
||||
|
@ -165,7 +161,7 @@ func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
|
|||
|
||||
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="graphite"}`)
|
||||
|
||||
func unmarshalTags(dst []Tag, s string) ([]Tag, error) {
|
||||
func unmarshalTags(dst []Tag, s string) []Tag {
|
||||
for {
|
||||
if cap(dst) > len(dst) {
|
||||
dst = dst[:len(dst)+1]
|
||||
|
@ -182,7 +178,7 @@ func unmarshalTags(dst []Tag, s string) ([]Tag, error) {
|
|||
// Skip empty tag
|
||||
dst = dst[:len(dst)-1]
|
||||
}
|
||||
return dst, nil
|
||||
return dst
|
||||
}
|
||||
tag.unmarshal(s[:n])
|
||||
s = s[n+1:]
|
||||
|
|
|
@ -19,13 +19,6 @@ func TestUnmarshalMetricAndTagsFailure(t *testing.T) {
|
|||
}
|
||||
f("")
|
||||
f(";foo=bar")
|
||||
f(" ")
|
||||
f("foo ;bar=baz")
|
||||
f("f oo;bar=baz")
|
||||
f("foo;bar=baz ")
|
||||
f("foo;bar= baz")
|
||||
f("foo;bar=b az")
|
||||
f("foo;b ar=baz")
|
||||
}
|
||||
|
||||
func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
|
||||
|
@ -40,10 +33,67 @@ func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
|
|||
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &r, rExpected)
|
||||
}
|
||||
}
|
||||
f(" ", &Row{
|
||||
Metric: " ",
|
||||
})
|
||||
f("foo ;bar=baz", &Row{
|
||||
Metric: "foo ",
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "baz",
|
||||
},
|
||||
},
|
||||
})
|
||||
f("f oo;bar=baz", &Row{
|
||||
Metric: "f oo",
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "baz",
|
||||
},
|
||||
},
|
||||
})
|
||||
f("foo;bar=baz ", &Row{
|
||||
Metric: "foo",
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "baz ",
|
||||
},
|
||||
},
|
||||
})
|
||||
f("foo;bar= baz", &Row{
|
||||
Metric: "foo",
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "bar",
|
||||
Value: " baz",
|
||||
},
|
||||
},
|
||||
})
|
||||
f("foo;bar=b az", &Row{
|
||||
Metric: "foo",
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "b az",
|
||||
},
|
||||
},
|
||||
})
|
||||
f("foo;b ar=baz", &Row{
|
||||
Metric: "foo",
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "b ar",
|
||||
Value: "baz",
|
||||
},
|
||||
},
|
||||
})
|
||||
f("foo", &Row{
|
||||
Metric: "foo",
|
||||
})
|
||||
f("foo;bar=123;baz=aabb", &Row{
|
||||
f("foo;bar=123;baz=aa=bb", &Row{
|
||||
Metric: "foo",
|
||||
Tags: []Tag{
|
||||
{
|
||||
|
@ -52,7 +102,7 @@ func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
|
|||
},
|
||||
{
|
||||
Key: "baz",
|
||||
Value: "aabb",
|
||||
Value: "aa=bb",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
@ -74,16 +124,9 @@ func TestRowsUnmarshalFailure(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Missing metric
|
||||
f(" 123 455")
|
||||
|
||||
// Missing value
|
||||
f("aaa")
|
||||
|
||||
// unexpected space in tag value
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/99
|
||||
f("s;tag1=aaa1;tag2=bb b2;tag3=ccc3 1")
|
||||
|
||||
// invalid value
|
||||
f("aa bb")
|
||||
|
||||
|
@ -103,7 +146,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
|
|||
// Try unmarshaling again
|
||||
rows.Unmarshal(s)
|
||||
if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
|
||||
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
|
||||
t.Fatalf("unexpected rows on second unmarshal;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
|
||||
}
|
||||
|
||||
rows.Reset()
|
||||
|
@ -119,6 +162,12 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
|
|||
f("\n\r\n", &Rows{})
|
||||
|
||||
// Single line
|
||||
f(" 123 455", &Rows{
|
||||
Rows: []Row{{
|
||||
Metric: "123",
|
||||
Value: 455,
|
||||
}},
|
||||
})
|
||||
f("foobar -123.456 789", &Rows{
|
||||
Rows: []Row{{
|
||||
Metric: "foobar",
|
||||
|
@ -134,6 +183,26 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
|
|||
}},
|
||||
})
|
||||
|
||||
// Whitespace in metric name, tag name and tag value
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102
|
||||
f("s a;ta g1=aaa1;tag2=bb b2;tag3 1 23", &Rows{
|
||||
Rows: []Row{{
|
||||
Metric: "s a",
|
||||
Value: 1,
|
||||
Timestamp: 23,
|
||||
Tags: []Tag{
|
||||
{
|
||||
Key: "ta g1",
|
||||
Value: "aaa1",
|
||||
},
|
||||
{
|
||||
Key: "tag2",
|
||||
Value: "bb b2",
|
||||
},
|
||||
},
|
||||
}},
|
||||
})
|
||||
|
||||
// Missing timestamp
|
||||
f("aaa 1123", &Rows{
|
||||
Rows: []Row{{
|
||||
|
|
Loading…
Reference in a new issue