From 4266091e4f5ab753d5806242a7682feeccd265fe Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Sat, 24 Aug 2019 12:54:17 +0300
Subject: [PATCH] app/vminsert/opentsdb: skip invalid rows and continue parsing
 the remaining rows

Invalid rows are logged and counted in the `vm_rows_invalid_total{type="opentsdb"}` metric.
---
 app/vminsert/opentsdb/parser.go             | 65 +++++++++++----------
 app/vminsert/opentsdb/parser_test.go        | 43 ++++++++++----
 app/vminsert/opentsdb/parser_timing_test.go |  5 +-
 app/vminsert/opentsdb/request_handler.go    | 11 +---
 4 files changed, 74 insertions(+), 50 deletions(-)

diff --git a/app/vminsert/opentsdb/parser.go b/app/vminsert/opentsdb/parser.go
index 02b750d3bf..360dea5aff 100644
--- a/app/vminsert/opentsdb/parser.go
+++ b/app/vminsert/opentsdb/parser.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/metrics"
 	"github.com/valyala/fastjson/fastfloat"
 )
 
@@ -34,10 +36,8 @@ func (rs *Rows) Reset() {
 // See http://opentsdb.net/docs/build/html/api_telnet/put.html
 //
 // s must be unchanged until rs is in use.
-func (rs *Rows) Unmarshal(s string) error {
-	var err error
-	rs.Rows, rs.tagsPool, err = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0])
-	return err
+func (rs *Rows) Unmarshal(s string) {
+	rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0])
 }
 
 // Row is a single OpenTSDB row.
@@ -89,41 +89,46 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
 	return tagsPool, nil
 }
 
-func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag, error) {
+func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
 	for len(s) > 0 {
 		n := strings.IndexByte(s, '\n')
-		if n == 0 {
-			// Skip empty line
-			s = s[1:]
-			continue
-		}
-		if cap(dst) > len(dst) {
-			dst = dst[:len(dst)+1]
-		} else {
-			dst = append(dst, Row{})
-		}
-		r := &dst[len(dst)-1]
 		if n < 0 {
 			// The last line.
-			var err error
-			tagsPool, err = r.unmarshal(s, tagsPool)
-			if err != nil {
-				err = fmt.Errorf("cannot unmarshal OpenTSDB line %q: %s", s, err)
-				return dst, tagsPool, err
-			}
-			return dst, tagsPool, nil
-		}
-		var err error
-		tagsPool, err = r.unmarshal(s[:n], tagsPool)
-		if err != nil {
-			err = fmt.Errorf("cannot unmarshal OpenTSDB line %q: %s", s[:n], err)
-			return dst, tagsPool, err
+			return unmarshalRow(dst, s, tagsPool)
 		}
+		dst, tagsPool = unmarshalRow(dst, s[:n], tagsPool)
 		s = s[n+1:]
 	}
-	return dst, tagsPool, nil
+	return dst, tagsPool
 }
 
