app/vminsert: skip empty tags

This commit is contained in:
Aliaksandr Valialkin 2019-08-24 13:35:29 +03:00
parent a283023d16
commit 1ee536f9fd
8 changed files with 50 additions and 37 deletions

View file

@ -80,6 +80,9 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
tags := tagsPool[tagsStart:] tags := tagsPool[tagsStart:]
r.Tags = tags[:len(tags):len(tags)] r.Tags = tags[:len(tags):len(tags)]
} }
if len(r.Metric) == 0 {
return tagsPool, fmt.Errorf("metric cannot be empty")
}
n = strings.IndexByte(tail, ' ') n = strings.IndexByte(tail, ' ')
if n < 0 { if n < 0 {
@ -147,12 +150,20 @@ func unmarshalTags(dst []Tag, s string) ([]Tag, error) {
if err := tag.unmarshal(s); err != nil { if err := tag.unmarshal(s); err != nil {
return dst[:len(dst)-1], err return dst[:len(dst)-1], err
} }
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
}
return dst, nil return dst, nil
} }
if err := tag.unmarshal(s[:n]); err != nil { if err := tag.unmarshal(s[:n]); err != nil {
return dst[:len(dst)-1], err return dst[:len(dst)-1], err
} }
s = s[n+1:] s = s[n+1:]
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
}
} }
} }
@ -174,9 +185,6 @@ func (t *Tag) unmarshal(s string) error {
return fmt.Errorf("missing tag value for %q", s) return fmt.Errorf("missing tag value for %q", s)
} }
t.Key = s[:n] t.Key = s[:n]
if len(t.Key) == 0 {
return fmt.Errorf("tag key cannot be empty for %q", s)
}
t.Value = s[n+1:] t.Value = s[n+1:]
return nil return nil
} }

View file

@ -21,6 +21,9 @@ func TestRowsUnmarshalFailure(t *testing.T) {
} }
} }
// Missing metric
f(" 123 455")
// Missing value // Missing value
f("aaa") f("aaa")
@ -29,7 +32,6 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// missing tag value // missing tag value
f("aa;bb 23 34") f("aa;bb 23 34")
f("aa;=dsd 234 45")
} }
func TestRowsUnmarshalSuccess(t *testing.T) { func TestRowsUnmarshalSuccess(t *testing.T) {
@ -95,7 +97,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
Timestamp: 2, Timestamp: 2,
}}, }},
}) })
f("foo;bar=baz;aa=;x=y 1 2", &Rows{ // Empty tags
f("foo;bar=baz;aa=;x=y;=z 1 2", &Rows{
Rows: []Row{{ Rows: []Row{{
Metric: "foo", Metric: "foo",
Tags: []Tag{ Tags: []Tag{
@ -103,10 +106,6 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
Key: "bar", Key: "bar",
Value: "baz", Value: "baz",
}, },
{
Key: "aa",
Value: "",
},
{ {
Key: "x", Key: "x",
Value: "y", Value: "y",

View file

@ -137,9 +137,6 @@ func (tag *Tag) unmarshal(s string, noEscapeChars bool) error {
return fmt.Errorf("missing tag value for %q", s) return fmt.Errorf("missing tag value for %q", s)
} }
tag.Key = unescapeTagValue(s[:n], noEscapeChars) tag.Key = unescapeTagValue(s[:n], noEscapeChars)
if len(tag.Key) == 0 {
return fmt.Errorf("tag key cannot be empty")
}
tag.Value = unescapeTagValue(s[n+1:], noEscapeChars) tag.Value = unescapeTagValue(s[n+1:], noEscapeChars)
return nil return nil
} }
@ -229,14 +226,22 @@ func unmarshalTags(dst []Tag, s string, noEscapeChars bool) ([]Tag, error) {
n := nextUnescapedChar(s, ',', noEscapeChars) n := nextUnescapedChar(s, ',', noEscapeChars)
if n < 0 { if n < 0 {
if err := tag.unmarshal(s, noEscapeChars); err != nil { if err := tag.unmarshal(s, noEscapeChars); err != nil {
return dst, err return dst[:len(dst)-1], err
}
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
} }
return dst, nil return dst, nil
} }
if err := tag.unmarshal(s[:n], noEscapeChars); err != nil { if err := tag.unmarshal(s[:n], noEscapeChars); err != nil {
return dst, err return dst[:len(dst)-1], err
} }
s = s[n+1:] s = s[n+1:]
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
}
} }
} }

View file

@ -96,12 +96,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// Missing tag value // Missing tag value
f("foo,bar") f("foo,bar")
f("foo,bar baz") f("foo,bar baz")
f("foo,bar= baz")
f("foo,bar=123, 123") f("foo,bar=123, 123")
// Missing tag name
f("foo,=bar baz=234")
// Missing field value // Missing field value
f("foo bar") f("foo bar")
f("foo bar=") f("foo bar=")
@ -229,7 +225,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}) })
// Line with empty tag values // Line with empty tag values
f("foo,tag1=xyz,tagN=,tag2=43as bar=123", &Rows{ f("foo,tag1=xyz,tagN=,tag2=43as,=xxx bar=123", &Rows{
Rows: []Row{{ Rows: []Row{{
Measurement: "foo", Measurement: "foo",
Tags: []Tag{ Tags: []Tag{
@ -237,10 +233,6 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
Key: "tag1", Key: "tag1",
Value: "xyz", Value: "xyz",
}, },
{
Key: "tagN",
Value: "",
},
{ {
Key: "tag2", Key: "tag2",
Value: "43as", Value: "43as",

View file

@ -66,6 +66,9 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
return tagsPool, fmt.Errorf("cannot find whitespace between metric and timestamp in %q", s) return tagsPool, fmt.Errorf("cannot find whitespace between metric and timestamp in %q", s)
} }
r.Metric = s[:n] r.Metric = s[:n]
if len(r.Metric) == 0 {
return tagsPool, fmt.Errorf("metric cannot be empty")
}
tail := s[n+1:] tail := s[n+1:]
n = strings.IndexByte(tail, ' ') n = strings.IndexByte(tail, ' ')
if n < 0 { if n < 0 {
@ -144,12 +147,20 @@ func unmarshalTags(dst []Tag, s string) ([]Tag, error) {
if err := tag.unmarshal(s); err != nil { if err := tag.unmarshal(s); err != nil {
return dst[:len(dst)-1], err return dst[:len(dst)-1], err
} }
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
}
return dst, nil return dst, nil
} }
if err := tag.unmarshal(s[:n]); err != nil { if err := tag.unmarshal(s[:n]); err != nil {
return dst[:len(dst)-1], err return dst[:len(dst)-1], err
} }
s = s[n+1:] s = s[n+1:]
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
}
} }
} }
@ -171,9 +182,6 @@ func (t *Tag) unmarshal(s string) error {
return fmt.Errorf("missing tag value for %q", s) return fmt.Errorf("missing tag value for %q", s)
} }
t.Key = s[:n] t.Key = s[:n]
if len(t.Key) == 0 {
return fmt.Errorf("tag key cannot be empty for %q", s)
}
t.Value = s[n+1:] t.Value = s[n+1:]
return nil return nil
} }

