From 2380e9b017d5f7b3351694823b243122c2bb8e11 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 11 Sep 2020 13:26:41 +0300 Subject: [PATCH] app/{vminsert,vmagent}: allow passing timestamp via `timestamp` query arg when ingesting data to `/api/v1/import/prometheus` See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/750 --- README.md | 4 ++++ .../prometheusimport/request_handler.go | 6 ++++- .../prometheusimport/request_handler.go | 6 ++++- docs/Single-server-VictoriaMetrics.md | 4 ++++ lib/protoparser/common/timestamp.go | 23 +++++++++++++++++++ lib/protoparser/prometheus/streamparser.go | 12 ++++++---- .../prometheus/streamparser_test.go | 10 ++++++-- 7 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 lib/protoparser/common/timestamp.go diff --git a/README.md b/README.md index 223b987fb..68bee367f 100644 --- a/README.md +++ b/README.md @@ -551,6 +551,10 @@ It should return something like the following: Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. +If timestamp is missing in ` ` Prometheus exposition format line, then the current timestamp is used during data ingestion. +It can be overriden by passing unix timestamp in seconds via `timestamp` query arg. The value may be fractional when millisecond precision is needed. +For example, `/api/v1/import/prometheus?timestamp=1594370496.905`. + VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index b5ef3b833..aac41d944 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -23,9 +23,13 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } + defaultTimestamp, err := parserCommon.GetTimestamp(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(rows, extraLabels) }) }) diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index fa7d1068a..c499600cd 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -23,9 +23,13 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } + defaultTimestamp, err := parserCommon.GetTimestamp(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(rows, extraLabels) }) }) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 223b987fb..68bee367f 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -551,6 +551,10 @@ It should return something like the following: Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. +If timestamp is missing in ` ` Prometheus exposition format line, then the current timestamp is used during data ingestion. +It can be overriden by passing unix timestamp in seconds via `timestamp` query arg. The value may be fractional when millisecond precision is needed. +For example, `/api/v1/import/prometheus?timestamp=1594370496.905`. + VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). diff --git a/lib/protoparser/common/timestamp.go b/lib/protoparser/common/timestamp.go new file mode 100644 index 000000000..7016b98ad --- /dev/null +++ b/lib/protoparser/common/timestamp.go @@ -0,0 +1,23 @@ +package common + +import ( + "fmt" + "net/http" + "strconv" +) + +// GetTimestamp extracts unix timestamp from `timestamp` query arg. +// +// It returns 0 if there is no `timestamp` query arg. +func GetTimestamp(req *http.Request) (int64, error) { + ts := req.FormValue("timestamp") + if len(ts) == 0 { + return 0, nil + } + timestamp, err := strconv.ParseFloat(ts, 64) + if err != nil { + return 0, fmt.Errorf("cannot parse `timestamp=%s` query arg: %w", ts, err) + } + // Convert seconds to milliseconds. + return int64(timestamp * 1e3), nil +} diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index 871960e44..fe17e6a36 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -17,7 +17,7 @@ import ( // The callback can be called multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error { +func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error { if isGzipped { zr, err := common.GetGzipReader(r) if err != nil { @@ -28,7 +28,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e } ctx := getStreamContext() defer putStreamContext(ctx) - for ctx.Read(r) { + for ctx.Read(r, defaultTimestamp) { if err := callback(ctx.Rows.Rows); err != nil { return err } @@ -36,7 +36,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader) bool { +func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool { readCalls.Inc() if ctx.err != nil { return false @@ -55,11 +55,13 @@ func (ctx *streamContext) Read(r io.Reader) bool { rows := ctx.Rows.Rows // Fill missing timestamps with the current timestamp. - currentTimestamp := int64(time.Now().UnixNano() / 1e6) + if defaultTimestamp <= 0 { + defaultTimestamp = int64(time.Now().UnixNano() / 1e6) + } for i := range rows { r := &rows[i] if r.Timestamp == 0 { - r.Timestamp = currentTimestamp + r.Timestamp = defaultTimestamp } } return true diff --git a/lib/protoparser/prometheus/streamparser_test.go b/lib/protoparser/prometheus/streamparser_test.go index 3a120d590..6ef7e11f2 100644 --- a/lib/protoparser/prometheus/streamparser_test.go +++ b/lib/protoparser/prometheus/streamparser_test.go @@ -8,11 +8,12 @@ import ( ) func TestParseStream(t *testing.T) { + const defaultTimestamp = 123 f := func(s string, rowsExpected []Row) { t.Helper() bb := bytes.NewBufferString(s) var result []Row - err := ParseStream(bb, false, func(rows []Row) error { + err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error { result = appendRowCopies(result, rows) return nil }) @@ -33,7 +34,7 @@ func TestParseStream(t *testing.T) { t.Fatalf("unexpected error when closing gzip writer: %s", err) } result = nil - err = ParseStream(bb, true, func(rows []Row) error { + err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error { result = appendRowCopies(result, rows) return nil }) @@ -67,6 +68,11 @@ func TestParseStream(t *testing.T) { Timestamp: 4, }, }) + f("foo 23", []Row{{ + Metric: "foo", + Value: 23, + Timestamp: defaultTimestamp, + }}) } func appendRowCopies(dst, src []Row) []Row {