app/vmagent: code cleanup for Kafka and Google PubSub consumers / producers

- Add links to relevant docs into descriptions for every -kafka.* and -gcp.pubsub.* command-line flags.
- Wait until message processing goroutines are stopped before returning from gcppubsub.Stop().
- Prevent from multiple calls to Init() without Stop().
- Drop message if tenantID cannot be parsed properly.
- Take into account tenantID for all the supported message formats.
- Support gzip-compressed messages for graphite format.
- Use exponential backoff sleep when the message cannot be pushed to remote storage systems
  because of disabled on-disk persistence - https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence
- Unblock from sleep as soon as Stop() is called. Previously the sleep could take up to 2 seconds after Stop() is called.
- Remove unused globalCtx and initContext from app/vmagent/remotewrite/gcppubsub
- Mention Google PubSub support at docs/enterprise.md
- Make Google PubSub docs more clear at docs/vmagent.md

This is a follow-up for commits 115245924a5f096c5a3383d6cc8e8b6fbd421984
and e6eab781ce42285a6a1750dc01eba6801dd35516 .

Updates https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/717
Updates https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/713
This commit is contained in:
Aliaksandr Valialkin 2023-12-04 22:10:00 +02:00
parent a28cc6ebec
commit 0160435802
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 176 additions and 112 deletions

View file

@ -5,6 +5,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream"
@ -20,10 +21,12 @@ var (
//
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error {
return stream.Parse(r, insertRows)
return stream.Parse(r, false, func(rows []parser.Row) error {
return insertRows(nil, rows)
})
}
func insertRows(rows []parser.Row) error {
func insertRows(at *auth.Token, rows []parser.Row) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -56,7 +59,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.TryPush(nil, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))

View file

@ -36,9 +36,9 @@ var (
// InsertHandlerForReader processes remote write for influx line protocol.
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
func InsertHandlerForReader(at *auth.Token, r io.Reader, isGzipped bool) error {
return stream.Parse(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil)
return insertRows(at, db, rows, nil)
})
}

View file

