Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)
app/{vmagent,vminsert}: add support for importing csv data via /api/v1/import/csv
parent 49d7cb1a3f
commit 1fe66fb3cc
15 changed files with 1325 additions and 12 deletions
README.md: 53 changes
@@ -62,7 +62,8 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM
  if `-graphiteListenAddr` is set.
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set.
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
-* [/api/v1/import](#how-to-import-time-series-data)
+* [/api/v1/import](#how-to-import-time-series-data).
+* [Arbitrary CSV data](#how-to-import-csv-data).
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
* See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles).
@@ -425,6 +426,55 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]}
```

### How to import CSV data

Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
The `format` query arg must contain a comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:

```
<column_pos>:<type>:<context>
```

* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
* `<type>` describes the column type. Supported types are:
  * `metric` - the corresponding CSV column at `<column_pos>` contains a metric value. The metric name is read from the `<context>`.
    Each CSV line must contain at least one metric field.
  * `label` - the corresponding CSV column at `<column_pos>` contains a label value. The label name is read from the `<context>`.
    A CSV line may contain an arbitrary number of label fields. All these fields are attached to all the configured metrics.
  * `time` - the corresponding CSV column at `<column_pos>` contains the metric time. A CSV line may contain at most one time column.
    If a CSV line has no time column, then the current time is used. The time is applied to all the configured metrics.
    The format of the time is configured via `<context>`. Supported time formats are:
    * `unix_s` - unix timestamp in seconds.
    * `unix_ms` - unix timestamp in milliseconds.
    * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
    * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, e.g. `2006-01-02T15:04:05Z`.
    * `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain an arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).

Each request to `/api/v1/import/csv` can contain an arbitrary number of CSV lines.

Example for importing CSV data via `/api/v1/import/csv`:

```bash
curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
```

After that the data may be read via the [/api/v1/export](#how-to-export-time-series) endpoint:

```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
```

The following response should be returned:

```
{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```

### Prometheus querying API usage

VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):

@@ -600,6 +650,7 @@ Time series data can be imported via any supported ingestion protocol:
* [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol)
* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests)
* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series).
+* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details.

The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
@@ -21,10 +21,11 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
* Can add, remove and modify labels via Prometheus relabeling. See [these docs](#relabeling) for details.
* Accepts data via all the ingestion protocols supported by VictoriaMetrics:
  * Influx line protocol via `http://<vmagent>:8429/write`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
-  * JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
  * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
  * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
  * Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
+  * JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
+  * Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
* Can replicate collected metrics simultaneously to multiple remote storage systems.
* Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
  are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as connection

@@ -53,7 +54,7 @@ If you need collecting only Influx data, then the following command line would b

```
/path/to/vmagent -remoteWrite.url=https://victoria-metrics-host:8428/api/v1/write
```

-Then send Influx data to `http://vmagent-host:8429/write`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for more details.
+Then send Influx data to `http://vmagent-host:8429`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for more details.

`vmagent` is also available in [docker images](https://hub.docker.com/r/victoriametrics/vmagent/).

app/vmagent/csvimport/request_handler.go: 63 lines (new file)
@@ -0,0 +1,63 @@
```go
package csvimport

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
	"github.com/VictoriaMetrics/metrics"
)

var (
	rowsInserted  = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`)
	rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`)
)

// InsertHandler processes csv data from req.
func InsertHandler(req *http.Request) error {
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(req, insertRows)
	})
}

func insertRows(rows []parser.Row) error {
	ctx := common.GetPushCtx()
	defer common.PutPushCtx(ctx)

	tssDst := ctx.WriteRequest.Timeseries[:0]
	labels := ctx.Labels[:0]
	samples := ctx.Samples[:0]
	for i := range rows {
		r := &rows[i]
		labelsLen := len(labels)
		labels = append(labels, prompbmarshal.Label{
			Name:  "__name__",
			Value: r.Metric,
		})
		for j := range r.Tags {
			tag := &r.Tags[j]
			labels = append(labels, prompbmarshal.Label{
				Name:  tag.Key,
				Value: tag.Value,
			})
		}
		samples = append(samples, prompbmarshal.Sample{
			Value:     r.Value,
			Timestamp: r.Timestamp,
		})
		tssDst = append(tssDst, prompbmarshal.TimeSeries{
			Labels:  labels[labelsLen:],
			Samples: samples[len(samples)-1:],
		})
	}
	ctx.WriteRequest.Timeseries = tssDst
	ctx.Labels = labels
	ctx.Samples = samples
	remotewrite.Push(&ctx.WriteRequest)
	rowsInserted.Add(len(rows))
	rowsPerInsert.Update(float64(len(rows)))
	return nil
}
```
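For reference, a minimal Go client equivalent to the curl examples in the README feeds this handler; this is a sketch rather than part of the commit, and it assumes a vmagent instance listening on the default `localhost:8429`:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Two CSV lines; the format query arg maps column 1 to the "ticker"
	// label, columns 2 and 3 to the "ask"/"bid" metrics and column 4 to
	// the "market" label. No time column, so the current time is used.
	body := strings.NewReader("GOOG,1.23,4.56,NYSE\nMSFT,3.21,1.67,NASDAQ\n")
	url := "http://localhost:8429/api/v1/import/csv?" +
		"format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market"
	resp, err := http.Post(url, "text/csv", body)
	if err != nil {
		log.Fatalf("cannot send CSV data: %s", err)
	}
	defer resp.Body.Close()
	data, _ := ioutil.ReadAll(resp.Body)
	// The handler replies with 204 No Content on success.
	fmt.Printf("status=%d body=%q\n", resp.StatusCode, data)
}
```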

@@ -7,6 +7,7 @@ import (
 	"strings"
 	"time"

+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"

@@ -127,6 +128,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 		w.WriteHeader(http.StatusNoContent)
 		return true
+	case "/api/v1/import/csv":
+		csvimportRequests.Inc()
+		if err := csvimport.InsertHandler(r); err != nil {
+			csvimportErrors.Inc()
+			httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		w.WriteHeader(http.StatusNoContent)
+		return true
 	case "/write", "/api/v2/write":
 		influxWriteRequests.Inc()
 		if err := influx.InsertHandlerForHTTP(r); err != nil {

@@ -152,11 +162,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 }

 var (
-	prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="prometheus"}`)
-	prometheusWriteErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="prometheus"}`)
+	prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`)
+	prometheusWriteErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="promremotewrite"}`)

-	vmimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import", protocol="vm"}`)
-	vmimportErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import", protocol="vm"}`)
+	vmimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import", protocol="vmimport"}`)
+	vmimportErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import", protocol="vmimport"}`)

+	csvimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`)
+	csvimportErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`)

 	influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
 	influxWriteErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)

app/vminsert/csvimport/request_handler.go: 44 lines (new file)
@@ -0,0 +1,44 @@
```go
package csvimport

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
	"github.com/VictoriaMetrics/metrics"
)

var (
	rowsInserted  = metrics.NewCounter(`vm_rows_inserted_total{type="csvimport"}`)
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="csvimport"}`)
)

// InsertHandler processes /api/v1/import/csv requests.
func InsertHandler(req *http.Request) error {
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(req, func(rows []parser.Row) error {
			return insertRows(rows)
		})
	})
}

func insertRows(rows []parser.Row) error {
	ctx := common.GetInsertCtx()
	defer common.PutInsertCtx(ctx)

	ctx.Reset(len(rows))
	for i := range rows {
		r := &rows[i]
		ctx.Labels = ctx.Labels[:0]
		// An empty label name denotes the metric name (__name__).
		ctx.AddLabel("", r.Metric)
		for j := range r.Tags {
			tag := &r.Tags[j]
			ctx.AddLabel(tag.Key, tag.Value)
		}
		ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
	}
	rowsInserted.Add(len(rows))
	rowsPerInsert.Update(float64(len(rows)))
	return ctx.FlushBufs()
}
```

@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"strings"

+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"

@@ -100,6 +101,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 		w.WriteHeader(http.StatusNoContent)
 		return true
+	case "/api/v1/import/csv":
+		csvimportRequests.Inc()
+		if err := csvimport.InsertHandler(r); err != nil {
+			csvimportErrors.Inc()
+			httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		w.WriteHeader(http.StatusNoContent)
+		return true
 	case "/write", "/api/v2/write":
 		influxWriteRequests.Inc()
 		if err := influx.InsertHandlerForHTTP(r); err != nil {

@@ -127,11 +137,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 }

 var (
-	prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/write", protocol="prometheus"}`)
-	prometheusWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/write", protocol="prometheus"}`)
+	prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`)
+	prometheusWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/write", protocol="promremotewrite"}`)

-	vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import", protocol="vm"}`)
-	vmimportErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import", protocol="vm"}`)
+	vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import", protocol="vmimport"}`)
+	vmimportErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import", protocol="vmimport"}`)

+	csvimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`)
+	csvimportErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`)

 	influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`)
 	influxWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`)

lib/protoparser/csvimport/column_descriptor.go: 172 lines (new file)
@@ -0,0 +1,172 @@
```go
package csvimport

import (
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/valyala/fastjson/fastfloat"
)

// ColumnDescriptor represents parsing rules for a single csv column.
//
// The column is transformed to either timestamp, tag or metric value
// depending on the corresponding non-empty field.
//
// If all the fields are empty, then the given column is ignored.
type ColumnDescriptor struct {
	// ParseTimestamp is set to a function, which is used for timestamp
	// parsing from the given column.
	ParseTimestamp func(s string) (int64, error)

	// TagName is set to tag name for tag value, which should be obtained
	// from the given column.
	TagName string

	// MetricName is set to metric name for value obtained from the given column.
	MetricName string
}

const maxColumnsPerRow = 64 * 1024

// ParseColumnDescriptors parses column descriptors from s.
//
// s must have a comma-separated list of the following entries:
//
//	<column_pos>:<column_type>:<extension>
//
// Where:
//
//   - <column_pos> is numeric csv column position. The first column has position 1.
//   - <column_type> is one of the following types:
//       - time - the corresponding column contains timestamp. Timestamp format is determined by <extension>. The following formats are supported:
//           - unix_s - unix timestamp in seconds
//           - unix_ms - unix timestamp in milliseconds
//           - unix_ns - unix timestamp in nanoseconds
//           - rfc3339 - RFC3339 format in the form `2006-01-02T15:04:05Z07:00`
//       - label - the corresponding column contains metric label with the name set in <extension>.
//       - metric - the corresponding column contains metric value with the name set in <extension>.
//
// s must contain at least a single 'metric' column and no more than a single `time` column.
func ParseColumnDescriptors(s string) ([]ColumnDescriptor, error) {
	m := make(map[int]ColumnDescriptor)
	cols := strings.Split(s, ",")
	hasValueCol := false
	hasTimeCol := false
	maxPos := 0
	for i, col := range cols {
		var cd ColumnDescriptor
		a := strings.SplitN(col, ":", 3)
		if len(a) != 3 {
			return nil, fmt.Errorf("entry #%d must have the following form: <column_pos>:<column_type>:<extension>; got %q", i+1, a)
		}
		pos, err := strconv.Atoi(a[0])
		if err != nil {
			return nil, fmt.Errorf("cannot parse <column_pos> part from the entry #%d %q: %s", i+1, col, err)
		}
		if pos <= 0 {
			return nil, fmt.Errorf("<column_pos> cannot be smaller than 1; got %d for entry #%d %q", pos, i+1, col)
		}
		if pos > maxColumnsPerRow {
			return nil, fmt.Errorf("<column_pos> cannot be bigger than %d; got %d for entry #%d %q", maxColumnsPerRow, pos, i+1, col)
		}
		if pos > maxPos {
			maxPos = pos
		}
		typ := a[1]
		switch typ {
		case "time":
			if hasTimeCol {
				return nil, fmt.Errorf("duplicate time column has been found at entry #%d %q for %q", i+1, col, s)
			}
			parseTimestamp, err := parseTimeFormat(a[2])
			if err != nil {
				return nil, fmt.Errorf("cannot parse time format from the entry #%d %q: %s", i+1, col, err)
			}
			cd.ParseTimestamp = parseTimestamp
			hasTimeCol = true
		case "label":
			cd.TagName = a[2]
			if len(cd.TagName) == 0 {
				return nil, fmt.Errorf("label name cannot be empty in the entry #%d %q", i+1, col)
			}
		case "metric":
			cd.MetricName = a[2]
			if len(cd.MetricName) == 0 {
				return nil, fmt.Errorf("metric name cannot be empty in the entry #%d %q", i+1, col)
			}
			hasValueCol = true
		default:
			return nil, fmt.Errorf("unknown <column_type>: %q; allowed values: time, metric, label", typ)
		}
		pos--
		if _, ok := m[pos]; ok {
			return nil, fmt.Errorf("duplicate <column_pos> %d for the entry #%d %q", pos, i+1, col)
		}
		m[pos] = cd
	}
	if !hasValueCol {
		return nil, fmt.Errorf("missing 'metric' column in %q", s)
	}
	cds := make([]ColumnDescriptor, maxPos)
	for pos, cd := range m {
		cds[pos] = cd
	}
	return cds, nil
}

func parseTimeFormat(format string) (func(s string) (int64, error), error) {
	if strings.HasPrefix(format, "custom:") {
		format = format[len("custom:"):]
		return newParseCustomTimeFunc(format), nil
	}
	switch format {
	case "unix_s":
		return parseUnixTimestampSeconds, nil
	case "unix_ms":
		return parseUnixTimestampMilliseconds, nil
	case "unix_ns":
		return parseUnixTimestampNanoseconds, nil
	case "rfc3339":
		return parseRFC3339, nil
	default:
		return nil, fmt.Errorf("unknown format for time parsing: %q; supported formats: unix_s, unix_ms, unix_ns, rfc3339", format)
	}
}

func parseUnixTimestampSeconds(s string) (int64, error) {
	n := fastfloat.ParseInt64BestEffort(s)
	if n > int64(1<<63-1)/1e3 {
		return 0, fmt.Errorf("too big unix timestamp in seconds: %d; must be smaller than %d", n, int64(1<<63-1)/1e3)
	}
	return n * 1e3, nil
}

func parseUnixTimestampMilliseconds(s string) (int64, error) {
	n := fastfloat.ParseInt64BestEffort(s)
	return n, nil
}

func parseUnixTimestampNanoseconds(s string) (int64, error) {
	n := fastfloat.ParseInt64BestEffort(s)
	return n / 1e6, nil
}

func parseRFC3339(s string) (int64, error) {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		return 0, fmt.Errorf("cannot parse time in RFC3339 from %q: %s", s, err)
	}
	return t.UnixNano() / 1e6, nil
}

func newParseCustomTimeFunc(format string) func(s string) (int64, error) {
	return func(s string) (int64, error) {
		t, err := time.Parse(format, s)
		if err != nil {
			return 0, fmt.Errorf("cannot parse time in custom format %q from %q: %s", format, s, err)
		}
		return t.UnixNano() / 1e6, nil
	}
}
```
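A short sketch of how these descriptors look once parsed (illustrative only, not part of the commit):

```go
package main

import (
	"fmt"
	"log"

	csvimport "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
)

func main() {
	// Column 1 -> "ticker" label, column 2 -> "ask" metric,
	// column 3 -> unix_ms timestamp.
	cds, err := csvimport.ParseColumnDescriptors("1:label:ticker,2:metric:ask,3:time:unix_ms")
	if err != nil {
		log.Fatalf("invalid format: %s", err)
	}
	// The result is indexed by zero-based column position.
	for i, cd := range cds {
		fmt.Printf("column %d: tag=%q metric=%q hasTimeParser=%v\n",
			i+1, cd.TagName, cd.MetricName, cd.ParseTimestamp != nil)
	}
}
```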

lib/protoparser/csvimport/column_descriptor_test.go: 226 lines (new file)
@@ -0,0 +1,226 @@
```go
package csvimport

import (
	"bytes"
	"fmt"
	"reflect"
	"testing"
	"time"
	"unsafe"
)

func TestParseColumnDescriptorsSuccess(t *testing.T) {
	f := func(s string, cdsExpected []ColumnDescriptor) {
		t.Helper()
		cds, err := ParseColumnDescriptors(s)
		if err != nil {
			t.Fatalf("unexpected error on ParseColumnDescriptors(%q): %s", s, err)
		}
		if !equalColumnDescriptors(cds, cdsExpected) {
			t.Fatalf("unexpected cds returned from ParseColumnDescriptors(%q);\ngot\n%v\nwant\n%v", s, cds, cdsExpected)
		}
	}
	f("1:time:unix_s,3:metric:temperature", []ColumnDescriptor{
		{
			ParseTimestamp: parseUnixTimestampSeconds,
		},
		{},
		{
			MetricName: "temperature",
		},
	})
	f("2:time:unix_ns,1:metric:temperature,3:label:city,4:label:country", []ColumnDescriptor{
		{
			MetricName: "temperature",
		},
		{
			ParseTimestamp: parseUnixTimestampNanoseconds,
		},
		{
			TagName: "city",
		},
		{
			TagName: "country",
		},
	})
	f("2:time:unix_ms,1:metric:temperature", []ColumnDescriptor{
		{
			MetricName: "temperature",
		},
		{
			ParseTimestamp: parseUnixTimestampMilliseconds,
		},
	})
	f("2:time:rfc3339,1:metric:temperature", []ColumnDescriptor{
		{
			MetricName: "temperature",
		},
		{
			ParseTimestamp: parseRFC3339,
		},
	})
}

func TestParseColumnDescriptorsFailure(t *testing.T) {
	f := func(s string) {
		t.Helper()
		cds, err := ParseColumnDescriptors(s)
		if err == nil {
			t.Fatalf("expecting non-nil error for ParseColumnDescriptors(%q)", s)
		}
		if cds != nil {
			t.Fatalf("expecting nil cds; got %v", cds)
		}
	}
	// Empty string
	f("")

	// Missing metric column
	f("1:time:unix_s")
	f("1:label:aaa")

	// Invalid column number
	f("foo:time:unix_s,bar:metric:temp")
	f("0:metric:aaa")
	f("-123:metric:aaa")
	f(fmt.Sprintf("%d:metric:aaa", maxColumnsPerRow+10))

	// Duplicate time column
	f("1:time:unix_s,2:time:rfc3339,3:metric:aaa")
	f("1:time:custom:2006,2:time:rfc3339,3:metric:aaa")

	// Invalid time format
	f("1:time:foobar,2:metric:aaa")
	f("1:time:,2:metric:aaa")
	f("1:time:sss:sss,2:metric:aaa")

	// Empty label name
	f("2:label:,1:metric:aaa")

	// Empty metric name
	f("1:metric:")

	// Unknown type
	f("1:metric:aaa,2:aaaa:bbb")

	// Duplicate column number
	f("1:metric:a,1:metric:b")
}

func TestParseUnixTimestampSeconds(t *testing.T) {
	f := func(s string, tsExpected int64) {
		t.Helper()
		ts, err := parseUnixTimestampSeconds(s)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", s, err)
		}
		if ts != tsExpected {
			t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
		}
	}
	f("0", 0)
	f("123", 123000)
	f("-123", -123000)
}

func TestParseUnixTimestampMilliseconds(t *testing.T) {
	f := func(s string, tsExpected int64) {
		t.Helper()
		ts, err := parseUnixTimestampMilliseconds(s)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", s, err)
		}
		if ts != tsExpected {
			t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
		}
	}
	f("0", 0)
	f("123", 123)
	f("-123", -123)
}

func TestParseUnixTimestampNanoseconds(t *testing.T) {
	f := func(s string, tsExpected int64) {
		t.Helper()
		ts, err := parseUnixTimestampNanoseconds(s)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", s, err)
		}
		if ts != tsExpected {
			t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
		}
	}
	f("0", 0)
	f("123", 0)
	f("12343567", 12)
	f("-12343567", -12)
}

func TestParseRFC3339(t *testing.T) {
	f := func(s string, tsExpected int64) {
		t.Helper()
		ts, err := parseRFC3339(s)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", s, err)
		}
		if ts != tsExpected {
			t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
		}
	}
	f("2006-01-02T15:04:05Z", 1136214245000)
	f("2020-03-11T18:23:46Z", 1583951026000)
}

func TestParseCustomTimeFunc(t *testing.T) {
	f := func(format, s string, tsExpected int64) {
		t.Helper()
		f := newParseCustomTimeFunc(format)
		ts, err := f(s)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", s, err)
		}
		if ts != tsExpected {
			t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected)
		}
	}
	f(time.RFC1123, "Mon, 29 Oct 2018 07:50:37 GMT", 1540799437000)
	f("2006-01-02 15:04:05.999Z", "2015-08-10 20:04:40.123Z", 1439237080123)
}

func equalColumnDescriptors(a, b []ColumnDescriptor) bool {
	if len(a) != len(b) {
		return false
	}
	for i, x := range a {
		y := b[i]
		if !equalColumnDescriptor(x, y) {
			return false
		}
	}
	return true
}

func equalColumnDescriptor(x, y ColumnDescriptor) bool {
	sh1 := &reflect.SliceHeader{
		Data: uintptr(unsafe.Pointer(&x.ParseTimestamp)),
		Len:  int(unsafe.Sizeof(x.ParseTimestamp)),
		Cap:  int(unsafe.Sizeof(x.ParseTimestamp)),
	}
	b1 := *(*[]byte)(unsafe.Pointer(sh1))
	sh2 := &reflect.SliceHeader{
		Data: uintptr(unsafe.Pointer(&y.ParseTimestamp)),
		Len:  int(unsafe.Sizeof(y.ParseTimestamp)),
		Cap:  int(unsafe.Sizeof(y.ParseTimestamp)),
	}
	b2 := *(*[]byte)(unsafe.Pointer(sh2))
	if !bytes.Equal(b1, b2) {
		return false
	}
	if x.TagName != y.TagName {
		return false
	}
	if x.MetricName != y.MetricName {
		return false
	}
	return true
}
```

lib/protoparser/csvimport/parser.go: 141 lines (new file)
@@ -0,0 +1,141 @@
```go
package csvimport

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/valyala/fastjson/fastfloat"
)

// Rows represents csv rows.
type Rows struct {
	// Rows contains parsed csv rows after the call to Unmarshal.
	Rows []Row

	sc          scanner
	tagsPool    []Tag
	metricsPool []metric
}

// Reset resets rs.
func (rs *Rows) Reset() {
	rows := rs.Rows
	for i := range rows {
		r := &rows[i]
		r.Metric = ""
		r.Tags = nil
		r.Value = 0
		r.Timestamp = 0
	}
	rs.Rows = rs.Rows[:0]

	rs.sc.Init("")

	tags := rs.tagsPool
	for i := range tags {
		t := &tags[i]
		t.Key = ""
		t.Value = ""
	}
	rs.tagsPool = rs.tagsPool[:0]

	metrics := rs.metricsPool
	for i := range metrics {
		m := &metrics[i]
		m.Name = ""
		m.Value = 0
	}
	rs.metricsPool = rs.metricsPool[:0]
}

// Row represents a single metric row.
type Row struct {
	Metric    string
	Tags      []Tag
	Value     float64
	Timestamp int64
}

// Tag represents a metric tag.
type Tag struct {
	Key   string
	Value string
}

type metric struct {
	Name  string
	Value float64
}

// Unmarshal unmarshals csv lines from s according to the given cds.
func (rs *Rows) Unmarshal(s string, cds []ColumnDescriptor) {
	rs.sc.Init(s)
	rs.Rows, rs.tagsPool, rs.metricsPool = parseRows(&rs.sc, rs.Rows[:0], rs.tagsPool[:0], rs.metricsPool[:0], cds)
}

func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []ColumnDescriptor) ([]Row, []Tag, []metric) {
	for sc.NextLine() {
		line := sc.Line
		var r Row
		col := uint(0)
		metrics = metrics[:0]
		tagsLen := len(tags)
		for sc.NextColumn() {
			if col >= uint(len(cds)) {
				// Skip superfluous column.
				continue
			}
			cd := &cds[col]
			col++
			if parseTimestamp := cd.ParseTimestamp; parseTimestamp != nil {
				timestamp, err := parseTimestamp(sc.Column)
				if err != nil {
					sc.Error = fmt.Errorf("cannot parse timestamp from %q: %s", sc.Column, err)
					break
				}
				r.Timestamp = timestamp
				continue
			}
			if tagName := cd.TagName; tagName != "" {
				tags = append(tags, Tag{
					Key:   tagName,
					Value: sc.Column,
				})
				continue
			}
			metricName := cd.MetricName
			if metricName == "" {
				// The given field is ignored.
				continue
			}
			value := fastfloat.ParseBestEffort(sc.Column)
			metrics = append(metrics, metric{
				Name:  metricName,
				Value: value,
			})
		}
		if col < uint(len(cds)) && sc.Error == nil {
			sc.Error = fmt.Errorf("missing columns in the csv line %q; got %d columns; want at least %d columns", line, col, len(cds))
		}
		if sc.Error != nil {
			logger.Errorf("error when parsing csv line %q: %s; skipping this line", line, sc.Error)
			continue
		}
		if len(metrics) == 0 {
			logger.Panicf("BUG: expecting at least a single metric in columnDescriptors=%#v", cds)
		}
		r.Metric = metrics[0].Name
		r.Tags = tags[tagsLen:]
		r.Value = metrics[0].Value
		dst = append(dst, r)
		for _, m := range metrics[1:] {
			dst = append(dst, Row{
				Metric:    m.Name,
				Tags:      r.Tags,
				Value:     m.Value,
				Timestamp: r.Timestamp,
			})
		}
	}
	return dst, tags, metrics
}
```
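An illustrative use of the parser with the stock-quote format from the README (a sketch, not part of the commit):

```go
package main

import (
	"fmt"
	"log"

	csvimport "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
)

func main() {
	cds, err := csvimport.ParseColumnDescriptors(
		"2:metric:ask,3:metric:bid,1:label:ticker,4:label:market")
	if err != nil {
		log.Fatalf("invalid format: %s", err)
	}
	var rs csvimport.Rows
	// Two metric columns per line, so each CSV line yields two Rows
	// sharing the same tags. Timestamp stays 0 here because the format
	// has no time column; per the README, the current time is applied
	// at ingestion in that case.
	rs.Unmarshal("GOOG,1.23,4.56,NYSE\nMSFT,3.21,1.67,NASDAQ", cds)
	for _, r := range rs.Rows {
		fmt.Printf("%s %v value=%v ts=%d\n", r.Metric, r.Tags, r.Value, r.Timestamp)
	}
}
```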

lib/protoparser/csvimport/parser_test.go: 171 lines (new file)
@@ -0,0 +1,171 @@
```go
package csvimport

import (
	"reflect"
	"testing"
)

func TestRowsUnmarshalFailure(t *testing.T) {
	f := func(format, s string) {
		t.Helper()
		cds, err := ParseColumnDescriptors(format)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", format, err)
		}
		var rs Rows
		rs.Unmarshal(s, cds)
		if len(rs.Rows) != 0 {
			t.Fatalf("unexpected rows unmarshaled: %#v", rs.Rows)
		}
	}
	// Invalid timestamp
	f("1:metric:foo,2:time:rfc3339", "234,foobar")

	// Missing columns
	f("3:metric:aaa", "123,456")
}

func TestRowsUnmarshalSuccess(t *testing.T) {
	f := func(format, s string, rowsExpected []Row) {
		t.Helper()
		cds, err := ParseColumnDescriptors(format)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", format, err)
		}
		var rs Rows
		rs.Unmarshal(s, cds)
		if !reflect.DeepEqual(rs.Rows, rowsExpected) {
			t.Fatalf("unexpected rows;\ngot\n%v\nwant\n%v", rs.Rows, rowsExpected)
		}
		rs.Reset()

		// Unmarshal rows the second time
		rs.Unmarshal(s, cds)
		if !reflect.DeepEqual(rs.Rows, rowsExpected) {
			t.Fatalf("unexpected rows on the second unmarshal;\ngot\n%v\nwant\n%v", rs.Rows, rowsExpected)
		}
	}
	f("1:metric:foo", "", nil)
	f("1:metric:foo", `123`, []Row{
		{
			Metric: "foo",
			Value:  123,
		},
	})
	f("1:metric:foo,2:time:unix_s,3:label:foo,4:label:bar", `123,456,xxx,yy`, []Row{
		{
			Metric: "foo",
			Tags: []Tag{
				{
					Key:   "foo",
					Value: "xxx",
				},
				{
					Key:   "bar",
					Value: "yy",
				},
			},
			Value:     123,
			Timestamp: 456000,
		},
	})

	// Multiple metrics
	f("2:metric:bar,1:metric:foo,3:label:foo,4:label:bar,5:time:custom:2006-01-02 15:04:05.999Z",
		`"2.34",5.6,"foo"",bar","aa",2015-08-10 20:04:40.123Z`, []Row{
			{
				Metric: "foo",
				Tags: []Tag{
					{
						Key:   "foo",
						Value: "foo\",bar",
					},
					{
						Key:   "bar",
						Value: "aa",
					},
				},
				Value:     2.34,
				Timestamp: 1439237080123,
			},
			{
				Metric: "bar",
				Tags: []Tag{
					{
						Key:   "foo",
						Value: "foo\",bar",
					},
					{
						Key:   "bar",
						Value: "aa",
					},
				},
				Value:     5.6,
				Timestamp: 1439237080123,
			},
		})
	f("2:label:symbol,3:time:custom:2006-01-02 15:04:05.999Z,4:metric:bid,5:metric:ask",
		`
"aaa","AUDCAD","2015-08-10 00:00:01.000Z",0.9725,0.97273
"aaa","AUDCAD","2015-08-10 00:00:02.000Z",0.97253,0.97276
`, []Row{
			{
				Metric: "bid",
				Tags: []Tag{
					{
						Key:   "symbol",
						Value: "AUDCAD",
					},
				},
				Value:     0.9725,
				Timestamp: 1439164801000,
			},
			{
				Metric: "ask",
				Tags: []Tag{
					{
						Key:   "symbol",
						Value: "AUDCAD",
					},
				},
				Value:     0.97273,
				Timestamp: 1439164801000,
			},
			{
				Metric: "bid",
				Tags: []Tag{
					{
						Key:   "symbol",
						Value: "AUDCAD",
					},
				},
				Value:     0.97253,
				Timestamp: 1439164802000,
			},
			{
				Metric: "ask",
				Tags: []Tag{
					{
						Key:   "symbol",
						Value: "AUDCAD",
					},
				},
				Value:     0.97276,
				Timestamp: 1439164802000,
			},
		})

	// Superfluous columns
	f("1:metric:foo", `123,456,foo,bar`, []Row{
		{
			Metric: "foo",
			Value:  123,
		},
	})
	f("2:metric:foo", `123,-45.6,foo,bar`, []Row{
		{
			Metric: "foo",
			Value:  -45.6,
		},
	})
}
```

lib/protoparser/csvimport/parser_timing_test.go: 31 lines (new file)
@@ -0,0 +1,31 @@
```go
package csvimport

import (
	"fmt"
	"testing"
)

func BenchmarkRowsUnmarshal(b *testing.B) {
	cds, err := ParseColumnDescriptors("1:label:symbol,2:metric:bid,3:metric:ask,4:time:unix_ms")
	if err != nil {
		b.Fatalf("cannot parse column descriptors: %s", err)
	}
	s := `GOOG,123.456,789.234,1345678999003
GOOG,223.456,889.234,1345678939003
GOOG,323.456,989.234,1345678949003
MSFT,423.456,189.234,1345678959003
AMZN,523.456,189.234,1345678959005
`
	const rowsExpected = 10
	b.SetBytes(int64(len(s)))
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		var rs Rows
		for pb.Next() {
			rs.Unmarshal(s, cds)
			if len(rs.Rows) != rowsExpected {
				panic(fmt.Errorf("unexpected rows parsed; got %d; want %d; rows: %v", len(rs.Rows), rowsExpected, rs.Rows))
			}
		}
	})
}
```
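The benchmark expects 10 rows from 5 input lines because each line carries two metric columns (`bid` and `ask`). It can be run with standard Go tooling, e.g. `go test -bench=RowsUnmarshal ./lib/protoparser/csvimport/`.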

lib/protoparser/csvimport/scanner.go: 127 lines (new file)
@ -0,0 +1,127 @@
package csvimport

import (
	"fmt"
	"strings"
)

// scanner is a csv scanner
type scanner struct {
	// The line value read after the call to NextLine()
	Line string

	// The column value read after the call to NextColumn()
	Column string

	// Error may be set only on NextColumn call.
	// It is cleared on NextLine call.
	Error error

	s string
}

// Init initializes sc with s
func (sc *scanner) Init(s string) {
	sc.Line = ""
	sc.Column = ""
	sc.Error = nil
	sc.s = s
}

// NextLine advances the csv scanner to the next line and sets sc.Line to it.
//
// It clears sc.Error.
//
// false is returned if no more lines are left in sc.s
func (sc *scanner) NextLine() bool {
	s := sc.s
	sc.Error = nil
	for len(s) > 0 {
		n := strings.IndexByte(s, '\n')
		var line string
		if n >= 0 {
			line = trimTrailingSpace(s[:n])
			s = s[n+1:]
		} else {
			line = trimTrailingSpace(s)
			s = ""
		}
		sc.Line = line
		sc.s = s
		if len(line) > 0 {
			return true
		}
	}
	return false
}

// NextColumn advances sc.Line to the next column and sets sc.Column to it.
//
// false is returned if no more columns are left in sc.Line or if any error occurs.
// sc.Error is set to the error in this case.
func (sc *scanner) NextColumn() bool {
	s := sc.Line
	if len(s) == 0 {
		return false
	}
	if sc.Error != nil {
		return false
	}
	if s[0] == '"' {
		sc.Column, sc.Line, sc.Error = readQuotedField(s)
		return sc.Error == nil
	}
	n := strings.IndexByte(s, ',')
	if n >= 0 {
		sc.Column = s[:n]
		sc.Line = s[n+1:]
	} else {
		sc.Column = s
		sc.Line = ""
	}
	return true
}

func trimTrailingSpace(s string) string {
	if len(s) > 0 && s[len(s)-1] == '\r' {
		return s[:len(s)-1]
	}
	return s
}

func readQuotedField(s string) (string, string, error) {
	sOrig := s
	if len(s) == 0 || s[0] != '"' {
		return "", sOrig, fmt.Errorf("missing opening quote for %q", sOrig)
	}
	s = s[1:]
	hasEscapedQuote := false
	for {
		n := strings.IndexByte(s, '"')
		if n < 0 {
			return "", sOrig, fmt.Errorf("missing closing quote for %q", sOrig)
		}
		s = s[n+1:]
		if len(s) == 0 {
			// The end of string found
			return unquote(sOrig[1:len(sOrig)-1], hasEscapedQuote), "", nil
		}
		if s[0] == '"' {
			// Take into account escaped quote
			s = s[1:]
			hasEscapedQuote = true
			continue
		}
		if s[0] != ',' {
			return "", sOrig, fmt.Errorf("missing comma after quoted field in %q", sOrig)
		}
		return unquote(sOrig[1:len(sOrig)-len(s)-1], hasEscapedQuote), s[1:], nil
	}
}

func unquote(s string, hasEscapedQuote bool) string {
	if !hasEscapedQuote {
		return s
	}
	return strings.ReplaceAll(s, `""`, `"`)
}
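The scanner is driven line by line, then column by column. Below is a minimal sketch of the intended call pattern; since the `scanner` type is unexported it must live in the same package, and `exampleScan` is an illustrative helper, not part of the actual code:

```go
package csvimport

import "fmt"

// exampleScan walks all fields of the given csv data and prints them.
// Illustrative only; not part of the package.
func exampleScan(s string) error {
	var sc scanner
	sc.Init(s)
	for sc.NextLine() {
		for sc.NextColumn() {
			fmt.Printf("field: %q\n", sc.Column)
		}
		// NextColumn returns false both at end-of-line and on error,
		// so sc.Error must be checked after the inner loop.
		if sc.Error != nil {
			return sc.Error
		}
	}
	return nil
}
```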
85 lib/protoparser/csvimport/scanner_test.go Normal file
@@ -0,0 +1,85 @@
package csvimport

import (
	"testing"
)

func TestScannerSuccess(t *testing.T) {
	var sc scanner
	sc.Init("foo,bar\n\"aa,\"\"bb\",\"\"")
	if !sc.NextLine() {
		t.Fatalf("expecting the first line")
	}
	if sc.Line != "foo,bar" {
		t.Fatalf("unexpected line; got %q; want %q", sc.Line, "foo,bar")
	}
	if !sc.NextColumn() {
		t.Fatalf("expecting the first column")
	}
	if sc.Column != "foo" {
		t.Fatalf("unexpected first column; got %q; want %q", sc.Column, "foo")
	}
	if !sc.NextColumn() {
		t.Fatalf("expecting the second column")
	}
	if sc.Column != "bar" {
		t.Fatalf("unexpected second column; got %q; want %q", sc.Column, "bar")
	}
	if sc.NextColumn() {
		t.Fatalf("unexpected next column: %q", sc.Column)
	}
	if sc.Error != nil {
		t.Fatalf("unexpected error: %s", sc.Error)
	}
	if !sc.NextLine() {
		t.Fatalf("expecting the second line")
	}
	if sc.Line != "\"aa,\"\"bb\",\"\"" {
		t.Fatalf("unexpected second line; got %q; want %q", sc.Line, "\"aa,\"\"bb\",\"\"")
	}
	if !sc.NextColumn() {
		t.Fatalf("expecting the first column on the second line")
	}
	if sc.Column != "aa,\"bb" {
		t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "aa,\"bb")
	}
	if !sc.NextColumn() {
		t.Fatalf("expecting the second column on the second line")
	}
	if sc.Column != "" {
		t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "")
	}
	if sc.NextColumn() {
		t.Fatalf("unexpected next column on the second line: %q", sc.Column)
	}
	if sc.Error != nil {
		t.Fatalf("unexpected error: %s", sc.Error)
	}
	if sc.NextLine() {
		t.Fatalf("unexpected next line: %q", sc.Line)
	}
}

func TestScannerFailure(t *testing.T) {
	f := func(s string) {
		t.Helper()
		var sc scanner
		sc.Init(s)
		for sc.NextLine() {
			for sc.NextColumn() {
			}
			if sc.Error != nil {
				if sc.NextColumn() {
					t.Fatalf("unexpected NextColumn success after the error %v", sc.Error)
				}
				return
			}
		}
		t.Fatalf("expecting at least a single error")
	}
	// Unclosed quote
	f("foo\r\n\"bar,")
	f(`"foo,"bar`)
	f(`foo,"bar",""a`)
	f(`foo,"bar","a""`)
}
124 lib/protoparser/csvimport/streamparser.go Normal file
@@ -0,0 +1,124 @@
package csvimport

import (
	"fmt"
	"io"
	"net/http"
	"runtime"
	"sync"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
	"github.com/VictoriaMetrics/metrics"
)

// ParseStream parses csv from req and calls callback for the parsed rows.
//
// The callback can be called multiple times for streamed data from req.
//
// callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error {
	readCalls.Inc()
	q := req.URL.Query()
	format := q.Get("format")
	cds, err := ParseColumnDescriptors(format)
	if err != nil {
		return fmt.Errorf("cannot parse the provided csv format: %s", err)
	}
	r := req.Body
	if req.Header.Get("Content-Encoding") == "gzip" {
		zr, err := common.GetGzipReader(r)
		if err != nil {
			return fmt.Errorf("cannot read gzipped csv data: %s", err)
		}
		defer common.PutGzipReader(zr)
		r = zr
	}

	ctx := getStreamContext()
	defer putStreamContext(ctx)
	for ctx.Read(r, cds) {
		if err := callback(ctx.Rows.Rows); err != nil {
			return err
		}
	}
	return ctx.Error()
}

func (ctx *streamContext) Read(r io.Reader, cds []ColumnDescriptor) bool {
	if ctx.err != nil {
		return false
	}
	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
	if ctx.err != nil {
		if ctx.err != io.EOF {
			readErrors.Inc()
			ctx.err = fmt.Errorf("cannot read csv data: %s", ctx.err)
		}
		return false
	}
	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds)
	rowsRead.Add(len(ctx.Rows.Rows))

	// Set missing timestamps
	currentTs := time.Now().UnixNano() / 1e6
	for i := range ctx.Rows.Rows {
		row := &ctx.Rows.Rows[i]
		if row.Timestamp == 0 {
			row.Timestamp = currentTs
		}
	}
	return true
}

var (
	readCalls  = metrics.NewCounter(`vm_protoparser_read_calls_total{type="csvimport"}`)
	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="csvimport"}`)
	rowsRead   = metrics.NewCounter(`vm_protoparser_rows_read_total{type="csvimport"}`)
)

type streamContext struct {
	Rows    Rows
	reqBuf  []byte
	tailBuf []byte
	err     error
}

func (ctx *streamContext) Error() error {
	if ctx.err == io.EOF {
		return nil
	}
	return ctx.err
}

func (ctx *streamContext) reset() {
	ctx.Rows.Reset()
	ctx.reqBuf = ctx.reqBuf[:0]
	ctx.tailBuf = ctx.tailBuf[:0]
	ctx.err = nil
}

func getStreamContext() *streamContext {
	select {
	case ctx := <-streamContextPoolCh:
		return ctx
	default:
		if v := streamContextPool.Get(); v != nil {
			return v.(*streamContext)
		}
		return &streamContext{}
	}
}

func putStreamContext(ctx *streamContext) {
	ctx.reset()
	select {
	case streamContextPoolCh <- ctx:
	default:
		streamContextPool.Put(ctx)
	}
}

var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
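A rough sketch of how ParseStream is meant to be wired into an HTTP endpoint such as `/api/v1/import/csv` follows. Here `exampleHandler` and `insertRows` are illustrative stand-ins, not part of this package (the real handlers live in vmagent/vminsert, per this commit):

```go
package csvimport

import "net/http"

// exampleHandler shows the intended ParseStream call pattern.
// Illustrative only; not part of the package.
func exampleHandler(w http.ResponseWriter, req *http.Request) {
	err := ParseStream(req, func(rows []Row) error {
		// rows must be fully processed before the callback returns,
		// since the underlying buffers are reused for the next block.
		return insertRows(rows)
	})
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
	}
}

// insertRows is a stub for the example; a real implementation would
// convert the parsed rows to time series and write them to storage.
func insertRows(rows []Row) error {
	return nil
}
```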