mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-01 15:33:35 +00:00
Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
This commit is contained in:
commit
8f0afc656e
10 changed files with 299 additions and 64 deletions
|
@ -373,7 +373,9 @@ func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFunc
|
|||
args: args,
|
||||
ec: ec,
|
||||
}
|
||||
qtChild := qt.NewChild("eval %s", ae.Name)
|
||||
rv, err := af(afa)
|
||||
qtChild.Done()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
|
||||
}
|
||||
|
|
|
@ -7522,6 +7522,22 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r1, r2, r3, r4}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`rollup_candlestick(high)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `rollup_candlestick(alias(round(rand(0),0.01),"foobar")[:10s], "high")`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{0.9, 0.94, 0.97, 0.93, 0.98, 0.92},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r.MetricName.MetricGroup = []byte("foobar")
|
||||
r.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("rollup"),
|
||||
Value: []byte("high"),
|
||||
}}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`rollup_increase()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort(rollup_increase(time()))`
|
||||
|
@ -7555,6 +7571,61 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`rollup_rate()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `rollup_rate((2000-time())[600s])`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{5, 4, 3, 2, 1, 0},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("rollup"),
|
||||
Value: []byte("avg"),
|
||||
}}
|
||||
r2 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{6, 5, 4, 3, 2, 1},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("rollup"),
|
||||
Value: []byte("max"),
|
||||
}}
|
||||
r3 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{4, 3, 2, 1, 0, -1},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r3.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("rollup"),
|
||||
Value: []byte("min"),
|
||||
}}
|
||||
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`rollup_rate(q, "max")`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `rollup_rate((2000-time())[600s], "max")`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{6, 5, 4, 3, 2, 1},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`rollup_rate(q, "avg")`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `rollup_rate((2000-time())[600s], "avg")`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{5, 4, 3, 2, 1, 0},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`rollup_scrape_interval()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort_by_label(rollup_scrape_interval(1[5m:10S]), "rollup")`
|
||||
|
@ -7654,6 +7725,17 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`rollup_deriv(q, "max")`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort(rollup_deriv(time()[100s:50s], "max"))`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1, 1, 1, 1, 1, 1},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`{}`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `{}`
|
||||
|
@ -8535,6 +8617,8 @@ func TestExecError(t *testing.T) {
|
|||
f(`changes_prometheus()`)
|
||||
f(`delta()`)
|
||||
f(`delta_prometheus()`)
|
||||
f(`rollup_candlestick()`)
|
||||
f(`rollup()`)
|
||||
|
||||
// Invalid argument type
|
||||
f(`median_over_time({}, 2)`)
|
||||
|
@ -8621,6 +8705,12 @@ func TestExecError(t *testing.T) {
|
|||
f(`ru()`)
|
||||
f(`ru(1)`)
|
||||
f(`ru(1,3,3)`)
|
||||
|
||||
// Invalid rollup tags
|
||||
f(`rollup_rate(time()[5m], "")`)
|
||||
f(`rollup_rate(time()[5m], "foo")`)
|
||||
f(`rollup_rate(time()[5m], "foo", "bar")`)
|
||||
f(`rollup_candlestick(time(), "foo")`)
|
||||
}
|
||||
|
||||
func testResultsEqual(t *testing.T, result, resultExpected []netstorage.Result) {
|
||||
|
|
|
@ -68,13 +68,13 @@ var rollupFuncs = map[string]newRollupFunc{
|
|||
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
|
||||
"rate_over_sum": newRollupFuncOneArg(rollupRateOverSum),
|
||||
"resets": newRollupFuncOneArg(rollupResets),
|
||||
"rollup": newRollupFuncOneArg(rollupFake),
|
||||
"rollup_candlestick": newRollupFuncOneArg(rollupFake),
|
||||
"rollup_delta": newRollupFuncOneArg(rollupFake),
|
||||
"rollup_deriv": newRollupFuncOneArg(rollupFake),
|
||||
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||
"rollup_scrape_interval": newRollupFuncOneArg(rollupFake),
|
||||
"rollup": newRollupFuncOneOrTwoArgs(rollupFake),
|
||||
"rollup_candlestick": newRollupFuncOneOrTwoArgs(rollupFake),
|
||||
"rollup_delta": newRollupFuncOneOrTwoArgs(rollupFake),
|
||||
"rollup_deriv": newRollupFuncOneOrTwoArgs(rollupFake),
|
||||
"rollup_increase": newRollupFuncOneOrTwoArgs(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||
"rollup_rate": newRollupFuncOneOrTwoArgs(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||
"rollup_scrape_interval": newRollupFuncOneOrTwoArgs(rollupFake),
|
||||
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
|
||||
"share_gt_over_time": newRollupShareGT,
|
||||
"share_le_over_time": newRollupShareLE,
|
||||
|
@ -282,6 +282,29 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
|
|||
return aggrFuncNames, nil
|
||||
}
|
||||
|
||||
func getRollupTag(expr metricsql.Expr) (string, error) {
|
||||
fe, ok := expr.(*metricsql.FuncExpr)
|
||||
if !ok {
|
||||
logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))
|
||||
}
|
||||
if len(fe.Args) < 2 {
|
||||
return "", nil
|
||||
}
|
||||
if len(fe.Args) != 2 {
|
||||
return "", fmt.Errorf("unexpected number of args; got %d; want %d", len(fe.Args), 2)
|
||||
}
|
||||
arg := fe.Args[1]
|
||||
|
||||
se, ok := arg.(*metricsql.StringExpr)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("unexpected rollup tag type: %s; expecting string", arg.AppendString(nil))
|
||||
}
|
||||
if se.S == "" {
|
||||
return "", fmt.Errorf("rollup tag cannot be empty")
|
||||
}
|
||||
return se.S, nil
|
||||
}
|
||||
|
||||
func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int,
|
||||
window, lookbackDelta int64, sharedTimestamps []int64) (
|
||||
func(values []float64, timestamps []int64), []*rollupConfig, error) {
|
||||
|
@ -311,35 +334,69 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
|
|||
samplesScannedPerCall: samplesScannedPerCall,
|
||||
}
|
||||
}
|
||||
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
|
||||
dst = append(dst, newRollupConfig(rollupMin, "min"))
|
||||
dst = append(dst, newRollupConfig(rollupMax, "max"))
|
||||
dst = append(dst, newRollupConfig(rollupAvg, "avg"))
|
||||
return dst
|
||||
|
||||
appendRollupConfigs := func(dst []*rollupConfig, expr metricsql.Expr) ([]*rollupConfig, error) {
|
||||
tag, err := getRollupTag(expr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid args for %s: %w", expr.AppendString(nil), err)
|
||||
}
|
||||
switch tag {
|
||||
case "min":
|
||||
dst = append(dst, newRollupConfig(rollupMin, ""))
|
||||
case "max":
|
||||
dst = append(dst, newRollupConfig(rollupMax, ""))
|
||||
case "avg":
|
||||
dst = append(dst, newRollupConfig(rollupAvg, ""))
|
||||
case "":
|
||||
dst = append(dst, newRollupConfig(rollupMin, "min"))
|
||||
dst = append(dst, newRollupConfig(rollupMax, "max"))
|
||||
dst = append(dst, newRollupConfig(rollupAvg, "avg"))
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected second arg for %s: %q; want `min`, `max` or `avg`", expr.AppendString(nil), tag)
|
||||
}
|
||||
return dst, nil
|
||||
}
|
||||
var rcs []*rollupConfig
|
||||
var err error
|
||||
switch funcName {
|
||||
case "rollup":
|
||||
rcs = appendRollupConfigs(rcs)
|
||||
rcs, err = appendRollupConfigs(rcs, expr)
|
||||
case "rollup_rate", "rollup_deriv":
|
||||
preFuncPrev := preFunc
|
||||
preFunc = func(values []float64, timestamps []int64) {
|
||||
preFuncPrev(values, timestamps)
|
||||
derivValues(values, timestamps)
|
||||
}
|
||||
rcs = appendRollupConfigs(rcs)
|
||||
rcs, err = appendRollupConfigs(rcs, expr)
|
||||
case "rollup_increase", "rollup_delta":
|
||||
preFuncPrev := preFunc
|
||||
preFunc = func(values []float64, timestamps []int64) {
|
||||
preFuncPrev(values, timestamps)
|
||||
deltaValues(values)
|
||||
}
|
||||
rcs = appendRollupConfigs(rcs)
|
||||
rcs, err = appendRollupConfigs(rcs, expr)
|
||||
case "rollup_candlestick":
|
||||
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
|
||||
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
|
||||
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
|
||||
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
|
||||
tag, err := getRollupTag(expr)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid args for %s: %w", expr.AppendString(nil), err)
|
||||
}
|
||||
switch tag {
|
||||
case "open":
|
||||
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
|
||||
case "close":
|
||||
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
|
||||
case "low":
|
||||
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
|
||||
case "high":
|
||||
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
|
||||
case "":
|
||||
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
|
||||
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
|
||||
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
|
||||
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unexpected second arg for %s: %q; want `min`, `max` or `avg`", expr.AppendString(nil), tag)
|
||||
}
|
||||
case "rollup_scrape_interval":
|
||||
preFuncPrev := preFunc
|
||||
preFunc = func(values []float64, timestamps []int64) {
|
||||
|
@ -357,7 +414,7 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
|
|||
values[0] = values[1]
|
||||
}
|
||||
}
|
||||
rcs = appendRollupConfigs(rcs)
|
||||
rcs, err = appendRollupConfigs(rcs, expr)
|
||||
case "aggr_over_time":
|
||||
aggrFuncNames, err := getRollupAggrFuncNames(expr)
|
||||
if err != nil {
|
||||
|
@ -376,6 +433,9 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
|
|||
default:
|
||||
rcs = append(rcs, newRollupConfig(rf, ""))
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return preFunc, rcs, nil
|
||||
}
|
||||
|
||||
|
@ -845,6 +905,15 @@ func newRollupFuncTwoArgs(rf rollupFunc) newRollupFunc {
|
|||
}
|
||||
}
|
||||
|
||||
func newRollupFuncOneOrTwoArgs(rf rollupFunc) newRollupFunc {
|
||||
return func(args []interface{}) (rollupFunc, error) {
|
||||
if len(args) < 1 || len(args) > 2 {
|
||||
return nil, fmt.Errorf("unexpected number of args; got %d; want 1...2", len(args))
|
||||
}
|
||||
return rf, nil
|
||||
}
|
||||
}
|
||||
|
||||
func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
|
||||
if err := expectRollupArgsNum(args, 3); err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -26,6 +26,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): choose the backend with the minimum number of concurrently executed requests [among the configured backends](https://docs.victoriametrics.com/vmauth.html#load-balancing) in a round-robin manner for serving the incoming requests. This allows spreading the load among backends more evenly, while improving the response time.
|
||||
* FEATURE: [vmalert enterprise](https://docs.victoriametrics.com/vmalert.html): add ability to read alerting and recording rules from S3, GCS or S3-compatible object storage. See [these docs](https://docs.victoriametrics.com/vmalert.html#reading-rules-from-object-storage).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically retry requests to remote storage if up to 5 errors occur during the data migration process. This should help continuing the data migration process on temporary errors. Previously `vmctl` was stopping after the first error. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3600).
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): support optional 2nd argument `min`, `max` or `avg` for [rollup](https://docs.victoriametrics.com/MetricsQL.html#rollup), [rollup_delta](https://docs.victoriametrics.com/MetricsQL.html#rollup_delta), [rollup_deriv](https://docs.victoriametrics.com/MetricsQL.html#rollup_deriv), [rollup_increase](https://docs.victoriametrics.com/MetricsQL.html#rollup_increase), [rollup_rate](https://docs.victoriametrics.com/MetricsQL.html#rollup_rate) and [rollup_scrape_interval](https://docs.victoriametrics.com/MetricsQL.html#rollup_scrape_interval) function. If the second argument is passed, then the function returns only the selected aggregation type. This change can be useful for situations where only one type of rollup calculation is needed. For example, `rollup_rate(requests_total[1i], "max")` would return only the max increase rates for `requests_total` metric per each interval between adjacent points on the graph. See [this article](https://valyala.medium.com/why-irate-from-prometheus-doesnt-capture-spikes-45f9896d7832) for details.
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): support optional 2nd argument `open`, `low`, `high`, `close` for [rollup_candlestick](https://docs.victoriametrics.com/MetricsQL.html#rollup_candlestick) function. If the second argument is passed, then the function returns only the selected aggregation type.
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [share(q)](https://docs.victoriametrics.com/MetricsQL.html#share) aggregate function.
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `mad_over_time(m[d])` function for calculating the [median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) over raw samples on the lookbehind window `d`. See [this feature request](https://github.com/prometheus/prometheus/issues/5514).
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_mad(q)` function for calculating the [median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) over points per each time series returned by `q`.
|
||||
|
@ -33,12 +35,15 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_outliers(k, q)` function for dropping outliers located farther than `k*range_mad(q)` from the `range_median(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_zscore(z, q)` function for dropping outliers located farther than `z*range_stddev(q)` from `range_avg(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706).
|
||||
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): small UX improvements for mobile view. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3707) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3848).
|
||||
* FEATURE: add `-search.logQueryMemoryUsage` command-line flag for logging queries, which need more memory than specified by this command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3553). Thanks to @michal-kralik for the idea and the intial implementation.
|
||||
* FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`.
|
||||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): immediately cancel in-flight scrape requests during configuration reload when [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) is disabled. Previously `vmagent` could wait for long time until all the in-flight requests are completed before reloading the configuration. This could significantly slow down configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not wait for 2 seconds after the first unsuccessful attempt to scrape the target before performing the next attempt. This should improve scrape speed when the target closes [http keep-alive connection](https://en.wikipedia.org/wiki/HTTP_persistent_connection) between scrapes. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747) issues.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix [Azure service discovery](https://docs.victoriametrics.com/sd_configs.html#azure_sd_configs) inside [Azure Container App](https://learn.microsoft.com/en-us/azure/container-apps/overview). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3830). Thanks to @MattiasAng for the fix!
|
||||
* BUGFIX: do not put auxiliary directories scheduled for removal into snapshots. This should prevent from `cannot create hard links from ...must-remove...` errors when making snapshots / backups. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3858).
|
||||
* BUGFIX: prevent from possible data ingestion slowdown and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with small number of CPU cores (1 or 2 CPU cores). The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790).
|
||||
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
|
||||
|
|
|
@ -475,7 +475,7 @@ It is expected that the `series_selector` returns time series of [counter type](
|
|||
|
||||
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
This function is supported by PromQL. See also [rate](#rate).
|
||||
This function is supported by PromQL. See also [rate](#rate) and [rollup_rate](#rollup_rate).
|
||||
|
||||
#### lag
|
||||
|
||||
|
@ -588,7 +588,7 @@ It is expected that the `series_selector` returns time series of [counter type](
|
|||
|
||||
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
This function is supported by PromQL. See also [irate](#irate).
|
||||
This function is supported by PromQL. See also [irate](#irate) and [rollup_rate](#rollup_rate).
|
||||
|
||||
#### rate_over_sum
|
||||
|
||||
|
@ -615,6 +615,8 @@ This function is supported by PromQL.
|
|||
on the given lookbehind window `d` and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
|
||||
These values are calculated individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
|
||||
|
||||
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
|
||||
|
||||
#### rollup_candlestick
|
||||
|
||||
`rollup_candlestick(series_selector[d])` is a [rollup function](#rollup-functions), which calculates `open`, `high`, `low` and `close` values (aka OHLC)
|
||||
|
@ -622,6 +624,8 @@ over raw samples on the given lookbehind window `d` and returns them in time ser
|
|||
The calculations are performed individually per each time series returned
|
||||
from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering). This function is useful for financial applications.
|
||||
|
||||
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
|
||||
|
||||
#### rollup_delta
|
||||
|
||||
`rollup_delta(series_selector[d])` is a [rollup function](#rollup-functions), which calculates differences between adjacent raw samples
|
||||
|
@ -629,6 +633,8 @@ on the given lookbehind window `d` and returns `min`, `max` and `avg` values for
|
|||
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
|
||||
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
|
||||
|
||||
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
|
||||
|
||||
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
See also [rollup_increase](#rollup_increase).
|
||||
|
@ -640,6 +646,8 @@ for adjacent raw samples on the given lookbehind window `d` and returns `min`, `
|
|||
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
|
||||
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
|
||||
|
||||
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
|
||||
|
||||
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
#### rollup_increase
|
||||
|
@ -649,6 +657,8 @@ on the given lookbehind window `d` and returns `min`, `max` and `avg` values for
|
|||
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
|
||||
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
|
||||
|
||||
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
|
||||
|
||||
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names. See also [rollup_delta](#rollup_delta).
|
||||
|
||||
#### rollup_rate
|
||||
|
@ -656,8 +666,15 @@ Metric names are stripped from the resulting rollups. Add [keep_metric_names](#k
|
|||
`rollup_rate(series_selector[d])` is a [rollup function](#rollup-functions), which calculates per-second change rates for adjacent raw samples
|
||||
on the given lookbehind window `d` and returns `min`, `max` and `avg` values for the calculated per-second change rates
|
||||
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
|
||||
|
||||
See [this article](https://valyala.medium.com/why-irate-from-prometheus-doesnt-capture-spikes-45f9896d7832) in order to undertand better
|
||||
when to use `rollup_rate()`.
|
||||
|
||||
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
|
||||
|
||||
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
|
||||
|
||||
|
||||
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
#### rollup_scrape_interval
|
||||
|
@ -667,6 +684,8 @@ adjacent raw samples on the given lookbehind window `d` and returns `min`, `max`
|
|||
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
|
||||
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
|
||||
|
||||
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
|
||||
|
||||
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names. See also [scrape_interval](#scrape_interval).
|
||||
|
||||
#### scrape_interval
|
||||
|
@ -1587,9 +1606,9 @@ See also [label_lowercase](#label_lowercase).
|
|||
#### label_value
|
||||
|
||||
`label_value(q, "label")` is [label manipulation function](#label-manipulation-functions), which returns numeric values
|
||||
for the given `label` for every time series returned by `q`.
|
||||
for the given `label` for every time series returned by `q`.
|
||||
|
||||
For example, if `label_value(foo, "bar")` is applied to `foo{bar="1.234"}`, then it will return a time series
|
||||
For example, if `label_value(foo, "bar")` is applied to `foo{bar="1.234"}`, then it will return a time series
|
||||
`foo{bar="1.234"}` with `1.234` value. Function will return no data for non-numeric label values.
|
||||
|
||||
#### sort_by_label
|
||||
|
|
|
@ -262,7 +262,7 @@ func MustRemoveTemporaryDirs(dir string) {
|
|||
continue
|
||||
}
|
||||
dirName := fi.Name()
|
||||
if strings.Contains(dirName, ".must-remove.") {
|
||||
if IsScheduledForRemoval(dirName) {
|
||||
fullPath := dir + "/" + dirName
|
||||
MustRemoveAll(fullPath)
|
||||
}
|
||||
|
@ -467,3 +467,7 @@ func isHTTPURL(targetURL string) bool {
|
|||
return err == nil && (parsed.Scheme == "http" || parsed.Scheme == "https") && parsed.Host != ""
|
||||
|
||||
}
|
||||
|
||||
func IsScheduledForRemoval(name string) bool {
|
||||
return strings.Contains(name, ".must-remove.")
|
||||
}
|
||||
|
|
|
@ -1835,5 +1835,5 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa
|
|||
func isSpecialDir(name string) bool {
|
||||
// Snapshots and cache dirs aren't used anymore.
|
||||
// Keep them here for backwards compatibility.
|
||||
return name == "tmp" || name == "txn" || name == "snapshots" || name == "cache"
|
||||
return name == "tmp" || name == "txn" || name == "snapshots" || name == "cache" || fs.IsScheduledForRemoval(name)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||
"github.com/VictoriaMetrics/fasthttp"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -263,7 +264,10 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
|
|||
dst = resp.SwapBody(dst)
|
||||
}
|
||||
|
||||
err := doRequestWithPossibleRetry(c.ctx, c.hc, req, resp, deadline)
|
||||
ctx, cancel := context.WithDeadline(c.ctx, deadline)
|
||||
defer cancel()
|
||||
|
||||
err := doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
||||
statusCode := resp.StatusCode()
|
||||
redirectsCount := 0
|
||||
for err == nil && isStatusRedirect(statusCode) {
|
||||
|
@ -283,7 +287,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
|
|||
break
|
||||
}
|
||||
req.URI().UpdateBytes(location)
|
||||
err = doRequestWithPossibleRetry(c.ctx, c.hc, req, resp, deadline)
|
||||
err = doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
||||
statusCode = resp.StatusCode()
|
||||
redirectsCount++
|
||||
}
|
||||
|
@ -350,34 +354,45 @@ var (
|
|||
scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`)
|
||||
)
|
||||
|
||||
func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
|
||||
sleepTime := time.Second
|
||||
func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) error {
|
||||
scrapeRequests.Inc()
|
||||
reqCtx, cancel := context.WithDeadline(ctx, deadline)
|
||||
defer cancel()
|
||||
for {
|
||||
|
||||
var reqErr error
|
||||
// Return true if the request execution is completed and retry is not required
|
||||
attempt := func() bool {
|
||||
// Use DoCtx instead of Do in order to support context cancellation
|
||||
err := hc.DoCtx(reqCtx, req, resp)
|
||||
if err == nil {
|
||||
reqErr = hc.DoCtx(ctx, req, resp)
|
||||
if reqErr == nil {
|
||||
statusCode := resp.StatusCode()
|
||||
if statusCode != fasthttp.StatusTooManyRequests {
|
||||
return nil
|
||||
return true
|
||||
}
|
||||
} else if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
|
||||
return err
|
||||
} else if reqErr != fasthttp.ErrConnectionClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Retry request after exponentially increased sleep.
|
||||
maxSleepTime := time.Until(deadline)
|
||||
if sleepTime > maxSleepTime {
|
||||
return fmt.Errorf("the server closes all the connection attempts: %w", err)
|
||||
if attempt() {
|
||||
return reqErr
|
||||
}
|
||||
|
||||
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
|
||||
// Perform the second attempt immediately after the first attempt - this should help
|
||||
// in cases when the remote side closes the keep-alive connection before the first attempt.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
|
||||
sleepTime := time.Second
|
||||
// It is expected that the deadline is already set to ctx, so the loop below
|
||||
// should eventually finish if all the attempt() calls are unsuccessful.
|
||||
for {
|
||||
scrapeRetries.Inc()
|
||||
if attempt() {
|
||||
return reqErr
|
||||
}
|
||||
sleepTime += sleepTime
|
||||
if sleepTime > maxSleepTime {
|
||||
sleepTime = maxSleepTime
|
||||
if !discoveryutils.SleepCtx(ctx, sleepTime) {
|
||||
return reqErr
|
||||
}
|
||||
time.Sleep(sleepTime)
|
||||
scrapeRetries.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -238,7 +238,7 @@ func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, clien
|
|||
modifyRequest(req)
|
||||
}
|
||||
|
||||
resp, err := doRequestWithPossibleRetry(client, req, deadline)
|
||||
resp, err := doRequestWithPossibleRetry(client, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
||||
}
|
||||
|
@ -269,31 +269,48 @@ func (c *Client) Stop() {
|
|||
c.clientCancel()
|
||||
}
|
||||
|
||||
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, deadline time.Time) (*http.Response, error) {
|
||||
sleepTime := time.Second
|
||||
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
|
||||
discoveryRequests.Inc()
|
||||
|
||||
for {
|
||||
resp, err := hc.client.Do(req)
|
||||
if err == nil {
|
||||
var (
|
||||
reqErr error
|
||||
resp *http.Response
|
||||
)
|
||||
// Return true if the request execution is completed and retry is not required
|
||||
attempt := func() bool {
|
||||
resp, reqErr = hc.client.Do(req)
|
||||
if reqErr == nil {
|
||||
statusCode := resp.StatusCode
|
||||
if statusCode != http.StatusTooManyRequests {
|
||||
return resp, nil
|
||||
return true
|
||||
}
|
||||
} else if err != net.ErrClosed && !strings.Contains(err.Error(), "broken pipe") {
|
||||
return nil, err
|
||||
} else if reqErr != net.ErrClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
|
||||
return true
|
||||
}
|
||||
// Retry request after exponentially increased sleep.
|
||||
maxSleepTime := time.Until(deadline)
|
||||
if sleepTime > maxSleepTime {
|
||||
return nil, fmt.Errorf("the server closes all the connection attempts: %w", err)
|
||||
return false
|
||||
}
|
||||
|
||||
if attempt() {
|
||||
return resp, reqErr
|
||||
}
|
||||
|
||||
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
|
||||
// Perform the second attempt immediately after the first attempt - this should help
|
||||
// in cases when the remote side closes the keep-alive connection before the first attempt.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
|
||||
sleepTime := time.Second
|
||||
// It is expected that the deadline is already set to req.Context(), so the loop below
|
||||
// should eventually finish if all the attempt() calls are unsuccessful.
|
||||
ctx := req.Context()
|
||||
for {
|
||||
discoveryRetries.Inc()
|
||||
if attempt() {
|
||||
return resp, reqErr
|
||||
}
|
||||
sleepTime += sleepTime
|
||||
if sleepTime > maxSleepTime {
|
||||
sleepTime = maxSleepTime
|
||||
if !SleepCtx(ctx, sleepTime) {
|
||||
return resp, reqErr
|
||||
}
|
||||
time.Sleep(sleepTime)
|
||||
discoveryRetries.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -301,3 +318,17 @@ var (
|
|||
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
|
||||
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
|
||||
)
|
||||
|
||||
// SleepCtx sleeps for sleepDuration.
|
||||
//
|
||||
// It immediately returns false on ctx cancel or deadline, without waiting for sleepDuration.
|
||||
func SleepCtx(ctx context.Context, sleepDuration time.Duration) bool {
|
||||
t := timerpool.Get(sleepDuration)
|
||||
defer timerpool.Put(t)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-t.C:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2022,7 +2022,7 @@ func (pt *partition) createSnapshot(srcDir, dstDir string) error {
|
|||
// Skip non-directories.
|
||||
continue
|
||||
}
|
||||
if fn == "tmp" || fn == "txn" {
|
||||
if fn == "tmp" || fn == "txn" || fs.IsScheduledForRemoval(fn) {
|
||||
// Skip special dirs.
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue