lib/persistentqueue: delete corrupted persistent queue instead of throwing a fatal error

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030
Aliaksandr Valialkin 2021-04-05 19:19:58 +03:00
parent f010d773d6
commit 95dbebf512
5 changed files with 195 additions and 292 deletions
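The core of the fix, in miniature: when a chunk file turns out to be corrupted, the reader jumps to the next chunk-file boundary instead of calling logger.Panicf, and once that jump takes it past the writer, the queue is empty and its files get reset. A minimal runnable sketch of this offset arithmetic (simplified from skipBrokenChunkFile in the diff below; the chunk size and offsets are placeholders, not values from the commit):

package main

import "fmt"

const chunkFileSize = 1 << 20 // placeholder; the real queue derives this from MaxBlockSize

var errEmptyQueue = fmt.Errorf("the queue is empty")

// queueState models only the two offsets the recovery logic needs.
type queueState struct {
	readerOffset uint64
	writerOffset uint64
}

// skipBrokenChunk mirrors the shape of skipBrokenChunkFile in the diff:
// advance the reader to the next chunk boundary, and signal an empty queue
// when the reader catches up with the writer.
func (q *queueState) skipBrokenChunk() error {
	q.readerOffset += chunkFileSize - q.readerOffset%chunkFileSize
	if q.readerOffset >= q.writerOffset {
		return errEmptyQueue // the real code also resets the queue files here
	}
	return nil // the real code opens the next chunk file here
}

func main() {
	q := &queueState{readerOffset: 100, writerOffset: 5 << 20}
	if err := q.skipBrokenChunk(); err != nil {
		fmt.Println("queue reset:", err)
		return
	}
	fmt.Printf("reader recovered at offset %d\n", q.readerOffset)
}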

docs/CHANGELOG.md

@ -14,6 +14,7 @@
* FEATURE: vmagent: add AWS IAM roles for tasks support for EC2 service discovery according to [these docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html).
* FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options in `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections.
* FEATURE: vmagent: pass `X-Prometheus-Scrape-Timeout-Seconds` header to scrape targets as Prometheus does. In this case scrape targets can limit the time needed for performing the scrape. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813118733) for details.
* FEATURE: vmagent: drop corrupted persistent queue files at `-remoteWrite.tmpDataPath` instead of throwing a fatal error. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030).
* FEATURE: vmauth: add support for authorization via [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/). See [the docs](https://victoriametrics.github.io/vmauth.html#auth-config) for details.
* BUGFIX: vmagent: properly work with simple HTTP proxies which don't support `CONNECT` method. For example, [PushProx](https://github.com/prometheus-community/PushProx). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179).

lib/persistentqueue/fastqueue.go

@ -8,7 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// FastQueue is a wrapper around Queue, which prefers sending data via memory.
// FastQueue is a fast persistent queue, which prefers sending data via memory.
//
// It falls back to sending data via file when readers don't catch up with writers.
type FastQueue struct {
@ -20,7 +20,7 @@ type FastQueue struct {
cond sync.Cond
// pq is file-based queue
pq *Queue
pq *queue
// ch is in-memory queue
ch chan *bytesutil.ByteBuffer
@ -40,7 +40,7 @@ type FastQueue struct {
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
// reaches maxPendingBytes.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue {
pq := MustOpen(path, name, maxPendingBytes)
pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
@ -174,7 +174,12 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
return dst, true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
return fq.pq.MustReadBlock(dst)
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
continue
}
// There are no blocks. Wait for new block.

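The loop above is the caller-visible effect of the change: FastQueue.MustReadBlock used to delegate straight to the file queue's blocking read; now it calls MustReadBlockNonblocking, keeps the partially filled dst when ok=false (for example after a corrupted chunk was dropped), and goes back to waiting for new data. A sketch of the public API involved, with a placeholder path and limits:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
)

func main() {
	// maxInmemoryBlocks=100, maxPendingBytes=0 (unlimited) are placeholders.
	fq := persistentqueue.MustOpenFastQueue("/tmp/fq-example", "example", 100, 0)
	defer fq.MustClose()

	fq.MustWriteBlock([]byte("sample block"))

	// After this commit a corrupted on-disk chunk no longer aborts the
	// process; the broken chunk is skipped and the read simply continues.
	if data, ok := fq.MustReadBlock(nil); ok {
		fmt.Printf("read %q\n", data)
	}
}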
lib/persistentqueue/persistentqueue.go

@ -8,7 +8,6 @@ import (
"os"
"regexp"
"strconv"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@ -26,8 +25,10 @@ const defaultChunkFileSize = (MaxBlockSize + 8) * 16
var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$")
// Queue represents persistent queue.
type Queue struct {
// queue represents persistent queue.
//
// It is unsafe to call queue methods from concurrent goroutines.
type queue struct {
chunkFileSize uint64
maxBlockSize uint64
maxPendingBytes uint64
@ -37,13 +38,6 @@ type Queue struct {
flockF *os.File
// mu protects all the fields below.
mu sync.Mutex
// cond is used for notifying blocked readers when new data has been added
// or when MustClose is called.
cond sync.Cond
reader *filestream.Reader
readerPath string
readerOffset uint64
@ -74,10 +68,7 @@ type Queue struct {
// ResetIfEmpty resets q if it is empty.
//
// This is needed in order to remove the chunk file associated with an empty q.
func (q *Queue) ResetIfEmpty() {
q.mu.Lock()
defer q.mu.Unlock()
func (q *queue) ResetIfEmpty() {
if q.readerOffset != q.writerOffset {
// The queue isn't empty.
return
@ -86,10 +77,13 @@ func (q *Queue) ResetIfEmpty() {
// The file is too small to drop. Leave it as is in order to reduce filesystem load.
return
}
q.mustResetFiles()
}
func (q *queue) mustResetFiles() {
if q.readerPath != q.writerPath {
logger.Panicf("BUG: readerPath=%q doesn't match writerPath=%q", q.readerPath, q.writerPath)
}
q.reader.MustClose()
q.writer.MustClose()
fs.MustRemoveAll(q.readerPath)
@ -115,31 +109,29 @@ func (q *Queue) ResetIfEmpty() {
}
q.reader = r
if err := q.flushMetainfoLocked(); err != nil {
if err := q.flushMetainfo(); err != nil {
logger.Panicf("FATAL: cannot flush metainfo: %s", err)
}
}
// GetPendingBytes returns the number of pending bytes in the queue.
func (q *Queue) GetPendingBytes() uint64 {
q.mu.Lock()
func (q *queue) GetPendingBytes() uint64 {
n := q.writerOffset - q.readerOffset
q.mu.Unlock()
return n
}
// MustOpen opens persistent queue from the given path.
// mustOpen opens persistent queue from the given path.
//
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
// The oldest data is deleted when queue size exceeds maxPendingBytes.
func MustOpen(path, name string, maxPendingBytes int) *Queue {
func mustOpen(path, name string, maxPendingBytes int) *queue {
if maxPendingBytes < 0 {
maxPendingBytes = 0
}
return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes))
return mustOpenInternal(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes))
}
func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *Queue {
func mustOpenInternal(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *queue {
if chunkFileSize < 8 || chunkFileSize-8 < maxBlockSize {
logger.Panicf("BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block", chunkFileSize, maxBlockSize)
}
@ -166,15 +158,14 @@ func mustCreateFlockFile(path string) *os.File {
return f
}
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) {
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*queue, error) {
// Protect from concurrent opens.
var q Queue
var q queue
q.chunkFileSize = chunkFileSize
q.maxBlockSize = maxBlockSize
q.maxPendingBytes = maxPendingBytes
q.dir = path
q.name = name
q.cond.L = &q.mu
q.blocksDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_dropped_total{path=%q}`, path))
q.bytesDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_dropped_total{path=%q}`, path))
@ -346,17 +337,8 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
// MustClose closes q.
//
// It unblocks all the MustReadBlock calls.
//
// MustWriteBlock mustn't be called during and after the call to MustClose.
func (q *Queue) MustClose() {
q.mu.Lock()
defer q.mu.Unlock()
// Unblock goroutines blocked on cond in MustReadBlock.
q.mustStop = true
q.cond.Broadcast()
func (q *queue) MustClose() {
// Close writer.
q.writer.MustClose()
q.writer = nil
@ -366,7 +348,7 @@ func (q *Queue) MustClose() {
q.reader = nil
// Store metainfo
if err := q.flushMetainfoLocked(); err != nil {
if err := q.flushMetainfo(); err != nil {
logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err)
}
@ -377,11 +359,11 @@ func (q *Queue) MustClose() {
q.flockF = nil
}
func (q *Queue) chunkFilePath(offset uint64) string {
func (q *queue) chunkFilePath(offset uint64) string {
return fmt.Sprintf("%s/%016X", q.dir, offset)
}
func (q *Queue) metainfoPath() string {
func (q *queue) metainfoPath() string {
return q.dir + "/metainfo.json"
}
@ -390,14 +372,10 @@ func (q *Queue) metainfoPath() string {
// The block size cannot exceed MaxBlockSize.
//
// It is safe calling this function from concurrent goroutines.
func (q *Queue) MustWriteBlock(block []byte) {
func (q *queue) MustWriteBlock(block []byte) {
if uint64(len(block)) > q.maxBlockSize {
logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize)
}
q.mu.Lock()
defer q.mu.Unlock()
if q.mustStop {
logger.Panicf("BUG: MustWriteBlock cannot be called after MustClose")
}
@ -416,7 +394,10 @@ func (q *Queue) MustWriteBlock(block []byte) {
bb := blockBufPool.Get()
for q.writerOffset-q.readerOffset > maxPendingBytes {
var err error
bb.B, err = q.readBlockLocked(bb.B[:0])
bb.B, err = q.readBlock(bb.B[:0])
if err == errEmptyQueue {
break
}
if err != nil {
logger.Panicf("FATAL: cannot read the oldest block %s", err)
}
@ -429,38 +410,18 @@ func (q *Queue) MustWriteBlock(block []byte) {
return
}
}
if err := q.writeBlockLocked(block); err != nil {
if err := q.writeBlock(block); err != nil {
logger.Panicf("FATAL: %s", err)
}
// Notify blocked reader if any.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for details.
q.cond.Signal()
}
var blockBufPool bytesutil.ByteBufferPool
func (q *Queue) writeBlockLocked(block []byte) error {
func (q *queue) writeBlock(block []byte) error {
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
// Finalize the current chunk and start new one.
q.writer.MustClose()
// There is no need to do fs.MustSyncPath(q.writerPath) here,
// since MustClose already does this.
if n := q.writerOffset % q.chunkFileSize; n > 0 {
q.writerOffset += (q.chunkFileSize - n)
if err := q.nextChunkFileForWrite(); err != nil {
return fmt.Errorf("cannot create next chunk file: %w", err)
}
q.writerFlushedOffset = q.writerOffset
q.writerLocalOffset = 0
q.writerPath = q.chunkFilePath(q.writerOffset)
w, err := filestream.Create(q.writerPath, false)
if err != nil {
return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err)
}
q.writer = w
if err := q.flushMetainfoLocked(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err)
}
fs.MustSyncPath(q.dir)
}
// Write block len.
@ -479,62 +440,61 @@ func (q *Queue) writeBlockLocked(block []byte) error {
}
q.blocksWritten.Inc()
q.bytesWritten.Add(len(block))
return q.flushMetainfoIfNeededLocked(true)
return q.flushWriterMetainfoIfNeeded()
}
// MustReadBlock appends the next block from q to dst and returns the result.
//
// false is returned after MustClose call.
//
// It is safe calling this function from concurrent goroutines.
func (q *Queue) MustReadBlock(dst []byte) ([]byte, bool) {
q.mu.Lock()
defer q.mu.Unlock()
for {
if q.mustStop {
return dst, false
func (q *queue) nextChunkFileForWrite() error {
// Finalize the current chunk and start new one.
q.writer.MustClose()
// There is no need to do fs.MustSyncPath(q.writerPath) here,
// since MustClose already does this.
if n := q.writerOffset % q.chunkFileSize; n > 0 {
q.writerOffset += q.chunkFileSize - n
}
q.writerFlushedOffset = q.writerOffset
q.writerLocalOffset = 0
q.writerPath = q.chunkFilePath(q.writerOffset)
w, err := filestream.Create(q.writerPath, false)
if err != nil {
return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err)
}
q.writer = w
if err := q.flushMetainfo(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err)
}
fs.MustSyncPath(q.dir)
return nil
}
// MustReadBlockNonblocking appends the next block from q to dst and returns the result.
//
// false is returned if q is empty.
func (q *queue) MustReadBlockNonblocking(dst []byte) ([]byte, bool) {
if q.readerOffset > q.writerOffset {
logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset)
}
if q.readerOffset < q.writerOffset {
break
if q.readerOffset == q.writerOffset {
return dst, false
}
q.cond.Wait()
}
data, err := q.readBlockLocked(dst)
var err error
dst, err = q.readBlock(dst)
if err != nil {
// Skip the current chunk, since it may be broken.
q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize
_ = q.flushMetainfoLocked()
if err == errEmptyQueue {
return dst, false
}
logger.Panicf("FATAL: %s", err)
}
return data, true
return dst, true
}
func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
func (q *queue) readBlock(dst []byte) ([]byte, error) {
if q.readerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
// Remove the current chunk and go to the next chunk.
q.reader.MustClose()
fs.MustRemoveAll(q.readerPath)
if n := q.readerOffset % q.chunkFileSize; n > 0 {
q.readerOffset += (q.chunkFileSize - n)
if err := q.nextChunkFileForRead(); err != nil {
return dst, fmt.Errorf("cannot open next chunk file: %w", err)
}
q.readerLocalOffset = 0
q.readerPath = q.chunkFilePath(q.readerOffset)
r, err := filestream.Open(q.readerPath, true)
if err != nil {
return dst, fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err)
}
q.reader = r
if err := q.flushMetainfoLocked(); err != nil {
return dst, fmt.Errorf("cannot flush metainfo: %w", err)
}
fs.MustSyncPath(q.dir)
}
again:
// Read block len.
header := headerBufPool.Get()
header.B = bytesutil.Resize(header.B, 8)
@ -542,27 +502,73 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
blockLen := encoding.UnmarshalUint64(header.B)
headerBufPool.Put(header)
if err != nil {
return dst, fmt.Errorf("cannot read header with size 8 bytes from %q: %w", q.readerPath, err)
logger.Errorf("skipping corrupted %q, since header with size 8 bytes cannot be read from it: %s", q.readerPath, err)
if err := q.skipBrokenChunkFile(); err != nil {
return dst, err
}
goto again
}
if blockLen > q.maxBlockSize {
return dst, fmt.Errorf("too big block size read from %q: %d bytes; cannot exceed %d bytes", q.readerPath, blockLen, q.maxBlockSize)
logger.Errorf("skipping corrupted %q, since too big block size is read from it: %d bytes; cannot exceed %d bytes", q.readerPath, blockLen, q.maxBlockSize)
if err := q.skipBrokenChunkFile(); err != nil {
return dst, err
}
goto again
}
// Read block contents.
dstLen := len(dst)
dst = bytesutil.Resize(dst, dstLen+int(blockLen))
if err := q.readFull(dst[dstLen:]); err != nil {
return dst, fmt.Errorf("cannot read block contents with size %d bytes from %q: %w", blockLen, q.readerPath, err)
logger.Errorf("skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s", q.readerPath, blockLen, err)
if err := q.skipBrokenChunkFile(); err != nil {
return dst[:dstLen], err
}
goto again
}
q.blocksRead.Inc()
q.bytesRead.Add(int(blockLen))
if err := q.flushMetainfoIfNeededLocked(false); err != nil {
if err := q.flushReaderMetainfoIfNeeded(); err != nil {
return dst, err
}
return dst, nil
}
func (q *Queue) write(buf []byte) error {
func (q *queue) skipBrokenChunkFile() error {
// Try to recover from broken chunk file by skipping it.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030
q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize
if q.readerOffset >= q.writerOffset {
q.mustResetFiles()
return errEmptyQueue
}
return q.nextChunkFileForRead()
}
var errEmptyQueue = fmt.Errorf("the queue is empty")
func (q *queue) nextChunkFileForRead() error {
// Remove the current chunk and go to the next chunk.
q.reader.MustClose()
fs.MustRemoveAll(q.readerPath)
if n := q.readerOffset % q.chunkFileSize; n > 0 {
q.readerOffset += q.chunkFileSize - n
}
q.readerLocalOffset = 0
q.readerPath = q.chunkFilePath(q.readerOffset)
r, err := filestream.Open(q.readerPath, true)
if err != nil {
return fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err)
}
q.reader = r
if err := q.flushMetainfo(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err)
}
fs.MustSyncPath(q.dir)
return nil
}
func (q *queue) write(buf []byte) error {
bufLen := uint64(len(buf))
n, err := q.writer.Write(buf)
if err != nil {
@ -576,7 +582,7 @@ func (q *Queue) write(buf []byte) error {
return nil
}
func (q *Queue) readFull(buf []byte) error {
func (q *queue) readFull(buf []byte) error {
bufLen := uint64(len(buf))
if q.readerOffset+bufLen > q.writerFlushedOffset {
q.writer.MustFlush(false)
@ -594,22 +600,32 @@ func (q *Queue) readFull(buf []byte) error {
return nil
}
func (q *Queue) flushMetainfoIfNeededLocked(flushData bool) error {
func (q *queue) flushReaderMetainfoIfNeeded() error {
t := fasttime.UnixTimestamp()
if t == q.lastMetainfoFlushTime {
return nil
}
if flushData {
q.writer.MustFlush(true)
}
if err := q.flushMetainfoLocked(); err != nil {
if err := q.flushMetainfo(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err)
}
q.lastMetainfoFlushTime = t
return nil
}
func (q *Queue) flushMetainfoLocked() error {
func (q *queue) flushWriterMetainfoIfNeeded() error {
t := fasttime.UnixTimestamp()
if t == q.lastMetainfoFlushTime {
return nil
}
q.writer.MustFlush(true)
if err := q.flushMetainfo(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err)
}
q.lastMetainfoFlushTime = t
return nil
}
func (q *queue) flushMetainfo() error {
mi := &metainfo{
Name: q.name,
ReaderOffset: q.readerOffset,

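Because queue is now unexported and documented as unsafe for concurrent use, all reads go through the non-blocking MustReadBlockNonblocking. A hypothetical in-package drain loop (written as a test, since the type is unexported; it reuses the mustOpen/mustDeleteDir helpers from the tests below):

func TestQueueDrainNonblocking(t *testing.T) {
	path := "queue-drain-nonblocking"
	mustDeleteDir(path)
	q := mustOpen(path, "foobar", 0)
	defer func() {
		q.MustClose()
		mustDeleteDir(path)
	}()
	q.MustWriteBlock([]byte("first"))
	q.MustWriteBlock([]byte("second"))
	var buf []byte
	for i := 0; ; i++ {
		var ok bool
		// ok=false now means "empty" (possibly after a corrupted chunk was
		// dropped), not "queue closed" as with the old blocking MustReadBlock.
		buf, ok = q.MustReadBlockNonblocking(buf[:0])
		if !ok {
			if i != 2 {
				t.Fatalf("expected to drain 2 blocks; drained %d", i)
			}
			return
		}
	}
}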
lib/persistentqueue/persistentqueue_test.go

@ -5,16 +5,14 @@ import (
"io/ioutil"
"os"
"strconv"
"sync"
"testing"
"time"
)
func TestQueueOpenClose(t *testing.T) {
path := "queue-open-close"
mustDeleteDir(path)
for i := 0; i < 3; i++ {
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
if n := q.GetPendingBytes(); n > 0 {
t.Fatalf("pending bytes must be 0; got %d", n)
}
@ -28,7 +26,7 @@ func TestQueueOpen(t *testing.T) {
path := "queue-open-invalid-metainfo"
mustCreateDir(path)
mustCreateFile(path+"/metainfo.json", "foobarbaz")
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -38,7 +36,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(path+"/junk-file", "foobar")
mustCreateDir(path + "/junk-dir")
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -47,7 +45,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(path)
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(fmt.Sprintf("%s/%016X", path, 1234), "qwere")
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -56,7 +54,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(path)
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(fmt.Sprintf("%s/%016X", path, 100*uint64(defaultChunkFileSize)), "asdf")
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -72,7 +70,7 @@ func TestQueueOpen(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "adfsfd")
q := MustOpen(path, mi.Name, 0)
q := mustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
})
@ -86,7 +84,7 @@ func TestQueueOpen(t *testing.T) {
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
q := MustOpen(path, mi.Name, 0)
q := mustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
})
@ -94,7 +92,7 @@ func TestQueueOpen(t *testing.T) {
path := "queue-open-metainfo-dir"
mustCreateDir(path)
mustCreateDir(path + "/metainfo.json")
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -110,7 +108,7 @@ func TestQueueOpen(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
q := MustOpen(path, mi.Name, 0)
q := mustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
})
@ -119,7 +117,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(path)
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdfdsf")
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -133,7 +131,7 @@ func TestQueueOpen(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
q := MustOpen(path, "baz", 0)
q := mustOpen(path, "baz", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -142,7 +140,7 @@ func TestQueueOpen(t *testing.T) {
func TestQueueResetIfEmpty(t *testing.T) {
path := "queue-reset-if-empty"
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -154,14 +152,18 @@ func TestQueueResetIfEmpty(t *testing.T) {
for i := 0; i < 10; i++ {
q.MustWriteBlock(block)
var ok bool
buf, ok = q.MustReadBlock(buf[:0])
buf, ok = q.MustReadBlockNonblocking(buf[:0])
if !ok {
t.Fatalf("unexpected ok=false returned from MustReadBlock")
t.Fatalf("unexpected ok=false returned from MustReadBlockNonblocking")
}
}
q.ResetIfEmpty()
if n := q.GetPendingBytes(); n > 0 {
t.Fatalf("unexpected non-zer pending bytes after queue reset: %d", n)
t.Fatalf("unexpected non-zero pending bytes after queue reset: %d", n)
}
q.ResetIfEmpty()
if n := q.GetPendingBytes(); n > 0 {
t.Fatalf("unexpected non-zero pending bytes after queue reset: %d", n)
}
}
}
@ -169,7 +171,7 @@ func TestQueueResetIfEmpty(t *testing.T) {
func TestQueueWriteRead(t *testing.T) {
path := "queue-write-read"
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -188,9 +190,9 @@ func TestQueueWriteRead(t *testing.T) {
var buf []byte
var ok bool
for _, block := range blocks {
buf, ok = q.MustReadBlock(buf[:0])
buf, ok = q.MustReadBlockNonblocking(buf[:0])
if !ok {
t.Fatalf("unexpected ok=%v returned from MustReadBlock; want true", ok)
t.Fatalf("unexpected ok=%v returned from MustReadBlockNonblocking; want true", ok)
}
if string(buf) != string(block) {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
@ -205,7 +207,7 @@ func TestQueueWriteRead(t *testing.T) {
func TestQueueWriteCloseRead(t *testing.T) {
path := "queue-write-close-read"
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -222,16 +224,16 @@ func TestQueueWriteCloseRead(t *testing.T) {
t.Fatalf("pending bytes must be greater than 0; got %d", n)
}
q.MustClose()
q = MustOpen(path, "foobar", 0)
q = mustOpen(path, "foobar", 0)
if n := q.GetPendingBytes(); n <= 0 {
t.Fatalf("pending bytes must be greater than 0; got %d", n)
}
var buf []byte
var ok bool
for _, block := range blocks {
buf, ok = q.MustReadBlock(buf[:0])
buf, ok = q.MustReadBlockNonblocking(buf[:0])
if !ok {
t.Fatalf("unexpected ok=%v returned from MustReadBlock; want true", ok)
t.Fatalf("unexpected ok=%v returned from MustReadBlockNonblocking; want true", ok)
}
if string(buf) != string(block) {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
@ -243,137 +245,12 @@ func TestQueueWriteCloseRead(t *testing.T) {
}
}
func TestQueueReadEmpty(t *testing.T) {
path := "queue-read-empty"
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
defer mustDeleteDir(path)
resultCh := make(chan error)
go func() {
data, ok := q.MustReadBlock(nil)
var err error
if ok {
err = fmt.Errorf("unexpected ok=%v returned from MustReadBlock; want false", ok)
} else if len(data) > 0 {
err = fmt.Errorf("unexpected non-empty data returned from MustReadBlock: %q", data)
}
resultCh <- err
}()
if n := q.GetPendingBytes(); n > 0 {
t.Fatalf("pending bytes must be 0; got %d", n)
}
q.MustClose()
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}
func TestQueueReadWriteConcurrent(t *testing.T) {
path := "queue-read-write-concurrent"
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
defer mustDeleteDir(path)
blocksMap := make(map[string]bool, 1000)
var blocksMapLock sync.Mutex
blocks := make([]string, 1000)
for i := 0; i < 1000; i++ {
block := fmt.Sprintf("block #%d", i)
blocksMap[block] = true
blocks[i] = block
}
// Start block readers
var readersWG sync.WaitGroup
for workerID := 0; workerID < 10; workerID++ {
readersWG.Add(1)
go func() {
defer readersWG.Done()
for {
block, ok := q.MustReadBlock(nil)
if !ok {
return
}
blocksMapLock.Lock()
if !blocksMap[string(block)] {
panic(fmt.Errorf("unexpected block read: %q", block))
}
delete(blocksMap, string(block))
blocksMapLock.Unlock()
}
}()
}
// Start block writers
blocksCh := make(chan string)
var writersWG sync.WaitGroup
for workerID := 0; workerID < 10; workerID++ {
writersWG.Add(1)
go func(workerID int) {
defer writersWG.Done()
for block := range blocksCh {
q.MustWriteBlock([]byte(block))
}
}(workerID)
}
for _, block := range blocks {
blocksCh <- block
}
close(blocksCh)
// Wait for block writers to finish
writersWG.Wait()
// Notify readers that the queue is closed
q.MustClose()
// Wait for block readers to finish
readersWG.Wait()
// Read the remaining blocks in q.
q = MustOpen(path, "foobar", 0)
defer q.MustClose()
resultCh := make(chan error)
go func() {
for len(blocksMap) > 0 {
block, ok := q.MustReadBlock(nil)
if !ok {
resultCh <- fmt.Errorf("unexpected ok=false returned from MustReadBlock")
return
}
if !blocksMap[string(block)] {
resultCh <- fmt.Errorf("unexpected block read from the queue: %q", block)
return
}
delete(blocksMap, string(block))
}
resultCh <- nil
}()
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
if n := q.GetPendingBytes(); n > 0 {
t.Fatalf("pending bytes must be 0; got %d", n)
}
}
func TestQueueChunkManagementSimple(t *testing.T) {
path := "queue-chunk-management-simple"
mustDeleteDir(path)
const chunkFileSize = 100
const maxBlockSize = 20
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
q := mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0)
defer mustDeleteDir(path)
defer q.MustClose()
var blocks []string
@ -386,7 +263,7 @@ func TestQueueChunkManagementSimple(t *testing.T) {
t.Fatalf("unexpected zero number of bytes pending")
}
for _, block := range blocks {
data, ok := q.MustReadBlock(nil)
data, ok := q.MustReadBlockNonblocking(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
@ -404,7 +281,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
mustDeleteDir(path)
const chunkFileSize = 100
const maxBlockSize = 20
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
q := mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -415,13 +292,13 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
q.MustWriteBlock([]byte(block))
blocks = append(blocks, block)
q.MustClose()
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
q = mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0)
}
if n := q.GetPendingBytes(); n == 0 {
t.Fatalf("unexpected zero number of bytes pending")
}
for _, block := range blocks {
data, ok := q.MustReadBlock(nil)
data, ok := q.MustReadBlockNonblocking(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
@ -429,7 +306,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
t.Fatalf("unexpected block read; got %q; want %q", data, block)
}
q.MustClose()
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
q = mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0)
}
if n := q.GetPendingBytes(); n != 0 {
t.Fatalf("unexpected non-zero number of pending bytes: %d", n)
@ -440,7 +317,7 @@ func TestQueueLimitedSize(t *testing.T) {
const maxPendingBytes = 1000
path := "queue-limited-size"
mustDeleteDir(path)
q := MustOpen(path, "foobar", maxPendingBytes)
q := mustOpen(path, "foobar", maxPendingBytes)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -456,7 +333,7 @@ func TestQueueLimitedSize(t *testing.T) {
var buf []byte
var ok bool
for _, block := range blocks {
buf, ok = q.MustReadBlock(buf[:0])
buf, ok = q.MustReadBlockNonblocking(buf[:0])
if !ok {
t.Fatalf("unexpected ok=false")
}
@ -473,7 +350,7 @@ func TestQueueLimitedSize(t *testing.T) {
if n := q.GetPendingBytes(); n > maxPendingBytes {
t.Fatalf("too many pending bytes; got %d; mustn't exceed %d", n, maxPendingBytes)
}
buf, ok = q.MustReadBlock(buf[:0])
buf, ok = q.MustReadBlockNonblocking(buf[:0])
if !ok {
t.Fatalf("unexpected ok=false")
}

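A hypothetical regression test for the recovery path, in the same style as the tests above (it is not part of this commit as shown): corrupt the first chunk file on disk, reopen the queue, and verify that reading drops the broken data instead of panicking.

func TestQueueReadCorruptedChunk(t *testing.T) {
	path := "queue-read-corrupted-chunk"
	mustDeleteDir(path)
	q := mustOpen(path, "foobar", 0)
	q.MustWriteBlock([]byte("block that will be corrupted"))
	q.MustClose()

	// Overwrite the first chunk file with junk shorter than the 8-byte header.
	mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "junk")

	q = mustOpen(path, "foobar", 0)
	defer func() {
		q.MustClose()
		mustDeleteDir(path)
	}()
	if _, ok := q.MustReadBlockNonblocking(nil); ok {
		t.Fatalf("expected ok=false after the corrupted chunk is dropped")
	}
}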
lib/persistentqueue/persistentqueue_timing_test.go

@ -2,13 +2,14 @@ package persistentqueue
import (
"fmt"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
func BenchmarkQueueThroughputSerial(b *testing.B) {
const iterationsCount = 10
const iterationsCount = 100
for _, blockSize := range []int{1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6} {
block := make([]byte, blockSize)
b.Run(fmt.Sprintf("block-size-%d", blockSize), func(b *testing.B) {
@ -16,7 +17,7 @@ func BenchmarkQueueThroughputSerial(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-queue-throughput-serial-%d", blockSize)
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -29,7 +30,7 @@ func BenchmarkQueueThroughputSerial(b *testing.B) {
}
func BenchmarkQueueThroughputConcurrent(b *testing.B) {
const iterationsCount = 10
const iterationsCount = 100
for _, blockSize := range []int{1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6} {
block := make([]byte, blockSize)
b.Run(fmt.Sprintf("block-size-%d", blockSize), func(b *testing.B) {
@ -37,28 +38,31 @@ func BenchmarkQueueThroughputConcurrent(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-queue-throughput-concurrent-%d", blockSize)
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
q := mustOpen(path, "foobar", 0)
var qLock sync.Mutex
defer func() {
q.MustClose()
mustDeleteDir(path)
}()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
qLock.Lock()
writeReadIteration(q, block, iterationsCount)
qLock.Unlock()
}
})
})
}
}
func writeReadIteration(q *Queue, block []byte, iterationsCount int) {
func writeReadIteration(q *queue, block []byte, iterationsCount int) {
for i := 0; i < iterationsCount; i++ {
q.MustWriteBlock(block)
}
var ok bool
bb := bbPool.Get()
for i := 0; i < iterationsCount; i++ {
bb.B, ok = q.MustReadBlock(bb.B[:0])
bb.B, ok = q.MustReadBlockNonblocking(bb.B[:0])
if !ok {
panic(fmt.Errorf("unexpected ok=false"))
}
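Two benchmark adjustments round out the change: iterationsCount grows from 10 to 100, presumably so each b.N iteration amortizes setup over more blocks, and BenchmarkQueueThroughputConcurrent now wraps each writeReadIteration call in qLock, since the unexported queue is no longer safe for concurrent goroutines. The concurrent benchmark therefore measures mutex-serialized throughput, which mirrors how FastQueue serializes access to its file-backed queue.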