From cd87ca303f4e46c226c8b7524303876c6a24fbe8 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Wed, 16 Sep 2020 02:03:35 +0300
Subject: [PATCH] lib/protoparser: report more errors for incorrect timestamps
 and/or values

Previously, certain parse errors in timestamps and/or values could be silently skipped,
which could lead to samples with zero values being stored in the database.

Updates https://github.com/VictoriaMetrics/vmctl/issues/25
---
 go.mod                                        |  2 +-
 go.sum                                        |  4 +--
 .../csvimport/column_descriptor.go            | 15 ++++++++---
 lib/protoparser/csvimport/parser.go           |  5 +++-
 lib/protoparser/csvimport/parser_test.go      | 10 +++++++
 lib/protoparser/influx/parser.go              | 20 ++++++++++----
 lib/protoparser/influx/parser_test.go         |  9 +++++++
 lib/protoparser/opentsdb/parser.go            | 13 ++++++---
 lib/protoparser/opentsdb/parser_test.go       |  2 ++
 lib/protoparser/opentsdbhttp/parser.go        |  6 ++---
 lib/protoparser/prometheus/parser.go          | 18 ++++++++++---
 lib/protoparser/prometheus/parser_test.go     |  7 +++++
 vendor/github.com/valyala/fastjson/parser.go  | 27 +++++++------------
 vendor/modules.txt                            |  2 +-
 14 files changed, 100 insertions(+), 40 deletions(-)

diff --git a/go.mod b/go.mod
index 496c67717d..90f98ef98b 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
 	github.com/golang/snappy v0.0.1
 	github.com/klauspost/compress v1.11.0
 	github.com/lithammer/go-jump-consistent-hash v1.0.1
-	github.com/valyala/fastjson v1.6.0
+	github.com/valyala/fastjson v1.6.1
 	github.com/valyala/fastrand v1.0.0
 	github.com/valyala/fasttemplate v1.2.1
 	github.com/valyala/gozstd v1.8.3
diff --git a/go.sum b/go.sum
index 396a41293b..1bc7f65a69 100644
--- a/go.sum
+++ b/go.sum
@@ -175,8 +175,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
 github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
 github.com/valyala/fasthttp v1.15.1/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA=
-github.com/valyala/fastjson v1.6.0 h1:aJV8Tvmeq1mCXxDOVV8raxBoyA3eE8xwTgW8SGQ5yKM=
-github.com/valyala/fastjson v1.6.0/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
+github.com/valyala/fastjson v1.6.1 h1:qJs/Kz/HebWzk8LmhOrSm7kdOyJBr1XB+zSkYtEEfQE=
+github.com/valyala/fastjson v1.6.1/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
 github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI=
 github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
 github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4=
diff --git a/lib/protoparser/csvimport/column_descriptor.go b/lib/protoparser/csvimport/column_descriptor.go
index ab2c84e621..3260e6ad01 100644
--- a/lib/protoparser/csvimport/column_descriptor.go
+++ b/lib/protoparser/csvimport/column_descriptor.go
@@ -136,7 +136,10 @@ func parseTimeFormat(format string) (func(s string) (int64, error), error) {
 }
 
 func parseUnixTimestampSeconds(s string) (int64, error) {
-	n := fastfloat.ParseInt64BestEffort(s)
+	n, err := fastfloat.ParseInt64(s)
+	if err != nil {
+		return 0, fmt.Errorf("cannot parse timestamp seconds from %q: %w", s, err)
+	}
 	if n > int64(1<<63-1)/1e3 {
 		return 0, fmt.Errorf("too big unix timestamp in seconds: %d; must be smaller than %d", n, int64(1<<63-1)/1e3)
 	}
@@ -144,12 +147,18 @@ func parseUnixTimestampSeconds(s string) (int64, error) {
 }
 
 func parseUnixTimestampMilliseconds(s string) (int64, error) {
-	n := fastfloat.ParseInt64BestEffort(s)
+	n, err := fastfloat.ParseInt64(s)
+	if err != nil {
+		return 0, fmt.Errorf("cannot parse timestamp milliseconds from %q: %w", s, err)
+	}
 	return n, nil
 }
 
 func parseUnixTimestampNanoseconds(s string) (int64, error) {
-	n := fastfloat.ParseInt64BestEffort(s)
+	n, err := fastfloat.ParseInt64(s)
+	if err != nil {
+		return 0, fmt.Errorf("cannot parse timestamp nanoseconds from %q: %w", s, err)
+	}
 	return n / 1e6, nil
 }
 
diff --git a/lib/protoparser/csvimport/parser.go b/lib/protoparser/csvimport/parser.go
index 5e74216e37..3e5126c1e6 100644
--- a/lib/protoparser/csvimport/parser.go
+++ b/lib/protoparser/csvimport/parser.go
@@ -109,7 +109,10 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum
 				// The given field is ignored.
 				continue
 			}
