app/vmagent: follow-up for 090cb2c9de

- Add Try* prefix to functions, which return bool result in order to improve readability and reduce the probability of missing check
  for the result returned from these functions.

- Call the adjustSampleValues() only once on input samples. Previously it was called on every attempt to flush data to peristent queue.

- Properly restore the initial state of WriteRequest passed to tryPushWriteRequest() before returning from this function
  after unsuccessful push to persistent queue. Previously a part of WriteRequest samples may be lost in such case.

- Add -remoteWrite.dropSamplesOnOverload command-line flag, which can be used for dropping incoming samples instead
  of returning 429 Too Many Requests error to the client when -remoteWrite.disableOnDiskQueue is set and the remote storage
  cannot keep up with the data ingestion rate.

- Add vmagent_remotewrite_samples_dropped_total metric, which counts the number of dropped samples.

- Add vmagent_remotewrite_push_failures_total metric, which counts the number of unsuccessful attempts to push
  data to persistent queue when -remoteWrite.disableOnDiskQueue is set.

- Remove vmagent_remotewrite_aggregation_metrics_dropped_total and vm_promscrape_push_samples_dropped_total metrics,
  because they are replaced with vmagent_remotewrite_samples_dropped_total metric.

- Update 'Disabling on-disk persistence' docs at docs/vmagent.md

- Update stale comments in the code

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5088
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110
This commit is contained in:
Aliaksandr Valialkin 2023-11-25 11:31:30 +02:00
parent 090cb2c9de
commit 5034aa0773
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
23 changed files with 267 additions and 230 deletions

View file

@ -65,7 +65,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))

View file

@ -88,7 +88,7 @@ func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmar
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)

View file

@ -56,7 +56,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(nil, &ctx.WriteRequest) {
if !remotewrite.TryPush(nil, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))

View file

@ -130,7 +130,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
ctx.ctx.Labels = labels
ctx.ctx.Samples = samples
ctx.commonLabels = commonLabels
if !remotewrite.Push(at, &ctx.ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)

View file

@ -37,6 +37,7 @@ import (
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
@ -139,7 +140,9 @@ func main() {
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, httpInsertHandler)
}
promscrape.Init(remotewrite.Push)
promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
_ = remotewrite.TryPush(at, wr)
})
if len(*httpListenAddr) > 0 {
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)

View file

@ -84,7 +84,7 @@ func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
return nil

View file

@ -76,7 +76,7 @@ func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))

View file

@ -59,7 +59,7 @@ func insertRows(at *auth.Token, tss []prompbmarshal.TimeSeries, extraLabels []pr
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)

View file

@ -56,7 +56,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(nil, &ctx.WriteRequest) {
if !remotewrite.TryPush(nil, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))

View file

@ -64,7 +64,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))

View file

@ -73,7 +73,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))

View file

@ -69,7 +69,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)

View file

