Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2023-02-13 11:11:49 -08:00
commit 2ce4d04d8e
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
90 changed files with 444 additions and 305 deletions

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -25,7 +26,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
} }

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -28,7 +29,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
return err return err
} }
ce := req.Header.Get("Content-Encoding") ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { return stream.Parse(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels) return insertRows(at, series, extraLabels)
}) })
} }

View file

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -19,7 +20,7 @@ var (
// //
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error { func InsertHandler(r io.Reader) error {
return parser.ParseStream(r, insertRows) return stream.Parse(r, insertRows)
} }
func insertRows(rows []parser.Row) error { func insertRows(rows []parser.Row) error {

View file

@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -36,7 +37,7 @@ var (
// //
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader, isGzipped bool) error { func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error { return stream.Parse(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil) return insertRows(nil, db, rows, nil)
}) })
} }
@ -54,7 +55,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
precision := q.Get("precision") precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db") db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, extraLabels) return insertRows(at, db, rows, extraLabels)
}) })
} }

View file

@ -10,7 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -30,12 +30,12 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
isGzip := req.Header.Get("Content-Encoding") == "gzip" isGzip := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error { return stream.Parse(req.Body, isGzip, func(block *stream.Block) error {
return insertRows(at, block, extraLabels) return insertRows(at, block, extraLabels)
}) })
} }
func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal.Label) error { func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx() ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx) defer common.PutPushCtx(ctx)

View file

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -19,7 +20,7 @@ var (
// //
// See http://opentsdb.net/docs/build/html/api_telnet/put.html // See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(r io.Reader) error { func InsertHandler(r io.Reader) error {
return parser.ParseStream(r, insertRows) return stream.Parse(r, insertRows)
} }
func insertRows(rows []parser.Row) error { func insertRows(rows []parser.Row) error {

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -24,7 +25,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
} }

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -31,7 +32,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}, func(s string) { }, func(s string) {
httpserver.LogError(req, s) httpserver.LogError(req, s)

View file

@ -10,7 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -27,7 +27,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error { return stream.Parse(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels) return insertRows(at, tss, extraLabels)
}) })
} }

View file

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -30,7 +31,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { return stream.Parse(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
} }

View file

