Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
Feature allow configuring disableOnDiskQueue and dropSamplesOnOverload per url (#6248)
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for the implementation!
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now the number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: Raphael Bizos <r.bizos@criteo.com>
Parent: c87ce86d96
Commit: 87fd400dfc
5 changed files with 75 additions and 49 deletions
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go

@@ -110,11 +110,11 @@ var (
     streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
         "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
-    disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
-        "when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence ."+
+    disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
+        "when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ."+
         "See also -remoteWrite.dropSamplesOnOverload")
-    dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
-        "cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence")
+    dropSamplesOnOverload = flagutil.NewArrayBool("remoteWrite.dropSamplesOnOverload", "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
+        "cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence")
 )

 var (
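Switching from `flag.Bool` to `flagutil.NewArrayBool` is what enables the per-URL behavior: the flag now holds one boolean per `-remoteWrite.url`, and later code looks the value up by URL index via `GetOptionalArg(argIdx)`. Below is a minimal, self-contained sketch of how such an array flag can resolve per-index values — an illustration of the assumed `flagutil` semantics (per-index lookup, a single value applying to every URL, missing values defaulting to false), not the actual library code:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// arrayBool loosely mimics an array-valued boolean flag such as
// -remoteWrite.disableOnDiskQueue=true,false: one value per -remoteWrite.url.
// It illustrates the assumed flagutil.ArrayBool semantics, not the real code.
type arrayBool []bool

// Set parses a comma-separated list of booleans; empty items default to false.
func (a *arrayBool) Set(value string) error {
	for _, s := range strings.Split(value, ",") {
		if s == "" {
			*a = append(*a, false)
			continue
		}
		v, err := strconv.ParseBool(s)
		if err != nil {
			return err
		}
		*a = append(*a, v)
	}
	return nil
}

// getOptionalArg mirrors the GetOptionalArg(argIdx) calls in the diff:
// the argIdx-th value when present, a single configured value applied to
// every URL, and false when the flag was not set at all.
func (a arrayBool) getOptionalArg(argIdx int) bool {
	if argIdx < len(a) {
		return a[argIdx]
	}
	if len(a) == 1 {
		return a[0]
	}
	return false
}

func main() {
	var disable arrayBool
	_ = disable.Set("true,false") // as if -remoteWrite.disableOnDiskQueue=true,false was given
	for i := 0; i < 2; i++ {
		fmt.Printf("url #%d: disableOnDiskQueue=%v\n", i, disable.getOptionalArg(i))
	}
}
```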
@@ -135,6 +135,9 @@ var (
         "see https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence"),
     StatusCode: http.StatusTooManyRequests,
 }

+    // disableOnDiskQueueAll is set to true if all remoteWrite.urls were configured to disable persistent queue via disableOnDiskQueue
+    disableOnDiskQueueAll bool
 )

 // MultitenancyEnabled returns true if -enableMultitenantHandlers or -remoteWrite.multitenantURL is specified.
@@ -227,6 +230,15 @@ func Init() {
     if len(*remoteWriteURLs) > 0 {
         rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
     }

+    disableOnDiskQueueAll = true
+    for _, v := range *disableOnDiskQueue {
+        if !v {
+            disableOnDiskQueueAll = false
+            break
+        }
+    }
+
     dropDanglingQueues()

     // Start config reloader.
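This aggregate gates the fast-path overload check later in `tryPush`: per the comment added there, the shortcut only applies when every URL disables the on-disk queue. For example, with `-remoteWrite.disableOnDiskQueue=true,false` the loop finds a `false` value and leaves `disableOnDiskQueueAll` unset, so a single blocked queue can no longer reject a whole request early even though the first URL runs without persistence.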
@@ -440,10 +452,10 @@ func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
 //
 // The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
 func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
-    return tryPush(at, wr, *dropSamplesOnOverload)
+    return tryPush(at, wr, false)
 }

-func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, dropSamplesOnFailure bool) bool {
+func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnFailure bool) bool {
     tss := wr.Timeseries

     if at == nil && MultitenancyEnabled() {
@@ -476,17 +488,18 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, dropSamplesOnFailur
     rowsCount := getRowsCount(tss)

-    if *disableOnDiskQueue {
-        // Quick check whether writes to configured remote storage systems are blocked.
-        // This allows saving CPU time spent on relabeling and block compression
-        // if some of remote storage systems cannot keep up with the data ingestion rate.
+    // Quick check whether writes to configured remote storage systems are blocked.
+    // This allows saving CPU time spent on relabeling and block compression
+    // if some of remote storage systems cannot keep up with the data ingestion rate.
+    // this shortcut is only applicable if all remote writes have disableOnDiskQueue = true
+    if disableOnDiskQueueAll {
         for _, rwctx := range rwctxs {
             if rwctx.fq.IsWriteBlocked() {
-                pushFailures.Inc()
-                if dropSamplesOnFailure {
+                rwctx.pushFailures.Inc()
+                if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload {
                     // Just drop samples
-                    samplesDropped.Add(rowsCount)
-                    return true
+                    rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
+                    continue
                 }
                 return false
             }
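The behavioral change in this hunk: previously a single blocked queue either dropped the whole request or failed it for everyone; now the decision is made per URL, and a `continue` replaces the early `return true`. Here is a runnable sketch of the new loop's semantics, using stand-in types and names (`ctx`, `blocked`, `quickCheck` are hypothetical, not the real vmagent identifiers):

```go
package main

import "fmt"

// ctx stands in for remoteWriteCtx; the fields here are illustrative.
type ctx struct {
	name                  string
	blocked               bool // fq.IsWriteBlocked() in the real code
	dropSamplesOnOverload bool // per-URL -remoteWrite.dropSamplesOnOverload
}

// quickCheck mirrors the fast-path loop above, which only runs when every
// URL has the on-disk queue disabled (disableOnDiskQueueAll).
func quickCheck(ctxs []ctx, forceDrop bool) bool {
	for _, c := range ctxs {
		if !c.blocked {
			continue
		}
		// The real code increments rwctx.pushFailures here.
		if forceDrop || c.dropSamplesOnOverload {
			// This URL may shed load: drop its samples and keep
			// checking the remaining URLs instead of failing everything.
			fmt.Printf("%s: dropping samples\n", c.name)
			continue
		}
		// A blocked URL that may neither queue nor drop rejects the
		// whole request, so the client retries later (HTTP 429).
		return false
	}
	return true
}

func main() {
	ctxs := []ctx{
		{name: "vm-1", blocked: true, dropSamplesOnOverload: true},
		{name: "vm-2", blocked: false},
	}
	fmt.Println("request accepted:", quickCheck(ctxs, false))
}
```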
@@ -539,27 +552,14 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, dropSamplesOnFailur
         }
         sortLabelsIfNeeded(tssBlock)
         tssBlock = limitSeriesCardinality(tssBlock)
-        if !tryPushBlockToRemoteStorages(rwctxs, tssBlock) {
-            if !*disableOnDiskQueue {
-                logger.Panicf("BUG: tryPushBlockToRemoteStorages must return true if -remoteWrite.disableOnDiskQueue isn't set")
-            }
-            pushFailures.Inc()
-            if dropSamplesOnFailure {
-                samplesDropped.Add(rowsCount)
-                return true
-            }
+        if !tryPushBlockToRemoteStorages(rwctxs, tssBlock, forceDropSamplesOnFailure) {
             return false
         }
     }
     return true
 }

-var (
-    samplesDropped = metrics.NewCounter(`vmagent_remotewrite_samples_dropped_total`)
-    pushFailures   = metrics.NewCounter(`vmagent_remotewrite_push_failures_total`)
-)
-
-func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool {
+func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
     if len(tssBlock) == 0 {
         // Nothing to push
         return true
@@ -567,7 +567,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar

     if len(rwctxs) == 1 {
         // Fast path - just push data to the configured single remote storage
-        return rwctxs[0].TryPush(tssBlock)
+        return rwctxs[0].TryPush(tssBlock, forceDropSamplesOnFailure)
     }

     // We need to push tssBlock to multiple remote storages.
@@ -578,7 +578,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
     if replicas <= 0 {
         replicas = 1
     }
-    return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas)
+    return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas, forceDropSamplesOnFailure)
 }

 // Replicate tssBlock samples among rwctxs.
@@ -590,7 +590,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
     for _, rwctx := range rwctxs {
         go func(rwctx *remoteWriteCtx) {
             defer wg.Done()
-            if !rwctx.TryPush(tssBlock) {
+            if !rwctx.TryPush(tssBlock, forceDropSamplesOnFailure) {
                 anyPushFailed.Store(true)
             }
         }(rwctx)
@@ -599,7 +599,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
     return !anyPushFailed.Load()
 }

-func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int) bool {
+func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
     x := getTSSShards(len(rwctxs))
     defer putTSSShards(x)

@@ -653,7 +653,7 @@ func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []pr
         wg.Add(1)
         go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
             defer wg.Done()
-            if !rwctx.TryPush(tss) {
+            if !rwctx.TryPush(tss, forceDropSamplesOnFailure) {
                 anyPushFailed.Store(true)
             }
         }(rwctx, shard)
@@ -786,14 +786,19 @@ type remoteWriteCtx struct {
     sas          atomic.Pointer[streamaggr.Aggregators]
     deduplicator *streamaggr.Deduplicator

     streamAggrKeepInput bool
     streamAggrDropInput bool

+    disableOnDiskQueue    bool
+    dropSamplesOnOverload bool
+
     pss        []*pendingSeries
     pssNextIdx atomic.Uint64

     rowsPushedAfterRelabel *metrics.Counter
     rowsDroppedByRelabel   *metrics.Counter
+
+    pushFailures             *metrics.Counter
+    rowsDroppedOnPushFailure *metrics.Counter
 }

 func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
@@ -809,7 +814,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
         logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize)
         maxPendingBytes = persistentqueue.DefaultChunkFileSize
     }
-    fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, *disableOnDiskQueue)
+    isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
+    fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
     _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
         return float64(fq.GetPendingBytes())
     })
@@ -852,8 +858,14 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
         c:   c,
         pss: pss,

+        dropSamplesOnOverload: dropSamplesOnOverload.GetOptionalArg(argIdx),
+        disableOnDiskQueue:    isPQDisabled,
+
         rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
         rowsDroppedByRelabel:   metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
+
+        pushFailures:             metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
+        rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
     }

     // Initialize sas
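Because each counter is now created with `path` and `url` labels, failures and drops can be attributed to a specific remote endpoint. For example, a query such as `sum(rate(vmagent_remotewrite_samples_dropped_total[5m])) by (url)` (illustrative) shows per-URL drop rates — something the previous single global `vmagent_remotewrite_samples_dropped_total` counter could not distinguish.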
@@ -914,7 +926,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
 //
 // TryPush can be called concurrently for multiple remoteWriteCtx,
 // so it shouldn't modify tss entries.
-func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
+func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
     // Apply relabeling
     var rctx *relabelCtx
     var v *[]prompbmarshal.TimeSeries
@@ -966,6 +978,14 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
         putRelabelCtx(rctx)
     }

+    if !ok {
+        rwctx.pushFailures.Inc()
+        if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload {
+            rwctx.rowsDroppedOnPushFailure.Add(len(tss))
+            return true
+        }
+    }
+
     return ok
 }

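Note the contract here: on a failed push where dropping is allowed (forced by the caller or enabled for this URL), `TryPush` records the drop and returns `true`, so the caller treats the data as handled and does not answer the client with `ErrQueueFullHTTPRetry`. Only a failed push that may neither queue nor drop propagates `false` and triggers a client retry.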
@@ -990,13 +1010,13 @@ func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSe
     if rwctx.tryPushInternal(tss) {
         return
     }
-    if !*disableOnDiskQueue {
+    if !rwctx.disableOnDiskQueue {
         logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set")
     }
-    pushFailures.Inc()
-    if *dropSamplesOnOverload {
+    rwctx.pushFailures.Inc()
+    if dropSamplesOnOverload.GetOptionalArg(rwctx.idx) {
         rowsCount := getRowsCount(tss)
-        samplesDropped.Add(rowsCount)
+        rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
     }
 }
--- a/app/vmagent/remotewrite/remotewrite_test.go
+++ b/app/vmagent/remotewrite/remotewrite_test.go

@@ -96,7 +96,7 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {

     // copy inputTss to make sure it is not mutated during TryPush call
     copy(expectedTss, inputTss)
-    rwctx.TryPush(inputTss)
+    rwctx.TryPush(inputTss, false)

     if !reflect.DeepEqual(expectedTss, inputTss) {
         t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md

@@ -38,6 +38,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: [dashboards/operator](https://grafana.com/grafana/dashboards/17869), [dashboards/backupmanager](https://grafana.com/grafana/dashboards/17798) and [dashboard/tenant-statistic](https://grafana.com/grafana/dashboards/16399): update dashboard to be compatible with Grafana 10+ version.
 * FEATURE: [dashboards/cluster](https://grafana.com/grafana/dashboards/11176): add new panel `Concurrent selects` to `vmstorage` row. The panel will show how many ongoing select queries are processed by vmstorage and should help to identify resource bottlenecks. See panel description for more details.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for the implementation!
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now the number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.

 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205).
--- a/docs/vmagent.md
+++ b/docs/vmagent.md

@@ -1187,7 +1187,7 @@ If you have suggestions for improvements or have found a bug - please open an is
   with `-remoteWrite.maxDiskUsagePerURL` command-line flag.
   If you don't want to send all the buffered data from the directory to remote storage then simply stop `vmagent` and delete the directory.

-* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then is is possible to disable
+* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then it is possible to disable
   the persistent storage with `-remoteWrite.disableOnDiskQueue` command-line flag. See [these docs](#disabling-on-disk-persistence) for more details.

 * By default `vmagent` masks `-remoteWrite.url` with `secret-url` values in logs and at `/metrics` page because
@@ -2057,10 +2057,14 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
     Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second
     Supports an array of values separated by comma or specified via multiple flags.
     Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-  -remoteWrite.disableOnDiskQueue
-    Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload
-  -remoteWrite.dropSamplesOnOverload
-    Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence
+  -remoteWrite.disableOnDiskQueue array
+    Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload
+    Supports array of values separated by comma or specified via multiple flags.
+    Empty values are set to false.
+  -remoteWrite.dropSamplesOnOverload array
+    Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence
+    Supports array of values separated by comma or specified via multiple flags.
+    Empty values are set to false.
   -remoteWrite.flushInterval duration
     Interval for flushing the data to remote storage. This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s)
   -remoteWrite.forcePromProto array
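As the updated help output indicates, both flags are now array-valued: values may be comma-separated or the flag repeated, matched positionally to each `-remoteWrite.url`. As a hypothetical example, `./vmagent -remoteWrite.url=http://vm-a:8428/api/v1/write -remoteWrite.url=http://vm-b:8428/api/v1/write -remoteWrite.disableOnDiskQueue=true,false -remoteWrite.dropSamplesOnOverload=true,false` disables the on-disk queue and enables sample dropping for the first URL only; following the usual VictoriaMetrics array-flag convention, a single value would apply to all URLs, and empty values default to false.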
--- a/lib/persistentqueue/fastqueue.go
+++ b/lib/persistentqueue/fastqueue.go

@@ -63,7 +63,7 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes
         return float64(n)
     })
     pendingBytes := fq.GetPendingBytes()
-    logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes)
+    logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d isPQDisabled=%t, it contains %d pending bytes", path, maxInmemoryBlocks, isPQDisabled, pendingBytes)
     return fq
 }