diff --git a/app/vminsert/influx/parser.go b/app/vminsert/influx/parser.go index 35fb32461c..663a36ee71 100644 --- a/app/vminsert/influx/parser.go +++ b/app/vminsert/influx/parser.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" "github.com/valyala/fastjson/fastfloat" ) @@ -41,10 +43,8 @@ func (rs *Rows) Reset() { // See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/ // // s must be unchanged until rs is in use. -func (rs *Rows) Unmarshal(s string) error { - var err error - rs.Rows, rs.tagsPool, rs.fieldsPool, err = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], rs.fieldsPool[:0]) - return err +func (rs *Rows) Unmarshal(s string) { + rs.Rows, rs.tagsPool, rs.fieldsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], rs.fieldsPool[:0]) } // Row is a single influx row. @@ -64,10 +64,6 @@ func (r *Row) reset() { func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Tag, []Field, error) { r.reset() - // Remove optional \r from the end of s - if len(s) > 0 && s[len(s)-1] == '\r' { - s = s[:len(s)-1] - } n := nextUnescapedChar(s, ' ', noEscapeChars) if n < 0 { return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s) @@ -177,57 +173,51 @@ func (f *Field) unmarshal(s string, noEscapeChars, hasQuotedFields bool) error { return nil } -func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]Row, []Tag, []Field, error) { +func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]Row, []Tag, []Field) { noEscapeChars := strings.IndexByte(s, '\\') < 0 for len(s) > 0 { n := strings.IndexByte(s, '\n') - if n == 0 { - // Skip empty line - s = s[1:] - continue - } - if n == 1 && s[0] == '\r' { - // Skip empty line - s = s[2:] - continue - } - if s[0] == '#' { - // Skip comment - if n > 0 { - s = s[n+1:] - } else { - s = s[len(s):] - } - continue - } - - if cap(dst) > len(dst) { - dst = dst[:len(dst)+1] - } else { - dst = append(dst, Row{}) - } - r := &dst[len(dst)-1] if n < 0 { // The last line. - var err error - tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars) - if err != nil { - err = fmt.Errorf("cannot unmarshal Influx line %q: %s", s, err) - return dst, tagsPool, fieldsPool, err - } - return dst, tagsPool, fieldsPool, nil - } - var err error - tagsPool, fieldsPool, err = r.unmarshal(s[:n], tagsPool, fieldsPool, noEscapeChars) - if err != nil { - err = fmt.Errorf("cannot unmarshal Influx line %q: %s", s[:n], err) - return dst, tagsPool, fieldsPool, err + return unmarshalRow(dst, s, tagsPool, fieldsPool, noEscapeChars) } + dst, tagsPool, fieldsPool = unmarshalRow(dst, s[:n], tagsPool, fieldsPool, noEscapeChars) s = s[n+1:] } - return dst, tagsPool, fieldsPool, nil + return dst, tagsPool, fieldsPool } +func unmarshalRow(dst []Row, s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Row, []Tag, []Field) { + if len(s) > 0 && s[len(s)-1] == '\r' { + s = s[:len(s)-1] + } + if len(s) == 0 { + // Skip empty line + return dst, tagsPool, fieldsPool + } + if s[0] == '#' { + // Skip comment + return dst, tagsPool, fieldsPool + } + + if cap(dst) > len(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, Row{}) + } + r := &dst[len(dst)-1] + var err error + tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars) + if err != nil { + dst = dst[:len(dst)-1] + logger.Errorf("cannot unmarshal Influx line %q: %s; skipping it", s, err) + invalidLines.Inc() + } + return dst, tagsPool, fieldsPool +} + +var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="influx"}`) + func unmarshalTags(dst []Tag, s string, noEscapeChars bool) ([]Tag, error) { for { if cap(dst) > len(dst) { diff --git a/app/vminsert/influx/parser_test.go b/app/vminsert/influx/parser_test.go index 55429d13e1..559e9adcbd 100644 --- a/app/vminsert/influx/parser_test.go +++ b/app/vminsert/influx/parser_test.go @@ -74,13 +74,15 @@ func TestRowsUnmarshalFailure(t *testing.T) { f := func(s string) { t.Helper() var rows Rows - if err := rows.Unmarshal(s); err == nil { - t.Fatalf("expecting non-nil error when parsing %q", s) + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows)) } // Try again - if err := rows.Unmarshal(s); err == nil { - t.Fatalf("expecting non-nil error when parsing %q", s) + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows)) } } @@ -122,17 +124,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) { f := func(s string, rowsExpected *Rows) { t.Helper() var rows Rows - if err := rows.Unmarshal(s); err != nil { - t.Fatalf("cannot unmarshal %q: %s", s, err) - } + rows.Unmarshal(s) if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) } // Try unmarshaling again - if err := rows.Unmarshal(s); err != nil { - t.Fatalf("cannot unmarshal %q: %s", s, err) - } + rows.Unmarshal(s) if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) } @@ -364,6 +362,33 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }, }) + // Multiple lines with invalid line in the middle. + f("foo,tag=xyz field=1.23 48934\n"+ + "invalid line\n"+ + "bar x=-1i\n\n", &Rows{ + Rows: []Row{ + { + Measurement: "foo", + Tags: []Tag{{ + Key: "tag", + Value: "xyz", + }}, + Fields: []Field{{ + Key: "field", + Value: 1.23, + }}, + Timestamp: 48934, + }, + { + Measurement: "bar", + Fields: []Field{{ + Key: "x", + Value: -1, + }}, + }, + }, + }) + // No newline after the second line. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/82 f("foo,tag=xyz field=1.23 48934\n"+ diff --git a/app/vminsert/influx/parser_timing_test.go b/app/vminsert/influx/parser_timing_test.go index 1d05b9956b..21db9b583a 100644 --- a/app/vminsert/influx/parser_timing_test.go +++ b/app/vminsert/influx/parser_timing_test.go @@ -6,14 +6,19 @@ import ( ) func BenchmarkRowsUnmarshal(b *testing.B) { - s := `cpu usage_user=1.23,usage_system=4.34,usage_iowait=0.1112 1234556768` + s := `cpu usage_user=1.23,usage_system=4.34,usage_iowait=0.1112 1234556768 +cpu usage_user=1.23,usage_system=4.34,usage_iowait=0.1112 123455676344 +aaa usage_user=1.23,usage_system=4.34,usage_iowait=0.1112 123455676344 +bbb usage_user=1.23,usage_system=4.34,usage_iowait=0.1112 123455676344 +` b.SetBytes(int64(len(s))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { var rows Rows for pb.Next() { - if err := rows.Unmarshal(s); err != nil { - panic(fmt.Errorf("cannot unmarshal %q: %s", s, err)) + rows.Unmarshal(s) + if len(rows.Rows) != 4 { + panic(fmt.Errorf("unexpected number of rows parsed; got %d; want 4", len(rows.Rows))) } } }) diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 0fa8139a01..c75295cb61 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -137,11 +137,7 @@ func (ctx *pushCtx) Read(r io.Reader, tsMultiplier int64) bool { } return false } - if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil { - influxUnmarshalErrors.Inc() - ctx.err = fmt.Errorf("cannot unmarshal influx line protocol data with size %d: %s", len(ctx.reqBuf), err) - return false - } + ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) // Adjust timestamps according to tsMultiplier currentTs := time.Now().UnixNano() / 1e6 @@ -170,9 +166,8 @@ func (ctx *pushCtx) Read(r io.Reader, tsMultiplier int64) bool { } var ( - influxReadCalls = metrics.NewCounter(`vm_read_calls_total{name="influx"}`) - influxReadErrors = metrics.NewCounter(`vm_read_errors_total{name="influx"}`) - influxUnmarshalErrors = metrics.NewCounter(`vm_unmarshal_errors_total{name="influx"}`) + influxReadCalls = metrics.NewCounter(`vm_read_calls_total{name="influx"}`) + influxReadErrors = metrics.NewCounter(`vm_read_errors_total{name="influx"}`) ) type pushCtx struct {