-			value := fastfloat.ParseBestEffort(sc.Column)
+			value, err := fastfloat.Parse(sc.Column)
+			if err != nil {
+				sc.Error = fmt.Errorf("cannot parse metric value for %q from %q: %w", metricName, sc.Column, err)
+				break
+			}
 			metrics = append(metrics, metric{
 				Name:  metricName,
 				Value: value,
diff --git a/lib/protoparser/csvimport/parser_test.go b/lib/protoparser/csvimport/parser_test.go
index 81dfa8918c..086588bbdd 100644
--- a/lib/protoparser/csvimport/parser_test.go
+++ b/lib/protoparser/csvimport/parser_test.go
@@ -20,9 +20,19 @@ func TestRowsUnmarshalFailure(t *testing.T) {
 	}
 	// Invalid timestamp
 	f("1:metric:foo,2:time:rfc3339", "234,foobar")
+	f("1:metric:foo,2:time:unix_s", "234,foobar")
+	f("1:metric:foo,2:time:unix_ms", "234,foobar")
+	f("1:metric:foo,2:time:unix_ns", "234,foobar")
+	f("1:metric:foo,2:time:custom:foobar", "234,234")
+
+	// Too big timestamp in seconds.
+	f("1:metric:foo,2:time:unix_s", "1,12345678901234567")
 
 	// Missing columns
 	f("3:metric:aaa", "123,456")
+
+	// Invalid value
+	f("1:metric:foo", "12foobar")
 }
 
 func TestRowsUnmarshalSuccess(t *testing.T) {
diff --git a/lib/protoparser/influx/parser.go b/lib/protoparser/influx/parser.go
index 76cf6bc3ed..e08d42ee2a 100644
--- a/lib/protoparser/influx/parser.go
+++ b/lib/protoparser/influx/parser.go
@@ -103,19 +103,23 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeCh
 	}
 	fieldsPool, err = unmarshalInfluxFields(fieldsPool, s[:n], noEscapeChars, hasQuotedFields)
 	if err != nil {
+		if strings.HasPrefix(s[n+1:], "HTTP/") {
+			return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
+				"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
+		}
 		return tagsPool, fieldsPool, err
 	}
 	r.Fields = fieldsPool[fieldsStart:]
 	s = s[n+1:]
 
 	// Parse timestamp
-	timestamp := fastfloat.ParseInt64BestEffort(s)
-	if timestamp == 0 && s != "0" {
+	timestamp, err := fastfloat.ParseInt64(s)
+	if err != nil {
 		if strings.HasPrefix(s, "HTTP/") {
 			return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
 				"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
 		}
-		return tagsPool, fieldsPool, fmt.Errorf("cannot parse timestamp %q", s)
+		return tagsPool, fieldsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
 	}
 	r.Timestamp = timestamp
 	return tagsPool, fieldsPool, nil
@@ -317,13 +321,19 @@ func parseFieldValue(s string, hasQuotedFields bool) (float64, error) {
 	if ch == 'i' {
 		// Integer value
 		ss := s[:len(s)-1]
-		n := fastfloat.ParseInt64BestEffort(ss)
+		n, err := fastfloat.ParseInt64(ss)
+		if err != nil {
+			return 0, err
+		}
 		return float64(n), nil
 	}
 	if ch == 'u' {
 		// Unsigned integer value
 		ss := s[:len(s)-1]
-		n := fastfloat.ParseUint64BestEffort(ss)
+		n, err := fastfloat.ParseUint64(ss)
+		if err != nil {
+			return 0, err
+		}
 		return float64(n), nil
 	}
 	if s == "t" || s == "T" || s == "true" || s == "True" || s == "TRUE" {
diff --git a/lib/protoparser/influx/parser_test.go b/lib/protoparser/influx/parser_test.go
index cd0be97bfc..6d75d04c05 100644
--- a/lib/protoparser/influx/parser_test.go
+++ b/lib/protoparser/influx/parser_test.go
@@ -111,6 +111,15 @@ func TestRowsUnmarshalFailure(t *testing.T) {
 
 	// Invalid timestamp
 	f("foo bar=123 baz")
+
+	// Invalid field value
+	f("foo bar=1abci")
+	f("foo bar=-2abci")
+	f("foo bar=3abcu")
+
+	// HTTP request line
+	f("GET /foo HTTP/1.1")
+	f("GET /foo?bar=baz HTTP/1.0")
 }
 
 func TestRowsUnmarshalSuccess(t *testing.T) {
diff --git a/lib/protoparser/opentsdb/parser.go b/lib/protoparser/opentsdb/parser.go
index ef6cae5593..a1fd9a624c 100644
--- a/lib/protoparser/opentsdb/parser.go
+++ b/lib/protoparser/opentsdb/parser.go
@@ -74,14 +74,21 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
 	if n < 0 {
 		return tagsPool, fmt.Errorf("cannot find whitespace between timestamp and value in %q", s)
 	}
-	r.Timestamp = int64(fastfloat.ParseBestEffort(tail[:n]))
+	timestamp, err := fastfloat.Parse(tail[:n])
+	if err != nil {
+		return tagsPool, fmt.Errorf("cannot parse timestamp from %q: %w", tail[:n], err)
+	}
+	r.Timestamp = int64(timestamp)
 	tail = tail[n+1:]
 	n = strings.IndexByte(tail, ' ')
 	if n < 0 {
 		return tagsPool, fmt.Errorf("cannot find whitespace between value and the first tag in %q", s)
 	}
-	r.Value = fastfloat.ParseBestEffort(tail[:n])
-	var err error
+	v, err := fastfloat.Parse(tail[:n])
+	if err != nil {
+		return tagsPool, fmt.Errorf("cannot parse value from %q: %w", tail[:n], err)
+	}
+	r.Value = v
 	tagsStart := len(tagsPool)
 	tagsPool, err = unmarshalTags(tagsPool, tail[n+1:])
 	if err != nil {
diff --git a/lib/protoparser/opentsdb/parser_test.go b/lib/protoparser/opentsdb/parser_test.go
index 63a2454248..bc7fc63a9e 100644
--- a/lib/protoparser/opentsdb/parser_test.go
+++ b/lib/protoparser/opentsdb/parser_test.go
@@ -35,12 +35,14 @@ func TestRowsUnmarshalFailure(t *testing.T) {
 
 	// Invalid timestamp
 	f("put aaa timestamp")
+	f("put foobar 3df4 -123456 a=b")
 
 	// Missing first tag
 	f("put aaa 123 43")
 
 	// Invalid value
 	f("put aaa 123 invalid-value")
+	f("put foobar 789 -123foo456 a=b")
 
 	// Invalid multiline
 	f("put aaa\nbbb 123 34")
diff --git a/lib/protoparser/opentsdbhttp/parser.go b/lib/protoparser/opentsdbhttp/parser.go
index 18edb621c7..32f1c9689b 100644
--- a/lib/protoparser/opentsdbhttp/parser.go
+++ b/lib/protoparser/opentsdbhttp/parser.go
@@ -112,9 +112,9 @@ func getFloat64(v *fastjson.Value) (float64, error) {
 		return v.Float64()
 	case fastjson.TypeString:
 		vStr, _ := v.StringBytes()
-		vFloat := fastfloat.ParseBestEffort(bytesutil.ToUnsafeString(vStr))
-		if vFloat == 0 && string(vStr) != "0" && string(vStr) != "0.0" {
-			return 0, fmt.Errorf("invalid float64 value: %q", vStr)
+		vFloat, err := fastfloat.Parse(bytesutil.ToUnsafeString(vStr))
+		if err != nil {
+			return 0, fmt.Errorf("cannot parse value %q: %w", vStr, err)
 		}
 		return vFloat, nil
 	default:
diff --git a/lib/protoparser/prometheus/parser.go b/lib/protoparser/prometheus/parser.go
index cafe48bc38..4972c7bd6e 100644
--- a/lib/protoparser/prometheus/parser.go
+++ b/lib/protoparser/prometheus/parser.go
@@ -139,13 +139,25 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error)
 	n = nextWhitespace(s)
 	if n < 0 {
 		// There is no timestamp.
-		r.Value = fastfloat.ParseBestEffort(s)
+		v, err := fastfloat.Parse(s)
+		if err != nil {
+			return tagsPool, fmt.Errorf("cannot parse value %q: %w", s, err)
+		}
+		r.Value = v
 		return tagsPool, nil
 	}
 	// There is timestamp.
-	r.Value = fastfloat.ParseBestEffort(s[:n])
+	v, err := fastfloat.Parse(s[:n])
+	if err != nil {
+		return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
+	}
 	s = skipLeadingWhitespace(s[n+1:])
-	r.Timestamp = fastfloat.ParseInt64BestEffort(s)
+	ts, err := fastfloat.ParseInt64(s)
+	if err != nil {
+		return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
+	}
+	r.Value = v
+	r.Timestamp = ts
 	return tagsPool, nil
 }
 
diff --git a/lib/protoparser/prometheus/parser_test.go b/lib/protoparser/prometheus/parser_test.go
index 6a2e0740f2..dafa3f447a 100644
--- a/lib/protoparser/prometheus/parser_test.go
+++ b/lib/protoparser/prometheus/parser_test.go
@@ -129,6 +129,13 @@ func TestRowsUnmarshalFailure(t *testing.T) {
 	f(" aaa ")
 	f(" aaa   \n")
 	f(` aa{foo="bar"}   ` + "\n")
+
+	// Invalid value
+	f("foo bar")
+	f("foo bar 124")
+
+	// Invalid timestamp
+	f("foo 123 bar")
 }
 
 func TestRowsUnmarshalSuccess(t *testing.T) {
diff --git a/vendor/github.com/valyala/fastjson/parser.go b/vendor/github.com/valyala/fastjson/parser.go
index 06a3acc4ad..885e1841ef 100644
--- a/vendor/github.com/valyala/fastjson/parser.go
+++ b/vendor/github.com/valyala/fastjson/parser.go
@@ -897,8 +897,7 @@ func (v *Value) Float64() (float64, error) {
 	if v.Type() != TypeNumber {
 		return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
 	}
-	f := fastfloat.ParseBestEffort(v.s)
-	return f, nil
+	return fastfloat.Parse(v.s)
 }
 
 // Int returns the underlying JSON int for the v.
@@ -908,9 +907,9 @@ func (v *Value) Int() (int, error) {
 	if v.Type() != TypeNumber {
 		return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
 	}
-	n := fastfloat.ParseInt64BestEffort(v.s)
-	if n == 0 && v.s != "0" {
-		return 0, fmt.Errorf("cannot parse int %q", v.s)
+	n, err := fastfloat.ParseInt64(v.s)
+	if err != nil {
+		return 0, err
 	}
 	nn := int(n)
 	if int64(nn) != n {
@@ -926,9 +925,9 @@ func (v *Value) Uint() (uint, error) {
 	if v.Type() != TypeNumber {
 		return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
 	}
-	n := fastfloat.ParseUint64BestEffort(v.s)
-	if n == 0 && v.s != "0" {
-		return 0, fmt.Errorf("cannot parse uint %q", v.s)
+	n, err := fastfloat.ParseUint64(v.s)
+	if err != nil {
+		return 0, err
 	}
 	nn := uint(n)
 	if uint64(nn) != n {
@@ -944,11 +943,7 @@ func (v *Value) Int64() (int64, error) {
 	if v.Type() != TypeNumber {
 		return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
 	}
-	n := fastfloat.ParseInt64BestEffort(v.s)
-	if n == 0 && v.s != "0" {
-		return 0, fmt.Errorf("cannot parse int64 %q", v.s)
-	}
-	return n, nil
+	return fastfloat.ParseInt64(v.s)
 }
 
 // Uint64 returns the underlying JSON uint64 for the v.
@@ -958,11 +953,7 @@ func (v *Value) Uint64() (uint64, error) {
 	if v.Type() != TypeNumber {
 		return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
 	}
-	n := fastfloat.ParseUint64BestEffort(v.s)
-	if n == 0 && v.s != "0" {
-		return 0, fmt.Errorf("cannot parse uint64 %q", v.s)
-	}
-	return n, nil
+	return fastfloat.ParseUint64(v.s)
 }
 
 // Bool returns the underlying JSON bool for the v.
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 78a07c5a54..5af852bc1f 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -100,7 +100,7 @@ github.com/klauspost/compress/zstd/internal/xxhash
 github.com/lithammer/go-jump-consistent-hash
 # github.com/valyala/bytebufferpool v1.0.0
 github.com/valyala/bytebufferpool
-# github.com/valyala/fastjson v1.6.0
+# github.com/valyala/fastjson v1.6.1
 github.com/valyala/fastjson
 github.com/valyala/fastjson/fastfloat
 # github.com/valyala/fastrand v1.0.0