Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2021-01-13 11:41:15 +02:00
commit d72fc60108
33 changed files with 269 additions and 92 deletions

View file

@ -49,6 +49,13 @@ vmutils: \
vmbackup \
vmrestore
vmutils-arm64: \
vmagent-arm64 \
vmalert-arm64 \
vmauth-arm64 \
vmbackup-arm64 \
vmrestore-arm64
release-snap:
snapcraft
snapcraft upload "victoriametrics_$(PKG_TAG)_multi.snap" --release beta,edge,candidate
@ -70,6 +77,16 @@ release-vmutils: \
cd bin && tar czf vmutils-$(PKG_TAG).tar.gz vmagent-prod vmalert-prod vmauth-prod vmbackup-prod vmrestore-prod && \
sha256sum vmutils-$(PKG_TAG).tar.gz > vmutils-$(PKG_TAG)_checksums.txt
release-vmutils-arm64: \
vmagent-arm64-prod \
vmalert-arm64-prod \
vmauth-arm64-prod \
vmbackup-arm64-prod \
vmrestore-arm64-prod
cd bin && tar czf vmutils-arm64-$(PKG_TAG).tar.gz vmagent-arm64-prod vmalert-arm64-prod vmauth-arm64-prod vmbackup-arm64-prod vmrestore-arm64-prod && \
sha256sum vmutils-arm64-$(PKG_TAG).tar.gz > vmutils-arm64-$(PKG_TAG)_checksums.txt
pprof-cpu:
go tool pprof -trim_path=github.com/VictoriaMetrics/VictoriaMetrics@ $(PPROF_FILE)

View file

