From 5034aa0773d4a38f4e89352b33da10559538593f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 25 Nov 2023 11:31:30 +0200 Subject: [PATCH] app/vmagent: follow-up for 090cb2c9de8d533eaba45a3ebbdb0d2503e97e00 - Add Try* prefix to functions, which return a bool result, in order to improve readability and reduce the probability of a missed check of the result returned from these functions. - Call the adjustSampleValues() only once on input samples. Previously it was called on every attempt to flush data to the persistent queue. - Properly restore the initial state of WriteRequest passed to tryPushWriteRequest() before returning from this function after an unsuccessful push to the persistent queue. Previously a part of the WriteRequest samples could be lost in such cases. - Add -remoteWrite.dropSamplesOnOverload command-line flag, which can be used for dropping incoming samples instead of returning 429 Too Many Requests error to the client when -remoteWrite.disableOnDiskQueue is set and the remote storage cannot keep up with the data ingestion rate. - Add vmagent_remotewrite_samples_dropped_total metric, which counts the number of dropped samples. - Add vmagent_remotewrite_push_failures_total metric, which counts the number of unsuccessful attempts to push data to the persistent queue when -remoteWrite.disableOnDiskQueue is set. - Remove vmagent_remotewrite_aggregation_metrics_dropped_total and vm_promscrape_push_samples_dropped_total metrics, because they are replaced with the vmagent_remotewrite_samples_dropped_total metric. - Update 'Disabling on-disk persistence' docs at docs/vmagent.md - Update stale comments in the code Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5088 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110 --- app/vmagent/csvimport/request_handler.go | 2 +- app/vmagent/datadog/request_handler.go | 2 +- app/vmagent/graphite/request_handler.go | 2 +- app/vmagent/influx/request_handler.go | 2 +- app/vmagent/main.go | 5 +- app/vmagent/native/request_handler.go | 2 +- app/vmagent/newrelic/request_handler.go | 2 +- app/vmagent/opentelemetry/request_handler.go | 2 +- app/vmagent/opentsdb/request_handler.go | 2 +- app/vmagent/opentsdbhttp/request_handler.go | 2 +- .../prometheusimport/request_handler.go | 2 +- .../promremotewrite/request_handler.go | 2 +- app/vmagent/remotewrite/pendingseries.go | 71 ++++---- app/vmagent/remotewrite/pendingseries_test.go | 4 +- app/vmagent/remotewrite/remotewrite.go | 171 +++++++++++------- app/vmagent/vmimport/request_handler.go | 2 +- app/vminsert/main.go | 3 +- docs/CHANGELOG.md | 2 +- docs/vmagent.md | 112 ++++++------ lib/persistentqueue/fastqueue.go | 47 ++--- lib/persistentqueue/fastqueue_test.go | 37 ++-- lib/persistentqueue/fastqueue_timing_test.go | 10 +- lib/promscrape/scraper.go | 11 +- 23 files changed, 267 insertions(+), 230 deletions(-) diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index 835944db5..8d5464a06 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -65,7 +65,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(len(rows)) diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadog/request_handler.go index
971cf450c..4cdfe1093 100644 --- a/app/vmagent/datadog/request_handler.go +++ b/app/vmagent/datadog/request_handler.go @@ -88,7 +88,7 @@ func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmar ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(rowsTotal) diff --git a/app/vmagent/graphite/request_handler.go b/app/vmagent/graphite/request_handler.go index 4e124d91a..b072415f6 100644 --- a/app/vmagent/graphite/request_handler.go +++ b/app/vmagent/graphite/request_handler.go @@ -56,7 +56,7 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(nil, &ctx.WriteRequest) { + if !remotewrite.TryPush(nil, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(len(rows)) diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 5d5e4585d..58a6bf950 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -130,7 +130,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom ctx.ctx.Labels = labels ctx.ctx.Samples = samples ctx.commonLabels = commonLabels - if !remotewrite.Push(at, &ctx.ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(rowsTotal) diff --git a/app/vmagent/main.go b/app/vmagent/main.go index eef50fd05..c42331ba5 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -37,6 +37,7 @@ import ( opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics" @@ -139,7 +140,9 @@ func main() { opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, httpInsertHandler) } - promscrape.Init(remotewrite.Push) + promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) { + _ = remotewrite.TryPush(at, wr) + }) if len(*httpListenAddr) > 0 { go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index ed112648b..68b0bdebe 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -84,7 +84,7 @@ func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } return nil diff --git a/app/vmagent/newrelic/request_handler.go b/app/vmagent/newrelic/request_handler.go index 9e47694fd..b20c05f34 100644 --- a/app/vmagent/newrelic/request_handler.go +++ b/app/vmagent/newrelic/request_handler.go @@ -76,7 +76,7 @@ func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = 
samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(len(rows)) diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go index b308a4b7c..e923f750c 100644 --- a/app/vmagent/opentelemetry/request_handler.go +++ b/app/vmagent/opentelemetry/request_handler.go @@ -59,7 +59,7 @@ func insertRows(at *auth.Token, tss []prompbmarshal.TimeSeries, extraLabels []pr ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(rowsTotal) diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go index 3e8c12295..a0f2f775f 100644 --- a/app/vmagent/opentsdb/request_handler.go +++ b/app/vmagent/opentsdb/request_handler.go @@ -56,7 +56,7 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(nil, &ctx.WriteRequest) { + if !remotewrite.TryPush(nil, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(len(rows)) diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index a919cb153..d9cb4b791 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -64,7 +64,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(len(rows)) diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index 2d0e353de..49fcedfcb 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -73,7 +73,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(len(rows)) diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index b2455555b..657c717d3 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -69,7 +69,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(rowsTotal) diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 92bd03bc7..c6591e946 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -57,11 +57,11 @@ func (ps *pendingSeries) MustStop() { ps.periodicFlusherWG.Wait() } -func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) bool { +func (ps *pendingSeries) TryPush(tss []prompbmarshal.TimeSeries) bool { ps.mu.Lock() - 
wasPushed := ps.wr.push(tss) + ok := ps.wr.tryPush(tss) ps.mu.Unlock() - return wasPushed + return ok } func (ps *pendingSeries) periodicFlusher() { @@ -84,8 +84,7 @@ func (ps *pendingSeries) periodicFlusher() { } } ps.mu.Lock() - // no-op - _ = ps.wr.flush() + _ = ps.wr.tryFlush() ps.mu.Unlock() } } @@ -94,15 +93,16 @@ type writeRequest struct { // Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures. lastFlushTime uint64 + // The queue to send blocks to. fq *persistentqueue.FastQueue // Whether to encode the write request with VictoriaMetrics remote write protocol. isVMRemoteWrite bool - // How many significant figures must be left before sending the writeRequest to pushBlock. + // How many significant figures must be left before sending the writeRequest to fq. significantFigures int - // How many decimal digits after point must be left before sending the writeRequest to pushBlock. + // How many decimal digits after point must be left before sending the writeRequest to fq. roundDigits int wr prompbmarshal.WriteRequest @@ -115,7 +115,7 @@ type writeRequest struct { } func (wr *writeRequest) reset() { - // Do not reset lastFlushTime, pushBlock, isVMRemoteWrite, significantFigures and roundDigits, since they are re-used. + // Do not reset lastFlushTime, fq, isVMRemoteWrite, significantFigures and roundDigits, since they are re-used. wr.wr.Timeseries = nil @@ -133,41 +133,40 @@ func (wr *writeRequest) reset() { wr.buf = wr.buf[:0] } -// mustFlushOnStop makes force push into the queue -// needed to properly save in-memory buffer with disabled disk storage +// mustFlushOnStop force pushes wr data into wr.fq +// +// This is needed in order to properly save in-memory data to persistent queue on graceful shutdown. 
func (wr *writeRequest) mustFlushOnStop() { wr.wr.Timeseries = wr.tss - wr.adjustSampleValues() - atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) - if !pushWriteRequest(&wr.wr, func(block []byte) bool { - wr.fq.MustWriteBlockIgnoreDisabledPQ(block) - return true - }, wr.isVMRemoteWrite) { - return + if !tryPushWriteRequest(&wr.wr, wr.mustWriteBlock, wr.isVMRemoteWrite) { + logger.Panicf("BUG: final flush must always return true") } wr.reset() } -func (wr *writeRequest) flush() bool { +func (wr *writeRequest) mustWriteBlock(block []byte) bool { + wr.fq.MustWriteBlockIgnoreDisabledPQ(block) + return true +} + +func (wr *writeRequest) tryFlush() bool { wr.wr.Timeseries = wr.tss - wr.adjustSampleValues() atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) - if !pushWriteRequest(&wr.wr, wr.fq.WriteBlock, wr.isVMRemoteWrite) { + if !tryPushWriteRequest(&wr.wr, wr.fq.TryWriteBlock, wr.isVMRemoteWrite) { return false } wr.reset() return true } -func (wr *writeRequest) adjustSampleValues() { - samples := wr.samples - if n := wr.significantFigures; n > 0 { +func adjustSampleValues(samples []prompbmarshal.Sample, significantFigures, roundDigits int) { + if n := significantFigures; n > 0 { for i := range samples { s := &samples[i] s.Value = decimal.RoundToSignificantFigures(s.Value, n) } } - if n := wr.roundDigits; n < 100 { + if n := roundDigits; n < 100 { for i := range samples { s := &samples[i] s.Value = decimal.RoundToDecimalDigits(s.Value, n) @@ -175,7 +174,7 @@ func (wr *writeRequest) adjustSampleValues() { } } -func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) bool { +func (wr *writeRequest) tryPush(src []prompbmarshal.TimeSeries) bool { tssDst := wr.tss maxSamplesPerBlock := *maxRowsPerBlock // Allow up to 10x of labels per each block on average. 
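Note: the following standalone sketch is illustrative and not part of the patch. It shows the rounding semantics applied by adjustSampleValues(), which is now invoked exactly once per input series in tryPush() instead of on every flush attempt. It assumes the lib/decimal helpers called in the hunk above:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)

// roundValue mirrors what adjustSampleValues() does for a single sample value:
// first limit the number of significant figures, then the number of decimal
// digits after the point. significantFigures <= 0 and roundDigits >= 100
// disable the corresponding adjustment.
func roundValue(v float64, significantFigures, roundDigits int) float64 {
	if significantFigures > 0 {
		v = decimal.RoundToSignificantFigures(v, significantFigures)
	}
	if roundDigits < 100 {
		v = decimal.RoundToDecimalDigits(v, roundDigits)
	}
	return v
}

func main() {
	fmt.Println(roundValue(123.456789, 4, 100)) // 123.5 - keep 4 significant figures
	fmt.Println(roundValue(123.456789, 0, 2))   // 123.46 - keep 2 decimal digits
}
```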
@@ -183,13 +182,15 @@ func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) bool { for i := range src { if len(wr.samples) >= maxSamplesPerBlock || len(wr.labels) >= maxLabelsPerBlock { wr.tss = tssDst - if !wr.flush() { + if !wr.tryFlush() { return false } tssDst = wr.tss } + tsSrc := &src[i] + adjustSampleValues(tsSrc.Samples, wr.significantFigures, wr.roundDigits) tssDst = append(tssDst, prompbmarshal.TimeSeries{}) - wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i]) + wr.copyTimeSeries(&tssDst[len(tssDst)-1], tsSrc) } wr.tss = tssDst @@ -221,7 +222,7 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { wr.buf = buf } -func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte) bool, isVMRemoteWrite bool) bool { +func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block []byte) bool, isVMRemoteWrite bool) bool { if len(wr.Timeseries) == 0 { // Nothing to push return true @@ -237,7 +238,7 @@ } writeRequestBufPool.Put(bb) if len(zb.B) <= persistentqueue.MaxBlockSize { - if !pushBlock(zb.B) { + if !tryPushBlock(zb.B) { return false } blockSizeRows.Update(float64(len(wr.Timeseries))) @@ -260,11 +261,13 @@ } n := len(samples) / 2 wr.Timeseries[0].Samples = samples[:n] - if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) { + wr.Timeseries[0].Samples = samples return false } wr.Timeseries[0].Samples = samples[n:] - if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) { + wr.Timeseries[0].Samples = samples return false } wr.Timeseries[0].Samples = samples @@ -273,11 +276,13 @@ timeseries := wr.Timeseries n := len(timeseries) / 2 wr.Timeseries = timeseries[:n] - if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) { + wr.Timeseries = timeseries return false } wr.Timeseries = timeseries[n:] - if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) { + wr.Timeseries = timeseries return false } wr.Timeseries = timeseries diff --git a/app/vmagent/remotewrite/pendingseries_test.go b/app/vmagent/remotewrite/pendingseries_test.go index 90a097a59..726548cb9 100644 --- a/app/vmagent/remotewrite/pendingseries_test.go +++ b/app/vmagent/remotewrite/pendingseries_test.go @@ -33,7 +33,9 @@ func testPushWriteRequest(t *testing.T, rowsCount, expectedBlockLenProm, expecte pushBlockLen = len(block) return true } - _ = pushWriteRequest(wr, pushBlock, isVMRemoteWrite) + if !tryPushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + t.Fatalf("cannot push data to remote storage") + } if math.Abs(float64(pushBlockLen-expectedBlockLen)/float64(expectedBlockLen)*100) > tolerancePrc { t.Fatalf("unexpected block len for rowsCount=%d, isVMRemoteWrite=%v; got %d bytes; expecting %d bytes +- %.0f%%", rowsCount, isVMRemoteWrite, pushBlockLen, expectedBlockLen, tolerancePrc) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 07f573b43..1bd936fa0 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -47,8 +47,8 @@ var ( shardByURLLabels =
flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+ "among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+ "even distribution of series over the specified -remoteWrite.url systems") - tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+ - "See also -remoteWrite.maxDiskUsagePerURL") + tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+ + "See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue") keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+ "Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.") queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+ @@ -87,9 +87,11 @@ var ( "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+ "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") - disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable on-disk queue for metrics ingestion processing. "+ - "If in-memory queue is full for at least 1 remoteWrite target, all data ingestion is blocked and returns an error. "+ - "It allows to build a chain of vmagents and build complicated data pipelines without data-loss. On-disk writes is still possible during graceful shutdown for storing in-memory part of the queue.") + disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ + "when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+ + "See also -remoteWrite.dropSamplesOnOverload") + dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+ + "cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence") ) var ( @@ -103,10 +105,11 @@ var ( // Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified. defaultAuthToken = &auth.Token{} - // ErrQueueFullHTTPRetry returned when -remoteWrite.disableOnDiskQueue enabled - // and one of remote storage cannot handle a load + // ErrQueueFullHTTPRetry must be returned when TryPush() returns false. ErrQueueFullHTTPRetry = &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("in-memory queue is full, write requests blocked due to enabled flag -remoteWrite.disableOnDiskQueue=true. 
Retry request later"), + Err: fmt.Errorf("remote storage systems cannot keep up with the data ingestion rate; retry the request later " + + "or remove -remoteWrite.disableOnDiskQueue from vmagent command-line flags, so it could save pending data to -remoteWrite.tmpDataPath; " + + "see https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence"), StatusCode: http.StatusTooManyRequests, } ) @@ -331,7 +334,7 @@ var configReloaderWG sync.WaitGroup // Stop stops remotewrite. // -// It is expected that nobody calls Push during and after the call to this func. +// It is expected that nobody calls TryPush during and after the call to this func. func Stop() { close(configReloaderStopCh) configReloaderWG.Wait() @@ -341,7 +344,7 @@ func Stop() { } rwctxsDefault = nil - // There is no need in locking rwctxsMapLock here, since nobody should call Push during the Stop call. + // There is no need in locking rwctxsMapLock here, since nobody should call TryPush during the Stop call. for _, rwctxs := range rwctxsMap { for _, rwctx := range rwctxs { rwctx.MustStop() @@ -357,13 +360,16 @@ func Stop() { } } -// Push sends wr to remote storage systems set via `-remoteWrite.url`. +// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL // -// If at is nil, then the data is pushed to the configured `-remoteWrite.url`. -// If at isn't nil, the data is pushed to the configured `-remoteWrite.multitenantURL`. +// If at is nil, then the data is pushed to the configured -remoteWrite.url. +// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL. // -// Note that wr may be modified by Push because of relabeling and rounding. -func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { +// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt. +// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times. +// +// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false. +func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { if at == nil && len(*remoteWriteMultitenantURLs) > 0 { // Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set. at = defaultAuthToken @@ -387,30 +393,42 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { } rwctxsMapLock.Unlock() } - var isWritesLocked bool - for _, rwctx := range rwctxs { - if rwctx.fq.IsWritesBlocked() { - isWritesLocked = true - break + + tss := wr.Timeseries + rowsCount := getRowsCount(tss) + + if *disableOnDiskQueue { + // Quick check whether writes to configured remote storage systems are blocked. + // This allows saving CPU time spent on relabeling and block compression + // if some of remote storage systems cannot keep up with the data ingestion rate. 
+ for _, rwctx := range rwctxs { + if rwctx.fq.IsWriteBlocked() { + pushFailures.Inc() + if *dropSamplesOnOverload { + // Just drop samples + samplesDropped.Add(rowsCount) + return true + } + return false + } } } - // fast path, write path is blocked - if isWritesLocked { - return false - } var rctx *relabelCtx rcs := allRelabelConfigs.Load() pcsGlobal := rcs.global if pcsGlobal.Len() > 0 { rctx = getRelabelCtx() + defer func() { + rctx.reset() + putRelabelCtx(rctx) + }() } - tss := wr.Timeseries - rowsCount := getRowsCount(tss) globalRowsPushedBeforeRelabel.Add(rowsCount) maxSamplesPerBlock := *maxRowsPerBlock // Allow up to 10x of labels per each block on average. maxLabelsPerBlock := 10 * maxSamplesPerBlock + for len(tss) > 0 { // Process big tss in smaller blocks in order to reduce the maximum memory usage samplesCount := 0 @@ -418,7 +436,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { i := 0 for i < len(tss) { samplesCount += len(tss[i].Samples) - labelsCount += len(tss[i].Labels) + labelsCount += len(tss[i].Samples) * len(tss[i].Labels) i++ if samplesCount >= maxSamplesPerBlock || labelsCount >= maxLabelsPerBlock { break @@ -439,10 +457,14 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { } sortLabelsIfNeeded(tssBlock) tssBlock = limitSeriesCardinality(tssBlock) - if !pushBlockToRemoteStorages(rwctxs, tssBlock) { - if rctx != nil { - rctx.reset() - putRelabelCtx(rctx) + if !tryPushBlockToRemoteStorages(rwctxs, tssBlock) { + if !*disableOnDiskQueue { + logger.Panicf("BUG: tryPushBlockToRemoteStorages must return true if -remoteWrite.disableOnDiskQueue isn't set") + } + pushFailures.Inc() + if *dropSamplesOnOverload { + samplesDropped.Add(rowsCount) + return true } return false } @@ -450,13 +472,15 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { rctx.reset() } } - if rctx != nil { - putRelabelCtx(rctx) - } return true } -func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool { +var ( + samplesDropped = metrics.NewCounter(`vmagent_remotewrite_samples_dropped_total`) + pushFailures = metrics.NewCounter(`vmagent_remotewrite_push_failures_total`) +) + +func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool { if len(tssBlock) == 0 { // Nothing to push return true @@ -464,8 +488,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha if len(rwctxs) == 1 { // Fast path - just push data to the configured single remote storage - err := rwctxs[0].Push(tssBlock) - return err + return rwctxs[0].TryPush(tssBlock) } // We need to push tssBlock to multiple remote storages. 
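Note: an illustrative sketch, not part of the patch. Every per-protocol request handler updated above follows the same contract with TryPush(): when it returns false, the handler returns ErrQueueFullHTTPRetry, which the HTTP server translates into a 429 Too Many Requests response. A hypothetical insertRows-style handler:

```go
package example

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

// insertRows is a hypothetical handler mirroring the pattern used by the
// request handlers in this patch. Note that wr contents may be modified by
// TryPush, so wr must be re-initialized before retrying a failed push.
func insertRows(at *auth.Token, wr *prompbmarshal.WriteRequest) error {
	if !remotewrite.TryPush(at, wr) {
		// The caller converts this error into `429 Too Many Requests`,
		// so well-behaved clients back off and retry later.
		return remotewrite.ErrQueueFullHTTPRetry
	}
	return nil
}
```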
@@ -502,7 +525,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha } go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) { defer wg.Done() - if !rwctx.Push(tss) { + if !rwctx.TryPush(tss) { atomic.StoreUint64(&anyPushFailed, 1) } }(rwctx, tssShard) @@ -520,7 +543,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha for _, rwctx := range rwctxs { go func(rwctx *remoteWriteCtx) { defer wg.Done() - if !rwctx.Push(tssBlock) { + if !rwctx.TryPush(tssBlock) { atomic.StoreUint64(&anyPushFailed, 1) } }(rwctx) @@ -629,9 +652,8 @@ type remoteWriteCtx struct { pss []*pendingSeries pssNextIdx uint64 - rowsPushedAfterRelabel *metrics.Counter - rowsDroppedByRelabel *metrics.Counter - rowsDroppedAtAggregationOnPush *metrics.Counter + rowsPushedAfterRelabel *metrics.Counter + rowsDroppedByRelabel *metrics.Counter } func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx { @@ -654,9 +676,9 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { return float64(fq.GetInmemoryQueueLen()) }) - _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_inmemory_queue_blocked{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { - if fq.IsWritesBlocked() { - return 1.0 + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queue_blocked{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { + if fq.IsWriteBlocked() { + return 1 } return 0 }) @@ -690,9 +712,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in c: c, pss: pss, - rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)), - rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), - rowsDroppedAtAggregationOnPush: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_aggregation_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), + rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)), + rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), } // Initialize sas @@ -735,7 +756,7 @@ func (rwctx *remoteWriteCtx) MustStop() { rwctx.rowsDroppedByRelabel = nil } -func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) bool { +func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool { // Apply relabeling var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries @@ -773,16 +794,18 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) bool { } matchIdxsPool.Put(matchIdxs) } - defer func() { - // Return back relabeling contexts to the pool - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssPool.Put(v) - putRelabelCtx(rctx) - } - }() - return rwctx.pushInternal(tss) + // Try pushing the data to remote storage + ok := rwctx.tryPushInternal(tss) + + // Return back relabeling contexts to the pool + if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + } + + return ok } 
var matchIdxsPool bytesutil.ByteBufferPool @@ -803,21 +826,22 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop } func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSeries) { - if !rwctx.pushInternal(tss) { - rwctx.rowsDroppedAtAggregationOnPush.Inc() + if rwctx.tryPushInternal(tss) { + return + } + if !*disableOnDiskQueue { + logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set") + } + pushFailures.Inc() + if *dropSamplesOnOverload { + rowsCount := getRowsCount(tss) + samplesDropped.Add(rowsCount) } } -func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) bool { +func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) bool { var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries - defer func() { - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssPool.Put(v) - putRelabelCtx(rctx) - } - }() if len(labelsGlobal) > 0 { // Make a copy of tss before adding extra labels in order to prevent // from affecting time series for other remoteWrite.url configs. @@ -829,7 +853,16 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) bool { pss := rwctx.pss idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) - return pss[idx].Push(tss) + + ok := pss[idx].TryPush(tss) + + if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + } + + return ok } func (rwctx *remoteWriteCtx) reinitStreamAggr() { diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index 68e4875f7..c755198ef 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -76,7 +76,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - if !remotewrite.Push(at, &ctx.WriteRequest) { + if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } rowsInserted.Add(rowsTotal) diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 9f23614fe..a53f8216d 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -97,9 +97,8 @@ func Init() { if len(*opentsdbHTTPListenAddr) > 0 { opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler) } - promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { + promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) { prompush.Push(wr) - return true }) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 86bbadb6f..8a48a8481 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,7 +28,7 @@ The sandbox cluster installation is running under the constant load generated by ## tip -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queuing to disk when the remote storage cannot keep up with the data ingestion rate. In this case `vmagent` returns `429 Too Many Requests` response, so clients could decrease data ingestion rate on their side. This option may be useful when `vmagent` runs in environments with slow persistent disks. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110). 
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration). * FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format). diff --git a/docs/vmagent.md b/docs/vmagent.md index 637428547..8f0789473 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -869,47 +869,43 @@ scrape_configs: - "Proxy-Auth: top-secret" ``` -## Disabling on-disk queue +## Disabling on-disk persistence -On-disk queue aka persistent queue is a temporary folder configured via `-remoteWrite.tmpDataPath` flag. At this folder vmagent may store metric blocks. -Metric blocks persisted on disk if remote storage is not available or cannot handle ingestion load. -Size of this disk queue per remote storage can be limited via `-remoteWrite.maxDiskUsagePerURL`. By default, there is no limit. -In case of reaching those limit metric blocks will be silently dropped by vmagent. +By default `vmagent` stores pending data, which cannot be sent to the configured remote storage systems in a timely manner, in the folder configured +via `-remoteWrite.tmpDataPath` command-line flag. By default `vmagent` writes all the pending data to this folder until this data is sent to the configured +remote storage systems or until the folder becomes full. The maximum data size, which can be saved to `-remoteWrite.tmpDataPath` +per every configured `-remoteWrite.url`, can be limited via `-remoteWrite.maxDiskUsagePerURL` command-line flag. +When this limit is reached, `vmagent` drops the oldest data from disk in order to save newly ingested data. -This behaviour can be changed via flag `--remoteWrite.disableOnDiskQueue=true`. -It prevents vmagent from using on-disk storage for data buffering during ingestion or scraping. -But on-disk storage is still used for saving in-memory part of the queue and buffers during graceful shutdown. +There are cases when it is better to disable on-disk persistence for pending data on the `vmagent` side: -It's expected that `streaming` aggregation and `scrapping` metrics will be dropped in case of full queue. -The following metrics help to detect samples drop: `vmagent_remotewrite_aggregation_metrics_dropped_total` and `vm_promscrape_push_samples_dropped_total`. +- When the persistent disk performance isn't enough for the given data processing rate. +- When it is better to buffer pending data at the client side instead of buffering it on the `vmagent` side in the `-remoteWrite.tmpDataPath` folder. +- When the data is already buffered at [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#google-pubsub-integration). +- When it is better to drop pending data instead of buffering it.
-In case of multiple configured remote storages, vmagent block writes requests even if a single remote storage cannot accept ingested samples. +In such cases the `-remoteWrite.disableOnDiskQueue` command-line flag can be passed to `vmagent`. +When this flag is specified, `vmagent` works in the following way if the configured remote storage systems cannot keep up with the data ingestion rate: -vmagent guarantees at-least-once delivery semantic. -It means that metric samples duplication is possible and [deduplication](https://docs.victoriametrics.com/#deduplication) must be configured at remote storage. +- It returns `429 Too Many Requests` HTTP error to clients, which send data to `vmagent` via [supported HTTP endpoints](#how-to-push-data-to-vmagent). + You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the ingested samples instead of returning the error to clients in this case. +- It suspends consuming data from [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#google-pubsub-integration) until the remote storage becomes available. + You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the fetched samples instead of suspending data consumption from Kafka or Google PubSub. +- It drops samples [scraped from Prometheus-compatible targets](#how-to-collect-metrics-in-prometheus-format), because it is better to drop samples + instead of blocking the scrape process. +- It drops [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) output samples, because it is better to drop output samples + instead of blocking the stream aggregation process. +A condensed code sketch of this decision flow is provided below, after the command-line flags reference. -### Common patterns -You may want to disable on-disk queue in the following cases: +The number of samples dropped because of overloaded remote storage can be [monitored](#monitoring) via the `vmagent_remotewrite_samples_dropped_total` metric. +The number of unsuccessful attempts to send data to overloaded remote storage can be [monitored](#monitoring) via the `vmagent_remotewrite_push_failures_total` metric. -1) chaining of vmagents. Intermediate vmagents used for aggregation may loss the data, if vmcluster is not available. - With disabled persistent queue aggregation vmagents will back-pressure metrics to the first vmagent. +`vmagent` may still write pending in-memory data to `-remoteWrite.tmpDataPath` on graceful shutdown +if `-remoteWrite.disableOnDiskQueue` command-line flag is specified. It may also read buffered data from `-remoteWrite.tmpDataPath` +on startup. -```mermaid -flowchart LR - A[vmagent] --> B(vmagent-aggregation-0) - A[vmagent] --> C(vmagent-aggregation-1) - B --> D[vmcluster] - C --> D[vmcluster] -``` - -2) If you want to replace actual on-disk queue with kafka or another compatible queue. On-disk queue must be disabled at `vmagent-consumer` - -```mermaid -flowchart LR - A[vmagent] --> B(kafka) - B <--> C(vmagent-consumer) - C --> D[vmcluster] -``` +When `-remoteWrite.disableOnDiskQueue` command-line flag is set, `vmagent` may send the same samples multiple times to the configured remote storage +if it cannot keep up with the data ingestion rate. In this case [deduplication](https://docs.victoriametrics.com/#deduplication) +must be enabled on all the configured remote storage systems.
## Cardinality limiter @@ -989,38 +985,39 @@ If you have suggestions for improvements or have found a bug - please open an is ## Troubleshooting -* We recommend you [set up the official Grafana dashboard](#monitoring) in order to monitor the state of `vmagent'. +* It is recommended [setting up the official Grafana dashboard](#monitoring) in order to monitor the state of `vmagent`. -* We recommend you increase the maximum number of open files in the system (`ulimit -n`) when scraping a big number of targets, +* It is recommended increasing the maximum number of open files in the system (`ulimit -n`) when scraping a big number of targets, as `vmagent` establishes at least a single TCP connection per target. * If `vmagent` uses too big amounts of memory, then the following options can help: - * Disabling staleness tracking with `-promscrape.noStaleMarkers` option. See [these docs](#prometheus-staleness-markers). - * Enabling stream parsing mode if `vmagent` scrapes targets with millions of metrics per target. See [these docs](#stream-parsing-mode). - * Reducing the number of output queues with `-remoteWrite.queues` command-line option. * Reducing the amounts of RAM vmagent can use for in-memory buffering with `-memory.allowedPercent` or `-memory.allowedBytes` command-line option. Another option is to reduce memory limits in Docker and/or Kubernetes if `vmagent` runs under these systems. * Reducing the number of CPU cores vmagent can use by passing `GOMAXPROCS=N` environment variable to `vmagent`, where `N` is the desired limit on CPU cores. Another option is to reduce CPU limits in Docker or Kubernetes if `vmagent` runs under these systems. - * Passing `-promscrape.dropOriginalLabels` command-line option to `vmagent`, so it drops `"discoveredLabels"` and `"droppedTargets"` - lists at `/api/v1/targets` page. This reduces memory usage when scraping big number of targets at the cost - of reduced debuggability for improperly configured per-target relabeling. + * Disabling staleness tracking with `-promscrape.noStaleMarkers` option. See [these docs](#prometheus-staleness-markers). + * Enabling stream parsing mode if `vmagent` scrapes targets with millions of metrics per target. See [these docs](#stream-parsing-mode). + * Reducing the number of TCP connections to remote storage systems with `-remoteWrite.queues` command-line option. + * Passing `-promscrape.dropOriginalLabels` command-line option to `vmagent` if it [discovers](https://docs.victoriametrics.com/sd_configs.html) + a big number of targets and many of these targets are [dropped](https://docs.victoriametrics.com/relabeling.html#how-to-drop-discovered-targets) + before scraping. In this case `vmagent` drops `"discoveredLabels"` and `"droppedTargets"` + lists at the `http://vmagent-host:8429/service-discovery` page. This reduces memory usage when scraping a big number of targets at the cost + of reduced debuggability for improperly configured per-target [relabeling](https://docs.victoriametrics.com/relabeling.html). -* When `vmagent` scrapes many unreliable targets, it can flood the error log with scrape errors. These errors can be suppressed - by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` - and `http://vmagent-host:8429/api/v1/targets`. +* When `vmagent` scrapes many unreliable targets, it can flood the error log with scrape errors. It is recommended investigating and fixing these errors.
+ If it is unfeasible to fix all the reported errors, then they can be suppressed by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. + The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` and `http://vmagent-host:8429/api/v1/targets`. -* The `/service-discovery` page could be useful for debugging relabeling process for scrape targets. +* The `http://vmagent-host:8429/service-discovery` page could be useful for debugging the relabeling process for scrape targets. This page contains original labels for targets dropped during relabeling. By default, the `-promscrape.maxDroppedTargets` targets are shown here. If your setup drops more targets during relabeling, then increase `-promscrape.maxDroppedTargets` command-line flag value to see all the dropped targets. Note that tracking each dropped target requires up to 10Kb of RAM. Therefore, big values for `-promscrape.maxDroppedTargets` may result in increased memory usage if a big number of scrape targets are dropped during relabeling. -* We recommend you increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported - at `http://vmagent-host:8429/metrics` page grows constantly. It is also recommended increasing `-remoteWrite.maxBlockSize` - and `-remoteWrite.maxRowsPerBlock` command-line options in this case. This can improve data ingestion performance - to the configured remote storage systems at the cost of higher memory usage. +* It is recommended increasing `-remoteWrite.queues` if the `vmagent_remotewrite_pending_data_bytes` [metric](#monitoring) + grows constantly. It is also recommended increasing `-remoteWrite.maxBlockSize` and `-remoteWrite.maxRowsPerBlock` command-line options in this case. + This can improve data ingestion performance to the configured remote storage systems at the cost of higher memory usage. * If you see gaps in the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set, try increasing `-remoteWrite.queues`. Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage. @@ -1034,8 +1031,12 @@ If you have suggestions for improvements or have found a bug - please open an is The best solution is to use remote storage with [backfilling support](https://docs.victoriametrics.com/#backfilling) such as VictoriaMetrics. * `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`. - The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set. - If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory. + The directory can grow large when remote storage is unavailable for extended periods of time and if the maximum directory size isn't limited + with the `-remoteWrite.maxDiskUsagePerURL` command-line flag. + If you don't want to send all the buffered data from the directory to remote storage then simply stop `vmagent` and delete the directory. + +* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then it is possible to disable
* By default `vmagent` masks `-remoteWrite.url` with `secret-url` values in logs and at `/metrics` page because the url may contain sensitive information such as auth tokens or passwords. @@ -1080,7 +1081,7 @@ If you have suggestions for improvements or have found a bug - please open an is regex: true ``` -See also [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html). +See also [general troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html). ## Google PubSub integration [Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` can read and write metrics from / to google [PubSub](https://cloud.google.com/pubsub): @@ -1789,8 +1790,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.disableOnDiskQueue - Whether to disable on-disk queue for metrics ingestion processing. If in-memory queue is full for at least 1 remoteWrite target, all data ingestion is blocked and returns an error. - It allows to build a chain of vmagents and build complicated data pipelines without data-loss. On-disk writes is still possible during graceful shutdown for storing in-memory part of the queue. + Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence .See also -remoteWrite.dropSamplesOnOverload + -remoteWrite.dropSamplesOnOverload + Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence -remoteWrite.flushInterval duration Interval for flushing the data to remote storage. This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s) -remoteWrite.forcePromProto array @@ -1892,7 +1894,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Optional TLS server name to use for connections to the corresponding -remoteWrite.url. By default, the server name from -remoteWrite.url is used Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.tmpDataPath string - Path to directory where temporary data for remote write component is stored. See also -remoteWrite.maxDiskUsagePerURL (default "vmagent-remotewrite-data") + Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue (default "vmagent-remotewrite-data") -remoteWrite.url array Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol or Prometheus remote_write protocol. Example url: http://:8428/api/v1/write . Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set. See also -remoteWrite.multitenantURL Supports an array of values separated by comma or specified via multiple flags. 
diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index f08a73f92..a2d597411 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -22,7 +22,9 @@ type FastQueue struct { // or when MustClose is called. cond sync.Cond + // isPQDisabled is set to true when pq is disabled. isPQDisabled bool + // pq is the file-based queue pq *queue @@ -43,7 +45,7 @@ type FastQueue struct { // if maxPendingBytes is 0, then the queue size is unlimited. // Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue // reaches maxPendingSize. -// if isPQDisabled is set to true, all write requests that exceed in-memory buffer capacity'll be rejected with errQueueIsFull error +// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected. // in-memory queue part can be stored on disk during graceful shutdown. func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue { pq := mustOpen(path, name, maxPendingBytes) @@ -65,8 +67,8 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes return fq } -// IsWritesBlocked checks if data can be pushed into the queue -func (fq *FastQueue) IsWritesBlocked() bool { +// IsWriteBlocked returns true if data cannot be pushed into fq at the moment. +func (fq *FastQueue) IsWriteBlocked() bool { if !fq.isPQDisabled { return false } @@ -132,11 +134,6 @@ func (fq *FastQueue) flushInmemoryBlocksToFileLocked() { func (fq *FastQueue) GetPendingBytes() uint64 { fq.mu.Lock() defer fq.mu.Unlock() - return fq.getPendingBytesLocked() -} - -func (fq *FastQueue) getPendingBytesLocked() uint64 { - n := fq.pendingInmemoryBytes n += fq.pq.GetPendingBytes() return n @@ -150,49 +147,53 @@ func (fq *FastQueue) GetInmemoryQueueLen() int { return len(fq.ch) } -// MustWriteBlockIgnoreDisabledPQ writes block to fq, persists data on disk even if persistent disabled by flag. -// it's needed to gracefully stop service and do not lose data if remote storage is not available. +// MustWriteBlockIgnoreDisabledPQ unconditionally writes block to fq. +// +// This method allows persisting in-memory blocks during graceful shutdown, even if persistence is disabled. func (fq *FastQueue) MustWriteBlockIgnoreDisabledPQ(block []byte) { - if !fq.writeBlock(block, true) { - logger.Fatalf("BUG: MustWriteBlockIgnoreDisabledPQ must always write data even if persistence is disabled") + if !fq.tryWriteBlock(block, true) { + logger.Fatalf("BUG: tryWriteBlock must always write data even if persistence is disabled") } } -// WriteBlock writes block to fq. -func (fq *FastQueue) WriteBlock(block []byte) bool { - return fq.writeBlock(block, false) +// TryWriteBlock tries writing block to fq. +// +// false is returned if the block couldn't be written to fq when the in-memory queue is full +// and the persistent queue is disabled. +func (fq *FastQueue) TryWriteBlock(block []byte) bool { return fq.tryWriteBlock(block, false) } -// WriteBlock writes block to fq. +// tryWriteBlock tries writing block to fq, optionally ignoring the disabled persistent queue.
-func (fq *FastQueue) writeBlock(block []byte, mustIgnoreDisabledPQ bool) bool { +func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool { fq.mu.Lock() defer fq.mu.Unlock() - isPQWritesAllowed := !fq.isPQDisabled || mustIgnoreDisabledPQ + isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ fq.flushInmemoryBlocksToFileIfNeededLocked() if n := fq.pq.GetPendingBytes(); n > 0 { - if !isPQWritesAllowed { - return false - } // The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet. // So put the block to file-based queue. if len(fq.ch) > 0 { logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n) } + if !isPQWriteAllowed { + return false + } fq.pq.MustWriteBlock(block) return true } if len(fq.ch) == cap(fq.ch) { - // There is no space in the in-memory queue. Put the data to file-based queue. - if !isPQWritesAllowed { + // There is no space left in the in-memory queue. Put the data to file-based queue. + if !isPQWriteAllowed { return false } fq.flushInmemoryBlocksToFileLocked() fq.pq.MustWriteBlock(block) return true } - // There is enough space in the in-memory queue. + // Fast path - put the block to in-memory queue. bb := blockBufPool.Get() bb.B = append(bb.B[:0], block...) fq.ch <- bb diff --git a/lib/persistentqueue/fastqueue_test.go b/lib/persistentqueue/fastqueue_test.go index 198f73884..affa04169 100644 --- a/lib/persistentqueue/fastqueue_test.go +++ b/lib/persistentqueue/fastqueue_test.go @@ -11,7 +11,7 @@ func TestFastQueueOpenClose(_ *testing.T) { path := "fast-queue-open-close" mustDeleteDir(path) for i := 0; i < 10; i++ { - fq := MustOpenFastQueue(path, "foobar", 100, 0, true) + fq := MustOpenFastQueue(path, "foobar", 100, 0, false) fq.MustClose() } mustDeleteDir(path) @@ -22,15 +22,15 @@ func TestFastQueueWriteReadInmemory(t *testing.T) { mustDeleteDir(path) capacity := 100 - fq := MustOpenFastQueue(path, "foobar", capacity, 0, true) + fq := MustOpenFastQueue(path, "foobar", capacity, 0, false) if n := fq.GetInmemoryQueueLen(); n != 0 { t.Fatalf("unexpected non-zero inmemory queue size: %d", n) } var blocks []string for i := 0; i < capacity; i++ { block := fmt.Sprintf("block %d", i) - if !fq.WriteBlock([]byte(block)) { - t.Fatalf("unexpected false for WriteBlock") + if !fq.TryWriteBlock([]byte(block)) { + t.Fatalf("TryWriteBlock must return true in this context") } blocks = append(blocks, block) } @@ -62,8 +62,8 @@ func TestFastQueueWriteReadMixed(t *testing.T) { var blocks []string for i := 0; i < 2*capacity; i++ { block := fmt.Sprintf("block %d", i) - if !fq.WriteBlock([]byte(block)) { - t.Fatalf("not expected WriteBlock fail") + if !fq.TryWriteBlock([]byte(block)) { + t.Fatalf("TryWriteBlock must return true in this context") } blocks = append(blocks, block) } @@ -98,8 +98,8 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) { var blocks []string for i := 0; i < 2*capacity; i++ { block := fmt.Sprintf("block %d", i) - if !fq.WriteBlock([]byte(block)) { - t.Fatalf("unexpected false for WriteBlock") + if !fq.TryWriteBlock([]byte(block)) { + t.Fatalf("TryWriteBlock must return true in this context") } blocks = append(blocks, block) @@ -176,8 +176,8 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) { } resultCh <- nil }() - if !fq.WriteBlock([]byte(block)) { - t.Fatalf("unexpected false for WriteBlock") + if !fq.TryWriteBlock([]byte(block)) { + t.Fatalf("TryWriteBlock must return true in this context") } select { case err := <-resultCh: @@ 
-235,9 +235,8 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) { go func() { defer writersWG.Done() for block := range blocksCh { - if !fq.WriteBlock([]byte(block)) { - t.Errorf("unexpected false for WriteBlock") - return + if !fq.TryWriteBlock([]byte(block)) { + panic(fmt.Errorf("TryWriteBlock must return true in this context")) } } }() @@ -303,12 +302,12 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) { var blocks []string for i := 0; i < capacity; i++ { block := fmt.Sprintf("block %d", i) - if !fq.WriteBlock([]byte(block)) { - t.Fatalf("unexpected false for WriteBlock") + if !fq.TryWriteBlock([]byte(block)) { + t.Fatalf("TryWriteBlock must return true in this context") } blocks = append(blocks, block) } - if fq.WriteBlock([]byte("error-block")) { + if fq.TryWriteBlock([]byte("error-block")) { t.Fatalf("expect false due to full queue") } @@ -339,12 +338,12 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) { var blocks []string for i := 0; i < capacity; i++ { block := fmt.Sprintf("block %d", i) - if !fq.WriteBlock([]byte(block)) { - t.Fatalf("unexpected false for WriteBlock") + if !fq.TryWriteBlock([]byte(block)) { + t.Fatalf("TryWriteBlock must return true in this context") } blocks = append(blocks, block) } - if fq.WriteBlock([]byte("error-block")) { + if fq.TryWriteBlock([]byte("error-block")) { t.Fatalf("expect false due to full queue") } for i := 0; i < capacity; i++ { diff --git a/lib/persistentqueue/fastqueue_timing_test.go b/lib/persistentqueue/fastqueue_timing_test.go index b14daf4da..311fec6f9 100644 --- a/lib/persistentqueue/fastqueue_timing_test.go +++ b/lib/persistentqueue/fastqueue_timing_test.go @@ -22,7 +22,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) { mustDeleteDir(path) }() for i := 0; i < b.N; i++ { - writeReadIterationFastQueue(b, fq, block, iterationsCount) + writeReadIterationFastQueue(fq, block, iterationsCount) } }) } @@ -44,17 +44,17 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) { }() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - writeReadIterationFastQueue(b, fq, block, iterationsCount) + writeReadIterationFastQueue(fq, block, iterationsCount) } }) }) } } -func writeReadIterationFastQueue(b *testing.B, fq *FastQueue, block []byte, iterationsCount int) { +func writeReadIterationFastQueue(fq *FastQueue, block []byte, iterationsCount int) { for i := 0; i < iterationsCount; i++ { - if !fq.WriteBlock(block) { - b.Fatalf("unexpected false for WriteBlock") + if !fq.TryWriteBlock(block) { + panic(fmt.Errorf("TryWriteBlock must return true")) } } var ok bool diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 1428bdee2..0880ac204 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -59,18 +59,13 @@ func CheckConfig() error { // Init initializes Prometheus scraper with config from the `-promscrape.config`. // // Scraped data is passed to pushData. 
-func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool) { +func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) { mustInitClusterMemberID() - pushDataTrackDropped := func(at *auth.Token, wr *prompbmarshal.WriteRequest) { - if !pushData(at, wr) { - pushDataFailsTotal.Inc() - } - } globalStopChan = make(chan struct{}) scraperWG.Add(1) go func() { defer scraperWG.Done() - runScraper(*promscrapeConfigFile, pushDataTrackDropped, globalStopChan) + runScraper(*promscrapeConfigFile, pushData, globalStopChan) }() } @@ -89,8 +84,6 @@ var ( // configData contains -promscrape.config data configData atomic.Pointer[[]byte] - - pushDataFailsTotal = metrics.NewCounter(`vm_promscrape_push_samples_dropped_total`) ) // WriteConfigData writes -promscrape.config contents to w
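Note: an illustrative sketch, not part of the patch, showing the FastQueue contract exercised by the tests above. With persistence disabled, TryWriteBlock() returns false once the in-memory queue is full, while MustWriteBlockIgnoreDisabledPQ() still persists pending blocks on graceful shutdown:

```go
package example

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
)

func demo() {
	// Open a queue with persistence disabled (isPQDisabled=true) and room
	// for two in-memory blocks. The path and name are arbitrary examples.
	fq := persistentqueue.MustOpenFastQueue("/tmp/fq-demo", "demo", 2, 0, true)
	defer fq.MustClose()

	fq.TryWriteBlock([]byte("a")) // true: fits into the in-memory queue
	fq.TryWriteBlock([]byte("b")) // true: fills the in-memory queue
	fq.TryWriteBlock([]byte("c")) // false: queue is full and persistence is disabled

	// On graceful shutdown the in-memory blocks may still be persisted:
	fq.MustWriteBlockIgnoreDisabledPQ([]byte("d"))
}
```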