@ -57,11 +57,11 @@ func (ps *pendingSeries) MustStop() {
ps.periodicFlusherWG.Wait()
}
func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) bool {
func (ps *pendingSeries) TryPush(tss []prompbmarshal.TimeSeries) bool {
ps.mu.Lock()
wasPushed := ps.wr.push(tss)
ok := ps.wr.tryPush(tss)
ps.mu.Unlock()
return wasPushed
return ok
}
func (ps *pendingSeries) periodicFlusher() {
@ -84,8 +84,7 @@ func (ps *pendingSeries) periodicFlusher() {
}
}
ps.mu.Lock()
// no-op
_ = ps.wr.flush()
_ = ps.wr.tryFlush()
ps.mu.Unlock()
}
}
@ -94,15 +93,16 @@ type writeRequest struct {
// Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures.
lastFlushTime uint64
// The queue to send blocks to.
fq *persistentqueue.FastQueue
// Whether to encode the write request with VictoriaMetrics remote write protocol.
isVMRemoteWrite bool
// How many significant figures must be left before sending the writeRequest to pushBlock.
// How many significant figures must be left before sending the writeRequest to fq.
significantFigures int
// How many decimal digits after point must be left before sending the writeRequest to pushBlock.
// How many decimal digits after point must be left before sending the writeRequest to fq.
roundDigits int
wr prompbmarshal.WriteRequest
@ -115,7 +115,7 @@ type writeRequest struct {
}
func (wr *writeRequest) reset() {
// Do not reset lastFlushTime, pushBlock, isVMRemoteWrite, significantFigures and roundDigits, since they are re-used.
// Do not reset lastFlushTime, fq, isVMRemoteWrite, significantFigures and roundDigits, since they are re-used.
wr.wr.Timeseries = nil
@ -133,41 +133,40 @@ func (wr *writeRequest) reset() {
wr.buf = wr.buf[:0]
}
// mustFlushOnStop makes force push into the queue
// needed to properly save in-memory buffer with disabled disk storage
// mustFlushOnStop force pushes wr data into wr.fq
//
// This is needed in order to properly save in-memory data to persistent queue on graceful shutdown.
func (wr *writeRequest) mustFlushOnStop() {
wr.wr.Timeseries = wr.tss
wr.adjustSampleValues()
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
if !pushWriteRequest(&wr.wr, func(block []byte) bool {
wr.fq.MustWriteBlockIgnoreDisabledPQ(block)
return true
}, wr.isVMRemoteWrite) {
return
if !tryPushWriteRequest(&wr.wr, wr.mustWriteBlock, wr.isVMRemoteWrite) {
logger.Panicf("BUG: final flush must always return true")
}
wr.reset()
}
func (wr *writeRequest) flush() bool {
func (wr *writeRequest) mustWriteBlock(block []byte) bool {
wr.fq.MustWriteBlockIgnoreDisabledPQ(block)
return true
}
func (wr *writeRequest) tryFlush() bool {
wr.wr.Timeseries = wr.tss
wr.adjustSampleValues()
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
if !pushWriteRequest(&wr.wr, wr.fq.WriteBlock, wr.isVMRemoteWrite) {
if !tryPushWriteRequest(&wr.wr, wr.fq.TryWriteBlock, wr.isVMRemoteWrite) {
return false
}
wr.reset()
return true
}
func (wr *writeRequest) adjustSampleValues() {
samples := wr.samples
if n := wr.significantFigures; n > 0 {
func adjustSampleValues(samples []prompbmarshal.Sample, significantFigures, roundDigits int) {
if n := significantFigures; n > 0 {
for i := range samples {
s := &samples[i]
s.Value = decimal.RoundToSignificantFigures(s.Value, n)
}
}
if n := wr.roundDigits; n < 100 {
if n := roundDigits; n < 100 {
for i := range samples {
s := &samples[i]
s.Value = decimal.RoundToDecimalDigits(s.Value, n)
@ -175,7 +174,7 @@ func (wr *writeRequest) adjustSampleValues() {
}
}
func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) bool {
func (wr *writeRequest) tryPush(src []prompbmarshal.TimeSeries) bool {
tssDst := wr.tss
maxSamplesPerBlock := *maxRowsPerBlock
// Allow up to 10x of labels per each block on average.
@ -183,13 +182,15 @@ func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) bool {
for i := range src {
if len(wr.samples) >= maxSamplesPerBlock || len(wr.labels) >= maxLabelsPerBlock {
wr.tss = tssDst
if !wr.flush() {
if !wr.tryFlush() {
return false
}
tssDst = wr.tss
}
tsSrc := &src[i]
adjustSampleValues(tsSrc.Samples, wr.significantFigures, wr.roundDigits)
tssDst = append(tssDst, prompbmarshal.TimeSeries{})
wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
wr.copyTimeSeries(&tssDst[len(tssDst)-1], tsSrc)
}
wr.tss = tssDst
@ -221,7 +222,7 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
wr.buf = buf
}
func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte) bool, isVMRemoteWrite bool) bool {
func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block []byte) bool, isVMRemoteWrite bool) bool {
if len(wr.Timeseries) == 0 {
// Nothing to push
return true
@ -237,7 +238,7 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
}
writeRequestBufPool.Put(bb)
if len(zb.B) <= persistentqueue.MaxBlockSize {
if !pushBlock(zb.B) {
if !tryPushBlock(zb.B) {
return false
}
blockSizeRows.Update(float64(len(wr.Timeseries)))
@ -260,11 +261,13 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
}
n := len(samples) / 2
wr.Timeseries[0].Samples = samples[:n]
if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
wr.Timeseries[0].Samples = samples
return false
}
wr.Timeseries[0].Samples = samples[n:]
if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
wr.Timeseries[0].Samples = samples
return false
}
wr.Timeseries[0].Samples = samples
@ -273,11 +276,13 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
timeseries := wr.Timeseries
n := len(timeseries) / 2
wr.Timeseries = timeseries[:n]
if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
wr.Timeseries = timeseries
return false
}
wr.Timeseries = timeseries[n:]
if !pushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
wr.Timeseries = timeseries
return false
}
wr.Timeseries = timeseries

View file

@ -33,7 +33,9 @@ func testPushWriteRequest(t *testing.T, rowsCount, expectedBlockLenProm, expecte
pushBlockLen = len(block)
return true
}
_ = pushWriteRequest(wr, pushBlock, isVMRemoteWrite)
if !tryPushWriteRequest(wr, pushBlock, isVMRemoteWrite) {
t.Fatalf("cannot push data to to remote storage")
}
if math.Abs(float64(pushBlockLen-expectedBlockLen)/float64(expectedBlockLen)*100) > tolerancePrc {
t.Fatalf("unexpected block len for rowsCount=%d, isVMRemoteWrite=%v; got %d bytes; expecting %d bytes +- %.0f%%",
rowsCount, isVMRemoteWrite, pushBlockLen, expectedBlockLen, tolerancePrc)

View file

@ -47,8 +47,8 @@ var (
shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
"See also -remoteWrite.maxDiskUsagePerURL")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
"Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.")
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
@ -87,9 +87,11 @@ var (
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable on-disk queue for metrics ingestion processing. "+
"If in-memory queue is full for at least 1 remoteWrite target, all data ingestion is blocked and returns an error. "+
"It allows to build a chain of vmagents and build complicated data pipelines without data-loss. On-disk writes is still possible during graceful shutdown for storing in-memory part of the queue.")
disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+
"See also -remoteWrite.dropSamplesOnOverload")
dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
"cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence")
)
var (
@ -103,10 +105,11 @@ var (
// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
defaultAuthToken = &auth.Token{}
// ErrQueueFullHTTPRetry returned when -remoteWrite.disableOnDiskQueue enabled
// and one of remote storage cannot handle a load
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
ErrQueueFullHTTPRetry = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("in-memory queue is full, write requests blocked due to enabled flag -remoteWrite.disableOnDiskQueue=true. Retry request later"),
Err: fmt.Errorf("remote storage systems cannot keep up with the data ingestion rate; retry the request later " +
"or remove -remoteWrite.disableOnDiskQueue from vmagent command-line flags, so it could save pending data to -remoteWrite.tmpDataPath; " +
"see https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence"),
StatusCode: http.StatusTooManyRequests,
}
)
@ -331,7 +334,7 @@ var configReloaderWG sync.WaitGroup
// Stop stops remotewrite.
//
// It is expected that nobody calls Push during and after the call to this func.
// It is expected that nobody calls TryPush during and after the call to this func.
func Stop() {
close(configReloaderStopCh)
configReloaderWG.Wait()
@ -341,7 +344,7 @@ func Stop() {
}
rwctxsDefault = nil
// There is no need in locking rwctxsMapLock here, since nobody should call Push during the Stop call.
// There is no need in locking rwctxsMapLock here, since nobody should call TryPush during the Stop call.
for _, rwctxs := range rwctxsMap {
for _, rwctx := range rwctxs {
rwctx.MustStop()
@ -357,13 +360,16 @@ func Stop() {
}
}
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL
//
// If at is nil, then the data is pushed to the configured `-remoteWrite.url`.
// If at isn't nil, the data is pushed to the configured `-remoteWrite.multitenantURL`.
// If at is nil, then the data is pushed to the configured -remoteWrite.url.
// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL.
//
// Note that wr may be modified by Push because of relabeling and rounding.
func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
//
// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
if at == nil && len(*remoteWriteMultitenantURLs) > 0 {
// Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set.
at = defaultAuthToken
@ -387,30 +393,42 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
}
rwctxsMapLock.Unlock()
}
var isWritesLocked bool
tss := wr.Timeseries
rowsCount := getRowsCount(tss)
if *disableOnDiskQueue {
// Quick check whether writes to configured remote storage systems are blocked.
// This allows saving CPU time spent on relabeling and block compression
// if some of remote storage systems cannot keep up with the data ingestion rate.
for _, rwctx := range rwctxs {
if rwctx.fq.IsWritesBlocked() {
isWritesLocked = true
break
if rwctx.fq.IsWriteBlocked() {
pushFailures.Inc()
if *dropSamplesOnOverload {
// Just drop samples
samplesDropped.Add(rowsCount)
return true
}
}
// fast path, write path is blocked
if isWritesLocked {
return false
}
}
}
var rctx *relabelCtx
rcs := allRelabelConfigs.Load()
pcsGlobal := rcs.global
if pcsGlobal.Len() > 0 {
rctx = getRelabelCtx()
defer func() {
rctx.reset()
putRelabelCtx(rctx)
}()
}
tss := wr.Timeseries
rowsCount := getRowsCount(tss)
globalRowsPushedBeforeRelabel.Add(rowsCount)
maxSamplesPerBlock := *maxRowsPerBlock
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
for len(tss) > 0 {
// Process big tss in smaller blocks in order to reduce the maximum memory usage
samplesCount := 0
@ -418,7 +436,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
i := 0
for i < len(tss) {
samplesCount += len(tss[i].Samples)
labelsCount += len(tss[i].Labels)
labelsCount += len(tss[i].Samples) * len(tss[i].Labels)
i++
if samplesCount >= maxSamplesPerBlock || labelsCount >= maxLabelsPerBlock {
break
@ -439,10 +457,14 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
}
sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock)
if !pushBlockToRemoteStorages(rwctxs, tssBlock) {
if rctx != nil {
rctx.reset()
putRelabelCtx(rctx)
if !tryPushBlockToRemoteStorages(rwctxs, tssBlock) {
if !*disableOnDiskQueue {
logger.Panicf("BUG: tryPushBlockToRemoteStorages must return true if -remoteWrite.disableOnDiskQueue isn't set")
}
pushFailures.Inc()
if *dropSamplesOnOverload {
samplesDropped.Add(rowsCount)
return true
}
return false
}
@ -450,13 +472,15 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
rctx.reset()
}
}
if rctx != nil {
putRelabelCtx(rctx)
}
return true
}
func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool {
var (
samplesDropped = metrics.NewCounter(`vmagent_remotewrite_samples_dropped_total`)
pushFailures = metrics.NewCounter(`vmagent_remotewrite_push_failures_total`)
)
func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries) bool {
if len(tssBlock) == 0 {
// Nothing to push
return true
@ -464,8 +488,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
if len(rwctxs) == 1 {
// Fast path - just push data to the configured single remote storage
err := rwctxs[0].Push(tssBlock)
return err
return rwctxs[0].TryPush(tssBlock)
}
// We need to push tssBlock to multiple remote storages.
@ -502,7 +525,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
}
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
defer wg.Done()
if !rwctx.Push(tss) {
if !rwctx.TryPush(tss) {
atomic.StoreUint64(&anyPushFailed, 1)
}
}(rwctx, tssShard)
@ -520,7 +543,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
for _, rwctx := range rwctxs {
go func(rwctx *remoteWriteCtx) {
defer wg.Done()
if !rwctx.Push(tssBlock) {
if !rwctx.TryPush(tssBlock) {
atomic.StoreUint64(&anyPushFailed, 1)
}
}(rwctx)
@ -631,7 +654,6 @@ type remoteWriteCtx struct {
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
rowsDroppedAtAggregationOnPush *metrics.Counter
}
func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
@ -654,9 +676,9 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetInmemoryQueueLen())
})
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_inmemory_queue_blocked{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
if fq.IsWritesBlocked() {
return 1.0
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queue_blocked{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
if fq.IsWriteBlocked() {
return 1
}
return 0
})
@ -692,7 +714,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
rowsDroppedAtAggregationOnPush: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_aggregation_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
}
// Initialize sas
@ -735,7 +756,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsDroppedByRelabel = nil
}
func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) bool {
func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
// Apply relabeling
var rctx *relabelCtx
var v *[]prompbmarshal.TimeSeries
@ -773,16 +794,18 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) bool {
}
matchIdxsPool.Put(matchIdxs)
}
defer func() {
// Try pushing the data to remote storage
ok := rwctx.tryPushInternal(tss)
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}()
return rwctx.pushInternal(tss)
return ok
}
var matchIdxsPool bytesutil.ByteBufferPool
@ -803,21 +826,22 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop
}
func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSeries) {
if !rwctx.pushInternal(tss) {
rwctx.rowsDroppedAtAggregationOnPush.Inc()
if rwctx.tryPushInternal(tss) {
return
}
if !*disableOnDiskQueue {
logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set")
}
pushFailures.Inc()
if *dropSamplesOnOverload {
rowsCount := getRowsCount(tss)
samplesDropped.Add(rowsCount)
}
}
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) bool {
func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) bool {
var rctx *relabelCtx
var v *[]prompbmarshal.TimeSeries
defer func() {
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}()
if len(labelsGlobal) > 0 {
// Make a copy of tss before adding extra labels in order to prevent
// from affecting time series for other remoteWrite.url configs.
@ -829,7 +853,16 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) bool {
pss := rwctx.pss
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
return pss[idx].Push(tss)
ok := pss[idx].TryPush(tss)
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
return ok
}
func (rwctx *remoteWriteCtx) reinitStreamAggr() {

View file

@ -76,7 +76,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.Push(at, &ctx.WriteRequest) {
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)

View file

@ -97,9 +97,8 @@ func Init() {
if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler)
}
promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
prompush.Push(wr)
return true
})
}

