mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/{vminsert,vmagent}: allow passing timestamp via timestamp
query arg when ingesting data to /api/v1/import/prometheus
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/750
This commit is contained in:
parent
f0005c3007
commit
2380e9b017
7 changed files with 56 additions and 9 deletions
|
@ -551,6 +551,10 @@ It should return something like the following:
|
||||||
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
|
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
|
||||||
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
||||||
|
|
||||||
|
If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
|
||||||
|
It can be overriden by passing unix timestamp in seconds via `timestamp` query arg. The value may be fractional when millisecond precision is needed.
|
||||||
|
For example, `/api/v1/import/prometheus?timestamp=1594370496.905`.
|
||||||
|
|
||||||
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
|
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
|
||||||
|
|
||||||
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
||||||
|
|
|
@ -23,9 +23,13 @@ func InsertHandler(req *http.Request) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defaultTimestamp, err := parserCommon.GetTimestamp(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return writeconcurrencylimiter.Do(func() error {
|
return writeconcurrencylimiter.Do(func() error {
|
||||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||||
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
|
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
||||||
return insertRows(rows, extraLabels)
|
return insertRows(rows, extraLabels)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -23,9 +23,13 @@ func InsertHandler(req *http.Request) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defaultTimestamp, err := parserCommon.GetTimestamp(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return writeconcurrencylimiter.Do(func() error {
|
return writeconcurrencylimiter.Do(func() error {
|
||||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||||
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
|
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
||||||
return insertRows(rows, extraLabels)
|
return insertRows(rows, extraLabels)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -551,6 +551,10 @@ It should return something like the following:
|
||||||
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
|
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
|
||||||
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
||||||
|
|
||||||
|
If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
|
||||||
|
It can be overriden by passing unix timestamp in seconds via `timestamp` query arg. The value may be fractional when millisecond precision is needed.
|
||||||
|
For example, `/api/v1/import/prometheus?timestamp=1594370496.905`.
|
||||||
|
|
||||||
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
|
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
|
||||||
|
|
||||||
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
||||||
|
|
23
lib/protoparser/common/timestamp.go
Normal file
23
lib/protoparser/common/timestamp.go
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetTimestamp extracts unix timestamp from `timestamp` query arg.
|
||||||
|
//
|
||||||
|
// It returns 0 if there is no `timestamp` query arg.
|
||||||
|
func GetTimestamp(req *http.Request) (int64, error) {
|
||||||
|
ts := req.FormValue("timestamp")
|
||||||
|
if len(ts) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
timestamp, err := strconv.ParseFloat(ts, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("cannot parse `timestamp=%s` query arg: %w", ts, err)
|
||||||
|
}
|
||||||
|
// Convert seconds to milliseconds.
|
||||||
|
return int64(timestamp * 1e3), nil
|
||||||
|
}
|
|
@ -17,7 +17,7 @@ import (
|
||||||
// The callback can be called multiple times for streamed data from r.
|
// The callback can be called multiple times for streamed data from r.
|
||||||
//
|
//
|
||||||
// callback shouldn't hold rows after returning.
|
// callback shouldn't hold rows after returning.
|
||||||
func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error {
|
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error {
|
||||||
if isGzipped {
|
if isGzipped {
|
||||||
zr, err := common.GetGzipReader(r)
|
zr, err := common.GetGzipReader(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -28,7 +28,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
|
||||||
}
|
}
|
||||||
ctx := getStreamContext()
|
ctx := getStreamContext()
|
||||||
defer putStreamContext(ctx)
|
defer putStreamContext(ctx)
|
||||||
for ctx.Read(r) {
|
for ctx.Read(r, defaultTimestamp) {
|
||||||
if err := callback(ctx.Rows.Rows); err != nil {
|
if err := callback(ctx.Rows.Rows); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
|
||||||
return ctx.Error()
|
return ctx.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *streamContext) Read(r io.Reader) bool {
|
func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool {
|
||||||
readCalls.Inc()
|
readCalls.Inc()
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
return false
|
return false
|
||||||
|
@ -55,11 +55,13 @@ func (ctx *streamContext) Read(r io.Reader) bool {
|
||||||
rows := ctx.Rows.Rows
|
rows := ctx.Rows.Rows
|
||||||
|
|
||||||
// Fill missing timestamps with the current timestamp.
|
// Fill missing timestamps with the current timestamp.
|
||||||
currentTimestamp := int64(time.Now().UnixNano() / 1e6)
|
if defaultTimestamp <= 0 {
|
||||||
|
defaultTimestamp = int64(time.Now().UnixNano() / 1e6)
|
||||||
|
}
|
||||||
for i := range rows {
|
for i := range rows {
|
||||||
r := &rows[i]
|
r := &rows[i]
|
||||||
if r.Timestamp == 0 {
|
if r.Timestamp == 0 {
|
||||||
r.Timestamp = currentTimestamp
|
r.Timestamp = defaultTimestamp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -8,11 +8,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestParseStream(t *testing.T) {
|
func TestParseStream(t *testing.T) {
|
||||||
|
const defaultTimestamp = 123
|
||||||
f := func(s string, rowsExpected []Row) {
|
f := func(s string, rowsExpected []Row) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
bb := bytes.NewBufferString(s)
|
bb := bytes.NewBufferString(s)
|
||||||
var result []Row
|
var result []Row
|
||||||
err := ParseStream(bb, false, func(rows []Row) error {
|
err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error {
|
||||||
result = appendRowCopies(result, rows)
|
result = appendRowCopies(result, rows)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -33,7 +34,7 @@ func TestParseStream(t *testing.T) {
|
||||||
t.Fatalf("unexpected error when closing gzip writer: %s", err)
|
t.Fatalf("unexpected error when closing gzip writer: %s", err)
|
||||||
}
|
}
|
||||||
result = nil
|
result = nil
|
||||||
err = ParseStream(bb, true, func(rows []Row) error {
|
err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error {
|
||||||
result = appendRowCopies(result, rows)
|
result = appendRowCopies(result, rows)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -67,6 +68,11 @@ func TestParseStream(t *testing.T) {
|
||||||
Timestamp: 4,
|
Timestamp: 4,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
f("foo 23", []Row{{
|
||||||
|
Metric: "foo",
|
||||||
|
Value: 23,
|
||||||
|
Timestamp: defaultTimestamp,
|
||||||
|
}})
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendRowCopies(dst, src []Row) []Row {
|
func appendRowCopies(dst, src []Row) []Row {
|
||||||
|
|
Loading…
Reference in a new issue