@ -124,7 +124,7 @@ func main() {
common.StartUnmarshalWorkers()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false)
return influx.InsertHandlerForReader(nil, r, false)
})
}
if len(*graphiteListenAddr) > 0 {

View file

@ -19,7 +19,7 @@ var (
//
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error {
return stream.Parse(r, insertRows)
return stream.Parse(r, false, insertRows)
}
func insertRows(rows []parser.Row) error {

View file

@ -51,6 +51,7 @@ plus the following additional features:
- [Advanced auth and rate limiter](https://docs.victoriametrics.com/vmgateway.html).
- [mTLS for cluster components](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection).
- [Kafka integration](https://docs.victoriametrics.com/vmagent.html#kafka-integration).
- [Google PubSub integration](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration).
- [Multitenant support in vmalert](https://docs.victoriametrics.com/vmalert.html#multitenancy).
- [Ability to read alerting and recording rules from Object Storage](https://docs.victoriametrics.com/vmalert.html#reading-rules-from-object-storage).
- [Ability to filter incoming requests by IP at vmauth](https://docs.victoriametrics.com/vmauth.html#ip-filters).

View file

@ -52,8 +52,8 @@ additionally to [discovering Prometheus-compatible targets and scraping metrics
and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues by limiting the number of unique time series at scrape time
and before sending them to remote storage systems. See [these docs](#cardinality-limiter).
* Can write collected metrics to multiple tenants. See [these docs](#multitenancy).
* Can read data from Kafka. See [these docs](#reading-metrics-from-kafka).
* Can write data to Kafka. See [these docs](#writing-metrics-to-kafka).
* Can read and write data from / to Kafka. See [these docs](#kafka-integration).
* Can read and write data from / to Google PubSub. See [these docs](#google-pubsub-integration).
## Quick Start
@ -1107,18 +1107,54 @@ If you have suggestions for improvements or have found a bug - please open an is
See also [general troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html).
## Google PubSub integration
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` can read and write metrics from / to google [PubSub](https://cloud.google.com/pubsub):
## Calculating disk space for persistence queue
`vmagent` buffers collected metrics on disk at the directory specified via `-remoteWrite.tmpDataPath` command-line flag
until the metrics are sent to remote storage configured via `-remoteWrite.url` command-line flag.
The `-remoteWrite.tmpDataPath` directory can grow large when remote storage is unavailable for extended
periods of time and if the maximum directory size isn't limited with `-remoteWrite.maxDiskUsagePerURL` command-line flag.
To estimate the allocated disk size for persistent queue, or to estimate `-remoteWrite.maxDiskUsagePerURL` command-line flag value,
take into account the following attributes:
1. The **size in bytes** of data stream sent by vmagent:
Run the query `sum(rate(vmagent_remotewrite_bytes_sent_total[1h])) by(instance,url)` in [vmui](https://docs.victoriametrics.com/#vmui)
or Grafana to get the amount of bytes sent by each vmagent instance per second.
1. The amount of **time** a persistent queue should keep the data before starting to drop it.
For example, if `vmagent` should be able to buffer the data for at least 6 hours, then the following query
can be used for estimating the needed amounts of disk space in gigabytes:
```
sum(rate(vmagent_remotewrite_bytes_sent_total[1h])) by(instance,url) * 6h / 1Gi
```
Additional notes:
1. Ensure that `vmagent` [monitoring](#monitoring) is configured properly.
1. Re-evaluate the estimation each time when:
* there is an increase in the vmagent's workload
* there is a change in [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling) which could increase the amount metrics to send
* there is a change in number of configured `-remoteWrite.url` addresses
1. The minimum disk size to allocate for the persistent queue is 500Mi per each `-remoteWrite.url`.
1. On-disk persistent queue can be disabled if needed. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence).
## Google PubSub integration
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` can read and write metrics from / to [Google PubSub](https://cloud.google.com/pubsub):
### Reading metrics from PubSub
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` can read metrics in various formats from Pub/Sub messages.
`-gcp.pubsub.subscribe.defaultMessageFormat` and `-gcp.pubsub.subscribe.topicSubscription.messageFormat` allow you to configure the message format.
We support the following options:
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` can read metrics in various formats from Google PubSub messages.
`-gcp.pubsub.subscribe.defaultMessageFormat` and `-gcp.pubsub.subscribe.topicSubscription.messageFormat` command-line flags allow configuring the needed message format.
The following message formats are supported:
* `promremotewrite` - [Prometheus remote_write](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
Messages in this format can be sent by vmagent - see [these docs](#writing-metrics-to-pubsub).
* `influx` - [InfluxDB line protocol format](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/).
* `influx` - [InfluxDB line protocol format](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/).
* `prometheus` - [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
and [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md).
* `graphite` - [Graphite plaintext format](https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol).
@ -1126,22 +1162,22 @@ We support the following options:
Every PubSub message may contain multiple lines in `influx`, `prometheus`, `graphite` and `jsonline` format delimited by `\n`.
`vmagent` consumes messages from PubSub topic subscriptions specified by `-gcp.pubsub.subscribe.topicSubscription` command-line flag. You can configure the multiple topics by specifying -gcp.pubsub.subscribe.topicSubscription command-line flags in vmagent
Multiple topics can be specified
by passing multiple `-gcp.pubsub.subscribe.topicSubscription` command-line flags to `vmagent`.
`vmagent` consumes messages from PubSub topic subscriptions specified by `-gcp.pubsub.subscribe.topicSubscription` command-line flag.
Multiple topics can be specified by passing multiple `-gcp.pubsub.subscribe.topicSubscription` command-line flags to `vmagent`.
`vmagent` uses standard google authorization mechanism for topic access. It's possible to specify credentials directly via flag `-gcp.pubsub.subscribe.credentialsFile=`.
`vmagent` uses standard Google authorization mechanism for topic access. It's possible to specify credentials directly via `-gcp.pubsub.subscribe.credentialsFile` command-line flag.
The following command starts `vmagent`, which reads metrics in InfluxDB line protocol format from PubSub topic: `projects/victoriametrics-vmagent-pub-sub-test/subscriptions/telegraf-testing`
from the topic `telegraf-testing` and sends them to remote storage at `http://localhost:8428/api/v1/write`:
For example, the following command starts `vmagent`, which reads metrics in [InfluxDB line protocol format](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/)
from PubSub `projects/victoriametrics-vmagent-pub-sub-test/subscriptions/telegraf-testing` and sends them to remote storage at `http://localhost:8428/api/v1/write`:
```bash
./bin/vmagent -remoteWrite.url=http://localhost:8428/api/v1/write \
-gcp.pubsub.subscribe.topicSubscription=projects/victoriametrics-vmagent-pub-sub-test/subscriptions/telegraf-testing
-gcp.pubsub.subscribe.topicSubscription=projects/victoriametrics-vmagent-pub-sub-test/subscriptions/telegraf-testing \
-gcp.pubsub.subscribe.topicSubscription.messageFormat=influx
```
It is expected that [Telegraf](https://github.com/influxdata/telegraf) sends metrics to the `metrics-by-telegraf` topic with the following config:
It is expected that [Telegraf](https://github.com/influxdata/telegraf) sends metrics to the `telegraf-testing` topic at the `victoriametrics-vmagent-pub-sub-test` project
with the following config:
```yaml
[[outputs.cloud_pubsub]]
@ -1152,17 +1188,18 @@ It is expected that [Telegraf](https://github.com/influxdata/telegraf) sends met
#### Consume metrics from multiple topics
It's possible to configure message consumption from multiple topics. In this case, command-line flag arguments must have the same position at corresponding values.
For example, given configuration configures message reading from:
1) project `victoriametrics-vmagent-pub-sub-test` topic: `telegraf-testing` message encoding `influx` without `gzip` compression
2) project `victoriametrics-vmagent-pub-sub-test` topic: `json-line-testing` message encoding `jsonline` with `gzip` compression
`vmagent` can read messages from different topics in different formats. For example, the following command starts `vmagent`, which reads plaintext
[Influx](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/) messages from `telegraf-testing` topic
and gzipp'ed [JSON line](https://docs.victoriametrics.com/#json-line-format) messages from `json-line-testing` topic:
```bash
./bin/vmagent -remoteWrite.url=http://localhost:8428/api/v1/write \
-gcp.pubsub.subscribe.topicSubscription=projects/victoriametrics-vmagent-pub-sub-test/subscriptions/telegraf-testing,projects/victoriametrics-vmagent-pub-sub-test/subscriptions/json-line-testing
-gcp.pubsub.subscribe.topicSubscription.messageFormat=influx,jsonline
-gcp.pubsub.subscribe.topicSubscription.isGzipped=false,true
-gcp.pubsub.subscribe.topicSubscription=projects/victoriametrics-vmagent-pub-sub-test/subscriptions/telegraf-testing \
-gcp.pubsub.subscribe.topicSubscription.messageFormat=influx \
-gcp.pubsub.subscribe.topicSubscription.isGzipped=false \
-gcp.pubsub.subscribe.topicSubscription=projects/victoriametrics-vmagent-pub-sub-test/subscriptions/json-line-testing \
-gcp.pubsub.subscribe.topicSubscription.messageFormat=jsonline \
-gcp.pubsub.subscribe.topicSubscription.isGzipped=true
```
#### Command-line flags for PubSub consumer
@ -1173,32 +1210,31 @@ which can be downloaded for evaluation from [releases](https://github.com/Victor
```text
-gcp.pubsub.subscribe.credentialsFile string
Path to file with GCP credentials to use for PubSub client. If not set, default credentials will be used (see Workload Identity for K8S, or https://cloud.google.com/docs/authentication/application-default-credentials. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Path to file with GCP credentials to use for PubSub client. If not set, default credentials are used (see Workload Identity for K8S or https://cloud.google.com/docs/authentication/application-default-credentials ). See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
-gcp.pubsub.subscribe.defaultMessageFormat string
Expected data format in the topic if -gcp.pubsub.consumer.topicSubscription.messageFormat is skipped. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default "promremotewrite")
Default message format if -gcp.pubsub.subscribe.topicSubscription.messageFormat is missing. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default "promremotewrite")
-gcp.pubsub.subscribe.topicSubscription array
project topic subscription url in form: projects/<project-id>/subscriptions/<subscription-name> This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
GCP PubSub topic subscription in the format: projects/<project-id>/subscriptions/<subscription-name>. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-gcp.pubsub.subscribe.topicSubscription.concurrency array
Configures the number of concurrently processed messages for topic subscription specified via -gcp.pubsub.consumer.topicSubscription flag.This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 0)
The number of concurrently processed messages for topic subscription specified via -gcp.pubsub.subscribe.topicSubscription flag. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 0)
Supports array of values separated by comma or specified via multiple flags.
-gcp.pubsub.subscribe.topicSubscription.messageformat array
payload format for corresponding pubsub topic subscription. valid formats: influx, prometheus, promremotewrite, graphite, jsonline . this flag is available only in enterprise binaries. see https://docs.victoriametrics.com/enterprise.html
supports an array of values separated by comma or specified via multiple flags.
-gcp.pubsub.subscribe.topicSubscription.isGzipped array
Enables gzip decompression for topic subscription messages payload. Only prometheus, jsonline and influx formats accept gzipped messages.This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Enables gzip decompression for messages payload at the corresponding -gcp.pubsub.subscribe.topicSubscription. Only prometheus, jsonline, graphite and influx formats accept gzipped messages. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports array of values separated by comma or specified via multiple flags.
-gcp.pubsub.subscribe.topicSubscription.messageFormat array
Message format for the corresponding -gcp.pbusub.subcribe.topicSubscription. Valid formats: influx, prometheus, promremotewrite, graphite, jsonline . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
```
### Writing metrics to PubSub
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` writes data into PubSub if url contains `pubsub` prefix. For example, with remote write url:
`--remoteWrite.url=pubsub:projects/victoriametrics-vmagent-publish-test/topics/testing-pubsub-push`.
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` writes data into Google PubSub if `-remoteWrite.url` command-line flag starts witht `pubsub:` prefix.
For example, `-remoteWrite.url=pubsub:projects/victoriametrics-vmagent-publish-test/topics/testing-pubsub-push`.
These messages can be read later from PubSub by another `vmagent` - see [these docs](#reading-metrics-from-pubsub) for details.
These messages can be read later from Google PubSub by another `vmagent` instance - see [these docs](#reading-metrics-from-pubsub) for details.
`vmagent` uses a standard Google authorization mechanism for topic access. It's possible to specify credentials directly via the flag `-gcp.pubsub.subscribe.credentialsFile=`.
`vmagent` uses standard Google authorization mechanism for PubSub topic access. Custom auth credentials can be specified via `-gcp.pubsub.subscribe.credentialsFile` command-line flag.
#### Command-line flags for PubSub producer
@ -1207,53 +1243,24 @@ which can be downloaded for evaluation from [releases](https://github.com/Victor
(see `vmutils-...-enterprise.tar.gz` archives) and from [docker images](https://hub.docker.com/r/victoriametrics/vmagent/tags) with tags containing `enterprise` suffix.
```text
-gcp.pubsub.publish.credentialsFile string
Path to file with GCP credentials to use for PubSub client. If not set, default credentials will be used (see Workload Identity for K8S, or https://cloud.google.com/docs/authentication/application-default-credentials. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
-gcp.pubsub.publish.byteThreshold int
Publish a batch when its size in bytes reaches this value. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 1000000)
Publish a batch when its size in bytes reaches this value. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 1000000)
-gcp.pubsub.publish.countThreshold int
Publish a batch when it has this many messages.This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 100)
Publish a batch when it has this many messages. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 100)
-gcp.pubsub.publish.credentialsFile string
Path to file with GCP credentials to use for PubSub client. If not set, default credentials will be used (see Workload Identity for K8S or https://cloud.google.com/docs/authentication/application-default-credentials). See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
-gcp.pubsub.publish.delayThreshold value
Publish a non-empty batch after this delay has passed. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Publish a non-empty batch after this delay has passed. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 10ms)
-gcp.pubsub.publish.maxOutstandingBytes int
MaxOutstandingBytes is the maximum size of buffered messages to be published. If less than or equal to zero, this is disabled. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default -1)
The maximum size of buffered messages to be published. If less than or equal to zero, this is disabled. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default -1)
-gcp.pubsub.publish.maxOutstandingMessages int
MaxOutstandingMessages is the maximum number of buffered messages to be published. If less than or equal to zero, this is disabled. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 100)
The maximum number of buffered messages to be published. If less than or equal to zero, this is disabled. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 100)
-gcp.pubsub.publish.timeout value
The maximum time that the client will attempt to publish a bundle of messages. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
The maximum time that the client will attempt to publish a bundle of messages. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 60s)
```
## Calculating disk space for persistence queue
vmagent buffers scraped or received data in the file system directory specified via `-remoteWrite.tmpDataPath` (aka persistent queue)
until data is sent to `-remoteWrite.url`. The directory can grow large when remote storage is unavailable for extended
periods of time and if the maximum directory size isn't limited with `-remoteWrite.maxDiskUsagePerURL` command-line flag.
The buffered metrics are sent to remote storage as soon as the connection is repaired.
To estimate the allocated disk size for persistent quee, or to estimate `-remoteWrite.maxDiskUsagePerURL` command-line flag value,
take into account the following attributes:
1. The **size in bytes** of data stream sent by vmagent:
> Run query `sum(rate(vmagent_remotewrite_bytes_sent_total[1h])) by(instance)`
in [vmui](https://docs.victoriametrics.com/#vmui) or Grafana to get the amount of bytes sent by each vmagent instance
(in k8s you might want to `sum by(pod)` instead) per second.
2. The amount of **time** a persistent queue should keep the data before starting to drop it.
> If the persistent queue should be able to retain the data for at least 6h, then multiply the amount of bytes
sent by vmagent per-second (see above) by 6*3600s to get the approximate queue size on disk.
For example, expression `sum(rate(vmagent_remotewrite_bytes_sent_total[1h])) by(instance) * 6 * 3600 / 1Gi`
would show how much disk space in Gi is needed for vmagents instances to retain the persisten queue for `6h`.
Additional notes:
1. For `vmagent_.*` metrics to be available for querying ensure that [monitoring](#monitoring) is configured.
1. Re-evaluate the estimation each time when:
* there is an increase in the vmagent's workload
* there is a change in [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling) which could increase the amount metrics to send
* there is a change in number of configured `-remoteWrite.url` addresses
1. The minimum disk size to allocate for the persistent queue is 500Mi per each `-remoteWrite.url` .
## Kafka integration
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` can read and write metrics from / to Kafka:
@ -1272,7 +1279,7 @@ These formats can be configured with `-kafka.consumer.topic.defaultFormat` or `-
* `promremotewrite` - [Prometheus remote_write](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
Messages in this format can be sent by vmagent - see [these docs](#writing-metrics-to-kafka).
* `influx` - [InfluxDB line protocol format](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/).
* `influx` - [InfluxDB line protocol format](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/).
* `prometheus` - [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
and [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md).
* `graphite` - [Graphite plaintext format](https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol).
@ -1315,30 +1322,33 @@ which can be downloaded for evaluation from [releases](https://github.com/Victor
```text
-kafka.consumer.topic array
Kafka topic names for data consumption.
Kafka topic names for data consumption. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.basicAuth.password array
Optional basic auth password for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN'
Optional basic auth password for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN' . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.basicAuth.username array
Optional basic auth username for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN'
Optional basic auth username for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN' . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.brokers array
List of brokers to connect for given topic, e.g. -kafka.consumer.topic.broker=host-1:9092;host-2:9092
List of brokers to connect for given topic, e.g. -kafka.consumer.topic.broker=host-1:9092;host-2:9092 . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.concurrency array
Configures consumer concurrency for topic specified via -kafka.consumer.topic flag. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 1)
Supports array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.defaultFormat string
Expected data format in the topic if -kafka.consumer.topic.format is skipped. (default "promremotewrite")
Expected data format in the topic if -kafka.consumer.topic.format is skipped. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default "promremotewrite")
-kafka.consumer.topic.format array
data format for corresponding kafka topic. Valid formats: influx, prometheus, promremotewrite, graphite, jsonline
data format for corresponding kafka topic. Valid formats: influx, prometheus, promremotewrite, graphite, jsonline . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.groupID array
Defines group.id for topic
Defines group.id for topic. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.isGzipped array
Enables gzip setting for topic messages payload. Only prometheus, jsonline and influx formats accept gzipped messages.
Enables gzip setting for topic messages payload. Only prometheus, jsonline, graphite and influx formats accept gzipped messages.See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.options array
Optional key=value;key1=value2 settings for topic consumer. See full configuration options at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
Optional key=value;key1=value2 settings for topic consumer. See full configuration options at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
```
@ -1359,13 +1369,18 @@ Two types of auth are supported:
* sasl with username and password:
```bash
./bin/vmagent -remoteWrite.url=kafka://localhost:9092/?topic=prom-rw&security.protocol=SASL_SSL&sasl.mechanisms=PLAIN -remoteWrite.basicAuth.username=user -remoteWrite.basicAuth.password=password
./bin/vmagent -remoteWrite.url='kafka://localhost:9092/?topic=prom-rw&security.protocol=SASL_SSL&sasl.mechanisms=PLAIN' \
-remoteWrite.basicAuth.username=user \
-remoteWrite.basicAuth.password=password
```
* tls certificates:
```bash
./bin/vmagent -remoteWrite.url=kafka://localhost:9092/?topic=prom-rw&security.protocol=SSL -remoteWrite.tlsCAFile=/opt/ca.pem -remoteWrite.tlsCertFile=/opt/cert.pem -remoteWrite.tlsKeyFile=/opt/key.pem
./bin/vmagent -remoteWrite.url='kafka://localhost:9092/?topic=prom-rw&security.protocol=SSL' \
-remoteWrite.tlsCAFile=/opt/ca.pem \
-remoteWrite.tlsCertFile=/opt/cert.pem \
-remoteWrite.tlsKeyFile=/opt/key.pem
```
## How to build from sources
@ -1456,6 +1471,8 @@ vmagent collects metrics data via popular data ingestion protocols and routes th
See the docs at https://docs.victoriametrics.com/vmagent.html .
-blockcache.missesBeforeCaching int
The number of cache misses before putting the block into cache. Higher values may reduce indexdb/dataBlocks cache size at the cost of higher CPU and disk read usage (default 2)
-cacheExpireDuration duration
Items are removed from in-memory caches after they aren't accessed for this duration. Lower values may reduce memory usage at the cost of higher CPU usage. See also -prevCacheRemovalPercent (default 30m0s)
-configAuthKey string
@ -1485,6 +1502,38 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Auth key for /flags endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings
-fs.disableMmap
Whether to use pread() instead of mmap() for reading data files. By default, mmap() is used for 64-bit arches and pread() is used for 32-bit arches, since they cannot read data files bigger than 2^32 bytes in memory. mmap() is usually faster for reading small data chunks than pread()
-gcp.pubsub.publish.byteThreshold int
Publish a batch when its size in bytes reaches this value. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 1000000)
-gcp.pubsub.publish.countThreshold int
Publish a batch when it has this many messages. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 100)
-gcp.pubsub.publish.credentialsFile string
Path to file with GCP credentials to use for PubSub client. If not set, default credentials will be used (see Workload Identity for K8S or https://cloud.google.com/docs/authentication/application-default-credentials). See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
-gcp.pubsub.publish.delayThreshold value
Publish a non-empty batch after this delay has passed. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 10ms)
-gcp.pubsub.publish.maxOutstandingBytes int
The maximum size of buffered messages to be published. If less than or equal to zero, this is disabled. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default -1)
-gcp.pubsub.publish.maxOutstandingMessages int
The maximum number of buffered messages to be published. If less than or equal to zero, this is disabled. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 100)
-gcp.pubsub.publish.timeout value
The maximum time that the client will attempt to publish a bundle of messages. See https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 60s)
-gcp.pubsub.subscribe.credentialsFile string
Path to file with GCP credentials to use for PubSub client. If not set, default credentials are used (see Workload Identity for K8S or https://cloud.google.com/docs/authentication/application-default-credentials ). See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
-gcp.pubsub.subscribe.defaultMessageFormat string
Default message format if -gcp.pubsub.subscribe.topicSubscription.messageFormat is missing. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default "promremotewrite")
-gcp.pubsub.subscribe.topicSubscription array
GCP PubSub topic subscription in the format: projects/<project-id>/subscriptions/<subscription-name>. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-gcp.pubsub.subscribe.topicSubscription.concurrency array
The number of concurrently processed messages for topic subscription specified via -gcp.pubsub.subscribe.topicSubscription flag. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 0)
Supports array of values separated by comma or specified via multiple flags.
-gcp.pubsub.subscribe.topicSubscription.isGzipped array
Enables gzip decompression for messages payload at the corresponding -gcp.pubsub.subscribe.topicSubscription. Only prometheus, jsonline, graphite and influx formats accept gzipped messages. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports array of values separated by comma or specified via multiple flags.
-gcp.pubsub.subscribe.topicSubscription.messageFormat array
Message format for the corresponding -gcp.pbusub.subcribe.topicSubscription. Valid formats: influx, prometheus, promremotewrite, graphite, jsonline . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-pubsub . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-graphiteListenAddr string
TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. See also -graphiteListenAddr.useProxyProtocol
-graphiteListenAddr.useProxyProtocol
@ -1549,33 +1598,33 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-internStringMaxLen int
The maximum length for strings to intern. A lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning . See also -internStringDisableCache and -internStringCacheExpireDuration (default 500)
-kafka.consumer.topic array
Kafka topic names for data consumption. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Kafka topic names for data consumption. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.basicAuth.password array
Optional basic auth password for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN' . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Optional basic auth password for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN' . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.basicAuth.username array
Optional basic auth username for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN' . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Optional basic auth username for -kafka.consumer.topic. Must be used in conjunction with any supported auth methods for kafka client, specified by flag -kafka.consumer.topic.options='security.protocol=SASL_SSL;sasl.mechanisms=PLAIN' . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.brokers array
List of brokers to connect for given topic, e.g. -kafka.consumer.topic.broker=host-1:9092;host-2:9092 . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
List of brokers to connect for given topic, e.g. -kafka.consumer.topic.broker=host-1:9092;host-2:9092 . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.concurrency array
Configures consumer concurrency for topic specified via -kafka.consumer.topic flag.This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 1)
Configures consumer concurrency for topic specified via -kafka.consumer.topic flag. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default 1)
Supports array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.defaultFormat string
Expected data format in the topic if -kafka.consumer.topic.format is skipped. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default "promremotewrite")
Expected data format in the topic if -kafka.consumer.topic.format is skipped. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html (default "promremotewrite")
-kafka.consumer.topic.format array
data format for corresponding kafka topic. Valid formats: influx, prometheus, promremotewrite, graphite, jsonline . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
data format for corresponding kafka topic. Valid formats: influx, prometheus, promremotewrite, graphite, jsonline . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.groupID array
Defines group.id for topic. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Defines group.id for topic. See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.isGzipped array
Enables gzip setting for topic messages payload. Only prometheus, jsonline and influx formats accept gzipped messages.This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Enables gzip setting for topic messages payload. Only prometheus, jsonline, graphite and influx formats accept gzipped messages.See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports array of values separated by comma or specified via multiple flags.
-kafka.consumer.topic.options array
Optional key=value;key1=value2 settings for topic consumer. See full configuration options at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Optional key=value;key1=value2 settings for topic consumer. See full configuration options at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md . See https://docs.victoriametrics.com/vmagent.html#reading-metrics-from-kafka . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-license string
Lisense key for VictoriaMetrics Enterprise. See https://victoriametrics.com/products/enterprise/ . Trial Enterprise license can be obtained from https://victoriametrics.com/products/enterprise/trial/ . This flag is available only in Enterprise binaries. The license key can be also passed via file specified by -licenseFile command-line flag
@ -1593,6 +1642,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Allows renaming fields in JSON formatted logs. Example: "ts:timestamp,msg:message" renames "ts" to "timestamp" and "msg" to "message". Supported fields: ts, level, caller, msg
-loggerLevel string
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
-loggerMaxArgLen int
The maximum length of a single logged argument. Longer arguments are replaced with 'arg_start..arg_end', where 'arg_start' and 'arg_end' is prefix and suffix of the arg with the length not exceeding -loggerMaxArgLen / 2 (default 1000)
-loggerOutput string
Output for the logs. Supported values: stderr, stdout (default "stderr")
-loggerTimezone string
@ -1600,7 +1651,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-loggerWarnsPerSecondLimit int
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
-maxConcurrentInserts int
The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 8)
The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 32)
-maxInsertRequestSize size
The maximum size in bytes of a single Prometheus remote_write API request
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432)
@ -1826,7 +1877,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Optional proxy URL for writing data to the corresponding -remoteWrite.url. Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234
Supports an array of values separated by comma or specified via multiple flags.
-remoteWrite.queues int
The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs (default 8)
The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs (default 32)
-remoteWrite.rateLimit array
Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data is sent after temporary unavailability of the remote storage (default 0)
Supports array of values separated by comma or specified via multiple flags.

View file

@ -27,11 +27,20 @@ var (
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, callback func(rows []graphite.Row) error) error {
func Parse(r io.Reader, isGzipped bool, callback func(rows []graphite.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped graphite data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
ctx := getStreamContext(r)
defer putStreamContext(ctx)