mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
87fd400dfc
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementaion! * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`. --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: Raphael Bizos <r.bizos@criteo.com>
248 lines
7.2 KiB
Go
248 lines
7.2 KiB
Go
package persistentqueue
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// FastQueue is fast persistent queue, which prefers sending data via memory.
|
|
//
|
|
// It falls back to sending data via file when readers don't catch up with writers.
|
|
type FastQueue struct {
|
|
// my protects the state of FastQueue.
|
|
mu sync.Mutex
|
|
|
|
// cond is used for notifying blocked readers when new data has been added
|
|
// or when MustClose is called.
|
|
cond sync.Cond
|
|
|
|
// isPQDisabled is set to true when pq is disabled.
|
|
isPQDisabled bool
|
|
|
|
// pq is file-based queue
|
|
pq *queue
|
|
|
|
// ch is in-memory queue
|
|
ch chan *bytesutil.ByteBuffer
|
|
|
|
pendingInmemoryBytes uint64
|
|
|
|
lastInmemoryBlockReadTime uint64
|
|
|
|
stopDeadline uint64
|
|
}
|
|
|
|
// MustOpenFastQueue opens persistent queue at the given path.
|
|
//
|
|
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
|
|
//
|
|
// if maxPendingBytes is 0, then the queue size is unlimited.
|
|
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
|
|
// reaches maxPendingSize.
|
|
// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
|
|
// in-memory queue part can be stored on disk during gracefull shutdown.
|
|
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
|
|
pq := mustOpen(path, name, maxPendingBytes)
|
|
fq := &FastQueue{
|
|
pq: pq,
|
|
isPQDisabled: isPQDisabled,
|
|
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
|
}
|
|
fq.cond.L = &fq.mu
|
|
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 {
|
|
fq.mu.Lock()
|
|
n := fq.pq.GetPendingBytes()
|
|
fq.mu.Unlock()
|
|
return float64(n)
|
|
})
|
|
pendingBytes := fq.GetPendingBytes()
|
|
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d isPQDisabled=%t, it contains %d pending bytes", path, maxInmemoryBlocks, isPQDisabled, pendingBytes)
|
|
return fq
|
|
}
|
|
|
|
// IsWriteBlocked checks if data can be pushed into fq
|
|
func (fq *FastQueue) IsWriteBlocked() bool {
|
|
if !fq.isPQDisabled {
|
|
return false
|
|
}
|
|
fq.mu.Lock()
|
|
defer fq.mu.Unlock()
|
|
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
|
|
}
|
|
|
|
// UnblockAllReaders unblocks all the readers.
|
|
func (fq *FastQueue) UnblockAllReaders() {
|
|
fq.mu.Lock()
|
|
defer fq.mu.Unlock()
|
|
|
|
// Unblock blocked readers
|
|
// Allow for up to 5 seconds for sending Prometheus stale markers.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526
|
|
fq.stopDeadline = fasttime.UnixTimestamp() + 5
|
|
fq.cond.Broadcast()
|
|
}
|
|
|
|
// MustClose unblocks all the readers.
|
|
//
|
|
// It is expected no new writers during and after the call.
|
|
func (fq *FastQueue) MustClose() {
|
|
fq.UnblockAllReaders()
|
|
|
|
fq.mu.Lock()
|
|
defer fq.mu.Unlock()
|
|
|
|
// flush blocks from fq.ch to fq.pq, so they can be persisted
|
|
fq.flushInmemoryBlocksToFileLocked()
|
|
|
|
// Close fq.pq
|
|
fq.pq.MustClose()
|
|
|
|
logger.Infof("closed fast persistent queue at %q", fq.pq.dir)
|
|
}
|
|
|
|
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() {
|
|
if len(fq.ch) == 0 || fq.isPQDisabled {
|
|
return
|
|
}
|
|
if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 {
|
|
return
|
|
}
|
|
fq.flushInmemoryBlocksToFileLocked()
|
|
}
|
|
|
|
func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
|
|
// fq.mu must be locked by the caller.
|
|
for len(fq.ch) > 0 {
|
|
bb := <-fq.ch
|
|
fq.pq.MustWriteBlock(bb.B)
|
|
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
|
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
|
blockBufPool.Put(bb)
|
|
}
|
|
// Unblock all the potentially blocked readers, so they could proceed with reading file-based queue.
|
|
fq.cond.Broadcast()
|
|
}
|
|
|
|
// GetPendingBytes returns the number of pending bytes in the fq.
|
|
func (fq *FastQueue) GetPendingBytes() uint64 {
|
|
fq.mu.Lock()
|
|
defer fq.mu.Unlock()
|
|
n := fq.pendingInmemoryBytes
|
|
n += fq.pq.GetPendingBytes()
|
|
return n
|
|
}
|
|
|
|
// GetInmemoryQueueLen returns the length of inmemory queue.
|
|
func (fq *FastQueue) GetInmemoryQueueLen() int {
|
|
fq.mu.Lock()
|
|
defer fq.mu.Unlock()
|
|
|
|
return len(fq.ch)
|
|
}
|
|
|
|
// MustWriteBlockIgnoreDisabledPQ unconditionally writes block to fq.
|
|
//
|
|
// This method allows perisisting in-memory blocks during graceful shutdown, even if persistence is disabled.
|
|
func (fq *FastQueue) MustWriteBlockIgnoreDisabledPQ(block []byte) {
|
|
if !fq.tryWriteBlock(block, true) {
|
|
logger.Fatalf("BUG: tryWriteBlock must always write data even if persistence is disabled")
|
|
}
|
|
}
|
|
|
|
// TryWriteBlock tries writing block to fq.
|
|
//
|
|
// false is returned if the block couldn't be written to fq when the in-memory queue is full
|
|
// and the persistent queue is disabled.
|
|
func (fq *FastQueue) TryWriteBlock(block []byte) bool {
|
|
return fq.tryWriteBlock(block, false)
|
|
}
|
|
|
|
// WriteBlock writes block to fq.
|
|
func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
|
|
fq.mu.Lock()
|
|
defer fq.mu.Unlock()
|
|
|
|
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
|
|
|
|
fq.flushInmemoryBlocksToFileIfNeededLocked()
|
|
if n := fq.pq.GetPendingBytes(); n > 0 {
|
|
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
|
|
// So put the block to file-based queue.
|
|
if len(fq.ch) > 0 {
|
|
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
|
|
}
|
|
if !isPQWriteAllowed {
|
|
return false
|
|
}
|
|
fq.pq.MustWriteBlock(block)
|
|
return true
|
|
}
|
|
if len(fq.ch) == cap(fq.ch) {
|
|
// There is no space left in the in-memory queue. Put the data to file-based queue.
|
|
if !isPQWriteAllowed {
|
|
return false
|
|
}
|
|
fq.flushInmemoryBlocksToFileLocked()
|
|
fq.pq.MustWriteBlock(block)
|
|
return true
|
|
}
|
|
// Fast path - put the block to in-memory queue.
|
|
bb := blockBufPool.Get()
|
|
bb.B = append(bb.B[:0], block...)
|
|
fq.ch <- bb
|
|
fq.pendingInmemoryBytes += uint64(len(block))
|
|
|
|
// Notify potentially blocked reader.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for the context.
|
|
fq.cond.Signal()
|
|
return true
|
|
}
|
|
|
|
// MustReadBlock reads the next block from fq to dst and returns it.
|
|
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
|
fq.mu.Lock()
|
|
defer fq.mu.Unlock()
|
|
|
|
for {
|
|
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
|
|
return dst, false
|
|
}
|
|
if len(fq.ch) > 0 {
|
|
if n := fq.pq.GetPendingBytes(); n > 0 {
|
|
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
|
|
}
|
|
bb := <-fq.ch
|
|
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
|
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
|
dst = append(dst, bb.B...)
|
|
blockBufPool.Put(bb)
|
|
return dst, true
|
|
}
|
|
if n := fq.pq.GetPendingBytes(); n > 0 {
|
|
data, ok := fq.pq.MustReadBlockNonblocking(dst)
|
|
if ok {
|
|
return data, true
|
|
}
|
|
dst = data
|
|
continue
|
|
}
|
|
if fq.stopDeadline > 0 {
|
|
return dst, false
|
|
}
|
|
// There are no blocks. Wait for new block.
|
|
fq.pq.ResetIfEmpty()
|
|
fq.cond.Wait()
|
|
}
|
|
}
|
|
|
|
// Dirname returns the directory name for persistent queue.
|
|
func (fq *FastQueue) Dirname() string {
|
|
return filepath.Base(fq.pq.dir)
|
|
}
|