lib/protoparser: report more errors for incorrect timestamps and/or values

Previously certain errors in timestamps and/or values could be silently skipped,
which could lead to samples with zero values stored in the database.

Updates https://github.com/VictoriaMetrics/vmctl/issues/25
This commit is contained in:
Aliaksandr Valialkin 2020-09-16 02:03:35 +03:00
parent 9bc8484ab6
commit d8183c3124
14 changed files with 100 additions and 40 deletions

2
go.mod
View file

@ -13,7 +13,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.1 github.com/cespare/xxhash/v2 v2.1.1
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
github.com/klauspost/compress v1.11.0 github.com/klauspost/compress v1.11.0
github.com/valyala/fastjson v1.6.0 github.com/valyala/fastjson v1.6.1
github.com/valyala/fastrand v1.0.0 github.com/valyala/fastrand v1.0.0
github.com/valyala/fasttemplate v1.2.1 github.com/valyala/fasttemplate v1.2.1
github.com/valyala/gozstd v1.8.3 github.com/valyala/gozstd v1.8.3

4
go.sum
View file

@ -173,8 +173,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.15.1/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA= github.com/valyala/fasthttp v1.15.1/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA=
github.com/valyala/fastjson v1.6.0 h1:aJV8Tvmeq1mCXxDOVV8raxBoyA3eE8xwTgW8SGQ5yKM= github.com/valyala/fastjson v1.6.1 h1:qJs/Kz/HebWzk8LmhOrSm7kdOyJBr1XB+zSkYtEEfQE=
github.com/valyala/fastjson v1.6.0/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/valyala/fastjson v1.6.1/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI= github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI=
github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4= github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4=

View file

@ -136,7 +136,10 @@ func parseTimeFormat(format string) (func(s string) (int64, error), error) {
} }
func parseUnixTimestampSeconds(s string) (int64, error) { func parseUnixTimestampSeconds(s string) (int64, error) {
n := fastfloat.ParseInt64BestEffort(s) n, err := fastfloat.ParseInt64(s)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp seconds from %q: %w", s, err)
}
if n > int64(1<<63-1)/1e3 { if n > int64(1<<63-1)/1e3 {
return 0, fmt.Errorf("too big unix timestamp in seconds: %d; must be smaller than %d", n, int64(1<<63-1)/1e3) return 0, fmt.Errorf("too big unix timestamp in seconds: %d; must be smaller than %d", n, int64(1<<63-1)/1e3)
} }
@ -144,12 +147,18 @@ func parseUnixTimestampSeconds(s string) (int64, error) {
} }
func parseUnixTimestampMilliseconds(s string) (int64, error) { func parseUnixTimestampMilliseconds(s string) (int64, error) {
n := fastfloat.ParseInt64BestEffort(s) n, err := fastfloat.ParseInt64(s)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp milliseconds from %q: %w", s, err)
}
return n, nil return n, nil
} }
func parseUnixTimestampNanoseconds(s string) (int64, error) { func parseUnixTimestampNanoseconds(s string) (int64, error) {
n := fastfloat.ParseInt64BestEffort(s) n, err := fastfloat.ParseInt64(s)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp nanoseconds from %q: %w", s, err)
}
return n / 1e6, nil return n / 1e6, nil
} }

View file

