diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c25c57769..b923f6159 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -60,6 +60,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: do not panic at Windows during [snapshot deletion](https://docs.victoriametrics.com/#how-to-work-with-snapshots). Instead, delete the snapshot on the next restart. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70#issuecomment-1491529183) for details. * BUGFIX: change the max allowed value for `-memory.allowedPercent` from 100 to 200. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4171). * BUGFIX: properly limit the number of [OpenTSDB HTTP](https://docs.victoriametrics.com/#sending-opentsdb-data-via-http-apiput-requests) concurrent requests specified via `-maxConcurrentInserts` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4204). Thanks to @zouxiang1993 for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4208). +* BUGFIX: do not ignore trailing empty field in CSV lines when [importing data in CSV format](https://docs.victoriametrics.com/#how-to-import-csv-data). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4048). * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent from possible panic when the number of vmstorage nodes increases when [automatic vmstorage discovery](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled. * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix a panic when the duration in the query contains uppercase `M` suffix. Such a suffix isn't allowed to use in durations, since it clashes with `a million` suffix, e.g. it isn't clear whether `rate(metric[5M])` means rate over 5 minutes, 5 months or 5 million seconds. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4120) issues. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly handle the `vm_promscrape_config_last_reload_successful` metric after config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4260). diff --git a/lib/protoparser/csvimport/column_descriptor.go b/lib/protoparser/csvimport/column_descriptor.go index 3bdb96a7f..d9ec5a1c6 100644 --- a/lib/protoparser/csvimport/column_descriptor.go +++ b/lib/protoparser/csvimport/column_descriptor.go @@ -28,6 +28,10 @@ type ColumnDescriptor struct { MetricName string } +func (cd *ColumnDescriptor) isEmpty() bool { + return cd.ParseTimestamp == nil && cd.TagName == "" && cd.MetricName == "" +} + const maxColumnsPerRow = 64 * 1024 // ParseColumnDescriptors parses column descriptors from s. diff --git a/lib/protoparser/csvimport/parser.go b/lib/protoparser/csvimport/parser.go index 56e7560b5..77a0b8239 100644 --- a/lib/protoparser/csvimport/parser.go +++ b/lib/protoparser/csvimport/parser.go @@ -88,6 +88,10 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum } cd := &cds[col] col++ + if cd.isEmpty() || sc.Column == "" { + // Ignore empty column. + continue + } if parseTimestamp := cd.ParseTimestamp; parseTimestamp != nil { timestamp, err := parseTimestamp(sc.Column) if err != nil { @@ -105,9 +109,8 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum continue } metricName := cd.MetricName - if metricName == "" || sc.Column == "" { - // The given field is ignored. - continue + if metricName == "" { + logger.Panicf("BUG: unexpected empty MetricName") } value, err := fastfloat.Parse(sc.Column) if err != nil { @@ -127,7 +130,7 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum continue } if len(metrics) == 0 { - logger.Panicf("BUG: expecting at least a single metric in columnDescriptors=%#v", cds) + continue } r.Metric = metrics[0].Name r.Tags = tags[tagsLen:] diff --git a/lib/protoparser/csvimport/parser_test.go b/lib/protoparser/csvimport/parser_test.go index 342a1d357..1aeb02e79 100644 --- a/lib/protoparser/csvimport/parser_test.go +++ b/lib/protoparser/csvimport/parser_test.go @@ -30,6 +30,8 @@ func TestRowsUnmarshalFailure(t *testing.T) { // Missing columns f("3:metric:aaa", "123,456") + f("1:metric:foo,2:label:bar", "123") + f("1:label:foo,2:metric:bar", "aaa") // Invalid value f("1:metric:foo", "12foobar") @@ -189,6 +191,35 @@ func TestRowsUnmarshalSuccess(t *testing.T) { Value: 2, }, }) + // last metric with empty value + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4048 + f("1:metric:foo,2:metric:bar", `123,`, []Row{ + { + Metric: "foo", + Value: 123, + }, + }) + // all the metrics with empty values + f(`1:metric:foo,2:metric:bar,3:label:xx`, `,,abc`, nil) + // labels with empty value + f("1:metric:foo,2:label:bar,3:label:baz,4:label:xxx", "123,x,,", []Row{ + { + Metric: "foo", + Tags: []Tag{ + { + Key: "bar", + Value: "x", + }, + }, + Value: 123, + }, + }) + f("1:metric:foo,2:label:bar,3:label:baz,4:label:xxx", "123,,,", []Row{ + { + Metric: "foo", + Value: 123, + }, + }) // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3540 f("1:label:mytest,2:time:rfc3339,3:metric:M10,4:metric:M20,5:metric:M30,6:metric:M40,7:metric:M50,8:metric:M60", `test,2022-12-25T16:57:12+01:00,10,20,30,,,60,70,80`, []Row{ diff --git a/lib/protoparser/csvimport/scanner.go b/lib/protoparser/csvimport/scanner.go index c1ab924d4..3417d0210 100644 --- a/lib/protoparser/csvimport/scanner.go +++ b/lib/protoparser/csvimport/scanner.go @@ -17,6 +17,9 @@ type scanner struct { // It is cleared on NextLine call. Error error + // isLastColumn is set to true when the last column at the given Line is processed + isLastColumn bool + s string } @@ -25,6 +28,7 @@ func (sc *scanner) Init(s string) { sc.Line = "" sc.Column = "" sc.Error = nil + sc.isLastColumn = false sc.s = s } @@ -35,23 +39,26 @@ func (sc *scanner) Init(s string) { // false is returned if no more lines left in sc.s func (sc *scanner) NextLine() bool { s := sc.s + sc.Line = "" sc.Error = nil + sc.isLastColumn = false for len(s) > 0 { n := strings.IndexByte(s, '\n') var line string if n >= 0 { - line = trimTrailingSpace(s[:n]) + line = trimTrailingCR(s[:n]) s = s[n+1:] } else { - line = trimTrailingSpace(s) + line = trimTrailingCR(s) s = "" } - sc.Line = line - sc.s = s if len(line) > 0 { + sc.Line = line + sc.s = s return true } } + sc.s = "" return false } @@ -60,16 +67,28 @@ func (sc *scanner) NextLine() bool { // false is returned if no more columns left in sc.Line or if any error occurs. // sc.Error is set to error in the case of error. func (sc *scanner) NextColumn() bool { + if sc.isLastColumn || sc.Error != nil { + return false + } s := sc.Line - if len(s) == 0 { - return false - } - if sc.Error != nil { - return false - } - if s[0] == '"' { - sc.Column, sc.Line, sc.Error = readQuotedField(s) - return sc.Error == nil + if strings.HasPrefix(s, `"`) || strings.HasPrefix(s, "'") { + field, tail, err := readQuotedField(s) + if err != nil { + sc.Error = err + return false + } + sc.Column = field + if len(tail) == 0 { + sc.isLastColumn = true + } else { + if tail[0] != ',' { + sc.Error = fmt.Errorf("missing comma after quoted field in %q", s) + return false + } + tail = tail[1:] + } + sc.Line = tail + return true } n := strings.IndexByte(s, ',') if n >= 0 { @@ -78,11 +97,12 @@ func (sc *scanner) NextColumn() bool { } else { sc.Column = s sc.Line = "" + sc.isLastColumn = true } return true } -func trimTrailingSpace(s string) string { +func trimTrailingCR(s string) string { if len(s) > 0 && s[len(s)-1] == '\r' { return s[:len(s)-1] } @@ -90,38 +110,32 @@ func trimTrailingSpace(s string) string { } func readQuotedField(s string) (string, string, error) { - sOrig := s - if len(s) == 0 || s[0] != '"' { - return "", sOrig, fmt.Errorf("missing opening quote for %q", sOrig) + quote := s[0] + offset := 1 + n := strings.IndexByte(s[offset:], quote) + if n < 0 { + return "", s, fmt.Errorf("missing closing quote for %q", s) } - s = s[1:] - hasEscapedQuote := false + offset += n + 1 + if offset >= len(s) || s[offset] != quote { + // Fast path - the quoted string doesn't contain escaped quotes + return s[1 : offset-1], s[offset:], nil + } + // Slow path - the quoted string contains escaped quote + buf := make([]byte, 0, len(s)-2) + buf = append(buf, s[1:offset]...) for { - n := strings.IndexByte(s, '"') + offset++ + n := strings.IndexByte(s[offset:], quote) if n < 0 { - return "", sOrig, fmt.Errorf("missing closing quote for %q", sOrig) + return "", s, fmt.Errorf("missing closing quote for %q", s) } - s = s[n+1:] - if len(s) == 0 { - // The end of string found - return unquote(sOrig[1:len(sOrig)-1], hasEscapedQuote), "", nil - } - if s[0] == '"' { - // Take into account escaped quote - s = s[1:] - hasEscapedQuote = true + buf = append(buf, s[offset:offset+n]...) + offset += n + 1 + if offset < len(s) && s[offset] == quote { + buf = append(buf, quote) continue } - if s[0] != ',' { - return "", sOrig, fmt.Errorf("missing comma after quoted field in %q", sOrig) - } - return unquote(sOrig[1:len(sOrig)-len(s)-1], hasEscapedQuote), s[1:], nil + return string(buf), s[offset:], nil } } - -func unquote(s string, hasEscapedQuote bool) string { - if !hasEscapedQuote { - return s - } - return strings.ReplaceAll(s, `""`, `"`) -} diff --git a/lib/protoparser/csvimport/scanner_test.go b/lib/protoparser/csvimport/scanner_test.go index 8e4381ec0..9eb9c5034 100644 --- a/lib/protoparser/csvimport/scanner_test.go +++ b/lib/protoparser/csvimport/scanner_test.go @@ -1,63 +1,117 @@ package csvimport import ( + "reflect" "testing" ) +func TestReadQuotedFieldSuccess(t *testing.T) { + f := func(s, resultExpected, tailExpected string) { + t.Helper() + result, tail, err := readQuotedField(s) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if result != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) + } + if tail != tailExpected { + t.Fatalf("unexpected tail; got %q; want %q", tail, tailExpected) + } + } + + // double quotes + f(`""`, ``, ``) + f(`"",`, ``, `,`) + f(`"",foobar`, ``, `,foobar`) + f(`"","bc"`, ``, `,"bc"`) + f(`"a"`, `a`, ``) + f(`"a"bc`, `a`, `bc`) + f(`"foo`+"`',\n\t\r"+`bar"baz`, "foo`',\n\t\rbar", "baz") + + // single quotes + f(`''`, ``, ``) + f(`'',`, ``, `,`) + f(`'',foobar`, ``, `,foobar`) + f(`'','bc'`, ``, `,'bc'`) + f(`'a'`, `a`, ``) + f(`'a'bc`, `a`, `bc`) + f(`'foo"`+"`,\n\t\r"+`bar'baz`, "foo\"`,\n\t\rbar", "baz") + + // escaped double quotes + f(`" foo""bar"baz`, ` foo"bar`, `baz`) + f(`""""bar"baz`, `"`, `bar"baz`) + f(`"a,""b""'c",d,"e"`, `a,"b"'c`, `,d,"e"`) + + // escaped single quotes + f(`' foo''bar'baz`, ` foo'bar`, `baz`) + f(`''''bar'baz`, `'`, `bar'baz`) + f(`'''bar'''baz`, `'bar'`, `baz`) + f(`'a,''b''"c',d,'e'`, `a,'b'"c`, `,d,'e'`) +} + +func TestReadQuotedFieldFailure(t *testing.T) { + f := func(s string) { + t.Helper() + field, tail, err := readQuotedField(s) + if field != "" { + t.Fatalf("unexpected non-empty field returned: %q", field) + } + if tail != s { + t.Fatalf("unexpected tail returned; got %q; want %q", tail, s) + } + if err == nil { + t.Fatalf("expecting non-nil error") + } + } + f(`"`) + f(`'`) + f(`"foo""`) + f(`'foo''`) + f(`'foo''`) +} + func TestScannerSuccess(t *testing.T) { - var sc scanner - sc.Init("foo,bar\n\"aa,\"\"bb\",\"\"") - if !sc.NextLine() { - t.Fatalf("expecting the first line") - } - if sc.Line != "foo,bar" { - t.Fatalf("unexpected line; got %q; want %q", sc.Line, "foo,bar") - } - if !sc.NextColumn() { - t.Fatalf("expecting the first column") - } - if sc.Column != "foo" { - t.Fatalf("unexpected first column; got %q; want %q", sc.Column, "foo") - } - if !sc.NextColumn() { - t.Fatalf("expecting the second column") - } - if sc.Column != "bar" { - t.Fatalf("unexpected second column; got %q; want %q", sc.Column, "bar") - } - if sc.NextColumn() { - t.Fatalf("unexpected next column: %q", sc.Column) - } - if sc.Error != nil { - t.Fatalf("unexpected error: %s", sc.Error) - } - if !sc.NextLine() { - t.Fatalf("expecting the second line") - } - if sc.Line != "\"aa,\"\"bb\",\"\"" { - t.Fatalf("unexpected the second line; got %q; want %q", sc.Line, "\"aa,\"\"bb\",\"\"") - } - if !sc.NextColumn() { - t.Fatalf("expecting the first column on the second line") - } - if sc.Column != "aa,\"bb" { - t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "aa,\"bb") - } - if !sc.NextColumn() { - t.Fatalf("expecting the second column on the second line") - } - if sc.Column != "" { - t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "") - } - if sc.NextColumn() { - t.Fatalf("unexpected next column on the second line: %q", sc.Column) - } - if sc.Error != nil { - t.Fatalf("unexpected error: %s", sc.Error) - } - if sc.NextLine() { - t.Fatalf("unexpected next line: %q", sc.Line) + f := func(s string, rowsExpected [][]string) { + t.Helper() + var sc scanner + sc.Init(s) + var rows [][]string + for sc.NextLine() { + var row []string + for sc.NextColumn() { + row = append(row, sc.Column) + } + rows = append(rows, row) + } + if sc.Error != nil { + t.Fatalf("unexpected error: %s", sc.Error) + } + if !reflect.DeepEqual(rows, rowsExpected) { + t.Fatalf("unexpected rows;\ngot\n%q\nwant\n%q", rows, rowsExpected) + } } + + f("", nil) + f("\n", nil) + f("\r\n\n\r", nil) + f("foo,bar\n\"aa,\"\"bb\",\"\"", [][]string{ + {"foo", "bar"}, + {`aa,"bb`, ``}, + }) + f(`fo"bar,baz'a,"bc""de",'g''e'`, [][]string{ + {`fo"bar`, `baz'a`, `bc"de`, `g'e`}, + }) + f(`,`, [][]string{ + {``, ``}, + }) + f(`foo`, [][]string{ + {`foo`}, + }) + f(`foo,,`+"\r\n"+`,bar,`+"\n", [][]string{ + {`foo`, ``, ``}, + {``, `bar`, ``}, + }) } func TestScannerFailure(t *testing.T) {