lib/protoparser/influx: enable batch processing by default (#7165)

### Describe Your Changes

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Andrii Chubatiuk 2024-10-15 12:48:40 +03:00 committed by GitHub
parent bac193e50b
commit daa7183749
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 318 additions and 104 deletions

View file

@ -36,7 +36,7 @@ var (
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(at *auth.Token, r io.Reader, isGzipped bool) error {
return stream.Parse(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return stream.Parse(r, true, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, nil)
})
}
@ -50,11 +50,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
isStreamMode := req.Header.Get("Stream-Mode") == "1"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return stream.Parse(req.Body, isStreamMode, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, extraLabels)
})
}

View file

@ -34,7 +34,7 @@ var (
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error {
return stream.Parse(r, false, "", "", func(db string, rows []parser.Row) error {
return stream.Parse(r, true, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil)
})
}
@ -48,11 +48,12 @@ func InsertHandlerForHTTP(req *http.Request) error {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
isStreamMode := req.Header.Get("Stream-Mode") == "1"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return stream.Parse(req.Body, isStreamMode, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels)
})
}

View file

@ -1185,8 +1185,11 @@ Below is the output for `/path/to/vminsert -help`:
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-influx.maxLineSize size
The maximum size in bytes for a single InfluxDB line during parsing
The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
-influx.maxRequestSize size
The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-influxDBLabel string
Default label for the DB name sent over '?db={db_name}' query parameter (default "db")
-influxListenAddr string

View file

