VictoriaMetrics/lib/persistentqueue/fastqueue.go
Aliaksandr Valialkin 0145b65f25
app/vmagent/remotewrite: follow-up for 87fd400dfc
- Drop samples and return true from remotewrite.TryPush() at fast path when all the remote storage
  systems are configured with the disabled on-disk queue, every in-memory queue is full
  and -remoteWrite.dropSamplesOnOverload is set to true. This case is quite common,
  so it should be optimized. Previously additional CPU time was spent on per-remoteWriteCtx
  relabeling and other processing in this case.

- Properly count the number of dropped samples inside remoteWriteCtx.pushInternalTrackDropped().
  Previously dropped samples were counted only if -remoteWrite.dropSamplesOnOverload flag is set.
  In reality, the samples are dropped when they couldn't be sent to the queue because in-memory queue is full
  and on-disk queue is disabled.
  The remoteWriteCtx.pushInternalTrackDropped() function is called by streaming aggregation for pushing
  the aggregated data to the remote storage. Streaming aggregation cannot wait until the remote storage
  processes pending data, so it drops aggregated samples in this case.

- Clarify the description for -remoteWrite.disableOnDiskQueue command-line flag at -help output,
  so it is clear that this flag can be set individually per each -remoteWrite.url.

- Make the -remoteWrite.dropSamplesOnOverload flag global. If some of the remote storage systems
  are configured with the disabled on-disk queue, then there is no sense in keeping samples
  on some of these systems, while dropping samples on the remaining systems, since this
  will result in global stall on the remote storage system with the disabled on-disk queue
  and with the -remoteWrite.dropSamplesOnOverload=false flag. vmagent will always return false
  from remotewrite.TryPush() in this case. This will result in infinite duplicate samples
  written to the remaining remote storage systems. That's why the -remoteWrite.dropSamplesOnOverload
  is forcibly set to true if more than one -remoteWrite.disableOnDiskQueue flag is set.
  This allows proceeding with newly scraped / pushed samples by sending them to the remaining
  remote storage systems, while dropping them on overloaded systems with the -remoteWrite.disableOnDiskQueue flag set.

- Verify that the remoteWriteCtx.TryPush() returns true in the TestRemoteWriteContext_TryPush_ImmutableTimeseries test.

- Mention in vmagent docs that the -remoteWrite.disableOnDiskQueue command-line flag can be set individually per each -remoteWrite.url.
  See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6248
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065
2024-07-13 02:25:19 +02:00

258 lines
7.4 KiB
Go

