app/{vmagent,vminsert}: add support for importing csv data via /api/v1/import/csv

Aliaksandr Valialkin 2020-03-10 19:35:58 +02:00
parent 7545784a49
commit 2f0a36044c
14 changed files with 1277 additions and 11 deletions

@@ -21,10 +21,11 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
* Can add, remove and modify labels via Prometheus relabeling. See [these docs](#relabeling) for details.
* Accepts data via all the ingestion protocols supported by VictoriaMetrics:
* Influx line protocol via `http://<vmagent>:8429/write`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
* Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
* Can replicate collected metrics simultaneously to multiple remote storage systems.
* Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as connection
@@ -53,7 +54,7 @@ If you need collecting only Influx data, then the following command line would b
/path/to/vmagent -remoteWrite.url=https://victoria-metrics-host:8428/api/v1/write
```
Then send Influx data to `http://vmagent-host:8429/write`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for more details.
Then send Influx data to `http://vmagent-host:8429`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for more details.
`vmagent` is also available in [docker images](https://hub.docker.com/r/victoriametrics/vmagent/).
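With this change `vmagent` also accepts CSV data on `http://<vmagent>:8429/api/v1/import/csv`. Below is a minimal Go sketch of how a client might push gzip-compressed CSV rows to it; the host name, the `format` value and the CSV payload are illustrative only, and the gzip path relies on the `Content-Encoding: gzip` support added to the stream parser in this commit:
```go
package main

import (
	"bytes"
	"compress/gzip"
	"log"
	"net/http"
	"net/url"
)

func main() {
	// Illustrative values only: two metric columns (ask, bid) and two label columns.
	format := "2:metric:ask,3:metric:bid,1:label:ticker,4:label:market"

	// Compress the CSV payload so the server can decode it via Content-Encoding: gzip.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("GOOG,1.23,4.56,NYSE\n")); err != nil {
		log.Fatalf("cannot compress CSV data: %s", err)
	}
	if err := zw.Close(); err != nil {
		log.Fatalf("cannot finalize gzip stream: %s", err)
	}

	u := "http://vmagent-host:8429/api/v1/import/csv?format=" + url.QueryEscape(format)
	req, err := http.NewRequest("POST", u, &buf)
	if err != nil {
		log.Fatalf("cannot create request: %s", err)
	}
	req.Header.Set("Content-Encoding", "gzip")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatalf("cannot send CSV data to vmagent: %s", err)
	}
	defer resp.Body.Close()
	log.Printf("vmagent responded with %s", resp.Status)
}
```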

@@ -0,0 +1,63 @@
package csvimport
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`)
)
// InsertHandler processes csv data from req.
func InsertHandler(req *http.Request) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
})
}
func insertRows(rows []parser.Row) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range rows {
r := &rows[i]
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: r.Metric,
})
for j := range r.Tags {
tag := &r.Tags[j]
labels = append(labels, prompbmarshal.Label{
Name: tag.Key,
Value: tag.Value,
})
}
samples = append(samples, prompbmarshal.Sample{
Value: r.Value,
Timestamp: r.Timestamp,
})
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[len(samples)-1:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil
}

@@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
@@ -127,6 +128,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusNoContent)
return true
case "/api/v1/import/csv":
csvimportRequests.Inc()
if err := csvimport.InsertHandler(r); err != nil {
csvimportErrors.Inc()
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/write", "/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(r); err != nil {
@@ -152,11 +162,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
var (
prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="prometheus"}`)
prometheusWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="prometheus"}`)
prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`)
prometheusWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="promremotewrite"}`)
vmimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import", protocol="vm"}`)
vmimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import", protocol="vm"}`)
vmimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import", protocol="vmimport"}`)
vmimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import", protocol="vmimport"}`)
csvimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`)
csvimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`)
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)

@@ -0,0 +1,48 @@
package csvimport
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="csvimport"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="csvimport"}`)
)
// InsertHandler processes /api/v1/import/csv requests.
func InsertHandler(at *auth.Token, req *http.Request) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(at, rows)
})
})
}
func insertRows(at *auth.Token, rows []parser.Row) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset() // This line is required for initializing ctx internals.
for i := range rows {
r := &rows[i]
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", r.Metric)
for j := range r.Tags {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}
}
rowsInserted.Get(at).Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return ctx.FlushBufs()
}

@@ -7,6 +7,7 @@ import (
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
@@ -160,6 +161,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import/csv":
csvimportRequests.Inc()
if err := csvimport.InsertHandler(at, r); err != nil {
csvimportErrors.Inc()
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "influx/write", "influx/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(at, r); err != nil {
@@ -182,11 +192,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
var (
prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/", protocol="prometheus"}`)
prometheusWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/", protocol="prometheus"}`)
prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/", protocol="promremotewrite"}`)
prometheusWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/", protocol="promremotewrite"}`)
vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import", protocol="vm"}`)
vmimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import", protocol="vm"}`)
vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import", protocol="vmimport"}`)
vmimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import", protocol="vmimport"}`)
csvimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import/csv", protocol="csvimport"}`)
csvimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import/csv", protocol="csvimport"}`)
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/influx/", protocol="influx"}`)

@@ -52,7 +52,8 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM
if `-graphiteListenAddr` is set.
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set.
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
* [/api/v1/import](#how-to-import-time-series-data)
* [/api/v1/import](#how-to-import-time-series-data).
* [Arbitrary CSV data](#how-to-import-csv-data).
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
* See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles).
@@ -415,6 +416,55 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]}
```
### How to import CSV data
Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
The `format` query arg must contain a comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:
```
<column_pos>:<type>:<context>
```
* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
* `<type>` describes the column type. Supported types are:
* `metric` - the corresponding CSV column at `<column_pos>` contains a metric value. The metric name is read from `<context>`.
Each CSV line must contain at least one metric field.
* `label` - the corresponding CSV column at `<column_pos>` contains a label value. The label name is read from `<context>`.
A CSV line may contain an arbitrary number of label fields. All these labels are attached to all the configured metrics.
* `time` - the corresponding CSV column at `<column_pos>` contains the metric time. A CSV line may contain at most one time column.
If a CSV line has no time column, then the current time is used. The time is applied to all the configured metrics.
The format of the time is configured via `<context>`. Supported time formats are:
* `unix_s` - unix timestamp in seconds.
* `unix_ms` - unix timestamp in milliseconds.
* `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
* `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, e.g. `2006-01-02T15:04:05Z`.
* `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain an arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).
Each request to `/api/v1/import/csv` may contain an arbitrary number of CSV lines.
Example for importing CSV data via `/api/v1/import/csv`:
```bash
curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
```
After that the data may be read via the [/api/v1/export](#how-to-export-time-series) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
```
The following response should be returned:
```bash
{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```
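The examples above don't configure a `time` column, so VictoriaMetrics assigns the current time to the imported rows. A minimal Go sketch below shows how the same endpoint might be called with an explicit `time` column in RFC3339 format; the address and the CSV values are illustrative only:
```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// Illustrative format: column 1 is the ticker label, column 2 is the ask metric,
	// column 3 carries the timestamp in RFC3339 format.
	format := "1:label:ticker,2:metric:ask,3:time:rfc3339"
	csvBody := "GOOG,1.23,2020-03-10T17:35:58Z\nMSFT,3.21,2020-03-10T17:35:59Z\n"

	u := "http://localhost:8428/api/v1/import/csv?format=" + url.QueryEscape(format)
	resp, err := http.Post(u, "text/csv", strings.NewReader(csvBody))
	if err != nil {
		log.Fatalf("cannot import CSV data: %s", err)
	}
	defer resp.Body.Close()
	fmt.Println("import status:", resp.Status)
}
```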
### Prometheus querying API usage
VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
@@ -590,6 +640,7 @@ Time series data can be imported via any supported ingestion protocol:
* [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol)
* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests)
* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series).
* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details.
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:

@@ -0,0 +1,172 @@
package csvimport
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/valyala/fastjson/fastfloat"
)
// ColumnDescriptor represents parsing rules for a single csv column.
//
// The column is transformed to either timestamp, tag or metric value
// depending on the corresponding non-empty field.
//
// If all the fields are empty, then the given column is ignored.
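//
// For example, a descriptor with only TagName set to "city" turns the column value
// into the value of the `city` label, while a descriptor with only MetricName set
// to "temperature" turns the column value into a sample of the `temperature` metric.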
type ColumnDescriptor struct {
// ParseTimestamp is set to a function, which is used for timestamp
// parsing from the given column.
ParseTimestamp func(s string) (int64, error)
// TagName is set to tag name for tag value, which should be obtained
// from the given column.
TagName string
// MetricName is set to metric name for value obtained from the given column.
MetricName string
}
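// maxColumnsPerRow limits the accepted <column_pos> values, since a slice with maxPos
// column descriptors is allocated for every parsed `format` string.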
const maxColumnsPerRow = 64 * 1024
// ParseColumnDescriptors parses column descriptors from s.
//
// s must have comma-separated list of the following entries:
//
// <column_pos>:<column_type>:<extension>
//
// Where:
//
// - <column_pos> is the numeric csv column position. The first column has position 1.
// - <column_type> is one of the following types:
// - time - the corresponding column contains a timestamp. The timestamp format is determined by <extension>. The following formats are supported:
// - unix_s - unix timestamp in seconds
// - unix_ms - unix timestamp in milliseconds
// - unix_ns - unix timestamp in nanoseconds
// - rfc3339 - RFC3339 format in the form `2006-01-02T15:04:05Z07:00`
// - label - the corresponding column contains metric label with the name set in <extension>.
// - metric - the corresponding column contains metric value with the name set in <extension>.
//
// s must contain at least a single 'metric' column and no more than a single `time` column.
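//
// For example, the following s parses CSV lines such as `GOOG,1.23,4.56,NYSE`
// into `ask` and `bid` metrics with `ticker` and `market` labels:
//
//     2:metric:ask,3:metric:bid,1:label:ticker,4:label:market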
func ParseColumnDescriptors(s string) ([]ColumnDescriptor, error) {
m := make(map[int]ColumnDescriptor)
cols := strings.Split(s, ",")
hasValueCol := false
hasTimeCol := false
maxPos := 0
for i, col := range cols {
var cd ColumnDescriptor
a := strings.SplitN(col, ":", 3)
if len(a) != 3 {
return nil, fmt.Errorf("entry #%d must have the following form: <column_pos>:<column_type>:<extension>; got %q", i+1, a)
}
pos, err := strconv.Atoi(a[0])
if err != nil {
return nil, fmt.Errorf("cannot parse <column_pos> part from the entry #%d %q: %s", i+1, col, err)
}
if pos <= 0 {
return nil, fmt.Errorf("<column_pos> cannot be smaller than 1; got %d for entry #%d %q", pos, i+1, col)
}
if pos > maxColumnsPerRow {
return nil, fmt.Errorf("<column_pos> cannot be bigger than %d; got %d for entry #%d %q", maxColumnsPerRow, pos, i+1, col)
}
if pos > maxPos {
maxPos = pos
}
typ := a[1]
switch typ {
case "time":
if hasTimeCol {
return nil, fmt.Errorf("duplicate time column has been found at entry #%d %q for %q", i+1, col, s)
}
parseTimestamp, err := parseTimeFormat(a[2])
if err != nil {
return nil, fmt.Errorf("cannot parse time format from the entry #%d %q: %s", i+1, col, err)
}
cd.ParseTimestamp = parseTimestamp
hasTimeCol = true
case "label":
cd.TagName = a[2]
if len(cd.TagName) == 0 {
return nil, fmt.Errorf("label name cannot be empty in the entry #%d %q", i+1, col)
}
case "metric":
cd.MetricName = a[2]
if len(cd.MetricName) == 0 {
return nil, fmt.Errorf("metric name cannot be empty in the entry #%d %q", i+1, col)
}
hasValueCol = true
default:
return nil, fmt.Errorf("unknown <column_type>: %q; allowed values: time, metric, label", typ)
}
pos--
if _, ok := m[pos]; ok {
return nil, fmt.Errorf("duplicate <column_pos> %d for the entry #%d %q", pos, i+1, col)
}
m[pos] = cd
}
if !hasValueCol {
return nil, fmt.Errorf("missing 'metric' column in %q", s)
}
cds := make([]ColumnDescriptor, maxPos)
for pos, cd := range m {
cds[pos] = cd
}
return cds, nil
}
func parseTimeFormat(format string) (func(s string) (int64, error), error) {
if strings.HasPrefix(format, "custom:") {
format = format[len("custom:"):]
return newParseCustomTimeFunc(format), nil
}
switch format {
case "unix_s":
return parseUnixTimestampSeconds, nil
case "unix_ms":
return parseUnixTimestampMilliseconds, nil
case "unix_ns":
return parseUnixTimestampNanoseconds, nil
case "rfc3339":
return parseRFC3339, nil
default:
return nil, fmt.Errorf("unknown format for time parsing: %q; supported formats: unix_s, unix_ms, unix_ns, rfc3339", format)
}
}
func parseUnixTimestampSeconds(s string) (int64, error) {
n := fastfloat.ParseInt64BestEffort(s)
if n > int64(1<<63-1)/1e3 {
return 0, fmt.Errorf("too big unix timestamp in seconds: %d; must be smaller than %d", n, int64(1<<63-1)/1e3)
}
return n * 1e3, nil
}
func parseUnixTimestampMilliseconds(s string) (int64, error) {
n := fastfloat.ParseInt64BestEffort(s)
return n, nil
}
func parseUnixTimestampNanoseconds(s string) (int64, error) {
n := fastfloat.ParseInt64BestEffort(s)
return n / 1e6, nil
}
func parseRFC3339(s string) (int64, error) {
t, err := time.Parse(time.RFC3339, s)
if err != nil {
return 0, fmt.Errorf("cannot parse time in RFC3339 from %q: %s", s, err)
}
return t.UnixNano() / 1e6, nil
}
func newParseCustomTimeFunc(format string) func(s string) (int64, error) {
return func(s string) (int64, error) {
t, err := time.Parse(format, s)
if err != nil {
return 0, fmt.Errorf("cannot parse time in custom format %q from %q: %s", format, s, err)
}
return t.UnixNano() / 1e6, nil
}
}

@@ -0,0 +1,226 @@
package csvimport
import (
"bytes"
"fmt"
"reflect"
"testing"
"time"
"unsafe"
)
func TestParseColumnDescriptorsSuccess(t *testing.T) {
f := func(s string, cdsExpected []ColumnDescriptor) {
t.Helper()
cds, err := ParseColumnDescriptors(s)
if err != nil {
t.Fatalf("unexpected error on ParseColumnDescriptors(%q): %s", s, err)
}
if !equalColumnDescriptors(cds, cdsExpected) {
t.Fatalf("unexpected cds returned from ParseColumnDescriptors(%q);\ngot\n%v\nwant\n%v", s, cds, cdsExpected)
}
}
f("1:time:unix_s,3:metric:temperature", []ColumnDescriptor{
{
ParseTimestamp: parseUnixTimestampSeconds,
},
{},
{
MetricName: "temperature",
},
})
f("2:time:unix_ns,1:metric:temperature,3:label:city,4:label:country", []ColumnDescriptor{
{
MetricName: "temperature",
},
{
ParseTimestamp: parseUnixTimestampNanoseconds,
},
{
TagName: "city",
},
{
TagName: "country",
},
})
f("2:time:unix_ms,1:metric:temperature", []ColumnDescriptor{
{
MetricName: "temperature",
},
{
ParseTimestamp: parseUnixTimestampMilliseconds,
},
})
f("2:time:rfc3339,1:metric:temperature", []ColumnDescriptor{
{
MetricName: "temperature",
},
{
ParseTimestamp: parseRFC3339,
},
})
}
func TestParseColumnDescriptorsFailure(t *testing.T) {
f := func(s string) {
t.Helper()
cds, err := ParseColumnDescriptors(s)
if err == nil {
t.Fatalf("expecting non-nil error for ParseColumnDescriptors(%q)", s)
}
if cds != nil {
t.Fatalf("expecting nil cds; got %v", cds)
}
}
// Empty string
f("")
// Missing metric column
f("1:time:unix_s")
f("1:label:aaa")
// Invalid column number
f("foo:time:unix_s,bar:metric:temp")
f("0:metric:aaa")
f("-123:metric:aaa")
f(fmt.Sprintf("%d:metric:aaa", maxColumnsPerRow+10))
// Duplicate time column
f("1:time:unix_s,2:time:rfc3339,3:metric:aaa")
f("1:time:custom:2006,2:time:rfc3339,3:metric:aaa")
// Invalid time format
f("1:time:foobar,2:metric:aaa")
f("1:time:,2:metric:aaa")
f("1:time:sss:sss,2:metric:aaa")
// empty label name
f("2:label:,1:metric:aaa")
// Empty metric name
f("1:metric:")
// Unknown type
f("1:metric:aaa,2:aaaa:bbb")
// duplicate column number
f("1:metric:a,1:metric:b")
}
func TestParseUnixTimestampSeconds(t *testing.T) {
f := func(s string, tsExpected int64) {
t.Helper()
ts, err := parseUnixTimestampSeconds(s)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", s, err)
}
if ts != tsExpected {
t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
}
}
f("0", 0)
f("123", 123000)
f("-123", -123000)
}
func TestParseUnixTimestampMilliseconds(t *testing.T) {
f := func(s string, tsExpected int64) {
t.Helper()
ts, err := parseUnixTimestampMilliseconds(s)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", s, err)
}
if ts != tsExpected {
t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
}
}
f("0", 0)
f("123", 123)
f("-123", -123)
}
func TestParseUnixTimestampNanoseconds(t *testing.T) {
f := func(s string, tsExpected int64) {
t.Helper()
ts, err := parseUnixTimestampNanoseconds(s)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", s, err)
}
if ts != tsExpected {
t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
}
}
f("0", 0)
f("123", 0)
f("12343567", 12)
f("-12343567", -12)
}
func TestParseRFC3339(t *testing.T) {
f := func(s string, tsExpected int64) {
t.Helper()
ts, err := parseRFC3339(s)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", s, err)
}
if ts != tsExpected {
t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
}
}
f("2006-01-02T15:04:05Z", 1136214245000)
f("2020-03-11T18:23:46Z", 1583951026000)
}
func TestParseCustomTimeFunc(t *testing.T) {
f := func(format, s string, tsExpected int64) {
t.Helper()
f := newParseCustomTimeFunc(format)
ts, err := f(s)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", s, err)
}
if ts != tsExpected {
t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
}
}
f(time.RFC1123, "Mon, 29 Oct 2018 07:50:37 GMT", 1540799437000)
f("2006-01-02 15:04:05.999Z", "2015-08-10 20:04:40.123Z", 1439237080123)
}
func equalColumnDescriptors(a, b []ColumnDescriptor) bool {
if len(a) != len(b) {
return false
}
for i, x := range a {
y := b[i]
if !equalColumnDescriptor(x, y) {
return false
}
}
return true
}
func equalColumnDescriptor(x, y ColumnDescriptor) bool {
sh1 := &reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(&x.ParseTimestamp)),
Len: int(unsafe.Sizeof(x.ParseTimestamp)),
Cap: int(unsafe.Sizeof(x.ParseTimestamp)),
}
b1 := *(*[]byte)(unsafe.Pointer(sh1))
sh2 := &reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(&y.ParseTimestamp)),
Len: int(unsafe.Sizeof(y.ParseTimestamp)),
Cap: int(unsafe.Sizeof(y.ParseTimestamp)),
}
b2 := *(*[]byte)(unsafe.Pointer(sh2))
if !bytes.Equal(b1, b2) {
return false
}
if x.TagName != y.TagName {
return false
}
if x.MetricName != y.MetricName {
return false
}
return true
}

@@ -0,0 +1,141 @@
package csvimport
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/valyala/fastjson/fastfloat"
)
// Rows represents csv rows.
type Rows struct {
// Rows contains parsed csv rows after the call to Unmarshal.
Rows []Row
sc scanner
tagsPool []Tag
metricsPool []metric
}
// Reset resets rs.
func (rs *Rows) Reset() {
rows := rs.Rows
for i := range rows {
r := &rows[i]
r.Metric = ""
r.Tags = nil
r.Value = 0
r.Timestamp = 0
}
rs.Rows = rs.Rows[:0]
rs.sc.Init("")
tags := rs.tagsPool
for i := range tags {
t := &tags[i]
t.Key = ""
t.Value = ""
}
rs.tagsPool = rs.tagsPool[:0]
metrics := rs.metricsPool
for i := range metrics {
m := &metrics[i]
m.Name = ""
m.Value = 0
}
rs.metricsPool = rs.metricsPool[:0]
}
// Row represents a single metric row
type Row struct {
Metric string
Tags []Tag
Value float64
Timestamp int64
}
// Tag represents a metric tag.
type Tag struct {
Key string
Value string
}
type metric struct {
Name string
Value float64
}
// Unmarshal unmarshals csv lines from s according to the given cds.
func (rs *Rows) Unmarshal(s string, cds []ColumnDescriptor) {
rs.sc.Init(s)
rs.Rows, rs.tagsPool, rs.metricsPool = parseRows(&rs.sc, rs.Rows[:0], rs.tagsPool[:0], rs.metricsPool[:0], cds)
}
func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []ColumnDescriptor) ([]Row, []Tag, []metric) {
for sc.NextLine() {
line := sc.Line
var r Row
col := uint(0)
metrics = metrics[:0]
tagsLen := len(tags)
for sc.NextColumn() {
if col >= uint(len(cds)) {
// Skip the superfluous column.
continue
}
cd := &cds[col]
col++
if parseTimestamp := cd.ParseTimestamp; parseTimestamp != nil {
timestamp, err := parseTimestamp(sc.Column)
if err != nil {
sc.Error = fmt.Errorf("cannot parse timestamp from %q: %s", sc.Column, err)
break
}
r.Timestamp = timestamp
continue
}
if tagName := cd.TagName; tagName != "" {
tags = append(tags, Tag{
Key: tagName,
Value: sc.Column,
})
continue
}
metricName := cd.MetricName
if metricName == "" {
// The given field is ignored.
continue
}
value := fastfloat.ParseBestEffort(sc.Column)
metrics = append(metrics, metric{
Name: metricName,
Value: value,
})
}
if col < uint(len(cds)) && sc.Error == nil {
sc.Error = fmt.Errorf("missing columns in the csv line %q; got %d columns; want at least %d columns", line, col, len(cds))
}
if sc.Error != nil {
logger.Errorf("error when parsing csv line %q: %s; skipping this line", line, sc.Error)
continue
}
if len(metrics) == 0 {
logger.Panicf("BUG: expecting at least a single metric in columnDescriptors=%#v", cds)
}
r.Metric = metrics[0].Name
r.Tags = tags[tagsLen:]
r.Value = metrics[0].Value
dst = append(dst, r)
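// A single CSV line with multiple `metric` columns is expanded into multiple rows
// sharing the same tags and timestamp.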
for _, m := range metrics[1:] {
dst = append(dst, Row{
Metric: m.Name,
Tags: r.Tags,
Value: m.Value,
Timestamp: r.Timestamp,
})
}
}
return dst, tags, metrics
}

@@ -0,0 +1,171 @@
package csvimport
import (
"reflect"
"testing"
)
func TestRowsUnmarshalFailure(t *testing.T) {
f := func(format, s string) {
t.Helper()
cds, err := ParseColumnDescriptors(format)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", format, err)
}
var rs Rows
rs.Unmarshal(s, cds)
if len(rs.Rows) != 0 {
t.Fatalf("unexpected rows unmarshaled: %#v", rs.Rows)
}
}
// Invalid timestamp
f("1:metric:foo,2:time:rfc3339", "234,foobar")
// Missing columns
f("3:metric:aaa", "123,456")
}
func TestRowsUnmarshalSuccess(t *testing.T) {
f := func(format, s string, rowsExpected []Row) {
t.Helper()
cds, err := ParseColumnDescriptors(format)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", format, err)
}
var rs Rows
rs.Unmarshal(s, cds)
if !reflect.DeepEqual(rs.Rows, rowsExpected) {
t.Fatalf("unexpected rows;\ngot\n%v\nwant\n%v", rs.Rows, rowsExpected)
}
rs.Reset()
// Unmarshal rows the second time
rs.Unmarshal(s, cds)
if !reflect.DeepEqual(rs.Rows, rowsExpected) {
t.Fatalf("unexpected rows on the second unmarshal;\ngot\n%v\nwant\n%v", rs.Rows, rowsExpected)
}
}
f("1:metric:foo", "", nil)
f("1:metric:foo", `123`, []Row{
{
Metric: "foo",
Value: 123,
},
})
f("1:metric:foo,2:time:unix_s,3:label:foo,4:label:bar", `123,456,xxx,yy`, []Row{
{
Metric: "foo",
Tags: []Tag{
{
Key: "foo",
Value: "xxx",
},
{
Key: "bar",
Value: "yy",
},
},
Value: 123,
Timestamp: 456000,
},
})
// Multiple metrics
f("2:metric:bar,1:metric:foo,3:label:foo,4:label:bar,5:time:custom:2006-01-02 15:04:05.999Z",
`"2.34",5.6,"foo"",bar","aa",2015-08-10 20:04:40.123Z`, []Row{
{
Metric: "foo",
Tags: []Tag{
{
Key: "foo",
Value: "foo\",bar",
},
{
Key: "bar",
Value: "aa",
},
},
Value: 2.34,
Timestamp: 1439237080123,
},
{
Metric: "bar",
Tags: []Tag{
{
Key: "foo",
Value: "foo\",bar",
},
{
Key: "bar",
Value: "aa",
},
},
Value: 5.6,
Timestamp: 1439237080123,
},
})
f("2:label:symbol,3:time:custom:2006-01-02 15:04:05.999Z,4:metric:bid,5:metric:ask",
`
"aaa","AUDCAD","2015-08-10 00:00:01.000Z",0.9725,0.97273
"aaa","AUDCAD","2015-08-10 00:00:02.000Z",0.97253,0.97276
`, []Row{
{
Metric: "bid",
Tags: []Tag{
{
Key: "symbol",
Value: "AUDCAD",
},
},
Value: 0.9725,
Timestamp: 1439164801000,
},
{
Metric: "ask",
Tags: []Tag{
{
Key: "symbol",
Value: "AUDCAD",
},
},
Value: 0.97273,
Timestamp: 1439164801000,
},
{
Metric: "bid",
Tags: []Tag{
{
Key: "symbol",
Value: "AUDCAD",
},
},
Value: 0.97253,
Timestamp: 1439164802000,
},
{
Metric: "ask",
Tags: []Tag{
{
Key: "symbol",
Value: "AUDCAD",
},
},
Value: 0.97276,
Timestamp: 1439164802000,
},
})
// Superfluous columns
f("1:metric:foo", `123,456,foo,bar`, []Row{
{
Metric: "foo",
Value: 123,
},
})
f("2:metric:foo", `123,-45.6,foo,bar`, []Row{
{
Metric: "foo",
Value: -45.6,
},
})
}

@@ -0,0 +1,31 @@
package csvimport
import (
"fmt"
"testing"
)
func BenchmarkRowsUnmarshal(b *testing.B) {
cds, err := ParseColumnDescriptors("1:label:symbol,2:metric:bid,3:metric:ask,4:time:unix_ms")
if err != nil {
b.Fatalf("cannot parse column descriptors: %s", err)
}
s := `GOOG,123.456,789.234,1345678999003
GOOG,223.456,889.234,1345678939003
GOOG,323.456,989.234,1345678949003
MSFT,423.456,189.234,1345678959003
AMZN,523.456,189.234,1345678959005
`
const rowsExpected = 10
b.SetBytes(int64(len(s)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var rs Rows
for pb.Next() {
rs.Unmarshal(s, cds)
if len(rs.Rows) != rowsExpected {
panic(fmt.Errorf("unexpected rows parsed; got %d; want %d; rows: %v", len(rs.Rows), rowsExpected, rs.Rows))
}
}
})
}

@@ -0,0 +1,127 @@
package csvimport
import (
"fmt"
"strings"
)
// scanner is a csv scanner.
type scanner struct {
// The line value read after the call to NextLine()
Line string
// The column value read after the call to NextColumn()
Column string
// Error may be set only on NextColumn call.
// It is cleared on NextLine call.
Error error
s string
}
// Init initializes sc with s
func (sc *scanner) Init(s string) {
sc.Line = ""
sc.Column = ""
sc.Error = nil
sc.s = s
}
// NextLine advances the csv scanner to the next line and sets sc.Line to it.
//
// It clears sc.Error.
//
// false is returned if there are no more lines left in sc.s.
func (sc *scanner) NextLine() bool {
s := sc.s
sc.Error = nil
for len(s) > 0 {
n := strings.IndexByte(s, '\n')
var line string
if n >= 0 {
line = trimTrailingSpace(s[:n])
s = s[n+1:]
} else {
line = trimTrailingSpace(s)
s = ""
}
sc.Line = line
sc.s = s
if len(line) > 0 {
return true
}
}
return false
}
// NextColumn advances sc.Line to the next Column and sets sc.Column to it.
//
// false is returned if there are no more columns left in sc.Line or if any error occurs.
// sc.Error is set to the error in that case.
func (sc *scanner) NextColumn() bool {
s := sc.Line
if len(s) == 0 {
return false
}
if sc.Error != nil {
return false
}
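// A quoted column may contain commas and doubled quotes, so it is parsed separately.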
if s[0] == '"' {
sc.Column, sc.Line, sc.Error = readQuotedField(s)
return sc.Error == nil
}
n := strings.IndexByte(s, ',')
if n >= 0 {
sc.Column = s[:n]
sc.Line = s[n+1:]
} else {
sc.Column = s
sc.Line = ""
}
return true
}
func trimTrailingSpace(s string) string {
if len(s) > 0 && s[len(s)-1] == '\r' {
return s[:len(s)-1]
}
return s
}
func readQuotedField(s string) (string, string, error) {
sOrig := s
if len(s) == 0 || s[0] != '"' {
return "", sOrig, fmt.Errorf("missing opening quote for %q", sOrig)
}
s = s[1:]
hasEscapedQuote := false
for {
n := strings.IndexByte(s, '"')
if n < 0 {
return "", sOrig, fmt.Errorf("missing closing quote for %q", sOrig)
}
s = s[n+1:]
if len(s) == 0 {
// The end of the string has been reached.
return unquote(sOrig[1:len(sOrig)-1], hasEscapedQuote), "", nil
}
if s[0] == '"' {
// Take into account escaped quote
s = s[1:]
hasEscapedQuote = true
continue
}
if s[0] != ',' {
return "", sOrig, fmt.Errorf("missing comma after quoted field in %q", sOrig)
}
return unquote(sOrig[1:len(sOrig)-len(s)-1], hasEscapedQuote), s[1:], nil
}
}
func unquote(s string, hasEscapedQuote bool) string {
if !hasEscapedQuote {
return s
}
return strings.ReplaceAll(s, `""`, `"`)
}

@@ -0,0 +1,85 @@
package csvimport
import (
"testing"
)
func TestScannerSuccess(t *testing.T) {
var sc scanner
sc.Init("foo,bar\n\"aa,\"\"bb\",\"\"")
if !sc.NextLine() {
t.Fatalf("expecting the first line")
}
if sc.Line != "foo,bar" {
t.Fatalf("unexpected line; got %q; want %q", sc.Line, "foo,bar")
}
if !sc.NextColumn() {
t.Fatalf("expecting the first column")
}
if sc.Column != "foo" {
t.Fatalf("unexpected first column; got %q; want %q", sc.Column, "foo")
}
if !sc.NextColumn() {
t.Fatalf("expecting the second column")
}
if sc.Column != "bar" {
t.Fatalf("unexpected second column; got %q; want %q", sc.Column, "bar")
}
if sc.NextColumn() {
t.Fatalf("unexpected next column: %q", sc.Column)
}
if sc.Error != nil {
t.Fatalf("unexpected error: %s", sc.Error)
}
if !sc.NextLine() {
t.Fatalf("expecting the second line")
}
if sc.Line != "\"aa,\"\"bb\",\"\"" {
t.Fatalf("unexpected the second line; got %q; want %q", sc.Line, "\"aa,\"\"bb\",\"\"")
}
if !sc.NextColumn() {
t.Fatalf("expecting the first column on the second line")
}
if sc.Column != "aa,\"bb" {
t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "aa,\"bb")
}
if !sc.NextColumn() {
t.Fatalf("expecting the second column on the second line")
}
if sc.Column != "" {
t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "")
}
if sc.NextColumn() {
t.Fatalf("unexpected next column on the second line: %q", sc.Column)
}
if sc.Error != nil {
t.Fatalf("unexpected error: %s", sc.Error)
}
if sc.NextLine() {
t.Fatalf("unexpected next line: %q", sc.Line)
}
}
func TestScannerFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var sc scanner
sc.Init(s)
for sc.NextLine() {
for sc.NextColumn() {
}
if sc.Error != nil {
if sc.NextColumn() {
t.Fatalf("unexpected NextColumn success after the error %v", sc.Error)
}
return
}
}
t.Fatalf("expecting at least a single error")
}
// Unclosed quote
f("foo\r\n\"bar,")
f(`"foo,"bar`)
f(`foo,"bar",""a`)
f(`foo,"bar","a""`)
}

@@ -0,0 +1,124 @@
package csvimport
import (
"fmt"
"io"
"net/http"
"runtime"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
)
// ParseStream parses csv from req and calls callback for the parsed rows.
//
// The callback can be called multiple times for streamed data from req.
//
// callback shouldn't hold rows after returning.
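// The underlying buffers are reused on the next read iteration, so the callback
// must copy any data it needs to retain after it returns.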
func ParseStream(req *http.Request, callback func(rows []Row) error) error {
readCalls.Inc()
q := req.URL.Query()
format := q.Get("format")
cds, err := ParseColumnDescriptors(format)
if err != nil {
return fmt.Errorf("cannot parse the provided csv format: %s", err)
}
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped csv data: %s", err)
}
defer common.PutGzipReader(zr)
r = zr
}
ctx := getStreamContext()
defer putStreamContext(ctx)
for ctx.Read(r, cds) {
if err := callback(ctx.Rows.Rows); err != nil {
return err
}
}
return ctx.Error()
}
func (ctx *streamContext) Read(r io.Reader, cds []ColumnDescriptor) bool {
if ctx.err != nil {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
ctx.err = fmt.Errorf("cannot read csv data: %s", ctx.err)
}
return false
}
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds)
rowsRead.Add(len(ctx.Rows.Rows))
// Set missing timestamps
currentTs := time.Now().UnixNano() / 1e6
for i := range ctx.Rows.Rows {
row := &ctx.Rows.Rows[i]
if row.Timestamp == 0 {
row.Timestamp = currentTs
}
}
return true
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="csvimport"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="csvimport"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="csvimport"}`)
)
type streamContext struct {
Rows Rows
reqBuf []byte
tailBuf []byte
err error
}
func (ctx *streamContext) Error() error {
if ctx.err == io.EOF {
return nil
}
return ctx.err
}
func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0]
ctx.err = nil
}
func getStreamContext() *streamContext {
select {
case ctx := <-streamContextPoolCh:
return ctx
default:
if v := streamContextPool.Get(); v != nil {
return v.(*streamContext)
}
return &streamContext{}
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))