mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/{vminsert,vmagent}: add ability to import data in Prometheus exposition format via /api/v1/import/prometheus
This commit is contained in:
parent
6fe3c48a6e
commit
cba820e390
10 changed files with 428 additions and 0 deletions
31
README.md
31
README.md
|
@ -99,6 +99,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||
* [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
|
||||
* [Querying Graphite data](#querying-graphite-data)
|
||||
* [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents)
|
||||
* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
|
||||
* [How to import CSV data](#how-to-import-csv-data)
|
||||
* [Prometheus querying API usage](#prometheus-querying-api-usage)
|
||||
* [How to build from sources](#how-to-build-from-sources)
|
||||
* [Development build](#development-build)
|
||||
|
@ -284,6 +286,8 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
|
|||
|
||||
In the future other `*_sd_config` types will be supported.
|
||||
|
||||
VictoriaMetrics also supports [importing data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
|
||||
|
||||
See also [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md), which can be used as drop-in replacement for Prometheus.
|
||||
|
||||
### How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
|
||||
|
@ -509,6 +513,32 @@ The following response should be returned:
|
|||
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
|
||||
|
||||
|
||||
### How to import data in Prometheus exposition format
|
||||
|
||||
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
|
||||
via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
|
||||
|
||||
```bash
|
||||
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
|
||||
```
|
||||
|
||||
The following command may be used for verifying the imported data:
|
||||
|
||||
```bash
|
||||
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
|
||||
```
|
||||
|
||||
It should return somethins like the following:
|
||||
|
||||
```
|
||||
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
|
||||
```
|
||||
|
||||
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
|
||||
|
||||
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
||||
|
||||
|
||||
### Prometheus querying API usage
|
||||
|
||||
VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
|
||||
|
@ -701,6 +731,7 @@ Time series data can be imported via any supported ingestion protocol:
|
|||
* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests)
|
||||
* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series).
|
||||
* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details.
|
||||
* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
|
||||
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
|
|||
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
|
||||
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
|
||||
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
|
||||
* Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
* Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
|
||||
* Can replicate collected metrics simultaneously to multiple remote storage systems.
|
||||
* Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
|
||||
|
@ -158,6 +159,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/api/v1/import/prometheus":
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/write", "/api/v2/write":
|
||||
influxWriteRequests.Inc()
|
||||
if err := influx.InsertHandlerForHTTP(r); err != nil {
|
||||
|
@ -197,6 +207,9 @@ var (
|
|||
csvimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`)
|
||||
csvimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`)
|
||||
|
||||
prometheusimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
||||
prometheusimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
||||
|
||||
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
|
||||
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)
|
||||
|
||||
|
|
64
app/vmagent/prometheusimport/request_handler.go
Normal file
64
app/vmagent/prometheusimport/request_handler.go
Normal file
|
@ -0,0 +1,64 @@
|
|||
package prometheusimport
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes `/api/v1/import/prometheus` request.
|
||||
func InsertHandler(req *http.Request) error {
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||
return parser.ParseStream(req.Body, isGzipped, insertRows)
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(rows []parser.Row) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
tssDst := ctx.WriteRequest.Timeseries[:0]
|
||||
labels := ctx.Labels[:0]
|
||||
samples := ctx.Samples[:0]
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
labelsLen := len(labels)
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: r.Metric,
|
||||
})
|
||||
for j := range r.Tags {
|
||||
tag := &r.Tags[j]
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: tag.Key,
|
||||
Value: tag.Value,
|
||||
})
|
||||
}
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Value: r.Value,
|
||||
Timestamp: r.Timestamp,
|
||||
})
|
||||
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
||||
Labels: labels[labelsLen:],
|
||||
Samples: samples[len(samples)-1:],
|
||||
})
|
||||
}
|
||||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(&ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return nil
|
||||
}
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prompush"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
|
@ -114,6 +115,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/api/v1/import/prometheus":
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/write", "/api/v2/write":
|
||||
influxWriteRequests.Inc()
|
||||
if err := influx.InsertHandlerForHTTP(r); err != nil {
|
||||
|
@ -155,6 +165,9 @@ var (
|
|||
csvimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`)
|
||||
csvimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`)
|
||||
|
||||
prometheusimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
||||
prometheusimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
||||
|
||||
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`)
|
||||
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`)
|
||||
|
||||
|
|
48
app/vminsert/prometheusimport/request_handler.go
Normal file
48
app/vminsert/prometheusimport/request_handler.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package prometheusimport
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="prometheus"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes `/api/v1/import/prometheus` request.
|
||||
func InsertHandler(req *http.Request) error {
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||
return parser.ParseStream(req.Body, isGzipped, insertRows)
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(rows []parser.Row) error {
|
||||
ctx := common.GetInsertCtx()
|
||||
defer common.PutInsertCtx(ctx)
|
||||
|
||||
ctx.Reset(len(rows))
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
ctx.Labels = ctx.Labels[:0]
|
||||
ctx.AddLabel("", r.Metric)
|
||||
for j := range r.Tags {
|
||||
tag := &r.Tags[j]
|
||||
ctx.AddLabel(tag.Key, tag.Value)
|
||||
}
|
||||
ctx.ApplyRelabeling()
|
||||
if len(ctx.Labels) == 0 {
|
||||
// Skip metric without labels.
|
||||
continue
|
||||
}
|
||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
||||
}
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return ctx.FlushBufs()
|
||||
}
|
|
@ -99,6 +99,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||
* [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
|
||||
* [Querying Graphite data](#querying-graphite-data)
|
||||
* [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents)
|
||||
* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
|
||||
* [How to import CSV data](#how-to-import-csv-data)
|
||||
* [Prometheus querying API usage](#prometheus-querying-api-usage)
|
||||
* [How to build from sources](#how-to-build-from-sources)
|
||||
* [Development build](#development-build)
|
||||
|
@ -284,6 +286,8 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
|
|||
|
||||
In the future other `*_sd_config` types will be supported.
|
||||
|
||||
VictoriaMetrics also supports [importing data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
|
||||
|
||||
See also [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md), which can be used as drop-in replacement for Prometheus.
|
||||
|
||||
### How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
|
||||
|
@ -509,6 +513,32 @@ The following response should be returned:
|
|||
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
|
||||
|
||||
|
||||
### How to import data in Prometheus exposition format
|
||||
|
||||
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
|
||||
via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
|
||||
|
||||
```bash
|
||||
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
|
||||
```
|
||||
|
||||
The following command may be used for verifying the imported data:
|
||||
|
||||
```bash
|
||||
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
|
||||
```
|
||||
|
||||
It should return somethins like the following:
|
||||
|
||||
```
|
||||
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
|
||||
```
|
||||
|
||||
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
|
||||
|
||||
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
||||
|
||||
|
||||
### Prometheus querying API usage
|
||||
|
||||
VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
|
||||
|
@ -701,6 +731,7 @@ Time series data can be imported via any supported ingestion protocol:
|
|||
* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests)
|
||||
* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series).
|
||||
* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details.
|
||||
* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
|
||||
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
|
|||
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
|
||||
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
|
||||
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
|
||||
* Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
* Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
|
||||
* Can replicate collected metrics simultaneously to multiple remote storage systems.
|
||||
* Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
|
||||
|
|
134
lib/protoparser/prometheus/streamparser.go
Normal file
134
lib/protoparser/prometheus/streamparser.go
Normal file
|
@ -0,0 +1,134 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows.
|
||||
//
|
||||
// The callback can be called multiple times for streamed data from r.
|
||||
//
|
||||
// callback shouldn't hold rows after returning.
|
||||
func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error {
|
||||
if isGzipped {
|
||||
zr, err := common.GetGzipReader(r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read gzipped lines with Prometheus exposition format: %w", err)
|
||||
}
|
||||
defer common.PutGzipReader(zr)
|
||||
r = zr
|
||||
}
|
||||
ctx := getStreamContext()
|
||||
defer putStreamContext(ctx)
|
||||
for ctx.Read(r) {
|
||||
if err := callback(ctx.Rows.Rows); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return ctx.Error()
|
||||
}
|
||||
|
||||
const flushTimeout = 3 * time.Second
|
||||
|
||||
func (ctx *streamContext) Read(r io.Reader) bool {
|
||||
readCalls.Inc()
|
||||
if ctx.err != nil {
|
||||
return false
|
||||
}
|
||||
if c, ok := r.(net.Conn); ok {
|
||||
if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil {
|
||||
readErrors.Inc()
|
||||
ctx.err = fmt.Errorf("cannot set read deadline: %w", err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
||||
if ctx.err != nil {
|
||||
var ne net.Error
|
||||
if errors.As(ctx.err, &ne) && ne.Timeout() {
|
||||
// Flush the read data on timeout and try reading again.
|
||||
ctx.err = nil
|
||||
} else {
|
||||
if ctx.err != io.EOF {
|
||||
readErrors.Inc()
|
||||
ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %w", ctx.err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
|
||||
rowsRead.Add(len(ctx.Rows.Rows))
|
||||
|
||||
rows := ctx.Rows.Rows
|
||||
|
||||
// Fill missing timestamps with the current timestamp.
|
||||
currentTimestamp := int64(time.Now().UnixNano() / 1e6)
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
if r.Timestamp == 0 {
|
||||
r.Timestamp = currentTimestamp
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type streamContext struct {
|
||||
Rows Rows
|
||||
reqBuf []byte
|
||||
tailBuf []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func (ctx *streamContext) Error() error {
|
||||
if ctx.err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
return ctx.err
|
||||
}
|
||||
|
||||
func (ctx *streamContext) reset() {
|
||||
ctx.Rows.Reset()
|
||||
ctx.reqBuf = ctx.reqBuf[:0]
|
||||
ctx.tailBuf = ctx.tailBuf[:0]
|
||||
ctx.err = nil
|
||||
}
|
||||
|
||||
var (
|
||||
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="prometheus"}`)
|
||||
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="prometheus"}`)
|
||||
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="prometheus"}`)
|
||||
)
|
||||
|
||||
func getStreamContext() *streamContext {
|
||||
select {
|
||||
case ctx := <-streamContextPoolCh:
|
||||
return ctx
|
||||
default:
|
||||
if v := streamContextPool.Get(); v != nil {
|
||||
return v.(*streamContext)
|
||||
}
|
||||
return &streamContext{}
|
||||
}
|
||||
}
|
||||
|
||||
func putStreamContext(ctx *streamContext) {
|
||||
ctx.reset()
|
||||
select {
|
||||
case streamContextPoolCh <- ctx:
|
||||
default:
|
||||
streamContextPool.Put(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
var streamContextPool sync.Pool
|
||||
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
|
92
lib/protoparser/prometheus/streamparser_test.go
Normal file
92
lib/protoparser/prometheus/streamparser_test.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseStream(t *testing.T) {
|
||||
f := func(s string, rowsExpected []Row) {
|
||||
t.Helper()
|
||||
bb := bytes.NewBufferString(s)
|
||||
var result []Row
|
||||
err := ParseStream(bb, false, func(rows []Row) error {
|
||||
result = appendRowCopies(result, rows)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error when parsing %q: %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(result, rowsExpected) {
|
||||
t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected)
|
||||
}
|
||||
|
||||
// Parse compressed stream.
|
||||
bb.Reset()
|
||||
zw := gzip.NewWriter(bb)
|
||||
if _, err := zw.Write([]byte(s)); err != nil {
|
||||
t.Fatalf("unexpected error when gzipping %q: %s", s, err)
|
||||
}
|
||||
if err := zw.Close(); err != nil {
|
||||
t.Fatalf("unexpected error when closing gzip writer: %s", err)
|
||||
}
|
||||
result = nil
|
||||
err = ParseStream(bb, true, func(rows []Row) error {
|
||||
result = appendRowCopies(result, rows)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error when parsing compressed %q: %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(result, rowsExpected) {
|
||||
t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("", nil)
|
||||
f("foo 123 456", []Row{{
|
||||
Metric: "foo",
|
||||
Value: 123,
|
||||
Timestamp: 456,
|
||||
}})
|
||||
f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []Row{
|
||||
{
|
||||
Metric: "foo",
|
||||
Tags: []Tag{{
|
||||
Key: "bar",
|
||||
Value: "baz",
|
||||
}},
|
||||
Value: 1,
|
||||
Timestamp: 2,
|
||||
},
|
||||
{
|
||||
Metric: "aaa",
|
||||
Value: 3,
|
||||
Timestamp: 4,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func appendRowCopies(dst, src []Row) []Row {
|
||||
for _, r := range src {
|
||||
// Make a copy of r, since r may contain garbage after returning from the callback to ParseStream.
|
||||
var rCopy Row
|
||||
rCopy.Metric = copyString(r.Metric)
|
||||
rCopy.Value = r.Value
|
||||
rCopy.Timestamp = r.Timestamp
|
||||
for _, tag := range r.Tags {
|
||||
rCopy.Tags = append(rCopy.Tags, Tag{
|
||||
Key: copyString(tag.Key),
|
||||
Value: copyString(tag.Value),
|
||||
})
|
||||
}
|
||||
dst = append(dst, rCopy)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func copyString(s string) string {
|
||||
return string(append([]byte(nil), s...))
|
||||
}
|
Loading…
Reference in a new issue