View file

@ -24,6 +24,9 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// Missing put prefix // Missing put prefix
f("xx") f("xx")
// Missing metric
f("put 111 34")
// Missing timestamp // Missing timestamp
f("put aaa") f("put aaa")
@ -44,9 +47,6 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// Invalid tag // Invalid tag
f("put aaa 123 4.5 foo") f("put aaa 123 4.5 foo")
f("put aaa 123 4.5 =")
f("put aaa 123 4.5 =foo")
f("put aaa 123 4.5 =foo a=b")
} }
func TestRowsUnmarshalSuccess(t *testing.T) { func TestRowsUnmarshalSuccess(t *testing.T) {
@ -88,17 +88,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}}, }},
}}, }},
}) })
// Empty tag value // Empty tag
f("put foobar 789 -123.456 a= b=c", &Rows{ f("put foobar 789 -123.456 a= b=c =d", &Rows{
Rows: []Row{{ Rows: []Row{{
Metric: "foobar", Metric: "foobar",
Value: -123.456, Value: -123.456,
Timestamp: 789, Timestamp: 789,
Tags: []Tag{ Tags: []Tag{
{
Key: "a",
Value: "",
},
{ {
Key: "b", Key: "b",
Value: "c", Value: "c",

View file

@ -58,7 +58,7 @@ func (r *Row) reset() {
func (r *Row) unmarshal(o *fastjson.Value, tagsPool []Tag) ([]Tag, error) { func (r *Row) unmarshal(o *fastjson.Value, tagsPool []Tag) ([]Tag, error) {
r.reset() r.reset()
m := o.GetStringBytes("metric") m := o.GetStringBytes("metric")
if m == nil { if len(m) == 0 {
return tagsPool, fmt.Errorf("missing `metric` in %s", o) return tagsPool, fmt.Errorf("missing `metric` in %s", o)
} }
r.Metric = bytesutil.ToUnsafeString(m) r.Metric = bytesutil.ToUnsafeString(m)
@ -165,6 +165,10 @@ func unmarshalTags(dst []Tag, o *fastjson.Object) ([]Tag, error) {
err = fmt.Errorf("tag value must be string; got %s; value=%s", v.Type(), v) err = fmt.Errorf("tag value must be string; got %s; value=%s", v.Type(), v)
return return
} }
if len(k) == 0 {
// Skip empty tags
return
}
vStr, _ := v.StringBytes() vStr, _ := v.StringBytes()
if len(vStr) == 0 { if len(vStr) == 0 {
// Skip empty tags // Skip empty tags

View file

@ -50,6 +50,7 @@ func TestRowsUnmarshalFailure(t *testing.T) {
f(`{"metric": "aaa", "timestamp": 1122, "value": "0.0.0"}`) f(`{"metric": "aaa", "timestamp": 1122, "value": "0.0.0"}`)
// Invalid metric type // Invalid metric type
f(`{"metric": "", "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`)
f(`{"metric": ["aaa"], "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`) f(`{"metric": ["aaa"], "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`)
f(`{"metric": {"aaa":1}, "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`) f(`{"metric": {"aaa":1}, "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`)
f(`{"metric": 1, "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`) f(`{"metric": 1, "timestamp": 1122, "value": 0.45, "tags": {"foo": "bar"}}`)
@ -161,7 +162,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}}, }},
}) })
// Empty tag value // Empty tag value
f(`{"metric": "foobar", "timestamp": 123, "value": -123.456, "tags": {"a":"", "b":"c"}}`, &Rows{ f(`{"metric": "foobar", "timestamp": 123, "value": -123.456, "tags": {"a":"", "b":"c", "": "d"}}`, &Rows{
Rows: []Row{{ Rows: []Row{{
Metric: "foobar", Metric: "foobar",
Value: -123.456, Value: -123.456,