From 1ee536f9fd6604099d50d00ad11e5a613edd77dc Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 24 Aug 2019 13:35:29 +0300 Subject: [PATCH] app/vminsert: skip empty tags --- app/vminsert/graphite/parser.go | 14 +++++++++++--- app/vminsert/graphite/parser_test.go | 11 +++++------ app/vminsert/influx/parser.go | 15 ++++++++++----- app/vminsert/influx/parser_test.go | 10 +--------- app/vminsert/opentsdb/parser.go | 14 +++++++++++--- app/vminsert/opentsdb/parser_test.go | 14 +++++--------- app/vminsert/opentsdbhttp/parser.go | 6 +++++- app/vminsert/opentsdbhttp/parser_test.go | 3 ++- 8 files changed, 50 insertions(+), 37 deletions(-) diff --git a/app/vminsert/graphite/parser.go b/app/vminsert/graphite/parser.go index 6c3302494..79838f3db 100644 --- a/app/vminsert/graphite/parser.go +++ b/app/vminsert/graphite/parser.go @@ -80,6 +80,9 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) { tags := tagsPool[tagsStart:] r.Tags = tags[:len(tags):len(tags)] } + if len(r.Metric) == 0 { + return tagsPool, fmt.Errorf("metric cannot be empty") + } n = strings.IndexByte(tail, ' ') if n < 0 { @@ -147,12 +150,20 @@ func unmarshalTags(dst []Tag, s string) ([]Tag, error) { if err := tag.unmarshal(s); err != nil { return dst[:len(dst)-1], err } + if len(tag.Key) == 0 || len(tag.Value) == 0 { + // Skip empty tag + dst = dst[:len(dst)-1] + } return dst, nil } if err := tag.unmarshal(s[:n]); err != nil { return dst[:len(dst)-1], err } s = s[n+1:] + if len(tag.Key) == 0 || len(tag.Value) == 0 { + // Skip empty tag + dst = dst[:len(dst)-1] + } } } @@ -174,9 +185,6 @@ func (t *Tag) unmarshal(s string) error { return fmt.Errorf("missing tag value for %q", s) } t.Key = s[:n] - if len(t.Key) == 0 { - return fmt.Errorf("tag key cannot be empty for %q", s) - } t.Value = s[n+1:] return nil } diff --git a/app/vminsert/graphite/parser_test.go b/app/vminsert/graphite/parser_test.go index 020496c35..a3fa44e48 100644 --- a/app/vminsert/graphite/parser_test.go +++ b/app/vminsert/graphite/parser_test.go @@ -21,6 +21,9 @@ func TestRowsUnmarshalFailure(t *testing.T) { } } + // Missing metric + f(" 123 455") + // Missing value f("aaa") @@ -29,7 +32,6 @@ func TestRowsUnmarshalFailure(t *testing.T) { // missing tag value f("aa;bb 23 34") - f("aa;=dsd 234 45") } func TestRowsUnmarshalSuccess(t *testing.T) { @@ -95,7 +97,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) { Timestamp: 2, }}, }) - f("foo;bar=baz;aa=;x=y 1 2", &Rows{ + // Empty tags + f("foo;bar=baz;aa=;x=y;=z 1 2", &Rows{ Rows: []Row{{ Metric: "foo", Tags: []Tag{ @@ -103,10 +106,6 @@ func TestRowsUnmarshalSuccess(t *testing.T) { Key: "bar", Value: "baz", }, - { - Key: "aa", - Value: "", - }, { Key: "x", Value: "y", diff --git a/app/vminsert/influx/parser.go b/app/vminsert/influx/parser.go index 663a36ee7..ae586adad 100644 --- a/app/vminsert/influx/parser.go +++ b/app/vminsert/influx/parser.go @@ -137,9 +137,6 @@ func (tag *Tag) unmarshal(s string, noEscapeChars bool) error { return fmt.Errorf("missing tag value for %q", s) } tag.Key = unescapeTagValue(s[:n], noEscapeChars) - if len(tag.Key) == 0 { - return fmt.Errorf("tag key cannot be empty") - } tag.Value = unescapeTagValue(s[n+1:], noEscapeChars) return nil } @@ -229,14 +226,22 @@ func unmarshalTags(dst []Tag, s string, noEscapeChars bool) ([]Tag, error) { n := nextUnescapedChar(s, ',', noEscapeChars) if n < 0 { if err := tag.unmarshal(s, noEscapeChars); err != nil { - return dst, err + return dst[:len(dst)-1], err + } + if len(tag.Key) == 0 || len(tag.Value) == 0 { + // Skip empty tag + dst = dst[:len(dst)-1] } return dst, nil } if err := tag.unmarshal(s[:n], noEscapeChars); err != nil { - return dst, err + return dst[:len(dst)-1], err } s = s[n+1:] + if len(tag.Key) == 0 || len(tag.Value) == 0 { + // Skip empty tag + dst = dst[:len(dst)-1] + } } } diff --git a/app/vminsert/influx/parser_test.go b/app/vminsert/influx/parser_test.go index 559e9adcb..207ea35c9 100644 --- a/app/vminsert/influx/parser_test.go +++ b/app/vminsert/influx/parser_test.go @@ -96,12 +96,8 @@ func TestRowsUnmarshalFailure(t *testing.T) { // Missing tag value f("foo,bar") f("foo,bar baz") - f("foo,bar= baz") f("foo,bar=123, 123") - // Missing tag name - f("foo,=bar baz=234") - // Missing field value f("foo bar") f("foo bar=") @@ -229,7 +225,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }) // Line with empty tag values - f("foo,tag1=xyz,tagN=,tag2=43as bar=123", &Rows{ + f("foo,tag1=xyz,tagN=,tag2=43as,=xxx bar=123", &Rows{ Rows: []Row{{ Measurement: "foo", Tags: []Tag{ @@ -237,10 +233,6 @@ func TestRowsUnmarshalSuccess(t *testing.T) { Key: "tag1", Value: "xyz", }, - { - Key: "tagN", - Value: "", - }, { Key: "tag2", Value: "43as", diff --git a/app/vminsert/opentsdb/parser.go b/app/vminsert/opentsdb/parser.go index 360dea5af..2372cc6b4 100644 --- a/app/vminsert/opentsdb/parser.go +++ b/app/vminsert/opentsdb/parser.go @@ -66,6 +66,9 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) { return tagsPool, fmt.Errorf("cannot find whitespace between metric and timestamp in %q", s) } r.Metric = s[:n] + if len(r.Metric) == 0 { + return tagsPool, fmt.Errorf("metric cannot be empty") + } tail := s[n+1:] n = strings.IndexByte(tail, ' ') if n < 0 { @@ -144,12 +147,20 @@ func unmarshalTags(dst []Tag, s string) ([]Tag, error) { if err := tag.unmarshal(s); err != nil { return dst[:len(dst)-1], err } + if len(tag.Key) == 0 || len(tag.Value) == 0 { + // Skip empty tag + dst = dst[:len(dst)-1] + } return dst, nil } if err := tag.unmarshal(s[:n]); err != nil { return dst[:len(dst)-1], err } s = s[n+1:] + if len(tag.Key) == 0 || len(tag.Value) == 0 { + // Skip empty tag + dst = dst[:len(dst)-1] + } } } @@ -171,9 +182,6 @@ func (t *Tag) unmarshal(s string) error { return fmt.Errorf("missing tag value for %q", s) } t.Key = s[:n] - if len(t.Key) == 0 { - return fmt.Errorf("tag key cannot be empty for %q", s) - } t.Value = s[n+1:] return nil } diff --git a/app/vminsert/opentsdb/parser_test.go b/app/vminsert/opentsdb/parser_test.go index 4524c779a..63a245424 100644 --- a/app/vminsert/opentsdb/parser_test.go +++ b/app/vminsert/opentsdb/parser_test.go @@ -24,6 +24,9 @@ func TestRowsUnmarshalFailure(t *testing.T) { // Missing put prefix f("xx") + // Missing metric + f("put 111 34") + // Missing timestamp f("put aaa") @@ -44,9 +47,6 @@ func TestRowsUnmarshalFailure(t *testing.T) { // Invalid tag f("put aaa 123 4.5 foo") - f("put aaa 123 4.5 =") - f("put aaa 123 4.5 =foo") - f("put aaa 123 4.5 =foo a=b") } func TestRowsUnmarshalSuccess(t *testing.T) { @@ -88,17 +88,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }}, }}, }) - // Empty tag value - f("put foobar 789 -123.456 a= b=c", &Rows{ + // Empty tag + f("put foobar 789 -123.456 a= b=c =d", &Rows{ Rows: []Row{{ Metric: "foobar", Value: -123.456, Timestamp: 789, Tags: []Tag{ - { - Key: "a", - Value: "", - }, { Key: "b", Value: "c", diff --git a/app/vminsert/opentsdbhttp/parser.go b/app/vminsert/opentsdbhttp/parser.go index 891ebe5d2..3edabab50 100644 --- a/app/vminsert/opentsdbhttp/parser.go +++ b/app/vminsert/opentsdbhttp/parser.go @@ -58,7 +58,7 @@ func (r *Row) reset() { func (r *Row) unmarshal(o *fastjson.Value, tagsPool []Tag) ([]Tag, error) { r.reset() m := o.GetStringBytes("metric") - if m == nil { + if len(m) == 0 { return tagsPool, fmt.Errorf("missing `metric` in %s", o) } r.Metric = bytesutil.ToUnsafeString(m) @@ -165,6 +165,10 @@ func unmarshalTags(dst []Tag, o *fastjson.Object) ([]Tag, error) { err = fmt.Errorf("tag value must be string; got %s; value=%s", v.Type(), v) return } + if len(k) == 0 { + // Skip empty tags + return + } vStr, _ := v.StringBytes() if len(vStr) == 0 { // Skip empty tags diff --git a/app/vminsert/opentsdbhttp/parser_test.go b/app/vminsert/opentsdbhttp/parser_test.go index 6a9284d37..819c22543 100644 --- a/app/vminsert/opentsdbhttp/parser_test.go +++ b/app/vminsert/opentsdbhttp/parser_test.go @@ -50,6 +50,7 @@ func TestRowsUnmarshalFailure(t *testing.T) { f(`{"metric": "aaa", "timestamp": 1122, "value": "0.0.0"}`) // Invalid metric type + f(`{"metric": "", "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`) f(`{"metric": ["aaa"], "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`) f(`{"metric": {"aaa":1}, "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`) f(`{"metric": 1, "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`) @@ -161,7 +162,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }}, }) // Empty tag value - f(`{"metric": "foobar", "timestamp": 123, "value": -123.456, "tags": {"a":"", "b":"c"}}`, &Rows{ + f(`{"metric": "foobar", "timestamp": 123, "value": -123.456, "tags": {"a":"", "b":"c", "": "d"}}`, &Rows{ Rows: []Row{{ Metric: "foobar", Value: -123.456,