app/vminsert/influx: allow escaping newline char

Though newline char isn't mentioned in escape rules at https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/ ,
there are reports that such chars occur in real life
This commit is contained in:
Aliaksandr Valialkin 2019-08-23 15:13:23 +03:00
parent f1f8fce4f7
commit b3502b2b39
2 changed files with 48 additions and 6 deletions

View file

@ -62,9 +62,8 @@ func (r *Row) reset() {
r.Timestamp = 0
}
func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field) ([]Tag, []Field, error) {
func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Tag, []Field, error) {
r.reset()
noEscapeChars := strings.IndexByte(s, '\\') < 0
n := nextUnescapedChar(s, ' ', noEscapeChars)
if n < 0 {
return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s)
@ -175,8 +174,9 @@ func (f *Field) unmarshal(s string, noEscapeChars, hasQuotedFields bool) error {
}
func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]Row, []Tag, []Field, error) {
noEscapeChars := strings.IndexByte(s, '\\') < 0
for len(s) > 0 {
n := strings.IndexByte(s, '\n')
n := nextUnquotedChar(s, '\n', noEscapeChars, true)
if n == 0 {
// Skip empty line
s = s[1:]
@ -200,7 +200,7 @@ func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]R
if n < 0 {
// The last line.
var err error
tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool)
tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars)
if err != nil {
err = fmt.Errorf("cannot unmarshal Influx line %q: %s", s, err)
return dst, tagsPool, fieldsPool, err
@ -208,7 +208,7 @@ func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]R
return dst, tagsPool, fieldsPool, nil
}
var err error
tagsPool, fieldsPool, err = r.unmarshal(s[:n], tagsPool, fieldsPool)
tagsPool, fieldsPool, err = r.unmarshal(s[:n], tagsPool, fieldsPool, noEscapeChars)
if err != nil {
err = fmt.Errorf("cannot unmarshal Influx line %q: %s", s[:n], err)
return dst, tagsPool, fieldsPool, err
@ -281,7 +281,7 @@ func unescapeTagValue(s string, noEscapeChars bool) string {
return string(append(dst, '\\'))
}
ch := s[0]
if ch != ' ' && ch != ',' && ch != '=' && ch != '\\' {
if ch != ' ' && ch != ',' && ch != '=' && ch != '\\' && ch != '\n' {
dst = append(dst, '\\')
}
dst = append(dst, ch)

View file

@ -337,6 +337,27 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}},
})
// Escape newline
f("fo\\\nb\\,ar,x\\\ny=\\\n\\y a=\"foo\nbar\",b\\\nc\\==34\n", &Rows{
Rows: []Row{{
Measurement: "fo\nb,ar",
Tags: []Tag{{
Key: "x\ny",
Value: "\n\\y",
}},
Fields: []Field{
{
Key: "a",
Value: 0,
},
{
Key: "b\nc=",
Value: 34,
},
},
}},
})
// Multiple lines
f("foo,tag=xyz field=1.23 48934\n"+
"bar x=-1i\n\n", &Rows{
@ -362,6 +383,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
},
},
})
// No newline after the second line.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/82
f("foo,tag=xyz field=1.23 48934\n"+
@ -388,4 +410,24 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
},
},
})
f("x,y=z,g=p:\\ \\ 5432\\,\\ gp\\ mon\\ [lol]\\ con10\\ cmd5\\ SELECT f=1", &Rows{
Rows: []Row{{
Measurement: "x",
Tags: []Tag{
{
Key: "y",
Value: "z",
},
{
Key: "g",
Value: "p: 5432, gp mon [lol] con10 cmd5 SELECT",
},
},
Fields: []Field{{
Key: "f",
Value: 1,
}},
}},
})
}