diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index b9d99d1c7..13be88c5a 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -110,11 +110,11 @@ var (
 	streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
 		"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
-	disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
-		"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence ."+
+	disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
+		"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . "+
 		"See also -remoteWrite.dropSamplesOnOverload")
-	dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
-		"cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence")
+	dropSamplesOnOverload = flagutil.NewArrayBool("remoteWrite.dropSamplesOnOverload", "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
+		"cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence")
 )
 
 var (
@@ -135,6 +135,9 @@ var (
 			"see https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence"),
 		StatusCode: http.StatusTooManyRequests,
 	}
+
+	// disableOnDiskQueueAll is set to true if the persistent queue is disabled for all the configured -remoteWrite.url entries via -remoteWrite.disableOnDiskQueue
+	disableOnDiskQueueAll bool
 )
 
 // MultitenancyEnabled returns true if -enableMultitenantHandlers or -remoteWrite.multitenantURL is specified.
@@ -227,6 +230,15 @@ func Init() {
 	if len(*remoteWriteURLs) > 0 {
 		rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
 	}
+
+	disableOnDiskQueueAll = true
+	for _, v := range *disableOnDiskQueue {
+		if !v {
+			disableOnDiskQueueAll = false
+			break
+		}
+	}
+
 	dropDanglingQueues()
 
 	// Start config reloader.
@@ -440,10 +452,10 @@ func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
 //
 // The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
 func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
-	return tryPush(at, wr, *dropSamplesOnOverload)
+	return tryPush(at, wr, false)
 }
 
-func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, dropSamplesOnFailure bool) bool {
+func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnFailure bool) bool {
 	tss := wr.Timeseries
 
 	if at == nil && MultitenancyEnabled() {
@@ -476,17 +488,18 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, dropSamplesOnFailur
 
 	rowsCount := getRowsCount(tss)
 
-	if *disableOnDiskQueue {
-		// Quick check whether writes to configured remote storage systems are blocked.
-		// This allows saving CPU time spent on relabeling and block compression
-		// if some of remote storage systems cannot keep up with the data ingestion rate.
+	// Quick check whether writes to configured remote storage systems are blocked.
+	// This allows saving CPU time spent on relabeling and block compression
+	// if some of the remote storage systems cannot keep up with the data ingestion rate.
+	// This shortcut is applicable only if all the remote writes have disableOnDiskQueue=true.
+	if disableOnDiskQueueAll {
 		for _, rwctx := range rwctxs {
 			if rwctx.fq.IsWriteBlocked() {
-				pushFailures.Inc()
-				if dropSamplesOnFailure {
+				rwctx.pushFailures.Inc()
+				if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload {
 					// Just drop samples
-					samplesDropped.Add(rowsCount)
-					return true
+					rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
+					continue
 				}
 				return false
 			}
@@ -539,27 +552,14 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, dropSamplesOnFailur
 		}
 		sortLabelsIfNeeded(tssBlock)
 		tssBlock = limitSeriesCardinality(tssBlock)
-		if !tryPushBlockToRemoteStorages(rwctxs, tssBlock) {
-			if !*disableOnDiskQueue {
-				logger.Panicf("BUG: tryPushBlockToRemoteStorages must return true if -remoteWrite.disableOnDiskQueue isn't set")
-			}
-			pushFailures.Inc()
-			if dropSamplesOnFailure {
-				samplesDropped.Add(rowsCount)
-				return true
-			}
+		if !tryPushBlockToRemoteStorages(rwctxs, tssBlock, forceDropSamplesOnFailure) {
 			return false
 		}
 	}
 	return true
 }
 
-var (
-	samplesDropped = metrics.NewCounter(`vmagent_remotewrite_samples_dropped_total`)
-	pushFailures   = metrics.NewCounter(`vmagent_remotewrite_push_failures_total`)
-)
-
-func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool {
+func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
 	if len(tssBlock) == 0 {
 		// Nothing to push
 		return true
@@ -567,7 +567,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
 
 	if len(rwctxs) == 1 {
 		// Fast path - just push data to the configured single remote storage
-		return rwctxs[0].TryPush(tssBlock)
+		return rwctxs[0].TryPush(tssBlock, forceDropSamplesOnFailure)
 	}
 
 	// We need to push tssBlock to multiple remote storages.
@@ -578,7 +578,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
 		if replicas <= 0 {
 			replicas = 1
 		}
-		return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas)
+		return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas, forceDropSamplesOnFailure)
 	}
 
 	// Replicate tssBlock samples among rwctxs.
@@ -590,7 +590,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
 	for _, rwctx := range rwctxs {
 		go func(rwctx *remoteWriteCtx) {
 			defer wg.Done()
-			if !rwctx.TryPush(tssBlock) {
+			if !rwctx.TryPush(tssBlock, forceDropSamplesOnFailure) {
 				anyPushFailed.Store(true)
 			}
 		}(rwctx)
@@ -599,7 +599,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
 	return !anyPushFailed.Load()
 }
 
-func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int) bool {
+func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
 	x := getTSSShards(len(rwctxs))
 	defer putTSSShards(x)
 
@@ -653,7 +653,7 @@ func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []pr
 		wg.Add(1)
 		go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
 			defer wg.Done()
-			if !rwctx.TryPush(tss) {
+			if !rwctx.TryPush(tss, forceDropSamplesOnFailure) {
 				anyPushFailed.Store(true)
 			}
 		}(rwctx, shard)
@@ -786,14 +786,19 @@ type remoteWriteCtx struct {
 	sas          atomic.Pointer[streamaggr.Aggregators]
 	deduplicator *streamaggr.Deduplicator
 
-	streamAggrKeepInput bool
-	streamAggrDropInput bool
+	streamAggrKeepInput   bool
+	streamAggrDropInput   bool
+	disableOnDiskQueue    bool
+	dropSamplesOnOverload bool
 
 	pss        []*pendingSeries
 	pssNextIdx atomic.Uint64
 
 	rowsPushedAfterRelabel *metrics.Counter
 	rowsDroppedByRelabel   *metrics.Counter
+
+	pushFailures             *metrics.Counter
+	rowsDroppedOnPushFailure *metrics.Counter
 }
 
 func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
@@ -809,7 +814,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
 		logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize)
 		maxPendingBytes = persistentqueue.DefaultChunkFileSize
 	}
-	fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, *disableOnDiskQueue)
+	isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
+	fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
 	_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
 		return float64(fq.GetPendingBytes())
 	})
@@ -852,8 +858,14 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
 		c:   c,
 		pss: pss,
 
+		dropSamplesOnOverload: dropSamplesOnOverload.GetOptionalArg(argIdx),
+		disableOnDiskQueue:    isPQDisabled,
+
 		rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
 		rowsDroppedByRelabel:   metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
+
+		pushFailures:             metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
+		rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
 	}
 
 	// Initialize sas
@@ -914,7 +926,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
 //
 // TryPush can be called concurrently for multiple remoteWriteCtx,
 // so it shouldn't modify tss entries.
-func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
+func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
 	// Apply relabeling
 	var rctx *relabelCtx
 	var v *[]prompbmarshal.TimeSeries
@@ -966,6 +978,14 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
 		putRelabelCtx(rctx)
 	}
 
+	if !ok {
+		rwctx.pushFailures.Inc()
+		if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload {
+			rwctx.rowsDroppedOnPushFailure.Add(len(tss))
+			return true
+		}
+	}
+
 	return ok
 }
 
@@ -990,13 +1010,13 @@ func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSe
 	if rwctx.tryPushInternal(tss) {
 		return
 	}
-	if !*disableOnDiskQueue {
+	if !rwctx.disableOnDiskQueue {
 		logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set")
 	}
-	pushFailures.Inc()
-	if *dropSamplesOnOverload {
+	rwctx.pushFailures.Inc()
+	if dropSamplesOnOverload.GetOptionalArg(rwctx.idx) {
 		rowsCount := getRowsCount(tss)
-		samplesDropped.Add(rowsCount)
+		rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
 	}
 }
diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go
index 2fd8a9011..6c384ba2e 100644
--- a/app/vmagent/remotewrite/remotewrite_test.go
+++ b/app/vmagent/remotewrite/remotewrite_test.go
@@ -96,7 +96,7 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
 		// copy inputTss to make sure it is not mutated during TryPush call
 		copy(expectedTss, inputTss)
-		rwctx.TryPush(inputTss)
+		rwctx.TryPush(inputTss, false)
 
 		if !reflect.DeepEqual(expectedTss, inputTss) {
 			t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index f21551350..0ec5a1665 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -38,6 +38,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: [dashboards/operator](https://grafana.com/grafana/dashboards/17869), [dashboards/backupmanager](https://grafana.com/grafana/dashboards/17798) and [dashboard/tenant-statistic](https://grafana.com/grafana/dashboards/16399): update dashboard to be compatible with Grafana 10+ version.
 * FEATURE: [dashboards/cluster](https://grafana.com/grafana/dashboards/11176): add new panel `Concurrent selects` to `vmstorage` row. The panel will show how many ongoing select queries are processed by vmstorage and should help to identify resource bottlenecks. See panel description for more details.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation!
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now the number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205).
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 7e368f437..5c77e4f35 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -1187,7 +1187,7 @@ If you have suggestions for improvements or have found a bug - please open an is
   with `-remoteWrite.maxDiskUsagePerURL` command-line flag.
   If you don't want to send all the buffered data from the directory to remote storage then simply stop `vmagent` and delete the directory.
 
-* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then is is possible to disable
+* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then it is possible to disable
   the persistent storage with `-remoteWrite.disableOnDiskQueue` command-line flag. See [these docs](#disabling-on-disk-persistence) for more details.
 
 * By default `vmagent` masks `-remoteWrite.url` with `secret-url` values in logs and at `/metrics` page because
@@ -2057,10 +2057,14 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
     Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second
     Supports an array of values separated by comma or specified via multiple flags.
     Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-  -remoteWrite.disableOnDiskQueue
-    Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence .See also -remoteWrite.dropSamplesOnOverload
-  -remoteWrite.dropSamplesOnOverload
-    Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence
+  -remoteWrite.disableOnDiskQueue array
+    Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload
+    Supports array of values separated by comma or specified via multiple flags.
+    Empty values are set to false.
+  -remoteWrite.dropSamplesOnOverload array
+    Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence
+    Supports array of values separated by comma or specified via multiple flags.
+    Empty values are set to false.
   -remoteWrite.flushInterval duration
     Interval for flushing the data to remote storage.
    This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s)
   -remoteWrite.forcePromProto array
diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go
index a2d597411..3c194f7ba 100644
--- a/lib/persistentqueue/fastqueue.go
+++ b/lib/persistentqueue/fastqueue.go
@@ -63,7 +63,7 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes
 		return float64(n)
 	})
 	pendingBytes := fq.GetPendingBytes()
-	logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes)
+	logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, isPQDisabled=%t, it contains %d pending bytes", path, maxInmemoryBlocks, isPQDisabled, pendingBytes)
 	return fq
 }
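
Below is a minimal usage sketch of the per-URL configuration enabled by this patch. It is not part of the diff; the binary path and endpoint URLs are hypothetical. Array values map positionally to the `-remoteWrite.url` flags, and empty values default to false, per the flag help above:

```sh
# Keep on-disk queueing for the first (durable) endpoint, while the second
# (best-effort) endpoint never buffers to disk and drops samples on overload.
/path/to/vmagent-prod \
  -remoteWrite.url=http://vm-durable:8428/api/v1/write \
  -remoteWrite.url=http://vm-besteffort:8428/api/v1/write \
  -remoteWrite.disableOnDiskQueue=false,true \
  -remoteWrite.dropSamplesOnOverload=false,true
```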
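With the per-URL counters added in this patch, push failures and dropped samples become distinguishable per remote storage on the `/metrics` page. A sketch of the expected exposition follows; the `path` and `url` label values and the counter values are illustrative only:

```sh
vmagent_remotewrite_push_failures_total{path="/tmp/vmagent-remotewrite-data/persistent-queue/2_ABCDEF0123456789", url="http://vm-besteffort:8428/api/v1/write"} 3
vmagent_remotewrite_samples_dropped_total{path="/tmp/vmagent-remotewrite-data/persistent-queue/2_ABCDEF0123456789", url="http://vm-besteffort:8428/api/v1/write"} 120
```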