@ -409,6 +409,8 @@ The `/api/v1/export` endpoint should return the following response:
Note that Influx line protocol expects [timestamps in *nanoseconds* by default](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp),
while VictoriaMetrics stores them with *milliseconds* precision.
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
For example, `/write?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
@ -524,6 +526,8 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]}
```
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
## Prometheus querying API usage
@ -1369,7 +1373,7 @@ cache when samples with timestamps older than `now - search.cacheTimestampOffset
## Data updates
VictoriaMetrics doesn't support updating already existing sample values to new ones. It stores all the ingested data points
for the same time series with identical timestamps. While is possible substituting old time series with new time series via
for the same time series with identical timestamps. While it is possible substituting old time series with new time series via
[removal of old time series](#how-to-delete-timeseries) and then [writing new time series](#backfilling), this approach
should be used only for one-off updates. It shouldn't be used for frequent updates because of non-zero overhead related to data removal.

View file

@ -58,6 +58,7 @@ var (
type test struct {
Name string `json:"name"`
Data []string `json:"data"`
InsertQuery string `json:"insert_query"`
Query []string `json:"query"`
ResultMetrics []Metric `json:"result_metrics"`
ResultSeries Series `json:"result_series"`
@ -209,7 +210,7 @@ func testWrite(t *testing.T) {
t.Errorf("error compressing %v %s", r, err)
t.Fail()
}
httpWrite(t, testPromWriteHTTPPath, bytes.NewBuffer(data))
httpWrite(t, testPromWriteHTTPPath, test.InsertQuery, bytes.NewBuffer(data))
}
})
@ -218,7 +219,7 @@ func testWrite(t *testing.T) {
test := x
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
httpWrite(t, testWriteHTTPPath, bytes.NewBufferString(strings.Join(test.Data, "\n")))
httpWrite(t, testWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")))
})
}
})
@ -246,7 +247,7 @@ func testWrite(t *testing.T) {
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
logger.Infof("writing %s", test.Data)
httpWrite(t, testOpenTSDBWriteHTTPPath, bytes.NewBufferString(strings.Join(test.Data, "\n")))
httpWrite(t, testOpenTSDBWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")))
})
}
})
@ -324,10 +325,10 @@ func readIn(readFor string, t *testing.T, insertTime time.Time) []test {
return tt
}
func httpWrite(t *testing.T, address string, r io.Reader) {
func httpWrite(t *testing.T, address, query string, r io.Reader) {
t.Helper()
s := newSuite(t)
resp, err := http.Post(address, "", r)
resp, err := http.Post(address+query, "", r)
s.noError(err)
s.noError(resp.Body.Close())
s.equalInt(resp.StatusCode, 204)

View file

@ -0,0 +1,10 @@
{
"name": "insert_with_extra_labels",
"data": ["measurement,tag1=value1,tag2=value2 field6=1.23,field5=123 {TIME_NS}"],
"insert_query": "?extra_label=job=test&extra_label=tag2=value10",
"query": ["/api/v1/export?match={__name__!=''}"],
"result_metrics": [
{"metric":{"__name__":"measurement_field5","tag1":"value1","job": "test","tag2":"value10"},"values":[123], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"measurement_field6","tag1":"value1","job": "test","tag2":"value10"},"values":[1.23], "timestamps": ["{TIME_MS}"]}
]
}

View file

@ -0,0 +1,9 @@
{
"name": "insert_with_extra_labels",
"data": ["{\"metric\": \"opentsdbhttp.foobar\", \"value\": 1001, \"timestamp\": {TIME_S}, \"tags\": {\"bar\":\"baz\", \"x\": \"y\"}}"],
"insert_query": "?extra_label=job=open-test&extra_label=x=z",
"query": ["/api/v1/export?match={__name__!=''}"],
"result_metrics": [
{"metric":{"__name__":"opentsdbhttp.foobar","bar":"baz","x":"z","job": "open-test"},"values":[1001], "timestamps": ["{TIME_MSZ}"]}
]
}

View file

@ -0,0 +1,9 @@
{
"name": "basic_insertion_with_extra_labels",
"insert_query": "?extra_label=job=prom-test&extra_label=baz=bar",
"data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.foobar\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"],
"query": ["/api/v1/export?match={__name__!=''}"],
"result_metrics": [
{"metric":{"__name__":"prometheus.foobar","baz":"bar","job": "prom-test"},"values":[100000], "timestamps": ["{TIME_MS}"]}
]
}

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -33,7 +34,9 @@ var (
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", insertRows)
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil)
})
})
}
@ -41,17 +44,23 @@ func InsertHandlerForReader(r io.Reader) error {
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows)
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels)
})
})
}
func insertRows(db string, rows []parser.Row) error {
func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
@ -82,6 +91,7 @@ func insertRows(db string, rows []parser.Row) error {
Value: db,
})
}
commonLabels = append(commonLabels, extraLabels...)
ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
if !*skipMeasurement {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...)

View file

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -19,12 +20,18 @@ var (
// InsertHandler processes HTTP OpenTSDB put requests.
// See http://opentsdb.net/docs/build/html/api_http/put.html
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -45,6 +52,7 @@ func insertRows(rows []parser.Row) error {
Value: tag.Value,
})
}
labels = append(labels, extraLabels...)
samples = append(samples, prompbmarshal.Sample{
Value: r.Value,
Timestamp: r.Timestamp,

View file

@ -31,7 +31,7 @@ func InsertHandler(req *http.Request) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
}, nil)
})
}

View file

@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -20,12 +21,18 @@ var (
// InsertHandler processes remote write for prometheus.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels)
})
})
}
func insertRows(timeseries []prompb.TimeSeries) error {
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -44,6 +51,7 @@ func insertRows(timeseries []prompb.TimeSeries) error {
Value: bytesutil.ToUnsafeString(label.Value),
})
}
labels = append(labels, extraLabels...)
samplesLen := len(samples)
for i := range ts.Samples {
sample := &ts.Samples[i]

View file

@ -11,6 +11,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
@ -33,7 +35,9 @@ var (
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", insertRows)
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil)
})
})
}
@ -41,17 +45,23 @@ func InsertHandlerForReader(r io.Reader) error {
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows)
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels)
})
})
}
func insertRows(db string, rows []parser.Row) error {
func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
@ -78,6 +88,10 @@ func insertRows(db string, rows []parser.Row) error {
if !hasDBKey {
ic.AddLabel("db", db)
}
for j := range extraLabels {
label := &extraLabels[j]
ic.AddLabel(label.Name, label.Value)
}
ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
if !*skipMeasurement {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...)

View file

@ -6,6 +6,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -22,15 +24,21 @@ func InsertHandler(req *http.Request) error {
path := req.URL.Path
switch path {
case "/api/put":
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
default:
return fmt.Errorf("unexpected path requested on HTTP OpenTSDB server: %q", path)
}
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
@ -44,6 +52,10 @@ func insertRows(rows []parser.Row) error {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}

View file

@ -31,7 +31,7 @@ func InsertHandler(req *http.Request) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
}, nil)
})
}

View file

@ -6,6 +6,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -18,12 +20,18 @@ var (
// InsertHandler processes remote write for prometheus.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels)
})
})
}
func insertRows(timeseries []prompb.TimeSeries) error {
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
@ -42,6 +50,10 @@ func insertRows(timeseries []prompb.TimeSeries) error {
for _, srcLabel := range srcLabels {
ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}

View file

@ -24,6 +24,7 @@ func TestParseMetricSelectorSuccess(t *testing.T) {
f(`foo {bar != "baz"}`)
f(` foo { bar !~ "^ddd(x+)$", a="ss", __name__="sffd"} `)
f(`(foo)`)
f(`\п\р\и\в\е{\ы="111"}`)
}
func TestParseMetricSelectorError(t *testing.T) {

View file

@ -60,6 +60,8 @@ var rollupFuncs = map[string]newRollupFunc{
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
"tmin_over_time": newRollupFuncOneArg(rollupTmin),
"tmax_over_time": newRollupFuncOneArg(rollupTmax),
"tfirst_over_time": newRollupFuncOneArg(rollupTfirst),
"tlast_over_time": newRollupFuncOneArg(rollupTlast),
"share_le_over_time": newRollupShareLE,
"share_gt_over_time": newRollupShareGT,
"count_le_over_time": newRollupCountLE,
@ -83,7 +85,7 @@ var rollupFuncs = map[string]newRollupFunc{
// `timestamp` function must return timestamp for the last datapoint on the current window
// in order to properly handle offset and timestamps unaligned to the current step.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.
"timestamp": newRollupFuncOneArg(rollupTimestamp),
"timestamp": newRollupFuncOneArg(rollupTlast),
// See https://en.wikipedia.org/wiki/Mode_(statistics)
"mode_over_time": newRollupFuncOneArg(rollupModeOverTime),
@ -128,10 +130,12 @@ var rollupAggrFuncs = map[string]rollupFunc{
"scrape_interval": rollupScrapeInterval,
"tmin_over_time": rollupTmin,
"tmax_over_time": rollupTmax,
"tfirst_over_time": rollupTfirst,
"tlast_over_time": rollupTlast,
"ascent_over_time": rollupAscentOverTime,
"descent_over_time": rollupDescentOverTime,
"zscore_over_time": rollupZScoreOverTime,
"timestamp": rollupTimestamp,
"timestamp": rollupTlast,
"mode_over_time": rollupModeOverTime,
"rate_over_sum": rollupRateOverSum,
}
@ -1167,6 +1171,32 @@ func rollupTmax(rfa *rollupFuncArg) float64 {
return float64(maxTimestamp) / 1e3
}
func rollupTfirst(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
// Do not take into account rfa.prevTimestamp, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return float64(timestamps[0]) / 1e3
}
func rollupTlast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
// Do not take into account rfa.prevTimestamp, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return float64(timestamps[len(timestamps)-1]) / 1e3
}
func rollupSum(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
@ -1662,19 +1692,6 @@ func rollupLow(rfa *rollupFuncArg) float64 {
return min
}
func rollupTimestamp(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
// Do not take into account rfa.prevTimestamp, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return float64(timestamps[len(timestamps)-1]) / 1e3
}
func rollupModeOverTime(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.

View file

@ -461,6 +461,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
f("max_over_time", 123)
f("tmin_over_time", 0.08)
f("tmax_over_time", 0.005)
f("tfirst_over_time", 0.005)
f("tlast_over_time", 0.13)
f("sum_over_time", 565)
f("sum2_over_time", 37951)
f("geomean_over_time", 39.33466603189148)

View file

@ -2,8 +2,8 @@
DOCKER_NAMESPACE := victoriametrics
ROOT_IMAGE ?= alpine:3.12.1
CERTS_IMAGE := alpine:3.12.1
ROOT_IMAGE ?= alpine:3.12.3
CERTS_IMAGE := alpine:3.12.3
GO_BUILDER_IMAGE := golang:1.15.6
BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr : _)
BASE_IMAGE := local/base:1.1.1-$(shell echo $(ROOT_IMAGE) | tr : _)-$(shell echo $(CERTS_IMAGE) | tr : _)

View file

@ -23,6 +23,7 @@
* [Monitoring private clouds with VictoriaMetrics at LeroyMerlin](https://www.youtube.com/watch?v=74swsWqf0Uc)
* [Monitoring Kubernetes with VictoriaMetrics+Prometheus](https://speakerdeck.com/bo0km4n/victoriametrics-plus-prometheusdegou-zhu-surufu-shu-kubernetesfalsejian-shi-ji-pan)
* [High-performance Graphite storage solution on top of VictoriaMetrics](https://golangexample.com/a-high-performance-graphite-storage-solution/)
* [Cloud Native Model Driven Telemetry Stack on OpenShift](https://cer6erus.medium.com/cloud-native-model-driven-telemetry-stack-on-openshift-80712621f5bc)
## Our articles

View file

@ -2,12 +2,20 @@
# tip
* BUGFIX: vmagent: prevent from `dialing to the given TCP address time out` error when scraping big number of unavailable targets. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/987
* BUGFIX: vmagent: properly show scrape duration on `/targets` page. Previously it was incorrectly shown as 0.000s.
* BUGFIX: vmalert: return non-empty result in template func `query` stub to pass validation. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/989 .
* FEATURE: provide a sample list of alerting rules for VictoriaMetrics components. It is available [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts.yml).
* FEATURE: disable final merge for data for the previous month at the beginning of new month, since it may result in high disk IO and CPU usage. Final merge can be enabled by setting `-finalMergeDelay` command-line flag to positive duration.
* FEATURE: add `tfirst_over_time(m[d])` and `tlast_over_time(m[d])` functions to [MetricsQL](https://victoriametrics.github.io/MetricsQL.html) for returning timestamps for the first and the last data point in `m` over `d` duration.
* FEATURE: enforce at least TLS v1.2 when accepting HTTPS requests if `-tls`, `-tlsCertFile` and `-tlsKeyFile` command-line flags are set, because older TLS protocols such as v1.0 and v1.1 have been deprecated due to security vulnerabilities.
* FEATURE: support `extra_label` query arg for all HTTP-based [data ingestion protocols](https://victoriametrics.github.io/#how-to-import-time-series-data). This query arg can be used for specifying extra labels which should be added for the ingested data.
* BUGFIX: properly parse escaped unicode chars in MetricsQL metric names, label names and function names. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/990
* BUGFIX: override user-provided labels with labels set in `extra_label` query args during data ingestion over HTTP-based protocols.
* BUGFIX: vmagent: prevent from `dialing to the given TCP address time out` error when scraping big number of unavailable targets. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/987
* BUGFIX: vmagent: properly show scrape duration on `/targets` page. Previously it was incorrectly shown as 0.000s.
* BUGFIX: vmagent: properly log errors when `-promscrape.streamParse` command-line flag is set. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1009
* BUGFIX: vmagent: properly suppress errors when both `-promscrape.suppressScrapeErrors` and `-promscrape.streamParse` command-line flags are set. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1009 .
* BUGFIX: vmalert: return non-empty result in template func `query` stub to pass validation. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/989 .
* BUGFIX: upgrade base image for Docker packages from Alpine 3.12.1 to Alpine 3.12.3 in order to fix potential security issues. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1010
# [v1.51.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.51.0)

View file

@ -127,6 +127,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `count_ne_over_time(m[d], N)` - returns the number of raw samples for `m` over `d` with values not equal to `N`.
- `tmin_over_time(m[d])` - returns timestamp for the minimum value for `m` over `d` time range.
- `tmax_over_time(m[d])` - returns timestamp for the maximum value for `m` over `d` time range.
- `tfirst_over_time(m[d])` - returns timestamp for the first sample for `m` over `d` time range.
- `tlast_over_time(m[d])` - returns timestamp for the last sample for `m` over `d` time range.
- `aggr_over_time(("aggr_func1", "aggr_func2", ...), m[d])` - simultaneously calculates all the listed `aggr_func*` for `m` over `d` time range.
`aggr_func*` can contain any functions that accept range vector. For instance, `aggr_over_time(("min_over_time", "max_over_time", "rate"), m[d])`
would calculate `min_over_time`, `max_over_time` and `rate` for `m[d]`.

View file

@ -409,6 +409,8 @@ The `/api/v1/export` endpoint should return the following response:
Note that Influx line protocol expects [timestamps in *nanoseconds* by default](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp),
while VictoriaMetrics stores them with *milliseconds* precision.
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
For example, `/write?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
@ -524,6 +526,8 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]}
```
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
## Prometheus querying API usage
@ -1369,7 +1373,7 @@ cache when samples with timestamps older than `now - search.cacheTimestampOffset
## Data updates
VictoriaMetrics doesn't support updating already existing sample values to new ones. It stores all the ingested data points
for the same time series with identical timestamps. While is possible substituting old time series with new time series via
for the same time series with identical timestamps. While it is possible substituting old time series with new time series via
[removal of old time series](#how-to-delete-timeseries) and then [writing new time series](#backfilling), this approach
should be used only for one-off updates. It shouldn't be used for frequent updates because of non-zero overhead related to data removal.

2
go.mod
View file

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.0.11
github.com/VictoriaMetrics/metrics v1.12.3
github.com/VictoriaMetrics/metricsql v0.9.1
github.com/VictoriaMetrics/metricsql v0.10.0
github.com/aws/aws-sdk-go v1.36.23
github.com/cespare/xxhash/v2 v2.1.1
github.com/go-kit/kit v0.10.0

4
go.sum
View file

@ -86,8 +86,8 @@ github.com/VictoriaMetrics/fasthttp v1.0.11/go.mod h1:3SeUL4zwB/p/a9aEeRc6gdlbrt
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metrics v1.12.3 h1:Fe6JHC6MSEKa+BtLhPN8WIvS+HKPzMc2evEpNeCGy7I=
github.com/VictoriaMetrics/metrics v1.12.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metricsql v0.9.1 h1:CVl9fSW4pGhv7r9Q54zBPVVIGmwpAWvfo0QybVv+TV8=
github.com/VictoriaMetrics/metricsql v0.9.1/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VictoriaMetrics/metricsql v0.10.0 h1:45BARAP2shaL/5p67Hvz+YrWUbr0X0VCy9t+gvdIm8o=
github.com/VictoriaMetrics/metricsql v0.10.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=

View file

@ -93,7 +93,9 @@ func Serve(addr string, rh RequestHandler) {
logger.Fatalf("cannot load TLS cert from tlsCertFile=%q, tlsKeyFile=%q: %s", *tlsCertFile, *tlsKeyFile, err)
}
cfg := &tls.Config{
Certificates: []tls.Certificate{cert},
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
PreferServerCipherSuites: true,
}
ln = tls.NewListener(ln, cfg)
}