@ -109,7 +109,10 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum
// The given field is ignored. // The given field is ignored.
continue continue
} }
value := fastfloat.ParseBestEffort(sc.Column) value, err := fastfloat.Parse(sc.Column)
if err != nil {
sc.Error = fmt.Errorf("cannot parse metric value for %q from %q: %w", metricName, sc.Column, err)
}
metrics = append(metrics, metric{ metrics = append(metrics, metric{
Name: metricName, Name: metricName,
Value: value, Value: value,

View file

@ -20,9 +20,19 @@ func TestRowsUnmarshalFailure(t *testing.T) {
} }
// Invalid timestamp // Invalid timestamp
f("1:metric:foo,2:time:rfc3339", "234,foobar") f("1:metric:foo,2:time:rfc3339", "234,foobar")
f("1:metric:foo,2:time:unix_s", "234,foobar")
f("1:metric:foo,2:time:unix_ms", "234,foobar")
f("1:metric:foo,2:time:unix_ns", "234,foobar")
f("1:metric:foo,2:time:custom:foobar", "234,234")
// Too big timestamp in seconds.
f("1:metric:foo,2:time:unix_s", "1,12345678901234567")
// Missing columns // Missing columns
f("3:metric:aaa", "123,456") f("3:metric:aaa", "123,456")
// Invalid value
f("1:metric:foo", "12foobar")
} }
func TestRowsUnmarshalSuccess(t *testing.T) { func TestRowsUnmarshalSuccess(t *testing.T) {

View file

@ -103,19 +103,23 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeCh
} }
fieldsPool, err = unmarshalInfluxFields(fieldsPool, s[:n], noEscapeChars, hasQuotedFields) fieldsPool, err = unmarshalInfluxFields(fieldsPool, s[:n], noEscapeChars, hasQuotedFields)
if err != nil { if err != nil {
if strings.HasPrefix(s[n+1:], "HTTP/") {
return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
}
return tagsPool, fieldsPool, err return tagsPool, fieldsPool, err
} }
r.Fields = fieldsPool[fieldsStart:] r.Fields = fieldsPool[fieldsStart:]
s = s[n+1:] s = s[n+1:]
// Parse timestamp // Parse timestamp
timestamp := fastfloat.ParseInt64BestEffort(s) timestamp, err := fastfloat.ParseInt64(s)
if timestamp == 0 && s != "0" { if err != nil {
if strings.HasPrefix(s, "HTTP/") { if strings.HasPrefix(s, "HTTP/") {
return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " + return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only") "do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
} }
return tagsPool, fieldsPool, fmt.Errorf("cannot parse timestamp %q", s) return tagsPool, fieldsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
} }
r.Timestamp = timestamp r.Timestamp = timestamp
return tagsPool, fieldsPool, nil return tagsPool, fieldsPool, nil
@ -317,13 +321,19 @@ func parseFieldValue(s string, hasQuotedFields bool) (float64, error) {
if ch == 'i' { if ch == 'i' {
// Integer value // Integer value
ss := s[:len(s)-1] ss := s[:len(s)-1]
n := fastfloat.ParseInt64BestEffort(ss) n, err := fastfloat.ParseInt64(ss)
if err != nil {
return 0, err
}
return float64(n), nil return float64(n), nil
} }
if ch == 'u' { if ch == 'u' {
// Unsigned integer value // Unsigned integer value
ss := s[:len(s)-1] ss := s[:len(s)-1]
n := fastfloat.ParseUint64BestEffort(ss) n, err := fastfloat.ParseUint64(ss)
if err != nil {
return 0, err
}
return float64(n), nil return float64(n), nil
} }
if s == "t" || s == "T" || s == "true" || s == "True" || s == "TRUE" { if s == "t" || s == "T" || s == "true" || s == "True" || s == "TRUE" {

View file

@ -111,6 +111,15 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// Invalid timestamp // Invalid timestamp
f("foo bar=123 baz") f("foo bar=123 baz")
// Invalid field value
f("foo bar=1abci")
f("foo bar=-2abci")
f("foo bar=3abcu")
// HTTP request line
f("GET /foo HTTP/1.1")
f("GET /foo?bar=baz HTTP/1.0")
} }
func TestRowsUnmarshalSuccess(t *testing.T) { func TestRowsUnmarshalSuccess(t *testing.T) {

View file

@ -74,14 +74,21 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
if n < 0 { if n < 0 {
return tagsPool, fmt.Errorf("cannot find whitespace between timestamp and value in %q", s) return tagsPool, fmt.Errorf("cannot find whitespace between timestamp and value in %q", s)
} }
r.Timestamp = int64(fastfloat.ParseBestEffort(tail[:n])) timestamp, err := fastfloat.Parse(tail[:n])
if err != nil {
return tagsPool, fmt.Errorf("cannot parse timestamp from %q: %w", tail[:n], err)
}
r.Timestamp = int64(timestamp)
tail = tail[n+1:] tail = tail[n+1:]
n = strings.IndexByte(tail, ' ') n = strings.IndexByte(tail, ' ')
if n < 0 { if n < 0 {
return tagsPool, fmt.Errorf("cannot find whitespace between value and the first tag in %q", s) return tagsPool, fmt.Errorf("cannot find whitespace between value and the first tag in %q", s)
} }
r.Value = fastfloat.ParseBestEffort(tail[:n]) v, err := fastfloat.Parse(tail[:n])
var err error if err != nil {
return tagsPool, fmt.Errorf("cannot parse value from %q: %w", tail[:n], err)
}
r.Value = v
tagsStart := len(tagsPool) tagsStart := len(tagsPool)
tagsPool, err = unmarshalTags(tagsPool, tail[n+1:]) tagsPool, err = unmarshalTags(tagsPool, tail[n+1:])
if err != nil { if err != nil {

View file

@ -35,12 +35,14 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// Invalid timestamp // Invalid timestamp
f("put aaa timestamp") f("put aaa timestamp")
f("put foobar 3df4 -123456 a=b")
// Missing first tag // Missing first tag
f("put aaa 123 43") f("put aaa 123 43")
// Invalid value // Invalid value
f("put aaa 123 invalid-value") f("put aaa 123 invalid-value")
f("put foobar 789 -123foo456 a=b")
// Invalid multiline // Invalid multiline
f("put aaa\nbbb 123 34") f("put aaa\nbbb 123 34")

View file

@ -112,9 +112,9 @@ func getFloat64(v *fastjson.Value) (float64, error) {
return v.Float64() return v.Float64()
case fastjson.TypeString: case fastjson.TypeString:
vStr, _ := v.StringBytes() vStr, _ := v.StringBytes()
vFloat := fastfloat.ParseBestEffort(bytesutil.ToUnsafeString(vStr)) vFloat, err := fastfloat.Parse(bytesutil.ToUnsafeString(vStr))
if vFloat == 0 && string(vStr) != "0" && string(vStr) != "0.0" { if err != nil {
return 0, fmt.Errorf("invalid float64 value: %q", vStr) return 0, fmt.Errorf("cannot parse value %q: %w", vStr, err)
} }
return vFloat, nil return vFloat, nil
default: default:

View file

@ -139,13 +139,25 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error)
n = nextWhitespace(s) n = nextWhitespace(s)
if n < 0 { if n < 0 {
// There is no timestamp. // There is no timestamp.
r.Value = fastfloat.ParseBestEffort(s) v, err := fastfloat.Parse(s)
if err != nil {
return tagsPool, fmt.Errorf("cannot parse value %q: %w", s, err)
}
r.Value = v
return tagsPool, nil return tagsPool, nil
} }
// There is timestamp. // There is timestamp.
r.Value = fastfloat.ParseBestEffort(s[:n]) v, err := fastfloat.Parse(s[:n])
if err != nil {
return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
}
s = skipLeadingWhitespace(s[n+1:]) s = skipLeadingWhitespace(s[n+1:])
r.Timestamp = fastfloat.ParseInt64BestEffort(s) ts, err := fastfloat.ParseInt64(s)
if err != nil {
return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
}
r.Value = v
r.Timestamp = ts
return tagsPool, nil return tagsPool, nil
} }

View file

@ -129,6 +129,13 @@ func TestRowsUnmarshalFailure(t *testing.T) {
f(" aaa ") f(" aaa ")
f(" aaa \n") f(" aaa \n")
f(` aa{foo="bar"} ` + "\n") f(` aa{foo="bar"} ` + "\n")
// Invalid value
f("foo bar")
f("foo bar 124")
// Invalid timestamp
f("foo 123 bar")
} }
func TestRowsUnmarshalSuccess(t *testing.T) { func TestRowsUnmarshalSuccess(t *testing.T) {

View file

@ -897,8 +897,7 @@ func (v *Value) Float64() (float64, error) {
if v.Type() != TypeNumber { if v.Type() != TypeNumber {
return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type()) return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
} }
f := fastfloat.ParseBestEffort(v.s) return fastfloat.Parse(v.s)
return f, nil
} }
// Int returns the underlying JSON int for the v. // Int returns the underlying JSON int for the v.
@ -908,9 +907,9 @@ func (v *Value) Int() (int, error) {
if v.Type() != TypeNumber { if v.Type() != TypeNumber {
return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type()) return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
} }
n := fastfloat.ParseInt64BestEffort(v.s) n, err := fastfloat.ParseInt64(v.s)
if n == 0 && v.s != "0" { if err != nil {
return 0, fmt.Errorf("cannot parse int %q", v.s) return 0, err
} }
nn := int(n) nn := int(n)
if int64(nn) != n { if int64(nn) != n {
@ -926,9 +925,9 @@ func (v *Value) Uint() (uint, error) {
if v.Type() != TypeNumber { if v.Type() != TypeNumber {
return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type()) return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
} }
n := fastfloat.ParseUint64BestEffort(v.s) n, err := fastfloat.ParseUint64(v.s)
if n == 0 && v.s != "0" { if err != nil {
return 0, fmt.Errorf("cannot parse uint %q", v.s) return 0, err
} }
nn := uint(n) nn := uint(n)
if uint64(nn) != n { if uint64(nn) != n {
@ -944,11 +943,7 @@ func (v *Value) Int64() (int64, error) {
if v.Type() != TypeNumber { if v.Type() != TypeNumber {
return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type()) return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
} }
n := fastfloat.ParseInt64BestEffort(v.s) return fastfloat.ParseInt64(v.s)
if n == 0 && v.s != "0" {
return 0, fmt.Errorf("cannot parse int64 %q", v.s)
}
return n, nil
} }
// Uint64 returns the underlying JSON uint64 for the v. // Uint64 returns the underlying JSON uint64 for the v.
@ -958,11 +953,7 @@ func (v *Value) Uint64() (uint64, error) {
if v.Type() != TypeNumber { if v.Type() != TypeNumber {
return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type()) return 0, fmt.Errorf("value doesn't contain number; it contains %s", v.Type())
} }
n := fastfloat.ParseUint64BestEffort(v.s) return fastfloat.ParseUint64(v.s)
if n == 0 && v.s != "0" {
return 0, fmt.Errorf("cannot parse uint64 %q", v.s)
}
return n, nil
} }
// Bool returns the underlying JSON bool for the v. // Bool returns the underlying JSON bool for the v.

2
vendor/modules.txt vendored
View file

@ -98,7 +98,7 @@ github.com/klauspost/compress/zstd
github.com/klauspost/compress/zstd/internal/xxhash github.com/klauspost/compress/zstd/internal/xxhash
# github.com/valyala/bytebufferpool v1.0.0 # github.com/valyala/bytebufferpool v1.0.0
github.com/valyala/bytebufferpool github.com/valyala/bytebufferpool
# github.com/valyala/fastjson v1.6.0 # github.com/valyala/fastjson v1.6.1
github.com/valyala/fastjson github.com/valyala/fastjson
github.com/valyala/fastjson/fastfloat github.com/valyala/fastjson/fastfloat
# github.com/valyala/fastrand v1.0.0 # github.com/valyala/fastrand v1.0.0