From a283023d16be35f8bd02163bbe5b8340200c4d8c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 24 Aug 2019 13:05:20 +0300 Subject: [PATCH] app/vminsert/opentsdbhttp: skip invalid rows and continue parsing the remaining rows Invalid rows are logged and counted in `vm_rows_invalid_total{type="opentsdb-http"}` metric --- app/vminsert/opentsdbhttp/parser.go | 34 ++++++++++--------- app/vminsert/opentsdbhttp/parser_test.go | 20 +++++------ .../opentsdbhttp/parser_timing_test.go | 5 +-- app/vminsert/opentsdbhttp/request_handler.go | 5 +-- 4 files changed, 31 insertions(+), 33 deletions(-) diff --git a/app/vminsert/opentsdbhttp/parser.go b/app/vminsert/opentsdbhttp/parser.go index bb6c049165..891ebe5d2d 100644 --- a/app/vminsert/opentsdbhttp/parser.go +++ b/app/vminsert/opentsdbhttp/parser.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" "github.com/valyala/fastjson" "github.com/valyala/fastjson/fastfloat" ) @@ -34,10 +36,8 @@ func (rs *Rows) Reset() { // See http://opentsdb.net/docs/build/html/api_http/put.html // // s must be unchanged until rs is in use. -func (rs *Rows) Unmarshal(av *fastjson.Value) error { - var err error - rs.Rows, rs.tagsPool, err = unmarshalRows(rs.Rows[:0], av, rs.tagsPool[:0]) - return err +func (rs *Rows) Unmarshal(av *fastjson.Value) { + rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], av, rs.tagsPool[:0]) } // Row is a single OpenTSDB row. @@ -122,26 +122,24 @@ func getFloat64(v *fastjson.Value) (float64, error) { } } -func unmarshalRows(dst []Row, av *fastjson.Value, tagsPool []Tag) ([]Row, []Tag, error) { +func unmarshalRows(dst []Row, av *fastjson.Value, tagsPool []Tag) ([]Row, []Tag) { switch av.Type() { case fastjson.TypeObject: return unmarshalRow(dst, av, tagsPool) case fastjson.TypeArray: a, _ := av.Array() - for i, o := range a { - var err error - dst, tagsPool, err = unmarshalRow(dst, o, tagsPool) - if err != nil { - return dst, tagsPool, fmt.Errorf("cannot unmarshal %d object out of %d objects: %s", i, len(a), err) - } + for _, o := range a { + dst, tagsPool = unmarshalRow(dst, o, tagsPool) } - return dst, tagsPool, nil + return dst, tagsPool default: - return dst, tagsPool, fmt.Errorf("OpenTSDB body must be either object or array; got %s; body=%s", av.Type(), av) + logger.Errorf("OpenTSDB JSON must be either object or array; got %s; body=%s", av.Type(), av) + invalidLines.Inc() + return dst, tagsPool } } -func unmarshalRow(dst []Row, o *fastjson.Value, tagsPool []Tag) ([]Row, []Tag, error) { +func unmarshalRow(dst []Row, o *fastjson.Value, tagsPool []Tag) ([]Row, []Tag) { if cap(dst) > len(dst) { dst = dst[:len(dst)+1] } else { @@ -151,11 +149,15 @@ func unmarshalRow(dst []Row, o *fastjson.Value, tagsPool []Tag) ([]Row, []Tag, e var err error tagsPool, err = r.unmarshal(o, tagsPool) if err != nil { - return dst, tagsPool, fmt.Errorf("cannot unmarshal OpenTSDB object %s: %s", o, err) + dst = dst[:len(dst)-1] + logger.Errorf("cannot unmarshal OpenTSDB object %s: %s", o, err) + invalidLines.Inc() } - return dst, tagsPool, nil + return dst, tagsPool } +var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="opentsdb-http"}`) + func unmarshalTags(dst []Tag, o *fastjson.Object) ([]Tag, error) { var err error o.Visit(func(k []byte, v *fastjson.Value) { diff --git a/app/vminsert/opentsdbhttp/parser_test.go b/app/vminsert/opentsdbhttp/parser_test.go index 366d481fa1..6a9284d376 100644 --- a/app/vminsert/opentsdbhttp/parser_test.go +++ b/app/vminsert/opentsdbhttp/parser_test.go @@ -17,12 +17,14 @@ func TestRowsUnmarshalFailure(t *testing.T) { return } // Verify OpenTSDB body parsing error - if err := rows.Unmarshal(v); err == nil { - t.Fatalf("expecting non-nil error when parsing %q", s) + rows.Unmarshal(v) + if len(rows.Rows) != 0 { + t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows)) } // Try again - if err := rows.Unmarshal(v); err == nil { - t.Fatalf("expecting non-nil error when parsing %q", s) + rows.Unmarshal(v) + if len(rows.Rows) != 0 { + t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows)) } } @@ -73,7 +75,7 @@ func TestRowsUnmarshalFailure(t *testing.T) { f(`{"metric": "aaa", "timestamp": 1122, "value": 0.45, "tags": {"foo": 1}}`) // Invalid multiline - f(`[{"metric": "aaa", "timestamp": 1122, "value": "trt", "tags":{"foo":"bar"}}, {"metric": "aaa", "timestamp": 1122, "value": 111}]`) + f(`[{"metric": "aaa", "timestamp": 1122, "value": "trt", "tags":{"foo":"bar"}}, {"metric": "aaa", "timestamp": [1122], "value": 111}]`) } func TestRowsUnmarshalSuccess(t *testing.T) { @@ -87,17 +89,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) { if err != nil { t.Fatalf("cannot parse json %s: %s", s, err) } - if err := rows.Unmarshal(v); err != nil { - t.Fatalf("cannot unmarshal %s: %s", v, err) - } + rows.Unmarshal(v) if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) } // Try unmarshaling again - if err := rows.Unmarshal(v); err != nil { - t.Fatalf("cannot unmarshal %s: %s", v, err) - } + rows.Unmarshal(v) if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) } diff --git a/app/vminsert/opentsdbhttp/parser_timing_test.go b/app/vminsert/opentsdbhttp/parser_timing_test.go index 8444646001..984c6cd0af 100644 --- a/app/vminsert/opentsdbhttp/parser_timing_test.go +++ b/app/vminsert/opentsdbhttp/parser_timing_test.go @@ -24,8 +24,9 @@ func BenchmarkRowsUnmarshal(b *testing.B) { if err != nil { panic(fmt.Errorf("cannot parse %q: %s", s, err)) } - if err := rows.Unmarshal(v); err != nil { - panic(fmt.Errorf("cannot unmarshal %q: %s", s, err)) + rows.Unmarshal(v) + if len(rows.Rows) != 4 { + panic(fmt.Errorf("unexpected number of rows unmarshaled; got %d; want 4", len(rows.Rows))) } } }) diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index b02a43298b..36d7b66190 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -72,10 +72,7 @@ func insertHandlerInternal(at *auth.Token, req *http.Request, maxSize int64) err opentsdbUnmarshalErrors.Inc() return fmt.Errorf("cannot parse HTTP OpenTSDB json: %s", err) } - if err := ctx.Rows.Unmarshal(v); err != nil { - opentsdbUnmarshalErrors.Inc() - return fmt.Errorf("cannot unmarshal HTTP OpenTSDB json %s, %s", err, v) - } + ctx.Rows.Unmarshal(v) // Fill in missing timestamps currentTimestamp := time.Now().Unix()