View file

@ -28,7 +28,7 @@ The sandbox cluster installation is running under the constant load generated by
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queuing to disk when the remote storage cannot keep up with the data ingestion rate. In this case `vmagent` returns `429 Too Many Requests` response, so clients could decrease data ingestion rate on their side. This option may be useful when `vmagent` runs in environments with slow persistent disks. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration).
* FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format).

View file

@ -869,47 +869,43 @@ scrape_configs:
- "Proxy-Auth: top-secret"
```
## Disabling on-disk queue
## Disabling on-disk persistence
On-disk queue aka persistent queue is a temporary folder configured via `-remoteWrite.tmpDataPath` flag. At this folder vmagent may store metric blocks.
Metric blocks persisted on disk if remote storage is not available or cannot handle ingestion load.
Size of this disk queue per remote storage can be limited via `-remoteWrite.maxDiskUsagePerURL`. By default, there is no limit.
In case of reaching those limit metric blocks will be silently dropped by vmagent.
By default `vmagent` stores pending data, which cannot be sent to the configured remote storage systems in a timely manner, in the folder configured
via `-remoteWrite.tmpDataPath` command-line flag. By default `vmagent` writes all the pending data to this folder until this data is sent to the configured
remote storage systems or until the folder becomes full. The maximum data size, which can be saved to `-remoteWrite.tmpDataPath`
per every configured `-remoteWrite.url`, can be limited via `-remoteWrite.maxDiskUsagePerURL` command-line flag.
When this limit is reached, `vmagent` drops the oldest data from disk in order to save newly ingested data.
This behaviour can be changed via flag `--remoteWrite.disableOnDiskQueue=true`.
It prevents vmagent from using on-disk storage for data buffering during ingestion or scraping.
But on-disk storage is still used for saving in-memory part of the queue and buffers during graceful shutdown.
There are cases when it is better disabling on-disk persistence for pending data at `vmagent` side:
It's expected that `streaming` aggregation and `scrapping` metrics will be dropped in case of full queue.
The following metrics help to detect samples drop: `vmagent_remotewrite_aggregation_metrics_dropped_total` and `vm_promscrape_push_samples_dropped_total`.
- When the persistent disk performance isn't enough for the given data processing rate.
- When it is better to buffer pending data at the client side instead of bufferring it at `vmagent` side in the `-remoteWrite.tmpDataPath` folder.
- When the data is already buffered at [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#pubsub-integration).
- When it is better to drop pending data instead of buffering it.
In case of multiple configured remote storages, vmagent block writes requests even if a single remote storage cannot accept ingested samples.
In this case `-remoteWrite.disableOnDiskQueue` command-line flag can be passed to `vmagent`.
When this flag is specified, `vmagent` works in the following way if the configured remote storage systems cannot keep up with the data ingestion rate:
vmagent guarantees at-least-once delivery semantic.
It means that metric samples duplication is possible and [deduplication](https://docs.victoriametrics.com/#deduplication) must be configured at remote storage.
- It returns `429 Too Many Requests` HTTP error to clients, which send data to `vmagent` via [supported HTTP endpoints](#how-to-push-data-to-vmagent).
You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the ingested samples instead of returning the error to clients in this case.
- It suspends consuming data from [Kafka side](#reading-metrics-from-kafka) or [Google PubSub side](#pubsub-integration) until the remote storage becomes available.
You can specify `-remoteWrite.dropSamplesOnOverload` command-line flag in order to drop the fetched samples instead of suspending data consumption from Kafka or Google PubSub.
- It drops samples [scraped from Prometheus-compatible targets](#how-to-collect-metrics-in-prometheus-format), because it is better to drop samples
instead of blocking the scrape process.
- It drops [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) output samples, because it is better to drop output samples
instead of blocking the stream aggregation process.
### Common patterns
You may want to disable on-disk queue in the following cases:
The number of dropped samples because of overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_samples_dropped_total` metric.
The number of unsuccessful attempts to send data to overloaded remote storage can be [monitored](#monitoring) via `vmagent_remotewrite_push_failures_total` metric.
1) chaining of vmagents. Intermediate vmagents used for aggregation may loss the data, if vmcluster is not available.
With disabled persistent queue aggregation vmagents will back-pressure metrics to the first vmagent.
`vmagent` still may write pending in-memory data to `-remoteWrite.tmpDataPath` on graceful shutdown
if `-remoteWrite.disableOnDiskQueue` command-line flag is specified. It may also read buffered data from `-remoteWrite.tmpDataPath`
on startup.
```mermaid
flowchart LR
A[vmagent] --> B(vmagent-aggregation-0)
A[vmagent] --> C(vmagent-aggregation-1)
B --> D[vmcluster]
C --> D[vmcluster]
```
2) If you want to replace actual on-disk queue with kafka or another compatible queue. On-disk queue must be disabled at `vmagent-consumer`
```mermaid
flowchart LR
A[vmagent] --> B(kafka)
B <--> C(vmagent-consumer)
C --> D[vmcluster]
```
When `-remoteWrite.disableOnDiskQueue` command-line flag is set, then `vmagent` may send the same samples multiple times to the configured remote storage
if it cannot keep up with the data ingestion rate. In this case the [deduplication](https://docs.victoriametrics.com/#deduplication)
must be enabled on all the configured remote storage systems.
## Cardinality limiter
@ -989,38 +985,39 @@ If you have suggestions for improvements or have found a bug - please open an is
## Troubleshooting
* We recommend you [set up the official Grafana dashboard](#monitoring) in order to monitor the state of `vmagent'.
* It is recommended [setting up the official Grafana dashboard](#monitoring) in order to monitor the state of `vmagent'.
* We recommend you increase the maximum number of open files in the system (`ulimit -n`) when scraping a big number of targets,
* It is recommended increasing the maximum number of open files in the system (`ulimit -n`) when scraping a big number of targets,
as `vmagent` establishes at least a single TCP connection per target.
* If `vmagent` uses too big amounts of memory, then the following options can help:
* Disabling staleness tracking with `-promscrape.noStaleMarkers` option. See [these docs](#prometheus-staleness-markers).
* Enabling stream parsing mode if `vmagent` scrapes targets with millions of metrics per target. See [these docs](#stream-parsing-mode).
* Reducing the number of output queues with `-remoteWrite.queues` command-line option.
* Reducing the amounts of RAM vmagent can use for in-memory buffering with `-memory.allowedPercent` or `-memory.allowedBytes` command-line option.
Another option is to reduce memory limits in Docker and/or Kubernetes if `vmagent` runs under these systems.
* Reducing the number of CPU cores vmagent can use by passing `GOMAXPROCS=N` environment variable to `vmagent`,
where `N` is the desired limit on CPU cores. Another option is to reduce CPU limits in Docker or Kubernetes if `vmagent` runs under these systems.
* Passing `-promscrape.dropOriginalLabels` command-line option to `vmagent`, so it drops `"discoveredLabels"` and `"droppedTargets"`
lists at `/api/v1/targets` page. This reduces memory usage when scraping big number of targets at the cost
of reduced debuggability for improperly configured per-target relabeling.
* Disabling staleness tracking with `-promscrape.noStaleMarkers` option. See [these docs](#prometheus-staleness-markers).
* Enabling stream parsing mode if `vmagent` scrapes targets with millions of metrics per target. See [these docs](#stream-parsing-mode).
* Reducing the number of tcp connections to remote storage systems with `-remoteWrite.queues` command-line option.
* Passing `-promscrape.dropOriginalLabels` command-line option to `vmagent` if it [discovers](https://docs.victoriametrics.com/sd_configs.html)
big number of targets and many of these targets are [dropped](https://docs.victoriametrics.com/relabeling.html#how-to-drop-discovered-targets)
before scraping. In this case `vmagent` drops `"discoveredLabels"` and `"droppedTargets"`
lists at `http://vmagent-host:8429/service-discovery` page. This reduces memory usage when scraping big number of targets at the cost
of reduced debuggability for improperly configured per-target [relabeling](https://docs.victoriametrics.com/relabeling.html).
* When `vmagent` scrapes many unreliable targets, it can flood the error log with scrape errors. These errors can be suppressed
by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`
and `http://vmagent-host:8429/api/v1/targets`.
* When `vmagent` scrapes many unreliable targets, it can flood the error log with scrape errors. It is recommended investigating and fixing these errors.
If it is unfeasible to fix all the reported errors, then they can be suppressed by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`.
The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` and `http://vmagent-host:8429/api/v1/targets`.
* The `/service-discovery` page could be useful for debugging relabeling process for scrape targets.
* The `http://vmagent-host:8429/service-discovery` page could be useful for debugging relabeling process for scrape targets.
This page contains original labels for targets dropped during relabeling.
By default, the `-promscrape.maxDroppedTargets` targets are shown here. If your setup drops more targets during relabeling,
then increase `-promscrape.maxDroppedTargets` command-line flag value to see all the dropped targets.
Note that tracking each dropped target requires up to 10Kb of RAM. Therefore, big values for `-promscrape.maxDroppedTargets`
may result in increased memory usage if a big number of scrape targets are dropped during relabeling.
* We recommend you increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported
at `http://vmagent-host:8429/metrics` page grows constantly. It is also recommended increasing `-remoteWrite.maxBlockSize`
and `-remoteWrite.maxRowsPerBlock` command-line options in this case. This can improve data ingestion performance
to the configured remote storage systems at the cost of higher memory usage.
* It is recommended increaseing `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` [metric](#monitoring)
grows constantly. It is also recommended increasing `-remoteWrite.maxBlockSize` and `-remoteWrite.maxRowsPerBlock` command-line options in this case.
This can improve data ingestion performance to the configured remote storage systems at the cost of higher memory usage.
* If you see gaps in the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set,
try increasing `-remoteWrite.queues`. Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage.
@ -1034,8 +1031,12 @@ If you have suggestions for improvements or have found a bug - please open an is
The best solution is to use remote storage with [backfilling support](https://docs.victoriametrics.com/#backfilling) such as VictoriaMetrics.
* `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`.
The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set.
If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory.
The directory can grow large when remote storage is unavailable for extended periods of time and if the maximum directory size isn't limited
with `-remoteWrite.maxDiskUsagePerURL` command-line flag.
If you don't want to send all the buffered data from the directory to remote storage then simply stop `vmagent` and delete the directory.
* If `vmagent` runs on a host with slow persistent storage, which cannot keep up with the volume of processed samples, then is is possible to disable
the persistent storage with `-remoteWrite.disableOnDiskQueue` command-line flag. See [these docs](#disabling-on-disk-persistence) for more details.
* By default `vmagent` masks `-remoteWrite.url` with `secret-url` values in logs and at `/metrics` page because
the url may contain sensitive information such as auth tokens or passwords.
@ -1080,7 +1081,7 @@ If you have suggestions for improvements or have found a bug - please open an is
regex: true
```
See also [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html).
See also [general troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html).
## Google PubSub integration
[Enterprise version](https://docs.victoriametrics.com/enterprise.html) of `vmagent` can read and write metrics from / to google [PubSub](https://cloud.google.com/pubsub):
@ -1789,8 +1790,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second
Supports an array of values separated by comma or specified via multiple flags.
-remoteWrite.disableOnDiskQueue
Whether to disable on-disk queue for metrics ingestion processing. If in-memory queue is full for at least 1 remoteWrite target, all data ingestion is blocked and returns an error.
It allows to build a chain of vmagents and build complicated data pipelines without data-loss. On-disk writes is still possible during graceful shutdown for storing in-memory part of the queue.
Whether to disable storing pending data to -remoteWrite.tmpDataPath when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence .See also -remoteWrite.dropSamplesOnOverload
-remoteWrite.dropSamplesOnOverload
Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence
-remoteWrite.flushInterval duration
Interval for flushing the data to remote storage. This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url (default 1s)
-remoteWrite.forcePromProto array
@ -1892,7 +1894,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Optional TLS server name to use for connections to the corresponding -remoteWrite.url. By default, the server name from -remoteWrite.url is used
Supports an array of values separated by comma or specified via multiple flags.
-remoteWrite.tmpDataPath string
Path to directory where temporary data for remote write component is stored. See also -remoteWrite.maxDiskUsagePerURL (default "vmagent-remotewrite-data")
Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue (default "vmagent-remotewrite-data")
-remoteWrite.url array
Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set. See also -remoteWrite.multitenantURL
Supports an array of values separated by comma or specified via multiple flags.

View file

@ -22,7 +22,9 @@ type FastQueue struct {
// or when MustClose is called.
cond sync.Cond
// isPQDisabled is set to true when pq is disabled.
isPQDisabled bool
// pq is file-based queue
pq *queue
@ -43,7 +45,7 @@ type FastQueue struct {
// if maxPendingBytes is 0, then the queue size is unlimited.
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
// reaches maxPendingSize.
// if isPQDisabled is set to true, all write requests that exceed in-memory buffer capacity'll be rejected with errQueueIsFull error
// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// in-memory queue part can be stored on disk during gracefull shutdown.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
pq := mustOpen(path, name, maxPendingBytes)
@ -65,8 +67,8 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes
return fq
}
// IsWritesBlocked checks if data can be pushed into the queue
func (fq *FastQueue) IsWritesBlocked() bool {
// IsWriteBlocked checks if data can be pushed into fq
func (fq *FastQueue) IsWriteBlocked() bool {
if !fq.isPQDisabled {
return false
}
@ -132,11 +134,6 @@ func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
func (fq *FastQueue) GetPendingBytes() uint64 {
fq.mu.Lock()
defer fq.mu.Unlock()
return fq.getPendingBytesLocked()
}
func (fq *FastQueue) getPendingBytesLocked() uint64 {
n := fq.pendingInmemoryBytes
n += fq.pq.GetPendingBytes()
return n
@ -150,49 +147,53 @@ func (fq *FastQueue) GetInmemoryQueueLen() int {
return len(fq.ch)
}
// MustWriteBlockIgnoreDisabledPQ writes block to fq, persists data on disk even if persistent disabled by flag.
// it's needed to gracefully stop service and do not lose data if remote storage is not available.
// MustWriteBlockIgnoreDisabledPQ unconditionally writes block to fq.
//
// This method allows perisisting in-memory blocks during graceful shutdown, even if persistence is disabled.
func (fq *FastQueue) MustWriteBlockIgnoreDisabledPQ(block []byte) {
if !fq.writeBlock(block, true) {
logger.Fatalf("BUG: MustWriteBlockIgnoreDisabledPQ must always write data even if persistence is disabled")
if !fq.tryWriteBlock(block, true) {
logger.Fatalf("BUG: tryWriteBlock must always write data even if persistence is disabled")
}
}
// WriteBlock writes block to fq.
func (fq *FastQueue) WriteBlock(block []byte) bool {
return fq.writeBlock(block, false)
// TryWriteBlock tries writing block to fq.
//
// false is returned if the block couldn't be written to fq when the in-memory queue is full
// and the persistent queue is disabled.
func (fq *FastQueue) TryWriteBlock(block []byte) bool {
return fq.tryWriteBlock(block, false)
}
// WriteBlock writes block to fq.
func (fq *FastQueue) writeBlock(block []byte, mustIgnoreDisabledPQ bool) bool {
func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
fq.mu.Lock()
defer fq.mu.Unlock()
isPQWritesAllowed := !fq.isPQDisabled || mustIgnoreDisabledPQ
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
if !isPQWritesAllowed {
return false
}
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
}
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if len(fq.ch) == cap(fq.ch) {
// There is no space in the in-memory queue. Put the data to file-based queue.
if !isPQWritesAllowed {
// There is no space left in the in-memory queue. Put the data to file-based queue.
if !isPQWriteAllowed {
return false
}
fq.flushInmemoryBlocksToFileLocked()
fq.pq.MustWriteBlock(block)
return true
}
// There is enough space in the in-memory queue.
// Fast path - put the block to in-memory queue.
bb := blockBufPool.Get()
bb.B = append(bb.B[:0], block...)
fq.ch <- bb

View file

@ -11,7 +11,7 @@ func TestFastQueueOpenClose(_ *testing.T) {
path := "fast-queue-open-close"
mustDeleteDir(path)
for i := 0; i < 10; i++ {
fq := MustOpenFastQueue(path, "foobar", 100, 0, true)
fq := MustOpenFastQueue(path, "foobar", 100, 0, false)
fq.MustClose()
}
mustDeleteDir(path)
@ -22,15 +22,15 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
mustDeleteDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
var blocks []string
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
@ -62,8 +62,8 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
var blocks []string
for i := 0; i < 2*capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("not expected WriteBlock fail")
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
@ -98,8 +98,8 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
var blocks []string
for i := 0; i < 2*capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
@ -176,8 +176,8 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
}
resultCh <- nil
}()
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
select {
case err := <-resultCh:
@ -235,9 +235,8 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
go func() {
defer writersWG.Done()
for block := range blocksCh {
if !fq.WriteBlock([]byte(block)) {
t.Errorf("unexpected false for WriteBlock")
return
if !fq.TryWriteBlock([]byte(block)) {
panic(fmt.Errorf("TryWriteBlock must return true in this context"))
}
}
}()
@ -303,12 +302,12 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
var blocks []string
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
if fq.WriteBlock([]byte("error-block")) {
if fq.TryWriteBlock([]byte("error-block")) {
t.Fatalf("expect false due to full queue")
}
@ -339,12 +338,12 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
var blocks []string
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.WriteBlock([]byte(block)) {
t.Fatalf("unexpected false for WriteBlock")
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
if fq.WriteBlock([]byte("error-block")) {
if fq.TryWriteBlock([]byte("error-block")) {
t.Fatalf("expect false due to full queue")
}
for i := 0; i < capacity; i++ {

View file

@ -22,7 +22,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
mustDeleteDir(path)
}()
for i := 0; i < b.N; i++ {
writeReadIterationFastQueue(b, fq, block, iterationsCount)
writeReadIterationFastQueue(fq, block, iterationsCount)
}
})
}
@ -44,17 +44,17 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
}()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
writeReadIterationFastQueue(b, fq, block, iterationsCount)
writeReadIterationFastQueue(fq, block, iterationsCount)
}
})
})
}
}
func writeReadIterationFastQueue(b *testing.B, fq *FastQueue, block []byte, iterationsCount int) {
func writeReadIterationFastQueue(fq *FastQueue, block []byte, iterationsCount int) {
for i := 0; i < iterationsCount; i++ {
if !fq.WriteBlock(block) {
b.Fatalf("unexpected false for WriteBlock")
if !fq.TryWriteBlock(block) {
panic(fmt.Errorf("TryWriteBlock must return true"))
}
}
var ok bool

View file

@ -59,18 +59,13 @@ func CheckConfig() error {
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest) bool) {
func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
mustInitClusterMemberID()
pushDataTrackDropped := func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
if !pushData(at, wr) {
pushDataFailsTotal.Inc()
}
}
globalStopChan = make(chan struct{})
scraperWG.Add(1)
go func() {
defer scraperWG.Done()
runScraper(*promscrapeConfigFile, pushDataTrackDropped, globalStopChan)
runScraper(*promscrapeConfigFile, pushData, globalStopChan)
}()
}
@ -89,8 +84,6 @@ var (
// configData contains -promscrape.config data
configData atomic.Pointer[[]byte]
pushDataFailsTotal = metrics.NewCounter(`vm_promscrape_push_samples_dropped_total`)
)
// WriteConfigData writes -promscrape.config contents to w