app/{vminsert,vmagent}: allow passing timestamp via timestamp query arg when ingesting data to /api/v1/import/prometheus

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/750
This commit is contained in:
Aliaksandr Valialkin 2020-09-11 13:26:41 +03:00
parent af994562c8
commit 58d3b82ae5
6 changed files with 52 additions and 9 deletions

View file

@ -23,9 +23,13 @@ func InsertHandler(req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
defaultTimestamp, err := parserCommon.GetTimestamp(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}) })
}) })

View file

@ -25,9 +25,13 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
defaultTimestamp, err := parserCommon.GetTimestamp(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
}) })

View file

@ -551,6 +551,10 @@ It should return something like the following:
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
It can be overriden by passing unix timestamp in seconds via `timestamp` query arg. The value may be fractional when millisecond precision is needed.
For example, `/api/v1/import/prometheus?timestamp=1594370496.905`.
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).

View file

@ -0,0 +1,23 @@
package common
import (
"fmt"
"net/http"
"strconv"
)
// GetTimestamp extracts unix timestamp from `timestamp` query arg.
//
// It returns 0 if there is no `timestamp` query arg.
func GetTimestamp(req *http.Request) (int64, error) {
ts := req.FormValue("timestamp")
if len(ts) == 0 {
return 0, nil
}
timestamp, err := strconv.ParseFloat(ts, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse `timestamp=%s` query arg: %w", ts, err)
}
// Convert seconds to milliseconds.
return int64(timestamp * 1e3), nil
}

View file

@ -17,7 +17,7 @@ import (
// The callback can be called multiple times for streamed data from r. // The callback can be called multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error { func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error {
if isGzipped { if isGzipped {
zr, err := common.GetGzipReader(r) zr, err := common.GetGzipReader(r)
if err != nil { if err != nil {
@ -28,7 +28,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
} }
ctx := getStreamContext() ctx := getStreamContext()
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read(r) { for ctx.Read(r, defaultTimestamp) {
if err := callback(ctx.Rows.Rows); err != nil { if err := callback(ctx.Rows.Rows); err != nil {
return err return err
} }
@ -36,7 +36,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
return ctx.Error() return ctx.Error()
} }
func (ctx *streamContext) Read(r io.Reader) bool { func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool {
readCalls.Inc() readCalls.Inc()
if ctx.err != nil { if ctx.err != nil {
return false return false
@ -55,11 +55,13 @@ func (ctx *streamContext) Read(r io.Reader) bool {
rows := ctx.Rows.Rows rows := ctx.Rows.Rows
// Fill missing timestamps with the current timestamp. // Fill missing timestamps with the current timestamp.
currentTimestamp := int64(time.Now().UnixNano() / 1e6) if defaultTimestamp <= 0 {
defaultTimestamp = int64(time.Now().UnixNano() / 1e6)
}
for i := range rows { for i := range rows {
r := &rows[i] r := &rows[i]
if r.Timestamp == 0 { if r.Timestamp == 0 {
r.Timestamp = currentTimestamp r.Timestamp = defaultTimestamp
} }
} }
return true return true

View file

@ -8,11 +8,12 @@ import (
) )
func TestParseStream(t *testing.T) { func TestParseStream(t *testing.T) {
const defaultTimestamp = 123
f := func(s string, rowsExpected []Row) { f := func(s string, rowsExpected []Row) {
t.Helper() t.Helper()
bb := bytes.NewBufferString(s) bb := bytes.NewBufferString(s)
var result []Row var result []Row
err := ParseStream(bb, false, func(rows []Row) error { err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error {
result = appendRowCopies(result, rows) result = appendRowCopies(result, rows)
return nil return nil
}) })
@ -33,7 +34,7 @@ func TestParseStream(t *testing.T) {
t.Fatalf("unexpected error when closing gzip writer: %s", err) t.Fatalf("unexpected error when closing gzip writer: %s", err)
} }
result = nil result = nil
err = ParseStream(bb, true, func(rows []Row) error { err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error {
result = appendRowCopies(result, rows) result = appendRowCopies(result, rows)
return nil return nil
}) })
@ -67,6 +68,11 @@ func TestParseStream(t *testing.T) {
Timestamp: 4, Timestamp: 4,
}, },
}) })
f("foo 23", []Row{{
Metric: "foo",
Value: 23,
Timestamp: defaultTimestamp,
}})
} }
func appendRowCopies(dst, src []Row) []Row { func appendRowCopies(dst, src []Row) []Row {