View file

@ -343,7 +343,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
return nil
})
}, sw.logError)
responseSize = sr.bytesRead
sr.MustClose()
}
@ -373,7 +373,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
wc.reset()
writeRequestCtxPool.Put(wc)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
return nil
return err
}
// leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx

View file

@ -18,7 +18,7 @@ import (
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error {
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error, errLogger func(string)) error {
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
@ -31,6 +31,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.errLogger = errLogger
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
@ -133,6 +134,7 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
errLogger func(string)
defaultTimestamp int64
reqBuf []byte
}
@ -140,13 +142,18 @@ type unmarshalWork struct {
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.errLogger = nil
uw.defaultTimestamp = 0
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
if uw.errLogger != nil {
uw.rows.UnmarshalWithErrLogger(bytesutil.ToUnsafeString(uw.reqBuf), uw.errLogger)
} else {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
}
rows := uw.rows.Rows
rowsRead.Add(len(rows))

View file

@ -31,7 +31,7 @@ func TestParseStream(t *testing.T) {
}
lock.Unlock()
return nil
})
}, nil)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", s, err)
}
@ -64,7 +64,7 @@ func TestParseStream(t *testing.T) {
}
lock.Unlock()
return nil
})
}, nil)
if err != nil {
t.Fatalf("unexpected error when parsing compressed %q: %s", s, err)
}