@ -725,31 +725,42 @@ a review to the dashboard.
## Troubleshooting ## Troubleshooting
vmalert executes configured rules within certain intervals. It is expected that at the moment when rule is executed, ### Data delay
the data is already present in configured `-datasource.url`:
Data delay is one of the most common issues with rules execution.
vmalert executes configured rules within certain intervals at specifics timestamps.
It expects that the data is already present in configured `-datasource.url` at the moment of time when rule is executed:
<img alt="vmalert expected evaluation" src="vmalert_ts_normal.gif"> <img alt="vmalert expected evaluation" src="vmalert_ts_normal.gif">
Usually, troubles start to appear when data in `-datasource.url` is delayed or absent. In such cases, evaluations Usually, troubles start to appear when data in `-datasource.url` is delayed or absent. In such cases, evaluations
may get empty response from datasource and produce empty recording rules or reset alerts state: may get empty response from the datasource and produce empty recording rules or reset alerts state:
<img alt="vmalert evaluation when data is delayed" src="vmalert_ts_data_delay.gif"> <img alt="vmalert evaluation when data is delayed" src="vmalert_ts_data_delay.gif">
By default, recently written samples to VictoriaMetrics aren't visible for queries for up to 30s. Try the following recommendations to reduce the chance of hitting the data delay issue:
This behavior is controlled by `-search.latencyOffset` command-line flag and the `latency_offset` query ag at `vmselect`.
Usually, this results into a 30s shift for recording rules results.
Note that too small value passed to `-search.latencyOffset` or to `latency_offest` query arg may lead to incomplete query results.
Try the following recommendations in such cases: * Always configure group's `evaluationInterval` to be bigger or at least equal to
[time series resolution](https://docs.victoriametrics.com/keyConcepts.html#time-series-resolution);
* Always configure group's `evaluationInterval` to be bigger or equal to `scrape_interval` at which metrics * Ensure that `[duration]` value is at least twice bigger than
are delivered to the datasource; [time series resolution](https://docs.victoriametrics.com/keyConcepts.html#time-series-resolution). For example,
if expression is `rate(my_metric[2m]) > 0` then ensure that `my_metric` resolution is at least `1m` or better `30s`.
If you use VictoriaMetrics as datasource, `[duration]` can be omitted and VictoriaMetrics will adjust it automatically.
* If you know in advance, that data in datasource is delayed - try changing vmalert's `-datasource.lookback` * If you know in advance, that data in datasource is delayed - try changing vmalert's `-datasource.lookback`
command-line flag to add a time shift for evaluations; command-line flag to add a time shift for evaluations. Or extend `[duration]` to tolerate the delay.
* If time intervals between datapoints in datasource are irregular or `>=5min` - try changing vmalert's For example, `max_over_time(errors_total[10m]) > 0` will be active even if there is no data in datasource for last `9m`.
`-datasource.queryStep` command-line flag to specify how far search query can lookback for the recent datapoint. * If [time series resolution](https://docs.victoriametrics.com/keyConcepts.html#time-series-resolution)
The recommendation is to have the step at least two times bigger than `scrape_interval`, since in datasource is inconsistent or `>=5min` - try changing vmalert's `-datasource.queryStep` command-line flag to specify
there are no guarantees that scrape will not fail. how far search query can lookback for the recent datapoint. The recommendation is to have the step
at least two times bigger than the resolution.
> Please note, data delay is inevitable in distributed systems. And it is better to account for it instead of ignoring.
By default, recently written samples to VictoriaMetrics aren't visible for queries for up to 30s
(see `-search.latencyOffset` command-line flag at vmselect). Such delay is needed to eliminate risk of incomplete
data on the moment of querying, since metrics collectors won't be able to deliver the data in time.
### Alerts state
Sometimes, it is not clear why some specific alert fired or didn't fire. It is very important to remember, that Sometimes, it is not clear why some specific alert fired or didn't fire. It is very important to remember, that
alerts with `for: 0` fire immediately when their expression becomes true. And alerts with `for > 0` will fire only alerts with `for: 0` fire immediately when their expression becomes true. And alerts with `for > 0` will fire only
@ -772,6 +783,8 @@ HTTP request sent by vmalert to the `-datasource.url` during evaluation. If spec
no samples returned and curl command returns data - then it is very likely there was no data in datasource on the no samples returned and curl command returns data - then it is very likely there was no data in datasource on the
moment when rule was evaluated. moment when rule was evaluated.
### Debug mode
vmalert allows configuring more detailed logging for specific alerting rule. Just set `debug: true` in rule's configuration vmalert allows configuring more detailed logging for specific alerting rule. Just set `debug: true` in rule's configuration
and vmalert will start printing additional log messages: and vmalert will start printing additional log messages:
```terminal ```terminal

View file

@ -41,7 +41,7 @@ type Alert struct {
LastSent time.Time LastSent time.Time
// Value stores the value returned from evaluating expression from Expr field // Value stores the value returned from evaluating expression from Expr field
Value float64 Value float64
// ID is the unique identifer for the Alert // ID is the unique identifier for the Alert
ID uint64 ID uint64
// Restored is true if Alert was restored after restart // Restored is true if Alert was restored after restart
Restored bool Restored bool

View file

@ -29,7 +29,7 @@ type Config struct {
// ConsulSDConfigs contains list of settings for service discovery via Consul // ConsulSDConfigs contains list of settings for service discovery via Consul
// see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config // see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs,omitempty"` ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
// DNSSDConfigs ontains list of settings for service discovery via DNS. // DNSSDConfigs contains list of settings for service discovery via DNS.
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config
DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs,omitempty"` DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs,omitempty"`

View file

@ -225,7 +225,7 @@ func templateFuncs() textTpl.FuncMap {
"toLower": strings.ToLower, "toLower": strings.ToLower,
// crlfEscape replaces '\n' and '\r' chars with `\\n` and `\\r`. // crlfEscape replaces '\n' and '\r' chars with `\\n` and `\\r`.
// This funcion is deprectated. // This function is deprecated.
// //
// It is better to use quotesEscape, jsonEscape, queryEscape or pathEscape instead - // It is better to use quotesEscape, jsonEscape, queryEscape or pathEscape instead -
// these functions properly escape `\n` and `\r` chars according to their purpose. // these functions properly escape `\n` and `\r` chars according to their purpose.

View file

@ -110,7 +110,7 @@ type SrcPath struct {
re *regexp.Regexp re *regexp.Regexp
} }
// URLPrefix represents pased `url_prefix` // URLPrefix represents passed `url_prefix`
type URLPrefix struct { type URLPrefix struct {
n uint32 n uint32
bus []*backendURL bus []*backendURL

View file

@ -50,7 +50,7 @@ func normalizeURL(uOrig *url.URL) *url.URL {
// Prevent from attacks with using `..` in r.URL.Path // Prevent from attacks with using `..` in r.URL.Path
u.Path = path.Clean(u.Path) u.Path = path.Clean(u.Path)
if !strings.HasSuffix(u.Path, "/") && strings.HasSuffix(uOrig.Path, "/") { if !strings.HasSuffix(u.Path, "/") && strings.HasSuffix(uOrig.Path, "/") {
// The path.Clean() removes traling slash. // The path.Clean() removes trailing slash.
// Return it back if needed. // Return it back if needed.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1752 // This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1752
u.Path += "/" u.Path += "/"

View file

@ -20,7 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
) )
func main() { func main() {
@ -247,7 +247,7 @@ func main() {
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1) return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1)
} }
var blocksCount uint64 var blocksCount uint64
if err := parser.ParseStream(f, isBlockGzipped, func(block *parser.Block) error { if err := stream.Parse(f, isBlockGzipped, func(block *stream.Block) error {
atomic.AddUint64(&blocksCount, 1) atomic.AddUint64(&blocksCount, 1)
return nil return nil
}); err != nil { }); err != nil {

View file

@ -146,7 +146,7 @@ func (ctx *InsertCtx) FlushBufs() error {
} }
// There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here, // There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here,
// since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter // since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter
// used at every ParseStream() call under lib/protoparser/*/streamparser.go // used at every stream.Parse() call under lib/protoparser/*
err := vmstorage.AddRows(ctx.mrs) err := vmstorage.AddRows(ctx.mrs)
ctx.Reset(0) ctx.Reset(0)
if err == nil { if err == nil {

View file

@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -22,7 +23,7 @@ func InsertHandler(req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}) })
} }

View file

@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -25,7 +26,7 @@ func InsertHandlerForHTTP(req *http.Request) error {
return err return err
} }
ce := req.Header.Get("Content-Encoding") ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { return stream.Parse(req.Body, ce, func(series []parser.Series) error {
return insertRows(series, extraLabels) return insertRows(series, extraLabels)
}) })
} }

View file

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -18,7 +19,7 @@ var (
// //
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error { func InsertHandler(r io.Reader) error {
return parser.ParseStream(r, insertRows) return stream.Parse(r, insertRows)
} }
func insertRows(rows []parser.Row) error { func insertRows(rows []parser.Row) error {

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -34,7 +35,7 @@ var (
// //
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error { func InsertHandlerForReader(r io.Reader) error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { return stream.Parse(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil) return insertRows(db, rows, nil)
}) })
} }
@ -52,7 +53,7 @@ func InsertHandlerForHTTP(req *http.Request) error {
precision := q.Get("precision") precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db") db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels) return insertRows(db, rows, extraLabels)
}) })
} }

View file

@ -10,7 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -27,12 +27,12 @@ func InsertHandler(req *http.Request) error {
return err return err
} }
isGzip := req.Header.Get("Content-Encoding") == "gzip" isGzip := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error { return stream.Parse(req.Body, isGzip, func(block *stream.Block) error {
return insertRows(block, extraLabels) return insertRows(block, extraLabels)
}) })
} }
func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { func insertRows(block *stream.Block, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx() ctx := getPushCtx()
defer putPushCtx(ctx) defer putPushCtx(ctx)

View file

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -18,7 +19,7 @@ var (
// //
// See http://opentsdb.net/docs/build/html/api_telnet/put.html // See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(r io.Reader) error { func InsertHandler(r io.Reader) error {
return parser.ParseStream(r, insertRows) return stream.Parse(r, insertRows)
} }
func insertRows(rows []parser.Row) error { func insertRows(rows []parser.Row) error {

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -27,7 +28,7 @@ func InsertHandler(req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}) })
default: default:

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -28,7 +29,7 @@ func InsertHandler(req *http.Request) error {
return err return err
} }
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}, func(s string) { }, func(s string) {
httpserver.LogError(req, s) httpserver.LogError(req, s)

View file

@ -21,7 +21,7 @@ func Push(wr *prompbmarshal.WriteRequest) {
tss := wr.Timeseries tss := wr.Timeseries
for len(tss) > 0 { for len(tss) > 0 {
// Process big tss in smaller blocks in order to reduce maxmimum memory usage // Process big tss in smaller blocks in order to reduce maximum memory usage
samplesCount := 0 samplesCount := 0
i := 0 i := 0
for i < len(tss) { for i < len(tss) {

View file

@ -8,7 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -23,7 +23,7 @@ func InsertHandler(req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error { return stream.Parse(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels) return insertRows(tss, extraLabels)
}) })
} }

View file

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -29,7 +30,7 @@ func InsertHandler(req *http.Request) error {
return err return err
} }
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { return stream.Parse(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}) })
} }

View file

@ -164,7 +164,7 @@ func (p *parser) parseString() (*StringExpr, error) {
return se, nil return se, nil
} }
// StringExpr represents string contant. // StringExpr represents string constant.
type StringExpr struct { type StringExpr struct {
// S contains unquoted string contents. // S contains unquoted string contents.
S string S string

View file

@ -194,7 +194,7 @@ func getTmpResult() *result {
func putTmpResult(r *result) { func putTmpResult(r *result) {
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 { if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows. // Reset r.rs in order to preserve memory usage after processing big time series with millions of rows.
r.rs = Result{} r.rs = Result{}
r.lastResetTime = currentTime r.lastResetTime = currentTime
} }
@ -1015,7 +1015,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, tagV
// TSDBStatus returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats // TSDBStatus returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
// //
// It accepts aribtrary filters on time series in sq. // It accepts arbitrary filters on time series in sq.
func TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) { func TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
qt = qt.NewChild("get tsdb stats: %s, focusLabel=%q, topN=%d", sq, focusLabel, topN) qt = qt.NewChild("get tsdb stats: %s, focusLabel=%q, topN=%d", sq, focusLabel, topN)
defer qt.Done() defer qt.Done()

View file

@ -901,7 +901,7 @@ func quantileSorted(phi float64, values []float64) float64 {
func aggrFuncMAD(tss []*timeseries) []*timeseries { func aggrFuncMAD(tss []*timeseries) []*timeseries {
// Calculate medians for each point across tss. // Calculate medians for each point across tss.
medians := getPerPointMedians(tss) medians := getPerPointMedians(tss)
// Calculate MAD values multipled by tolerance for each point across tss. // Calculate MAD values multiplied by tolerance for each point across tss.
// See https://en.wikipedia.org/wiki/Median_absolute_deviation // See https://en.wikipedia.org/wiki/Median_absolute_deviation
mads := getPerPointMADs(tss, medians) mads := getPerPointMADs(tss, medians)
tss[0].Values = append(tss[0].Values[:0], mads...) tss[0].Values = append(tss[0].Values[:0], mads...)
@ -920,7 +920,7 @@ func aggrFuncOutliersMAD(afa *aggrFuncArg) ([]*timeseries, error) {
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
// Calculate medians for each point across tss. // Calculate medians for each point across tss.
medians := getPerPointMedians(tss) medians := getPerPointMedians(tss)
// Calculate MAD values multipled by tolerance for each point across tss. // Calculate MAD values multiplied by tolerance for each point across tss.
// See https://en.wikipedia.org/wiki/Median_absolute_deviation // See https://en.wikipedia.org/wiki/Median_absolute_deviation
mads := getPerPointMADs(tss, medians) mads := getPerPointMADs(tss, medians)
for n := range mads { for n := range mads {

View file

@ -466,7 +466,7 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
// 1) execute the exprFirst // 1) execute the exprFirst
// 2) get common label filters for series returned at step 1 // 2) get common label filters for series returned at step 1
// 3) push down the found common label filters to exprSecond. This filters out unneeded series // 3) push down the found common label filters to exprSecond. This filters out unneeded series
// during exprSecond exection instead of spending compute resources on extracting and processing these series // during exprSecond execution instead of spending compute resources on extracting and processing these series
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching // before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
// 4) execute the exprSecond with possible additional filters found at step 3 // 4) execute the exprSecond with possible additional filters found at step 3
// //

View file

@ -385,7 +385,7 @@ func getRollupFunc(funcName string) newRollupFunc {
} }
type rollupFuncArg struct { type rollupFuncArg struct {
// The value preceeding values if it fits staleness interval. // The value preceding values if it fits staleness interval.
prevValue float64 prevValue float64
// The timestamp for prevValue. // The timestamp for prevValue.
@ -397,7 +397,7 @@ type rollupFuncArg struct {
// Timestamps for values. // Timestamps for values.
timestamps []int64 timestamps []int64
// Real value preceeding values without restrictions on staleness interval. // Real value preceding values without restrictions on staleness interval.
realPrevValue float64 realPrevValue float64
// Real value which goes after values. // Real value which goes after values.
@ -587,7 +587,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
if window <= 0 { if window <= 0 {
window = rc.Step window = rc.Step
if rc.MayAdjustWindow && window < maxPrevInterval { if rc.MayAdjustWindow && window < maxPrevInterval {
// Adjust lookbehind window only if it isn't set explicilty, e.g. rate(foo). // Adjust lookbehind window only if it isn't set explicitly, e.g. rate(foo).
// In the case of missing lookbehind window it should be adjusted in order to return non-empty graph // In the case of missing lookbehind window it should be adjusted in order to return non-empty graph
// when the window doesn't cover at least two raw samples (this is what most users expect). // when the window doesn't cover at least two raw samples (this is what most users expect).
// //

View file

@ -25,6 +25,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706).
* BUGFIX: prevent from possible data ingestion slowdown and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with small number of CPU cores (1 or 2 CPU cores). The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790). * BUGFIX: prevent from possible data ingestion slowdown and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with small number of CPU cores (1 or 2 CPU cores). The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790).
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1) ## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1)

View file

@ -79,6 +79,30 @@ requests_total{path="/", code="200"} 123 4567890
- The `4567890` is an optional timestamp for the sample. If it is missing, - The `4567890` is an optional timestamp for the sample. If it is missing,
then the current timestamp is used when storing the sample in VictoriaMetrics. then the current timestamp is used when storing the sample in VictoriaMetrics.
#### Time series resolution
Resolution is the minimum interval between [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
of the [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series). Consider the following example:
```
----------------------------------------------------------------------
| <time series> | <value> | <timestamp> |
| requests_total{path="/health", code="200"} | 1 | 1676297640 |
| requests_total{path="/health", code="200"} | 2 | 1676297670 |
| requests_total{path="/health", code="200"} | 3 | 1676297700 |
| requests_total{path="/health", code="200"} | 4 | 1676297730 |
....
```
Here we have a time series `requests_total{path="/health", code="200"}` which has a value update each `30s`.
This means, its resolution is also a `30s`.
> In terms of [pull model](https://docs.victoriametrics.com/keyConcepts.html#pull-model), resolution is equal
> to `scrape_interval` and is controlled by the monitoring system (server).
> For [push model](https://docs.victoriametrics.com/keyConcepts.html#push-model), resolution is an interval between
> samples timestamps and is controlled by a client (metrics collector).
Try to keep time series resolution consistent, since some [MetricsQL](#metricsql) functions may expect it to be so.
### Types of metrics ### Types of metrics
Internally, VictoriaMetrics does not have the notion of a metric type. The concept of a metric Internally, VictoriaMetrics does not have the notion of a metric type. The concept of a metric

View file

@ -4,37 +4,19 @@ sort: 28
# Managed VictoriaMetrics # Managed VictoriaMetrics
VictoriaMetrics is a fast and easy-to-use monitoring solution and time series database. Product information:
It integrates well with existing monitoring systems such as Grafana, Prometheus, Graphite,
InfluxDB, OpenTSDB and DataDog - see [these docs](https://docs.victoriametrics.com/#how-to-import-time-series-data) for details.
The most common use cases for VictoriaMetrics are: * [Overview of Managed VictoriaMetrics](/managed-victoriametrics/overview.html)
* Long-term remote storage for Prometheus; * [User Management](/managed-victoriametrics/user-managment.html)
* More efficient drop-in replacement for Prometheus and Graphite
* Replacement for InfluxDB and OpenTSDB, which uses lower amounts of RAM, CPU and disk;
* Cost-efficient alternative for DataDog.
We are glad to announce the availability of Managed VictoriaMetrics Guides:
at AWS Marketplace - [try it right now](https://aws.amazon.com/marketplace/pp/prodview-4tbfq5icmbmyc)!
Managed VictoriaMetrics allows users running VictoriaMetrics at AWS without the need to perform typical * [Quick Start](/managed-victoriametrics/quickstart.html)
DevOps tasks such as proper configuration, monitoring, logs collection, access protection, software updates, * [Kubernetes Monitoring with Managed VictoriaMetrics](/managed-victoriametrics/how-to-monitor-k8s.html)
backups, etc. * [Understand Your Setup Size](/guides/understand-your-setup-size.html)
We run Managed VictoriaMetrics instances in our environment at AWS while providing easy-to-use endpoints
for data ingestion and querying. And the VictoriaMetrics team takes care of optimal configuration and software
maintenance.
Managed VictoriaMetrics comes with the following features:
* It can be used as a Managed Prometheus - just configure Prometheus or vmagent to write data to Managed VictoriaMetrics and then use the provided endpoint as a Prometheus datasource in Grafana;
* Every Managed VictoriaMetrics instance runs in an isolated environment, so instances cannot interfere with each other;
* Managed VictoriaMetrics instance can be scaled up or scaled down in a few clicks;
* Automated backups;
* Pay only for the actually used compute resources - instance type, disk and network.
See more about Managed VictoriaMetrics in the following articles: See more about Managed VictoriaMetrics in the following articles:
* [Managed VictoriaMetrics announcement](https://victoriametrics.com/blog/managed-victoriametrics-announcement) * [Managed VictoriaMetrics announcement](https://victoriametrics.com/blog/managed-victoriametrics-announcement)
* [Pricing comparison for Managed Prometheus](https://victoriametrics.com/blog/managed-prometheus-pricing/) * [Pricing comparison for Managed Prometheus](https://victoriametrics.com/blog/managed-prometheus-pricing/)
* [Monitoring Proxmox VE via Managed VictoriaMetrics and vmagent](https://victoriametrics.com/blog/proxmox-monitoring-with-dbaas/) * [Monitoring Proxmox VE via Managed VictoriaMetrics and vmagent](https://victoriametrics.com/blog/proxmox-monitoring-with-dbaas/)

View file

@ -1,3 +1,12 @@
---
sort: 3
weight: 3
title: Kubernetes Monitoring with Managed VictoriaMetrics
menu:
docs:
parent: "managed"
weight: 3
---
# Kubernetes Monitoring with Managed VictoriaMetrics # Kubernetes Monitoring with Managed VictoriaMetrics
Monitoring kubernetes cluster is necessary to build SLO/SLI, to analyze performance and cost-efficiency of your workloads. Monitoring kubernetes cluster is necessary to build SLO/SLI, to analyze performance and cost-efficiency of your workloads.

View file

@ -0,0 +1,40 @@
---
sort: 1
weight: 1
title: Overview of Managed VictoriaMetrics
menu:
docs:
parent: "managed"
weight: 1
---
# Overview of Managed VictoriaMetrics
VictoriaMetrics is a fast and easy-to-use monitoring solution and time series database.
It integrates well with existing monitoring systems such as Grafana, Prometheus, Graphite,
InfluxDB, OpenTSDB and DataDog - see [these docs](https://docs.victoriametrics.com/#how-to-import-time-series-data) for details.
The most common use cases for VictoriaMetrics are:
* Long-term remote storage for Prometheus;
* More efficient drop-in replacement for Prometheus and Graphite
* Replacement for InfluxDB and OpenTSDB, which uses lower amounts of RAM, CPU and disk;
* Cost-efficient alternative for DataDog.
We are glad to announce the availability of Managed VictoriaMetrics
at AWS Marketplace - [try it right now](https://aws.amazon.com/marketplace/pp/prodview-4tbfq5icmbmyc)!
Managed VictoriaMetrics allows users running VictoriaMetrics at AWS without the need to perform typical
DevOps tasks such as proper configuration, monitoring, logs collection, access protection, software updates,
backups, etc.
We run Managed VictoriaMetrics instances in our environment at AWS while providing easy-to-use endpoints
for data ingestion and querying. And the VictoriaMetrics team takes care of optimal configuration and software
maintenance.
Managed VictoriaMetrics comes with the following features:
* It can be used as a Managed Prometheus - just configure Prometheus or vmagent to write data to Managed VictoriaMetrics and then use the provided endpoint as a Prometheus datasource in Grafana;
* Every Managed VictoriaMetrics instance runs in an isolated environment, so instances cannot interfere with each other;
* Managed VictoriaMetrics instance can be scaled up or scaled down in a few clicks;
* Automated backups;
* Pay only for the actually used compute resources - instance type, disk and network.

View file

@ -1,3 +1,12 @@
---
sort: 2
weight: 2
title: Quick Start
menu:
docs:
parent: "managed"
weight: 2
---
# Quick Start # Quick Start
Managed VictoriaMetrics - is a database-as-a-service platform, where users can run the VictoriaMetrics Managed VictoriaMetrics - is a database-as-a-service platform, where users can run the VictoriaMetrics

View file

@ -1,3 +1,12 @@
---
sort: 4
weight: 4
title: User Management in Managed VictoriaMetrics
menu:
docs:
parent: "managed"
weight: 4
---
# User Management in Managed VictoriaMetrics # User Management in Managed VictoriaMetrics
The user management system enables admins to control user access and onboard and offboard users to the Managed VictoriaMetrics. It organizes users according to their needs and role. The user management system enables admins to control user access and onboard and offboard users to the Managed VictoriaMetrics. It organizes users according to their needs and role.

View file

@ -729,31 +729,42 @@ a review to the dashboard.
## Troubleshooting ## Troubleshooting
vmalert executes configured rules within certain intervals. It is expected that at the moment when rule is executed, ### Data delay
the data is already present in configured `-datasource.url`:
Data delay is one of the most common issues with rules execution.
vmalert executes configured rules within certain intervals at specifics timestamps.
It expects that the data is already present in configured `-datasource.url` at the moment of time when rule is executed:
<img alt="vmalert expected evaluation" src="vmalert_ts_normal.gif"> <img alt="vmalert expected evaluation" src="vmalert_ts_normal.gif">
Usually, troubles start to appear when data in `-datasource.url` is delayed or absent. In such cases, evaluations Usually, troubles start to appear when data in `-datasource.url` is delayed or absent. In such cases, evaluations
may get empty response from datasource and produce empty recording rules or reset alerts state: may get empty response from the datasource and produce empty recording rules or reset alerts state:
<img alt="vmalert evaluation when data is delayed" src="vmalert_ts_data_delay.gif"> <img alt="vmalert evaluation when data is delayed" src="vmalert_ts_data_delay.gif">
By default, recently written samples to VictoriaMetrics aren't visible for queries for up to 30s. Try the following recommendations to reduce the chance of hitting the data delay issue:
This behavior is controlled by `-search.latencyOffset` command-line flag and the `latency_offset` query ag at `vmselect`.
Usually, this results into a 30s shift for recording rules results.
Note that too small value passed to `-search.latencyOffset` or to `latency_offest` query arg may lead to incomplete query results.
Try the following recommendations in such cases: * Always configure group's `evaluationInterval` to be bigger or at least equal to
[time series resolution](https://docs.victoriametrics.com/keyConcepts.html#time-series-resolution);
* Always configure group's `evaluationInterval` to be bigger or equal to `scrape_interval` at which metrics * Ensure that `[duration]` value is at least twice bigger than
are delivered to the datasource; [time series resolution](https://docs.victoriametrics.com/keyConcepts.html#time-series-resolution). For example,
if expression is `rate(my_metric[2m]) > 0` then ensure that `my_metric` resolution is at least `1m` or better `30s`.
If you use VictoriaMetrics as datasource, `[duration]` can be omitted and VictoriaMetrics will adjust it automatically.
* If you know in advance, that data in datasource is delayed - try changing vmalert's `-datasource.lookback` * If you know in advance, that data in datasource is delayed - try changing vmalert's `-datasource.lookback`
command-line flag to add a time shift for evaluations; command-line flag to add a time shift for evaluations. Or extend `[duration]` to tolerate the delay.
* If time intervals between datapoints in datasource are irregular or `>=5min` - try changing vmalert's For example, `max_over_time(errors_total[10m]) > 0` will be active even if there is no data in datasource for last `9m`.
`-datasource.queryStep` command-line flag to specify how far search query can lookback for the recent datapoint. * If [time series resolution](https://docs.victoriametrics.com/keyConcepts.html#time-series-resolution)
The recommendation is to have the step at least two times bigger than `scrape_interval`, since in datasource is inconsistent or `>=5min` - try changing vmalert's `-datasource.queryStep` command-line flag to specify
there are no guarantees that scrape will not fail. how far search query can lookback for the recent datapoint. The recommendation is to have the step
at least two times bigger than the resolution.
> Please note, data delay is inevitable in distributed systems. And it is better to account for it instead of ignoring.
By default, recently written samples to VictoriaMetrics aren't visible for queries for up to 30s
(see `-search.latencyOffset` command-line flag at vmselect). Such delay is needed to eliminate risk of incomplete
data on the moment of querying, since metrics collectors won't be able to deliver the data in time.
### Alerts state
Sometimes, it is not clear why some specific alert fired or didn't fire. It is very important to remember, that Sometimes, it is not clear why some specific alert fired or didn't fire. It is very important to remember, that
alerts with `for: 0` fire immediately when their expression becomes true. And alerts with `for > 0` will fire only alerts with `for: 0` fire immediately when their expression becomes true. And alerts with `for > 0` will fire only
@ -776,6 +787,8 @@ HTTP request sent by vmalert to the `-datasource.url` during evaluation. If spec
no samples returned and curl command returns data - then it is very likely there was no data in datasource on the no samples returned and curl command returns data - then it is very likely there was no data in datasource on the
moment when rule was evaluated. moment when rule was evaluated.
### Debug mode
vmalert allows configuring more detailed logging for specific alerting rule. Just set `debug: true` in rule's configuration vmalert allows configuring more detailed logging for specific alerting rule. Just set `debug: true` in rule's configuration
and vmalert will start printing additional log messages: and vmalert will start printing additional log messages:
```terminal ```terminal

View file

@ -239,7 +239,7 @@ func (cfg *Config) getAPICredentials() (*credentials, error) {
} }
// getECSRoleCredentialsByPath makes request to ecs metadata service // getECSRoleCredentialsByPath makes request to ecs metadata service
// and retrieves instances credentails // and retrieves instances credentials
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
func getECSRoleCredentialsByPath(client *http.Client, path string) (*credentials, error) { func getECSRoleCredentialsByPath(client *http.Client, path string) (*credentials, error) {
resp, err := client.Get(path) resp, err := client.Get(path)
@ -329,7 +329,7 @@ func getMetadataByPath(client *http.Client, apiPath string) ([]byte, error) {
return readResponseBody(resp, apiURL) return readResponseBody(resp, apiURL)
} }
// getRoleWebIdentityCredentials obtains credentials fo the given roleARN with webToken. // getRoleWebIdentityCredentials obtains credentials for the given roleARN with webToken.
// https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html // https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html
// aws IRSA for kubernetes. // aws IRSA for kubernetes.
// https://aws.amazon.com/blogs/opensource/introducing-fine-grained-iam-roles-service-accounts/ // https://aws.amazon.com/blogs/opensource/introducing-fine-grained-iam-roles-service-accounts/
@ -365,7 +365,7 @@ func (cfg *Config) getSTSAPIResponse(action string, reqBuilder func(apiURL strin
return readResponseBody(resp, apiURL) return readResponseBody(resp, apiURL)
} }
// getRoleARNCredentials obtains credentials fo the given roleARN. // getRoleARNCredentials obtains credentials for the given roleARN.
func (cfg *Config) getRoleARNCredentials(creds *credentials) (*credentials, error) { func (cfg *Config) getRoleARNCredentials(creds *credentials) (*credentials, error) {
data, err := cfg.getSTSAPIResponse("AssumeRole", func(apiURL string) (*http.Request, error) { data, err := cfg.getSTSAPIResponse("AssumeRole", func(apiURL string) (*http.Request, error) {
return newSignedGetRequest(apiURL, "sts", cfg.region, creds) return newSignedGetRequest(apiURL, "sts", cfg.region, creds)

View file

@ -309,7 +309,7 @@ func (fs *FS) CreateFile(filePath string, data []byte) error {
return nil return nil
} }
// HasFile returns ture if filePath exists at fs. // HasFile returns true if filePath exists at fs.
func (fs *FS) HasFile(filePath string) (bool, error) { func (fs *FS) HasFile(filePath string) (bool, error) {
path := fs.Dir + filePath path := fs.Dir + filePath

View file

@ -250,7 +250,7 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
return false, nil return false, nil
} }
// Use os.RemoveAll() instead of os.Remove(), since the dir may contain special files such as flock.lock and restore-in-progress, // Use os.RemoveAll() instead of os.Remove(), since the dir may contain special files such as flock.lock and restore-in-progress,
// which must be ingored. // which must be ignored.
if err := os.RemoveAll(dir); err != nil { if err := os.RemoveAll(dir); err != nil {
return false, fmt.Errorf("cannot remove %q: %w", dir, err) return false, fmt.Errorf("cannot remove %q: %w", dir, err)
} }

View file

@ -33,7 +33,7 @@ type FS struct {
// Directory in the bucket to write to. // Directory in the bucket to write to.
Dir string Dir string
// Set for using S3-compatible enpoint such as MinIO etc. // Set for using S3-compatible endpoint such as MinIO etc.
CustomEndpoint string CustomEndpoint string
// Force to use path style for s3, true by default. // Force to use path style for s3, true by default.

View file

@ -40,7 +40,7 @@ func WriteFileAndSync(path string, data []byte) error {
} }
if _, err := f.Write(data); err != nil { if _, err := f.Write(data); err != nil {
f.MustClose() f.MustClose()
// Do not call MustRemoveAll(path), so the user could inpsect // Do not call MustRemoveAll(path), so the user could inspect
// the file contents during investigation of the issue. // the file contents during investigation of the issue.
return fmt.Errorf("cannot write %d bytes to %q: %w", len(data), path, err) return fmt.Errorf("cannot write %d bytes to %q: %w", len(data), path, err)
} }

View file

@ -20,7 +20,7 @@ func sysTotalMemory() int {
} }
mem := cgroup.GetMemoryLimit() mem := cgroup.GetMemoryLimit()
if mem <= 0 || int64(int(mem)) != mem || int(mem) > totalMem { if mem <= 0 || int64(int(mem)) != mem || int(mem) > totalMem {
// Try reading hierachical memory limit. // Try reading hierarchical memory limit.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/699 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/699
mem = cgroup.GetHierarchicalMemoryLimit() mem = cgroup.GetHierarchicalMemoryLimit()
if mem <= 0 || int64(int(mem)) != mem || int(mem) > totalMem { if mem <= 0 || int64(int(mem)) != mem || int(mem) > totalMem {

View file

@ -157,7 +157,7 @@ func commonPrefixLen(a, b []byte) int {
// Add adds x to the end of ib. // Add adds x to the end of ib.
// //
// false is returned if x isn't added to ib due to block size contraints. // false is returned if x isn't added to ib due to block size constraints.
func (ib *inmemoryBlock) Add(x []byte) bool { func (ib *inmemoryBlock) Add(x []byte) bool {
data := ib.data data := ib.data
if len(x)+len(data) > maxInmemoryBlockSize { if len(x)+len(data) > maxInmemoryBlockSize {

View file

@ -1665,7 +1665,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
srcPath, dstPath) srcPath, dstPath)
} }
// Flush pathPrefix directory metadata to the underying storage. // Flush pathPrefix directory metadata to the underlying storage.
fs.MustSyncPath(pathPrefix) fs.MustSyncPath(pathPrefix)
pendingTxnDeletionsWG.Add(1) pendingTxnDeletionsWG.Add(1)

View file

@ -89,7 +89,7 @@ func TestParseProxyProtocolFail(t *testing.T) {
// unsupported command // unsupported command
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x22, 0x11, 0x00, 0x0C, f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x22, 0x11, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}) 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// mimatch ipv6 and ipv4 // mismatch ipv6 and ipv4
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x0C, f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x0C,
// ip data srcid,dstip,srcport // ip data srcid,dstip,srcport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}) 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})

View file

@ -76,7 +76,7 @@ func appendGraphiteMatchTemplateParts(dst []string, s string) []string {
// Match matches s against gmt. // Match matches s against gmt.
// //
// On success it adds matched captures to dst and returns it with true. // On success it adds matched captures to dst and returns it with true.
// Of failre it returns false. // On failure it returns false.
func (gmt *graphiteMatchTemplate) Match(dst []string, s string) ([]string, bool) { func (gmt *graphiteMatchTemplate) Match(dst []string, s string) ([]string, bool) {
dst = append(dst, s) dst = append(dst, s)
parts := gmt.parts parts := gmt.parts

View file

@ -39,7 +39,7 @@ type client struct {
// hc is the default client optimized for common case of scraping targets with moderate number of metrics. // hc is the default client optimized for common case of scraping targets with moderate number of metrics.
hc *fasthttp.HostClient hc *fasthttp.HostClient
// sc (aka `stream client`) is used instead of hc if ScrapeWork.ParseStream is set. // sc (aka `stream client`) is used instead of hc if ScrapeWork.StreamParse is set.
// It may be useful for scraping targets with millions of metrics per target. // It may be useful for scraping targets with millions of metrics per target.
sc *http.Client sc *http.Client

View file

@ -119,7 +119,7 @@ func getToken(token *promauth.Secret) (string, error) {
return string(data), nil return string(data), nil
} }
t := os.Getenv("CONSUL_HTTP_TOKEN") t := os.Getenv("CONSUL_HTTP_TOKEN")
// Allow empty token - it shouls work if authorization is disabled in Consul // Allow empty token - it should work if authorization is disabled in Consul
return t, nil return t, nil
} }
@ -145,7 +145,7 @@ func maxWaitTime() time.Duration {
// Consul adds random delay up to wait/16, so reduce the timeout in order to keep it below BlockingClientReadTimeout. // Consul adds random delay up to wait/16, so reduce the timeout in order to keep it below BlockingClientReadTimeout.
// See https://www.consul.io/api-docs/features/blocking // See https://www.consul.io/api-docs/features/blocking
d -= d / 8 d -= d / 8
// The timeout cannot exceed 10 minuntes. See https://www.consul.io/api-docs/features/blocking // The timeout cannot exceed 10 minutes. See https://www.consul.io/api-docs/features/blocking
if d > 10*time.Minute { if d > 10*time.Minute {
d = 10 * time.Minute d = 10 * time.Minute
} }
@ -155,7 +155,7 @@ func maxWaitTime() time.Duration {
return d return d
} }
// getBlockingAPIResponse perfoms blocking request to Consul via client and returns response. // getBlockingAPIResponse performs blocking request to Consul via client and returns response.
// //
// See https://www.consul.io/api-docs/features/blocking . // See https://www.consul.io/api-docs/features/blocking .
func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {

View file

@ -150,7 +150,7 @@ func addTasksLabels(tasks []task, nodesLabels, servicesLabels []*promutils.Label
return ms return ms
} }
// addLabels adds lables from src to dst if they contain the given `key: value` pair. // addLabels adds labels from src to dst if they contain the given `key: value` pair.
func addLabels(dst *promutils.Labels, src []*promutils.Labels, key, value string) { func addLabels(dst *promutils.Labels, src []*promutils.Labels, key, value string) {
for _, m := range src { for _, m := range src {
if m.Get(key) != value { if m.Get(key) != value {

View file

@ -22,7 +22,7 @@ type apiConfig struct {
parseErrors *metrics.Counter parseErrors *metrics.Counter
} }
// httpGroupTarget respresent prometheus GroupTarget // httpGroupTarget represent prometheus GroupTarget
// https://prometheus.io/docs/prometheus/latest/http_sd/ // https://prometheus.io/docs/prometheus/latest/http_sd/
type httpGroupTarget struct { type httpGroupTarget struct {
Targets []string `json:"targets"` Targets []string `json:"targets"`

View file

@ -408,7 +408,7 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1) requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1)
} }
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 1 { if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 1 {
// Update discovery URL for old Kuberentes API, which supports only v1beta1 path. // Update discovery URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1) requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1)
} }
req, err := http.NewRequest("GET", requestURL, nil) req, err := http.NewRequest("GET", requestURL, nil)

View file

@ -82,7 +82,7 @@ type NodeDaemonEndpoints struct {
KubeletEndpoint DaemonEndpoint KubeletEndpoint DaemonEndpoint
} }
// getTargetLabels returs labels for the given n. // getTargetLabels returns labels for the given n.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
func (n *Node) getTargetLabels(gw *groupWatcher) []*promutils.Labels { func (n *Node) getTargetLabels(gw *groupWatcher) []*promutils.Labels {

View file

@ -104,7 +104,7 @@ func maxWaitTime() time.Duration {
// Nomad adds random delay up to wait/16, so reduce the timeout in order to keep it below BlockingClientReadTimeout. // Nomad adds random delay up to wait/16, so reduce the timeout in order to keep it below BlockingClientReadTimeout.
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries // See https://developer.hashicorp.com/nomad/api-docs#blocking-queries
d -= d / 16 d -= d / 16
// The timeout cannot exceed 10 minuntes. See https://developer.hashicorp.com/nomad/api-docs#blocking-queries // The timeout cannot exceed 10 minutes. See https://developer.hashicorp.com/nomad/api-docs#blocking-queries
if d > 10*time.Minute { if d > 10*time.Minute {
d = 10 * time.Minute d = 10 * time.Minute
@ -115,7 +115,7 @@ func maxWaitTime() time.Duration {
return d return d
} }
// getBlockingAPIResponse perfoms blocking request to Nomad via client and returns response. // getBlockingAPIResponse performs blocking request to Nomad via client and returns response.
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries . // See https://developer.hashicorp.com/nomad/api-docs#blocking-queries .
func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
path += "&index=" + strconv.FormatInt(index, 10) path += "&index=" + strconv.FormatInt(index, 10)

View file

@ -26,6 +26,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -575,7 +576,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
if err == nil { if err == nil {
bodyString = bytesutil.ToUnsafeString(sbr.body) bodyString = bytesutil.ToUnsafeString(sbr.body)
areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString) areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString)
err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { err = stream.Parse(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
samplesScraped += len(rows) samplesScraped += len(rows)
@ -796,7 +797,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675 // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675
var mu sync.Mutex var mu sync.Mutex
br := bytes.NewBufferString(bodyString) br := bytes.NewBufferString(bodyString)
err := parser.ParseStream(br, timestamp, false, func(rows []parser.Row) error { err := stream.Parse(br, timestamp, false, func(rows []parser.Row) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
for i := range rows { for i := range rows {

View file

@ -49,7 +49,7 @@ func (x *Labels) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil return nil
} }
// MarshalJSON returns JSON respresentation for x. // MarshalJSON returns JSON representation for x.
func (x *Labels) MarshalJSON() ([]byte, error) { func (x *Labels) MarshalJSON() ([]byte, error) {
m := x.ToMap() m := x.ToMap()
return json.Marshal(m) return json.Marshal(m)
@ -235,7 +235,7 @@ func (x *Labels) RemoveDuplicates() {
// RemoveMetaLabels removes all the `__meta_` labels from x. // RemoveMetaLabels removes all the `__meta_` labels from x.
// //
// See https://www.robustperception.io/life-of-a-label fo details. // See https://www.robustperception.io/life-of-a-label for details.
func (x *Labels) RemoveMetaLabels() { func (x *Labels) RemoveMetaLabels() {
src := x.Labels src := x.Labels
dst := x.Labels[:0] dst := x.Labels[:0]

View file

@ -40,7 +40,7 @@ func StartUnmarshalWorkers() {
// StopUnmarshalWorkers stops unmarshal workers. // StopUnmarshalWorkers stops unmarshal workers.
// //
// No more calles to ScheduleUnmarshalWork are allowed after calling stopUnmarshalWorkers // No more calls to ScheduleUnmarshalWork are allowed after calling stopUnmarshalWorkers
func StopUnmarshalWorkers() { func StopUnmarshalWorkers() {
close(unmarshalWorkCh) close(unmarshalWorkCh)
unmarshalWorkersWG.Wait() unmarshalWorkersWG.Wait()

View file

@ -1,4 +1,4 @@
package csvimport package stream
import ( import (
"bufio" "bufio"
@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,19 +22,19 @@ var (
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses csv from req and calls callback for the parsed rows. // Parse parses csv from req and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(req.Body) wcr := writeconcurrencylimiter.GetReader(req.Body)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r := io.Reader(wcr) r := io.Reader(wcr)
q := req.URL.Query() q := req.URL.Query()
format := q.Get("format") format := q.Get("format")
cds, err := ParseColumnDescriptors(format) cds, err := csvimport.ParseColumnDescriptors(format)
if err != nil { if err != nil {
return fmt.Errorf("cannot parse the provided csv format: %w", err) return fmt.Errorf("cannot parse the provided csv format: %w", err)
} }
@ -149,10 +150,10 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows csvimport.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []csvimport.Row) error
cds []ColumnDescriptor cds []csvimport.ColumnDescriptor
reqBuf []byte reqBuf []byte
} }
@ -164,7 +165,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []csvimport.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@ -1,4 +1,4 @@
package datadog package stream
import ( import (
"bufio" "bufio"
@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -30,10 +31,10 @@ var (
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics") "https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
) )
// ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. // Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
// //
// callback shouldn't hold series after returning. // callback shouldn't hold series after returning.
func ParseStream(r io.Reader, contentEncoding string, callback func(series []Series) error) error { func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -143,21 +144,21 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *Request { func getRequest() *datadog.Request {
v := requestPool.Get() v := requestPool.Get()
if v == nil { if v == nil {
return &Request{} return &datadog.Request{}
} }
return v.(*Request) return v.(*datadog.Request)
} }
func putRequest(req *Request) { func putRequest(req *datadog.Request) {
requestPool.Put(req) requestPool.Put(req)
} }
var requestPool sync.Pool var requestPool sync.Pool
// sanitizeName performs DataDog-compatible santizing for metric names // sanitizeName performs DataDog-compatible sanitizing for metric names
// //
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics // See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
func sanitizeName(name string) string { func sanitizeName(name string) string {

View file

@ -1,4 +1,4 @@
package datadog package stream
import ( import (
"testing" "testing"

View file

@ -2,10 +2,7 @@ package graphite
import ( import (
"reflect" "reflect"
"strings"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
func TestUnmarshalMetricAndTagsFailure(t *testing.T) { func TestUnmarshalMetricAndTagsFailure(t *testing.T) {
@ -383,71 +380,3 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}}, }},
}) })
} }
func Test_streamContext_Read(t *testing.T) {
f := func(s string, rowsExpected *Rows) {
t.Helper()
ctx := getStreamContext(strings.NewReader(s))
if !ctx.Read() {
t.Fatalf("expecting successful read")
}
uw := getUnmarshalWork()
callbackCalls := 0
uw.ctx = ctx
uw.callback = func(rows []Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
ctx.wg.Add(1)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
}
}
// Full line without tags
f("aaa 1123 345", &Rows{
Rows: []Row{{
Metric: "aaa",
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// Full line with tags
f("aaa;x=y 1123 345", &Rows{
Rows: []Row{{
Metric: "aaa",
Tags: []Tag{{
Key: "x",
Value: "y",
}},
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// missing timestamp.
// Note that this test may be flaky due to timing issues. TODO: fix it
f("aaa 1123", &Rows{
Rows: []Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
// -1 timestamp. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/610
// Note that this test may be flaky due to timing issues. TODO: fix it.
f("aaa 1123 -1", &Rows{
Rows: []Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
}

View file

@ -1,4 +1,4 @@
package graphite package stream
import ( import (
"bufio" "bufio"
@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,12 +22,12 @@ var (
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses Graphite lines from r and calls callback for the parsed rows. // Parse parses Graphite lines from r and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error { func Parse(r io.Reader, callback func(rows []graphite.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -135,9 +136,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows graphite.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []graphite.Row) error
reqBuf []byte reqBuf []byte
} }
@ -148,7 +149,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []graphite.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@ -0,0 +1,78 @@
package stream
import (
"reflect"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
)
func Test_streamContext_Read(t *testing.T) {
f := func(s string, rowsExpected *graphite.Rows) {
t.Helper()
ctx := getStreamContext(strings.NewReader(s))
if !ctx.Read() {
t.Fatalf("expecting successful read")
}
uw := getUnmarshalWork()
callbackCalls := 0
uw.ctx = ctx
uw.callback = func(rows []graphite.Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
ctx.wg.Add(1)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
}
}
// Full line without tags
f("aaa 1123 345", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// Full line with tags
f("aaa;x=y 1123 345", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Tags: []graphite.Tag{{
Key: "x",
Value: "y",
}},
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// missing timestamp.
// Note that this test may be flaky due to timing issues. TODO: fix it
f("aaa 1123", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
// -1 timestamp. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/610
// Note that this test may be flaky due to timing issues. TODO: fix it.
f("aaa 1123 -1", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
}

View file

@ -1,4 +1,4 @@
package influx package stream
import ( import (
"bufio" "bufio"
@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -22,12 +23,12 @@ var (
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses r with the given args and calls callback for the parsed rows. // Parse parses r with the given args and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []influx.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -162,9 +163,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows influx.Rows
ctx *streamContext ctx *streamContext
callback func(db string, rows []Row) error callback func(db string, rows []influx.Row) error
db string db string
tsMultiplier int64 tsMultiplier int64
reqBuf []byte reqBuf []byte
@ -179,7 +180,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []influx.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(uw.db, rows); err != nil { if err := uw.callback(uw.db, rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@ -1,4 +1,4 @@
package influx package stream
import ( import (
"testing" "testing"

View file

@ -1,4 +1,4 @@
package native package stream
import ( import (
"bufio" "bufio"
@ -14,12 +14,12 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
// ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks. // Parse parses /api/v1/import/native lines from req and calls callback for parsed blocks.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold block after returning. // callback shouldn't hold block after returning.
func ParseStream(r io.Reader, isGzip bool, callback func(block *Block) error) error { func Parse(r io.Reader, isGzip bool, callback func(block *Block) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr

View file

@ -1,4 +1,4 @@
package opentsdb package stream
import ( import (
"bufio" "bufio"
@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,12 +22,12 @@ var (
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // Parse parses OpenTSDB lines from r and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error { func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -134,9 +135,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows opentsdb.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []opentsdb.Row) error
reqBuf []byte reqBuf []byte
} }
@ -147,7 +148,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []opentsdb.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()
@ -174,9 +175,13 @@ func (uw *unmarshalWork) Unmarshal() {
} }
} }
// Convert timestamps from seconds to milliseconds // Convert timestamps in seconds to milliseconds if needed.
// See http://opentsdb.net/docs/javadoc/net/opentsdb/core/Const.html#SECOND_MASK
for i := range rows { for i := range rows {
rows[i].Timestamp *= 1e3 r := &rows[i]
if r.Timestamp&secondMask == 0 {
r.Timestamp *= 1e3
}
} }
// Trim timestamps if required. // Trim timestamps if required.
@ -191,6 +196,8 @@ func (uw *unmarshalWork) Unmarshal() {
putUnmarshalWork(uw) putUnmarshalWork(uw)
} }
const secondMask int64 = 0x7FFFFFFF00000000
func getUnmarshalWork() *unmarshalWork { func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get() v := unmarshalWorkPool.Get()
if v == nil { if v == nil {

View file

@ -4,17 +4,17 @@ import (
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
) )
// getJSONParser returns JSON parser. // GetJSONParser returns JSON parser.
// //
// The parser must be returned to the pool via putJSONParser when no longer needed. // The parser must be returned to the pool via PutJSONParser when no longer needed.
func getJSONParser() *fastjson.Parser { func GetJSONParser() *fastjson.Parser {
return parserPool.Get() return parserPool.Get()
} }
// putJSONParser returns p to the pool. // PutJSONParser returns p to the pool.
// //
// p cannot be used after returning to the pool. // p cannot be used after returning to the pool.
func putJSONParser(p *fastjson.Parser) { func PutJSONParser(p *fastjson.Parser) {
parserPool.Put(p) parserPool.Put(p)
} }

View file

@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()
var rows Rows var rows Rows
p := getJSONParser() p := GetJSONParser()
defer putJSONParser(p) defer PutJSONParser(p)
v, err := p.Parse(s) v, err := p.Parse(s)
if err != nil { if err != nil {
// Expected JSON parser error // Expected JSON parser error
@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
t.Helper() t.Helper()
var rows Rows var rows Rows
p := getJSONParser() p := GetJSONParser()
defer putJSONParser(p) defer PutJSONParser(p)
v, err := p.Parse(s) v, err := p.Parse(s)
if err != nil { if err != nil {
t.Fatalf("cannot parse json %s: %s", s, err) t.Fatalf("cannot parse json %s: %s", s, err)

View file

@ -1,4 +1,4 @@
package opentsdbhttp package stream
import ( import (
"bufio" "bufio"
@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -24,12 +25,12 @@ var (
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // Parse parses OpenTSDB http lines from req and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func Parse(req *http.Request, callback func(rows []opentsdbhttp.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(req.Body) wcr := writeconcurrencylimiter.GetReader(req.Body)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r := io.Reader(req.Body) r := io.Reader(req.Body)
@ -62,8 +63,8 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
// Process the request synchronously, since there is no sense in processing a single request asynchronously. // Process the request synchronously, since there is no sense in processing a single request asynchronously.
// Sync code is easier to read and understand. // Sync code is easier to read and understand.
p := getJSONParser() p := opentsdbhttp.GetJSONParser()
defer putJSONParser(p) defer opentsdbhttp.PutJSONParser(p)
v, err := p.ParseBytes(ctx.reqBuf.B) v, err := p.ParseBytes(ctx.reqBuf.B)
if err != nil { if err != nil {
unmarshalErrors.Inc() unmarshalErrors.Inc()
@ -155,15 +156,15 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
func getRows() *Rows { func getRows() *opentsdbhttp.Rows {
v := rowsPool.Get() v := rowsPool.Get()
if v == nil { if v == nil {
return &Rows{} return &opentsdbhttp.Rows{}
} }
return v.(*Rows) return v.(*opentsdbhttp.Rows)
} }
func putRows(rs *Rows) { func putRows(rs *opentsdbhttp.Rows) {
rs.Reset() rs.Reset()
rowsPool.Put(rs) rowsPool.Put(rs)
} }

View file

@ -1,4 +1,4 @@
package prometheus package stream
import ( import (
"bufio" "bufio"
@ -10,16 +10,17 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
// ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. // Parse parses lines with Prometheus exposition format from r and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error, errLogger func(string)) error { func Parse(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []prometheus.Row) error, errLogger func(string)) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -137,9 +138,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows prometheus.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []prometheus.Row) error
errLogger func(string) errLogger func(string)
defaultTimestamp int64 defaultTimestamp int64
reqBuf []byte reqBuf []byte
@ -154,7 +155,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []prometheus.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@ -1,4 +1,4 @@
package prometheus package stream
import ( import (
"bytes" "bytes"
@ -10,20 +10,21 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
) )
func TestParseStream(t *testing.T) { func TestParse(t *testing.T) {
common.StartUnmarshalWorkers() common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers() defer common.StopUnmarshalWorkers()
const defaultTimestamp = 123 const defaultTimestamp = 123
f := func(s string, rowsExpected []Row) { f := func(s string, rowsExpected []prometheus.Row) {
t.Helper() t.Helper()
bb := bytes.NewBufferString(s) bb := bytes.NewBufferString(s)
var result []Row var result []prometheus.Row
var lock sync.Mutex var lock sync.Mutex
doneCh := make(chan struct{}) doneCh := make(chan struct{})
err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error { err := Parse(bb, defaultTimestamp, false, func(rows []prometheus.Row) error {
lock.Lock() lock.Lock()
result = appendRowCopies(result, rows) result = appendRowCopies(result, rows)
if len(result) == len(rowsExpected) { if len(result) == len(rowsExpected) {
@ -56,7 +57,7 @@ func TestParseStream(t *testing.T) {
} }
result = nil result = nil
doneCh = make(chan struct{}) doneCh = make(chan struct{})
err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error { err = Parse(bb, defaultTimestamp, true, func(rows []prometheus.Row) error {
lock.Lock() lock.Lock()
result = appendRowCopies(result, rows) result = appendRowCopies(result, rows)
if len(result) == len(rowsExpected) { if len(result) == len(rowsExpected) {
@ -79,12 +80,12 @@ func TestParseStream(t *testing.T) {
} }
} }
f("foo 123 456", []Row{{ f("foo 123 456", []prometheus.Row{{
Metric: "foo", Metric: "foo",
Value: 123, Value: 123,
Timestamp: 456000, Timestamp: 456000,
}}) }})
f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []Row{ f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []prometheus.Row{
{ {
Metric: "aaa", Metric: "aaa",
Value: 3, Value: 3,
@ -92,7 +93,7 @@ func TestParseStream(t *testing.T) {
}, },
{ {
Metric: "foo", Metric: "foo",
Tags: []Tag{{ Tags: []prometheus.Tag{{
Key: "bar", Key: "bar",
Value: "baz", Value: "baz",
}}, }},
@ -100,29 +101,29 @@ func TestParseStream(t *testing.T) {
Timestamp: 2000, Timestamp: 2000,
}, },
}) })
f("foo 23", []Row{{ f("foo 23", []prometheus.Row{{
Metric: "foo", Metric: "foo",
Value: 23, Value: 23,
Timestamp: defaultTimestamp, Timestamp: defaultTimestamp,
}}) }})
} }
func sortRows(rows []Row) { func sortRows(rows []prometheus.Row) {
sort.Slice(rows, func(i, j int) bool { sort.Slice(rows, func(i, j int) bool {
a, b := rows[i], rows[j] a, b := rows[i], rows[j]
return a.Metric < b.Metric return a.Metric < b.Metric
}) })
} }
func appendRowCopies(dst, src []Row) []Row { func appendRowCopies(dst, src []prometheus.Row) []prometheus.Row {
for _, r := range src { for _, r := range src {
// Make a copy of r, since r may contain garbage after returning from the callback to ParseStream. // Make a copy of r, since r may contain garbage after returning from the callback to Parse.
var rCopy Row var rCopy prometheus.Row
rCopy.Metric = copyString(r.Metric) rCopy.Metric = copyString(r.Metric)
rCopy.Value = r.Value rCopy.Value = r.Value
rCopy.Timestamp = r.Timestamp rCopy.Timestamp = r.Timestamp
for _, tag := range r.Tags { for _, tag := range r.Tags {
rCopy.Tags = append(rCopy.Tags, Tag{ rCopy.Tags = append(rCopy.Tags, prometheus.Tag{
Key: copyString(tag.Key), Key: copyString(tag.Key),
Value: copyString(tag.Value), Value: copyString(tag.Value),
}) })

View file

@ -1,4 +1,4 @@
package promremotewrite package stream
import ( import (
"bufio" "bufio"
@ -18,10 +18,10 @@ import (
var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
// ParseStream parses Prometheus remote_write message from reader and calls callback for the parsed timeseries. // Parse parses Prometheus remote_write message from reader and calls callback for the parsed timeseries.
// //
// callback shouldn't hold tss after returning. // callback shouldn't hold tss after returning.
func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) error { func Parse(r io.Reader, callback func(tss []prompb.TimeSeries) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -32,7 +32,7 @@ func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) erro
return err return err
} }
// Synchronously process the request in order to properly return errors to ParseStream caller, // Synchronously process the request in order to properly return errors to Parse caller,
// so it could properly return HTTP 503 status code in response. // so it could properly return HTTP 503 status code in response.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
bb := bodyBufferPool.Get() bb := bodyBufferPool.Get()

View file

@ -1,4 +1,4 @@
package vmimport package stream
import ( import (
"bufio" "bufio"
@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -17,12 +18,12 @@ import (
var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import; "+ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import; "+
"the line length can be limited with 'max_rows_per_line' query arg passed to /api/v1/export") "the line length can be limited with 'max_rows_per_line' query arg passed to /api/v1/export")
// ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // Parse parses /api/v1/import lines from req and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from reader. // The callback can be called concurrently multiple times for streamed data from reader.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error { func Parse(r io.Reader, isGzipped bool, callback func(rows []vmimport.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -138,9 +139,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows vmimport.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []vmimport.Row) error
reqBuf []byte reqBuf []byte
} }
@ -151,7 +152,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []vmimport.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@ -65,7 +65,7 @@ func NewPromRegex(expr string) (*PromRegex, error) {
return pr, nil return pr, nil
} }
// MatchString retruns true if s matches pr. // MatchString returns true if s matches pr.
// //
// The pr is automatically anchored to the beginning and to the end // The pr is automatically anchored to the beginning and to the end
// of the matching string with '^' and '$'. // of the matching string with '^' and '$'.

View file

@ -79,7 +79,7 @@ func (bsw *blockStreamWriter) reset() {
bsw.prevTimestampsBlockOffset = 0 bsw.prevTimestampsBlockOffset = 0
} }
// InitFromInmemoryPart initialzes bsw from inmemory part. // InitFromInmemoryPart initializes bsw from inmemory part.
func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
bsw.reset() bsw.reset()

View file

@ -4,7 +4,7 @@ func isDedupEnabled() bool {
return len(downsamplingPeriods) > 0 return len(downsamplingPeriods) > 0
} }
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in millseconds. // DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in milliseconds.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) { func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
if !needsDedup(srcTimestamps, dedupInterval) { if !needsDedup(srcTimestamps, dedupInterval) {
// Fast path - nothing to deduplicate // Fast path - nothing to deduplicate

View file

@ -650,7 +650,7 @@ func generateTSID(dst *TSID, mn *MetricName) {
// This assumption is true because mn.Tags must be sorted with mn.sortTags() before calling generateTSID() function. // This assumption is true because mn.Tags must be sorted with mn.sortTags() before calling generateTSID() function.
// This allows grouping data blocks for the same (job, instance) close to each other on disk. // This allows grouping data blocks for the same (job, instance) close to each other on disk.
// This reduces disk seeks and disk read IO when data blocks are read from disk for the same job and/or instance. // This reduces disk seeks and disk read IO when data blocks are read from disk for the same job and/or instance.
// For example, data blocks for time series matching `process_resident_memory_bytes{job="vmstorage"}` are physically adjancent on disk. // For example, data blocks for time series matching `process_resident_memory_bytes{job="vmstorage"}` are physically adjacent on disk.
if len(mn.Tags) > 0 { if len(mn.Tags) > 0 {
dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value)) dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
} }
@ -2754,7 +2754,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(qt *querytracer.Tracer, dat
// Intersect metricIDs with the rest of filters. // Intersect metricIDs with the rest of filters.
// //
// Do not run these tag filters in parallel, since this may result in CPU and RAM waste // Do not run these tag filters in parallel, since this may result in CPU and RAM waste
// when the intial tag filters significantly reduce the number of found metricIDs, // when the initial tag filters significantly reduce the number of found metricIDs,
// so the remaining filters could be performed via much faster metricName matching instead // so the remaining filters could be performed via much faster metricName matching instead
// of slow selecting of matching metricIDs. // of slow selecting of matching metricIDs.
qtChild = qt.NewChild("intersect the remaining %d filters with the found %d metric ids", len(tfws), metricIDs.Len()) qtChild = qt.NewChild("intersect the remaining %d filters with the found %d metric ids", len(tfws), metricIDs.Len())

View file

@ -57,7 +57,7 @@ const finalPartsToMerge = 3
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 1) / 2 var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 1) / 2
// The interval for flushing bufferred rows into parts, so they become visible to search. // The interval for flushing buffered rows into parts, so they become visible to search.
const pendingRowsFlushInterval = time.Second const pendingRowsFlushInterval = time.Second
// The interval for guaranteed flush of recently ingested data from memory to on-disk parts, // The interval for guaranteed flush of recently ingested data from memory to on-disk parts,
@ -2144,7 +2144,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
} }
} }
// Flush pathPrefix* directory metadata to the underying storage, // Flush pathPrefix* directory metadata to the underlying storage,
// so the moved files become visible there. // so the moved files become visible there.
fs.MustSyncPath(pathPrefix1) fs.MustSyncPath(pathPrefix1)
fs.MustSyncPath(pathPrefix2) fs.MustSyncPath(pathPrefix2)

View file

@ -8,7 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// rawRow reperesents raw timeseries row. // rawRow represents raw timeseries row.
type rawRow struct { type rawRow struct {
// TSID is time series id. // TSID is time series id.
TSID TSID TSID TSID

View file

@ -100,7 +100,7 @@ type Search struct {
ts tableSearch ts tableSearch
// tr contains time range used in the serach. // tr contains time range used in the search.
tr TimeRange tr TimeRange
// tfss contains tag filters used in the search. // tfss contains tag filters used in the search.
@ -165,7 +165,7 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
} }
// It is ok to call Init on non-nil err. // It is ok to call Init on non-nil err.
// Init must be called before returning because it will fail // Init must be called before returning because it will fail
// on Seach.MustClose otherwise. // on Search.MustClose otherwise.
s.ts.Init(storage.tb, tsids, tr) s.ts.Init(storage.tb, tsids, tr)
qt.Printf("search for parts with data for %d series", len(tsids)) qt.Printf("search for parts with data for %d series", len(tsids))
if err != nil { if err != nil {

View file

@ -1972,7 +1972,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
s.pendingHourEntriesLock.Unlock() s.pendingHourEntriesLock.Unlock()
} }
if len(pendingDateMetricIDs) == 0 { if len(pendingDateMetricIDs) == 0 {
// Fast path - there are no new (date, metricID) entires in rows. // Fast path - there are no new (date, metricID) entries in rows.
return nil return nil
} }

View file

@ -425,7 +425,7 @@ func (tb *table) retentionWatcher() {
continue continue
} }
// There are paritions to drop. Drop them. // There are partitions to drop. Drop them.
// Remove table references from partitions, so they will be eventually // Remove table references from partitions, so they will be eventually
// closed and dropped after all the pending searches are done. // closed and dropped after all the pending searches are done.

View file

@ -51,7 +51,7 @@ func timestampToPartitionName(timestamp int64) string {
return t.Format("2006_01") return t.Format("2006_01")
} }
// fromPartitionName initializes tr from the given parition name. // fromPartitionName initializes tr from the given partition name.
func (tr *TimeRange) fromPartitionName(name string) error { func (tr *TimeRange) fromPartitionName(name string) error {
t, err := time.Parse("2006_01", name) t, err := time.Parse("2006_01", name)
if err != nil { if err != nil {

View file

@ -199,7 +199,7 @@ type aggregator struct {
// suffix contains a suffix, which should be added to aggregate metric names // suffix contains a suffix, which should be added to aggregate metric names
// //
// It contains the interval, lables in (by, without), plus output name. // It contains the interval, labels in (by, without), plus output name.
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job // For example, foo_bar metric name is transformed to foo_bar:1m_by_job
// for `interval: 1m`, `by: [job]` // for `interval: 1m`, `by: [job]`
suffix string suffix string