VictoriaMetrics/lib/protoparser/influx/parser.go
Andrii Chubatiuk daa7183749
lib/protoparser/influx: enable batch processing by default (#7165)
### Describe Your Changes

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2024-10-15 11:48:40 +02:00

434 lines
9.9 KiB
Go

package influx
import (
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
)
// Rows contains parsed influx rows.
//
// Unmarshal truncates the internal slices instead of reallocating them,
// so a single Rows value can be reused across multiple Unmarshal calls.
type Rows struct {
// Rows holds the rows parsed by the last Unmarshal call.
Rows []Row
// IgnoreErrs makes the parser log, count and skip invalid lines
// instead of returning an error on the first invalid line.
IgnoreErrs bool
// tagsPool is the shared backing storage for the Tags of the parsed rows.
tagsPool []Tag
// fieldsPool is the shared backing storage for the Fields of the parsed rows.
fieldsPool []Field
}
// Reset clears rs, so it can be reused.
//
// Every stored row, tag and field is zeroed before the slices are truncated,
// which drops the references to previously parsed data and lets GC collect it.
func (rs *Rows) Reset() {
	rows := rs.Rows
	for i := range rows {
		rows[i].reset()
	}
	rs.Rows = rows[:0]

	tags := rs.tagsPool
	for i := range tags {
		tags[i].reset()
	}
	rs.tagsPool = tags[:0]

	fields := rs.fieldsPool
	for i := range fields {
		fields[i].reset()
	}
	rs.fieldsPool = fields[:0]
}
// Unmarshal parses influx line protocol rows from s into rs.
//
// Rows left in rs by a previous call are discarded before parsing.
//
// See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
//
// s shouldn't be modified when rs is in use.
func (rs *Rows) Unmarshal(s string) error {
	rs.reset()
	if err := rs.unmarshal(s); err != nil {
		return err
	}
	return nil
}
// reset truncates the rows and both pools to zero length while keeping
// their capacity. The truncated items aren't zeroed.
func (rs *Rows) reset() {
	rs.Rows, rs.tagsPool, rs.fieldsPool = rs.Rows[:0], rs.tagsPool[:0], rs.fieldsPool[:0]
}
// Row is a single influx row.
type Row struct {
// Measurement is the unescaped measurement name. It may be empty.
Measurement string
// Tags contains the parsed tags. Tags with empty key or empty value are dropped during parsing.
Tags []Tag
// Fields contains the parsed fields.
Fields []Field
// Timestamp is the timestamp parsed from the line. It is 0 when the line has no timestamp.
Timestamp int64
}
// reset sets every field of r to its zero value.
func (r *Row) reset() {
	*r = Row{}
}
// unmarshal parses a single influx line protocol row from s into r.
//
// The expected layout of s is `measurement[,tag=value...] field=value[,field=value...] [timestamp]`.
//
// Parsed tags and fields are appended to tagsPool and fieldsPool; r.Tags and r.Fields
// are sub-slices of these pools. The (possibly grown) pools are returned both on success
// and on failure, so the caller can continue using them.
//
// noEscapeChars enables fast paths in the helpers; it must be set only if s contains no backslashes.
func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Tag, []Field, error) {
	r.reset()
	n := nextUnescapedChar(s, ' ', noEscapeChars)
	if n < 0 {
		return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s)
	}
	measurementTags := s[:n]
	s = stripLeadingWhitespace(s[n+1:])

	// Parse measurement and tags
	var err error
	n = nextUnescapedChar(measurementTags, ',', noEscapeChars)
	if n >= 0 {
		tagsStart := len(tagsPool)
		tagsPool, err = unmarshalTags(tagsPool, measurementTags[n+1:], noEscapeChars)
		if err != nil {
			return tagsPool, fieldsPool, err
		}
		tags := tagsPool[tagsStart:]
		// Limit the capacity, so appending to r.Tags cannot overwrite
		// pool entries belonging to subsequent rows.
		r.Tags = tags[:len(tags):len(tags)]
		measurementTags = measurementTags[:n]
	}
	r.Measurement = unescapeTagValue(measurementTags, noEscapeChars)
	// Allow empty r.Measurement. In this case metric name is constructed directly from field keys.

	// Parse fields
	fieldsStart := len(fieldsPool)
	hasQuotedFields := nextUnescapedChar(s, '"', noEscapeChars) >= 0
	n = nextUnquotedChar(s, ' ', noEscapeChars, hasQuotedFields)
	if n < 0 {
		// No timestamp.
		fieldsPool, err = unmarshalInfluxFields(fieldsPool, s, noEscapeChars, hasQuotedFields)
		if err != nil {
			return tagsPool, fieldsPool, err
		}
		fields := fieldsPool[fieldsStart:]
		r.Fields = fields[:len(fields):len(fields)]
		return tagsPool, fieldsPool, nil
	}
	fieldsPool, err = unmarshalInfluxFields(fieldsPool, s[:n], noEscapeChars, hasQuotedFields)
	if err != nil {
		// NOTE(review): a tail starting with "HTTP/" presumably means that an HTTP request line
		// has been sent to the TCP listener, so a more helpful error is returned.
		if strings.HasPrefix(s[n+1:], "HTTP/") {
			return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
				"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
		}
		return tagsPool, fieldsPool, err
	}
	fields := fieldsPool[fieldsStart:]
	// Limit the capacity in the same way as in the no-timestamp branch above,
	// so appending to r.Fields cannot overwrite pool entries belonging to subsequent rows.
	r.Fields = fields[:len(fields):len(fields)]
	s = stripLeadingWhitespace(s[n+1:])

	// Parse timestamp
	timestamp, err := fastfloat.ParseInt64(s)
	if err != nil {
		if strings.HasPrefix(s, "HTTP/") {
			return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
				"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
		}
		return tagsPool, fieldsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
	}
	r.Timestamp = timestamp
	return tagsPool, fieldsPool, nil
}
// Tag represents influx tag.
type Tag struct {
// Key is the unescaped tag name.
Key string
// Value is the unescaped tag value.
Value string
}
// reset sets tag to its zero value.
func (tag *Tag) reset() {
	*tag = Tag{}
}
// unmarshal parses a `key=value` pair from s into tag.
//
// The key and the value are split at the first unescaped '=' and then unescaped.
// An error is returned if s has no unescaped '='.
func (tag *Tag) unmarshal(s string, noEscapeChars bool) error {
	tag.reset()
	eq := nextUnescapedChar(s, '=', noEscapeChars)
	if eq < 0 {
		return fmt.Errorf("missing tag value for %q", s)
	}
	rawKey, rawValue := s[:eq], s[eq+1:]
	tag.Key = unescapeTagValue(rawKey, noEscapeChars)
	tag.Value = unescapeTagValue(rawValue, noEscapeChars)
	return nil
}
// Field represents influx field.
type Field struct {
// Key is the unescaped field name. It is never empty for a successfully parsed field.
Key string
// Value is the field value converted to float64.
// Integer, boolean and quoted string values are converted by parseFieldValue.
Value float64
}
// reset sets f to its zero value.
func (f *Field) reset() {
	*f = Field{}
}
// unmarshal parses a `key=value` pair from s into f.
//
// The pair is split at the first unescaped '='. The key is unescaped and must be non-empty;
// the value is converted to float64 by parseFieldValue.
func (f *Field) unmarshal(s string, noEscapeChars, hasQuotedFields bool) error {
	f.reset()
	eq := nextUnescapedChar(s, '=', noEscapeChars)
	if eq < 0 {
		return fmt.Errorf("missing field value for %q", s)
	}
	key := unescapeTagValue(s[:eq], noEscapeChars)
	f.Key = key
	if key == "" {
		return fmt.Errorf("field key cannot be empty")
	}
	value, err := parseFieldValue(s[eq+1:], hasQuotedFields)
	if err != nil {
		return fmt.Errorf("cannot parse field value for %q: %w", f.Key, err)
	}
	f.Value = value
	return nil
}
// unmarshal parses newline-delimited influx line protocol rows from s
// and appends the successfully parsed rows to rs.Rows.
//
// If rs.IgnoreErrs is set, then invalid lines are logged, counted in invalidLines and skipped.
// Otherwise parsing stops at the first invalid line and an error mentioning that line is returned.
func (rs *Rows) unmarshal(s string) error {
	// Detect escape chars once for the whole input, so per-row parsing
	// can take fast paths when there are none.
	noEscapeChars := strings.IndexByte(s, '\\') < 0
	for len(s) > 0 {
		n := strings.IndexByte(s, '\n')
		if n < 0 {
			// The last line.
			n = len(s)
		}
		line := s[:n]
		if err := rs.unmarshalRow(line, noEscapeChars); err != nil {
			// Report only the offending line; s holds the whole remaining input,
			// which may be arbitrarily large.
			if !rs.IgnoreErrs {
				return fmt.Errorf("incorrect influx line %q: %w", line, err)
			}
			logger.Errorf("skipping InfluxDB line %q because of error: %s", line, err)
			invalidLines.Inc()
		}
		if len(s) == n {
			return nil
		}
		s = s[n+1:]
	}
	return nil
}
// unmarshalRow parses a single line s and appends the resulting row to rs.Rows.
//
// A trailing '\r' is stripped from s. Empty lines and lines starting with '#' are skipped
// without an error. If the line cannot be parsed, then no row is added and the error is returned.
func (rs *Rows) unmarshalRow(s string, noEscapeChars bool) error {
	if last := len(s) - 1; last >= 0 && s[last] == '\r' {
		s = s[:last]
	}
	if s == "" || s[0] == '#' {
		// Skip empty line or comment
		return nil
	}

	// Reserve a slot for the new row, reusing spare capacity when available.
	idx := len(rs.Rows)
	if idx < cap(rs.Rows) {
		rs.Rows = rs.Rows[:idx+1]
	} else {
		rs.Rows = append(rs.Rows, Row{})
	}

	tags, fields, err := rs.Rows[idx].unmarshal(s, rs.tagsPool, rs.fieldsPool, noEscapeChars)
	rs.tagsPool, rs.fieldsPool = tags, fields
	if err != nil {
		// Drop the slot reserved for the invalid row.
		rs.Rows = rs.Rows[:idx]
		return err
	}
	return nil
}
// invalidLines counts influx lines skipped by Rows.unmarshal because of parse errors when IgnoreErrs is set.
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="influx"}`)
// unmarshalTags parses comma-separated `key=value` tags from s and appends them to dst.
//
// Tags with empty key or empty value are skipped. On error the tag that failed to parse
// isn't included in the returned slice.
func unmarshalTags(dst []Tag, s string, noEscapeChars bool) ([]Tag, error) {
	for {
		// Reserve a slot for the next tag, reusing spare capacity when available.
		if len(dst) < cap(dst) {
			dst = dst[:len(dst)+1]
		} else {
			dst = append(dst, Tag{})
		}
		last := len(dst) - 1

		sep := nextUnescapedChar(s, ',', noEscapeChars)
		isFinal := sep < 0
		item := s
		if !isFinal {
			item = s[:sep]
		}
		if err := dst[last].unmarshal(item, noEscapeChars); err != nil {
			return dst[:last], err
		}
		if dst[last].Key == "" || dst[last].Value == "" {
			// Skip empty tag
			dst = dst[:last]
		}
		if isFinal {
			return dst, nil
		}
		s = s[sep+1:]
	}
}
// unmarshalInfluxFields parses comma-separated `key=value` fields from s and appends them to dst.
//
// Commas inside quoted values don't split fields when hasQuotedFields is set.
// On error the returned slice still contains the slot of the field that failed to parse.
func unmarshalInfluxFields(dst []Field, s string, noEscapeChars, hasQuotedFields bool) ([]Field, error) {
	for {
		// Reserve a slot for the next field, reusing spare capacity when available.
		if len(dst) < cap(dst) {
			dst = dst[:len(dst)+1]
		} else {
			dst = append(dst, Field{})
		}
		slot := &dst[len(dst)-1]

		sep := nextUnquotedChar(s, ',', noEscapeChars, hasQuotedFields)
		item := s
		if sep >= 0 {
			item = s[:sep]
		}
		if err := slot.unmarshal(item, noEscapeChars, hasQuotedFields); err != nil {
			return dst, err
		}
		if sep < 0 {
			// The last field has been parsed.
			return dst, nil
		}
		s = s[sep+1:]
	}
}
// unescapeTagValue removes the escaping backslashes from s.
//
// A backslash is dropped only when it precedes ' ', ',', '=' or '\\'.
// Any other backslash, including a trailing one, is kept as is.
// s is returned unchanged when noEscapeChars is set or when s has no backslashes.
func unescapeTagValue(s string, noEscapeChars bool) string {
	if noEscapeChars {
		// Fast path - the caller guarantees there are no escape chars.
		return s
	}
	pos := strings.IndexByte(s, '\\')
	if pos < 0 {
		return s
	}

	// Slow path - copy s chunk by chunk, dropping the escaping backslashes.
	out := make([]byte, 0, len(s))
	for pos >= 0 {
		out = append(out, s[:pos]...)
		s = s[pos+1:]
		if s == "" {
			// Trailing backslash escapes nothing; keep it.
			return string(append(out, '\\'))
		}
		switch c := s[0]; c {
		case ' ', ',', '=', '\\':
			out = append(out, c)
		default:
			// Not an escape sequence; preserve the backslash.
			out = append(out, '\\', c)
		}
		s = s[1:]
		pos = strings.IndexByte(s, '\\')
	}
	return string(append(out, s...))
}
// parseFieldValue converts the influx field value s to float64.
//
// The following forms are handled in this order:
//   - quoted string (only if hasQuotedFields is set) - the contents are parsed as a number on a best-effort basis;
//   - integer with the 'i' suffix and unsigned integer with the 'u' suffix;
//   - boolean literals, which are converted to 1 and 0;
//   - anything else is parsed as a float on a best-effort basis.
//
// An error is returned for empty values, for quoted values without the closing quote
// and for invalid integers.
func parseFieldValue(s string, hasQuotedFields bool) (float64, error) {
	if s == "" {
		return 0, fmt.Errorf("field value cannot be empty")
	}
	last := len(s) - 1
	if hasQuotedFields && s[0] == '"' {
		if last < 1 || s[last] != '"' {
			return 0, fmt.Errorf("missing closing quote for quoted field value %s", s)
		}
		// Try converting quoted string to number, since sometimes InfluxDB agents
		// send numbers as strings.
		return fastfloat.ParseBestEffort(s[1:last]), nil
	}

	switch s[last] {
	case 'i':
		// Integer value
		n, err := fastfloat.ParseInt64(s[:last])
		if err != nil {
			return 0, err
		}
		return float64(n), nil
	case 'u':
		// Unsigned integer value
		n, err := fastfloat.ParseUint64(s[:last])
		if err != nil {
			return 0, err
		}
		return float64(n), nil
	}

	switch s {
	case "t", "T", "true", "True", "TRUE":
		return 1, nil
	case "f", "F", "false", "False", "FALSE":
		return 0, nil
	}
	return fastfloat.ParseBestEffort(s), nil
}
// nextUnescapedChar returns the index of the first occurrence of ch in s,
// which isn't escaped with a backslash. It returns -1 if there is no such occurrence.
//
// ch is considered escaped when it is preceded by an odd number of consecutive backslashes.
// If noEscapeChars is set, then the first occurrence of ch is returned without checking for escapes.
func nextUnescapedChar(s string, ch byte, noEscapeChars bool) int {
	if noEscapeChars {
		// Fast path: just search for ch in s, since s has no escape chars.
		return strings.IndexByte(s, ch)
	}
	offset := 0
	for {
		idx := strings.IndexByte(s[offset:], ch)
		if idx < 0 {
			return -1
		}
		pos := offset + idx
		// Count the backslashes directly preceding ch, without looking
		// behind the already processed part of s.
		backslashes := 0
		for j := pos; j > offset && s[j-1] == '\\'; j-- {
			backslashes++
		}
		if backslashes%2 == 0 {
			// The backslashes escape each other, so ch itself is unescaped.
			return pos
		}
		// ch is escaped; continue searching after it.
		offset = pos + 1
	}
}
// nextUnquotedChar returns the index of the first unescaped occurrence of ch in s,
// which is located outside double-quoted sections. It returns -1 if there is no such occurrence.
//
// If hasQuotedFields isn't set, then quotes aren't taken into account.
func nextUnquotedChar(s string, ch byte, noEscapeChars, hasQuotedFields bool) int {
	if !hasQuotedFields {
		return nextUnescapedChar(s, ch, noEscapeChars)
	}
	consumed := 0
	for {
		rest := s[consumed:]
		pos := nextUnescapedChar(rest, ch, noEscapeChars)
		if pos < 0 {
			return -1
		}
		if !isInQuote(rest[:pos], noEscapeChars) {
			return consumed + pos
		}
		// ch is inside a quoted section; resume the search after the closing quote.
		rest = rest[pos+1:]
		closing := nextUnescapedChar(rest, '"', noEscapeChars)
		if closing < 0 {
			return -1
		}
		consumed += pos + 1 + closing + 1
	}
}
// isInQuote reports whether s contains an odd number of unescaped double quotes,
// i.e. whether a quoted section opened in s is still unclosed at its end.
func isInQuote(s string, noEscapeChars bool) bool {
	inside := false
	for pos := nextUnescapedChar(s, '"', noEscapeChars); pos >= 0; pos = nextUnescapedChar(s, '"', noEscapeChars) {
		inside = !inside
		s = s[pos+1:]
	}
	return inside
}
// stripLeadingWhitespace returns s without the leading space chars.
//
// Only ' ' is stripped; other whitespace chars such as tabs are left intact.
func stripLeadingWhitespace(s string) string {
	i := 0
	for i < len(s) && s[i] == ' ' {
		i++
	}
	return s[i:]
}