@ -580,7 +580,7 @@ See [these docs](https://docs.victoriametrics.com/vmagent/#adding-labels-to-metr
## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
Use `http://<victoriametrics-addr>:8428` url instead of InfluxDB url in agents' configs.
Use `http://<victoriametrics-addr>:8428` URL instead of InfluxDB url in agents' configs.
For instance, put the following lines into `Telegraf` config, so it sends data to VictoriaMetrics instead of InfluxDB:
```toml
@ -588,17 +588,33 @@ For instance, put the following lines into `Telegraf` config, so it sends data t
urls = ["http://<victoriametrics-addr>:8428"]
```
Or in case of [`http`](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/http) output
```toml
[[outputs.http]]
url = "http://<victoriametrics-addr>:8428/influx/write"
data_format = "influx"
non_retryable_statuscodes = [400]
```
The size of the request sent to VictoriaMetrics's Influx HTTP endpoints is limited by `-influx.maxRequestSize` (default: 64Mb).
For better ingestion speed and lower memory usage, VM can be switched to stream processing mode by setting `Stream-Mode: "1"`
HTTP header with each request. Please note, in streaming mode VictoriaMetrics processes workload line-by-line (see `-influx.maxLineSize`),
it ignores invalid rows (only logs them) and ingests successfully parsed rows. If client cancels the ingestion request
due to timeout or other reasons, it could happen that some lines from the workload were already parsed and ingested.
Another option is to enable TCP and UDP receiver for InfluxDB line protocol via `-influxListenAddr` command-line flag
and stream plain InfluxDB line protocol data to the configured TCP and/or UDP addresses.
and stream plain InfluxDB line protocol data to the configured TCP and/or UDP addresses. TCP and UDP receivers are
only working in streaming mode.
VictoriaMetrics performs the following transformations to the ingested InfluxDB data:
* [db query arg](https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint) is mapped into `db`
[label](https://docs.victoriametrics.com/keyconcepts/#labels) value unless `db` tag exists in the InfluxDB line.
The `db` label name can be overridden via `-influxDBLabel` command-line flag. If more strict data isolation is required,
read more about multi-tenancy [here](https://docs.victoriametrics.com/keyconcepts/#multi-tenancy).
* Field names are mapped to time series names prefixed with `{measurement}{separator}` value, where `{separator}` equals to `_` by default. It can be changed with `-influxMeasurementFieldSeparator` command-line flag. See also `-influxSkipSingleField` command-line flag. If `{measurement}` is empty or if `-influxSkipMeasurement` command-line flag is set, then time series names correspond to field names.
* Field values are mapped to time series values.
* Non-numeric field values are converted to 0.
* Tags are mapped to Prometheus labels as-is.
* If `-usePromCompatibleNaming` command-line flag is set, then all the metric names and label names
are normalized to [Prometheus-compatible naming](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels) by replacing unsupported chars with `_`.
@ -620,21 +636,17 @@ foo_field2{tag1="value1", tag2="value2"} 40
Example for writing data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/)
to local VictoriaMetrics using `curl`:
```sh
curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://localhost:8428/write'
```
An arbitrary number of lines delimited by '\n' (aka newline char) can be sent in a single request.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```sh
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'
```
The `/api/v1/export` endpoint should return the following response:
```json
@ -1359,7 +1371,7 @@ Additionally, VictoriaMetrics can accept metrics via the following popular data
* `/api/v1/import/prometheus` for importing data in Prometheus exposition format and in [Pushgateway format](https://github.com/prometheus/pushgateway#url).
See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
Please note, most of the ingestion APIs (except [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) and [OpenTelemetry](#sending-data-via-opentelemetry))
Please note, most of the ingestion APIs (except [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write), [OpenTelemetry](#sending-data-via-opentelemetry) and [Influx Line Protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf))
are optimized for performance and processes data in a streaming fashion.
It means that client can transfer unlimited amount of data through the open connection. Because of this, import APIs
may not return parsing errors to the client, as it is expected for data stream to be not interrupted.
@ -2875,8 +2887,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-influx.maxLineSize size
The maximum size in bytes for a single InfluxDB line during parsing
The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
-influx.maxRequestSize size
The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-influxDBLabel string
Default label for the DB name sent over '?db={db_name}' query parameter (default "db")
-influxListenAddr string

View file

@ -22,6 +22,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): allow using HTTP/2 client for Kubernetes service discovery if `-promscrape.kubernetes.useHTTP2Client` cmd-line flag is set. This could help to reduce the amount of opened connections to the Kubernetes API server. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5971) for the details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP URL where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add support of [exponential histograms](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) ingested via [OpenTelemetry protocol for metrics](https://docs.victoriametrics.com/#sending-data-via-opentelemetry). Such histograms will be automatically converted to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6354).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/): disable stream processing mode for data [ingested via InfluxDB](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) HTTP endpoints by default. With this change, the data is processed in batches (see `-influx.maxRequestSize`) and user will get parsing errors immediately as they happen. This also improves users' experience and resiliency against thundering herd problems caused by clients without backoff policies like telegraf. To enable stream mode back, pass HTTP header `Stream-Mode: "1"` with each request. For data sent via TCP and UDP (see `-influxListenAddr`) protocols streaming processing remains enabled.
## [v1.104.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.104.0)

View file

@ -1785,8 +1785,11 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-influx.maxLineSize size
The maximum size in bytes for a single InfluxDB line during parsing
The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
-influx.maxRequestSize size
The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-influxDBLabel string
Default label for the DB name sent over '?db={db_name}' query parameter (default "db")
-influxListenAddr string

View file

@ -11,7 +11,8 @@ import (
// Rows contains parsed influx rows.
type Rows struct {
Rows []Row
Rows []Row
IgnoreErrs bool
tagsPool []Tag
fieldsPool []Field
@ -43,8 +44,15 @@ func (rs *Rows) Reset() {
// See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
//
// s shouldn't be modified when rs is in use.
func (rs *Rows) Unmarshal(s string) {
rs.Rows, rs.tagsPool, rs.fieldsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], rs.fieldsPool[:0])
func (rs *Rows) Unmarshal(s string) error {
rs.reset()
return rs.unmarshal(s)
}
func (rs *Rows) reset() {
rs.Rows = rs.Rows[:0]
rs.tagsPool = rs.tagsPool[:0]
rs.fieldsPool = rs.fieldsPool[:0]
}
// Row is a single influx row.
@ -176,47 +184,55 @@ func (f *Field) unmarshal(s string, noEscapeChars, hasQuotedFields bool) error {
return nil
}
func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]Row, []Tag, []Field) {
func (rs *Rows) unmarshal(s string) error {
noEscapeChars := strings.IndexByte(s, '\\') < 0
for len(s) > 0 {
n := strings.IndexByte(s, '\n')
if n < 0 {
// The last line.
return unmarshalRow(dst, s, tagsPool, fieldsPool, noEscapeChars)
n = len(s)
}
err := rs.unmarshalRow(s[:n], noEscapeChars)
if err != nil {
if !rs.IgnoreErrs {
return fmt.Errorf("incorrect influx line %q: %w", s, err)
}
logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
invalidLines.Inc()
}
if len(s) == n {
return nil
}
dst, tagsPool, fieldsPool = unmarshalRow(dst, s[:n], tagsPool, fieldsPool, noEscapeChars)
s = s[n+1:]
}
return dst, tagsPool, fieldsPool
return nil
}
func unmarshalRow(dst []Row, s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Row, []Tag, []Field) {
func (rs *Rows) unmarshalRow(s string, noEscapeChars bool) error {
if len(s) > 0 && s[len(s)-1] == '\r' {
s = s[:len(s)-1]
}
if len(s) == 0 {
// Skip empty line
return dst, tagsPool, fieldsPool
return nil
}
if s[0] == '#' {
// Skip comment
return dst, tagsPool, fieldsPool
return nil
}
if cap(dst) > len(dst) {
dst = dst[:len(dst)+1]
if cap(rs.Rows) > len(rs.Rows) {
rs.Rows = rs.Rows[:len(rs.Rows)+1]
} else {
dst = append(dst, Row{})
rs.Rows = append(rs.Rows, Row{})
}
r := &dst[len(dst)-1]
r := &rs.Rows[len(rs.Rows)-1]
var err error
tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars)
rs.tagsPool, rs.fieldsPool, err = r.unmarshal(s, rs.tagsPool, rs.fieldsPool, noEscapeChars)
if err != nil {
dst = dst[:len(dst)-1]
logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
invalidLines.Inc()
rs.Rows = rs.Rows[:len(rs.Rows)-1]
}
return dst, tagsPool, fieldsPool
return err
}
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="influx"}`)

View file

@ -74,13 +74,16 @@ func TestRowsUnmarshalFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var rows Rows
rows.Unmarshal(s)
err := rows.Unmarshal(s)
if err == nil {
t.Fatal("unexpected nil error")
}
if len(rows.Rows) != 0 {
t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows))
}
// Try again
rows.Unmarshal(s)
_ = rows.Unmarshal(s)
if len(rows.Rows) != 0 {
t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows))
}
@ -125,14 +128,17 @@ func TestRowsUnmarshalFailure(t *testing.T) {
func TestRowsUnmarshalSuccess(t *testing.T) {
f := func(s string, rowsExpected *Rows) {
t.Helper()
var rows Rows
rows.Unmarshal(s)
rows := Rows{IgnoreErrs: true}
err := rows.Unmarshal(s)
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
}
// Try unmarshaling again
rows.Unmarshal(s)
_ = rows.Unmarshal(s)
if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
}

View file

@ -16,7 +16,7 @@ bbb usage_user=1.23,usage_system=4.34,usage_iowait=0.1112 123455676344
b.RunParallel(func(pb *testing.PB) {
var rows Rows
for pb.Next() {
rows.Unmarshal(s)
_ = rows.Unmarshal(s)
if len(rows.Rows) != 4 {
panic(fmt.Errorf("unexpected number of rows parsed; got %d; want 4", len(rows.Rows)))
}

View file

@ -17,8 +17,9 @@ import (
)
var (
maxLineSize = flagutil.NewBytes("influx.maxLineSize", 256*1024, "The maximum size in bytes for a single InfluxDB line during parsing")
trimTimestamp = flag.Duration("influxTrimTimestamp", time.Millisecond, "Trim timestamps for InfluxDB line protocol data to this duration. "+
maxLineSize = flagutil.NewBytes("influx.maxLineSize", 256*1024, "The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf")
maxRequestSize = flagutil.NewBytes("influx.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf")
trimTimestamp = flag.Duration("influxTrimTimestamp", time.Millisecond, "Trim timestamps for InfluxDB line protocol data to this duration. "+
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
)
@ -27,7 +28,7 @@ var (
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []influx.Row) error) error {
func Parse(r io.Reader, isStreamMode, isGzipped bool, precision, db string, callback func(db string, rows []influx.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
@ -57,6 +58,24 @@ func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db s
tsMultiplier = -1e3 * 3600
}
// processing payload altogether
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090
if !isStreamMode {
ctx := getBatchContext(r)
defer putBatchContext(ctx)
err := ctx.Read()
if err != nil {
return err
}
err = unmarshal(&ctx.rows, ctx.reqBuf.B, tsMultiplier)
if err != nil {
return fmt.Errorf("cannot parse influx line protocol data: %s; To skip invalid lines switch to stream mode by passing Stream-Mode: \"1\" header with each request", err)
}
return callback(db, ctx.rows.Rows)
}
// processing in a streaming fashion, line-by-line
// invalid lines are skipped
ctx := getStreamContext(r)
defer putStreamContext(ctx)
for ctx.Read() {
@ -77,6 +96,67 @@ func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db s
return ctx.callbackErr
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="influx"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="influx"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="influx"}`)
)
type batchContext struct {
br *bufio.Reader
reqBuf bytesutil.ByteBuffer
rows influx.Rows
}
func (ctx *batchContext) Read() error {
readCalls.Inc()
lr := io.LimitReader(ctx.br, int64(maxRequestSize.IntN()))
reqLen, err := ctx.reqBuf.ReadFrom(lr)
if err != nil {
readErrors.Inc()
return err
} else if reqLen > int64(maxRequestSize.IntN()) {
readErrors.Inc()
return fmt.Errorf("too big request; mustn't exceed -influx.maxRequestSize=%d bytes", maxRequestSize.N)
}
return nil
}
func (ctx *batchContext) reset() {
ctx.br.Reset(nil)
ctx.reqBuf.Reset()
ctx.rows.Reset()
}
func getBatchContext(r io.Reader) *batchContext {
if v := batchContextPool.Get(); v != nil {
ctx := v.(*batchContext)
ctx.br.Reset(r)
return ctx
}
return &batchContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putBatchContext(ctx *batchContext) {
ctx.reset()
batchContextPool.Put(ctx)
}
var batchContextPool sync.Pool
type streamContext struct {
br *bufio.Reader
reqBuf []byte
tailBuf []byte
err error
wg sync.WaitGroup
callbackErrLock sync.Mutex
callbackErr error
}
func (ctx *streamContext) Read() bool {
readCalls.Inc()
if ctx.err != nil || ctx.hasCallbackError() {
@ -93,23 +173,6 @@ func (ctx *streamContext) Read() bool {
return true
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="influx"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="influx"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="influx"}`)
)
type streamContext struct {
br *bufio.Reader
reqBuf []byte
tailBuf []byte
err error
wg sync.WaitGroup
callbackErrLock sync.Mutex
callbackErr error
}
func (ctx *streamContext) Error() error {
if ctx.err == io.EOF {
return nil
@ -168,12 +231,13 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []influx.Row) {
func (uw *unmarshalWork) runCallback() {
ctx := uw.ctx
if err := uw.callback(uw.db, rows); err != nil {
if err := uw.callback(uw.db, uw.rows.Rows); err != nil {
err = fmt.Errorf("error when processing imported data: %w", err)
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
ctx.callbackErr = err
}
ctx.callbackErrLock.Unlock()
}
@ -182,13 +246,57 @@ func (uw *unmarshalWork) runCallback(rows []influx.Row) {
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
_ = unmarshal(&uw.rows, uw.reqBuf, uw.tsMultiplier)
uw.runCallback()
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
v = &unmarshalWork{}
}
uw := v.(*unmarshalWork)
uw.rows.IgnoreErrs = true
return uw
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool
func detectTimestamp(ts, currentTs int64) int64 {
if ts == 0 {
return currentTs
}
if ts >= 1e17 {
// convert nanoseconds to milliseconds
return ts / 1e6
}
if ts >= 1e14 {
// convert microseconds to milliseconds
return ts / 1e3
}
if ts >= 1e11 {
// the ts is in milliseconds
return ts
}
// convert seconds to milliseconds
return ts * 1e3
}
func unmarshal(rs *influx.Rows, reqBuf []byte, tsMultiplier int64) error {
// do not return error immediately because rs.Rows could contain
// successfully parsed rows that needs to be processed below
err := rs.Unmarshal(bytesutil.ToUnsafeString(reqBuf))
rows := rs.Rows
rowsRead.Add(len(rows))
// Adjust timestamps according to uw.tsMultiplier
currentTs := time.Now().UnixNano() / 1e6
tsMultiplier := uw.tsMultiplier
if tsMultiplier == 0 {
// Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp
// But it can be in ns, us, ms or s depending on the number of digits in practice.
@ -225,42 +333,5 @@ func (uw *unmarshalWork) Unmarshal() {
row.Timestamp -= row.Timestamp % tsTrim
}
}
uw.runCallback(rows)
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool
func detectTimestamp(ts, currentTs int64) int64 {
if ts == 0 {
return currentTs
}
if ts >= 1e17 {
// convert nanoseconds to milliseconds
return ts / 1e6
}
if ts >= 1e14 {
// convert microseconds to milliseconds
return ts / 1e3
}
if ts >= 1e11 {
// the ts is in milliseconds
return ts
}
// convert seconds to milliseconds
return ts * 1e3
return err
}

View file

@ -1,7 +1,14 @@
package stream
import (
"bytes"
"reflect"
"sort"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
)
func TestDetectTimestamp(t *testing.T) {
@ -28,3 +35,93 @@ func TestDetectTimestamp(t *testing.T) {
f(1e17, 1e11)
f(1e18, 1e12)
}
func TestParseStream(t *testing.T) {
common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers()
f := func(data string, rowsExpected []influx.Row, isStreamMode bool, badData bool) {
t.Helper()
var wg sync.WaitGroup
wg.Add(len(rowsExpected))
buf := bytes.NewBuffer([]byte(data))
var rowsMu sync.Mutex
rows := make([]influx.Row, 0, len(rowsExpected))
cb := func(_ string, rs []influx.Row) error {
for _, r := range rs {
rowsMu.Lock()
rows = append(rows, influx.Row{
Measurement: r.Measurement,
Tags: append(make([]influx.Tag, 0, len(r.Tags)), r.Tags...),
Fields: append(make([]influx.Field, 0, len(r.Fields)), r.Fields...),
Timestamp: r.Timestamp,
})
rowsMu.Unlock()
wg.Done()
}
return nil
}
err := Parse(buf, isStreamMode, false, "ns", "test", cb)
wg.Wait()
if badData && !isStreamMode && err == nil {
t.Fatalf("expected error on bad data in batch mode")
}
sort.Slice(rows, func(i, j int) bool {
return rows[i].Measurement < rows[j].Measurement
})
if !reflect.DeepEqual(rows, rowsExpected) {
t.Fatalf("unexpected rows;\ngot\n%+v\nwant\n%+v", rows, rowsExpected)
}
}
goodData := `foo1,location=us-midwest1 temperature=81 1727879909390000000
foo2,location=us-midwest2 temperature=82 1727879909390000000
foo3,location=us-midwest3 temperature=83 1727879909390000000`
goodDataParsed := []influx.Row{
{
Measurement: "foo1",
Tags: []influx.Tag{{Key: "location", Value: "us-midwest1"}},
Fields: []influx.Field{{Key: "temperature", Value: 81}},
Timestamp: 1727879909390,
}, {
Measurement: "foo2",
Tags: []influx.Tag{{Key: "location", Value: "us-midwest2"}},
Fields: []influx.Field{{Key: "temperature", Value: 82}},
Timestamp: 1727879909390,
}, {
Measurement: "foo3",
Tags: []influx.Tag{{Key: "location", Value: "us-midwest3"}},
Fields: []influx.Field{{Key: "temperature", Value: 83}},
Timestamp: 1727879909390,
}}
//batch mode
f(goodData, goodDataParsed, false, false)
//stream mode
f(goodData, goodDataParsed, true, false)
badData := `foo1,location=us-midwest1 temperature=81 1727879909390000000
foo2, ,location=us-midwest2 temperature=82 1727879909390000000
foo3,location=us-midwest3 temperature=83 1727879909390000000`
badDataParsed := []influx.Row{{
Measurement: "foo1",
Tags: []influx.Tag{{Key: "location", Value: "us-midwest1"}},
Fields: []influx.Field{{Key: "temperature", Value: 81}},
Timestamp: 1727879909390,
}, {
Measurement: "foo3",
Tags: []influx.Tag{{Key: "location", Value: "us-midwest3"}},
Fields: []influx.Field{{Key: "temperature", Value: 83}},
Timestamp: 1727879909390,
}}
// batch mode with errors
f(badData, []influx.Row{}, false, true)
// stream mode with errors
f(badData, badDataParsed, true, false)
}