diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index 00d9846d7..835944db5 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -65,7 +65,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(len(rows)) if at != nil { rowsTenantInserted.Get(at).Add(len(rows)) diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadog/request_handler.go index b7cb1a2dc..971cf450c 100644 --- a/app/vmagent/datadog/request_handler.go +++ b/app/vmagent/datadog/request_handler.go @@ -88,7 +88,9 @@ func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmar ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(rowsTotal) if at != nil { rowsTenantInserted.Get(at).Add(rowsTotal) diff --git a/app/vmagent/graphite/request_handler.go b/app/vmagent/graphite/request_handler.go index 2c8180667..4e124d91a 100644 --- a/app/vmagent/graphite/request_handler.go +++ b/app/vmagent/graphite/request_handler.go @@ -56,7 +56,9 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(nil, &ctx.WriteRequest) + if !remotewrite.Push(nil, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 26f508cbe..5d5e4585d 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -130,7 +130,9 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom ctx.ctx.Labels = labels ctx.ctx.Samples = samples ctx.commonLabels = commonLabels - remotewrite.Push(at, &ctx.ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(rowsTotal) if at != nil { rowsTenantInserted.Get(at).Add(rowsTotal) diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index 60347b0dd..ed112648b 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -84,6 +84,8 @@ func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } return nil } diff --git a/app/vmagent/newrelic/request_handler.go b/app/vmagent/newrelic/request_handler.go index b164fc973..9e47694fd 100644 --- a/app/vmagent/newrelic/request_handler.go +++ b/app/vmagent/newrelic/request_handler.go @@ -76,7 +76,9 @@ func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(len(rows)) 
if at != nil { rowsTenantInserted.Get(at).Add(samplesCount) diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go index 0ad6a75fd..b308a4b7c 100644 --- a/app/vmagent/opentelemetry/request_handler.go +++ b/app/vmagent/opentelemetry/request_handler.go @@ -59,7 +59,9 @@ func insertRows(at *auth.Token, tss []prompbmarshal.TimeSeries, extraLabels []pr ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(rowsTotal) if at != nil { rowsTenantInserted.Get(at).Add(rowsTotal) diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go index 8388a5238..3e8c12295 100644 --- a/app/vmagent/opentsdb/request_handler.go +++ b/app/vmagent/opentsdb/request_handler.go @@ -56,7 +56,9 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(nil, &ctx.WriteRequest) + if !remotewrite.Push(nil, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index 7fecea6b3..a919cb153 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -64,7 +64,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index 58473dcae..2d0e353de 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -73,7 +73,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(len(rows)) if at != nil { rowsTenantInserted.Get(at).Add(len(rows)) diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 8abfa6371..b2455555b 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -69,7 +69,9 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(rowsTotal) if at != nil { rowsTenantInserted.Get(at).Add(rowsTotal) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index b11a786b8..94e6ad3ff 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -305,7 +305,7 @@ func (c *client) runWorker() { continue } // Return unsent block to the queue. 
- c.fq.MustWriteBlock(block) + c.fq.MustWriteBlockIgnoreDisabledPQ(block) return case <-c.stopCh: // c must be stopped. Wait for a while in the hope the block will be sent. @@ -314,11 +314,11 @@ func (c *client) runWorker() { case ok := <-ch: if !ok { // Return unsent block to the queue. - c.fq.MustWriteBlock(block) + c.fq.MustWriteBlockIgnoreDisabledPQ(block) } case <-time.After(graceDuration): // Return unsent block to the queue. - c.fq.MustWriteBlock(block) + c.fq.MustWriteBlockIgnoreDisabledPQ(block) } return } diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index f436e53a8..92bd03bc7 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -37,9 +37,9 @@ type pendingSeries struct { periodicFlusherWG sync.WaitGroup } -func newPendingSeries(pushBlock func(block []byte), isVMRemoteWrite bool, significantFigures, roundDigits int) *pendingSeries { +func newPendingSeries(fq *persistentqueue.FastQueue, isVMRemoteWrite bool, significantFigures, roundDigits int) *pendingSeries { var ps pendingSeries - ps.wr.pushBlock = pushBlock + ps.wr.fq = fq ps.wr.isVMRemoteWrite = isVMRemoteWrite ps.wr.significantFigures = significantFigures ps.wr.roundDigits = roundDigits @@ -57,10 +57,11 @@ func (ps *pendingSeries) MustStop() { ps.periodicFlusherWG.Wait() } -func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) { +func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) bool { ps.mu.Lock() - ps.wr.push(tss) + wasPushed := ps.wr.push(tss) ps.mu.Unlock() + return wasPushed } func (ps *pendingSeries) periodicFlusher() { @@ -70,18 +71,21 @@ func (ps *pendingSeries) periodicFlusher() { } ticker := time.NewTicker(*flushInterval) defer ticker.Stop() - mustStop := false - for !mustStop { + for { select { case <-ps.stopCh: - mustStop = true + ps.mu.Lock() + ps.wr.mustFlushOnStop() + ps.mu.Unlock() + return case <-ticker.C: if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) { continue } } ps.mu.Lock() - ps.wr.flush() + // no-op + _ = ps.wr.flush() ps.mu.Unlock() } } @@ -90,8 +94,7 @@ type writeRequest struct { // Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures. lastFlushTime uint64 - // pushBlock is called when whe write request is ready to be sent. - pushBlock func(block []byte) + fq *persistentqueue.FastQueue // Whether to encode the write request with VictoriaMetrics remote write protocol. 
isVMRemoteWrite bool @@ -130,14 +133,32 @@ func (wr *writeRequest) reset() { wr.buf = wr.buf[:0] } -func (wr *writeRequest) flush() { +// mustFlushOnStop makes force push into the queue +// needed to properly save in-memory buffer with disabled disk storage +func (wr *writeRequest) mustFlushOnStop() { wr.wr.Timeseries = wr.tss wr.adjustSampleValues() atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) - pushWriteRequest(&wr.wr, wr.pushBlock, wr.isVMRemoteWrite) + if !pushWriteRequest(&wr.wr, func(block []byte) bool { + wr.fq.MustWriteBlockIgnoreDisabledPQ(block) + return true + }, wr.isVMRemoteWrite) { + return + } wr.reset() } +func (wr *writeRequest) flush() bool { + wr.wr.Timeseries = wr.tss + wr.adjustSampleValues() + atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) + if !pushWriteRequest(&wr.wr, wr.fq.WriteBlock, wr.isVMRemoteWrite) { + return false + } + wr.reset() + return true +} + func (wr *writeRequest) adjustSampleValues() { samples := wr.samples if n := wr.significantFigures; n > 0 { @@ -154,21 +175,25 @@ func (wr *writeRequest) adjustSampleValues() { } } -func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) { +func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) bool { tssDst := wr.tss maxSamplesPerBlock := *maxRowsPerBlock // Allow up to 10x of labels per each block on average. maxLabelsPerBlock := 10 * maxSamplesPerBlock for i := range src { - tssDst = append(tssDst, prompbmarshal.TimeSeries{}) - wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i]) if len(wr.samples) >= maxSamplesPerBlock || len(wr.labels) >= maxLabelsPerBlock { wr.tss = tssDst - wr.flush() + if !wr.flush() { + return false + } tssDst = wr.tss } + tssDst = append(tssDst, prompbmarshal.TimeSeries{}) + wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i]) } + wr.tss = tssDst + return true } func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { @@ -196,10 +221,10 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { wr.buf = buf } -func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte), isVMRemoteWrite bool) { +func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte) bool, isVMRemoteWrite bool) bool { if len(wr.Timeseries) == 0 { // Nothing to push - return + return true } bb := writeRequestBufPool.Get() bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr) @@ -212,11 +237,13 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt } writeRequestBufPool.Put(bb) if len(zb.B) <= persistentqueue.MaxBlockSize { - pushBlock(zb.B) + if !pushBlock(zb.B) { + return false + } blockSizeRows.Update(float64(len(wr.Timeseries))) blockSizeBytes.Update(float64(len(zb.B))) snappyBufPool.Put(zb) - return + return true } snappyBufPool.Put(zb) } else { @@ -229,23 +256,32 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt samples := wr.Timeseries[0].Samples if len(samples) == 1 { logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N) - return + return true } n := len(samples) / 2 wr.Timeseries[0].Samples = samples[:n] - pushWriteRequest(wr, pushBlock, isVMRemoteWrite) + if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + return false + } wr.Timeseries[0].Samples = samples[n:] - pushWriteRequest(wr, pushBlock, isVMRemoteWrite) + if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + return false + } wr.Timeseries[0].Samples = samples - return + 
return true } timeseries := wr.Timeseries n := len(timeseries) / 2 wr.Timeseries = timeseries[:n] - pushWriteRequest(wr, pushBlock, isVMRemoteWrite) + if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + return false + } wr.Timeseries = timeseries[n:] - pushWriteRequest(wr, pushBlock, isVMRemoteWrite) + if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) { + return false + } wr.Timeseries = timeseries + return true } var ( diff --git a/app/vmagent/remotewrite/pendingseries_test.go b/app/vmagent/remotewrite/pendingseries_test.go index 5b7a51219..90a097a59 100644 --- a/app/vmagent/remotewrite/pendingseries_test.go +++ b/app/vmagent/remotewrite/pendingseries_test.go @@ -26,13 +26,14 @@ func testPushWriteRequest(t *testing.T, rowsCount, expectedBlockLenProm, expecte t.Helper() wr := newTestWriteRequest(rowsCount, 20) pushBlockLen := 0 - pushBlock := func(block []byte) { + pushBlock := func(block []byte) bool { if pushBlockLen > 0 { panic(fmt.Errorf("BUG: pushBlock called multiple times; pushBlockLen=%d at first call, len(block)=%d at second call", pushBlockLen, len(block))) } pushBlockLen = len(block) + return true } - pushWriteRequest(wr, pushBlock, isVMRemoteWrite) + _ = pushWriteRequest(wr, pushBlock, isVMRemoteWrite) if math.Abs(float64(pushBlockLen-expectedBlockLen)/float64(expectedBlockLen)*100) > tolerancePrc { t.Fatalf("unexpected block len for rowsCount=%d, isVMRemoteWrite=%v; got %d bytes; expecting %d bytes +- %.0f%%", rowsCount, isVMRemoteWrite, pushBlockLen, expectedBlockLen, tolerancePrc) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index d9264e3ba..07f573b43 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -3,6 +3,7 @@ package remotewrite import ( "flag" "fmt" + "net/http" "net/url" "path/filepath" "strconv" @@ -10,6 +11,8 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -84,6 +87,9 @@ var ( "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+ "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") + disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable on-disk queue for metrics ingestion processing. "+ + "If the in-memory queue is full for at least one remoteWrite target, all data ingestion is blocked and an error is returned. "+ + "This allows building a chain of vmagents and complex data pipelines without data loss. On-disk writes are still possible during graceful shutdown in order to store the in-memory part of the queue.") ) var ( @@ -96,6 +102,13 @@ var ( // Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
defaultAuthToken = &auth.Token{} + + // ErrQueueFullHTTPRetry is returned when -remoteWrite.disableOnDiskQueue is enabled + // and at least one of the remote storages cannot keep up with the load + ErrQueueFullHTTPRetry = &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("in-memory queue is full; write requests are rejected because -remoteWrite.disableOnDiskQueue=true is set; retry the request later"), + StatusCode: http.StatusTooManyRequests, + } ) // MultitenancyEnabled returns true if -remoteWrite.multitenantURL is specified. @@ -350,7 +363,7 @@ func Stop() { // If at isn't nil, the data is pushed to the configured `-remoteWrite.multitenantURL`. // // Note that wr may be modified by Push because of relabeling and rounding. -func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) { +func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { if at == nil && len(*remoteWriteMultitenantURLs) > 0 { // Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set. at = defaultAuthToken @@ -374,6 +387,17 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) { } rwctxsMapLock.Unlock() } + var isWritesLocked bool + for _, rwctx := range rwctxs { + if rwctx.fq.IsWritesBlocked() { + isWritesLocked = true + break + } + } + // Fast path: writes are blocked for at least one remote storage, so reject the whole request. + if isWritesLocked { + return false + } var rctx *relabelCtx rcs := allRelabelConfigs.Load() @@ -415,7 +439,13 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) { } sortLabelsIfNeeded(tssBlock) tssBlock = limitSeriesCardinality(tssBlock) - pushBlockToRemoteStorages(rwctxs, tssBlock) + if !pushBlockToRemoteStorages(rwctxs, tssBlock) { + if rctx != nil { + rctx.reset() + putRelabelCtx(rctx) + } + return false + } if rctx != nil { rctx.reset() } @@ -423,17 +453,19 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) { if rctx != nil { putRelabelCtx(rctx) } + return true } -func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) { +func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool { if len(tssBlock) == 0 { // Nothing to push - return + return true } + if len(rwctxs) == 1 { // Fast path - just push data to the configured single remote storage - rwctxs[0].Push(tssBlock) - return + ok := rwctxs[0].Push(tssBlock) + return ok } // We need to push tssBlock to multiple remote storages. @@ -462,6 +494,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha // the time needed for sending the data to multiple remote storage systems. var wg sync.WaitGroup wg.Add(len(rwctxs)) + var anyPushFailed uint64 for i, rwctx := range rwctxs { tssShard := tssByURL[i] if len(tssShard) == 0 { @@ -469,11 +502,13 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha } go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) { defer wg.Done() - rwctx.Push(tss) + if !rwctx.Push(tss) { + atomic.StoreUint64(&anyPushFailed, 1) + } }(rwctx, tssShard) } wg.Wait() - return + return atomic.LoadUint64(&anyPushFailed) == 0 } // Replicate data among rwctxs. @@ -481,13 +516,17 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha // the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup wg.Add(len(rwctxs)) + var anyPushFailed uint64 for _, rwctx := range rwctxs { go func(rwctx *remoteWriteCtx) { defer wg.Done() - rwctx.Push(tssBlock) + if !rwctx.Push(tssBlock) { + atomic.StoreUint64(&anyPushFailed, 1) + } }(rwctx) } wg.Wait() + return atomic.LoadUint64(&anyPushFailed) == 0 } // sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. @@ -590,8 +629,9 @@ type remoteWriteCtx struct { pss []*pendingSeries pssNextIdx uint64 - rowsPushedAfterRelabel *metrics.Counter - rowsDroppedByRelabel *metrics.Counter + rowsPushedAfterRelabel *metrics.Counter + rowsDroppedByRelabel *metrics.Counter + rowsDroppedAtAggregationOnPush *metrics.Counter } func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx { @@ -607,13 +647,19 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize) maxPendingBytes = persistentqueue.DefaultChunkFileSize } - fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes) + fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, *disableOnDiskQueue) _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { return float64(fq.GetPendingBytes()) }) _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { return float64(fq.GetInmemoryQueueLen()) }) + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_inmemory_queue_blocked{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { + if fq.IsWritesBlocked() { + return 1.0 + } + return 0 + }) var c *client switch remoteWriteURL.Scheme { @@ -635,7 +681,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in } pss := make([]*pendingSeries, pssLen) for i := range pss { - pss[i] = newPendingSeries(fq.MustWriteBlock, c.useVMProto, sf, rd) + pss[i] = newPendingSeries(fq, c.useVMProto, sf, rd) } rwctx := &remoteWriteCtx{ @@ -644,15 +690,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in c: c, pss: pss, - rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)), - rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), + rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)), + rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), + rowsDroppedAtAggregationOnPush: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_aggregation_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), } // Initialize sas sasFile := streamAggrConfig.GetOptionalArg(argIdx) if sasFile != "" { dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) - sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) + sas, err := streamaggr.LoadFromFile(sasFile, 
rwctx.pushInternalTrackDropped, dedupInterval) if err != nil { logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err) } @@ -688,7 +735,7 @@ func (rwctx *remoteWriteCtx) MustStop() { rwctx.rowsDroppedByRelabel = nil } -func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { +func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) bool { // Apply relabeling var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries @@ -726,14 +773,16 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { } matchIdxsPool.Put(matchIdxs) } - rwctx.pushInternal(tss) + defer func() { + // Return back relabeling contexts to the pool + if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + } + }() - // Return back relabeling contexts to the pool - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssPool.Put(v) - putRelabelCtx(rctx) - } + return rwctx.pushInternal(tss) } var matchIdxsPool bytesutil.ByteBufferPool @@ -753,9 +802,22 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop return dst } -func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { +func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSeries) { + if !rwctx.pushInternal(tss) { + rwctx.rowsDroppedAtAggregationOnPush.Inc() + } +} + +func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) bool { var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries + defer func() { + if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + } + }() if len(labelsGlobal) > 0 { // Make a copy of tss before adding extra labels in order to prevent // from affecting time series for other remoteWrite.url configs. 
@@ -767,13 +829,7 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { pss := rwctx.pss idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) - pss[idx].Push(tss) - - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssPool.Put(v) - putRelabelCtx(rctx) - } + return pss[idx].Push(tss) } func (rwctx *remoteWriteCtx) reinitStreamAggr() { @@ -786,7 +842,7 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() { logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc() dedupInterval := streamAggrDedupInterval.GetOptionalArg(rwctx.idx) - sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) + sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval) if err != nil { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0) diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index 838f7a6ca..68e4875f7 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -76,7 +76,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) + if !remotewrite.Push(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } rowsInserted.Add(rowsTotal) if at != nil { rowsTenantInserted.Get(at).Add(rowsTotal) diff --git a/app/vminsert/main.go b/app/vminsert/main.go index a53f8216d..9f23614fe 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -97,8 +97,9 @@ func Init() { if len(*opentsdbHTTPListenAddr) > 0 { opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler) } - promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) { + promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool { prompush.Push(wr) + return true }) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2246e305e..86bbadb6f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by ## tip +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queuing to disk when the remote storage cannot keep up with the data ingestion rate. In this case `vmagent` returns `429 Too Many Requests` response, so clients could decrease data ingestion rate on their side. This option may be useful when `vmagent` runs in environments with slow persistent disks. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration). 
* FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format). diff --git a/docs/vmagent.md b/docs/vmagent.md index e577abc1b..637428547 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -869,6 +869,48 @@ scrape_configs: - "Proxy-Auth: top-secret" ``` +## Disabling on-disk queue + +The on-disk queue (aka persistent queue) is a temporary folder configured via the `-remoteWrite.tmpDataPath` flag. vmagent may store blocks of metrics in this folder. +Metric blocks are persisted on disk when the remote storage is unavailable or cannot keep up with the ingestion rate. +The size of this disk queue per remote storage can be limited via `-remoteWrite.maxDiskUsagePerURL`. By default, there is no limit. +When this limit is reached, metric blocks are silently dropped by vmagent. + +This behaviour can be changed via the `-remoteWrite.disableOnDiskQueue=true` flag. +It prevents vmagent from using on-disk storage for data buffering during ingestion or scraping. +On-disk storage is still used for saving the in-memory part of the queue and buffers during graceful shutdown. + +It is expected that samples from stream aggregation and from scraping are dropped when the queue is full. +The following metrics help to detect dropped samples: `vmagent_remotewrite_aggregation_metrics_dropped_total` and `vm_promscrape_push_samples_dropped_total`. + +When multiple remote storages are configured, vmagent blocks write requests even if only a single remote storage cannot accept the ingested samples. + +vmagent guarantees at-least-once delivery semantics. +This means that duplicate metric samples are possible, so [deduplication](https://docs.victoriametrics.com/#deduplication) must be configured at the remote storage. + +### Common patterns +You may want to disable the on-disk queue in the following cases: + +1) Chaining of vmagents. Intermediate vmagents used for aggregation may lose data if vmcluster is not available. + With the persistent queue disabled, the aggregation vmagents back-pressure metrics to the first vmagent. + +```mermaid +flowchart LR + A[vmagent] --> B(vmagent-aggregation-0) + A[vmagent] --> C(vmagent-aggregation-1) + B --> D[vmcluster] + C --> D[vmcluster] +``` + +2) Replacing the on-disk queue with Kafka or another compatible queue. The on-disk queue must be disabled at `vmagent-consumer`. + +```mermaid +flowchart LR + A[vmagent] --> B(kafka) + B <--> C(vmagent-consumer) + C --> D[vmcluster] +``` + ## Cardinality limiter By default, `vmagent` doesn't limit the number of time series each scrape target can expose. @@ -1746,6 +1788,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -remoteWrite.bearerTokenFile array Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second Supports an array of values separated by comma or specified via multiple flags. + -remoteWrite.disableOnDiskQueue + Whether to disable on-disk queue for metrics ingestion processing. If the in-memory queue is full for at least one remoteWrite target, all data ingestion is blocked and an error is returned. + This allows building a chain of vmagents and complex data pipelines without data loss. On-disk writes are still possible during graceful shutdown in order to store the in-memory part of the queue. -remoteWrite.flushInterval duration Interval for flushing the data to remote storage.
This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s) -remoteWrite.forcePromProto array diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 77cfe38f3..f08a73f92 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -22,6 +22,7 @@ type FastQueue struct { // or when MustClose is called. cond sync.Cond + isPQDisabled bool // pq is file-based queue pq *queue @@ -42,11 +43,14 @@ type FastQueue struct { // if maxPendingBytes is 0, then the queue size is unlimited. // Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue // reaches maxPendingSize. -func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64) *FastQueue { +// If isPQDisabled is set to true, then all the write requests exceeding the in-memory buffer capacity are rejected. +// The in-memory part of the queue can still be stored on disk during graceful shutdown. +func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue { pq := mustOpen(path, name, maxPendingBytes) fq := &FastQueue{ - pq: pq, - ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), + pq: pq, + isPQDisabled: isPQDisabled, + ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), } fq.cond.L = &fq.mu fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() @@ -61,6 +65,16 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes return fq } +// IsWritesBlocked returns true if data cannot be pushed into the queue at the moment +func (fq *FastQueue) IsWritesBlocked() bool { + if !fq.isPQDisabled { + return false + } + fq.mu.Lock() + defer fq.mu.Unlock() + return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0 +} + // UnblockAllReaders unblocks all the readers. func (fq *FastQueue) UnblockAllReaders() { fq.mu.Lock() @@ -92,7 +106,7 @@ func (fq *FastQueue) MustClose() { } func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() { - if len(fq.ch) == 0 { + if len(fq.ch) == 0 || fq.isPQDisabled { return } if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 { @@ -118,6 +132,10 @@ func (fq *FastQueue) flushInmemoryBlocksToFileLocked() { func (fq *FastQueue) GetPendingBytes() uint64 { fq.mu.Lock() defer fq.mu.Unlock() + return fq.getPendingBytesLocked() +} + +func (fq *FastQueue) getPendingBytesLocked() uint64 { n := fq.pendingInmemoryBytes n += fq.pq.GetPendingBytes() @@ -132,26 +150,47 @@ func (fq *FastQueue) GetInmemoryQueueLen() int { return len(fq.ch) } -// MustWriteBlock writes block to fq. -func (fq *FastQueue) MustWriteBlock(block []byte) { +// MustWriteBlockIgnoreDisabledPQ writes block to fq and persists the data on disk even if the persistent queue is disabled. +// It is needed for gracefully stopping the service without losing data when the remote storage is unavailable. +func (fq *FastQueue) MustWriteBlockIgnoreDisabledPQ(block []byte) { + if !fq.writeBlock(block, true) { + logger.Fatalf("BUG: MustWriteBlockIgnoreDisabledPQ must always write data even if persistence is disabled") + } +} + +// WriteBlock writes block to fq. +func (fq *FastQueue) WriteBlock(block []byte) bool { + return fq.writeBlock(block, false) +} + +// writeBlock writes block to fq. It returns false if the block cannot be written.
+func (fq *FastQueue) writeBlock(block []byte, mustIgnoreDisabledPQ bool) bool { fq.mu.Lock() defer fq.mu.Unlock() + isPQWritesAllowed := !fq.isPQDisabled || mustIgnoreDisabledPQ + fq.flushInmemoryBlocksToFileIfNeededLocked() if n := fq.pq.GetPendingBytes(); n > 0 { + if !isPQWritesAllowed { + return false + } // The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet. // So put the block to file-based queue. if len(fq.ch) > 0 { logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n) } fq.pq.MustWriteBlock(block) - return + return true } if len(fq.ch) == cap(fq.ch) { // There is no space in the in-memory queue. Put the data to file-based queue. + if !isPQWritesAllowed { + return false + } fq.flushInmemoryBlocksToFileLocked() fq.pq.MustWriteBlock(block) - return + return true } // There is enough space in the in-memory queue. bb := blockBufPool.Get() @@ -162,6 +201,7 @@ func (fq *FastQueue) MustWriteBlock(block []byte) { // Notify potentially blocked reader. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for the context. fq.cond.Signal() + return true } // MustReadBlock reads the next block from fq to dst and returns it. diff --git a/lib/persistentqueue/fastqueue_test.go b/lib/persistentqueue/fastqueue_test.go index 0fd1fb847..198f73884 100644 --- a/lib/persistentqueue/fastqueue_test.go +++ b/lib/persistentqueue/fastqueue_test.go @@ -11,7 +11,7 @@ func TestFastQueueOpenClose(_ *testing.T) { path := "fast-queue-open-close" mustDeleteDir(path) for i := 0; i < 10; i++ { - fq := MustOpenFastQueue(path, "foobar", 100, 0) + fq := MustOpenFastQueue(path, "foobar", 100, 0, true) fq.MustClose() } mustDeleteDir(path) @@ -22,14 +22,16 @@ func TestFastQueueWriteReadInmemory(t *testing.T) { mustDeleteDir(path) capacity := 100 - fq := MustOpenFastQueue(path, "foobar", capacity, 0) + fq := MustOpenFastQueue(path, "foobar", capacity, 0, true) if n := fq.GetInmemoryQueueLen(); n != 0 { t.Fatalf("unexpected non-zero inmemory queue size: %d", n) } var blocks []string for i := 0; i < capacity; i++ { block := fmt.Sprintf("block %d", i) - fq.MustWriteBlock([]byte(block)) + if !fq.WriteBlock([]byte(block)) { + t.Fatalf("unexpected false for WriteBlock") + } blocks = append(blocks, block) } if n := fq.GetInmemoryQueueLen(); n != capacity { @@ -53,14 +55,16 @@ func TestFastQueueWriteReadMixed(t *testing.T) { mustDeleteDir(path) capacity := 100 - fq := MustOpenFastQueue(path, "foobar", capacity, 0) + fq := MustOpenFastQueue(path, "foobar", capacity, 0, false) if n := fq.GetPendingBytes(); n != 0 { t.Fatalf("the number of pending bytes must be 0; got %d", n) } var blocks []string for i := 0; i < 2*capacity; i++ { block := fmt.Sprintf("block %d", i) - fq.MustWriteBlock([]byte(block)) + if !fq.WriteBlock([]byte(block)) { + t.Fatalf("not expected WriteBlock fail") + } blocks = append(blocks, block) } if n := fq.GetPendingBytes(); n == 0 { @@ -87,17 +91,20 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) { mustDeleteDir(path) capacity := 100 - fq := MustOpenFastQueue(path, "foobar", capacity, 0) + fq := MustOpenFastQueue(path, "foobar", capacity, 0, false) if n := fq.GetPendingBytes(); n != 0 { t.Fatalf("the number of pending bytes must be 0; got %d", n) } var blocks []string for i := 0; i < 2*capacity; i++ { block := fmt.Sprintf("block %d", i) - fq.MustWriteBlock([]byte(block)) + if !fq.WriteBlock([]byte(block)) { + t.Fatalf("unexpected false for WriteBlock") + } + blocks = append(blocks, 
block) fq.MustClose() - fq = MustOpenFastQueue(path, "foobar", capacity, 0) + fq = MustOpenFastQueue(path, "foobar", capacity, 0, false) } if n := fq.GetPendingBytes(); n == 0 { t.Fatalf("the number of pending bytes must be greater than 0") @@ -111,7 +118,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) { t.Fatalf("unexpected block read; got %q; want %q", buf, block) } fq.MustClose() - fq = MustOpenFastQueue(path, "foobar", capacity, 0) + fq = MustOpenFastQueue(path, "foobar", capacity, 0, false) } if n := fq.GetPendingBytes(); n != 0 { t.Fatalf("the number of pending bytes must be 0; got %d", n) @@ -124,7 +131,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) { path := "fast-queue-read-unblock-by-close" mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foorbar", 123, 0) + fq := MustOpenFastQueue(path, "foorbar", 123, 0, false) resultCh := make(chan error) go func() { data, ok := fq.MustReadBlock(nil) @@ -154,7 +161,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) { path := "fast-queue-read-unblock-by-write" mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", 13, 0) + fq := MustOpenFastQueue(path, "foobar", 13, 0, false) block := "foodsafdsaf sdf" resultCh := make(chan error) go func() { @@ -169,7 +176,9 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) { } resultCh <- nil }() - fq.MustWriteBlock([]byte(block)) + if !fq.WriteBlock([]byte(block)) { + t.Fatalf("unexpected false for WriteBlock") + } select { case err := <-resultCh: if err != nil { @@ -186,7 +195,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) { path := "fast-queue-read-write-concurrent" mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", 5, 0) + fq := MustOpenFastQueue(path, "foobar", 5, 0, false) var blocks []string blocksMap := make(map[string]bool) @@ -226,7 +235,10 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) { go func() { defer writersWG.Done() for block := range blocksCh { - fq.MustWriteBlock([]byte(block)) + if !fq.WriteBlock([]byte(block)) { + t.Errorf("unexpected false for WriteBlock") + return + } } }() } @@ -250,7 +262,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) { readersWG.Wait() // Collect the remaining data - fq = MustOpenFastQueue(path, "foobar", 5, 0) + fq = MustOpenFastQueue(path, "foobar", 5, 0, false) resultCh := make(chan error) go func() { for len(blocksMap) > 0 { @@ -278,3 +290,80 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) { fq.MustClose() mustDeleteDir(path) } + +func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) { + path := "fast-queue-write-read-inmemory-disabled-pq" + mustDeleteDir(path) + + capacity := 20 + fq := MustOpenFastQueue(path, "foobar", capacity, 0, true) + if n := fq.GetInmemoryQueueLen(); n != 0 { + t.Fatalf("unexpected non-zero inmemory queue size: %d", n) + } + var blocks []string + for i := 0; i < capacity; i++ { + block := fmt.Sprintf("block %d", i) + if !fq.WriteBlock([]byte(block)) { + t.Fatalf("unexpected false for WriteBlock") + } + blocks = append(blocks, block) + } + if fq.WriteBlock([]byte("error-block")) { + t.Fatalf("expect false due to full queue") + } + + fq.MustClose() + fq = MustOpenFastQueue(path, "foobar", capacity, 0, true) + for _, block := range blocks { + buf, ok := fq.MustReadBlock(nil) + if !ok { + t.Fatalf("unexpected ok=false") + } + if string(buf) != block { + t.Fatalf("unexpected block read; got %q; want %q", buf, block) + } + } + fq.MustClose() + mustDeleteDir(path) +} + +func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) { + path := 
"fast-queue-write-read-inmemory-disabled-pq-force-write" + mustDeleteDir(path) + + capacity := 20 + fq := MustOpenFastQueue(path, "foobar", capacity, 0, true) + if n := fq.GetInmemoryQueueLen(); n != 0 { + t.Fatalf("unexpected non-zero inmemory queue size: %d", n) + } + var blocks []string + for i := 0; i < capacity; i++ { + block := fmt.Sprintf("block %d", i) + if !fq.WriteBlock([]byte(block)) { + t.Fatalf("unexpected false for WriteBlock") + } + blocks = append(blocks, block) + } + if fq.WriteBlock([]byte("error-block")) { + t.Fatalf("expect false due to full queue") + } + for i := 0; i < capacity; i++ { + block := fmt.Sprintf("block %d-%d", i, i) + fq.MustWriteBlockIgnoreDisabledPQ([]byte(block)) + blocks = append(blocks, block) + } + + fq.MustClose() + fq = MustOpenFastQueue(path, "foobar", capacity, 0, true) + for _, block := range blocks { + buf, ok := fq.MustReadBlock(nil) + if !ok { + t.Fatalf("unexpected ok=false") + } + if string(buf) != block { + t.Fatalf("unexpected block read; got %q; want %q", buf, block) + } + } + fq.MustClose() + mustDeleteDir(path) +} diff --git a/lib/persistentqueue/fastqueue_timing_test.go b/lib/persistentqueue/fastqueue_timing_test.go index 1ec46801b..b14daf4da 100644 --- a/lib/persistentqueue/fastqueue_timing_test.go +++ b/lib/persistentqueue/fastqueue_timing_test.go @@ -16,13 +16,13 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize) mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0) + fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false) defer func() { fq.MustClose() mustDeleteDir(path) }() for i := 0; i < b.N; i++ { - writeReadIterationFastQueue(fq, block, iterationsCount) + writeReadIterationFastQueue(b, fq, block, iterationsCount) } }) } @@ -37,23 +37,25 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize) mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0) + fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false) defer func() { fq.MustClose() mustDeleteDir(path) }() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - writeReadIterationFastQueue(fq, block, iterationsCount) + writeReadIterationFastQueue(b, fq, block, iterationsCount) } }) }) } } -func writeReadIterationFastQueue(fq *FastQueue, block []byte, iterationsCount int) { +func writeReadIterationFastQueue(b *testing.B, fq *FastQueue, block []byte, iterationsCount int) { for i := 0; i < iterationsCount; i++ { - fq.MustWriteBlock(block) + if !fq.WriteBlock(block) { + b.Fatalf("unexpected false for WriteBlock") + } } var ok bool bb := bbPool.Get() diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 0880ac204..1428bdee2 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -59,13 +59,18 @@ func CheckConfig() error { // Init initializes Prometheus scraper with config from the `-promscrape.config`. // // Scraped data is passed to pushData. 
-func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) { +func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool) { mustInitClusterMemberID() + pushDataTrackDropped := func(at *auth.Token, wr *prompbmarshal.WriteRequest) { + if !pushData(at, wr) { + pushDataFailsTotal.Inc() + } + } globalStopChan = make(chan struct{}) scraperWG.Add(1) go func() { defer scraperWG.Done() - runScraper(*promscrapeConfigFile, pushData, globalStopChan) + runScraper(*promscrapeConfigFile, pushDataTrackDropped, globalStopChan) }() } @@ -84,6 +89,8 @@ var ( // configData contains -promscrape.config data configData atomic.Pointer[[]byte] + + pushDataFailsTotal = metrics.NewCounter(`vm_promscrape_push_samples_dropped_total`) ) // WriteConfigData writes -promscrape.config contents to w
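For reference, a minimal usage sketch of the FastQueue contract introduced above, mirroring the behaviour exercised by the new tests in lib/persistentqueue/fastqueue_test.go: WriteBlock reports whether a block was accepted, while MustWriteBlockIgnoreDisabledPQ persists data even when the on-disk queue is disabled. The standalone main wrapper, the temporary path (assumed to be empty) and the capacity of 2 blocks are illustrative assumptions, not part of the change.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
)

func main() {
	// The last argument corresponds to -remoteWrite.disableOnDiskQueue:
	// the on-disk queue is disabled and only 2 blocks fit in memory.
	fq := persistentqueue.MustOpenFastQueue("/tmp/fq-example", "example", 2, 0, true)

	// Writes succeed while the in-memory queue has free slots.
	fmt.Println(fq.WriteBlock([]byte("block 0"))) // true
	fmt.Println(fq.WriteBlock([]byte("block 1"))) // true

	// The in-memory queue is full and spilling to disk is disabled,
	// so the write is rejected instead of being buffered on disk.
	fmt.Println(fq.WriteBlock([]byte("block 2"))) // false

	// During graceful shutdown the in-memory part of the queue may still be
	// persisted to disk via the *IgnoreDisabledPQ variant.
	fq.MustWriteBlockIgnoreDisabledPQ([]byte("block 2"))

	fq.MustClose()
}
```

vmagent itself translates a rejected push into remotewrite.ErrQueueFullHTTPRetry, so ingestion clients receive `429 Too Many Requests` and may retry the request later.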