lib/protoparser/csvimport: properly parse the last empty column in CSV line

Do not ignore the last empty column in CSV line.
While at it, properly parse CSV columns in single quotes, e.g. `'foo,bar',baz` is parsed as two columns - `foo,bar` and `baz`

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4048

See also https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4298
This commit is contained in:
Aliaksandr Valialkin 2023-05-12 15:16:55 -07:00
parent 7e9a74c865
commit a892f22bf7
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
6 changed files with 204 additions and 97 deletions

View file

@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores.
* BUGFIX: reduce the possibility of increased CPU usage when data with timestamps older than one hour is ingested into VictoriaMetrics. This reduces spikes for the graph `sum(rate(vm_slow_per_day_index_inserts_total))`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4258).
* BUGFIX: do not ignore trailing empty field in CSV lines when [importing data in CSV format](https://docs.victoriametrics.com/#how-to-import-csv-data). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4048).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix a panic when the duration in the query contains uppercase `M` suffix. Such a suffix isn't allowed to use in durations, since it clashes with `a million` suffix, e.g. it isn't clear whether `rate(metric[5M])` means rate over 5 minutes, 5 months or 5 million seconds. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4120) issues.
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent from possible panic when the number of vmstorage nodes increases when [automatic vmstorage discovery](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled.
* BUGFIX: properly limit the number of [OpenTSDB HTTP](https://docs.victoriametrics.com/#sending-opentsdb-data-via-http-apiput-requests) concurrent requests specified via `-maxConcurrentInserts` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4204). Thanks to @zouxiang1993 for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4208).

View file

@ -28,6 +28,10 @@ type ColumnDescriptor struct {
MetricName string
}
func (cd *ColumnDescriptor) isEmpty() bool {
return cd.ParseTimestamp == nil && cd.TagName == "" && cd.MetricName == ""
}
const maxColumnsPerRow = 64 * 1024
// ParseColumnDescriptors parses column descriptors from s.

View file

@ -88,6 +88,10 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum
}
cd := &cds[col]
col++
if cd.isEmpty() || sc.Column == "" {
// Ignore empty column.
continue
}
if parseTimestamp := cd.ParseTimestamp; parseTimestamp != nil {
timestamp, err := parseTimestamp(sc.Column)
if err != nil {
@ -105,9 +109,8 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum
continue
}
metricName := cd.MetricName
if metricName == "" || sc.Column == "" {
// The given field is ignored.
continue
if metricName == "" {
logger.Panicf("BUG: unexpected empty MetricName")
}
value, err := fastfloat.Parse(sc.Column)
if err != nil {
@ -127,7 +130,7 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum
continue
}
if len(metrics) == 0 {
logger.Panicf("BUG: expecting at least a single metric in columnDescriptors=%#v", cds)
continue
}
r.Metric = metrics[0].Name
r.Tags = tags[tagsLen:]

View file

@ -30,6 +30,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// Missing columns
f("3:metric:aaa", "123,456")
f("1:metric:foo,2:label:bar", "123")
f("1:label:foo,2:metric:bar", "aaa")
// Invalid value
f("1:metric:foo", "12foobar")
@ -189,6 +191,35 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
Value: 2,
},
})
// last metric with empty value
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4048
f("1:metric:foo,2:metric:bar", `123,`, []Row{
{
Metric: "foo",
Value: 123,
},
})
// all the metrics with empty values
f(`1:metric:foo,2:metric:bar,3:label:xx`, `,,abc`, nil)
// labels with empty value
f("1:metric:foo,2:label:bar,3:label:baz,4:label:xxx", "123,x,,", []Row{
{
Metric: "foo",
Tags: []Tag{
{
Key: "bar",
Value: "x",
},
},
Value: 123,
},
})
f("1:metric:foo,2:label:bar,3:label:baz,4:label:xxx", "123,,,", []Row{
{
Metric: "foo",
Value: 123,
},
})
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3540
f("1:label:mytest,2:time:rfc3339,3:metric:M10,4:metric:M20,5:metric:M30,6:metric:M40,7:metric:M50,8:metric:M60",
`test,2022-12-25T16:57:12+01:00,10,20,30,,,60,70,80`, []Row{

View file

@ -17,6 +17,9 @@ type scanner struct {
// It is cleared on NextLine call.
Error error
// isLastColumn is set to true when the last column at the given Line is processed
isLastColumn bool
s string
}
@ -25,6 +28,7 @@ func (sc *scanner) Init(s string) {
sc.Line = ""
sc.Column = ""
sc.Error = nil
sc.isLastColumn = false
sc.s = s
}
@ -35,23 +39,26 @@ func (sc *scanner) Init(s string) {
// false is returned if no more lines left in sc.s
func (sc *scanner) NextLine() bool {
s := sc.s
sc.Line = ""
sc.Error = nil
sc.isLastColumn = false
for len(s) > 0 {
n := strings.IndexByte(s, '\n')
var line string
if n >= 0 {
line = trimTrailingSpace(s[:n])
line = trimTrailingCR(s[:n])
s = s[n+1:]
} else {
line = trimTrailingSpace(s)
line = trimTrailingCR(s)
s = ""
}
sc.Line = line
sc.s = s
if len(line) > 0 {
sc.Line = line
sc.s = s
return true
}
}
sc.s = ""
return false
}
@ -60,16 +67,28 @@ func (sc *scanner) NextLine() bool {
// false is returned if no more columns left in sc.Line or if any error occurs.
// sc.Error is set to error in the case of error.
func (sc *scanner) NextColumn() bool {
if sc.isLastColumn || sc.Error != nil {
return false
}
s := sc.Line
if len(s) == 0 {
return false
}
if sc.Error != nil {
return false
}
if s[0] == '"' {
sc.Column, sc.Line, sc.Error = readQuotedField(s)
return sc.Error == nil
if strings.HasPrefix(s, `"`) || strings.HasPrefix(s, "'") {
field, tail, err := readQuotedField(s)
if err != nil {
sc.Error = err
return false
}
sc.Column = field
if len(tail) == 0 {
sc.isLastColumn = true
} else {
if tail[0] != ',' {
sc.Error = fmt.Errorf("missing comma after quoted field in %q", s)
return false
}
tail = tail[1:]
}
sc.Line = tail
return true
}
n := strings.IndexByte(s, ',')
if n >= 0 {
@ -78,11 +97,12 @@ func (sc *scanner) NextColumn() bool {
} else {
sc.Column = s
sc.Line = ""
sc.isLastColumn = true
}
return true
}
func trimTrailingSpace(s string) string {
func trimTrailingCR(s string) string {
if len(s) > 0 && s[len(s)-1] == '\r' {
return s[:len(s)-1]
}
@ -90,38 +110,32 @@ func trimTrailingSpace(s string) string {
}
func readQuotedField(s string) (string, string, error) {
sOrig := s
if len(s) == 0 || s[0] != '"' {
return "", sOrig, fmt.Errorf("missing opening quote for %q", sOrig)
quote := s[0]
offset := 1
n := strings.IndexByte(s[offset:], quote)
if n < 0 {
return "", s, fmt.Errorf("missing closing quote for %q", s)
}
s = s[1:]
hasEscapedQuote := false
offset += n + 1
if offset >= len(s) || s[offset] != quote {
// Fast path - the quoted string doesn't contain escaped quotes
return s[1 : offset-1], s[offset:], nil
}
// Slow path - the quoted string contains escaped quote
buf := make([]byte, 0, len(s)-2)
buf = append(buf, s[1:offset]...)
for {
n := strings.IndexByte(s, '"')
offset++
n := strings.IndexByte(s[offset:], quote)
if n < 0 {
return "", sOrig, fmt.Errorf("missing closing quote for %q", sOrig)
return "", s, fmt.Errorf("missing closing quote for %q", s)
}
s = s[n+1:]
if len(s) == 0 {
// The end of string found
return unquote(sOrig[1:len(sOrig)-1], hasEscapedQuote), "", nil
}
if s[0] == '"' {
// Take into account escaped quote
s = s[1:]
hasEscapedQuote = true
buf = append(buf, s[offset:offset+n]...)
offset += n + 1
if offset < len(s) && s[offset] == quote {
buf = append(buf, quote)
continue
}
if s[0] != ',' {
return "", sOrig, fmt.Errorf("missing comma after quoted field in %q", sOrig)
}
return unquote(sOrig[1:len(sOrig)-len(s)-1], hasEscapedQuote), s[1:], nil
return string(buf), s[offset:], nil
}
}
func unquote(s string, hasEscapedQuote bool) string {
if !hasEscapedQuote {
return s
}
return strings.ReplaceAll(s, `""`, `"`)
}

View file

@ -1,63 +1,117 @@
package csvimport
import (
"reflect"
"testing"
)
func TestReadQuotedFieldSuccess(t *testing.T) {
f := func(s, resultExpected, tailExpected string) {
t.Helper()
result, tail, err := readQuotedField(s)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if result != resultExpected {
t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
}
if tail != tailExpected {
t.Fatalf("unexpected tail; got %q; want %q", tail, tailExpected)
}
}
// double quotes
f(`""`, ``, ``)
f(`"",`, ``, `,`)
f(`"",foobar`, ``, `,foobar`)
f(`"","bc"`, ``, `,"bc"`)
f(`"a"`, `a`, ``)
f(`"a"bc`, `a`, `bc`)
f(`"foo`+"`',\n\t\r"+`bar"baz`, "foo`',\n\t\rbar", "baz")
// single quotes
f(`''`, ``, ``)
f(`'',`, ``, `,`)
f(`'',foobar`, ``, `,foobar`)
f(`'','bc'`, ``, `,'bc'`)
f(`'a'`, `a`, ``)
f(`'a'bc`, `a`, `bc`)
f(`'foo"`+"`,\n\t\r"+`bar'baz`, "foo\"`,\n\t\rbar", "baz")
// escaped double quotes
f(`" foo""bar"baz`, ` foo"bar`, `baz`)
f(`""""bar"baz`, `"`, `bar"baz`)
f(`"a,""b""'c",d,"e"`, `a,"b"'c`, `,d,"e"`)
// escaped single quotes
f(`' foo''bar'baz`, ` foo'bar`, `baz`)
f(`''''bar'baz`, `'`, `bar'baz`)
f(`'''bar'''baz`, `'bar'`, `baz`)
f(`'a,''b''"c',d,'e'`, `a,'b'"c`, `,d,'e'`)
}
func TestReadQuotedFieldFailure(t *testing.T) {
f := func(s string) {
t.Helper()
field, tail, err := readQuotedField(s)
if field != "" {
t.Fatalf("unexpected non-empty field returned: %q", field)
}
if tail != s {
t.Fatalf("unexpected tail returned; got %q; want %q", tail, s)
}
if err == nil {
t.Fatalf("expecting non-nil error")
}
}
f(`"`)
f(`'`)
f(`"foo""`)
f(`'foo''`)
f(`'foo''`)
}
func TestScannerSuccess(t *testing.T) {
var sc scanner
sc.Init("foo,bar\n\"aa,\"\"bb\",\"\"")
if !sc.NextLine() {
t.Fatalf("expecting the first line")
}
if sc.Line != "foo,bar" {
t.Fatalf("unexpected line; got %q; want %q", sc.Line, "foo,bar")
}
if !sc.NextColumn() {
t.Fatalf("expecting the first column")
}
if sc.Column != "foo" {
t.Fatalf("unexpected first column; got %q; want %q", sc.Column, "foo")
}
if !sc.NextColumn() {
t.Fatalf("expecting the second column")
}
if sc.Column != "bar" {
t.Fatalf("unexpected second column; got %q; want %q", sc.Column, "bar")
}
if sc.NextColumn() {
t.Fatalf("unexpected next column: %q", sc.Column)
}
if sc.Error != nil {
t.Fatalf("unexpected error: %s", sc.Error)
}
if !sc.NextLine() {
t.Fatalf("expecting the second line")
}
if sc.Line != "\"aa,\"\"bb\",\"\"" {
t.Fatalf("unexpected the second line; got %q; want %q", sc.Line, "\"aa,\"\"bb\",\"\"")
}
if !sc.NextColumn() {
t.Fatalf("expecting the first column on the second line")
}
if sc.Column != "aa,\"bb" {
t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "aa,\"bb")
}
if !sc.NextColumn() {
t.Fatalf("expecting the second column on the second line")
}
if sc.Column != "" {
t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "")
}
if sc.NextColumn() {
t.Fatalf("unexpected next column on the second line: %q", sc.Column)
}
if sc.Error != nil {
t.Fatalf("unexpected error: %s", sc.Error)
}
if sc.NextLine() {
t.Fatalf("unexpected next line: %q", sc.Line)
f := func(s string, rowsExpected [][]string) {
t.Helper()
var sc scanner
sc.Init(s)
var rows [][]string
for sc.NextLine() {
var row []string
for sc.NextColumn() {
row = append(row, sc.Column)
}
rows = append(rows, row)
}
if sc.Error != nil {
t.Fatalf("unexpected error: %s", sc.Error)
}
if !reflect.DeepEqual(rows, rowsExpected) {
t.Fatalf("unexpected rows;\ngot\n%q\nwant\n%q", rows, rowsExpected)
}
}
f("", nil)
f("\n", nil)
f("\r\n\n\r", nil)
f("foo,bar\n\"aa,\"\"bb\",\"\"", [][]string{
{"foo", "bar"},
{`aa,"bb`, ``},
})
f(`fo"bar,baz'a,"bc""de",'g''e'`, [][]string{
{`fo"bar`, `baz'a`, `bc"de`, `g'e`},
})
f(`,`, [][]string{
{``, ``},
})
f(`foo`, [][]string{
{`foo`},
})
f(`foo,,`+"\r\n"+`,bar,`+"\n", [][]string{
{`foo`, ``, ``},
{``, `bar`, ``},
})
}
func TestScannerFailure(t *testing.T) {