diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go
index b8255e86f..0c0cd309a 100644
--- a/app/vmagent/influx/request_handler.go
+++ b/app/vmagent/influx/request_handler.go
@@ -36,7 +36,7 @@ var (
 //
 // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
 func InsertHandlerForReader(at *auth.Token, r io.Reader, isGzipped bool) error {
-    return stream.Parse(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
+    return stream.Parse(r, true, isGzipped, "", "", func(db string, rows []parser.Row) error {
         return insertRows(at, db, rows, nil)
     })
 }
@@ -50,11 +50,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
         return err
     }
     isGzipped := req.Header.Get("Content-Encoding") == "gzip"
+    isStreamMode := req.Header.Get("Stream-Mode") == "1"
     q := req.URL.Query()
     precision := q.Get("precision")
     // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
     db := q.Get("db")
-    return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
+    return stream.Parse(req.Body, isStreamMode, isGzipped, precision, db, func(db string, rows []parser.Row) error {
         return insertRows(at, db, rows, extraLabels)
     })
 }
diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go
index 9e9d174b0..632501814 100644
--- a/app/vminsert/influx/request_handler.go
+++ b/app/vminsert/influx/request_handler.go
@@ -34,7 +34,7 @@ var (
 //
 // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
 func InsertHandlerForReader(r io.Reader) error {
-    return stream.Parse(r, false, "", "", func(db string, rows []parser.Row) error {
+    return stream.Parse(r, true, false, "", "", func(db string, rows []parser.Row) error {
         return insertRows(db, rows, nil)
     })
 }
@@ -48,11 +48,12 @@ func InsertHandlerForHTTP(req *http.Request) error {
         return err
     }
     isGzipped := req.Header.Get("Content-Encoding") == "gzip"
+    isStreamMode := req.Header.Get("Stream-Mode") == "1"
     q := req.URL.Query()
     precision := q.Get("precision")
     // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
     db := q.Get("db")
-    return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
+    return stream.Parse(req.Body, isStreamMode, isGzipped, precision, db, func(db string, rows []parser.Row) error {
         return insertRows(db, rows, extraLabels)
     })
 }
diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md
index edfe8f843..271589adf 100644
--- a/docs/Cluster-VictoriaMetrics.md
+++ b/docs/Cluster-VictoriaMetrics.md
@@ -1185,8 +1185,11 @@ Below is the output for `/path/to/vminsert -help`:
     Supports an array of values separated by comma or specified via multiple flags.
     Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
   -influx.maxLineSize size
-    The maximum size in bytes for a single InfluxDB line during parsing
+    The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
     Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
+  -influx.maxRequestSize size
+    The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
+    Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
   -influxDBLabel string
     Default label for the DB name sent over '?db={db_name}' query parameter (default "db")
   -influxListenAddr string
diff --git a/docs/README.md b/docs/README.md
index 1dc32b45f..c45b1003d 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -580,7 +580,7 @@ See [these docs](https://docs.victoriametrics.com/vmagent/#adding-labels-to-metr
 
 ## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
 
-Use `http://:8428` url instead of InfluxDB url in agents' configs.
+Use `http://:8428` URL instead of the InfluxDB URL in agents' configs.
 For instance, put the following lines into `Telegraf` config, so it sends data to VictoriaMetrics instead of InfluxDB:
 
 ```toml
@@ -588,17 +588,33 @@ For instance, put the following lines into `Telegraf` config, so it sends data t
   urls = ["http://:8428"]
 ```
 
+Or, in case of the [`http`](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/http) output plugin:
+
+```toml
+[[outputs.http]]
+  url = "http://:8428/influx/write"
+  data_format = "influx"
+  non_retryable_statuscodes = [400]
+```
+
+The size of a request sent to the InfluxDB-compatible HTTP endpoints of VictoriaMetrics is limited by `-influx.maxRequestSize` (default: 64MiB).
+For better ingestion speed and lower memory usage, VictoriaMetrics can be switched to stream processing mode by setting the `Stream-Mode: "1"`
+HTTP header on each request. Note that in stream mode VictoriaMetrics processes the payload line-by-line (see `-influx.maxLineSize`):
+invalid rows are logged and skipped, while successfully parsed rows are ingested. If the client cancels the ingestion request
+due to a timeout or other reasons, some lines from the payload may already have been parsed and ingested.
+
 Another option is to enable TCP and UDP receiver for InfluxDB line protocol via `-influxListenAddr` command-line flag
-and stream plain InfluxDB line protocol data to the configured TCP and/or UDP addresses.
+and stream plain InfluxDB line protocol data to the configured TCP and/or UDP addresses. TCP and UDP receivers
+work in stream mode only.
 
 VictoriaMetrics performs the following transformations to the ingested InfluxDB data:
-
 * [db query arg](https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint) is mapped into `db` [label](https://docs.victoriametrics.com/keyconcepts/#labels) value unless `db` tag exists in the InfluxDB line. The `db` label name can be overridden via `-influxDBLabel` command-line flag. If more strict data isolation is required, read more about multi-tenancy [here](https://docs.victoriametrics.com/keyconcepts/#multi-tenancy).
 * Field names are mapped to time series names prefixed with `{measurement}{separator}` value, where `{separator}` equals to `_` by default. It can be changed with `-influxMeasurementFieldSeparator` command-line flag. See also `-influxSkipSingleField` command-line flag.
   If `{measurement}` is empty or if `-influxSkipMeasurement` command-line flag is set, then time series names correspond to field names.
 * Field values are mapped to time series values.
+* Non-numeric field values are converted to 0.
 * Tags are mapped to Prometheus labels as-is.
 * If `-usePromCompatibleNaming` command-line flag is set, then all the metric names and label names are normalized to [Prometheus-compatible naming](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels) by replacing unsupported chars with `_`.
@@ -620,21 +636,17 @@ foo_field2{tag1="value1", tag2="value2"} 40
 
 Example for writing data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/) to local VictoriaMetrics using `curl`:
-
 ```sh
 curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://localhost:8428/write'
 ```
-
 An arbitrary number of lines delimited by '\n' (aka newline char) can be sent in a single request.
 After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
-
 ```sh
 curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'
 ```
-
 The `/api/v1/export` endpoint should return the following response:
 
 ```json
@@ -1359,7 +1371,7 @@ Additionally, VictoriaMetrics can accept metrics via the following popular data
 * `/api/v1/import/prometheus` for importing data in Prometheus exposition format and in [Pushgateway format](https://github.com/prometheus/pushgateway#url). See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
 
-Please note, most of the ingestion APIs (except [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) and [OpenTelemetry](#sending-data-via-opentelemetry))
+Please note, most of the ingestion APIs (except [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write), [OpenTelemetry](#sending-data-via-opentelemetry) and [Influx Line Protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf))
 are optimized for performance and processes data in a streaming fashion. It means that client can transfer unlimited amount of data through the open connection. Because of this, import APIs
 may not return parsing errors to the client, as it is expected for data stream to be not interrupted.
@@ -2875,8 +2887,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
     Supports an array of values separated by comma or specified via multiple flags.
     Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
   -influx.maxLineSize size
-    The maximum size in bytes for a single InfluxDB line during parsing
+    The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
     Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
+  -influx.maxRequestSize size
+    The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
+    Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
   -influxDBLabel string
     Default label for the DB name sent over '?db={db_name}' query parameter (default "db")
   -influxListenAddr string
diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index f65d4e085..7c03d9b4a 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -22,6 +22,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): allow using HTTP/2 client for Kubernetes service discovery if `-promscrape.kubernetes.useHTTP2Client` cmd-line flag is set. This could help to reduce the amount of opened connections to the Kubernetes API server. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5971) for the details.
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP URL where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add support of [exponential histograms](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) ingested via [OpenTelemetry protocol for metrics](https://docs.victoriametrics.com/#sending-data-via-opentelemetry). Such histograms will be automatically converted to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6354).
+* FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/): disable stream processing mode for data [ingested via InfluxDB](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) HTTP endpoints by default. With this change, the data is processed in batches (see `-influx.maxRequestSize`) and users get parsing errors immediately as they happen. This also improves user experience and resiliency against thundering herd problems caused by clients without backoff policies, such as Telegraf. To re-enable stream mode, pass the HTTP header `Stream-Mode: "1"` with each request. For data sent via the TCP and UDP protocols (see `-influxListenAddr`), stream processing remains enabled.
 
 ## [v1.104.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.104.0)
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 64322da1e..cf1894aed 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -1785,8 +1785,11 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
     Supports an array of values separated by comma or specified via multiple flags.
     Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
   -influx.maxLineSize size
-    The maximum size in bytes for a single InfluxDB line during parsing
+    The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
     Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
+  -influx.maxRequestSize size
+    The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
+    Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
   -influxDBLabel string
     Default label for the DB name sent over '?db={db_name}' query parameter (default "db")
   -influxListenAddr string
diff --git a/lib/protoparser/influx/parser.go b/lib/protoparser/influx/parser.go
index 97f2f344d..98dc4b569 100644
--- a/lib/protoparser/influx/parser.go
+++ b/lib/protoparser/influx/parser.go
@@ -11,7 +11,8 @@ import (
 
 // Rows contains parsed influx rows.
 type Rows struct {
-    Rows []Row
+    Rows       []Row
+    IgnoreErrs bool
 
     tagsPool   []Tag
     fieldsPool []Field
@@ -43,8 +44,15 @@ func (rs *Rows) Reset() {
 // See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
 //
 // s shouldn't be modified when rs is in use.
-func (rs *Rows) Unmarshal(s string) {
-    rs.Rows, rs.tagsPool, rs.fieldsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], rs.fieldsPool[:0])
+func (rs *Rows) Unmarshal(s string) error {
+    rs.reset()
+    return rs.unmarshal(s)
+}
+
+func (rs *Rows) reset() {
+    rs.Rows = rs.Rows[:0]
+    rs.tagsPool = rs.tagsPool[:0]
+    rs.fieldsPool = rs.fieldsPool[:0]
 }
 
 // Row is a single influx row.
@@ -176,47 +184,55 @@ func (f *Field) unmarshal(s string, noEscapeChars, hasQuotedFields bool) error {
     return nil
 }
 
-func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]Row, []Tag, []Field) {
+func (rs *Rows) unmarshal(s string) error {
     noEscapeChars := strings.IndexByte(s, '\\') < 0
     for len(s) > 0 {
         n := strings.IndexByte(s, '\n')
         if n < 0 {
             // The last line.
-            return unmarshalRow(dst, s, tagsPool, fieldsPool, noEscapeChars)
+            n = len(s)
+        }
+        err := rs.unmarshalRow(s[:n], noEscapeChars)
+        if err != nil {
+            if !rs.IgnoreErrs {
+                return fmt.Errorf("incorrect influx line %q: %w", s, err)
+            }
+            logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
+            invalidLines.Inc()
+        }
+        if len(s) == n {
+            return nil
         }
-        dst, tagsPool, fieldsPool = unmarshalRow(dst, s[:n], tagsPool, fieldsPool, noEscapeChars)
         s = s[n+1:]
     }
-    return dst, tagsPool, fieldsPool
+    return nil
 }
 
-func unmarshalRow(dst []Row, s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Row, []Tag, []Field) {
+func (rs *Rows) unmarshalRow(s string, noEscapeChars bool) error {
     if len(s) > 0 && s[len(s)-1] == '\r' {
         s = s[:len(s)-1]
     }
     if len(s) == 0 {
         // Skip empty line
-        return dst, tagsPool, fieldsPool
+        return nil
     }
     if s[0] == '#' {
         // Skip comment
-        return dst, tagsPool, fieldsPool
+        return nil
     }
-    if cap(dst) > len(dst) {
-        dst = dst[:len(dst)+1]
+    if cap(rs.Rows) > len(rs.Rows) {
+        rs.Rows = rs.Rows[:len(rs.Rows)+1]
     } else {
-        dst = append(dst, Row{})
+        rs.Rows = append(rs.Rows, Row{})
     }
-    r := &dst[len(dst)-1]
+    r := &rs.Rows[len(rs.Rows)-1]
     var err error
-    tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars)
+    rs.tagsPool, rs.fieldsPool, err = r.unmarshal(s, rs.tagsPool, rs.fieldsPool, noEscapeChars)
     if err != nil {
-        dst = dst[:len(dst)-1]
-        logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
-        invalidLines.Inc()
+        rs.Rows = rs.Rows[:len(rs.Rows)-1]
     }
-    return dst, tagsPool, fieldsPool
+    return err
 }
 
 var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="influx"}`)
diff --git a/lib/protoparser/influx/parser_test.go b/lib/protoparser/influx/parser_test.go
index 5240a78c6..5ed0fcdcb 100644
--- a/lib/protoparser/influx/parser_test.go
+++ b/lib/protoparser/influx/parser_test.go
@@ -74,13 +74,16 @@ func TestRowsUnmarshalFailure(t *testing.T) {
     f := func(s string) {
         t.Helper()
         var rows Rows
-        rows.Unmarshal(s)
+        err := rows.Unmarshal(s)
+        if err == nil {
+            t.Fatal("unexpected nil error")
+        }
         if len(rows.Rows) != 0 {
             t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows))
         }
 
         // Try again
-        rows.Unmarshal(s)
+        _ = rows.Unmarshal(s)
         if len(rows.Rows) != 0 {
             t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows))
         }
@@ -125,14 +128,17 @@ func TestRowsUnmarshalFailure(t *testing.T) {
 func TestRowsUnmarshalSuccess(t *testing.T) {
     f := func(s string, rowsExpected *Rows) {
         t.Helper()
-        var rows Rows
-        rows.Unmarshal(s)
+        rows := Rows{IgnoreErrs: true}
+        err := rows.Unmarshal(s)
+        if err != nil {
+            t.Fatalf("unexpected err: %s", err)
+        }
         if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
             t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
         }
 
         // Try unmarshaling again
-        rows.Unmarshal(s)
+        _ = rows.Unmarshal(s)
         if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
             t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
         }
diff --git a/lib/protoparser/influx/parser_timing_test.go b/lib/protoparser/influx/parser_timing_test.go
index 21db9b583..9e7c8ae75 100644
--- a/lib/protoparser/influx/parser_timing_test.go
+++ b/lib/protoparser/influx/parser_timing_test.go
@@ -16,7 +16,7 @@ bbb usage_user=1.23,usage_system=4.34,usage_iowait=0.1112 123455676344
     b.RunParallel(func(pb *testing.PB) {
         var rows Rows
         for pb.Next() {
-            rows.Unmarshal(s)
+            _ = rows.Unmarshal(s)
             if len(rows.Rows) != 4 {
                 panic(fmt.Errorf("unexpected number of rows parsed; got %d; want 4", len(rows.Rows)))
             }
diff --git a/lib/protoparser/influx/stream/streamparser.go b/lib/protoparser/influx/stream/streamparser.go
index a1df19aa2..c8c5f4a34 100644
--- a/lib/protoparser/influx/stream/streamparser.go
+++ b/lib/protoparser/influx/stream/streamparser.go
@@ -17,8 +17,9 @@ import (
 )
 
 var (
-    maxLineSize   = flagutil.NewBytes("influx.maxLineSize", 256*1024, "The maximum size in bytes for a single InfluxDB line during parsing")
-    trimTimestamp = flag.Duration("influxTrimTimestamp", time.Millisecond, "Trim timestamps for InfluxDB line protocol data to this duration. "+
+    maxLineSize    = flagutil.NewBytes("influx.maxLineSize", 256*1024, "The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf")
+    maxRequestSize = flagutil.NewBytes("influx.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf")
+    trimTimestamp  = flag.Duration("influxTrimTimestamp", time.Millisecond, "Trim timestamps for InfluxDB line protocol data to this duration. "+
         "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
 )
@@ -27,7 +28,7 @@ var (
 // The callback can be called concurrently multiple times for streamed data from r.
 //
 // callback shouldn't hold rows after returning.
-func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []influx.Row) error) error {
+func Parse(r io.Reader, isStreamMode, isGzipped bool, precision, db string, callback func(db string, rows []influx.Row) error) error {
     wcr := writeconcurrencylimiter.GetReader(r)
     defer writeconcurrencylimiter.PutReader(wcr)
     r = wcr
@@ -57,6 +58,24 @@ func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db s
         tsMultiplier = -1e3 * 3600
     }
 
+    // processing payload altogether
+    // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090
+    if !isStreamMode {
+        ctx := getBatchContext(r)
+        defer putBatchContext(ctx)
+        err := ctx.Read()
+        if err != nil {
+            return err
+        }
+        err = unmarshal(&ctx.rows, ctx.reqBuf.B, tsMultiplier)
+        if err != nil {
+            return fmt.Errorf("cannot parse influx line protocol data: %s; to skip invalid lines, switch to stream mode by passing the Stream-Mode: \"1\" header with each request", err)
+        }
+        return callback(db, ctx.rows.Rows)
+    }
+
+    // processing in a streaming fashion, line-by-line
+    // invalid lines are skipped
     ctx := getStreamContext(r)
     defer putStreamContext(ctx)
     for ctx.Read() {
@@ -77,6 +96,67 @@ func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db s
     return ctx.callbackErr
 }
 
+var (
+    readCalls  = metrics.NewCounter(`vm_protoparser_read_calls_total{type="influx"}`)
+    readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="influx"}`)
+    rowsRead   = metrics.NewCounter(`vm_protoparser_rows_read_total{type="influx"}`)
+)
+
+type batchContext struct {
+    br     *bufio.Reader
+    reqBuf bytesutil.ByteBuffer
+    rows   influx.Rows
+}
+
+func (ctx *batchContext) Read() error {
+    readCalls.Inc()
+    lr := io.LimitReader(ctx.br, int64(maxRequestSize.IntN())+1)
+    reqLen, err := ctx.reqBuf.ReadFrom(lr)
+    if err != nil {
+        readErrors.Inc()
+        return err
+    } else if reqLen > int64(maxRequestSize.IntN()) {
+        readErrors.Inc()
+        return fmt.Errorf("too big request; mustn't exceed -influx.maxRequestSize=%d bytes", maxRequestSize.N)
+    }
+    return nil
+}
+
+func (ctx *batchContext) reset() {
+    ctx.br.Reset(nil)
+    ctx.reqBuf.Reset()
+    ctx.rows.Reset()
+}
+
+func getBatchContext(r io.Reader) *batchContext {
+    if v := batchContextPool.Get(); v != nil {
+        ctx := v.(*batchContext)
+        ctx.br.Reset(r)
+        return ctx
+    }
+    return &batchContext{
+        br: bufio.NewReaderSize(r, 64*1024),
+    }
+}
+
+func putBatchContext(ctx *batchContext) {
+    ctx.reset()
+    batchContextPool.Put(ctx)
+}
+
+var batchContextPool sync.Pool
+
+type streamContext struct {
+    br      *bufio.Reader
+    reqBuf  []byte
+    tailBuf []byte
+    err     error
+
+    wg              sync.WaitGroup
+    callbackErrLock sync.Mutex
+    callbackErr     error
+}
+
 func (ctx *streamContext) Read() bool {
     readCalls.Inc()
     if ctx.err != nil || ctx.hasCallbackError() {
@@ -93,23 +173,6 @@ func (ctx *streamContext) Read() bool {
     return true
 }
 
-var (
-    readCalls  = metrics.NewCounter(`vm_protoparser_read_calls_total{type="influx"}`)
-    readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="influx"}`)
-    rowsRead   = metrics.NewCounter(`vm_protoparser_rows_read_total{type="influx"}`)
-)
-
-type streamContext struct {
-    br      *bufio.Reader
-    reqBuf  []byte
-    tailBuf []byte
-    err     error
-
-    wg              sync.WaitGroup
-    callbackErrLock sync.Mutex
-    callbackErr     error
-}
-
 func (ctx *streamContext) Error() error {
     if ctx.err == io.EOF {
         return nil
     }
@@ -168,12 +231,13 @@ func (uw *unmarshalWork) reset() {
     uw.reqBuf = uw.reqBuf[:0]
 }
 
-func (uw *unmarshalWork) runCallback(rows []influx.Row) {
+func (uw *unmarshalWork) runCallback() {
     ctx := uw.ctx
-    if err := uw.callback(uw.db, rows); err != nil {
+    if err := uw.callback(uw.db, uw.rows.Rows); err != nil {
+        err = fmt.Errorf("error when processing imported data: %w", err)
         ctx.callbackErrLock.Lock()
         if ctx.callbackErr == nil {
-            ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+            ctx.callbackErr = err
         }
         ctx.callbackErrLock.Unlock()
     }
@@ -182,13 +246,57 @@
 // Unmarshal implements common.UnmarshalWork
 func (uw *unmarshalWork) Unmarshal() {
-    uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
-    rows := uw.rows.Rows
+    _ = unmarshal(&uw.rows, uw.reqBuf, uw.tsMultiplier)
+    uw.runCallback()
+    putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+    v := unmarshalWorkPool.Get()
+    if v == nil {
+        v = &unmarshalWork{}
+    }
+    uw := v.(*unmarshalWork)
+    uw.rows.IgnoreErrs = true
+    return uw
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+    uw.reset()
+    unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
+
+func detectTimestamp(ts, currentTs int64) int64 {
+    if ts == 0 {
+        return currentTs
+    }
+    if ts >= 1e17 {
+        // convert nanoseconds to milliseconds
+        return ts / 1e6
+    }
+    if ts >= 1e14 {
+        // convert microseconds to milliseconds
+        return ts / 1e3
+    }
+    if ts >= 1e11 {
+        // the ts is in milliseconds
+        return ts
+    }
+    // convert seconds to milliseconds
+    return ts * 1e3
+}
+
+func unmarshal(rs *influx.Rows, reqBuf []byte, tsMultiplier int64) error {
+    // do not return the error immediately because rs.Rows could contain
+    // successfully parsed rows that need to be processed below
+    err := rs.Unmarshal(bytesutil.ToUnsafeString(reqBuf))
+    rows := rs.Rows
     rowsRead.Add(len(rows))
 
     // Adjust timestamps according to uw.tsMultiplier
     currentTs := time.Now().UnixNano() / 1e6
-    tsMultiplier := uw.tsMultiplier
     if tsMultiplier == 0 {
         // Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp
         // But it can be in ns, us, ms or s depending on the number of digits in practice.
@@ -225,42 +333,5 @@ func (uw *unmarshalWork) Unmarshal() {
             row.Timestamp -= row.Timestamp % tsTrim
         }
     }
-
-    uw.runCallback(rows)
-    putUnmarshalWork(uw)
-}
-
-func getUnmarshalWork() *unmarshalWork {
-    v := unmarshalWorkPool.Get()
-    if v == nil {
-        return &unmarshalWork{}
-    }
-    return v.(*unmarshalWork)
-}
-
-func putUnmarshalWork(uw *unmarshalWork) {
-    uw.reset()
-    unmarshalWorkPool.Put(uw)
-}
-
-var unmarshalWorkPool sync.Pool
-
-func detectTimestamp(ts, currentTs int64) int64 {
-    if ts == 0 {
-        return currentTs
-    }
-    if ts >= 1e17 {
-        // convert nanoseconds to milliseconds
-        return ts / 1e6
-    }
-    if ts >= 1e14 {
-        // convert microseconds to milliseconds
-        return ts / 1e3
-    }
-    if ts >= 1e11 {
-        // the ts is in milliseconds
-        return ts
-    }
-    // convert seconds to milliseconds
-    return ts * 1e3
+    return err
 }
diff --git a/lib/protoparser/influx/stream/streamparser_test.go b/lib/protoparser/influx/stream/streamparser_test.go
index 4722d17fa..4834a6346 100644
--- a/lib/protoparser/influx/stream/streamparser_test.go
+++ b/lib/protoparser/influx/stream/streamparser_test.go
@@ -1,7 +1,14 @@
 package stream
 
 import (
+    "bytes"
+    "reflect"
+    "sort"
+    "sync"
     "testing"
+
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
 )
 
 func TestDetectTimestamp(t *testing.T) {
@@ -28,3 +35,93 @@ func TestDetectTimestamp(t *testing.T) {
     f(1e17, 1e11)
     f(1e18, 1e12)
 }
+
+func TestParseStream(t *testing.T) {
+    common.StartUnmarshalWorkers()
+    defer common.StopUnmarshalWorkers()
+
+    f := func(data string, rowsExpected []influx.Row, isStreamMode bool, badData bool) {
+        t.Helper()
+
+        var wg sync.WaitGroup
+        wg.Add(len(rowsExpected))
+        buf := bytes.NewBuffer([]byte(data))
+
+        var rowsMu sync.Mutex
+        rows := make([]influx.Row, 0, len(rowsExpected))
+
+        cb := func(_ string, rs []influx.Row) error {
+            for _, r := range rs {
+                rowsMu.Lock()
+                rows = append(rows, influx.Row{
+                    Measurement: r.Measurement,
+                    Tags:        append(make([]influx.Tag, 0, len(r.Tags)), r.Tags...),
+                    Fields:      append(make([]influx.Field, 0, len(r.Fields)), r.Fields...),
+                    Timestamp:   r.Timestamp,
+                })
+                rowsMu.Unlock()
+                wg.Done()
+            }
+            return nil
+        }
+
+        err := Parse(buf, isStreamMode, false, "ns", "test", cb)
+        wg.Wait()
+
+        if badData && !isStreamMode && err == nil {
+            t.Fatalf("expected error on bad data in batch mode")
+        }
+
+        sort.Slice(rows, func(i, j int) bool {
+            return rows[i].Measurement < rows[j].Measurement
+        })
+        if !reflect.DeepEqual(rows, rowsExpected) {
+            t.Fatalf("unexpected rows;\ngot\n%+v\nwant\n%+v", rows, rowsExpected)
+        }
+    }
+
+    goodData := `foo1,location=us-midwest1 temperature=81 1727879909390000000
+foo2,location=us-midwest2 temperature=82 1727879909390000000
+foo3,location=us-midwest3 temperature=83 1727879909390000000`
+    goodDataParsed := []influx.Row{
+        {
+            Measurement: "foo1",
+            Tags:        []influx.Tag{{Key: "location", Value: "us-midwest1"}},
+            Fields:      []influx.Field{{Key: "temperature", Value: 81}},
+            Timestamp:   1727879909390,
+        }, {
+            Measurement: "foo2",
+            Tags:        []influx.Tag{{Key: "location", Value: "us-midwest2"}},
+            Fields:      []influx.Field{{Key: "temperature", Value: 82}},
+            Timestamp:   1727879909390,
+        }, {
+            Measurement: "foo3",
+            Tags:        []influx.Tag{{Key: "location", Value: "us-midwest3"}},
+            Fields:      []influx.Field{{Key: "temperature", Value: 83}},
+            Timestamp:   1727879909390,
+        }}
+
+    // batch mode
+    f(goodData, goodDataParsed, false, false)
+    // stream mode
+    f(goodData, goodDataParsed, true, false)
+
+    badData := `foo1,location=us-midwest1 temperature=81 1727879909390000000
+foo2, ,location=us-midwest2 temperature=82 1727879909390000000
+foo3,location=us-midwest3 temperature=83 1727879909390000000`
+    badDataParsed := []influx.Row{{
+        Measurement: "foo1",
+        Tags:        []influx.Tag{{Key: "location", Value: "us-midwest1"}},
+        Fields:      []influx.Field{{Key: "temperature", Value: 81}},
+        Timestamp:   1727879909390,
+    }, {
+        Measurement: "foo3",
+        Tags:        []influx.Tag{{Key: "location", Value: "us-midwest3"}},
+        Fields:      []influx.Field{{Key: "temperature", Value: 83}},
+        Timestamp:   1727879909390,
+    }}
+
+    // batch mode with errors
+    f(badData, []influx.Row{}, false, true)
+    // stream mode with errors
+    f(badData, badDataParsed, true, false)
+}
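
As an illustrative usage sketch (not part of the patch itself): the commands below assume a local single-node VictoriaMetrics listening on `localhost:8428`, as in the README snippets touched above, and show how a client would exercise the new default batch mode versus the opt-in stream mode enabled by the `Stream-Mode` header introduced in this change.

```sh
# Batch mode (new default): the whole request is parsed at once, limited by
# -influx.maxRequestSize, and parsing errors are returned to the client immediately.
curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://localhost:8428/write'

# Stream mode (opt-in per request): lines are parsed one by one, limited by
# -influx.maxLineSize; invalid lines are logged and skipped instead of failing the request.
curl -H 'Stream-Mode: 1' -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://localhost:8428/write'
```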