mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
cd87ca303f
Previously certain errors in timestamps and/or values could be silently skipped, which could lead to samples with zero values stored in the database. Updates https://github.com/VictoriaMetrics/vmctl/issues/25
181 lines
5.7 KiB
Go
181 lines
5.7 KiB
Go
package csvimport
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/valyala/fastjson/fastfloat"
|
|
)
|
|
|
|
// ColumnDescriptor represents parsing rules for a single csv column.
|
|
//
|
|
// The column is transformed to either timestamp, tag or metric value
|
|
// depending on the corresponding non-empty field.
|
|
//
|
|
// If all the fields are empty, then the given column is ignored.
|
|
type ColumnDescriptor struct {
|
|
// ParseTimestamp is set to a function, which is used for timestamp
|
|
// parsing from the given column.
|
|
ParseTimestamp func(s string) (int64, error)
|
|
|
|
// TagName is set to tag name for tag value, which should be obtained
|
|
// from the given column.
|
|
TagName string
|
|
|
|
// MetricName is set to metric name for value obtained from the given column.
|
|
MetricName string
|
|
}
|
|
|
|
const maxColumnsPerRow = 64 * 1024
|
|
|
|
// ParseColumnDescriptors parses column descriptors from s.
|
|
//
|
|
// s must have comma-separated list of the following entries:
|
|
//
|
|
// <column_pos>:<column_type>:<extension>
|
|
//
|
|
// Where:
|
|
//
|
|
// - <column_pos> is numeric csv column position. The first column has position 1.
|
|
// - <column_type> is one of the following types:
|
|
// - time - the corresponding column contains timestamp. Timestamp format is determined by <extension>. The following formats are supported:
|
|
// - unix_s - unix timestamp in seconds
|
|
// - unix_ms - unix timestamp in milliseconds
|
|
// - unix_ns - unix_timestamp in nanoseconds
|
|
// - rfc3339 - RFC3339 format in the form `2006-01-02T15:04:05Z07:00`
|
|
// - label - the corresponding column contains metric label with the name set in <extension>.
|
|
// - metric - the corresponding column contains metric value with the name set in <extension>.
|
|
//
|
|
// s must contain at least a single 'metric' column and no more than a single `time` column.
|
|
func ParseColumnDescriptors(s string) ([]ColumnDescriptor, error) {
|
|
m := make(map[int]ColumnDescriptor)
|
|
cols := strings.Split(s, ",")
|
|
hasValueCol := false
|
|
hasTimeCol := false
|
|
maxPos := 0
|
|
for i, col := range cols {
|
|
var cd ColumnDescriptor
|
|
a := strings.SplitN(col, ":", 3)
|
|
if len(a) != 3 {
|
|
return nil, fmt.Errorf("entry #%d must have the following form: <column_pos>:<column_type>:<extension>; got %q", i+1, a)
|
|
}
|
|
pos, err := strconv.Atoi(a[0])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse <column_pos> part from the entry #%d %q: %w", i+1, col, err)
|
|
}
|
|
if pos <= 0 {
|
|
return nil, fmt.Errorf("<column_pos> cannot be smaller than 1; got %d for entry #%d %q", pos, i+1, col)
|
|
}
|
|
if pos > maxColumnsPerRow {
|
|
return nil, fmt.Errorf("<column_pos> cannot be bigger than %d; got %d for entry #%d %q", maxColumnsPerRow, pos, i+1, col)
|
|
}
|
|
if pos > maxPos {
|
|
maxPos = pos
|
|
}
|
|
typ := a[1]
|
|
switch typ {
|
|
case "time":
|
|
if hasTimeCol {
|
|
return nil, fmt.Errorf("duplicate time column has been found at entry #%d %q for %q", i+1, col, s)
|
|
}
|
|
parseTimestamp, err := parseTimeFormat(a[2])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse time format from the entry #%d %q: %w", i+1, col, err)
|
|
}
|
|
cd.ParseTimestamp = parseTimestamp
|
|
hasTimeCol = true
|
|
case "label":
|
|
cd.TagName = a[2]
|
|
if len(cd.TagName) == 0 {
|
|
return nil, fmt.Errorf("label name cannot be empty in the entry #%d %q", i+1, col)
|
|
}
|
|
case "metric":
|
|
cd.MetricName = a[2]
|
|
if len(cd.MetricName) == 0 {
|
|
return nil, fmt.Errorf("metric name cannot be empty in the entry #%d %q", i+1, col)
|
|
}
|
|
hasValueCol = true
|
|
default:
|
|
return nil, fmt.Errorf("unknown <column_type>: %q; allowed values: time, metric, label", typ)
|
|
}
|
|
pos--
|
|
if _, ok := m[pos]; ok {
|
|
return nil, fmt.Errorf("duplicate <column_pos> %d for the entry #%d %q", pos, i+1, col)
|
|
}
|
|
m[pos] = cd
|
|
}
|
|
if !hasValueCol {
|
|
return nil, fmt.Errorf("missing 'metric' column in %q", s)
|
|
}
|
|
cds := make([]ColumnDescriptor, maxPos)
|
|
for pos, cd := range m {
|
|
cds[pos] = cd
|
|
}
|
|
return cds, nil
|
|
}
|
|
|
|
func parseTimeFormat(format string) (func(s string) (int64, error), error) {
|
|
if strings.HasPrefix(format, "custom:") {
|
|
format = format[len("custom:"):]
|
|
return newParseCustomTimeFunc(format), nil
|
|
}
|
|
switch format {
|
|
case "unix_s":
|
|
return parseUnixTimestampSeconds, nil
|
|
case "unix_ms":
|
|
return parseUnixTimestampMilliseconds, nil
|
|
case "unix_ns":
|
|
return parseUnixTimestampNanoseconds, nil
|
|
case "rfc3339":
|
|
return parseRFC3339, nil
|
|
default:
|
|
return nil, fmt.Errorf("unknown format for time parsing: %q; supported formats: unix_s, unix_ms, unix_ns, rfc3339", format)
|
|
}
|
|
}
|
|
|
|
func parseUnixTimestampSeconds(s string) (int64, error) {
|
|
n, err := fastfloat.ParseInt64(s)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse timestamp seconds from %q: %w", s, err)
|
|
}
|
|
if n > int64(1<<63-1)/1e3 {
|
|
return 0, fmt.Errorf("too big unix timestamp in seconds: %d; must be smaller than %d", n, int64(1<<63-1)/1e3)
|
|
}
|
|
return n * 1e3, nil
|
|
}
|
|
|
|
func parseUnixTimestampMilliseconds(s string) (int64, error) {
|
|
n, err := fastfloat.ParseInt64(s)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse timestamp milliseconds from %q: %w", s, err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func parseUnixTimestampNanoseconds(s string) (int64, error) {
|
|
n, err := fastfloat.ParseInt64(s)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse timestamp nanoseconds from %q: %w", s, err)
|
|
}
|
|
return n / 1e6, nil
|
|
}
|
|
|
|
func parseRFC3339(s string) (int64, error) {
|
|
t, err := time.Parse(time.RFC3339, s)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse time in RFC3339 from %q: %w", s, err)
|
|
}
|
|
return t.UnixNano() / 1e6, nil
|
|
}
|
|
|
|
func newParseCustomTimeFunc(format string) func(s string) (int64, error) {
|
|
return func(s string) (int64, error) {
|
|
t, err := time.Parse(format, s)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse time in custom format %q from %q: %w", format, s, err)
|
|
}
|
|
return t.UnixNano() / 1e6, nil
|
|
}
|
|
}
|