diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index a7cdcf32b..3c55c2ff8 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "path/filepath" + "slices" "strconv" "sync" "sync/atomic" @@ -88,10 +89,10 @@ var ( "By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit") disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ - "when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ."+ - "See also -remoteWrite.dropSamplesOnOverload") - dropSamplesOnOverload = flagutil.NewArrayBool("remoteWrite.dropSamplesOnOverload", "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+ - "cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence") + "when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. "+ + "See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload") + dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+ + "cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence") ) var ( @@ -109,8 +110,11 @@ var ( StatusCode: http.StatusTooManyRequests, } - // disableOnDiskQueueAll is set to true if all remoteWrite.urls were configured to disable persistent queue via disableOnDiskQueue + // disableOnDiskQueueAll is set to true if all -remoteWrite.url were configured to disable persistent queue via -remoteWrite.disableOnDiskQueue disableOnDiskQueueAll bool + + // dropSamplesOnFailureGlobal is set to true if -remoteWrite.dropSamplesOnOverload is set or if multiple -remoteWrite.disableOnDiskQueue options are set. + dropSamplesOnFailureGlobal bool ) // MultitenancyEnabled returns true if -enableMultitenantHandlers is specified. @@ -218,13 +222,14 @@ func Init() { rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs) } - disableOnDiskQueueAll = true - for _, v := range *disableOnDiskQueue { - if !v { - disableOnDiskQueueAll = false - break - } - } + disableOnDiskQueues := []bool(*disableOnDiskQueue) + disableOnDiskQueueAll = !slices.Contains(disableOnDiskQueues, false) + + // Samples must be dropped if multiple -remoteWrite.disableOnDiskQueue options are configured and at least a single is set to true. + // In this case it is impossible to prevent from sending many duplicates of samples passed to TryPush() to all the configured -remoteWrite.url + // if these samples couldn't be sent to the -remoteWrite.url with the disabled persistent queue. So it is better sending samples + // to the remaining -remoteWrite.url and dropping them on the blocked queue. + dropSamplesOnFailureGlobal = *dropSamplesOnOverload || len(disableOnDiskQueues) > 1 && slices.Contains(disableOnDiskQueues, true) dropDanglingQueues() @@ -397,6 +402,8 @@ func Stop() { // PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url // +// PushDropSamplesOnFailure drops wr samples if they cannot be sent to -remoteWrite.url by any reason. +// // PushDropSamplesOnFailure can modify wr contents. func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) { _ = tryPush(at, wr, true) @@ -409,7 +416,7 @@ func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) { // // The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false. func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { - return tryPush(at, wr, false) + return tryPush(at, wr, dropSamplesOnFailureGlobal) } func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnFailure bool) bool { @@ -433,17 +440,22 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF // if some of remote storage systems cannot keep up with the data ingestion rate. // this shortcut is only applicable if all remote writes have disableOnDiskQueue = true if disableOnDiskQueueAll { + skippedQueues := 0 for _, rwctx := range rwctxs { if rwctx.fq.IsWriteBlocked() { rwctx.pushFailures.Inc() - if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload { - // Just drop samples - rwctx.rowsDroppedOnPushFailure.Add(rowsCount) - continue + if !forceDropSamplesOnFailure { + return false } - return false + rwctx.rowsDroppedOnPushFailure.Add(rowsCount) + skippedQueues++ } } + if skippedQueues == len(rwctxs) { + // All the queues are skipped because they are blocked and dropSamplesOnFailure is set to true. + // Return true to the caller, so it doesn't re-send the samples again. + return true + } } var rctx *relabelCtx @@ -745,10 +757,8 @@ type remoteWriteCtx struct { sas atomic.Pointer[streamaggr.Aggregators] deduplicator *streamaggr.Deduplicator - streamAggrKeepInput bool - streamAggrDropInput bool - disableOnDiskQueue bool - dropSamplesOnOverload bool + streamAggrKeepInput bool + streamAggrDropInput bool pss []*pendingSeries pssNextIdx atomic.Uint64 @@ -773,6 +783,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize) maxPendingBytes = persistentqueue.DefaultChunkFileSize } + isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx) fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled) _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { @@ -817,9 +828,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in c: c, pss: pss, - dropSamplesOnOverload: dropSamplesOnOverload.GetOptionalArg(argIdx), - disableOnDiskQueue: isPQDisabled, - rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)), rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), @@ -934,8 +942,9 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa // Couldn't push tss to remote storage rwctx.pushFailures.Inc() - if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload { - rwctx.rowsDroppedOnPushFailure.Add(len(tss)) + if forceDropSamplesOnFailure { + rowsCount := getRowsCount(tss) + rwctx.rowsDroppedOnPushFailure.Add(rowsCount) return true } return false @@ -962,14 +971,12 @@ func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSe if rwctx.tryPushInternal(tss) { return } - if !rwctx.disableOnDiskQueue { + if !rwctx.fq.IsPersistentQueueDisabled() { logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set") } rwctx.pushFailures.Inc() - if dropSamplesOnOverload.GetOptionalArg(rwctx.idx) { - rowsCount := getRowsCount(tss) - rwctx.rowsDroppedOnPushFailure.Add(rowsCount) - } + rowsCount := getRowsCount(tss) + rwctx.rowsDroppedOnPushFailure.Add(rowsCount) } func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) bool { diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go index d86b20eda..5461ed243 100644 --- a/app/vmagent/remotewrite/remotewrite_test.go +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -94,7 +94,9 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { // copy inputTss to make sure it is not mutated during TryPush call copy(expectedTss, inputTss) - rwctx.TryPush(inputTss, false) + if !rwctx.TryPush(inputTss, false) { + t.Fatalf("cannot push samples to rwctx") + } if !reflect.DeepEqual(expectedTss, inputTss) { t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c97af984d..a3ecccd59 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -119,8 +119,8 @@ Released at 2024-06-07 * FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): use `$__interval` variable for offsets and look-behind windows in annotations. This should improve precision of `restarts` and `version change` annotations when zooming-in/zooming-out on the dashboards. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support aggregation and deduplication configs before replicating data to configured `-remoteWrite.url` destinations. This saves CPU and memory resources when incoming data needs to be aggregated or deduplicated once and then replicated to multiple destinations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041). -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation! -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`. +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow specifying `-remoteWrite.disableOnDiskQueue` command-line flag per each `-remoteWrite.url`. If multiple `-remoteWrite.disableOnDiskQueue` command-line flags are configured, then the `-remoteWrite.dropSamplesOnOverload` is automatically set to true, so samples are automatically dropped if they cannot be sent to the corresponding `-remoteWrite.url` in a timely manner. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation! +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `path` and `url` labels to `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total` [metrics](https://docs.victoriametrics.com/vmagent/#monitoring), so the number of failed pushes and dropped samples can be tracked per each `-remoteWrite.url`. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support Statsd plaintext protocol. See [these docs](https://docs.victoriametrics.com/vmagent/#how-to-push-data-to-vmagent) and this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5053). Thanks to @Koilanetroc for implementation! * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402). diff --git a/docs/vmagent.md b/docs/vmagent.md index ccf9a1544..70965d62d 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1021,9 +1021,9 @@ scrape_configs: ## Disabling on-disk persistence -By default `vmagent` stores pending data, which cannot be sent to the configured remote storage systems in a timely manner, in the folder configured -via `-remoteWrite.tmpDataPath` command-line flag. By default `vmagent` writes all the pending data to this folder until this data is sent to the configured -remote storage systems or until the folder becomes full. The maximum data size, which can be saved to `-remoteWrite.tmpDataPath` +By default `vmagent` stores pending data, which cannot be sent to the configured remote storage systems in a timely manner, in the folder set +by `-remoteWrite.tmpDataPath` command-line flag. By default `vmagent` writes all the pending data to this folder until this data is sent to the configured +`-remoteWrite.url` systems or until the folder becomes full. The maximum data size, which can be saved to `-remoteWrite.tmpDataPath` per every configured `-remoteWrite.url`, can be limited via `-remoteWrite.maxDiskUsagePerURL` command-line flag. When this limit is reached, `vmagent` drops the oldest data from disk in order to save newly ingested data. @@ -1031,21 +1031,25 @@ There are cases when it is better disabling on-disk persistence for pending data - When the persistent disk performance isn't enough for the given data processing rate. - When it is better to buffer pending data at the client side instead of bufferring it at `vmagent` side in the `-remoteWrite.tmpDataPath` folder. -- When the data is already buffered at [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#reading-metrics-from-pubsub). +- When the data is already buffered at [Kafka side](#reading-metrics-from-kafka) or at [Google PubSub side](#reading-metrics-from-pubsub). - When it is better to drop pending data instead of buffering it. -In this case `-remoteWrite.disableOnDiskQueue` command-line flag can be passed to `vmagent`. -When this flag is specified, `vmagent` works in the following way if the configured remote storage systems cannot keep up with the data ingestion rate: +In this case `-remoteWrite.disableOnDiskQueue` command-line flag can be passed to `vmagent` per each configured `-remoteWrite.url`. +`vmagent` works in the following way if the corresponding remote storage system at `-remoteWrite.url` cannot keep up with the data ingestion rate +and the `-remoteWrite.disableOnDiskQueue` command-line flag is set: - It returns `429 Too Many Requests` HTTP error to clients, which send data to `vmagent` via [supported HTTP endpoints](#how-to-push-data-to-vmagent). - You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the ingested samples instead of returning the error to clients in this case. + If `-remoteWrite.dropSamplesOnOverload` command-line flag is set or if multiple `-remoteWrite.disableOnDiskQueue` command-line flags are set + for different `-remoteWrite.url` options, then the ingested samples are silently dropped instead of returning the error to clients. - It suspends consuming data from [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#google-pubsub-integration) until the remote storage becomes available. - You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the fetched samples instead of suspending data consumption from Kafka or Google PubSub. -- It drops samples pushed to `vmagent` via non-HTTP protocols and logs the error. Pass `-remoteWrite.dropSamplesOnOverload` on order to suppress error messages in this case. -- It drops samples [scraped from Prometheus-compatible targets](#how-to-collect-metrics-in-prometheus-format), because it is better to drop samples - instead of blocking the scrape process. -- It drops [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/) output samples, because it is better to drop output samples - instead of blocking the stream aggregation process. + If `-remoteWrite.dropSamplesOnOverload` command-line flag is set or if multiple `-remoteWrite.disableOnDiskQueue` command-line flags are set + for different `-remoteWrite.url` options, then the fetched samples are silently dropped instead of suspending data consumption from Kafka or Google PubSub. +- It drops samples pushed to `vmagent` via non-HTTP protocols and logs the error. Pass `-remoteWrite.dropSamplesOnOverload` command-line flag in order + to suppress error messages in this case. +- It drops samples [scraped from Prometheus-compatible targets](#how-to-collect-metrics-in-prometheus-format), because it is better from operations perspective + to drop samples instead of blocking the scrape process. +- It drops [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/) output samples, because it is better from operations perspective + to drop output samples instead of blocking the stream aggregation process. The number of dropped samples because of overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_samples_dropped_total` metric. The number of unsuccessful attempts to send data to overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_push_failures_total` metric. @@ -1057,7 +1061,7 @@ on spiky workloads, since `vmagent` may buffer more data in memory before return if `-remoteWrite.disableOnDiskQueue` command-line flag is specified. It may also read buffered data from `-remoteWrite.tmpDataPath` on startup. -When `-remoteWrite.disableOnDiskQueue` command-line flag is set, then `vmagent` may send the same samples multiple times to the configured remote storage +When `-remoteWrite.disableOnDiskQueue` command-line flag is set, `vmagent` may send the same samples multiple times to the configured remote storage if it cannot keep up with the data ingestion rate. In this case the [deduplication](https://docs.victoriametrics.com/#deduplication) must be enabled on all the configured remote storage systems. @@ -1196,7 +1200,7 @@ If you have suggestions for improvements or have found a bug - please open an is with `-remoteWrite.maxDiskUsagePerURL` command-line flag. If you don't want to send all the buffered data from the directory to remote storage then simply stop `vmagent` and delete the directory. -* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then is possible to disable +* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then it is possible to disable the persistent storage with `-remoteWrite.disableOnDiskQueue` command-line flag. See [these docs](#disabling-on-disk-persistence) for more details. * By default `vmagent` masks `-remoteWrite.url` with `secret-url` values in logs and at `/metrics` page because @@ -2065,13 +2069,11 @@ See the docs at https://docs.victoriametrics.com/vmagent/ . Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -remoteWrite.disableOnDiskQueue array - Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence .See also -remoteWrite.dropSamplesOnOverload - Supports array of values separated by comma or specified via multiple flags. - Empty values are set to false. - -remoteWrite.dropSamplesOnOverload array - Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence + Whether to disable storing pending data to -remoteWrite.tmpDataPath when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload Supports array of values separated by comma or specified via multiple flags. Empty values are set to false. + -remoteWrite.dropSamplesOnOverload + Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence -remoteWrite.flushInterval duration Interval for flushing the data to remote storage. This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s) -remoteWrite.forcePromProto array diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 396d7a69b..c1ba7b8ce 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -62,11 +62,21 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes fq.mu.Unlock() return float64(n) }) + pendingBytes := fq.GetPendingBytes() - logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d isPQDisabled=%t, it contains %d pending bytes", path, maxInmemoryBlocks, isPQDisabled, pendingBytes) + persistenceStatus := "enabled" + if isPQDisabled { + persistenceStatus = "disabled" + } + logger.Infof("opened fast queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes, persistence is %s", path, maxInmemoryBlocks, pendingBytes, persistenceStatus) return fq } +// IsPersistentQueueDisabled returns true if persistend queue at fq is disabled. +func (fq *FastQueue) IsPersistentQueueDisabled() bool { + return fq.isPQDisabled +} + // IsWriteBlocked checks if data can be pushed into fq func (fq *FastQueue) IsWriteBlocked() bool { if !fq.isPQDisabled {