View file

@ -364,7 +364,8 @@ func (mn *MetricName) SortAndMarshal(dst []byte) []byte {
// Marshal appends marshaled mn to dst and returns the result.
//
// Tags must be sorted before calling this function.
// mn.sortTags must be called before calling this function
// in order to sort and de-duplcate tags.
func (mn *MetricName) Marshal(dst []byte) []byte {
// Calculate the required size and pre-allocate space in dst
dstLen := len(dst)
@ -380,7 +381,11 @@ func (mn *MetricName) Marshal(dst []byte) []byte {
dst = marshalTagValue(dst, mn.MetricGroup)
// Marshal tags.
dst = marshalTags(dst, mn.Tags)
tags := mn.Tags
for i := range tags {
t := &tags[i]
dst = t.Marshal(dst)
}
return dst
}
@ -404,7 +409,7 @@ func (mn *MetricName) Unmarshal(src []byte) error {
}
// There is no need in verifying for identical tag keys,
// since they must be handled in MetricName.Marshal inside marshalTags.
// since they must be handled by MetricName.sortTags before calling MetricName.Marshal.
return nil
}
@ -556,7 +561,10 @@ func unmarshalBytesFast(src []byte) ([]byte, []byte, error) {
return src[n:], src[:n], nil
}
// sortTags sorts tags in mn.
// sortTags sorts tags in mn to canonical form needed for storing in the index.
//
// The function also de-duplicates tags with identical keys in mn. The last tag value
// for duplicate tags wins.
//
// Tags sorting is quite slow, so try avoiding it by caching mn
// with sorted tags.
@ -578,12 +586,25 @@ func (mn *MetricName) sortTags() {
}
cts.tags = dst
// Use sort.Sort instead of sort.Slice, since sort.Slice allocates a lot.
sort.Sort(&cts.tags)
// Use sort.Stable instead of sort.Sort in order to preserve the order of tags with duplicate keys.
// The last tag value wins for tags with duplicate keys.
// Use sort.Stable instead of sort.SliceStable, since sort.SliceStable allocates a lot.
sort.Stable(&cts.tags)
j := 0
var prevKey []byte
for i := range cts.tags {
mn.Tags[i].copyFrom(&cts.tags[i].tag)
tag := &cts.tags[i].tag
if j > 0 && bytes.Equal(tag.Key, prevKey) {
// Overwrite the previous tag with duplicate key.
j--
} else {
prevKey = tag.Key
}
mn.Tags[j].copyFrom(tag)
j++
}
mn.Tags = mn.Tags[:j]
putCanonicalTags(cts)
}
@ -624,20 +645,6 @@ func (ts *canonicalTagsSort) Swap(i, j int) {
x[i], x[j] = x[j], x[i]
}
func marshalTags(dst []byte, tags []Tag) []byte {
var prevKey []byte
for i := range tags {
t := &tags[i]
if string(prevKey) == string(t.Key) {
// Skip duplicate keys, since they aren't allowed in Prometheus data model.
continue
}
prevKey = t.Key
dst = t.Marshal(dst)
}
return dst
}
func copyTags(dst, src []Tag) []Tag {
dstLen := len(dst)
if n := dstLen + len(src) - cap(dst); n > 0 {

View file

@ -64,15 +64,16 @@ func TestMetricNameMarshalDuplicateKeys(t *testing.T) {
var mn MetricName
mn.MetricGroup = []byte("xxx")
mn.AddTag("foo", "bar")
mn.AddTag("duplicate", "tag")
mn.AddTag("duplicate", "tag")
mn.AddTag("tt", "xx")
mn.AddTag("duplicate", "tag1")
mn.AddTag("duplicate", "tag2")
mn.AddTag("tt", "xx")
mn.AddTag("foo", "abc")
mn.AddTag("duplicate", "tag3")
var mnExpected MetricName
mnExpected.MetricGroup = []byte("xxx")
mnExpected.AddTag("duplicate", "tag")
mnExpected.AddTag("foo", "bar")
mnExpected.AddTag("duplicate", "tag3")
mnExpected.AddTag("foo", "abc")
mnExpected.AddTag("tt", "xx")
mn.sortTags()

View file

@ -4,6 +4,8 @@ import (
"fmt"
"strconv"
"strings"
"unicode"
"unicode/utf8"
)
type lexer struct {
@ -220,13 +222,12 @@ func scanIdent(s string) string {
if s[i] != '\\' {
break
}
i++
// Do not verify the next char, since it is escaped.
i += 2
if i > len(s) {
i--
break
}
// The next char may be encoded as multi-byte UTF8 sequence. See https://en.wikipedia.org/wiki/UTF-8#Encoding
_, size := utf8.DecodeRuneInString(s[i:])
i += size
}
if i == 0 {
panic("BUG: scanIdent couldn't find a single ident char; make sure isIdentPrefix called before scanIdent")
@ -257,8 +258,10 @@ func unescapeIdent(s string) string {
s = s[1:]
}
} else {
dst = append(dst, s[0])
s = s[1:]
// UTF8 char. See https://en.wikipedia.org/wiki/UTF-8#Encoding
_, size := utf8.DecodeRuneInString(s)
dst = append(dst, s[:size]...)
s = s[size:]
}
n = strings.IndexByte(s, '\\')
if n < 0 {
@ -298,12 +301,18 @@ func appendEscapedIdent(dst []byte, s string) []byte {
} else {
dst = append(dst, ch)
}
} else if ch >= 0x20 && ch < 0x7f {
// Leave ASCII printable chars as is
dst = append(dst, '\\', ch)
continue
}
// escape ch
dst = append(dst, '\\')
r, size := utf8.DecodeRuneInString(s[i:])
if r != utf8.RuneError && unicode.IsPrint(r) {
dst = append(dst, s[i:i+size]...)
i += size - 1
} else {
// hex-encode non-printable chars
dst = append(dst, '\\', 'x', toHex(ch>>4), toHex(ch&0xf))
dst = append(dst, 'x', toHex(ch>>4), toHex(ch&0xf))
}
}
return dst

View file

@ -45,6 +45,8 @@ var rollupFuncs = map[string]bool{
"scrape_interval": true,
"tmin_over_time": true,
"tmax_over_time": true,
"tfirst_over_time": true,
"tlast_over_time": true,
"share_le_over_time": true,
"share_gt_over_time": true,
"count_le_over_time": true,

2
vendor/modules.txt vendored
View file

@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.12.3
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.9.1
# github.com/VictoriaMetrics/metricsql v0.10.0
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop
# github.com/aws/aws-sdk-go v1.36.23