diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index b5ef3b8334..aac41d9448 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -23,9 +23,13 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } + defaultTimestamp, err := parserCommon.GetTimestamp(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(rows, extraLabels) }) }) diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index dba487f961..a5babb9ddb 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -25,9 +25,13 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } + defaultTimestamp, err := parserCommon.GetTimestamp(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) }) }) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 223b987fbb..68bee367fb 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -551,6 +551,10 @@ It should return something like the following: Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. +If timestamp is missing in ` ` Prometheus exposition format line, then the current timestamp is used during data ingestion. +It can be overriden by passing unix timestamp in seconds via `timestamp` query arg. The value may be fractional when millisecond precision is needed. +For example, `/api/v1/import/prometheus?timestamp=1594370496.905`. + VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). diff --git a/lib/protoparser/common/timestamp.go b/lib/protoparser/common/timestamp.go new file mode 100644 index 0000000000..7016b98ad0 --- /dev/null +++ b/lib/protoparser/common/timestamp.go @@ -0,0 +1,23 @@ +package common + +import ( + "fmt" + "net/http" + "strconv" +) + +// GetTimestamp extracts unix timestamp from `timestamp` query arg. +// +// It returns 0 if there is no `timestamp` query arg. +func GetTimestamp(req *http.Request) (int64, error) { + ts := req.FormValue("timestamp") + if len(ts) == 0 { + return 0, nil + } + timestamp, err := strconv.ParseFloat(ts, 64) + if err != nil { + return 0, fmt.Errorf("cannot parse `timestamp=%s` query arg: %w", ts, err) + } + // Convert seconds to milliseconds. + return int64(timestamp * 1e3), nil +} diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index 871960e44f..fe17e6a368 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -17,7 +17,7 @@ import ( // The callback can be called multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error { +func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error { if isGzipped { zr, err := common.GetGzipReader(r) if err != nil { @@ -28,7 +28,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e } ctx := getStreamContext() defer putStreamContext(ctx) - for ctx.Read(r) { + for ctx.Read(r, defaultTimestamp) { if err := callback(ctx.Rows.Rows); err != nil { return err } @@ -36,7 +36,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader) bool { +func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool { readCalls.Inc() if ctx.err != nil { return false @@ -55,11 +55,13 @@ func (ctx *streamContext) Read(r io.Reader) bool { rows := ctx.Rows.Rows // Fill missing timestamps with the current timestamp. - currentTimestamp := int64(time.Now().UnixNano() / 1e6) + if defaultTimestamp <= 0 { + defaultTimestamp = int64(time.Now().UnixNano() / 1e6) + } for i := range rows { r := &rows[i] if r.Timestamp == 0 { - r.Timestamp = currentTimestamp + r.Timestamp = defaultTimestamp } } return true diff --git a/lib/protoparser/prometheus/streamparser_test.go b/lib/protoparser/prometheus/streamparser_test.go index 3a120d590c..6ef7e11f25 100644 --- a/lib/protoparser/prometheus/streamparser_test.go +++ b/lib/protoparser/prometheus/streamparser_test.go @@ -8,11 +8,12 @@ import ( ) func TestParseStream(t *testing.T) { + const defaultTimestamp = 123 f := func(s string, rowsExpected []Row) { t.Helper() bb := bytes.NewBufferString(s) var result []Row - err := ParseStream(bb, false, func(rows []Row) error { + err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error { result = appendRowCopies(result, rows) return nil }) @@ -33,7 +34,7 @@ func TestParseStream(t *testing.T) { t.Fatalf("unexpected error when closing gzip writer: %s", err) } result = nil - err = ParseStream(bb, true, func(rows []Row) error { + err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error { result = appendRowCopies(result, rows) return nil }) @@ -67,6 +68,11 @@ func TestParseStream(t *testing.T) { Timestamp: 4, }, }) + f("foo 23", []Row{{ + Metric: "foo", + Value: 23, + Timestamp: defaultTimestamp, + }}) } func appendRowCopies(dst, src []Row) []Row {