package persistentqueue
import (
"fmt"
"path/filepath"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// FastQueue is fast persistent queue, which prefers sending data via memory.
//
// It falls back to sending data via file when readers don't catch up with writers.
type FastQueue struct {
// mu protects the state of FastQueue.
mu sync.Mutex
// cond is used for notifying blocked readers when new data has been added
// or when MustClose is called.
cond sync.Cond
// isPQDisabled is set to true when pq is disabled.
isPQDisabled bool
// pq is file-based queue
pq *queue
// ch is in-memory queue
ch chan *bytesutil.ByteBuffer
pendingInmemoryBytes uint64
lastInmemoryBlockReadTime uint64
stopDeadline uint64
}
// MustOpenFastQueue opens persistent queue at the given path.
//
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
//
// if maxPendingBytes is 0, then the queue size is unlimited.
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
// reaches maxPendingSize.
// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// in-memory queue part can be stored on disk during gracefull shutdown.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,
isPQDisabled: isPQDisabled,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
}
fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 {
fq.mu.Lock()
n := fq.pq.GetPendingBytes()
fq.mu.Unlock()
return float64(n)
})
pendingBytes := fq.GetPendingBytes()
persistenceStatus := "enabled"
if isPQDisabled {
persistenceStatus = "disabled"
}
logger.Infof("opened fast queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes, persistence is %s", path, maxInmemoryBlocks, pendingBytes, persistenceStatus)
return fq
}
// IsPersistentQueueDisabled returns true if persistend queue at fq is disabled.
func (fq *FastQueue) IsPersistentQueueDisabled() bool {
return fq.isPQDisabled
}
// IsWriteBlocked checks if data can be pushed into fq
func (fq *FastQueue) IsWriteBlocked() bool {
if !fq.isPQDisabled {
return false
}
fq.mu.Lock()
defer fq.mu.Unlock()
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
}
// UnblockAllReaders unblocks all the readers.
func (fq *FastQueue) UnblockAllReaders() {
fq.mu.Lock()
defer fq.mu.Unlock()
// Unblock blocked readers
// Allow for up to 5 seconds for sending Prometheus stale markers.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526
fq.stopDeadline = fasttime.UnixTimestamp() + 5
fq.cond.Broadcast()
}
// MustClose unblocks all the readers.
//
// It is expected no new writers during and after the call.
func (fq *FastQueue) MustClose() {
fq.UnblockAllReaders()
fq.mu.Lock()
defer fq.mu.Unlock()
// flush blocks from fq.ch to fq.pq, so they can be persisted
fq.flushInmemoryBlocksToFileLocked()
// Close fq.pq
fq.pq.MustClose()
logger.Infof("closed fast persistent queue at %q", fq.pq.dir)
}
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() {
if len(fq.ch) == 0 || fq.isPQDisabled {
return
}
if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 {
return
}
fq.flushInmemoryBlocksToFileLocked()
}
func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
// fq.mu must be locked by the caller.
for len(fq.ch) > 0 {
bb := <-fq.ch
fq.pq.MustWriteBlock(bb.B)
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
blockBufPool.Put(bb)
}
// Unblock all the potentially blocked readers, so they could proceed with reading file-based queue.
fq.cond.Broadcast()
}
// GetPendingBytes returns the number of pending bytes in the fq.
func (fq *FastQueue) GetPendingBytes() uint64 {
fq.mu.Lock()
defer fq.mu.Unlock()
n := fq.pendingInmemoryBytes
n += fq.pq.GetPendingBytes()
return n
}
// GetInmemoryQueueLen returns the length of inmemory queue.
func (fq *FastQueue) GetInmemoryQueueLen() int {
fq.mu.Lock()
defer fq.mu.Unlock()
return len(fq.ch)
}
// MustWriteBlockIgnoreDisabledPQ unconditionally writes block to fq.
//
// This method allows perisisting in-memory blocks during graceful shutdown, even if persistence is disabled.
func (fq *FastQueue) MustWriteBlockIgnoreDisabledPQ(block []byte) {
if !fq.tryWriteBlock(block, true) {
logger.Fatalf("BUG: tryWriteBlock must always write data even if persistence is disabled")
}
}
// TryWriteBlock tries writing block to fq.
//
// false is returned if the block couldn't be written to fq when the in-memory queue is full
// and the persistent queue is disabled.
func (fq *FastQueue) TryWriteBlock(block []byte) bool {
return fq.tryWriteBlock(block, false)
}
// WriteBlock writes block to fq.
func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
fq.mu.Lock()
defer fq.mu.Unlock()
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
}
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if len(fq.ch) == cap(fq.ch) {
// There is no space left in the in-memory queue. Put the data to file-based queue.
if !isPQWriteAllowed {
return false
}
fq.flushInmemoryBlocksToFileLocked()
fq.pq.MustWriteBlock(block)
return true
}
// Fast path - put the block to in-memory queue.
bb := blockBufPool.Get()
bb.B = append(bb.B[:0], block...)
fq.ch <- bb
fq.pendingInmemoryBytes += uint64(len(block))
// Notify potentially blocked reader.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for the context.
fq.cond.Signal()
return true
}
// MustReadBlock reads the next block from fq to dst and returns it.
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if len(fq.ch) > 0 {
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
dst = append(dst, bb.B...)
blockBufPool.Put(bb)
return dst, true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
continue
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.pq.ResetIfEmpty()
fq.cond.Wait()
}
}
// Dirname returns the directory name for persistent queue.
func (fq *FastQueue) Dirname() string {
return filepath.Base(fq.pq.dir)
}