+func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
+	if len(s) > 0 && s[len(s)-1] == '\r' {
+		s = s[:len(s)-1]
+	}
+	if len(s) == 0 {
+		// Skip empty line
+		return dst, tagsPool
+	}
+
+	if cap(dst) > len(dst) {
+		dst = dst[:len(dst)+1]
+	} else {
+		dst = append(dst, Row{})
+	}
+	r := &dst[len(dst)-1]
+	var err error
+	tagsPool, err = r.unmarshal(s, tagsPool)
+	if err != nil {
+		dst = dst[:len(dst)-1]
+		logger.Errorf("cannot unmarshal OpenTSDB line %q: %s", s, err)
+		invalidLines.Inc()
+	}
+	return dst, tagsPool
+}
+
+var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="opentsdb"}`)
+
 func unmarshalTags(dst []Tag, s string) ([]Tag, error) {
 	for {
 		if cap(dst) > len(dst) {
diff --git a/app/vminsert/opentsdb/parser_test.go b/app/vminsert/opentsdb/parser_test.go
index 35b57d37da..4524c779a6 100644
--- a/app/vminsert/opentsdb/parser_test.go
+++ b/app/vminsert/opentsdb/parser_test.go
@@ -9,13 +9,15 @@ func TestRowsUnmarshalFailure(t *testing.T) {
 	f := func(s string) {
 		t.Helper()
 		var rows Rows
-		if err := rows.Unmarshal(s); err == nil {
-			t.Fatalf("expecting non-nil error when parsing %q", s)
+		rows.Unmarshal(s)
+		if len(rows.Rows) != 0 {
+			t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows))
 		}
 
 		// Try again
-		if err := rows.Unmarshal(s); err == nil {
-			t.Fatalf("expecting non-nil error when parsing %q", s)
+		rows.Unmarshal(s)
+		if len(rows.Rows) != 0 {
+			t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows))
 		}
 	}
 
@@ -51,17 +53,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
 	f := func(s string, rowsExpected *Rows) {
 		t.Helper()
 		var rows Rows
-		if err := rows.Unmarshal(s); err != nil {
-			t.Fatalf("cannot unmarshal %q: %s", s, err)
-		}
+		rows.Unmarshal(s)
 		if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
 			t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
 		}
 
 		// Try unmarshaling again
-		if err := rows.Unmarshal(s); err != nil {
-			t.Fatalf("cannot unmarshal %q: %s", s, err)
-		}
+		rows.Unmarshal(s)
 		if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
 			t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
 		}
@@ -74,7 +72,9 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
 
 	// Empty line
 	f("", &Rows{})
+	f("\r", &Rows{})
 	f("\n\n", &Rows{})
+	f("\n\r\n", &Rows{})
 
 	// Single line
 	f("put foobar 789 -123.456 a=b", &Rows{
@@ -200,4 +200,27 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
 			},
 		},
 	})
+	// Multiple lines with an invalid line in between
+	f("put foo 2 0.3 a=b\naaa bbb\nput bar.baz 43 0.34 a=b\n", &Rows{
+		Rows: []Row{
+			{
+				Metric:    "foo",
+				Value:     0.3,
+				Timestamp: 2,
+				Tags: []Tag{{
+					Key:   "a",
+					Value: "b",
+				}},
+			},
+			{
+				Metric:    "bar.baz",
+				Value:     0.34,
+				Timestamp: 43,
+				Tags: []Tag{{
+					Key:   "a",
+					Value: "b",
+				}},
+			},
+		},
+	})
 }
diff --git a/app/vminsert/opentsdb/parser_timing_test.go b/app/vminsert/opentsdb/parser_timing_test.go
index 06bc01a2a9..25e867edb0 100644
--- a/app/vminsert/opentsdb/parser_timing_test.go
+++ b/app/vminsert/opentsdb/parser_timing_test.go
@@ -16,8 +16,9 @@ put cpu.usage_irq 1234556768 0.34432 a=b
 	b.RunParallel(func(pb *testing.PB) {
 		var rows Rows
 		for pb.Next() {
-			if err := rows.Unmarshal(s); err != nil {
-				panic(fmt.Errorf("cannot unmarshal %q: %s", s, err))
+			rows.Unmarshal(s)
+			if len(rows.Rows) != 4 {
+				panic(fmt.Errorf("unexpected number of parsed rows; got %d; want 4", len(rows.Rows)))
 			}
 		}
 	})
diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go
index 6ce8810307..911ab530a3 100644
--- a/app/vminsert/opentsdb/request_handler.go
+++ b/app/vminsert/opentsdb/request_handler.go
@@ -85,11 +85,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
 			return false
 		}
 	}
-	if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
-		opentsdbUnmarshalErrors.Inc()
-		ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf), err)
-		return false
-	}
+	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
 
 	// Fill in missing timestamps
 	currentTimestamp := time.Now().Unix()
@@ -135,9 +131,8 @@ func (ctx *pushCtx) reset() {
 }
 
 var (
-	opentsdbReadCalls       = metrics.NewCounter(`vm_read_calls_total{name="opentsdb"}`)
-	opentsdbReadErrors      = metrics.NewCounter(`vm_read_errors_total{name="opentsdb"}`)
-	opentsdbUnmarshalErrors = metrics.NewCounter(`vm_unmarshal_errors_total{name="opentsdb"}`)
+	opentsdbReadCalls  = metrics.NewCounter(`vm_read_calls_total{name="opentsdb"}`)
+	opentsdbReadErrors = metrics.NewCounter(`vm_read_errors_total{name="opentsdb"}`)
 )
 
 func getPushCtx() *pushCtx {