Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Aliaksandr Valialkin 2020-10-13 18:37:48 +03:00
commit 4dc13754d8
52 changed files with 2571 additions and 343 deletions

View file

@ -1,11 +1,17 @@
# tip
# [v1.44.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.44.0)
* FEATURE: automatically add missing label filters to binary operands as described at https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization .
This should improve performance for queries with missing label filters in binary operands. For example, the following query should work faster now, because it no longer needs to
fetch and discard time series for the `node_filesystem_files_free` metric that have no matching labels on the left side of the expression:
```
node_filesystem_files{ host="$host", mountpoint="/" } - node_filesystem_files_free
```
* FEATURE: vmagent: add Docker Swarm service discovery (aka [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config)).
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/656
* FEATURE: add ability to export data in CSV format. See [these docs](https://victoriametrics.github.io/#how-to-export-csv-data) for details.
* FEATURE: vmagent: add `-promscrape.suppressDuplicateScrapeTargetErrors` command-line flag for suppressing `duplicate scrape target` errors.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/651 and https://victoriametrics.github.io/vmagent.html#troubleshooting .
* FEATURE: vmagent: show original labels before relabeling is applied on `duplicate scrape target` errors. This should simplify debugging for incorrect relabeling.
@ -14,6 +20,45 @@
This should simplify debugging for target relabeling configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/651
* FEATURE: add `-finalMergeDelay` command-line flag for configuring the delay before the final merge for per-month partitions.
The final merge is started after no new data has been ingested into the per-month partition during `-finalMergeDelay`.
* FEATURE: add `vm_rows_added_to_storage_total` metric, which shows the total number of rows added to storage since app start.
The `sum(rate(vm_rows_added_to_storage_total))` can be smaller than `sum(rate(vm_rows_inserted_total))` if certain metrics are dropped
due to [relabeling](https://victoriametrics.github.io/#relabeling). The `sum(rate(vm_rows_added_to_storage_total))` can be bigger
than `sum(rate(vm_rows_inserted_total))` if [replication](https://victoriametrics.github.io/Cluster-VictoriaMetrics.html#replication-and-data-safety) is enabled.
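For instance, the following illustrative query shows the per-second rate of rows dropped due to [relabeling](https://victoriametrics.github.io/#relabeling) when replication is disabled:
```
sum(rate(vm_rows_inserted_total)) - sum(rate(vm_rows_added_to_storage_total))
```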
* FEATURE: keep the metric name after applying [MetricsQL](https://victoriametrics.github.io/MetricsQL.html) functions that don't change the meaning of the time series.
The list of such functions (see the example query after the list):
* `keep_last_value`
* `keep_next_value`
* `interpolate`
* `running_min`
* `running_max`
* `running_avg`
* `range_min`
* `range_max`
* `range_avg`
* `range_first`
* `range_last`
* `range_quantile`
* `smooth_exponential`
* `ceil`
* `floor`
* `round`
* `clamp_min`
* `clamp_max`
* `max_over_time`
* `min_over_time`
* `avg_over_time`
* `quantile_over_time`
* `mode_over_time`
* `geomean_over_time`
* `holt_winters`
* `predict_linear`
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/674
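For example, the following illustrative query (assuming a `process_resident_memory_bytes` metric exists) now returns series with the `process_resident_memory_bytes` metric name kept instead of series without a metric name:
```
max_over_time(process_resident_memory_bytes[5m])
```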
* BUGFIX: properly handle stale time series after a K8S deployment. Previously such time series could be double-counted.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748
* BUGFIX: return at most a single time series from the `absent()` function, like Prometheus does.
* BUGFIX: vmalert: accept days, weeks and years in the `for:` part of the config, like Prometheus does. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/817
* BUGFIX: fix `mode_over_time(m[d])` calculations. Previously the function could return incorrect results.
# [v1.43.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.43.0)

View file

@ -118,6 +118,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to export time series](#how-to-export-time-series)
* [How to export data in native format](#how-to-export-data-in-native-format)
* [How to export data in JSON line format](#how-to-export-data-in-json-line-format)
* [How to export CSV data](#how-to-export-csv-data)
* [How to import time series data](#how-to-import-time-series-data)
* [How to import data in native format](#how-to-import-data-in-native-format)
* [How to import data in json line format](#how-to-import-data-in-json-line-format)
@ -295,6 +296,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)
* [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config)
* [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config)
In the future other `*_sd_config` types will be supported.
@ -682,6 +684,7 @@ VictoriaMetrics provides the following handlers for exporting data:
* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export.
See [these docs](#how-to-export-data-in-native-format) for details.
* `/api/v1/export` for exporting data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details.
* `/api/v1/export/csv` for exporting data in CSV format. See [these docs](#how-to-export-csv-data) for details.
#### How to export data in native format
@ -731,13 +734,37 @@ The maximum duration for each request to `/api/v1/export` is limited by `-search
Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format).
#### How to export CSV data
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/csv?format=<format>&match=<timeseries_selector_for_export>`,
where:
* `<format>` must contain comma-delimited label names for the exported CSV. The following special label names are supported:
* `__name__` - metric name
* `__value__` - sample value
* `__timestamp__:<ts_format>` - sample timestamp. `<ts_format>` can have the following values:
* `unix_s` - unix seconds
* `unix_ms` - unix milliseconds
* `unix_ns` - unix nanoseconds
* `rfc3339` - [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) time
* `custom:<layout>` - custom time layout supported by Go's [time.Format](https://golang.org/pkg/time/#Time.Format) function.
* `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export.
Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
a unix timestamp in seconds or an [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) value.
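For example, the following illustrative request (assuming a single-node VictoriaMetrics listening on `localhost:8428` and an ingested `node_load1` metric) exports the metric name, the `instance` label, the sample value and an RFC3339 timestamp per line:
```
curl 'http://localhost:8428/api/v1/export/csv?format=__name__,instance,__value__,__timestamp__:rfc3339&match=node_load1'
```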
The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv](#how-to-import-csv-data).
### How to import time series data
Time series data can be imported via any supported ingestion protocol:
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
* Influx line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See[these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.
* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format).

View file

@ -151,6 +151,8 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
* `openstack_sd_configs` - for scraping OpenStack targets.
See [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config) for details.
Only [OpenStack identity API v3](https://docs.openstack.org/api-ref/identity/v3/) is supported.
* `dockerswarm_sd_configs` - for scraping Docker Swarm targets.
See [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config) for details.
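Below is a minimal sketch of a scrape config using this service discovery (a hypothetical config; `host` and `role` follow the Prometheus-compatible options linked above):
```
scrape_configs:
- job_name: dockerswarm
  dockerswarm_sd_configs:
  - host: unix:///var/run/docker.sock
    role: tasks
```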
File feature requests at [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.

View file

@ -62,6 +62,7 @@ func insertRows(db string, rows []parser.Row) error {
buf := ctx.buf[:0]
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Fields)
commonLabels = commonLabels[:0]
hasDBKey := false
for j := range r.Tags {
@ -111,7 +112,6 @@ func insertRows(db string, rows []parser.Row) error {
Samples: samples[len(samples)-1:],
})
}
rowsTotal += len(r.Fields)
}
ctx.buf = buf
ctx.ctx.WriteRequest.Timeseries = tssDst

View file

@ -38,6 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
// Update rowsInserted and rowsPerInsert before the actual insertion,
// since relabeling can prevent the rows from being inserted.
rowsLen := len(block.Values)
rowsInserted.Add(rowsLen)
rowsPerInsert.Update(float64(rowsLen))
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
@ -71,12 +77,9 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal := len(values)
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View file

@ -35,6 +35,7 @@ func insertRows(timeseries []prompb.TimeSeries) error {
samples := ctx.Samples[:0]
for i := range timeseries {
ts := &timeseries[i]
rowsTotal += len(ts.Samples)
labelsLen := len(labels)
for i := range ts.Labels {
label := &ts.Labels[i]
@ -55,7 +56,6 @@ func insertRows(timeseries []prompb.TimeSeries) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal += len(ts.Samples)
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels

View file

@ -44,6 +44,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
samples := ctx.Samples[:0]
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
labelsLen := len(labels)
for j := range r.Tags {
tag := &r.Tags[j]
@ -69,7 +70,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal += len(values)
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels

View file

@ -11,6 +11,7 @@ rules against configured address.
* Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules)
support;
* Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
* Keeps the alerts [state on restarts](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmalert#alerts-state-on-restarts);
* Lightweight without extra dependencies.
### Limitations:
@ -121,14 +122,6 @@ annotations:
[ <labelname>: <tmpl_string> ]
```
`vmalert` has no local storage and alerts state is stored in process memory. Hence, after reloading of `vmalert` process
alerts state will be lost. To avoid this situation, `vmalert` may be configured via following flags:
* `-remoteWrite.url` - URL to Victoria Metrics or VMInsert. `vmalert` will persist alerts state into the configured
address in form of timeseries with name `ALERTS` via remote-write protocol.
* `-remoteRead.url` - URL to Victoria Metrics or VMSelect. `vmalert` will try to restore alerts state from configured
address by querying `ALERTS` timeseries.
##### Recording rules
The syntax for recording rules is the following:
@ -147,6 +140,22 @@ labels:
For recording rules to work, `-remoteWrite.url` must be specified.
#### Alerts state on restarts
`vmalert` has no local storage, so the alerts state is kept in the process memory. Hence, the alerts state is lost after restarting the `vmalert`
process. To avoid this situation, `vmalert` should be configured via the following flags:
* `-remoteWrite.url` - URL to VictoriaMetrics (Single) or VMInsert (Cluster). `vmalert` will persist the alerts state
to the configured address in the form of time series named `ALERTS` and `ALERTS_FOR_STATE` via the remote-write protocol.
These are regular time series and may be queried from VM just like any other time series.
The state is stored to the configured address on every rule evaluation.
* `-remoteRead.url` - URL to VictoriaMetrics (Single) or VMSelect (Cluster). `vmalert` will try to restore the alerts state
from the configured address by querying the `ALERTS_FOR_STATE` time series.
Both flags are required for proper state restoration. The restore process may fail if the time series are missing
from the configured `-remoteRead.url`, weren't updated during the last `1h`, or the received state doesn't match the current `vmalert`
rules configuration.
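A minimal sketch of such a configuration (hypothetical addresses; the remaining flags are described elsewhere in these docs):
```
./vmalert -rule=alert.rules \
    -datasource.url=http://localhost:8428 \
    -remoteWrite.url=http://localhost:8428 \
    -remoteRead.url=http://localhost:8428
```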
#### WEB
`vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:

View file

@ -90,6 +90,9 @@ func main() {
if err := a.Run(); err != nil {
logger.Fatalf("cannot create backup: %s", err)
}
srcFS.MustStop()
dstFS.MustStop()
originFS.MustStop()
}
func usage() {

View file

@ -65,6 +65,7 @@ func insertRows(db string, rows []parser.Row) error {
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Fields)
ic.Labels = ic.Labels[:0]
hasDBKey := false
for j := range r.Tags {
@ -125,7 +126,6 @@ func insertRows(db string, rows []parser.Row) error {
}
}
}
rowsTotal += len(r.Fields)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -38,7 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
// Update rowsInserted and rowsPerInsert before the actual insertion,
// since relabeling can prevent the rows from being inserted.
rowsLen := len(block.Values)
rowsInserted.Add(rowsLen)
rowsPerInsert.Update(float64(rowsLen))
ic := &ctx.Common
ic.Reset(rowsLen)
hasRelabeling := relabel.HasRelabeling()
@ -72,9 +77,6 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
return err
}
}
rowsTotal := len(values)
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ic.FlushBufs()
}

View file

@ -51,6 +51,7 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) {
rowsTotal := 0
for i := range tss {
ts := &tss[i]
rowsTotal += len(ts.Samples)
ctx.Labels = ctx.Labels[:0]
for j := range ts.Labels {
label := &ts.Labels[j]
@ -71,7 +72,6 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) {
return
}
}
rowsTotal += len(ts.Samples)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -36,6 +36,7 @@ func insertRows(timeseries []prompb.TimeSeries) error {
hasRelabeling := relabel.HasRelabeling()
for i := range timeseries {
ts := &timeseries[i]
rowsTotal += len(ts.Samples)
ctx.Labels = ctx.Labels[:0]
srcLabels := ts.Labels
for _, srcLabel := range srcLabels {
@ -58,7 +59,6 @@ func insertRows(timeseries []prompb.TimeSeries) error {
return err
}
}
rowsTotal += len(samples)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -50,6 +50,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
ic.Labels = ic.Labels[:0]
for j := range r.Tags {
tag := &r.Tags[j]
@ -78,7 +79,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
return err
}
}
rowsTotal += len(values)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -52,6 +52,8 @@ func main() {
if err := a.Run(); err != nil {
logger.Fatalf("cannot restore from backup: %s", err)
}
srcFS.MustStop()
dstFS.MustStop()
}
func usage() {

View file

@ -203,6 +203,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
return true
case "/api/v1/export/csv":
exportCSVRequests.Inc()
if err := prometheus.ExportCSVHandler(startTime, w, r); err != nil {
exportCSVErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "/api/v1/export/native":
exportNativeRequests.Inc()
if err := prometheus.ExportNativeHandler(startTime, w, r); err != nil {
@ -329,6 +337,9 @@ var (
exportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/export"}`)
exportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/export"}`)
exportCSVRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/export/csv"}`)
exportCSVErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/export/csv"}`)
exportNativeRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/export/native"}`)
exportNativeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/export/native"}`)

View file

@ -1,10 +1,83 @@
{% import (
"bytes"
"strings"
"time"
"github.com/valyala/quicktemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) %}
{% stripspace %}
{% func ExportCSVLine(xb *exportBlock, fieldNames []string) %}
{% if len(xb.timestamps) == 0 || len(fieldNames) == 0 %}{% return %}{% endif %}
{% for i, timestamp := range xb.timestamps %}
{% code value := xb.values[i] %}
{%= exportCSVField(xb.mn, fieldNames[0], timestamp, value) %}
{% for _, fieldName := range fieldNames[1:] %}
,
{%= exportCSVField(xb.mn, fieldName, timestamp, value) %}
{% endfor %}
{% newline %}
{% endfor %}
{% endfunc %}
{% func exportCSVField(mn *storage.MetricName, fieldName string, timestamp int64, value float64) %}
{% if fieldName == "__value__" %}
{%f= value %}
{% return %}
{% endif %}
{% if fieldName == "__timestamp__" %}
{%dl timestamp %}
{% return %}
{% endif %}
{% if strings.HasPrefix(fieldName, "__timestamp__:") %}
{% code timeFormat := fieldName[len("__timestamp__:"):] %}
{% switch timeFormat %}
{% case "unix_s" %}
{%dl= timestamp/1000 %}
{% case "unix_ms" %}
{%dl= timestamp %}
{% case "unix_ns" %}
{%dl= timestamp*1e6 %}
{% case "rfc3339" %}
{% code
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], time.RFC3339)
%}
{%z= bb.B %}
{% code
quicktemplate.ReleaseByteBuffer(bb)
%}
{% default %}
{% if strings.HasPrefix(timeFormat, "custom:") %}
{% code
layout := timeFormat[len("custom:"):]
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], layout)
%}
{% if bytes.ContainsAny(bb.B, `"`+",\n") %}
{%qz bb.B %}
{% else %}
{%z= bb.B %}
{% endif %}
{% code
quicktemplate.ReleaseByteBuffer(bb)
%}
{% else %}
Unsupported timeFormat={%s= timeFormat %}
{% endif %}
{% endswitch %}
{% return %}
{% endif %}
{% code v := mn.GetTagValue(fieldName) %}
{% if bytes.ContainsAny(v, `"`+",\n") %}
{%qz= v %}
{% else %}
{%z= v %}
{% endif %}
{% endfunc %}
{% func ExportPrometheusLine(xb *exportBlock) %}
{% if len(xb.timestamps) == 0 %}{% return %}{% endif %}
{% code bb := quicktemplate.AcquireByteBuffer() %}

View file

@ -6,306 +6,201 @@ package prometheus
//line app/vmselect/prometheus/export.qtpl:1
import (
"bytes"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/export.qtpl:8
//line app/vmselect/prometheus/export.qtpl:12
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/export.qtpl:8
//line app/vmselect/prometheus/export.qtpl:12
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/export.qtpl:8
func StreamExportPrometheusLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:9
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:9
return
//line app/vmselect/prometheus/export.qtpl:9
}
//line app/vmselect/prometheus/export.qtpl:10
bb := quicktemplate.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:11
writeprometheusMetricName(bb, xb.mn)
//line app/vmselect/prometheus/export.qtpl:12
for i, ts := range xb.timestamps {
func StreamExportCSVLine(qw422016 *qt422016.Writer, xb *exportBlock, fieldNames []string) {
//line app/vmselect/prometheus/export.qtpl:13
qw422016.N().Z(bb.B)
if len(xb.timestamps) == 0 || len(fieldNames) == 0 {
//line app/vmselect/prometheus/export.qtpl:13
qw422016.N().S(` `)
return
//line app/vmselect/prometheus/export.qtpl:13
}
//line app/vmselect/prometheus/export.qtpl:14
qw422016.N().F(xb.values[i])
//line app/vmselect/prometheus/export.qtpl:14
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:15
qw422016.N().DL(ts)
for i, timestamp := range xb.timestamps {
//line app/vmselect/prometheus/export.qtpl:15
value := xb.values[i]
//line app/vmselect/prometheus/export.qtpl:16
streamexportCSVField(qw422016, xb.mn, fieldNames[0], timestamp, value)
//line app/vmselect/prometheus/export.qtpl:17
for _, fieldName := range fieldNames[1:] {
//line app/vmselect/prometheus/export.qtpl:17
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:19
streamexportCSVField(qw422016, xb.mn, fieldName, timestamp, value)
//line app/vmselect/prometheus/export.qtpl:20
}
//line app/vmselect/prometheus/export.qtpl:21
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:16
//line app/vmselect/prometheus/export.qtpl:22
}
//line app/vmselect/prometheus/export.qtpl:17
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
}
//line app/vmselect/prometheus/export.qtpl:18
func WriteExportPrometheusLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
func WriteExportCSVLine(qq422016 qtio422016.Writer, xb *exportBlock, fieldNames []string) {
//line app/vmselect/prometheus/export.qtpl:23
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:18
StreamExportPrometheusLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
StreamExportCSVLine(qw422016, xb, fieldNames)
//line app/vmselect/prometheus/export.qtpl:23
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
}
//line app/vmselect/prometheus/export.qtpl:18
func ExportPrometheusLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
func ExportCSVLine(xb *exportBlock, fieldNames []string) string {
//line app/vmselect/prometheus/export.qtpl:23
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:18
WriteExportPrometheusLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
WriteExportCSVLine(qb422016, xb, fieldNames)
//line app/vmselect/prometheus/export.qtpl:23
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
return qs422016
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:23
}
//line app/vmselect/prometheus/export.qtpl:20
func StreamExportJSONLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:21
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:21
return
//line app/vmselect/prometheus/export.qtpl:21
}
//line app/vmselect/prometheus/export.qtpl:21
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:23
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:23
qw422016.N().S(`,"values":[`)
//line app/vmselect/prometheus/export.qtpl:25
if len(xb.values) > 0 {
func streamexportCSVField(qw422016 *qt422016.Writer, mn *storage.MetricName, fieldName string, timestamp int64, value float64) {
//line app/vmselect/prometheus/export.qtpl:26
values := xb.values
if fieldName == "__value__" {
//line app/vmselect/prometheus/export.qtpl:27
qw422016.N().F(values[0])
qw422016.N().F(value)
//line app/vmselect/prometheus/export.qtpl:28
values = values[1:]
return
//line app/vmselect/prometheus/export.qtpl:29
for _, v := range values {
//line app/vmselect/prometheus/export.qtpl:29
qw422016.N().S(`,`)
}
//line app/vmselect/prometheus/export.qtpl:30
qw422016.N().F(v)
if fieldName == "__timestamp__" {
//line app/vmselect/prometheus/export.qtpl:31
}
qw422016.N().DL(timestamp)
//line app/vmselect/prometheus/export.qtpl:32
return
//line app/vmselect/prometheus/export.qtpl:33
}
//line app/vmselect/prometheus/export.qtpl:32
qw422016.N().S(`],"timestamps":[`)
//line app/vmselect/prometheus/export.qtpl:34
if strings.HasPrefix(fieldName, "__timestamp__:") {
//line app/vmselect/prometheus/export.qtpl:35
if len(xb.timestamps) > 0 {
timeFormat := fieldName[len("__timestamp__:"):]
//line app/vmselect/prometheus/export.qtpl:36
timestamps := xb.timestamps
switch timeFormat {
//line app/vmselect/prometheus/export.qtpl:37
qw422016.N().DL(timestamps[0])
case "unix_s":
//line app/vmselect/prometheus/export.qtpl:38
timestamps = timestamps[1:]
qw422016.N().DL(timestamp / 1000)
//line app/vmselect/prometheus/export.qtpl:39
for _, ts := range timestamps {
//line app/vmselect/prometheus/export.qtpl:39
qw422016.N().S(`,`)
case "unix_ms":
//line app/vmselect/prometheus/export.qtpl:40
qw422016.N().DL(ts)
qw422016.N().DL(timestamp)
//line app/vmselect/prometheus/export.qtpl:41
}
case "unix_ns":
//line app/vmselect/prometheus/export.qtpl:42
}
//line app/vmselect/prometheus/export.qtpl:42
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/export.qtpl:44
qw422016.N().S(`
`)
qw422016.N().DL(timestamp * 1e6)
//line app/vmselect/prometheus/export.qtpl:43
case "rfc3339":
//line app/vmselect/prometheus/export.qtpl:45
}
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], time.RFC3339)
//line app/vmselect/prometheus/export.qtpl:45
func WriteExportJSONLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:45
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:45
StreamExportJSONLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:45
func ExportJSONLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:45
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:45
WriteExportJSONLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:45
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:45
return qs422016
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:47
func StreamExportPromAPILine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:47
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:49
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:49
qw422016.N().S(`,"values":`)
//line app/vmselect/prometheus/export.qtpl:50
streamvaluesWithTimestamps(qw422016, xb.values, xb.timestamps)
//line app/vmselect/prometheus/export.qtpl:50
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:52
func WriteExportPromAPILine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:52
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:52
StreamExportPromAPILine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:52
func ExportPromAPILine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:52
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:52
WriteExportPromAPILine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:52
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:52
return qs422016
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:54
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:54
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
//line app/vmselect/prometheus/export.qtpl:60
bb, ok := <-resultsCh
//line app/vmselect/prometheus/export.qtpl:61
if ok {
//line app/vmselect/prometheus/export.qtpl:62
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:63
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:64
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:64
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:65
//line app/vmselect/prometheus/export.qtpl:48
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:66
//line app/vmselect/prometheus/export.qtpl:50
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:52
default:
//line app/vmselect/prometheus/export.qtpl:53
if strings.HasPrefix(timeFormat, "custom:") {
//line app/vmselect/prometheus/export.qtpl:55
layout := timeFormat[len("custom:"):]
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], layout)
//line app/vmselect/prometheus/export.qtpl:59
if bytes.ContainsAny(bb.B, `"`+",\n") {
//line app/vmselect/prometheus/export.qtpl:60
qw422016.E().QZ(bb.B)
//line app/vmselect/prometheus/export.qtpl:61
} else {
//line app/vmselect/prometheus/export.qtpl:62
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:63
}
//line app/vmselect/prometheus/export.qtpl:65
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:67
} else {
//line app/vmselect/prometheus/export.qtpl:67
qw422016.N().S(`Unsupported timeFormat=`)
//line app/vmselect/prometheus/export.qtpl:68
qw422016.N().S(timeFormat)
//line app/vmselect/prometheus/export.qtpl:69
}
//line app/vmselect/prometheus/export.qtpl:70
}
//line app/vmselect/prometheus/export.qtpl:68
//line app/vmselect/prometheus/export.qtpl:71
return
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:68
qw422016.N().S(`]}}`)
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:72
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:72
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:72
StreamExportPromAPIResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:72
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:72
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:72
WriteExportPromAPIResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:72
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:72
return qs422016
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:73
v := mn.GetTagValue(fieldName)
//line app/vmselect/prometheus/export.qtpl:74
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
if bytes.ContainsAny(v, `"`+",\n") {
//line app/vmselect/prometheus/export.qtpl:75
for bb := range resultsCh {
qw422016.N().QZ(v)
//line app/vmselect/prometheus/export.qtpl:76
qw422016.N().Z(bb.B)
} else {
//line app/vmselect/prometheus/export.qtpl:77
quicktemplate.ReleaseByteBuffer(bb)
qw422016.N().Z(v)
//line app/vmselect/prometheus/export.qtpl:78
}
//line app/vmselect/prometheus/export.qtpl:79
}
//line app/vmselect/prometheus/export.qtpl:79
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
func writeexportCSVField(qq422016 qtio422016.Writer, mn *storage.MetricName, fieldName string, timestamp int64, value float64) {
//line app/vmselect/prometheus/export.qtpl:79
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:79
StreamExportStdResponse(qw422016, resultsCh)
streamexportCSVField(qw422016, mn, fieldName, timestamp, value)
//line app/vmselect/prometheus/export.qtpl:79
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:79
}
//line app/vmselect/prometheus/export.qtpl:79
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
func exportCSVField(mn *storage.MetricName, fieldName string, timestamp int64, value float64) string {
//line app/vmselect/prometheus/export.qtpl:79
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:79
WriteExportStdResponse(qb422016, resultsCh)
writeexportCSVField(qb422016, mn, fieldName, timestamp, value)
//line app/vmselect/prometheus/export.qtpl:79
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:79
@ -316,69 +211,361 @@ func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
}
//line app/vmselect/prometheus/export.qtpl:81
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
func StreamExportPrometheusLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:82
qw422016.N().Z(mn.MetricGroup)
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:82
return
//line app/vmselect/prometheus/export.qtpl:82
}
//line app/vmselect/prometheus/export.qtpl:83
if len(mn.Tags) > 0 {
//line app/vmselect/prometheus/export.qtpl:83
qw422016.N().S(`{`)
bb := quicktemplate.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:84
writeprometheusMetricName(bb, xb.mn)
//line app/vmselect/prometheus/export.qtpl:85
tags := mn.Tags
for i, ts := range xb.timestamps {
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().Z(tags[0].Key)
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().QZ(tags[0].Value)
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:87
tags = tags[1:]
qw422016.N().F(xb.values[i])
//line app/vmselect/prometheus/export.qtpl:87
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:88
for i := range tags {
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:88
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:89
tag := &tags[i]
}
//line app/vmselect/prometheus/export.qtpl:90
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:89
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:90
qw422016.N().Z(tag.Key)
//line app/vmselect/prometheus/export.qtpl:90
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:90
qw422016.N().QZ(tag.Value)
//line app/vmselect/prometheus/export.qtpl:91
}
}
//line app/vmselect/prometheus/export.qtpl:91
qw422016.N().S(`}`)
func WriteExportPrometheusLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:91
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:91
StreamExportPrometheusLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:91
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:91
}
//line app/vmselect/prometheus/export.qtpl:91
func ExportPrometheusLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:91
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:91
WriteExportPrometheusLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:91
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:91
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:91
return qs422016
//line app/vmselect/prometheus/export.qtpl:91
}
//line app/vmselect/prometheus/export.qtpl:93
func StreamExportJSONLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:94
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:94
return
//line app/vmselect/prometheus/export.qtpl:94
}
//line app/vmselect/prometheus/export.qtpl:94
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:96
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:96
qw422016.N().S(`,"values":[`)
//line app/vmselect/prometheus/export.qtpl:98
if len(xb.values) > 0 {
//line app/vmselect/prometheus/export.qtpl:99
values := xb.values
//line app/vmselect/prometheus/export.qtpl:100
qw422016.N().F(values[0])
//line app/vmselect/prometheus/export.qtpl:101
values = values[1:]
//line app/vmselect/prometheus/export.qtpl:102
for _, v := range values {
//line app/vmselect/prometheus/export.qtpl:102
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:103
qw422016.N().F(v)
//line app/vmselect/prometheus/export.qtpl:104
}
//line app/vmselect/prometheus/export.qtpl:105
}
//line app/vmselect/prometheus/export.qtpl:105
qw422016.N().S(`],"timestamps":[`)
//line app/vmselect/prometheus/export.qtpl:108
if len(xb.timestamps) > 0 {
//line app/vmselect/prometheus/export.qtpl:109
timestamps := xb.timestamps
//line app/vmselect/prometheus/export.qtpl:110
qw422016.N().DL(timestamps[0])
//line app/vmselect/prometheus/export.qtpl:111
timestamps = timestamps[1:]
//line app/vmselect/prometheus/export.qtpl:112
for _, ts := range timestamps {
//line app/vmselect/prometheus/export.qtpl:112
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:113
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:114
}
//line app/vmselect/prometheus/export.qtpl:115
}
//line app/vmselect/prometheus/export.qtpl:115
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/export.qtpl:117
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:118
}
//line app/vmselect/prometheus/export.qtpl:94
func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
func WriteExportJSONLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:118
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:94
streamprometheusMetricName(qw422016, mn)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
StreamExportJSONLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:118
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
}
//line app/vmselect/prometheus/export.qtpl:94
func prometheusMetricName(mn *storage.MetricName) string {
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
func ExportJSONLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:118
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:94
writeprometheusMetricName(qb422016, mn)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
WriteExportJSONLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:118
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
return qs422016
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:118
}
//line app/vmselect/prometheus/export.qtpl:120
func StreamExportPromAPILine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:120
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:122
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:122
qw422016.N().S(`,"values":`)
//line app/vmselect/prometheus/export.qtpl:123
streamvaluesWithTimestamps(qw422016, xb.values, xb.timestamps)
//line app/vmselect/prometheus/export.qtpl:123
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:125
}
//line app/vmselect/prometheus/export.qtpl:125
func WriteExportPromAPILine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:125
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:125
StreamExportPromAPILine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:125
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:125
}
//line app/vmselect/prometheus/export.qtpl:125
func ExportPromAPILine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:125
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:125
WriteExportPromAPILine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:125
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:125
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:125
return qs422016
//line app/vmselect/prometheus/export.qtpl:125
}
//line app/vmselect/prometheus/export.qtpl:127
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:127
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
//line app/vmselect/prometheus/export.qtpl:133
bb, ok := <-resultsCh
//line app/vmselect/prometheus/export.qtpl:134
if ok {
//line app/vmselect/prometheus/export.qtpl:135
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:136
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:137
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:137
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:138
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:139
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:140
}
//line app/vmselect/prometheus/export.qtpl:141
}
//line app/vmselect/prometheus/export.qtpl:141
qw422016.N().S(`]}}`)
//line app/vmselect/prometheus/export.qtpl:145
}
//line app/vmselect/prometheus/export.qtpl:145
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:145
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:145
StreamExportPromAPIResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:145
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:145
}
//line app/vmselect/prometheus/export.qtpl:145
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:145
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:145
WriteExportPromAPIResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:145
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:145
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:145
return qs422016
//line app/vmselect/prometheus/export.qtpl:145
}
//line app/vmselect/prometheus/export.qtpl:147
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:148
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:149
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:150
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:151
}
//line app/vmselect/prometheus/export.qtpl:152
}
//line app/vmselect/prometheus/export.qtpl:152
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:152
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:152
StreamExportStdResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:152
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:152
}
//line app/vmselect/prometheus/export.qtpl:152
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:152
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:152
WriteExportStdResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:152
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:152
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:152
return qs422016
//line app/vmselect/prometheus/export.qtpl:152
}
//line app/vmselect/prometheus/export.qtpl:154
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:155
qw422016.N().Z(mn.MetricGroup)
//line app/vmselect/prometheus/export.qtpl:156
if len(mn.Tags) > 0 {
//line app/vmselect/prometheus/export.qtpl:156
qw422016.N().S(`{`)
//line app/vmselect/prometheus/export.qtpl:158
tags := mn.Tags
//line app/vmselect/prometheus/export.qtpl:159
qw422016.N().Z(tags[0].Key)
//line app/vmselect/prometheus/export.qtpl:159
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:159
qw422016.N().QZ(tags[0].Value)
//line app/vmselect/prometheus/export.qtpl:160
tags = tags[1:]
//line app/vmselect/prometheus/export.qtpl:161
for i := range tags {
//line app/vmselect/prometheus/export.qtpl:162
tag := &tags[i]
//line app/vmselect/prometheus/export.qtpl:162
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:163
qw422016.N().Z(tag.Key)
//line app/vmselect/prometheus/export.qtpl:163
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:163
qw422016.N().QZ(tag.Value)
//line app/vmselect/prometheus/export.qtpl:164
}
//line app/vmselect/prometheus/export.qtpl:164
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:166
}
//line app/vmselect/prometheus/export.qtpl:167
}
//line app/vmselect/prometheus/export.qtpl:167
func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:167
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:167
streamprometheusMetricName(qw422016, mn)
//line app/vmselect/prometheus/export.qtpl:167
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:167
}
//line app/vmselect/prometheus/export.qtpl:167
func prometheusMetricName(mn *storage.MetricName) string {
//line app/vmselect/prometheus/export.qtpl:167
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:167
writeprometheusMetricName(qb422016, mn)
//line app/vmselect/prometheus/export.qtpl:167
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:167
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:167
return qs422016
//line app/vmselect/prometheus/export.qtpl:167
}

View file

@ -8,6 +8,7 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -112,6 +113,92 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
// ExportCSVHandler exports data in CSV format from /api/v1/export/csv
func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
format := r.FormValue("format")
if len(format) == 0 {
return fmt.Errorf("missing `format` arg; see https://victoriametrics.github.io/#how-to-export-csv-data")
}
fieldNames := strings.Split(format, ",")
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
if len(match) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
matches = []string{match}
}
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
}
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
deadline := searchutils.GetDeadlineForExport(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
sq := &storage.SearchQuery{
MinTimestamp: start,
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
w.Header().Set("Content-Type", "text/csv")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
doneCh := make(chan error)
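// Serialize matching blocks into CSV lines in a background goroutine,
// while the loop below streams the serialized lines to the client.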
go func() {
err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := bw.Error(); err != nil {
return err
}
if err := b.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block during export: %s", err)
}
xb := exportBlockPool.Get().(*exportBlock)
xb.mn = mn
xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
if len(xb.timestamps) > 0 {
bb := quicktemplate.AcquireByteBuffer()
WriteExportCSVLine(bb, xb, fieldNames)
resultsCh <- bb
}
xb.reset()
exportBlockPool.Put(xb)
return nil
})
close(resultsCh)
doneCh <- err
}()
// Consume all the data from resultsCh.
for bb := range resultsCh {
// Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above.
_, _ = bw.Write(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
}
if err := bw.Flush(); err != nil {
return err
}
err = <-doneCh
if err != nil {
return fmt.Errorf("error during exporting data to csv: %w", err)
}
exportCSVDuration.UpdateDuration(startTime)
return nil
}
var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`)
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6

View file

@ -494,6 +494,8 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
//
// It is expected that a doesn't contain NaNs.
//
// The function modifies the contents of a, so the caller must prepare it accordingly.
//
// See https://en.wikipedia.org/wiki/Mode_(statistics)
func modeNoNaNs(prevValue float64, a []float64) float64 {
if len(a) == 0 {

View file

@ -582,10 +582,6 @@ func TestExecSuccess(t *testing.T) {
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("yy"),
Value: []byte("foo"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
@ -636,6 +632,7 @@ func TestExecSuccess(t *testing.T) {
Values: []float64{1000, 1200, 1400, 1400, 1400, 1400},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foobar")
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
@ -647,6 +644,7 @@ func TestExecSuccess(t *testing.T) {
Values: []float64{1000, 1200, 1400, 1400, 1400, 1400},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foobar")
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
@ -3634,6 +3632,7 @@ func TestExecSuccess(t *testing.T) {
Values: []float64{7.8, 9.9, 11.9, 13.9, 15.9, 17.9},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foobar")
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
@ -4450,7 +4449,7 @@ func TestExecSuccess(t *testing.T) {
q := `distinct_over_time((time() < 1700)[500s])`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3, 3, 3, 3, 2, 1},
Values: []float64{3, 3, 3, 3, nan, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r1}
@ -4461,7 +4460,7 @@ func TestExecSuccess(t *testing.T) {
q := `distinct_over_time((time() < 1700)[2.5i])`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3, 3, 3, 3, 2, 1},
Values: []float64{3, 3, 3, 3, nan, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r1}

View file

@ -169,12 +169,20 @@ var rollupFuncsRemoveCounterResets = map[string]bool{
}
var rollupFuncsKeepMetricGroup = map[string]bool{
"holt_winters": true,
"predict_linear": true,
"default_rollup": true,
"avg_over_time": true,
"min_over_time": true,
"max_over_time": true,
"quantile_over_time": true,
"rollup": true,
"geomean_over_time": true,
"hoeffding_bound_lower": true,
"hoeffding_bound_upper": true,
"first_over_time": true,
"last_over_time": true,
"mode_over_time": true,
}
func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
@ -492,6 +500,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
j := 0
ni := 0
nj := 0
stalenessInterval := int64(float64(scrapeInterval) * 0.9)
for _, tEnd := range rc.Timestamps {
tStart := tEnd - window
ni = seekFirstTimestampIdxAfter(timestamps[i:], tStart, ni)
@ -508,9 +517,17 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
rfa.prevValue = values[i-1]
rfa.prevTimestamp = timestamps[i-1]
}
rfa.values = values[i:j]
rfa.timestamps = timestamps[i:j]
if j == len(timestamps) && i < j && tEnd-timestamps[j-1] > stalenessInterval {
// Do not take into account the last data point in the time series if the distance between this data point
// and tEnd exceeds stalenessInterval.
// This should prevent double counting when a label changes in a time series (for instance,
// during a new deployment in K8S). See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748
rfa.prevValue = nan
rfa.values = nil
rfa.timestamps = nil
}
rfa.currTimestamp = tEnd
value := rc.Func(rfa)
rfa.idx++
@ -1579,7 +1596,23 @@ func rollupTimestamp(rfa *rollupFuncArg) float64 {
func rollupModeOverTime(rfa *rollupFuncArg) float64 {
// There is no need to handle NaNs here, since they must be cleaned up
// before calling rollup funcs.
return modeNoNaNs(rfa.prevValue, rfa.values)
// Copy rfa.values to a.A, since modeNoNaNs modifies a.A contents.
a := float64sPool.Get().(*float64s)
a.A = append(a.A[:0], rfa.values...)
result := modeNoNaNs(rfa.prevValue, a.A)
float64sPool.Put(a)
return result
}
var float64sPool = &sync.Pool{
New: func() interface{} {
return &float64s{}
},
}
type float64s struct {
A []float64
}
func rollupAscentOverTime(rfa *rollupFuncArg) float64 {

View file

@ -583,7 +583,7 @@ func TestRollupNoWindowPartialPoints(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, nan, 123, 34, 32}
valuesExpected := []float64{nan, nan, 123, 34, nan}
timestampsExpected := []int64{-50, 0, 50, 100, 150}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -690,7 +690,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 123, 54, 44, 34}
valuesExpected := []float64{nan, 123, 54, 44, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -704,7 +704,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 4, 4, 3, 1}
valuesExpected := []float64{nan, 4, 4, 3, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -718,7 +718,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 21, 12, 32, 34}
valuesExpected := []float64{nan, 21, 12, 32, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -732,7 +732,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 123, 99, 44, 34}
valuesExpected := []float64{nan, 123, 99, 44, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -746,7 +746,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 222, 199, 110, 34}
valuesExpected := []float64{nan, 222, 199, 110, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -760,7 +760,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, nan, -9, 22, 0}
valuesExpected := []float64{nan, nan, -9, 22, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -788,7 +788,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 0.004, 0, 0, 0.03}
valuesExpected := []float64{nan, 0.004, 0, 0, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -802,7 +802,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 0.031, 0.044, 0.04, 0.01}
valuesExpected := []float64{nan, 0.031, 0.044, 0.04, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -816,7 +816,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 0.031, 0.075, 0.115, 0.125}
valuesExpected := []float64{nan, 0.031, 0.075, 0.115, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -830,7 +830,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 0.010333333333333333, 0.011, 0.013333333333333334, 0.01}
valuesExpected := []float64{nan, 0.010333333333333333, 0.011, 0.013333333333333334, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -844,7 +844,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 0.010333333333333333, 0.010714285714285714, 0.012, 0.0125}
valuesExpected := []float64{nan, 0.010333333333333333, 0.010714285714285714, 0.012, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -858,7 +858,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 4, 4, 3, 0}
valuesExpected := []float64{nan, 4, 4, 3, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -886,7 +886,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 2, 2, 1, 0}
valuesExpected := []float64{nan, 2, 2, 1, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -900,7 +900,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 55.5, 49.75, 36.666666666666664, 34}
valuesExpected := []float64{nan, 55.5, 49.75, 36.666666666666664, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -914,7 +914,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{0, -2879.310344827587, 558.0608793686595, 422.84569138276544, 0}
valuesExpected := []float64{0, -2879.310344827587, 558.0608793686595, 422.84569138276544, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -942,7 +942,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, -1916.6666666666665, -43500, 400, 0}
valuesExpected := []float64{nan, -1916.6666666666665, -43500, 400, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -956,7 +956,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, 5.830951894845301}
valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -970,7 +970,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 2.148, 1.593, 1.156, 1.36}
valuesExpected := []float64{nan, 2.148, 1.593, 1.156, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -984,7 +984,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 4, 4, 3, 1}
valuesExpected := []float64{nan, 4, 4, 3, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -998,7 +998,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 4, 7, 6, 3}
valuesExpected := []float64{nan, 4, 7, 6, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -1012,7 +1012,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, nan, 34, 44, 44}
valuesExpected := []float64{nan, 21, 34, 34, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -1026,7 +1026,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 1262.5, 3187.5, 4059.523809523809, 6200}
valuesExpected := []float64{nan, 2775, 5262.5, 3678.5714285714284, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -1040,7 +1040,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 0.9397878236968458, 1.1969836716333457, 2.3112921116373175, nan}
valuesExpected := []float64{nan, -0.86650328627136, -1.1200838283548589, -0.40035755084856683, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@ -1062,7 +1062,7 @@ func TestRollupBigNumberOfValues(t *testing.T) {
srcTimestamps[i] = int64(i / 2)
}
values := rc.Do(nil, srcValues, srcTimestamps)
valuesExpected := []float64{1, 4001, 8001, 9999, nan, nan}
valuesExpected := []float64{1, 4001, 8001, nan, nan, nan}
timestampsExpected := []int64{0, 2000, 4000, 6000, 8000, 10000}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}

View file

@ -17,6 +17,27 @@ import (
"github.com/valyala/histogram"
)
var transformFuncsKeepMetricGroup = map[string]bool{
"ceil": true,
"clamp_max": true,
"clamp_min": true,
"floor": true,
"round": true,
"keep_last_value": true,
"keep_next_value": true,
"interpolate": true,
"running_min": true,
"running_max": true,
"running_avg": true,
"range_min": true,
"range_max": true,
"range_avg": true,
"range_first": true,
"range_last": true,
"range_quantile": true,
"smooth_exponential": true,
}
var transformFuncs = map[string]transformFunc{
// Standard promql funcs
// See funcs accepting instant-vector on https://prometheus.io/docs/prometheus/latest/querying/functions/ .
@ -125,8 +146,12 @@ func newTransformFuncOneArg(tf func(v float64) float64) transformFunc {
}
func doTransformValues(arg []*timeseries, tf func(values []float64), fe *metricsql.FuncExpr) ([]*timeseries, error) {
name := strings.ToLower(fe.Name)
keepMetricGroup := transformFuncsKeepMetricGroup[name]
for _, ts := range arg {
ts.MetricName.ResetMetricGroup()
if !keepMetricGroup {
ts.MetricName.ResetMetricGroup()
}
tf(ts.Values)
}
return arg, nil
@ -142,23 +167,24 @@ func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) {
if err := expectTransformArgsNum(args, 1); err != nil {
return nil, err
}
arg := args[0]
if len(arg) == 0 {
rvs := getAbsentTimeseries(tfa.ec, tfa.fe.Args[0])
tss := args[0]
rvs := getAbsentTimeseries(tfa.ec, tfa.fe.Args[0])
if len(tss) == 0 {
return rvs, nil
}
for _, ts := range arg {
ts.MetricName.ResetMetricGroup()
for i, v := range ts.Values {
if !math.IsNaN(v) {
v = nan
} else {
v = 1
for i := range tss[0].Values {
isAbsent := true
for _, ts := range tss {
if !math.IsNaN(ts.Values[i]) {
isAbsent = false
break
}
ts.Values[i] = v
}
if !isAbsent {
rvs[0].Values[i] = nan
}
}
return arg, nil
return rvs, nil
}
func getAbsentTimeseries(ec *EvalConfig, arg metricsql.Expr) []*timeseries {

View file

@ -446,6 +446,9 @@ func registerStorageMetrics() {
return float64(idbm().SizeBytes)
})
metrics.NewGauge(`vm_rows_added_to_storage_total`, func() float64 {
return float64(m().RowsAddedTotal)
})
metrics.NewGauge(`vm_deduplicated_samples_total{type="merge"}`, func() float64 {
return float64(m().DedupsDuringMerge)
})

View file

@ -192,6 +192,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `federate` - returns [federated metrics](https://prometheus.io/docs/prometheus/latest/federation/).
- `api/v1/export` - exports raw data in JSON line format. See [this article](https://medium.com/@valyala/analyzing-prometheus-data-with-external-tools-5f3e5e147639) for details.
- `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above).
- `api/v1/export/csv` - exports raw data in CSV format. It may be imported into another VictoriaMetrics via `api/v1/import/csv` (see above).
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.
@ -279,6 +280,12 @@ Each instance type - `vminsert`, `vmselect` and `vmstorage` - can run on the mos
* The recommended total number of vCPU cores for all the `vmstorage` instances can be calculated from the ingestion rate: `vCPUs = ingestion_rate / 150K`.
* The recommended total amount of RAM for all the `vmstorage` instances can be calculated from the number of active time series: `RAM = active_time_series * 1KB`.
A time series is active if it has received at least one data point during the last hour or if it has been queried during the last hour.
The required RAM for each `vmstorage` should be multiplied by `-replicationFactor` if [replication](#replication-and-data-safety) is enabled.
Additional RAM may be required for query processing.
The calculated RAM requirements may differ from the actual RAM requirements due to various factors:
* The average number of labels per time series. More labels require more RAM.
* The average length of label names and label values. Longer labels require more RAM.
* The type of queries. Heavy queries that scan a big number of time series over long time ranges require more RAM.
* The recommended total amount of storage space for all the `vmstorage` instances can be calculated
from the ingestion rate and retention: `storage_space = ingestion_rate * retention_seconds`. A worked sketch of these formulas is shown below.
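To make the formulas above concrete, here is an illustrative Go sketch that plugs hypothetical inputs (750K samples/s ingestion, 10M active time series, one month retention, replication factor of 2) into the rules of thumb; the numbers are examples, not recommendations:
```
package main

import "fmt"

func main() {
	// Hypothetical inputs - replace them with your own measurements.
	ingestionRate := 750_000.0       // samples ingested per second
	activeTimeSeries := 10_000_000.0 // series with recent samples or queries
	retentionSeconds := 31.0 * 24 * 3600
	replicationFactor := 2.0 // -replicationFactor, if replication is enabled

	vCPUs := ingestionRate / 150_000 // vCPUs = ingestion_rate / 150K
	// RAM = active_time_series * 1KB, multiplied by -replicationFactor.
	ramBytes := activeTimeSeries * 1024 * replicationFactor
	// storage_space = ingestion_rate * retention_seconds yields a sample count;
	// multiply it by the average on-disk bytes per sample to get a byte figure.
	samplesStored := ingestionRate * retentionSeconds

	fmt.Printf("total vmstorage vCPUs: %.0f\n", vCPUs)
	fmt.Printf("total vmstorage RAM: %.1f GiB\n", ramBytes/(1<<30))
	fmt.Printf("samples kept over retention: %.2e\n", samplesStored)
}
```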

View file

@ -15,6 +15,8 @@ The following functionality is implemented differently in MetricsQL comparing to
* MetricsQL removes all the `NaN` values from the output, so some queries like `(-1)^0.5` return empty results in VictoriaMetrics, while returning
a series of `NaN` values in Prometheus. Note that Grafana doesn't draw any lines or dots for `NaN` values, so usually the end result looks the same for both
VictoriaMetrics and Prometheus.
* MetricsQL keeps metric names after applying functions that don't change the meaning of the original time series. For example, `min_over_time(foo)` or `round(foo)`
leave the `foo` metric name in the result. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/674) for details.
Other PromQL functionality should work the same in MetricsQL. [File an issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues)
if you notice discrepancies between PromQL and MetricsQL results other than those mentioned above.

View file

@ -118,6 +118,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to export time series](#how-to-export-time-series)
* [How to export data in native format](#how-to-export-data-in-native-format)
* [How to export data in JSON line format](#how-to-export-data-in-json-line-format)
* [How to export CSV data](#how-to-export-csv-data)
* [How to import time series data](#how-to-import-time-series-data)
* [How to import data in native format](#how-to-import-data-in-native-format)
* [How to import data in json line format](#how-to-import-data-in-json-line-format)
@ -295,6 +296,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)
* [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config)
* [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config)
In the future other `*_sd_config` types will be supported.
@ -682,6 +684,7 @@ VictoriaMetrics provides the following handlers for exporting data:
* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export.
See [these docs](#how-to-export-data-in-native-format) for details.
* `/api/v1/export` for exporting data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details.
* `/api/v1/export/csv` for exporting data in CSV format. See [these docs](#how-to-export-csv-data) for details.
#### How to export data in native format
@ -731,13 +734,37 @@ The maximum duration for each request to `/api/v1/export` is limited by `-search
Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format).
#### How to export CSV data
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/csv?format=<format>&match=<timeseries_selector_for_export>`,
where:
* `<format>` must contain comma-delimited label names for the exported CSV. The following special label names are supported:
* `__name__` - metric name
* `__value__` - sample value
* `__timestamp__:<ts_format>` - sample timestamp. `<ts_format>` can have the following values:
* `unix_s` - unix seconds
* `unix_ms` - unix milliseconds
* `unix_ns` - unix nanoseconds
* `rfc3339` - [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) time
* `custom:<layout>` - custom layout for time supported by the [time.Format](https://golang.org/pkg/time/#Time.Format) function from Go, e.g. `2006-01-02 15:04:05`.
* `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export.
Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
a unix timestamp in seconds or an [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) value.
The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv](#how-to-import-csv-data).
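For illustration, here is a hedged Go sketch that queries this endpoint on a hypothetical single-node instance at `localhost:8428`; the `node_cpu_seconds_total{mode="idle"}` selector is just an example metric:
```
package main

import (
	"io"
	"net/http"
	"net/url"
	"os"
)

func main() {
	// Export metric name, sample value and RFC3339 timestamp as CSV columns.
	params := url.Values{}
	params.Set("format", "__name__,__value__,__timestamp__:rfc3339")
	params.Set("match", `node_cpu_seconds_total{mode="idle"}`)

	resp, err := http.Get("http://localhost:8428/api/v1/export/csv?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Stream the returned CSV lines to stdout.
	if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
		panic(err)
	}
}
```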
### How to import time series data
Time series data can be imported via any supported ingestion protocol:
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
* Influx line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See[these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.
* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format).

View file

@ -151,6 +151,8 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
* `openstack_sd_configs` - for scraping OpenStack targets.
See [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config) for details.
[OpenStack identity API v3](https://docs.openstack.org/api-ref/identity/v3/) is supported only.
* `dockerswarm_sd_configs` - for scraping Docker Swarm targets.
See [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config) for details.
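A minimal `dockerswarm_sd_configs` entry points `host` at the Docker Swarm API endpoint and sets `role` to one of `tasks`, `services` or `nodes`; an optional `port` is used when building target addresses. Treat this as a sketch of the supported fields - see the linked `dockerswarm_sd_config` docs for the authoritative reference.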
File feature requests at [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.

View file

@ -11,6 +11,7 @@ rules against configured address.
* Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules)
support;
* Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
* Keeps the alerts [state on restarts](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmalert#alerts-state-on-restarts);
* Lightweight without extra dependencies.
### Limitations:
@ -121,14 +122,6 @@ annotations:
[ <labelname>: <tmpl_string> ]
```
`vmalert` has no local storage and alerts state is stored in process memory. Hence, after reloading of `vmalert` process
alerts state will be lost. To avoid this situation, `vmalert` may be configured via following flags:
* `-remoteWrite.url` - URL to Victoria Metrics or VMInsert. `vmalert` will persist alerts state into the configured
address in form of timeseries with name `ALERTS` via remote-write protocol.
* `-remoteRead.url` - URL to Victoria Metrics or VMSelect. `vmalert` will try to restore alerts state from configured
address by querying `ALERTS` timeseries.
##### Recording rules
The syntax for recording rules is following:
@ -147,6 +140,22 @@ labels:
For recording rules to work `-remoteWrite.url` must be specified.
#### Alerts state on restarts
`vmalert` has no local storage, so the alerts state is stored in the process memory. Hence, the alerts state will be lost when the `vmalert`
process is restarted. To avoid this situation, `vmalert` should be configured via the following flags:
* `-remoteWrite.url` - URL to VictoriaMetrics (Single) or VMInsert (Cluster). `vmalert` will persist alerts state
into the configured address in the form of time series named `ALERTS` and `ALERTS_FOR_STATE` via remote-write protocol.
These are regular time series and may be queried from VM just as any other time series.
The state is stored to the configured address on every rule evaluation.
* `-remoteRead.url` - URL to VictoriaMetrics (Single) or VMSelect (Cluster). `vmalert` will try to restore alerts state
from configured address by querying time series with name `ALERTS_FOR_STATE`.
Both flags are required for proper state restoration. The restore process may fail if the time series are missing
from the configured `-remoteRead.url`, weren't updated during the last `1h`, or the received state doesn't match the current `vmalert`
rules configuration.
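For a single-node setup both flags typically point at the same address, e.g. `-remoteWrite.url=http://victoriametrics:8428 -remoteRead.url=http://victoriametrics:8428` (a hypothetical address - substitute your own).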
#### WEB
`vmalert` runs a web server (`-httpListenAddr`) for serving metrics and alerts endpoints:

View file

@ -9,6 +9,9 @@ import (
// This filesystem is used for performing server-side file copies
// instead of uploading data from local filesystem.
type OriginFS interface {
// MustStop must be called when the OriginFS is no longer needed.
MustStop()
// String must return human-readable representation of OriginFS.
String() string
@ -18,6 +21,9 @@ type OriginFS interface {
// RemoteFS is a filesystem where backups are stored.
type RemoteFS interface {
// MustStop must be called when the RemoteFS is no longer needed.
MustStop()
// String must return human-readable representation of RemoteFS.
String() string

View file

@ -15,6 +15,9 @@ type bandwidthLimiter struct {
// quota for the current second
quota int
stopCh chan struct{}
wg sync.WaitGroup
}
func newBandwidthLimiter(perSecondLimit int) *bandwidthLimiter {
@ -25,10 +28,20 @@ func newBandwidthLimiter(perSecondLimit int) *bandwidthLimiter {
bl.perSecondLimit = perSecondLimit
var mu sync.Mutex
bl.c = sync.NewCond(&mu)
go bl.perSecondUpdater()
bl.stopCh = make(chan struct{})
bl.wg.Add(1)
go func() {
defer bl.wg.Done()
bl.perSecondUpdater()
}()
return &bl
}
func (bl *bandwidthLimiter) MustStop() {
close(bl.stopCh)
bl.wg.Wait()
}
func (bl *bandwidthLimiter) NewReadCloser(rc io.ReadCloser) *bandwidthLimitedReader {
return &bandwidthLimitedReader{
rc: rc,
@ -83,7 +96,12 @@ func (blw *bandwidthLimitedWriter) Close() error {
func (bl *bandwidthLimiter) perSecondUpdater() {
tc := time.NewTicker(time.Second)
c := bl.c
for range tc.C {
for {
select {
case <-tc.C:
case <-bl.stopCh:
return
}
c.L.Lock()
bl.quota = bl.perSecondLimit
c.Signal()

View file

@ -27,7 +27,9 @@ type FS struct {
bl *bandwidthLimiter
}
// Init initializes fs
// Init initializes fs.
//
// The returned fs must be stopped with a MustStop call when it is no longer needed.
func (fs *FS) Init() error {
if fs.MaxBytesPerSecond > 0 {
fs.bl = newBandwidthLimiter(fs.MaxBytesPerSecond)
@ -35,6 +37,15 @@ func (fs *FS) Init() error {
return nil
}
// MustStop stops fs.
func (fs *FS) MustStop() {
if fs.bl == nil {
return
}
fs.bl.MustStop()
fs.bl = nil
}
// String returns user-readable representation for the fs.
func (fs *FS) String() string {
return fmt.Sprintf("fslocal %q", fs.Dir)

View file

@ -7,6 +7,11 @@ import (
// FS represents nil remote filesystem.
type FS struct{}
// MustStop stops fs.
func (fs *FS) MustStop() {
// Nothing to do
}
// String returns human-readable string representation for fs.
func (fs *FS) String() string {
return "fsnil"

View file

@ -22,6 +22,11 @@ type FS struct {
Dir string
}
// MustStop stops fs.
func (fs *FS) MustStop() {
// Nothing to do
}
// String returns human-readable string representation for fs.
func (fs *FS) String() string {
return fmt.Sprintf("fsremote %q", fs.Dir)

View file

@ -33,6 +33,8 @@ type FS struct {
}
// Init initializes fs.
//
// The returned fs must be stopped with a MustStop call when it is no longer needed.
func (fs *FS) Init() error {
if fs.bkt != nil {
logger.Panicf("BUG: fs.Init has been already called")
@ -63,6 +65,11 @@ func (fs *FS) Init() error {
return nil
}
// MustStop stops fs.
func (fs *FS) MustStop() {
fs.bkt = nil
}
// String returns human-readable description for fs.
func (fs *FS) String() string {
return fmt.Sprintf("GCS{bucket: %q, dir: %q}", fs.Bucket, fs.Dir)

View file

@ -45,6 +45,8 @@ type FS struct {
}
// Init initializes fs.
//
// The returned fs must be stopped with a MustStop call when it is no longer needed.
func (fs *FS) Init() error {
if fs.s3 != nil {
logger.Panicf("BUG: Init is already called")
@ -96,6 +98,12 @@ func (fs *FS) Init() error {
return nil
}
// MustStop stops fs.
func (fs *FS) MustStop() {
fs.s3 = nil
fs.uploader = nil
}
// String returns human-readable description for fs.
func (fs *FS) String() string {
return fmt.Sprintf("S3{bucket: %q, dir: %q}", fs.Bucket, fs.Dir)

View file

@ -18,6 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
@ -72,6 +73,7 @@ type ScrapeConfig struct {
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"`
OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs"`
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs"`
DockerSwarmConfigs []dockerswarm.SDConfig `yaml:"dockerswarm_sd_configs"`
DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs"`
EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs"`
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs"`
@ -231,6 +233,34 @@ func (cfg *Config) getOpenStackSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
return dst
}
// getDockerSwarmSDScrapeWork returns `dockerswarm_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getDockerSwarmSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.DockerSwarmConfigs {
sdc := &sc.DockerSwarmConfigs[j]
var okLocal bool
dst, okLocal = appendDockerSwarmScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering dockerswarm targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}
// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
@ -483,6 +513,15 @@ func appendOpenstackScrapeWork(dst []ScrapeWork, sdc *openstack.SDConfig, baseDi
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "openstack_sd_config"), true
}
func appendDockerSwarmScrapeWork(dst []ScrapeWork, sdc *dockerswarm.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := dockerswarm.GetLabels(sdc, baseDir)
if err != nil {
logger.Errorf("error when discovering dockerswarm targets for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst, false
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dockerswarm_sd_config"), true
}
func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := consul.GetLabels(sdc, baseDir)
if err != nil {

View file

@ -0,0 +1,39 @@
package dockerswarm
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
var configMap = discoveryutils.NewConfigMap()
type apiConfig struct {
client *discoveryutils.Client
port int
}
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
if err != nil {
return nil, err
}
return v.(*apiConfig), nil
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
cfg := &apiConfig{
port: sdc.Port,
}
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
return nil, err
}
client, err := discoveryutils.NewClient(sdc.Host, ac)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err)
}
cfg.client = client
return cfg, nil
}

View file

@ -0,0 +1,40 @@
package dockerswarm
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// SDConfig represents Docker Swarm service discovery configuration.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config
type SDConfig struct {
Host string `yaml:"host"`
// TODO: add support for proxy_url
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
Role string `yaml:"role"`
Port int `yaml:"port"`
// refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth"`
BearerToken string `yaml:"bearer_token"`
BearerTokenFile string `yaml:"bearer_token_file"`
}
// GetLabels returns dockerswarm labels according to sdc.
func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) {
cfg, err := getAPIConfig(sdc, baseDir)
if err != nil {
return nil, fmt.Errorf("cannot get API config: %w", err)
}
switch sdc.Role {
case "tasks":
return getTasksLabels(cfg)
case "services":
return getServicesLabels(cfg)
case "nodes":
return getNodesLabels(cfg)
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `tasks`, `services` or `nodes`; skipping it", sdc.Role)
}
}

View file

@ -0,0 +1,61 @@
package dockerswarm
import (
"encoding/json"
"fmt"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// See https://docs.docker.com/engine/api/v1.40/#tag/Network
type network struct {
ID string
Name string
Scope string
Internal bool
Ingress bool
Labels map[string]string
}
func getNetworksLabelsByNetworkID(cfg *apiConfig) (map[string]map[string]string, error) {
networks, err := getNetworks(cfg)
if err != nil {
return nil, err
}
return getNetworkLabelsByNetworkID(networks), nil
}
func getNetworks(cfg *apiConfig) ([]network, error) {
resp, err := cfg.client.GetAPIResponse("/networks")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for networks: %w", err)
}
return parseNetworks(resp)
}
func parseNetworks(data []byte) ([]network, error) {
var networks []network
if err := json.Unmarshal(data, &networks); err != nil {
return nil, fmt.Errorf("cannot parse networks: %w", err)
}
return networks, nil
}
func getNetworkLabelsByNetworkID(networks []network) map[string]map[string]string {
ms := make(map[string]map[string]string)
for _, network := range networks {
m := map[string]string{
"__meta_dockerswarm_network_id": network.ID,
"__meta_dockerswarm_network_name": network.Name,
"__meta_dockerswarm_network_internal": strconv.FormatBool(network.Internal),
"__meta_dockerswarm_network_ingress": strconv.FormatBool(network.Ingress),
"__meta_dockerswarm_network_scope": network.Scope,
}
for k, v := range network.Labels {
m["__meta_dockerswarm_network_label_"+discoveryutils.SanitizeLabelName(k)] = v
}
ms[network.ID] = m
}
return ms
}

View file

@ -0,0 +1,173 @@
package dockerswarm
import (
"reflect"
"sort"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func Test_addNetworkLabels(t *testing.T) {
type args struct {
networks []network
}
tests := []struct {
name string
args args
want [][]prompbmarshal.Label
}{
{
name: "ingress network",
args: args{
networks: []network{
{
ID: "qs0hog6ldlei9ct11pr3c77v1",
Ingress: true,
Scope: "swarm",
Name: "ingress",
Labels: map[string]string{
"key1": "value1",
},
},
},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__meta_dockerswarm_network_id": "qs0hog6ldlei9ct11pr3c77v1",
"__meta_dockerswarm_network_ingress": "true",
"__meta_dockerswarm_network_internal": "false",
"__meta_dockerswarm_network_label_key1": "value1",
"__meta_dockerswarm_network_name": "ingress",
"__meta_dockerswarm_network_scope": "swarm",
})},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getNetworkLabelsByNetworkID(tt.args.networks)
var networkIDs []string
for networkID := range got {
networkIDs = append(networkIDs, networkID)
}
sort.Strings(networkIDs)
var sortedLabelss [][]prompbmarshal.Label
for _, networkID := range networkIDs {
labels := got[networkID]
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
if !reflect.DeepEqual(sortedLabelss, tt.want) {
t.Errorf("addNetworkLabels() \ngot %v, \nwant %v", sortedLabelss, tt.want)
}
})
}
}
func Test_parseNetworks(t *testing.T) {
type args struct {
data []byte
}
tests := []struct {
name string
args args
want []network
wantErr bool
}{
{
name: "parse two networks",
args: args{
data: []byte(`[
{
"Name": "ingress",
"Id": "qs0hog6ldlei9ct11pr3c77v1",
"Created": "2020-10-06T08:39:58.957083331Z",
"Scope": "swarm",
"Driver": "overlay",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": [
{
"Subnet": "10.0.0.0/24",
"Gateway": "10.0.0.1"
}
]
},
"Internal": false,
"Attachable": false,
"Ingress": true,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": null,
"Options": {
"com.docker.network.driver.overlay.vxlanid_list": "4096"
},
"Labels": {
"key1": "value1"
}
},
{
"Name": "host",
"Id": "317f0384d7e5f5c26304a0b04599f9f54bc08def4d0535059ece89955e9c4b7b",
"Created": "2020-10-06T08:39:52.843373136Z",
"Scope": "local",
"Driver": "host",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": []
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {},
"Options": {},
"Labels": {
"key": "value"
}
}
]`),
},
want: []network{
{
ID: "qs0hog6ldlei9ct11pr3c77v1",
Ingress: true,
Scope: "swarm",
Name: "ingress",
Labels: map[string]string{
"key1": "value1",
},
},
{
ID: "317f0384d7e5f5c26304a0b04599f9f54bc08def4d0535059ece89955e9c4b7b",
Scope: "local",
Name: "host",
Labels: map[string]string{
"key": "value",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseNetworks(tt.args.data)
if (err != nil) != tt.wantErr {
t.Errorf("parseNetworks() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseNetworks() \ngot %v, \nwant %v", got, tt.want)
}
})
}
}

View file

@ -0,0 +1,88 @@
package dockerswarm
import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// See https://docs.docker.com/engine/api/v1.40/#tag/Node
type node struct {
ID string
Spec struct {
Labels map[string]string
Role string
Availability string
}
Description struct {
Hostname string
Platform struct {
Architecture string
OS string
}
Engine struct {
EngineVersion string
}
}
Status struct {
State string
Message string
Addr string
}
ManagerStatus struct {
Leader bool
Reachability string
Addr string
}
}
func getNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
nodes, err := getNodes(cfg)
if err != nil {
return nil, err
}
return addNodeLabels(nodes, cfg.port), nil
}
func getNodes(cfg *apiConfig) ([]node, error) {
resp, err := cfg.client.GetAPIResponse("/nodes")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for nodes: %w", err)
}
return parseNodes(resp)
}
func parseNodes(data []byte) ([]node, error) {
var nodes []node
if err := json.Unmarshal(data, &nodes); err != nil {
return nil, fmt.Errorf("cannot parse nodes: %w", err)
}
return nodes, nil
}
func addNodeLabels(nodes []node, port int) []map[string]string {
var ms []map[string]string
for _, node := range nodes {
m := map[string]string{
"__address__": discoveryutils.JoinHostPort(node.Status.Addr, port),
"__meta_dockerswarm_node_address": node.Status.Addr,
"__meta_dockerswarm_node_availability": node.Spec.Availability,
"__meta_dockerswarm_node_engine_version": node.Description.Engine.EngineVersion,
"__meta_dockerswarm_node_hostname": node.Description.Hostname,
"__meta_dockerswarm_node_id": node.ID,
"__meta_dockerswarm_node_manager_address": node.ManagerStatus.Addr,
"__meta_dockerswarm_node_manager_leader": fmt.Sprintf("%t", node.ManagerStatus.Leader),
"__meta_dockerswarm_node_manager_reachability": node.ManagerStatus.Reachability,
"__meta_dockerswarm_node_platform_architecture": node.Description.Platform.Architecture,
"__meta_dockerswarm_node_platform_os": node.Description.Platform.OS,
"__meta_dockerswarm_node_role": node.Spec.Role,
"__meta_dockerswarm_node_status": node.Status.State,
}
for k, v := range node.Spec.Labels {
m["__meta_dockerswarm_node_label_"+discoveryutils.SanitizeLabelName(k)] = v
}
ms = append(ms, m)
}
return ms
}

View file

@ -0,0 +1,188 @@
package dockerswarm
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func Test_parseNodes(t *testing.T) {
type args struct {
data []byte
}
tests := []struct {
name string
args args
want []node
wantErr bool
}{
{
name: "parse ok",
args: args{
data: []byte(`[
{
"ID": "qauwmifceyvqs0sipvzu8oslu",
"Version": {
"Index": 16
},
"Spec": {
"Role": "manager",
"Availability": "active"
},
"Description": {
"Hostname": "ip-172-31-40-97",
"Platform": {
"Architecture": "x86_64",
"OS": "linux"
},
"Resources": {
"NanoCPUs": 1000000000,
"MemoryBytes": 1026158592
},
"Engine": {
"EngineVersion": "19.03.11"
}
},
"Status": {
"State": "ready",
"Addr": "172.31.40.97"
}
}
]
`),
},
want: []node{
{
ID: "qauwmifceyvqs0sipvzu8oslu",
Spec: struct {
Labels map[string]string
Role string
Availability string
}{Role: "manager", Availability: "active"},
Status: struct {
State string
Message string
Addr string
}{State: "ready", Addr: "172.31.40.97"},
Description: struct {
Hostname string
Platform struct {
Architecture string
OS string
}
Engine struct{ EngineVersion string }
}{
Hostname: "ip-172-31-40-97",
Platform: struct {
Architecture string
OS string
}{
Architecture: "x86_64",
OS: "linux",
},
Engine: struct{ EngineVersion string }{
EngineVersion: "19.03.11",
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseNodes(tt.args.data)
if (err != nil) != tt.wantErr {
t.Errorf("parseNodes() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseNodes() \ngot %v, \nwant %v", got, tt.want)
}
})
}
}
func Test_addNodeLabels(t *testing.T) {
type args struct {
nodes []node
port int
}
tests := []struct {
name string
args args
want [][]prompbmarshal.Label
}{
{
name: "add labels to one node",
args: args{
nodes: []node{
{
ID: "qauwmifceyvqs0sipvzu8oslu",
Spec: struct {
Labels map[string]string
Role string
Availability string
}{Role: "manager", Availability: "active"},
Status: struct {
State string
Message string
Addr string
}{State: "ready", Addr: "172.31.40.97"},
Description: struct {
Hostname string
Platform struct {
Architecture string
OS string
}
Engine struct{ EngineVersion string }
}{
Hostname: "ip-172-31-40-97",
Platform: struct {
Architecture string
OS string
}{
Architecture: "x86_64",
OS: "linux",
},
Engine: struct{ EngineVersion string }{
EngineVersion: "19.03.11",
},
},
},
},
port: 9100,
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.31.40.97:9100",
"__meta_dockerswarm_node_address": "172.31.40.97",
"__meta_dockerswarm_node_availability": "active",
"__meta_dockerswarm_node_engine_version": "19.03.11",
"__meta_dockerswarm_node_hostname": "ip-172-31-40-97",
"__meta_dockerswarm_node_manager_address": "",
"__meta_dockerswarm_node_manager_leader": "false",
"__meta_dockerswarm_node_manager_reachability": "",
"__meta_dockerswarm_node_id": "qauwmifceyvqs0sipvzu8oslu",
"__meta_dockerswarm_node_platform_architecture": "x86_64",
"__meta_dockerswarm_node_platform_os": "linux",
"__meta_dockerswarm_node_role": "manager",
"__meta_dockerswarm_node_status": "ready",
})},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := addNodeLabels(tt.args.nodes, tt.args.port)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range got {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
if !reflect.DeepEqual(sortedLabelss, tt.want) {
t.Errorf("addNodeLabels() \ngot %v, \nwant %v", sortedLabelss, tt.want)
}
})
}
}

View file

@ -0,0 +1,141 @@
package dockerswarm
import (
"encoding/json"
"fmt"
"net"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// See https://docs.docker.com/engine/api/v1.40/#tag/Service
type service struct {
ID string
Spec struct {
Labels map[string]string
Name string
TaskTemplate struct {
ContainerSpec struct {
Hostname string
Image string
}
}
Mode struct {
Global interface{}
Replicated interface{}
}
}
UpdateStatus struct {
State string
}
Endpoint struct {
Ports []portConfig
VirtualIPs []struct {
NetworkID string
Addr string
}
}
}
type portConfig struct {
Protocol string
Name string
PublishMode string
PublishedPort int
}
func getServicesLabels(cfg *apiConfig) ([]map[string]string, error) {
services, err := getServices(cfg)
if err != nil {
return nil, err
}
networksLabels, err := getNetworksLabelsByNetworkID(cfg)
if err != nil {
return nil, err
}
return addServicesLabels(services, networksLabels, cfg.port), nil
}
func getServices(cfg *apiConfig) ([]service, error) {
data, err := cfg.client.GetAPIResponse("/services")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for services: %w", err)
}
return parseServicesResponse(data)
}
func parseServicesResponse(data []byte) ([]service, error) {
var services []service
if err := json.Unmarshal(data, &services); err != nil {
return nil, fmt.Errorf("cannot parse services: %w", err)
}
return services, nil
}
func getServiceMode(svc service) string {
if svc.Spec.Mode.Global != nil {
return "global"
}
if svc.Spec.Mode.Replicated != nil {
return "replicated"
}
return ""
}
func addServicesLabels(services []service, networksLabels map[string]map[string]string, port int) []map[string]string {
var ms []map[string]string
for _, service := range services {
commonLabels := map[string]string{
"__meta_dockerswarm_service_id": service.ID,
"__meta_dockerswarm_service_name": service.Spec.Name,
"__meta_dockerswarm_service_mode": getServiceMode(service),
"__meta_dockerswarm_service_task_container_hostname": service.Spec.TaskTemplate.ContainerSpec.Hostname,
"__meta_dockerswarm_service_task_container_image": service.Spec.TaskTemplate.ContainerSpec.Image,
"__meta_dockerswarm_service_updating_status": service.UpdateStatus.State,
}
for k, v := range service.Spec.Labels {
commonLabels["__meta_dockerswarm_service_label_"+discoveryutils.SanitizeLabelName(k)] = v
}
for _, vip := range service.Endpoint.VirtualIPs {
ip, _, err := net.ParseCIDR(vip.Addr)
if err != nil {
logger.Errorf("cannot parse: %q as cidr for service label add, err: %v", vip.Addr, err)
continue
}
added := false
for _, ep := range service.Endpoint.Ports {
if ep.Protocol != "tcp" {
continue
}
m := map[string]string{
"__address__": discoveryutils.JoinHostPort(ip.String(), ep.PublishedPort),
"__meta_dockerswarm_service_endpoint_port_name": ep.Name,
"__meta_dockerswarm_service_endpoint_port_publish_mode": ep.PublishMode,
}
for k, v := range commonLabels {
m[k] = v
}
for k, v := range networksLabels[vip.NetworkID] {
m[k] = v
}
added = true
ms = append(ms, m)
}
if !added {
m := map[string]string{
"__address__": discoveryutils.JoinHostPort(ip.String(), port),
}
for k, v := range commonLabels {
m[k] = v
}
for k, v := range networksLabels[vip.NetworkID] {
m[k] = v
}
ms = append(ms, m)
}
}
}
return ms
}

View file

@ -0,0 +1,294 @@
package dockerswarm
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func Test_parseServicesResponse(t *testing.T) {
type args struct {
data []byte
}
tests := []struct {
name string
args args
want []service
wantErr bool
}{
{
name: "parse ok",
args: args{
data: []byte(`[
{
"ID": "tgsci5gd31aai3jyudv98pqxf",
"Version": {
"Index": 25
},
"CreatedAt": "2020-10-06T11:17:31.948808444Z",
"UpdatedAt": "2020-10-06T11:17:31.950195138Z",
"Spec": {
"Name": "redis2",
"Labels": {},
"TaskTemplate": {
"ContainerSpec": {
"Image": "redis:3.0.6@sha256:6a692a76c2081888b589e26e6ec835743119fe453d67ecf03df7de5b73d69842",
"Init": false,
"DNSConfig": {},
"Isolation": "default"
},
"Resources": {
"Limits": {},
"Reservations": {}
}
},
"Mode": {
"Replicated": {}
},
"EndpointSpec": {
"Mode": "vip",
"Ports": [
{
"Protocol": "tcp",
"TargetPort": 6379,
"PublishedPort": 8081,
"PublishMode": "ingress"
}
]
}
},
"Endpoint": {
"Spec": {
"Mode": "vip",
"Ports": [
{
"Protocol": "tcp",
"TargetPort": 6379,
"PublishedPort": 8081,
"PublishMode": "ingress"
}
]
},
"Ports": [
{
"Protocol": "tcp",
"TargetPort": 6379,
"PublishedPort": 8081,
"PublishMode": "ingress"
}
],
"VirtualIPs": [
{
"NetworkID": "qs0hog6ldlei9ct11pr3c77v1",
"Addr": "10.0.0.3/24"
}
]
}
}
]`),
},
want: []service{
{
ID: "tgsci5gd31aai3jyudv98pqxf",
Spec: struct {
Labels map[string]string
Name string
TaskTemplate struct {
ContainerSpec struct {
Hostname string
Image string
}
}
Mode struct {
Global interface{}
Replicated interface{}
}
}{
Labels: map[string]string{},
Name: "redis2",
TaskTemplate: struct {
ContainerSpec struct {
Hostname string
Image string
}
}{
ContainerSpec: struct {
Hostname string
Image string
}{
Hostname: "",
Image: "redis:3.0.6@sha256:6a692a76c2081888b589e26e6ec835743119fe453d67ecf03df7de5b73d69842",
},
},
Mode: struct {
Global interface{}
Replicated interface{}
}{
Replicated: map[string]interface{}{},
},
},
Endpoint: struct {
Ports []portConfig
VirtualIPs []struct {
NetworkID string
Addr string
}
}{Ports: []portConfig{
{
Protocol: "tcp",
PublishMode: "ingress",
PublishedPort: 8081,
},
}, VirtualIPs: []struct {
NetworkID string
Addr string
}{
{
NetworkID: "qs0hog6ldlei9ct11pr3c77v1",
Addr: "10.0.0.3/24",
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseServicesResponse(tt.args.data)
if (err != nil) != tt.wantErr {
t.Errorf("parseServicesResponse() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseServicesResponse() \ngot %v, \nwant %v", got, tt.want)
}
})
}
}
func Test_addServicesLabels(t *testing.T) {
type args struct {
services []service
networksLabels map[string]map[string]string
port int
}
tests := []struct {
name string
args args
want [][]prompbmarshal.Label
}{
{
name: "add 2 services with network labels join",
args: args{
port: 9100,
networksLabels: map[string]map[string]string{
"qs0hog6ldlei9ct11pr3c77v1": {
"__meta_dockerswarm_network_id": "qs0hog6ldlei9ct11pr3c77v1",
"__meta_dockerswarm_network_ingress": "true",
"__meta_dockerswarm_network_internal": "false",
"__meta_dockerswarm_network_label_key1": "value1",
"__meta_dockerswarm_network_name": "ingress",
"__meta_dockerswarm_network_scope": "swarm",
},
},
services: []service{
{
ID: "tgsci5gd31aai3jyudv98pqxf",
Spec: struct {
Labels map[string]string
Name string
TaskTemplate struct {
ContainerSpec struct {
Hostname string
Image string
}
}
Mode struct {
Global interface{}
Replicated interface{}
}
}{
Labels: map[string]string{},
Name: "redis2",
TaskTemplate: struct {
ContainerSpec struct {
Hostname string
Image string
}
}{
ContainerSpec: struct {
Hostname string
Image string
}{
Hostname: "node1",
Image: "redis:3.0.6@sha256:6a692a76c2081888b589e26e6ec835743119fe453d67ecf03df7de5b73d69842",
},
},
Mode: struct {
Global interface{}
Replicated interface{}
}{
Replicated: map[string]interface{}{},
},
},
Endpoint: struct {
Ports []portConfig
VirtualIPs []struct {
NetworkID string
Addr string
}
}{Ports: []portConfig{
{
Protocol: "tcp",
Name: "redis",
PublishMode: "ingress",
},
}, VirtualIPs: []struct {
NetworkID string
Addr string
}{
{
NetworkID: "qs0hog6ldlei9ct11pr3c77v1",
Addr: "10.0.0.3/24",
},
},
},
},
},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "10.0.0.3:0",
"__meta_dockerswarm_network_id": "qs0hog6ldlei9ct11pr3c77v1",
"__meta_dockerswarm_network_ingress": "true",
"__meta_dockerswarm_network_internal": "false",
"__meta_dockerswarm_network_label_key1": "value1",
"__meta_dockerswarm_network_name": "ingress",
"__meta_dockerswarm_network_scope": "swarm",
"__meta_dockerswarm_service_endpoint_port_name": "redis",
"__meta_dockerswarm_service_endpoint_port_publish_mode": "ingress",
"__meta_dockerswarm_service_id": "tgsci5gd31aai3jyudv98pqxf",
"__meta_dockerswarm_service_mode": "replicated",
"__meta_dockerswarm_service_name": "redis2",
"__meta_dockerswarm_service_task_container_hostname": "node1",
"__meta_dockerswarm_service_task_container_image": "redis:3.0.6@sha256:6a692a76c2081888b589e26e6ec835743119fe453d67ecf03df7de5b73d69842",
"__meta_dockerswarm_service_updating_status": "",
})},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := addServicesLabels(tt.args.services, tt.args.networksLabels, tt.args.port)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range got {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
if !reflect.DeepEqual(sortedLabelss, tt.want) {
t.Errorf("addServicesLabels() \ngot %v, \nwant %v", sortedLabelss, tt.want)
}
})
}
}

View file

@ -0,0 +1,166 @@
package dockerswarm
import (
"encoding/json"
"fmt"
"net"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// See https://docs.docker.com/engine/api/v1.40/#tag/Task
type task struct {
ID string
ServiceID string
NodeID string
Labels map[string]string
DesiredState string
NetworksAttachments []struct {
Addresses []string
Network struct {
ID string
}
}
Status struct {
State string
ContainerStatus struct {
ContainerID string
}
PortStatus struct {
Ports []portConfig
}
}
Slot int
}
func getTasksLabels(cfg *apiConfig) ([]map[string]string, error) {
tasks, err := getTasks(cfg)
if err != nil {
return nil, err
}
services, err := getServices(cfg)
if err != nil {
return nil, err
}
networkLabels, err := getNetworksLabelsByNetworkID(cfg)
if err != nil {
return nil, err
}
svcLabels := addServicesLabels(services, networkLabels, cfg.port)
nodeLabels, err := getNodesLabels(cfg)
if err != nil {
return nil, err
}
return addTasksLabels(tasks, nodeLabels, svcLabels, networkLabels, services, cfg.port), nil
}
func getTasks(cfg *apiConfig) ([]task, error) {
resp, err := cfg.client.GetAPIResponse("/tasks")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for tasks: %w", err)
}
return parseTasks(resp)
}
func parseTasks(data []byte) ([]task, error) {
var tasks []task
if err := json.Unmarshal(data, &tasks); err != nil {
return nil, fmt.Errorf("cannot parse tasks: %w", err)
}
return tasks, nil
}
func addTasksLabels(tasks []task, nodesLabels, servicesLabels []map[string]string, networksLabels map[string]map[string]string, services []service, port int) []map[string]string {
var ms []map[string]string
for _, task := range tasks {
commonLabels := map[string]string{
"__meta_dockerswarm_task_id": task.ID,
"__meta_dockerswarm_task_container_id": task.Status.ContainerStatus.ContainerID,
"__meta_dockerswarm_task_desired_state": task.DesiredState,
"__meta_dockerswarm_task_slot": strconv.Itoa(task.Slot),
"__meta_dockerswarm_task_state": task.Status.State,
}
for k, v := range task.Labels {
commonLabels["__meta_dockerswarm_task_label_"+discoveryutils.SanitizeLabelName(k)] = v
}
var svcPorts []portConfig
for i, v := range services {
if v.ID == task.ServiceID {
svcPorts = services[i].Endpoint.Ports
break
}
}
addLabels(commonLabels, servicesLabels, "__meta_dockerswarm_service_id", task.ServiceID)
addLabels(commonLabels, nodesLabels, "__meta_dockerswarm_node_id", task.NodeID)
for _, port := range task.Status.PortStatus.Ports {
if port.Protocol != "tcp" {
continue
}
m := map[string]string{
"__address__": discoveryutils.JoinHostPort(commonLabels["__meta_dockerswarm_node_address"], port.PublishedPort),
"__meta_dockerswarm_task_port_publish_mode": port.PublishMode,
}
for k, v := range commonLabels {
m[k] = v
}
ms = append(ms, m)
}
for _, na := range task.NetworksAttachments {
for _, address := range na.Addresses {
ip, _, err := net.ParseCIDR(address)
if err != nil {
logger.Errorf("cannot parse task network attachments address: %s as net CIDR: %v", address, err)
continue
}
added := false
for _, ep := range svcPorts {
if ep.Protocol != "tcp" {
continue
}
m := map[string]string{
"__address": discoveryutils.JoinHostPort(ip.String(), ep.PublishedPort),
"__meta_dockerswarm_task_port_publish_mode": ep.PublishMode,
}
for k, v := range commonLabels {
m[k] = v
}
for k, v := range networksLabels[na.Network.ID] {
m[k] = v
}
ms = append(ms, m)
added = true
}
if !added {
m := map[string]string{
"__address__": discoveryutils.JoinHostPort(ip.String(), port),
}
for k, v := range commonLabels {
m[k] = v
}
for k, v := range networksLabels[na.Network.ID] {
m[k] = v
}
ms = append(ms, m)
}
}
}
}
return ms
}
// addLabels adds labels from src to dst if they contain the given `key: value` pair.
func addLabels(dst map[string]string, src []map[string]string, key, value string) {
for _, m := range src {
if m[key] != value {
continue
}
for k, v := range m {
dst[k] = v
}
return
}
}

View file

@ -0,0 +1,357 @@
package dockerswarm
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func Test_parseTasks(t *testing.T) {
type args struct {
data []byte
}
tests := []struct {
name string
args args
want []task
wantErr bool
}{
{
name: "parse ok",
args: args{
data: []byte(`[
{
"ID": "t4rdm7j2y9yctbrksiwvsgpu5",
"Version": {
"Index": 23
},
"Labels": {
"label1": "value1"
},
"Spec": {
"ContainerSpec": {
"Image": "redis:3.0.6@sha256:6a692a76c2081888b589e26e6ec835743119fe453d67ecf03df7de5b73d69842",
"Init": false
},
"Resources": {
"Limits": {},
"Reservations": {}
},
"Placement": {
"Platforms": [
{
"Architecture": "amd64",
"OS": "linux"
}
]
},
"ForceUpdate": 0
},
"ServiceID": "t91nf284wzle1ya09lqvyjgnq",
"Slot": 1,
"NodeID": "qauwmifceyvqs0sipvzu8oslu",
"Status": {
"State": "running",
"ContainerStatus": {
"ContainerID": "33034b69f6fa5f808098208752fd1fe4e0e1ca86311988cea6a73b998cdc62e8",
"ExitCode": 0
},
"PortStatus": {}
},
"DesiredState": "running"
}
]
`),
},
want: []task{
{
ID: "t4rdm7j2y9yctbrksiwvsgpu5",
ServiceID: "t91nf284wzle1ya09lqvyjgnq",
NodeID: "qauwmifceyvqs0sipvzu8oslu",
Labels: map[string]string{
"label1": "value1",
},
DesiredState: "running",
Slot: 1,
Status: struct {
State string
ContainerStatus struct{ ContainerID string }
PortStatus struct{ Ports []portConfig }
}{
State: "running",
ContainerStatus: struct{ ContainerID string }{
ContainerID: "33034b69f6fa5f808098208752fd1fe4e0e1ca86311988cea6a73b998cdc62e8",
},
PortStatus: struct{ Ports []portConfig }{}},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseTasks(tt.args.data)
if (err != nil) != tt.wantErr {
t.Errorf("parseTasks() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseTasks() got = %v, want %v", got, tt.want)
}
})
}
}
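`parseTasks` itself is outside this hunk; judging by the test, it is a thin JSON decoder over the Docker Swarm `/tasks` API response. A plausible sketch, offered as an assumption rather than the actual implementation:

```go
import (
	"encoding/json"
	"fmt"
)

// parseTasks decodes the /tasks API response into the task structs above.
func parseTasks(data []byte) ([]task, error) {
	var tasks []task
	if err := json.Unmarshal(data, &tasks); err != nil {
		return nil, fmt.Errorf("cannot parse tasks: %w", err)
	}
	return tasks, nil
}
```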
func Test_addTasksLabels(t *testing.T) {
type args struct {
tasks []task
nodesLabels []map[string]string
servicesLabels []map[string]string
networksLabels map[string]map[string]string
services []service
port int
}
tests := []struct {
name string
args args
want [][]prompbmarshal.Label
}{
{
name: "adds 1 task with nodes labels",
args: args{
port: 9100,
tasks: []task{
{
ID: "t4rdm7j2y9yctbrksiwvsgpu5",
ServiceID: "t91nf284wzle1ya09lqvyjgnq",
NodeID: "qauwmifceyvqs0sipvzu8oslu",
Labels: map[string]string{},
DesiredState: "running",
Slot: 1,
Status: struct {
State string
ContainerStatus struct{ ContainerID string }
PortStatus struct{ Ports []portConfig }
}{
State: "running",
ContainerStatus: struct{ ContainerID string }{
ContainerID: "33034b69f6fa5f808098208752fd1fe4e0e1ca86311988cea6a73b998cdc62e8",
},
PortStatus: struct{ Ports []portConfig }{
Ports: []portConfig{
{
PublishMode: "ingress",
Name: "redis",
Protocol: "tcp",
PublishedPort: 6379,
},
},
}},
},
},
nodesLabels: []map[string]string{
{
"__address__": "172.31.40.97:9100",
"__meta_dockerswarm_node_address": "172.31.40.97",
"__meta_dockerswarm_node_availability": "active",
"__meta_dockerswarm_node_engine_version": "19.03.11",
"__meta_dockerswarm_node_hostname": "ip-172-31-40-97",
"__meta_dockerswarm_node_id": "qauwmifceyvqs0sipvzu8oslu",
"__meta_dockerswarm_node_platform_architecture": "x86_64",
"__meta_dockerswarm_node_platform_os": "linux",
"__meta_dockerswarm_node_role": "manager",
"__meta_dockerswarm_node_status": "ready",
},
},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.31.40.97:9100",
"__meta_dockerswarm_node_address": "172.31.40.97",
"__meta_dockerswarm_node_availability": "active",
"__meta_dockerswarm_node_engine_version": "19.03.11",
"__meta_dockerswarm_node_hostname": "ip-172-31-40-97",
"__meta_dockerswarm_node_id": "qauwmifceyvqs0sipvzu8oslu",
"__meta_dockerswarm_node_platform_architecture": "x86_64",
"__meta_dockerswarm_node_platform_os": "linux",
"__meta_dockerswarm_node_role": "manager",
"__meta_dockerswarm_node_status": "ready",
"__meta_dockerswarm_task_container_id": "33034b69f6fa5f808098208752fd1fe4e0e1ca86311988cea6a73b998cdc62e8",
"__meta_dockerswarm_task_desired_state": "running",
"__meta_dockerswarm_task_id": "t4rdm7j2y9yctbrksiwvsgpu5",
"__meta_dockerswarm_task_port_publish_mode": "ingress",
"__meta_dockerswarm_task_slot": "1",
"__meta_dockerswarm_task_state": "running",
})},
},
{
name: "adds 1 task with nodes, network and services labels",
args: args{
port: 9100,
tasks: []task{
{
ID: "t4rdm7j2y9yctbrksiwvsgpu5",
ServiceID: "tgsci5gd31aai3jyudv98pqxf",
NodeID: "qauwmifceyvqs0sipvzu8oslu",
Labels: map[string]string{},
DesiredState: "running",
Slot: 1,
NetworksAttachments: []struct {
Addresses []string
Network struct{ ID string }
}{
{
Network: struct {
ID string
}{
ID: "qs0hog6ldlei9ct11pr3c77v1",
},
Addresses: []string{"10.10.15.15/24"},
},
},
Status: struct {
State string
ContainerStatus struct{ ContainerID string }
PortStatus struct{ Ports []portConfig }
}{
State: "running",
ContainerStatus: struct{ ContainerID string }{
ContainerID: "33034b69f6fa5f808098208752fd1fe4e0e1ca86311988cea6a73b998cdc62e8",
},
PortStatus: struct{ Ports []portConfig }{}},
},
},
networksLabels: map[string]map[string]string{
"qs0hog6ldlei9ct11pr3c77v1": {
"__meta_dockerswarm_network_id": "qs0hog6ldlei9ct11pr3c77v1",
"__meta_dockerswarm_network_ingress": "true",
"__meta_dockerswarm_network_internal": "false",
"__meta_dockerswarm_network_label_key1": "value1",
"__meta_dockerswarm_network_name": "ingress",
"__meta_dockerswarm_network_scope": "swarm",
},
},
nodesLabels: []map[string]string{
{
"__address__": "172.31.40.97:9100",
"__meta_dockerswarm_node_address": "172.31.40.97",
"__meta_dockerswarm_node_availability": "active",
"__meta_dockerswarm_node_engine_version": "19.03.11",
"__meta_dockerswarm_node_hostname": "ip-172-31-40-97",
"__meta_dockerswarm_node_id": "qauwmifceyvqs0sipvzu8oslu",
"__meta_dockerswarm_node_platform_architecture": "x86_64",
"__meta_dockerswarm_node_platform_os": "linux",
"__meta_dockerswarm_node_role": "manager",
"__meta_dockerswarm_node_status": "ready",
},
},
services: []service{
{
ID: "tgsci5gd31aai3jyudv98pqxf",
Spec: struct {
Labels map[string]string
Name string
TaskTemplate struct {
ContainerSpec struct {
Hostname string
Image string
}
}
Mode struct {
Global interface{}
Replicated interface{}
}
}{
Labels: map[string]string{},
Name: "redis2",
TaskTemplate: struct {
ContainerSpec struct {
Hostname string
Image string
}
}{
ContainerSpec: struct {
Hostname string
Image string
}{
Hostname: "node1",
Image: "redis:3.0.6@sha256:6a692a76c2081888b589e26e6ec835743119fe453d67ecf03df7de5b73d69842",
},
},
Mode: struct {
Global interface{}
Replicated interface{}
}{
Replicated: map[string]interface{}{},
},
},
Endpoint: struct {
Ports []portConfig
VirtualIPs []struct {
NetworkID string
Addr string
}
}{
Ports: []portConfig{
{
Protocol: "tcp",
Name: "redis",
PublishMode: "ingress",
},
},
VirtualIPs: []struct {
NetworkID string
Addr string
}{
{
NetworkID: "qs0hog6ldlei9ct11pr3c77v1",
Addr: "10.0.0.3/24",
},
},
},
},
},
servicesLabels: []map[string]string{},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address": "10.10.15.15:0",
"__address__": "172.31.40.97:9100",
"__meta_dockerswarm_network_id": "qs0hog6ldlei9ct11pr3c77v1",
"__meta_dockerswarm_network_ingress": "true",
"__meta_dockerswarm_network_internal": "false",
"__meta_dockerswarm_network_label_key1": "value1",
"__meta_dockerswarm_network_name": "ingress",
"__meta_dockerswarm_network_scope": "swarm",
"__meta_dockerswarm_node_address": "172.31.40.97",
"__meta_dockerswarm_node_availability": "active",
"__meta_dockerswarm_node_engine_version": "19.03.11",
"__meta_dockerswarm_node_hostname": "ip-172-31-40-97",
"__meta_dockerswarm_node_id": "qauwmifceyvqs0sipvzu8oslu",
"__meta_dockerswarm_node_platform_architecture": "x86_64",
"__meta_dockerswarm_node_platform_os": "linux",
"__meta_dockerswarm_node_role": "manager",
"__meta_dockerswarm_node_status": "ready",
"__meta_dockerswarm_task_container_id": "33034b69f6fa5f808098208752fd1fe4e0e1ca86311988cea6a73b998cdc62e8",
"__meta_dockerswarm_task_desired_state": "running",
"__meta_dockerswarm_task_id": "t4rdm7j2y9yctbrksiwvsgpu5",
"__meta_dockerswarm_task_port_publish_mode": "ingress",
"__meta_dockerswarm_task_slot": "1",
"__meta_dockerswarm_task_state": "running",
}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := addTasksLabels(tt.args.tasks, tt.args.nodesLabels, tt.args.servicesLabels, tt.args.networksLabels, tt.args.services, tt.args.port)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range got {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
if !reflect.DeepEqual(sortedLabelss, tt.want) {
t.Errorf("addTasksLabels() \ngot %v, \nwant %v", sortedLabelss, tt.want)
}
})
}
}
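The detour through `discoveryutils.GetSortedLabels` on both sides is what makes the `reflect.DeepEqual` comparison reliable: Go randomizes map iteration order, so label maps must be flattened into name-sorted slices before comparing. A self-contained sketch of the idea, using a simplified `Label` type instead of the real `prompbmarshal.Label`:

```go
import "sort"

type Label struct {
	Name  string
	Value string
}

// sortedLabels flattens a label map into a slice sorted by name,
// so equal maps always yield identical slices.
func sortedLabels(m map[string]string) []Label {
	labels := make([]Label, 0, len(m))
	for name, value := range m {
		labels = append(labels, Label{Name: name, Value: value})
	}
	sort.Slice(labels, func(i, j int) bool {
		return labels[i].Name < labels[j].Name
	})
	return labels
}
```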

View file

@@ -75,12 +75,12 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
port: sdc.Port,
}
if sdc.TLSConfig != nil {
config, err := promauth.NewConfig(baseDir, nil, "", "", sdc.TLSConfig)
ac, err := promauth.NewConfig(baseDir, nil, "", "", sdc.TLSConfig)
if err != nil {
return nil, err
}
cfg.client.Transport = &http.Transport{
TLSClientConfig: config.NewTLSConfig(),
TLSClientConfig: ac.NewTLSConfig(),
}
}
// use public compute endpoint by default
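The rename from `config` to `ac` matches the `ac *promauth.Config` naming used across the discovery packages and avoids confusion with scrape configs. For context, a hedged sketch of the underlying pattern using only the standard library; `caPEM` is a hypothetical stand-in for the certificate material `promauth.NewConfig` loads:

```go
import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"net/http"
)

// newTLSClient returns an HTTP client that trusts the given CA bundle.
func newTLSClient(caPEM []byte) (*http.Client, error) {
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("cannot parse CA certificates")
	}
	return &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{RootCAs: certPool},
		},
	}, nil
}
```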

View file

@@ -41,11 +41,23 @@ type Client struct {
// NewClient returns new Client for the given apiServer and the given ac.
func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
var u fasthttp.URI
var (
dialFunc fasthttp.DialFunc
tlsCfg *tls.Config
u fasthttp.URI
)
u.Update(apiServer)
// special case for unix socket connection
if string(u.Scheme()) == "unix" {
dialAddr := string(u.Path())
apiServer = "http://"
dialFunc = func(_ string) (net.Conn, error) {
return net.Dial("unix", dialAddr)
}
}
hostPort := string(u.Host())
isTLS := string(u.Scheme()) == "https"
var tlsCfg *tls.Config
if isTLS && ac != nil {
tlsCfg = ac.NewTLSConfig()
}
@@ -66,6 +78,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
WriteTimeout: 10 * time.Second,
MaxResponseBodySize: 300 * 1024 * 1024,
MaxConns: 2 * *maxConcurrency,
Dial: dialFunc,
}
return &Client{
hc: hc,
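The `unix` scheme branch above rewrites `apiServer` to plain HTTP while routing every connection through a custom dial function that ignores the URL's host. The same trick with `net/http` instead of `fasthttp`, as a minimal sketch (the socket path is hypothetical):

```go
import (
	"context"
	"net"
	"net/http"
)

// newUnixSocketClient returns an HTTP client whose connections always go
// to the given unix socket, regardless of the host in the request URL.
func newUnixSocketClient(socketPath string) *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
				// network and addr come from the URL and are ignored on purpose.
				return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
			},
		},
	}
}

// Usage: c := newUnixSocketClient("/var/run/docker.sock")
// resp, err := c.Get("http://localhost/info") // the host is a placeholder
```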

View file

@@ -36,9 +36,11 @@ var (
gceSDCheckInterval = flag.Duration("promscrape.gceSDCheckInterval", time.Minute, "Interval for checking for changes in gce. "+
"This works only if `gce_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details")
dockerswarmSDCheckInterval = flag.Duration("promscrape.dockerswarmSDCheckInterval", 30*time.Second, "Interval for checking for changes in dockerswarm. "+
"This works only if `dockerswarm_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config for details")
promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
"See https://victoriametrics.github.io/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details")
suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress `duplicate scrape target` errors; "+
"see https://victoriametrics.github.io/vmagent.html#troubleshooting for details")
)
@@ -96,6 +98,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
scs.add("dockerswarm_sd_configs", *dockerswarmSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) })
sighupCh := procutil.NewSighupChan()
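Each `scs.add` call registers a discovery function that is re-run on its own interval, receiving the previous scrape works so unchanged targets can be reused cheaply. A stripped-down sketch of that polling loop with simplified types (`ScrapeWork` and the config plumbing elided):

```go
import "time"

// runDiscovery re-runs getTargets every interval, feeding the previous
// result back in so the callee can reuse state for unchanged targets.
func runDiscovery(interval time.Duration, getTargets func(prev []string) []string, push func([]string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var prev []string
	for range ticker.C {
		targets := getTargets(prev)
		push(targets)
		prev = targets
	}
}
```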

View file

@@ -318,6 +318,7 @@ func (s *Storage) idb() *indexDB {
// Metrics contains essential metrics for the Storage.
type Metrics struct {
RowsAddedTotal uint64
DedupsDuringMerge uint64
TooSmallTimestampRows uint64
@@ -386,6 +387,7 @@ func (m *Metrics) Reset() {
// UpdateMetrics updates m with metrics from s.
func (s *Storage) UpdateMetrics(m *Metrics) {
m.RowsAddedTotal = atomic.LoadUint64(&rowsAddedTotal)
m.DedupsDuringMerge = atomic.LoadUint64(&dedupsDuringMerge)
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
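The new `RowsAddedTotal` field, together with the `rowsAddedTotal` counter in the next hunk, follows the package's usual pattern: a package-level uint64 bumped lock-free on the hot path and snapshotted in `UpdateMetrics`. A tiny self-contained sketch of that pattern:

```go
import "sync/atomic"

var rowsAdded uint64

// addRows is the hot path: a lock-free increment.
func addRows(n int) {
	atomic.AddUint64(&rowsAdded, uint64(n))
}

// snapshotRowsAdded is the metrics path: a consistent read without a mutex.
func snapshotRowsAdded() uint64 {
	return atomic.LoadUint64(&rowsAdded)
}
```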
@@ -1051,11 +1053,14 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
return s.tb.ForceMergePartitions(partitionNamePrefix)
}
var rowsAddedTotal uint64
// AddRows adds the given mrs to s.
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent out-of-memory errors and CPU thrashing when too many