app/vmagent: allow disabling on-disk persistence (#5088)

* app/vmagent: allow disabling the on-disk queue
Previously, it wasn't possible to build a data processing pipeline with a
chain of vmagents. When the remoteWrite target of the last vmagent in the
chain wasn't accessible, data was persisted only while there was enough disk
capacity. Once the disk queue was full, ingested metrics were silently
dropped.

The new flag allows disabling on-disk persistence and immediately returning an
error if remoteWrite is no longer accessible. In that case vmagent blocks any
writes and notifies the client that data ingestion isn't possible.

The main use case for this feature is using an external queue such as Kafka
for data persistence.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110
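
Below is a minimal client-side sketch (not part of this commit) of how an ingestion client could react to the new behavior. It assumes vmagent runs with `-remoteWrite.disableOnDiskQueue` and listens on its default `:8429` port; when the in-memory queue is full, vmagent replies with `429 Too Many Requests`, so the client backs off and retries instead of dropping data. The endpoint and payload below are placeholders.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// pushWithRetry sends a Prometheus text-format payload to vmagent and retries
// with exponential backoff while vmagent reports a full in-memory queue (HTTP 429).
func pushWithRetry(url string, payload []byte) error {
	backoff := time.Second
	for attempt := 0; attempt < 5; attempt++ {
		resp, err := http.Post(url, "text/plain", bytes.NewReader(payload))
		if err != nil {
			return err
		}
		resp.Body.Close()
		switch {
		case resp.StatusCode == http.StatusTooManyRequests:
			// vmagent's in-memory queue is full; wait and retry instead of dropping data.
			time.Sleep(backoff)
			backoff *= 2
		case resp.StatusCode/100 != 2:
			return fmt.Errorf("unexpected status from vmagent: %s", resp.Status)
		default:
			return nil
		}
	}
	return fmt.Errorf("vmagent is still overloaded after retries")
}

func main() {
	// The address and metric below are placeholders.
	if err := pushWithRetry("http://vmagent:8429/api/v1/import/prometheus", []byte("demo_metric 1\n")); err != nil {
		fmt.Println("push failed:", err)
	}
}
```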

* adds test, updates readme

* apply review suggestions

* update docs for vmagent

* makes linter happy

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
Nikolay 2023-11-24 13:42:11 +01:00 committed by GitHub
parent a7800cdb95
commit 090cb2c9de
23 changed files with 410 additions and 108 deletions


@@ -65,7 +65,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(len(rows))
 if at != nil {
 rowsTenantInserted.Get(at).Add(len(rows))


@@ -88,7 +88,9 @@ func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmar
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(rowsTotal)
 if at != nil {
 rowsTenantInserted.Get(at).Add(rowsTotal)


@@ -56,7 +56,9 @@ func insertRows(rows []parser.Row) error {
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(nil, &ctx.WriteRequest)
+if !remotewrite.Push(nil, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(len(rows))
 rowsPerInsert.Update(float64(len(rows)))
 return nil


@@ -130,7 +130,9 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
 ctx.ctx.Labels = labels
 ctx.ctx.Samples = samples
 ctx.commonLabels = commonLabels
-remotewrite.Push(at, &ctx.ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(rowsTotal)
 if at != nil {
 rowsTenantInserted.Get(at).Add(rowsTotal)


@@ -84,6 +84,8 @@ func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 return nil
 }


@@ -76,7 +76,9 @@ func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(len(rows))
 if at != nil {
 rowsTenantInserted.Get(at).Add(samplesCount)


@@ -59,7 +59,9 @@ func insertRows(at *auth.Token, tss []prompbmarshal.TimeSeries, extraLabels []pr
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(rowsTotal)
 if at != nil {
 rowsTenantInserted.Get(at).Add(rowsTotal)


@@ -56,7 +56,9 @@ func insertRows(rows []parser.Row) error {
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(nil, &ctx.WriteRequest)
+if !remotewrite.Push(nil, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(len(rows))
 rowsPerInsert.Update(float64(len(rows)))
 return nil


@@ -64,7 +64,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(len(rows))
 rowsPerInsert.Update(float64(len(rows)))
 return nil


@@ -73,7 +73,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(len(rows))
 if at != nil {
 rowsTenantInserted.Get(at).Add(len(rows))


@@ -69,7 +69,9 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(rowsTotal)
 if at != nil {
 rowsTenantInserted.Get(at).Add(rowsTotal)


@@ -305,7 +305,7 @@ func (c *client) runWorker() {
 continue
 }
 // Return unsent block to the queue.
-c.fq.MustWriteBlock(block)
+c.fq.MustWriteBlockIgnoreDisabledPQ(block)
 return
 case <-c.stopCh:
 // c must be stopped. Wait for a while in the hope the block will be sent.
@@ -314,11 +314,11 @@
 case ok := <-ch:
 if !ok {
 // Return unsent block to the queue.
-c.fq.MustWriteBlock(block)
+c.fq.MustWriteBlockIgnoreDisabledPQ(block)
 }
 case <-time.After(graceDuration):
 // Return unsent block to the queue.
-c.fq.MustWriteBlock(block)
+c.fq.MustWriteBlockIgnoreDisabledPQ(block)
 }
 return
 }


@ -37,9 +37,9 @@ type pendingSeries struct {
periodicFlusherWG sync.WaitGroup periodicFlusherWG sync.WaitGroup
} }
func newPendingSeries(pushBlock func(block []byte), isVMRemoteWrite bool, significantFigures, roundDigits int) *pendingSeries { func newPendingSeries(fq *persistentqueue.FastQueue, isVMRemoteWrite bool, significantFigures, roundDigits int) *pendingSeries {
var ps pendingSeries var ps pendingSeries
ps.wr.pushBlock = pushBlock ps.wr.fq = fq
ps.wr.isVMRemoteWrite = isVMRemoteWrite ps.wr.isVMRemoteWrite = isVMRemoteWrite
ps.wr.significantFigures = significantFigures ps.wr.significantFigures = significantFigures
ps.wr.roundDigits = roundDigits ps.wr.roundDigits = roundDigits
@ -57,10 +57,11 @@ func (ps *pendingSeries) MustStop() {
ps.periodicFlusherWG.Wait() ps.periodicFlusherWG.Wait()
} }
func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) { func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) bool {
ps.mu.Lock() ps.mu.Lock()
ps.wr.push(tss) wasPushed := ps.wr.push(tss)
ps.mu.Unlock() ps.mu.Unlock()
return wasPushed
} }
func (ps *pendingSeries) periodicFlusher() { func (ps *pendingSeries) periodicFlusher() {
@ -70,18 +71,21 @@ func (ps *pendingSeries) periodicFlusher() {
} }
ticker := time.NewTicker(*flushInterval) ticker := time.NewTicker(*flushInterval)
defer ticker.Stop() defer ticker.Stop()
mustStop := false for {
for !mustStop {
select { select {
case <-ps.stopCh: case <-ps.stopCh:
mustStop = true ps.mu.Lock()
ps.wr.mustFlushOnStop()
ps.mu.Unlock()
return
case <-ticker.C: case <-ticker.C:
if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) { if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) {
continue continue
} }
} }
ps.mu.Lock() ps.mu.Lock()
ps.wr.flush() // no-op
_ = ps.wr.flush()
ps.mu.Unlock() ps.mu.Unlock()
} }
} }
@ -90,8 +94,7 @@ type writeRequest struct {
// Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures. // Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures.
lastFlushTime uint64 lastFlushTime uint64
// pushBlock is called when whe write request is ready to be sent. fq *persistentqueue.FastQueue
pushBlock func(block []byte)
// Whether to encode the write request with VictoriaMetrics remote write protocol. // Whether to encode the write request with VictoriaMetrics remote write protocol.
isVMRemoteWrite bool isVMRemoteWrite bool
@ -130,14 +133,32 @@ func (wr *writeRequest) reset() {
wr.buf = wr.buf[:0] wr.buf = wr.buf[:0]
} }
func (wr *writeRequest) flush() { // mustFlushOnStop force-pushes the in-memory buffer into the queue.
// It is needed to properly save the in-memory buffer when on-disk persistence is disabled.
func (wr *writeRequest) mustFlushOnStop() {
wr.wr.Timeseries = wr.tss wr.wr.Timeseries = wr.tss
wr.adjustSampleValues() wr.adjustSampleValues()
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
pushWriteRequest(&wr.wr, wr.pushBlock, wr.isVMRemoteWrite) if !pushWriteRequest(&wr.wr, func(block []byte) bool {
wr.fq.MustWriteBlockIgnoreDisabledPQ(block)
return true
}, wr.isVMRemoteWrite) {
return
}
wr.reset() wr.reset()
} }
func (wr *writeRequest) flush() bool {
wr.wr.Timeseries = wr.tss
wr.adjustSampleValues()
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
if !pushWriteRequest(&wr.wr, wr.fq.WriteBlock, wr.isVMRemoteWrite) {
return false
}
wr.reset()
return true
}
func (wr *writeRequest) adjustSampleValues() { func (wr *writeRequest) adjustSampleValues() {
samples := wr.samples samples := wr.samples
if n := wr.significantFigures; n > 0 { if n := wr.significantFigures; n > 0 {
@ -154,21 +175,25 @@ func (wr *writeRequest) adjustSampleValues() {
} }
} }
func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) { func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) bool {
tssDst := wr.tss tssDst := wr.tss
maxSamplesPerBlock := *maxRowsPerBlock maxSamplesPerBlock := *maxRowsPerBlock
// Allow up to 10x of labels per each block on average. // Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock maxLabelsPerBlock := 10 * maxSamplesPerBlock
for i := range src { for i := range src {
tssDst = append(tssDst, prompbmarshal.TimeSeries{})
wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
if len(wr.samples) >= maxSamplesPerBlock || len(wr.labels) >= maxLabelsPerBlock { if len(wr.samples) >= maxSamplesPerBlock || len(wr.labels) >= maxLabelsPerBlock {
wr.tss = tssDst wr.tss = tssDst
wr.flush() if !wr.flush() {
return false
}
tssDst = wr.tss tssDst = wr.tss
} }
tssDst = append(tssDst, prompbmarshal.TimeSeries{})
wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
} }
wr.tss = tssDst wr.tss = tssDst
return true
} }
func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
@ -196,10 +221,10 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
wr.buf = buf wr.buf = buf
} }
func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte), isVMRemoteWrite bool) { func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte) bool, isVMRemoteWrite bool) bool {
if len(wr.Timeseries) == 0 { if len(wr.Timeseries) == 0 {
// Nothing to push // Nothing to push
return return true
} }
bb := writeRequestBufPool.Get() bb := writeRequestBufPool.Get()
bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr) bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
@ -212,11 +237,13 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
} }
writeRequestBufPool.Put(bb) writeRequestBufPool.Put(bb)
if len(zb.B) <= persistentqueue.MaxBlockSize { if len(zb.B) <= persistentqueue.MaxBlockSize {
pushBlock(zb.B) if !pushBlock(zb.B) {
return false
}
blockSizeRows.Update(float64(len(wr.Timeseries))) blockSizeRows.Update(float64(len(wr.Timeseries)))
blockSizeBytes.Update(float64(len(zb.B))) blockSizeBytes.Update(float64(len(zb.B)))
snappyBufPool.Put(zb) snappyBufPool.Put(zb)
return return true
} }
snappyBufPool.Put(zb) snappyBufPool.Put(zb)
} else { } else {
@ -229,23 +256,32 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
samples := wr.Timeseries[0].Samples samples := wr.Timeseries[0].Samples
if len(samples) == 1 { if len(samples) == 1 {
logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N) logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N)
return return true
} }
n := len(samples) / 2 n := len(samples) / 2
wr.Timeseries[0].Samples = samples[:n] wr.Timeseries[0].Samples = samples[:n]
pushWriteRequest(wr, pushBlock, isVMRemoteWrite) if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
return false
}
wr.Timeseries[0].Samples = samples[n:] wr.Timeseries[0].Samples = samples[n:]
pushWriteRequest(wr, pushBlock, isVMRemoteWrite) if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
return false
}
wr.Timeseries[0].Samples = samples wr.Timeseries[0].Samples = samples
return return true
} }
timeseries := wr.Timeseries timeseries := wr.Timeseries
n := len(timeseries) / 2 n := len(timeseries) / 2
wr.Timeseries = timeseries[:n] wr.Timeseries = timeseries[:n]
pushWriteRequest(wr, pushBlock, isVMRemoteWrite) if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
return false
}
wr.Timeseries = timeseries[n:] wr.Timeseries = timeseries[n:]
pushWriteRequest(wr, pushBlock, isVMRemoteWrite) if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
return false
}
wr.Timeseries = timeseries wr.Timeseries = timeseries
return true
} }
var ( var (


@@ -26,13 +26,14 @@ func testPushWriteRequest(t *testing.T, rowsCount, expectedBlockLenProm, expecte
 t.Helper()
 wr := newTestWriteRequest(rowsCount, 20)
 pushBlockLen := 0
-pushBlock := func(block []byte) {
+pushBlock := func(block []byte) bool {
 if pushBlockLen > 0 {
 panic(fmt.Errorf("BUG: pushBlock called multiple times; pushBlockLen=%d at first call, len(block)=%d at second call", pushBlockLen, len(block)))
 }
 pushBlockLen = len(block)
+return true
 }
-pushWriteRequest(wr, pushBlock, isVMRemoteWrite)
+_ = pushWriteRequest(wr, pushBlock, isVMRemoteWrite)
 if math.Abs(float64(pushBlockLen-expectedBlockLen)/float64(expectedBlockLen)*100) > tolerancePrc {
 t.Fatalf("unexpected block len for rowsCount=%d, isVMRemoteWrite=%v; got %d bytes; expecting %d bytes +- %.0f%%",
 rowsCount, isVMRemoteWrite, pushBlockLen, expectedBlockLen, tolerancePrc)


@ -3,6 +3,7 @@ package remotewrite
import ( import (
"flag" "flag"
"fmt" "fmt"
"net/http"
"net/url" "net/url"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -10,6 +11,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -84,6 +87,9 @@ var (
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+ streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable the on-disk queue for metrics ingestion processing. "+
"If the in-memory queue is full for at least one remoteWrite target, all data ingestion is blocked and an error is returned. "+
"This allows building a chain of vmagents and complex data pipelines without data loss. On-disk writes are still possible during graceful shutdown in order to store the in-memory part of the queue.")
) )
var ( var (
@ -96,6 +102,13 @@ var (
// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified. // Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
defaultAuthToken = &auth.Token{} defaultAuthToken = &auth.Token{}
// ErrQueueFullHTTPRetry is returned when -remoteWrite.disableOnDiskQueue is enabled
// and one of the remote storages cannot handle the load.
ErrQueueFullHTTPRetry = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("in-memory queue is full, write requests blocked due to enabled flag -remoteWrite.disableOnDiskQueue=true. Retry request later"),
StatusCode: http.StatusTooManyRequests,
}
) )
// MultitenancyEnabled returns true if -remoteWrite.multitenantURL is specified. // MultitenancyEnabled returns true if -remoteWrite.multitenantURL is specified.
@ -350,7 +363,7 @@ func Stop() {
// If at isn't nil, the data is pushed to the configured `-remoteWrite.multitenantURL`. // If at isn't nil, the data is pushed to the configured `-remoteWrite.multitenantURL`.
// //
// Note that wr may be modified by Push because of relabeling and rounding. // Note that wr may be modified by Push because of relabeling and rounding.
func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) { func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
if at == nil && len(*remoteWriteMultitenantURLs) > 0 { if at == nil && len(*remoteWriteMultitenantURLs) > 0 {
// Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set. // Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set.
at = defaultAuthToken at = defaultAuthToken
@ -374,6 +387,17 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
} }
rwctxsMapLock.Unlock() rwctxsMapLock.Unlock()
} }
var isWritesLocked bool
for _, rwctx := range rwctxs {
if rwctx.fq.IsWritesBlocked() {
isWritesLocked = true
break
}
}
// fast path, write path is blocked
if isWritesLocked {
return false
}
var rctx *relabelCtx var rctx *relabelCtx
rcs := allRelabelConfigs.Load() rcs := allRelabelConfigs.Load()
@ -415,7 +439,13 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
} }
sortLabelsIfNeeded(tssBlock) sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock) tssBlock = limitSeriesCardinality(tssBlock)
pushBlockToRemoteStorages(rwctxs, tssBlock) if !pushBlockToRemoteStorages(rwctxs, tssBlock) {
if rctx != nil {
rctx.reset()
putRelabelCtx(rctx)
}
return false
}
if rctx != nil { if rctx != nil {
rctx.reset() rctx.reset()
} }
@ -423,17 +453,19 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
if rctx != nil { if rctx != nil {
putRelabelCtx(rctx) putRelabelCtx(rctx)
} }
return true
} }
func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) { func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool {
if len(tssBlock) == 0 { if len(tssBlock) == 0 {
// Nothing to push // Nothing to push
return return true
} }
if len(rwctxs) == 1 { if len(rwctxs) == 1 {
// Fast path - just push data to the configured single remote storage // Fast path - just push data to the configured single remote storage
rwctxs[0].Push(tssBlock) err := rwctxs[0].Push(tssBlock)
return return err
} }
// We need to push tssBlock to multiple remote storages. // We need to push tssBlock to multiple remote storages.
@ -462,6 +494,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
// the time needed for sending the data to multiple remote storage systems. // the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(rwctxs)) wg.Add(len(rwctxs))
var anyPushFailed uint64
for i, rwctx := range rwctxs { for i, rwctx := range rwctxs {
tssShard := tssByURL[i] tssShard := tssByURL[i]
if len(tssShard) == 0 { if len(tssShard) == 0 {
@ -469,11 +502,13 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
} }
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) { go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
defer wg.Done() defer wg.Done()
rwctx.Push(tss) if !rwctx.Push(tss) {
atomic.StoreUint64(&anyPushFailed, 1)
}
}(rwctx, tssShard) }(rwctx, tssShard)
} }
wg.Wait() wg.Wait()
return return atomic.LoadUint64(&anyPushFailed) == 0
} }
// Replicate data among rwctxs. // Replicate data among rwctxs.
@ -481,13 +516,17 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
// the time needed for sending the data to multiple remote storage systems. // the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(rwctxs)) wg.Add(len(rwctxs))
var anyPushFailed uint64
for _, rwctx := range rwctxs { for _, rwctx := range rwctxs {
go func(rwctx *remoteWriteCtx) { go func(rwctx *remoteWriteCtx) {
defer wg.Done() defer wg.Done()
rwctx.Push(tssBlock) if !rwctx.Push(tssBlock) {
atomic.StoreUint64(&anyPushFailed, 1)
}
}(rwctx) }(rwctx)
} }
wg.Wait() wg.Wait()
return atomic.LoadUint64(&anyPushFailed) == 0
} }
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. // sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
@ -590,8 +629,9 @@ type remoteWriteCtx struct {
pss []*pendingSeries pss []*pendingSeries
pssNextIdx uint64 pssNextIdx uint64
rowsPushedAfterRelabel *metrics.Counter rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter rowsDroppedByRelabel *metrics.Counter
rowsDroppedAtAggregationOnPush *metrics.Counter
} }
func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx { func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
@ -607,13 +647,19 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize) logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize)
maxPendingBytes = persistentqueue.DefaultChunkFileSize maxPendingBytes = persistentqueue.DefaultChunkFileSize
} }
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes) fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, *disableOnDiskQueue)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetPendingBytes()) return float64(fq.GetPendingBytes())
}) })
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetInmemoryQueueLen()) return float64(fq.GetInmemoryQueueLen())
}) })
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_inmemory_queue_blocked{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
if fq.IsWritesBlocked() {
return 1.0
}
return 0
})
var c *client var c *client
switch remoteWriteURL.Scheme { switch remoteWriteURL.Scheme {
@ -635,7 +681,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
} }
pss := make([]*pendingSeries, pssLen) pss := make([]*pendingSeries, pssLen)
for i := range pss { for i := range pss {
pss[i] = newPendingSeries(fq.MustWriteBlock, c.useVMProto, sf, rd) pss[i] = newPendingSeries(fq, c.useVMProto, sf, rd)
} }
rwctx := &remoteWriteCtx{ rwctx := &remoteWriteCtx{
@ -644,15 +690,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
c: c, c: c,
pss: pss, pss: pss,
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)), rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
rowsDroppedAtAggregationOnPush: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_aggregation_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
} }
// Initialize sas // Initialize sas
sasFile := streamAggrConfig.GetOptionalArg(argIdx) sasFile := streamAggrConfig.GetOptionalArg(argIdx)
if sasFile != "" { if sasFile != "" {
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval)
if err != nil { if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err) logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
} }
@ -688,7 +735,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsDroppedByRelabel = nil rwctx.rowsDroppedByRelabel = nil
} }
func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) bool {
// Apply relabeling // Apply relabeling
var rctx *relabelCtx var rctx *relabelCtx
var v *[]prompbmarshal.TimeSeries var v *[]prompbmarshal.TimeSeries
@ -726,14 +773,16 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
} }
matchIdxsPool.Put(matchIdxs) matchIdxsPool.Put(matchIdxs)
} }
rwctx.pushInternal(tss) defer func() {
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}()
// Return back relabeling contexts to the pool return rwctx.pushInternal(tss)
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
} }
var matchIdxsPool bytesutil.ByteBufferPool var matchIdxsPool bytesutil.ByteBufferPool
@ -753,9 +802,22 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop
return dst return dst
} }
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSeries) {
if !rwctx.pushInternal(tss) {
rwctx.rowsDroppedAtAggregationOnPush.Inc()
}
}
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) bool {
var rctx *relabelCtx var rctx *relabelCtx
var v *[]prompbmarshal.TimeSeries var v *[]prompbmarshal.TimeSeries
defer func() {
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}()
if len(labelsGlobal) > 0 { if len(labelsGlobal) > 0 {
// Make a copy of tss before adding extra labels in order to prevent // Make a copy of tss before adding extra labels in order to prevent
// from affecting time series for other remoteWrite.url configs. // from affecting time series for other remoteWrite.url configs.
@ -767,13 +829,7 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
pss := rwctx.pss pss := rwctx.pss
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
pss[idx].Push(tss) return pss[idx].Push(tss)
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
} }
func (rwctx *remoteWriteCtx) reinitStreamAggr() { func (rwctx *remoteWriteCtx) reinitStreamAggr() {
@ -786,7 +842,7 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile) logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
dedupInterval := streamAggrDedupInterval.GetOptionalArg(rwctx.idx) dedupInterval := streamAggrDedupInterval.GetOptionalArg(rwctx.idx)
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval)
if err != nil { if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)


@@ -76,7 +76,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
 ctx.WriteRequest.Timeseries = tssDst
 ctx.Labels = labels
 ctx.Samples = samples
-remotewrite.Push(at, &ctx.WriteRequest)
+if !remotewrite.Push(at, &ctx.WriteRequest) {
+return remotewrite.ErrQueueFullHTTPRetry
+}
 rowsInserted.Add(rowsTotal)
 if at != nil {
 rowsTenantInserted.Get(at).Add(rowsTotal)


@@ -97,8 +97,9 @@ func Init() {
 if len(*opentsdbHTTPListenAddr) > 0 {
 opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler)
 }
-promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
+promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
 prompush.Push(wr)
+return true
 })
 }


@@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queuing to disk when the remote storage cannot keep up with the data ingestion rate. In this case `vmagent` returns `429 Too Many Requests` response, so clients could decrease data ingestion rate on their side. This option may be useful when `vmagent` runs in environments with slow persistent disks. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration).
* FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format).


@@ -869,6 +869,48 @@ scrape_configs:
- "Proxy-Auth: top-secret"
```
## Disabling on-disk queue
The on-disk queue, aka persistent queue, is a temporary folder configured via the `-remoteWrite.tmpDataPath` flag. vmagent may store metric blocks in this folder.
Metric blocks are persisted on disk if the remote storage is not available or cannot handle the ingestion load.
The size of this disk queue per remote storage can be limited via `-remoteWrite.maxDiskUsagePerURL`. By default, there is no limit.
When this limit is reached, metric blocks are silently dropped by vmagent.

This behaviour can be changed via the `-remoteWrite.disableOnDiskQueue=true` flag.
It prevents vmagent from using on-disk storage for data buffering during ingestion or scraping.
On-disk storage is still used for saving the in-memory part of the queue and buffers during graceful shutdown.

Note that samples produced by stream aggregation and by scraping are dropped when the queue is full.
The following metrics help to detect sample drops: `vmagent_remotewrite_aggregation_metrics_dropped_total` and `vm_promscrape_push_samples_dropped_total`.

When multiple remote storages are configured, vmagent blocks write requests even if only a single remote storage cannot accept the ingested samples.
vmagent provides at-least-once delivery semantics, so metric sample duplication is possible, and [deduplication](https://docs.victoriametrics.com/#deduplication) must be configured at the remote storage.
### Common patterns
You may want to disable the on-disk queue in the following cases:

1) Chaining of vmagents. Intermediate vmagents used for aggregation may lose data if vmcluster is not available.
With the persistent queue disabled, the aggregation vmagents back-pressure metrics to the first vmagent.
```mermaid
flowchart LR
A[vmagent] --> B(vmagent-aggregation-0)
A[vmagent] --> C(vmagent-aggregation-1)
B --> D[vmcluster]
C --> D[vmcluster]
```
2) Replacing the on-disk queue with Kafka or another compatible queue. The on-disk queue must be disabled at `vmagent-consumer`; a hypothetical consumer sketch follows the diagram below.
```mermaid
flowchart LR
A[vmagent] --> B(kafka)
B <--> C(vmagent-consumer)
C --> D[vmcluster]
```
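
A hypothetical consumer for the second pattern could look like the sketch below. It is not part of this commit: the broker address, topic, consumer group and payload format (Prometheus text exposition format pushed to vmagent's `/api/v1/import/prometheus` endpoint) are assumptions, and `github.com/segmentio/kafka-go` is used only as an example client. The Kafka offset is committed only after vmagent accepts the data, so a `429 Too Many Requests` response caused by `-remoteWrite.disableOnDiskQueue` leaves the message in Kafka for a later retry.

```go
package main

import (
	"bytes"
	"context"
	"log"
	"net/http"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Read metric payloads from Kafka and forward them to a vmagent started
	// with -remoteWrite.disableOnDiskQueue (all names below are placeholders).
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "vmagent-consumer",
		Topic:   "metrics",
	})
	defer r.Close()

	ctx := context.Background()
	for {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Fatalf("cannot fetch message from kafka: %s", err)
		}
		resp, err := http.Post("http://vmagent-consumer:8429/api/v1/import/prometheus",
			"text/plain", bytes.NewReader(msg.Value))
		if err != nil {
			log.Printf("cannot reach vmagent: %s; retrying", err)
			time.Sleep(time.Second)
			continue
		}
		resp.Body.Close()
		if resp.StatusCode/100 != 2 {
			// A 429 here means vmagent's in-memory queue is full because
			// -remoteWrite.disableOnDiskQueue is set; keep the offset uncommitted and retry.
			log.Printf("vmagent returned %s; retrying", resp.Status)
			time.Sleep(time.Second)
			continue
		}
		if err := r.CommitMessages(ctx, msg); err != nil {
			log.Fatalf("cannot commit kafka offset: %s", err)
		}
	}
}
```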
## Cardinality limiter

By default, `vmagent` doesn't limit the number of time series each scrape target can expose.

@@ -1746,6 +1788,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-remoteWrite.bearerTokenFile array
	Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second
	Supports an array of values separated by comma or specified via multiple flags.
-remoteWrite.disableOnDiskQueue
	Whether to disable the on-disk queue for metrics ingestion processing. If the in-memory queue is full for at least one remoteWrite target, all data ingestion is blocked and an error is returned.
	This allows building a chain of vmagents and complex data pipelines without data loss. On-disk writes are still possible during graceful shutdown in order to store the in-memory part of the queue.
-remoteWrite.flushInterval duration
	Interval for flushing the data to remote storage. This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s)
-remoteWrite.forcePromProto array


@ -22,6 +22,7 @@ type FastQueue struct {
// or when MustClose is called. // or when MustClose is called.
cond sync.Cond cond sync.Cond
isPQDisabled bool
// pq is file-based queue // pq is file-based queue
pq *queue pq *queue
@ -42,11 +43,14 @@ type FastQueue struct {
// if maxPendingBytes is 0, then the queue size is unlimited. // if maxPendingBytes is 0, then the queue size is unlimited.
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue // Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
// reaches maxPendingSize. // reaches maxPendingSize.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64) *FastQueue { // If isPQDisabled is set to true, all write requests that exceed the in-memory buffer capacity are rejected with the errQueueIsFull error.
// The in-memory part of the queue can still be stored on disk during graceful shutdown.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
pq := mustOpen(path, name, maxPendingBytes) pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{ fq := &FastQueue{
pq: pq, pq: pq,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), isPQDisabled: isPQDisabled,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
} }
fq.cond.L = &fq.mu fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
@ -61,6 +65,16 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes
return fq return fq
} }
// IsWritesBlocked checks if data can be pushed into the queue
func (fq *FastQueue) IsWritesBlocked() bool {
if !fq.isPQDisabled {
return false
}
fq.mu.Lock()
defer fq.mu.Unlock()
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
}
// UnblockAllReaders unblocks all the readers. // UnblockAllReaders unblocks all the readers.
func (fq *FastQueue) UnblockAllReaders() { func (fq *FastQueue) UnblockAllReaders() {
fq.mu.Lock() fq.mu.Lock()
@ -92,7 +106,7 @@ func (fq *FastQueue) MustClose() {
} }
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() { func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() {
if len(fq.ch) == 0 { if len(fq.ch) == 0 || fq.isPQDisabled {
return return
} }
if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 { if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 {
@ -118,6 +132,10 @@ func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
func (fq *FastQueue) GetPendingBytes() uint64 { func (fq *FastQueue) GetPendingBytes() uint64 {
fq.mu.Lock() fq.mu.Lock()
defer fq.mu.Unlock() defer fq.mu.Unlock()
return fq.getPendingBytesLocked()
}
func (fq *FastQueue) getPendingBytesLocked() uint64 {
n := fq.pendingInmemoryBytes n := fq.pendingInmemoryBytes
n += fq.pq.GetPendingBytes() n += fq.pq.GetPendingBytes()
@ -132,26 +150,47 @@ func (fq *FastQueue) GetInmemoryQueueLen() int {
return len(fq.ch) return len(fq.ch)
} }
// MustWriteBlock writes block to fq. // MustWriteBlockIgnoreDisabledPQ writes block to fq and persists data on disk even if the persistent queue is disabled by the flag.
func (fq *FastQueue) MustWriteBlock(block []byte) { // It is needed to gracefully stop the service without losing data when the remote storage is not available.
func (fq *FastQueue) MustWriteBlockIgnoreDisabledPQ(block []byte) {
if !fq.writeBlock(block, true) {
logger.Fatalf("BUG: MustWriteBlockIgnoreDisabledPQ must always write data even if persistence is disabled")
}
}
// WriteBlock writes block to fq.
func (fq *FastQueue) WriteBlock(block []byte) bool {
return fq.writeBlock(block, false)
}
// writeBlock writes block to fq, optionally persisting it even when the persistent queue is disabled.
func (fq *FastQueue) writeBlock(block []byte, mustIgnoreDisabledPQ bool) bool {
fq.mu.Lock() fq.mu.Lock()
defer fq.mu.Unlock() defer fq.mu.Unlock()
isPQWritesAllowed := !fq.isPQDisabled || mustIgnoreDisabledPQ
fq.flushInmemoryBlocksToFileIfNeededLocked() fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 { if n := fq.pq.GetPendingBytes(); n > 0 {
if !isPQWritesAllowed {
return false
}
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet. // The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue. // So put the block to file-based queue.
if len(fq.ch) > 0 { if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n) logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
} }
fq.pq.MustWriteBlock(block) fq.pq.MustWriteBlock(block)
return return true
} }
if len(fq.ch) == cap(fq.ch) { if len(fq.ch) == cap(fq.ch) {
// There is no space in the in-memory queue. Put the data to file-based queue. // There is no space in the in-memory queue. Put the data to file-based queue.
if !isPQWritesAllowed {
return false
}
fq.flushInmemoryBlocksToFileLocked() fq.flushInmemoryBlocksToFileLocked()
fq.pq.MustWriteBlock(block) fq.pq.MustWriteBlock(block)
return return true
} }
// There is enough space in the in-memory queue. // There is enough space in the in-memory queue.
bb := blockBufPool.Get() bb := blockBufPool.Get()
@ -162,6 +201,7 @@ func (fq *FastQueue) MustWriteBlock(block []byte) {
// Notify potentially blocked reader. // Notify potentially blocked reader.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for the context. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for the context.
fq.cond.Signal() fq.cond.Signal()
return true
} }
// MustReadBlock reads the next block from fq to dst and returns it. // MustReadBlock reads the next block from fq to dst and returns it.


@ -11,7 +11,7 @@ func TestFastQueueOpenClose(_ *testing.T) {
path := "fast-queue-open-close" path := "fast-queue-open-close"
mustDeleteDir(path) mustDeleteDir(path)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
fq := MustOpenFastQueue(path, "foobar", 100, 0) fq := MustOpenFastQueue(path, "foobar", 100, 0, true)
fq.MustClose() fq.MustClose()
} }
mustDeleteDir(path) mustDeleteDir(path)
@ -22,14 +22,16 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
mustDeleteDir(path) mustDeleteDir(path)
capacity := 100 capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0) fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
if n := fq.GetInmemoryQueueLen(); n != 0 { if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n) t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
} }
var blocks []string var blocks []string
for i := 0; i < capacity; i++ { for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i) block := fmt.Sprintf("block %d", i)
fq.MustWriteBlock([]byte(block)) if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
}
blocks = append(blocks, block) blocks = append(blocks, block)
} }
if n := fq.GetInmemoryQueueLen(); n != capacity { if n := fq.GetInmemoryQueueLen(); n != capacity {
@ -53,14 +55,16 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
mustDeleteDir(path) mustDeleteDir(path)
capacity := 100 capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0) fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
if n := fq.GetPendingBytes(); n != 0 { if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n) t.Fatalf("the number of pending bytes must be 0; got %d", n)
} }
var blocks []string var blocks []string
for i := 0; i < 2*capacity; i++ { for i := 0; i < 2*capacity; i++ {
block := fmt.Sprintf("block %d", i) block := fmt.Sprintf("block %d", i)
fq.MustWriteBlock([]byte(block)) if !fq.WriteBlock([]byte(block)) {
t.Fatalf("not expected WriteBlock fail")
}
blocks = append(blocks, block) blocks = append(blocks, block)
} }
if n := fq.GetPendingBytes(); n == 0 { if n := fq.GetPendingBytes(); n == 0 {
@ -87,17 +91,20 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
mustDeleteDir(path) mustDeleteDir(path)
capacity := 100 capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0) fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
if n := fq.GetPendingBytes(); n != 0 { if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n) t.Fatalf("the number of pending bytes must be 0; got %d", n)
} }
var blocks []string var blocks []string
for i := 0; i < 2*capacity; i++ { for i := 0; i < 2*capacity; i++ {
block := fmt.Sprintf("block %d", i) block := fmt.Sprintf("block %d", i)
fq.MustWriteBlock([]byte(block)) if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
}
blocks = append(blocks, block) blocks = append(blocks, block)
fq.MustClose() fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0) fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
} }
if n := fq.GetPendingBytes(); n == 0 { if n := fq.GetPendingBytes(); n == 0 {
t.Fatalf("the number of pending bytes must be greater than 0") t.Fatalf("the number of pending bytes must be greater than 0")
@ -111,7 +118,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
t.Fatalf("unexpected block read; got %q; want %q", buf, block) t.Fatalf("unexpected block read; got %q; want %q", buf, block)
} }
fq.MustClose() fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0) fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
} }
if n := fq.GetPendingBytes(); n != 0 { if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n) t.Fatalf("the number of pending bytes must be 0; got %d", n)
@ -124,7 +131,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) {
path := "fast-queue-read-unblock-by-close" path := "fast-queue-read-unblock-by-close"
mustDeleteDir(path) mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foorbar", 123, 0) fq := MustOpenFastQueue(path, "foorbar", 123, 0, false)
resultCh := make(chan error) resultCh := make(chan error)
go func() { go func() {
data, ok := fq.MustReadBlock(nil) data, ok := fq.MustReadBlock(nil)
@ -154,7 +161,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
path := "fast-queue-read-unblock-by-write" path := "fast-queue-read-unblock-by-write"
mustDeleteDir(path) mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", 13, 0) fq := MustOpenFastQueue(path, "foobar", 13, 0, false)
block := "foodsafdsaf sdf" block := "foodsafdsaf sdf"
resultCh := make(chan error) resultCh := make(chan error)
go func() { go func() {
@ -169,7 +176,9 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
} }
resultCh <- nil resultCh <- nil
}() }()
fq.MustWriteBlock([]byte(block)) if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
}
select { select {
case err := <-resultCh: case err := <-resultCh:
if err != nil { if err != nil {
@ -186,7 +195,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
path := "fast-queue-read-write-concurrent" path := "fast-queue-read-write-concurrent"
mustDeleteDir(path) mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", 5, 0) fq := MustOpenFastQueue(path, "foobar", 5, 0, false)
var blocks []string var blocks []string
blocksMap := make(map[string]bool) blocksMap := make(map[string]bool)
@ -226,7 +235,10 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
go func() { go func() {
defer writersWG.Done() defer writersWG.Done()
for block := range blocksCh { for block := range blocksCh {
fq.MustWriteBlock([]byte(block)) if !fq.WriteBlock([]byte(block)) {
t.Errorf("unexpected false for WriteBlock")
return
}
} }
}() }()
} }
@ -250,7 +262,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
readersWG.Wait() readersWG.Wait()
// Collect the remaining data // Collect the remaining data
fq = MustOpenFastQueue(path, "foobar", 5, 0) fq = MustOpenFastQueue(path, "foobar", 5, 0, false)
resultCh := make(chan error) resultCh := make(chan error)
go func() { go func() {
for len(blocksMap) > 0 { for len(blocksMap) > 0 {
@ -278,3 +290,80 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
fq.MustClose() fq.MustClose()
mustDeleteDir(path) mustDeleteDir(path)
} }
func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
path := "fast-queue-write-read-inmemory-disabled-pq"
mustDeleteDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
var blocks []string
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
}
blocks = append(blocks, block)
}
if fq.WriteBlock([]byte("error-block")) {
t.Fatalf("expect false due to full queue")
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != block {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
}
fq.MustClose()
mustDeleteDir(path)
}
func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
path := "fast-queue-write-read-inmemory-disabled-pq-force-write"
mustDeleteDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
var blocks []string
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
}
blocks = append(blocks, block)
}
if fq.WriteBlock([]byte("error-block")) {
t.Fatalf("expect false due to full queue")
}
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d-%d", i, i)
fq.MustWriteBlockIgnoreDisabledPQ([]byte(block))
blocks = append(blocks, block)
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != block {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
}
fq.MustClose()
mustDeleteDir(path)
}


@@ -16,13 +16,13 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
 b.SetBytes(int64(blockSize) * iterationsCount)
 path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize)
 mustDeleteDir(path)
-fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0)
+fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false)
 defer func() {
 fq.MustClose()
 mustDeleteDir(path)
 }()
 for i := 0; i < b.N; i++ {
-writeReadIterationFastQueue(fq, block, iterationsCount)
+writeReadIterationFastQueue(b, fq, block, iterationsCount)
 }
 })
 }
@@ -37,23 +37,25 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
 b.SetBytes(int64(blockSize) * iterationsCount)
 path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
 mustDeleteDir(path)
-fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0)
+fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false)
 defer func() {
 fq.MustClose()
 mustDeleteDir(path)
 }()
 b.RunParallel(func(pb *testing.PB) {
 for pb.Next() {
-writeReadIterationFastQueue(fq, block, iterationsCount)
+writeReadIterationFastQueue(b, fq, block, iterationsCount)
 }
 })
 })
 }
 }
-func writeReadIterationFastQueue(fq *FastQueue, block []byte, iterationsCount int) {
+func writeReadIterationFastQueue(b *testing.B, fq *FastQueue, block []byte, iterationsCount int) {
 for i := 0; i < iterationsCount; i++ {
-fq.MustWriteBlock(block)
+if !fq.WriteBlock(block) {
+b.Fatalf("unexpected false for WriteBlock")
+}
 }
 var ok bool
 bb := bbPool.Get()


@@ -59,13 +59,18 @@ func CheckConfig() error {
 // Init initializes Prometheus scraper with config from the `-promscrape.config`.
 //
 // Scraped data is passed to pushData.
-func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
+func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool) {
 mustInitClusterMemberID()
+pushDataTrackDropped := func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
+if !pushData(at, wr) {
+pushDataFailsTotal.Inc()
+}
+}
 globalStopChan = make(chan struct{})
 scraperWG.Add(1)
 go func() {
 defer scraperWG.Done()
-runScraper(*promscrapeConfigFile, pushData, globalStopChan)
+runScraper(*promscrapeConfigFile, pushDataTrackDropped, globalStopChan)
 }()
 }
@@ -84,6 +89,8 @@ var (
 // configData contains -promscrape.config data
 configData atomic.Pointer[[]byte]
+
+pushDataFailsTotal = metrics.NewCounter(`vm_promscrape_push_samples_dropped_total`)
 )
// WriteConfigData writes -promscrape.config contents to w // WriteConfigData writes -promscrape.config contents to w