mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
daa7183749
### Describe Your Changes

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
434 lines
9.9 KiB
Go
434 lines
9.9 KiB
Go
package influx
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/valyala/fastjson/fastfloat"
|
|
)
|
|
|
|
// Rows contains parsed influx rows.
|
|
type Rows struct {
|
|
Rows []Row
|
|
IgnoreErrs bool
|
|
|
|
tagsPool []Tag
|
|
fieldsPool []Field
|
|
}
|
|
|
|
// Reset resets rs.
|
|
func (rs *Rows) Reset() {
|
|
// Reset rows, tags and fields in order to remove references to old data,
|
|
// so GC could collect it.
|
|
|
|
for i := range rs.Rows {
|
|
rs.Rows[i].reset()
|
|
}
|
|
rs.Rows = rs.Rows[:0]
|
|
|
|
for i := range rs.tagsPool {
|
|
rs.tagsPool[i].reset()
|
|
}
|
|
rs.tagsPool = rs.tagsPool[:0]
|
|
|
|
for i := range rs.fieldsPool {
|
|
rs.fieldsPool[i].reset()
|
|
}
|
|
rs.fieldsPool = rs.fieldsPool[:0]
|
|
}
|
|
|
|
// Unmarshal unmarshals influx line protocol rows from s.
|
|
//
|
|
// See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
|
|
//
|
|
// s shouldn't be modified when rs is in use.
|
|
func (rs *Rows) Unmarshal(s string) error {
|
|
rs.reset()
|
|
return rs.unmarshal(s)
|
|
}
|
|
|
|
func (rs *Rows) reset() {
|
|
rs.Rows = rs.Rows[:0]
|
|
rs.tagsPool = rs.tagsPool[:0]
|
|
rs.fieldsPool = rs.fieldsPool[:0]
|
|
}
|
|
|
|
// Row is a single influx row.
|
|
type Row struct {
|
|
Measurement string
|
|
Tags []Tag
|
|
Fields []Field
|
|
Timestamp int64
|
|
}
|
|
|
|
func (r *Row) reset() {
|
|
r.Measurement = ""
|
|
r.Tags = nil
|
|
r.Fields = nil
|
|
r.Timestamp = 0
|
|
}
|
|
|
|
func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Tag, []Field, error) {
|
|
r.reset()
|
|
n := nextUnescapedChar(s, ' ', noEscapeChars)
|
|
if n < 0 {
|
|
return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s)
|
|
}
|
|
measurementTags := s[:n]
|
|
s = stripLeadingWhitespace(s[n+1:])
|
|
|
|
// Parse measurement and tags
|
|
var err error
|
|
n = nextUnescapedChar(measurementTags, ',', noEscapeChars)
|
|
if n >= 0 {
|
|
tagsStart := len(tagsPool)
|
|
tagsPool, err = unmarshalTags(tagsPool, measurementTags[n+1:], noEscapeChars)
|
|
if err != nil {
|
|
return tagsPool, fieldsPool, err
|
|
}
|
|
tags := tagsPool[tagsStart:]
|
|
r.Tags = tags[:len(tags):len(tags)]
|
|
measurementTags = measurementTags[:n]
|
|
}
|
|
r.Measurement = unescapeTagValue(measurementTags, noEscapeChars)
|
|
// Allow empty r.Measurement. In this case metric name is constructed directly from field keys.
|
|
|
|
// Parse fields
|
|
fieldsStart := len(fieldsPool)
|
|
hasQuotedFields := nextUnescapedChar(s, '"', noEscapeChars) >= 0
|
|
n = nextUnquotedChar(s, ' ', noEscapeChars, hasQuotedFields)
|
|
if n < 0 {
|
|
// No timestamp.
|
|
fieldsPool, err = unmarshalInfluxFields(fieldsPool, s, noEscapeChars, hasQuotedFields)
|
|
if err != nil {
|
|
return tagsPool, fieldsPool, err
|
|
}
|
|
fields := fieldsPool[fieldsStart:]
|
|
r.Fields = fields[:len(fields):len(fields)]
|
|
return tagsPool, fieldsPool, nil
|
|
}
|
|
fieldsPool, err = unmarshalInfluxFields(fieldsPool, s[:n], noEscapeChars, hasQuotedFields)
|
|
if err != nil {
|
|
if strings.HasPrefix(s[n+1:], "HTTP/") {
|
|
return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
|
|
"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
|
|
}
|
|
return tagsPool, fieldsPool, err
|
|
}
|
|
r.Fields = fieldsPool[fieldsStart:]
|
|
s = stripLeadingWhitespace(s[n+1:])
|
|
|
|
// Parse timestamp
|
|
timestamp, err := fastfloat.ParseInt64(s)
|
|
if err != nil {
|
|
if strings.HasPrefix(s, "HTTP/") {
|
|
return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
|
|
"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
|
|
}
|
|
return tagsPool, fieldsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
|
}
|
|
r.Timestamp = timestamp
|
|
return tagsPool, fieldsPool, nil
|
|
}
|
|
|
|
// Tag represents influx tag.
type Tag struct {
	// Key is the unescaped tag name.
	Key string

	// Value is the unescaped tag value.
	Value string
}
|
|
|
|
func (tag *Tag) reset() {
|
|
tag.Key = ""
|
|
tag.Value = ""
|
|
}
|
|
|
|
func (tag *Tag) unmarshal(s string, noEscapeChars bool) error {
|
|
tag.reset()
|
|
n := nextUnescapedChar(s, '=', noEscapeChars)
|
|
if n < 0 {
|
|
return fmt.Errorf("missing tag value for %q", s)
|
|
}
|
|
tag.Key = unescapeTagValue(s[:n], noEscapeChars)
|
|
tag.Value = unescapeTagValue(s[n+1:], noEscapeChars)
|
|
return nil
|
|
}
|
|
|
|
// Field represents influx field.
type Field struct {
	// Key is the unescaped field name.
	Key string

	// Value is the field value converted to float64.
	Value float64
}
|
|
|
|
func (f *Field) reset() {
|
|
f.Key = ""
|
|
f.Value = 0
|
|
}
|
|
|
|
func (f *Field) unmarshal(s string, noEscapeChars, hasQuotedFields bool) error {
|
|
f.reset()
|
|
n := nextUnescapedChar(s, '=', noEscapeChars)
|
|
if n < 0 {
|
|
return fmt.Errorf("missing field value for %q", s)
|
|
}
|
|
f.Key = unescapeTagValue(s[:n], noEscapeChars)
|
|
if len(f.Key) == 0 {
|
|
return fmt.Errorf("field key cannot be empty")
|
|
}
|
|
v, err := parseFieldValue(s[n+1:], hasQuotedFields)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse field value for %q: %w", f.Key, err)
|
|
}
|
|
f.Value = v
|
|
return nil
|
|
}
|
|
|
|
func (rs *Rows) unmarshal(s string) error {
|
|
noEscapeChars := strings.IndexByte(s, '\\') < 0
|
|
for len(s) > 0 {
|
|
n := strings.IndexByte(s, '\n')
|
|
if n < 0 {
|
|
// The last line.
|
|
n = len(s)
|
|
}
|
|
err := rs.unmarshalRow(s[:n], noEscapeChars)
|
|
if err != nil {
|
|
if !rs.IgnoreErrs {
|
|
return fmt.Errorf("incorrect influx line %q: %w", s, err)
|
|
}
|
|
logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
|
|
invalidLines.Inc()
|
|
}
|
|
if len(s) == n {
|
|
return nil
|
|
}
|
|
s = s[n+1:]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (rs *Rows) unmarshalRow(s string, noEscapeChars bool) error {
|
|
if len(s) > 0 && s[len(s)-1] == '\r' {
|
|
s = s[:len(s)-1]
|
|
}
|
|
if len(s) == 0 {
|
|
// Skip empty line
|
|
return nil
|
|
}
|
|
if s[0] == '#' {
|
|
// Skip comment
|
|
return nil
|
|
}
|
|
|
|
if cap(rs.Rows) > len(rs.Rows) {
|
|
rs.Rows = rs.Rows[:len(rs.Rows)+1]
|
|
} else {
|
|
rs.Rows = append(rs.Rows, Row{})
|
|
}
|
|
r := &rs.Rows[len(rs.Rows)-1]
|
|
var err error
|
|
rs.tagsPool, rs.fieldsPool, err = r.unmarshal(s, rs.tagsPool, rs.fieldsPool, noEscapeChars)
|
|
if err != nil {
|
|
rs.Rows = rs.Rows[:len(rs.Rows)-1]
|
|
}
|
|
return err
|
|
}
|
|
|
|
// invalidLines counts influx lines skipped by Rows.unmarshal,
// because they failed to parse while IgnoreErrs was set.
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="influx"}`)
|
|
|
|
func unmarshalTags(dst []Tag, s string, noEscapeChars bool) ([]Tag, error) {
|
|
for {
|
|
if cap(dst) > len(dst) {
|
|
dst = dst[:len(dst)+1]
|
|
} else {
|
|
dst = append(dst, Tag{})
|
|
}
|
|
tag := &dst[len(dst)-1]
|
|
n := nextUnescapedChar(s, ',', noEscapeChars)
|
|
if n < 0 {
|
|
if err := tag.unmarshal(s, noEscapeChars); err != nil {
|
|
return dst[:len(dst)-1], err
|
|
}
|
|
if len(tag.Key) == 0 || len(tag.Value) == 0 {
|
|
// Skip empty tag
|
|
dst = dst[:len(dst)-1]
|
|
}
|
|
return dst, nil
|
|
}
|
|
if err := tag.unmarshal(s[:n], noEscapeChars); err != nil {
|
|
return dst[:len(dst)-1], err
|
|
}
|
|
s = s[n+1:]
|
|
if len(tag.Key) == 0 || len(tag.Value) == 0 {
|
|
// Skip empty tag
|
|
dst = dst[:len(dst)-1]
|
|
}
|
|
}
|
|
}
|
|
|
|
func unmarshalInfluxFields(dst []Field, s string, noEscapeChars, hasQuotedFields bool) ([]Field, error) {
|
|
for {
|
|
if cap(dst) > len(dst) {
|
|
dst = dst[:len(dst)+1]
|
|
} else {
|
|
dst = append(dst, Field{})
|
|
}
|
|
f := &dst[len(dst)-1]
|
|
n := nextUnquotedChar(s, ',', noEscapeChars, hasQuotedFields)
|
|
if n < 0 {
|
|
if err := f.unmarshal(s, noEscapeChars, hasQuotedFields); err != nil {
|
|
return dst, err
|
|
}
|
|
return dst, nil
|
|
}
|
|
if err := f.unmarshal(s[:n], noEscapeChars, hasQuotedFields); err != nil {
|
|
return dst, err
|
|
}
|
|
s = s[n+1:]
|
|
}
|
|
}
|
|
|
|
// unescapeTagValue removes escaping backslashes from s.
//
// A backslash is removed only if it precedes ' ', ',', '=' or '\\'.
// Backslashes in front of other chars and a trailing backslash are kept as is.
//
// s is returned unchanged if noEscapeChars is set or if s has no backslashes.
func unescapeTagValue(s string, noEscapeChars bool) string {
	if noEscapeChars {
		// Fast path - no escape chars.
		return s
	}
	n := strings.IndexByte(s, '\\')
	if n < 0 {
		return s
	}

	// Slow path. Remove escape chars.
	dst := make([]byte, 0, len(s))
	for {
		dst = append(dst, s[:n]...)
		s = s[n+1:]
		if len(s) == 0 {
			// Trailing backslash escapes nothing, so keep it.
			return string(append(dst, '\\'))
		}
		ch := s[0]
		if ch != ' ' && ch != ',' && ch != '=' && ch != '\\' {
			// The backslash doesn't escape a special char, so keep it.
			dst = append(dst, '\\')
		}
		dst = append(dst, ch)
		s = s[1:]
		n = strings.IndexByte(s, '\\')
		if n < 0 {
			return string(append(dst, s...))
		}
	}
}
|
|
|
|
func parseFieldValue(s string, hasQuotedFields bool) (float64, error) {
|
|
if len(s) == 0 {
|
|
return 0, fmt.Errorf("field value cannot be empty")
|
|
}
|
|
if hasQuotedFields && s[0] == '"' {
|
|
if len(s) < 2 || s[len(s)-1] != '"' {
|
|
return 0, fmt.Errorf("missing closing quote for quoted field value %s", s)
|
|
}
|
|
// Try converting quoted string to number, since sometimes InfluxDB agents
|
|
// send numbers as strings.
|
|
s = s[1 : len(s)-1]
|
|
return fastfloat.ParseBestEffort(s), nil
|
|
}
|
|
ch := s[len(s)-1]
|
|
if ch == 'i' {
|
|
// Integer value
|
|
ss := s[:len(s)-1]
|
|
n, err := fastfloat.ParseInt64(ss)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return float64(n), nil
|
|
}
|
|
if ch == 'u' {
|
|
// Unsigned integer value
|
|
ss := s[:len(s)-1]
|
|
n, err := fastfloat.ParseUint64(ss)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return float64(n), nil
|
|
}
|
|
if s == "t" || s == "T" || s == "true" || s == "True" || s == "TRUE" {
|
|
return 1, nil
|
|
}
|
|
if s == "f" || s == "F" || s == "false" || s == "False" || s == "FALSE" {
|
|
return 0, nil
|
|
}
|
|
return fastfloat.ParseBestEffort(s), nil
|
|
}
|
|
|
|
func nextUnescapedChar(s string, ch byte, noEscapeChars bool) int {
|
|
if noEscapeChars {
|
|
// Fast path: just search for ch in s, since s has no escape chars.
|
|
return strings.IndexByte(s, ch)
|
|
}
|
|
|
|
sOrig := s
|
|
again:
|
|
n := strings.IndexByte(s, ch)
|
|
if n < 0 {
|
|
return -1
|
|
}
|
|
if n == 0 {
|
|
return len(sOrig) - len(s) + n
|
|
}
|
|
if s[n-1] != '\\' {
|
|
return len(sOrig) - len(s) + n
|
|
}
|
|
nOrig := n
|
|
slashes := 0
|
|
for n > 0 && s[n-1] == '\\' {
|
|
slashes++
|
|
n--
|
|
}
|
|
if slashes&1 == 0 {
|
|
return len(sOrig) - len(s) + nOrig
|
|
}
|
|
s = s[nOrig+1:]
|
|
goto again
|
|
}
|
|
|
|
func nextUnquotedChar(s string, ch byte, noEscapeChars, hasQuotedFields bool) int {
|
|
if !hasQuotedFields {
|
|
return nextUnescapedChar(s, ch, noEscapeChars)
|
|
}
|
|
sOrig := s
|
|
for {
|
|
n := nextUnescapedChar(s, ch, noEscapeChars)
|
|
if n < 0 {
|
|
return -1
|
|
}
|
|
if !isInQuote(s[:n], noEscapeChars) {
|
|
return n + len(sOrig) - len(s)
|
|
}
|
|
s = s[n+1:]
|
|
n = nextUnescapedChar(s, '"', noEscapeChars)
|
|
if n < 0 {
|
|
return -1
|
|
}
|
|
s = s[n+1:]
|
|
}
|
|
}
|
|
|
|
func isInQuote(s string, noEscapeChars bool) bool {
|
|
isQuote := false
|
|
for {
|
|
n := nextUnescapedChar(s, '"', noEscapeChars)
|
|
if n < 0 {
|
|
return isQuote
|
|
}
|
|
isQuote = !isQuote
|
|
s = s[n+1:]
|
|
}
|
|
}
|
|
|
|
// stripLeadingWhitespace returns s without leading spaces.
//
// Only ' ' chars are removed; tabs and other whitespace chars are left as is.
func stripLeadingWhitespace(s string) string {
	return strings.TrimLeft(s, " ")
}
|