From 38b9615c5333d651c55ed271bb5c9d20a1f055a0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 24 Aug 2019 12:54:17 +0300 Subject: [PATCH] app/vminsert/opentsdb: skip invalid rows and continue parsing the remaining rows Invalid rows are logged and counted in `vm_rows_invalid_total{type="opentsdb"}` metric --- app/vminsert/opentsdb/parser.go | 65 +++++++++++---------- app/vminsert/opentsdb/parser_test.go | 43 ++++++++++---- app/vminsert/opentsdb/parser_timing_test.go | 5 +- app/vminsert/opentsdb/request_handler.go | 11 +--- 4 files changed, 74 insertions(+), 50 deletions(-) diff --git a/app/vminsert/opentsdb/parser.go b/app/vminsert/opentsdb/parser.go index 02b750d3b..360dea5af 100644 --- a/app/vminsert/opentsdb/parser.go +++ b/app/vminsert/opentsdb/parser.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" "github.com/valyala/fastjson/fastfloat" ) @@ -34,10 +36,8 @@ func (rs *Rows) Reset() { // See http://opentsdb.net/docs/build/html/api_telnet/put.html // // s must be unchanged until rs is in use. -func (rs *Rows) Unmarshal(s string) error { - var err error - rs.Rows, rs.tagsPool, err = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0]) - return err +func (rs *Rows) Unmarshal(s string) { + rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0]) } // Row is a single OpenTSDB row. @@ -89,41 +89,46 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) { return tagsPool, nil } -func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag, error) { +func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) { for len(s) > 0 { n := strings.IndexByte(s, '\n') - if n == 0 { - // Skip empty line - s = s[1:] - continue - } - if cap(dst) > len(dst) { - dst = dst[:len(dst)+1] - } else { - dst = append(dst, Row{}) - } - r := &dst[len(dst)-1] if n < 0 { // The last line. - var err error - tagsPool, err = r.unmarshal(s, tagsPool) - if err != nil { - err = fmt.Errorf("cannot unmarshal OpenTSDB line %q: %s", s, err) - return dst, tagsPool, err - } - return dst, tagsPool, nil - } - var err error - tagsPool, err = r.unmarshal(s[:n], tagsPool) - if err != nil { - err = fmt.Errorf("cannot unmarshal OpenTSDB line %q: %s", s[:n], err) - return dst, tagsPool, err + return unmarshalRow(dst, s, tagsPool) } + dst, tagsPool = unmarshalRow(dst, s[:n], tagsPool) s = s[n+1:] } - return dst, tagsPool, nil + return dst, tagsPool } +func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) { + if len(s) > 0 && s[len(s)-1] == '\r' { + s = s[:len(s)-1] + } + if len(s) == 0 { + // Skip empty line + return dst, tagsPool + } + + if cap(dst) > len(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, Row{}) + } + r := &dst[len(dst)-1] + var err error + tagsPool, err = r.unmarshal(s, tagsPool) + if err != nil { + dst = dst[:len(dst)-1] + logger.Errorf("cannot unmarshal OpenTSDB line %q: %s", s, err) + invalidLines.Inc() + } + return dst, tagsPool +} + +var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="opentsdb"}`) + func unmarshalTags(dst []Tag, s string) ([]Tag, error) { for { if cap(dst) > len(dst) { diff --git a/app/vminsert/opentsdb/parser_test.go b/app/vminsert/opentsdb/parser_test.go index 35b57d37d..4524c779a 100644 --- a/app/vminsert/opentsdb/parser_test.go +++ b/app/vminsert/opentsdb/parser_test.go @@ -9,13 +9,15 @@ func TestRowsUnmarshalFailure(t *testing.T) { f := func(s string) { t.Helper() var rows Rows - if err := rows.Unmarshal(s); err == nil { - t.Fatalf("expecting non-nil error when parsing %q", s) + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows)) } // Try again - if err := rows.Unmarshal(s); err == nil { - t.Fatalf("expecting non-nil error when parsing %q", s) + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows)) } } @@ -51,17 +53,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) { f := func(s string, rowsExpected *Rows) { t.Helper() var rows Rows - if err := rows.Unmarshal(s); err != nil { - t.Fatalf("cannot unmarshal %q: %s", s, err) - } + rows.Unmarshal(s) if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) } // Try unmarshaling again - if err := rows.Unmarshal(s); err != nil { - t.Fatalf("cannot unmarshal %q: %s", s, err) - } + rows.Unmarshal(s) if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) } @@ -74,7 +72,9 @@ func TestRowsUnmarshalSuccess(t *testing.T) { // Empty line f("", &Rows{}) + f("\r", &Rows{}) f("\n\n", &Rows{}) + f("\n\r\n", &Rows{}) // Single line f("put foobar 789 -123.456 a=b", &Rows{ @@ -200,4 +200,27 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }, }, }) + // Multi lines with invalid line + f("put foo 2 0.3 a=b\naaa bbb\nput bar.baz 43 0.34 a=b\n", &Rows{ + Rows: []Row{ + { + Metric: "foo", + Value: 0.3, + Timestamp: 2, + Tags: []Tag{{ + Key: "a", + Value: "b", + }}, + }, + { + Metric: "bar.baz", + Value: 0.34, + Timestamp: 43, + Tags: []Tag{{ + Key: "a", + Value: "b", + }}, + }, + }, + }) } diff --git a/app/vminsert/opentsdb/parser_timing_test.go b/app/vminsert/opentsdb/parser_timing_test.go index 06bc01a2a..25e867edb 100644 --- a/app/vminsert/opentsdb/parser_timing_test.go +++ b/app/vminsert/opentsdb/parser_timing_test.go @@ -16,8 +16,9 @@ put cpu.usage_irq 1234556768 0.34432 a=b b.RunParallel(func(pb *testing.PB) { var rows Rows for pb.Next() { - if err := rows.Unmarshal(s); err != nil { - panic(fmt.Errorf("cannot unmarshal %q: %s", s, err)) + rows.Unmarshal(s) + if len(rows.Rows) != 4 { + panic(fmt.Errorf("unexpected number of parsed rows; got %d; want 4", len(rows.Rows))) } } }) diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 007633ea2..abf44854f 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -103,11 +103,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool { return false } } - if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil { - opentsdbUnmarshalErrors.Inc() - ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf), err) - return false - } + ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) // Fill in missing timestamps currentTimestamp := time.Now().Unix() @@ -153,9 +149,8 @@ func (ctx *pushCtx) reset() { } var ( - opentsdbReadCalls = metrics.NewCounter(`vm_read_calls_total{name="opentsdb"}`) - opentsdbReadErrors = metrics.NewCounter(`vm_read_errors_total{name="opentsdb"}`) - opentsdbUnmarshalErrors = metrics.NewCounter(`vm_unmarshal_errors_total{name="opentsdb"}`) + opentsdbReadCalls = metrics.NewCounter(`vm_read_calls_total{name="opentsdb"}`) + opentsdbReadErrors = metrics.NewCounter(`vm_read_errors_total{name="opentsdb"}`) ) func getPushCtx() *pushCtx {