mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmagent/remotewrite: follow-up for 87fd400dfc
- Drop samples and return true from remotewrite.TryPush() at fast path when all the remote storage systems are configured with the disabled on-disk queue, every in-memory queue is full and -remoteWrite.dropSamplesOnOverload is set to true. This case is quite common, so it should be optimized. Previously additional CPU time was spent on per-remoteWriteCtx relabeling and other processing in this case. - Properly count the number of dropped samples inside remoteWriteCtx.pushInternalTrackDropped(). Previously dropped samples were counted only if -remoteWrite.dropSamplesOnOverload flag is set. In reality, the samples are dropped when they couldn't be sent to the queue because in-memory queue is full and on-disk queue is disabled. The remoteWriteCtx.pushInternalTrackDropped() function is called by streaming aggregation for pushing the aggregated data to the remote storage. Streaming aggregation cannot wait until the remote storage processes pending data, so it drops aggregated samples in this case. - Clarify the description for -remoteWrite.disableOnDiskQueue command-line flag at -help output, so it is clear that this flag can be set individually per each -remoteWrite.url. - Make the -remoteWrite.dropSamplesOnOverload flag global. If some of the remote storage systems are configured with the disabled on-disk queue, then there is no sense in keeping samples on some of these systems, while dropping samples on the remaining systems, since this will result in global stall on the remote storage system with the disabled on-disk queue and with the -remoteWrite.dropSamplesOnOverload=false flag. vmagent will always return false from remotewrite.TryPush() in this case. This will result in infinite duplicate samples written to the remaining remote storage systems. That's why the -remoteWrite.dropSamplesOnOverload is forcibly set to true if more than one -remoteWrite.disableOnDiskQueue flag is set. 
This allows proceeding with newly scraped / pushed samples by sending them to the remaining remote storage systems, while dropping them on overloaded systems with the -remoteWrite.disableOnDiskQueue flag set. - Verify that the remoteWriteCtx.TryPush() returns true in the TestRemoteWriteContext_TryPush_ImmutableTimeseries test. - Mention in vmagent docs that the -remoteWrite.disableOnDiskQueue command-line flag can be set individually per each -remoteWrite.url. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6248 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065
This commit is contained in:
parent
a8472d033a
commit
0145b65f25
5 changed files with 77 additions and 56 deletions
|
@ -6,6 +6,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -88,10 +89,10 @@ var (
|
||||||
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
|
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
|
||||||
|
|
||||||
disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
|
disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
|
||||||
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ."+
|
"when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. "+
|
||||||
"See also -remoteWrite.dropSamplesOnOverload")
|
"See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload")
|
||||||
dropSamplesOnOverload = flagutil.NewArrayBool("remoteWrite.dropSamplesOnOverload", "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
|
dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
|
||||||
"cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence")
|
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence")
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -109,8 +110,11 @@ var (
|
||||||
StatusCode: http.StatusTooManyRequests,
|
StatusCode: http.StatusTooManyRequests,
|
||||||
}
|
}
|
||||||
|
|
||||||
// disableOnDiskQueueAll is set to true if all remoteWrite.urls were configured to disable persistent queue via disableOnDiskQueue
|
// disableOnDiskQueueAll is set to true if all -remoteWrite.url were configured to disable persistent queue via -remoteWrite.disableOnDiskQueue
|
||||||
disableOnDiskQueueAll bool
|
disableOnDiskQueueAll bool
|
||||||
|
|
||||||
|
// dropSamplesOnFailureGlobal is set to true if -remoteWrite.dropSamplesOnOverload is set or if multiple -remoteWrite.disableOnDiskQueue options are set.
|
||||||
|
dropSamplesOnFailureGlobal bool
|
||||||
)
|
)
|
||||||
|
|
||||||
// MultitenancyEnabled returns true if -enableMultitenantHandlers is specified.
|
// MultitenancyEnabled returns true if -enableMultitenantHandlers is specified.
|
||||||
|
@ -218,13 +222,14 @@ func Init() {
|
||||||
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
||||||
}
|
}
|
||||||
|
|
||||||
disableOnDiskQueueAll = true
|
disableOnDiskQueues := []bool(*disableOnDiskQueue)
|
||||||
for _, v := range *disableOnDiskQueue {
|
disableOnDiskQueueAll = !slices.Contains(disableOnDiskQueues, false)
|
||||||
if !v {
|
|
||||||
disableOnDiskQueueAll = false
|
// Samples must be dropped if multiple -remoteWrite.disableOnDiskQueue options are configured and at least a single is set to true.
|
||||||
break
|
// In this case it is impossible to prevent from sending many duplicates of samples passed to TryPush() to all the configured -remoteWrite.url
|
||||||
}
|
// if these samples couldn't be sent to the -remoteWrite.url with the disabled persistent queue. So it is better sending samples
|
||||||
}
|
// to the remaining -remoteWrite.url and dropping them on the blocked queue.
|
||||||
|
dropSamplesOnFailureGlobal = *dropSamplesOnOverload || len(disableOnDiskQueues) > 1 && slices.Contains(disableOnDiskQueues, true)
|
||||||
|
|
||||||
dropDanglingQueues()
|
dropDanglingQueues()
|
||||||
|
|
||||||
|
@ -397,6 +402,8 @@ func Stop() {
|
||||||
|
|
||||||
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
|
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
|
||||||
//
|
//
|
||||||
|
// PushDropSamplesOnFailure drops wr samples if they cannot be sent to -remoteWrite.url by any reason.
|
||||||
|
//
|
||||||
// PushDropSamplesOnFailure can modify wr contents.
|
// PushDropSamplesOnFailure can modify wr contents.
|
||||||
func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||||
_ = tryPush(at, wr, true)
|
_ = tryPush(at, wr, true)
|
||||||
|
@ -409,7 +416,7 @@ func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||||
//
|
//
|
||||||
// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
|
// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
|
||||||
func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
|
func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
|
||||||
return tryPush(at, wr, false)
|
return tryPush(at, wr, dropSamplesOnFailureGlobal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnFailure bool) bool {
|
func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnFailure bool) bool {
|
||||||
|
@ -433,17 +440,22 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
|
||||||
// if some of remote storage systems cannot keep up with the data ingestion rate.
|
// if some of remote storage systems cannot keep up with the data ingestion rate.
|
||||||
// this shortcut is only applicable if all remote writes have disableOnDiskQueue = true
|
// this shortcut is only applicable if all remote writes have disableOnDiskQueue = true
|
||||||
if disableOnDiskQueueAll {
|
if disableOnDiskQueueAll {
|
||||||
|
skippedQueues := 0
|
||||||
for _, rwctx := range rwctxs {
|
for _, rwctx := range rwctxs {
|
||||||
if rwctx.fq.IsWriteBlocked() {
|
if rwctx.fq.IsWriteBlocked() {
|
||||||
rwctx.pushFailures.Inc()
|
rwctx.pushFailures.Inc()
|
||||||
if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload {
|
if !forceDropSamplesOnFailure {
|
||||||
// Just drop samples
|
return false
|
||||||
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
return false
|
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
||||||
|
skippedQueues++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if skippedQueues == len(rwctxs) {
|
||||||
|
// All the queues are skipped because they are blocked and dropSamplesOnFailure is set to true.
|
||||||
|
// Return true to the caller, so it doesn't re-send the samples again.
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var rctx *relabelCtx
|
var rctx *relabelCtx
|
||||||
|
@ -745,10 +757,8 @@ type remoteWriteCtx struct {
|
||||||
sas atomic.Pointer[streamaggr.Aggregators]
|
sas atomic.Pointer[streamaggr.Aggregators]
|
||||||
deduplicator *streamaggr.Deduplicator
|
deduplicator *streamaggr.Deduplicator
|
||||||
|
|
||||||
streamAggrKeepInput bool
|
streamAggrKeepInput bool
|
||||||
streamAggrDropInput bool
|
streamAggrDropInput bool
|
||||||
disableOnDiskQueue bool
|
|
||||||
dropSamplesOnOverload bool
|
|
||||||
|
|
||||||
pss []*pendingSeries
|
pss []*pendingSeries
|
||||||
pssNextIdx atomic.Uint64
|
pssNextIdx atomic.Uint64
|
||||||
|
@ -773,6 +783,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
|
||||||
logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize)
|
logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize)
|
||||||
maxPendingBytes = persistentqueue.DefaultChunkFileSize
|
maxPendingBytes = persistentqueue.DefaultChunkFileSize
|
||||||
}
|
}
|
||||||
|
|
||||||
isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
|
isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
|
||||||
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
|
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
|
||||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||||
|
@ -817,9 +828,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
|
||||||
c: c,
|
c: c,
|
||||||
pss: pss,
|
pss: pss,
|
||||||
|
|
||||||
dropSamplesOnOverload: dropSamplesOnOverload.GetOptionalArg(argIdx),
|
|
||||||
disableOnDiskQueue: isPQDisabled,
|
|
||||||
|
|
||||||
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
||||||
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
||||||
|
|
||||||
|
@ -934,8 +942,9 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
|
||||||
|
|
||||||
// Couldn't push tss to remote storage
|
// Couldn't push tss to remote storage
|
||||||
rwctx.pushFailures.Inc()
|
rwctx.pushFailures.Inc()
|
||||||
if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload {
|
if forceDropSamplesOnFailure {
|
||||||
rwctx.rowsDroppedOnPushFailure.Add(len(tss))
|
rowsCount := getRowsCount(tss)
|
||||||
|
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -962,14 +971,12 @@ func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSe
|
||||||
if rwctx.tryPushInternal(tss) {
|
if rwctx.tryPushInternal(tss) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !rwctx.disableOnDiskQueue {
|
if !rwctx.fq.IsPersistentQueueDisabled() {
|
||||||
logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set")
|
logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set")
|
||||||
}
|
}
|
||||||
rwctx.pushFailures.Inc()
|
rwctx.pushFailures.Inc()
|
||||||
if dropSamplesOnOverload.GetOptionalArg(rwctx.idx) {
|
rowsCount := getRowsCount(tss)
|
||||||
rowsCount := getRowsCount(tss)
|
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
||||||
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) bool {
|
func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) bool {
|
||||||
|
|
|
@ -94,7 +94,9 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
|
||||||
|
|
||||||
// copy inputTss to make sure it is not mutated during TryPush call
|
// copy inputTss to make sure it is not mutated during TryPush call
|
||||||
copy(expectedTss, inputTss)
|
copy(expectedTss, inputTss)
|
||||||
rwctx.TryPush(inputTss, false)
|
if !rwctx.TryPush(inputTss, false) {
|
||||||
|
t.Fatalf("cannot push samples to rwctx")
|
||||||
|
}
|
||||||
|
|
||||||
if !reflect.DeepEqual(expectedTss, inputTss) {
|
if !reflect.DeepEqual(expectedTss, inputTss) {
|
||||||
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
|
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
|
||||||
|
|
|
@ -119,8 +119,8 @@ Released at 2024-06-07
|
||||||
* FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): use `$__interval` variable for offsets and look-behind windows in annotations. This should improve precision of `restarts` and `version change` annotations when zooming-in/zooming-out on the dashboards.
|
* FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): use `$__interval` variable for offsets and look-behind windows in annotations. This should improve precision of `restarts` and `version change` annotations when zooming-in/zooming-out on the dashboards.
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support aggregation and deduplication configs before replicating data to configured `-remoteWrite.url` destinations. This saves CPU and memory resources when incoming data needs to be aggregated or deduplicated once and then replicated to multiple destinations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support aggregation and deduplication configs before replicating data to configured `-remoteWrite.url` destinations. This saves CPU and memory resources when incoming data needs to be aggregated or deduplicated once and then replicated to multiple destinations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation!
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow specifying `-remoteWrite.disableOnDiskQueue` command-line flag per each `-remoteWrite.url`. If multiple `-remoteWrite.disableOnDiskQueue` command-line flags are configured, then the `-remoteWrite.dropSamplesOnOverload` is automatically set to true, so samples are automatically dropped if they cannot be sent to the corresponding `-remoteWrite.url` in a timely manner. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementation!
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `path` and `url` labels to `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total` [metrics](https://docs.victoriametrics.com/vmagent/#monitoring), so the number of failed pushes and dropped samples can be tracked per each `-remoteWrite.url`.
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support Statsd plaintext protocol. See [these docs](https://docs.victoriametrics.com/vmagent/#how-to-push-data-to-vmagent) and this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5053). Thanks to @Koilanetroc for implementation!
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support Statsd plaintext protocol. See [these docs](https://docs.victoriametrics.com/vmagent/#how-to-push-data-to-vmagent) and this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5053). Thanks to @Koilanetroc for implementation!
|
||||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
|
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
|
||||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402).
|
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce the number of allocated objects in heap during deduplication and aggregation. The change supposed to reduce pressure on Garbage Collector, as it will need to scan less objects. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6402).
|
||||||
|
|
|
@ -1021,9 +1021,9 @@ scrape_configs:
|
||||||
|
|
||||||
## Disabling on-disk persistence
|
## Disabling on-disk persistence
|
||||||
|
|
||||||
By default `vmagent` stores pending data, which cannot be sent to the configured remote storage systems in a timely manner, in the folder configured
|
By default `vmagent` stores pending data, which cannot be sent to the configured remote storage systems in a timely manner, in the folder set
|
||||||
via `-remoteWrite.tmpDataPath` command-line flag. By default `vmagent` writes all the pending data to this folder until this data is sent to the configured
|
by `-remoteWrite.tmpDataPath` command-line flag. By default `vmagent` writes all the pending data to this folder until this data is sent to the configured
|
||||||
remote storage systems or until the folder becomes full. The maximum data size, which can be saved to `-remoteWrite.tmpDataPath`
|
`-remoteWrite.url` systems or until the folder becomes full. The maximum data size, which can be saved to `-remoteWrite.tmpDataPath`
|
||||||
per every configured `-remoteWrite.url`, can be limited via `-remoteWrite.maxDiskUsagePerURL` command-line flag.
|
per every configured `-remoteWrite.url`, can be limited via `-remoteWrite.maxDiskUsagePerURL` command-line flag.
|
||||||
When this limit is reached, `vmagent` drops the oldest data from disk in order to save newly ingested data.
|
When this limit is reached, `vmagent` drops the oldest data from disk in order to save newly ingested data.
|
||||||
|
|
||||||
|
@ -1031,21 +1031,25 @@ There are cases when it is better disabling on-disk persistence for pending data
|
||||||
|
|
||||||
- When the persistent disk performance isn't enough for the given data processing rate.
|
- When the persistent disk performance isn't enough for the given data processing rate.
|
||||||
- When it is better to buffer pending data at the client side instead of buffering it at `vmagent` side in the `-remoteWrite.tmpDataPath` folder.
|
- When it is better to buffer pending data at the client side instead of buffering it at `vmagent` side in the `-remoteWrite.tmpDataPath` folder.
|
||||||
- When the data is already buffered at [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#reading-metrics-from-pubsub).
|
- When the data is already buffered at [Kafka side](#reading-metrics-from-kafka) or at [Google PubSub side](#reading-metrics-from-pubsub).
|
||||||
- When it is better to drop pending data instead of buffering it.
|
- When it is better to drop pending data instead of buffering it.
|
||||||
|
|
||||||
In this case `-remoteWrite.disableOnDiskQueue` command-line flag can be passed to `vmagent`.
|
In this case `-remoteWrite.disableOnDiskQueue` command-line flag can be passed to `vmagent` per each configured `-remoteWrite.url`.
|
||||||
When this flag is specified, `vmagent` works in the following way if the configured remote storage systems cannot keep up with the data ingestion rate:
|
`vmagent` works in the following way if the corresponding remote storage system at `-remoteWrite.url` cannot keep up with the data ingestion rate
|
||||||
|
and the `-remoteWrite.disableOnDiskQueue` command-line flag is set:
|
||||||
|
|
||||||
- It returns `429 Too Many Requests` HTTP error to clients, which send data to `vmagent` via [supported HTTP endpoints](#how-to-push-data-to-vmagent).
|
- It returns `429 Too Many Requests` HTTP error to clients, which send data to `vmagent` via [supported HTTP endpoints](#how-to-push-data-to-vmagent).
|
||||||
You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the ingested samples instead of returning the error to clients in this case.
|
If `-remoteWrite.dropSamplesOnOverload` command-line flag is set or if multiple `-remoteWrite.disableOnDiskQueue` command-line flags are set
|
||||||
|
for different `-remoteWrite.url` options, then the ingested samples are silently dropped instead of returning the error to clients.
|
||||||
- It suspends consuming data from [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#google-pubsub-integration) until the remote storage becomes available.
|
- It suspends consuming data from [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#google-pubsub-integration) until the remote storage becomes available.
|
||||||
You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the fetched samples instead of suspending data consumption from Kafka or Google PubSub.
|
If `-remoteWrite.dropSamplesOnOverload` command-line flag is set or if multiple `-remoteWrite.disableOnDiskQueue` command-line flags are set
|
||||||
- It drops samples pushed to `vmagent` via non-HTTP protocols and logs the error. Pass `-remoteWrite.dropSamplesOnOverload` in order to suppress error messages in this case.
|
for different `-remoteWrite.url` options, then the fetched samples are silently dropped instead of suspending data consumption from Kafka or Google PubSub.
|
||||||
- It drops samples [scraped from Prometheus-compatible targets](#how-to-collect-metrics-in-prometheus-format), because it is better to drop samples
|
- It drops samples pushed to `vmagent` via non-HTTP protocols and logs the error. Pass `-remoteWrite.dropSamplesOnOverload` command-line flag in order
|
||||||
instead of blocking the scrape process.
|
to suppress error messages in this case.
|
||||||
- It drops [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/) output samples, because it is better to drop output samples
|
- It drops samples [scraped from Prometheus-compatible targets](#how-to-collect-metrics-in-prometheus-format), because it is better from operations perspective
|
||||||
instead of blocking the stream aggregation process.
|
to drop samples instead of blocking the scrape process.
|
||||||
|
- It drops [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/) output samples, because it is better from operations perspective
|
||||||
|
to drop output samples instead of blocking the stream aggregation process.
|
||||||
|
|
||||||
The number of dropped samples because of overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_samples_dropped_total` metric.
|
The number of dropped samples because of overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_samples_dropped_total` metric.
|
||||||
The number of unsuccessful attempts to send data to overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_push_failures_total` metric.
|
The number of unsuccessful attempts to send data to overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_push_failures_total` metric.
|
||||||
|
@ -1057,7 +1061,7 @@ on spiky workloads, since `vmagent` may buffer more data in memory before return
|
||||||
if `-remoteWrite.disableOnDiskQueue` command-line flag is specified. It may also read buffered data from `-remoteWrite.tmpDataPath`
|
if `-remoteWrite.disableOnDiskQueue` command-line flag is specified. It may also read buffered data from `-remoteWrite.tmpDataPath`
|
||||||
on startup.
|
on startup.
|
||||||
|
|
||||||
When `-remoteWrite.disableOnDiskQueue` command-line flag is set, then `vmagent` may send the same samples multiple times to the configured remote storage
|
When `-remoteWrite.disableOnDiskQueue` command-line flag is set, `vmagent` may send the same samples multiple times to the configured remote storage
|
||||||
if it cannot keep up with the data ingestion rate. In this case the [deduplication](https://docs.victoriametrics.com/#deduplication)
|
if it cannot keep up with the data ingestion rate. In this case the [deduplication](https://docs.victoriametrics.com/#deduplication)
|
||||||
must be enabled on all the configured remote storage systems.
|
must be enabled on all the configured remote storage systems.
|
||||||
|
|
||||||
|
@ -1196,7 +1200,7 @@ If you have suggestions for improvements or have found a bug - please open an is
|
||||||
with `-remoteWrite.maxDiskUsagePerURL` command-line flag.
|
with `-remoteWrite.maxDiskUsagePerURL` command-line flag.
|
||||||
If you don't want to send all the buffered data from the directory to remote storage then simply stop `vmagent` and delete the directory.
|
If you don't want to send all the buffered data from the directory to remote storage then simply stop `vmagent` and delete the directory.
|
||||||
|
|
||||||
* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then it is possible to disable
|
* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then it is possible to disable
|
||||||
the persistent storage with `-remoteWrite.disableOnDiskQueue` command-line flag. See [these docs](#disabling-on-disk-persistence) for more details.
|
the persistent storage with `-remoteWrite.disableOnDiskQueue` command-line flag. See [these docs](#disabling-on-disk-persistence) for more details.
|
||||||
|
|
||||||
* By default `vmagent` masks `-remoteWrite.url` with `secret-url` values in logs and at `/metrics` page because
|
* By default `vmagent` masks `-remoteWrite.url` with `secret-url` values in logs and at `/metrics` page because
|
||||||
|
@ -2065,13 +2069,11 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
|
||||||
Supports an array of values separated by comma or specified via multiple flags.
|
Supports an array of values separated by comma or specified via multiple flags.
|
||||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||||
-remoteWrite.disableOnDiskQueue array
|
-remoteWrite.disableOnDiskQueue array
|
||||||
Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload
|
Whether to disable storing pending data to -remoteWrite.tmpDataPath when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
|
||||||
Empty values are set to false.
|
|
||||||
-remoteWrite.dropSamplesOnOverload array
|
|
||||||
Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence
|
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
Empty values are set to false.
|
Empty values are set to false.
|
||||||
|
-remoteWrite.dropSamplesOnOverload
|
||||||
|
Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence
|
||||||
-remoteWrite.flushInterval duration
|
-remoteWrite.flushInterval duration
|
||||||
Interval for flushing the data to remote storage. This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s)
|
Interval for flushing the data to remote storage. This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s)
|
||||||
-remoteWrite.forcePromProto array
|
-remoteWrite.forcePromProto array
|
||||||
|
|
|
@ -62,11 +62,21 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes
|
||||||
fq.mu.Unlock()
|
fq.mu.Unlock()
|
||||||
return float64(n)
|
return float64(n)
|
||||||
})
|
})
|
||||||
|
|
||||||
pendingBytes := fq.GetPendingBytes()
|
pendingBytes := fq.GetPendingBytes()
|
||||||
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d isPQDisabled=%t, it contains %d pending bytes", path, maxInmemoryBlocks, isPQDisabled, pendingBytes)
|
persistenceStatus := "enabled"
|
||||||
|
if isPQDisabled {
|
||||||
|
persistenceStatus = "disabled"
|
||||||
|
}
|
||||||
|
logger.Infof("opened fast queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes, persistence is %s", path, maxInmemoryBlocks, pendingBytes, persistenceStatus)
|
||||||
return fq
|
return fq
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsPersistentQueueDisabled returns true if persistent queue at fq is disabled.
|
||||||
|
func (fq *FastQueue) IsPersistentQueueDisabled() bool {
|
||||||
|
return fq.isPQDisabled
|
||||||
|
}
|
||||||
|
|
||||||
// IsWriteBlocked checks if data can be pushed into fq
|
// IsWriteBlocked checks if data can be pushed into fq
|
||||||
func (fq *FastQueue) IsWriteBlocked() bool {
|
func (fq *FastQueue) IsWriteBlocked() bool {
|
||||||
if !fq.isPQDisabled {
|
if !fq.isPQDisabled {
|
||||||
|
|
Loading…
Reference in a new issue