diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 75f1096bd..f1294214e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -14,6 +14,7 @@ * FEATURE: vmagent: add AWS IAM roles for tasks support for EC2 service discovery according to [these docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html). * FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options in `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections. * FEATURE: vmagent: pass `X-Prometheus-Scrape-Timeout-Seconds` header to scrape targets as Prometheus does. In this case scrape targets can limit the time needed for performing the scrape. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813118733) for details. +* FEATURE: vmagent: drop corrupted persistent queue files at `-remoteWrite.tmpDataPath` instead of throwing a fatal error. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030). * FEATURE: vmauth: add support for authorization via [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/). See [the docs](https://victoriametrics.github.io/vmauth.html#auth-config) for details. * BUGFIX: vmagent: properly work with simple HTTP proxies which don't support `CONNECT` method. For example, [PushProx](https://github.com/prometheus-community/PushProx). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179). diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 5c25933f6..7dda73a86 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -// FastQueue is a wrapper around Queue, which prefers sending data via memory. +// FastQueue is fast persistent queue, which prefers sending data via memory. // // It falls back to sending data via file when readers don't catch up with writers. type FastQueue struct { @@ -20,7 +20,7 @@ type FastQueue struct { cond sync.Cond // pq is file-based queue - pq *Queue + pq *queue // ch is in-memory queue ch chan *bytesutil.ByteBuffer @@ -40,7 +40,7 @@ type FastQueue struct { // Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue // reaches maxPendingSize. func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue { - pq := MustOpen(path, name, maxPendingBytes) + pq := mustOpen(path, name, maxPendingBytes) fq := &FastQueue{ pq: pq, ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), @@ -174,7 +174,12 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { return dst, true } if n := fq.pq.GetPendingBytes(); n > 0 { - return fq.pq.MustReadBlock(dst) + data, ok := fq.pq.MustReadBlockNonblocking(dst) + if ok { + return data, true + } + dst = data + continue } // There are no blocks. Wait for new block. diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 916276c3f..4c082a9f8 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -8,7 +8,6 @@ import ( "os" "regexp" "strconv" - "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -26,8 +25,10 @@ const defaultChunkFileSize = (MaxBlockSize + 8) * 16 var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$") -// Queue represents persistent queue. -type Queue struct { +// queue represents persistent queue. +// +// It is unsafe to call queue methods from concurrent goroutines. +type queue struct { chunkFileSize uint64 maxBlockSize uint64 maxPendingBytes uint64 @@ -37,13 +38,6 @@ type Queue struct { flockF *os.File - // mu protects all the fields below. - mu sync.Mutex - - // cond is used for notifying blocked readers when new data has been added - // or when MustClose is called. - cond sync.Cond - reader *filestream.Reader readerPath string readerOffset uint64 @@ -74,10 +68,7 @@ type Queue struct { // ResetIfEmpty resets q if it is empty. // // This is needed in order to remove chunk file associated with empty q. -func (q *Queue) ResetIfEmpty() { - q.mu.Lock() - defer q.mu.Unlock() - +func (q *queue) ResetIfEmpty() { if q.readerOffset != q.writerOffset { // The queue isn't empty. return @@ -86,10 +77,13 @@ func (q *Queue) ResetIfEmpty() { // The file is too small to drop. Leave it as is in order to reduce filesystem load. return } + q.mustResetFiles() +} + +func (q *queue) mustResetFiles() { if q.readerPath != q.writerPath { logger.Panicf("BUG: readerPath=%q doesn't match writerPath=%q", q.readerPath, q.writerPath) } - q.reader.MustClose() q.writer.MustClose() fs.MustRemoveAll(q.readerPath) @@ -115,31 +109,29 @@ func (q *Queue) ResetIfEmpty() { } q.reader = r - if err := q.flushMetainfoLocked(); err != nil { + if err := q.flushMetainfo(); err != nil { logger.Panicf("FATAL: cannot flush metainfo: %s", err) } } // GetPendingBytes returns the number of pending bytes in the queue. -func (q *Queue) GetPendingBytes() uint64 { - q.mu.Lock() +func (q *queue) GetPendingBytes() uint64 { n := q.writerOffset - q.readerOffset - q.mu.Unlock() return n } -// MustOpen opens persistent queue from the given path. +// mustOpen opens persistent queue from the given path. // // If maxPendingBytes is greater than 0, then the max queue size is limited by this value. // The oldest data is deleted when queue size exceeds maxPendingBytes. -func MustOpen(path, name string, maxPendingBytes int) *Queue { +func mustOpen(path, name string, maxPendingBytes int) *queue { if maxPendingBytes < 0 { maxPendingBytes = 0 } - return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes)) + return mustOpenInternal(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes)) } -func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *Queue { +func mustOpenInternal(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *queue { if chunkFileSize < 8 || chunkFileSize-8 < maxBlockSize { logger.Panicf("BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block", chunkFileSize, maxBlockSize) } @@ -166,15 +158,14 @@ func mustCreateFlockFile(path string) *os.File { return f } -func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) { +func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*queue, error) { // Protect from concurrent opens. - var q Queue + var q queue q.chunkFileSize = chunkFileSize q.maxBlockSize = maxBlockSize q.maxPendingBytes = maxPendingBytes q.dir = path q.name = name - q.cond.L = &q.mu q.blocksDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_dropped_total{path=%q}`, path)) q.bytesDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_dropped_total{path=%q}`, path)) @@ -346,17 +337,8 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB // MustClose closes q. // -// It unblocks all the MustReadBlock calls. -// // MustWriteBlock mustn't be called during and after the call to MustClose. -func (q *Queue) MustClose() { - q.mu.Lock() - defer q.mu.Unlock() - - // Unblock goroutines blocked on cond in MustReadBlock. - q.mustStop = true - q.cond.Broadcast() - +func (q *queue) MustClose() { // Close writer. q.writer.MustClose() q.writer = nil @@ -366,7 +348,7 @@ func (q *Queue) MustClose() { q.reader = nil // Store metainfo - if err := q.flushMetainfoLocked(); err != nil { + if err := q.flushMetainfo(); err != nil { logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err) } @@ -377,11 +359,11 @@ func (q *Queue) MustClose() { q.flockF = nil } -func (q *Queue) chunkFilePath(offset uint64) string { +func (q *queue) chunkFilePath(offset uint64) string { return fmt.Sprintf("%s/%016X", q.dir, offset) } -func (q *Queue) metainfoPath() string { +func (q *queue) metainfoPath() string { return q.dir + "/metainfo.json" } @@ -390,14 +372,10 @@ func (q *Queue) metainfoPath() string { // The block size cannot exceed MaxBlockSize. // // It is safe calling this function from concurrent goroutines. -func (q *Queue) MustWriteBlock(block []byte) { +func (q *queue) MustWriteBlock(block []byte) { if uint64(len(block)) > q.maxBlockSize { logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize) } - - q.mu.Lock() - defer q.mu.Unlock() - if q.mustStop { logger.Panicf("BUG: MustWriteBlock cannot be called after MustClose") } @@ -416,7 +394,10 @@ func (q *Queue) MustWriteBlock(block []byte) { bb := blockBufPool.Get() for q.writerOffset-q.readerOffset > maxPendingBytes { var err error - bb.B, err = q.readBlockLocked(bb.B[:0]) + bb.B, err = q.readBlock(bb.B[:0]) + if err == errEmptyQueue { + break + } if err != nil { logger.Panicf("FATAL: cannot read the oldest block %s", err) } @@ -429,38 +410,18 @@ func (q *Queue) MustWriteBlock(block []byte) { return } } - if err := q.writeBlockLocked(block); err != nil { + if err := q.writeBlock(block); err != nil { logger.Panicf("FATAL: %s", err) } - - // Notify blocked reader if any. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for details. - q.cond.Signal() } var blockBufPool bytesutil.ByteBufferPool -func (q *Queue) writeBlockLocked(block []byte) error { +func (q *queue) writeBlock(block []byte) error { if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize { - // Finalize the current chunk and start new one. - q.writer.MustClose() - // There is no need to do fs.MustSyncPath(q.writerPath) here, - // since MustClose already does this. - if n := q.writerOffset % q.chunkFileSize; n > 0 { - q.writerOffset += (q.chunkFileSize - n) + if err := q.nextChunkFileForWrite(); err != nil { + return fmt.Errorf("cannot create next chunk file: %w", err) } - q.writerFlushedOffset = q.writerOffset - q.writerLocalOffset = 0 - q.writerPath = q.chunkFilePath(q.writerOffset) - w, err := filestream.Create(q.writerPath, false) - if err != nil { - return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err) - } - q.writer = w - if err := q.flushMetainfoLocked(); err != nil { - return fmt.Errorf("cannot flush metainfo: %w", err) - } - fs.MustSyncPath(q.dir) } // Write block len. @@ -479,62 +440,61 @@ func (q *Queue) writeBlockLocked(block []byte) error { } q.blocksWritten.Inc() q.bytesWritten.Add(len(block)) - return q.flushMetainfoIfNeededLocked(true) + return q.flushWriterMetainfoIfNeeded() } -// MustReadBlock appends the next block from q to dst and returns the result. -// -// false is returned after MustClose call. -// -// It is safe calling this function from concurrent goroutines. -func (q *Queue) MustReadBlock(dst []byte) ([]byte, bool) { - q.mu.Lock() - defer q.mu.Unlock() +func (q *queue) nextChunkFileForWrite() error { + // Finalize the current chunk and start new one. + q.writer.MustClose() + // There is no need to do fs.MustSyncPath(q.writerPath) here, + // since MustClose already does this. + if n := q.writerOffset % q.chunkFileSize; n > 0 { + q.writerOffset += q.chunkFileSize - n + } + q.writerFlushedOffset = q.writerOffset + q.writerLocalOffset = 0 + q.writerPath = q.chunkFilePath(q.writerOffset) + w, err := filestream.Create(q.writerPath, false) + if err != nil { + return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err) + } + q.writer = w + if err := q.flushMetainfo(); err != nil { + return fmt.Errorf("cannot flush metainfo: %w", err) + } + fs.MustSyncPath(q.dir) + return nil +} - for { - if q.mustStop { +// MustReadBlockNonblocking appends the next block from q to dst and returns the result. +// +// false is returned if q is empty. +func (q *queue) MustReadBlockNonblocking(dst []byte) ([]byte, bool) { + if q.readerOffset > q.writerOffset { + logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset) + } + if q.readerOffset == q.writerOffset { + return dst, false + } + var err error + dst, err = q.readBlock(dst) + if err != nil { + if err == errEmptyQueue { return dst, false } - if q.readerOffset > q.writerOffset { - logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset) - } - if q.readerOffset < q.writerOffset { - break - } - q.cond.Wait() - } - - data, err := q.readBlockLocked(dst) - if err != nil { - // Skip the current chunk, since it may be broken. - q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize - _ = q.flushMetainfoLocked() logger.Panicf("FATAL: %s", err) } - return data, true + return dst, true } -func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) { +func (q *queue) readBlock(dst []byte) ([]byte, error) { if q.readerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize { - // Remove the current chunk and go to the next chunk. - q.reader.MustClose() - fs.MustRemoveAll(q.readerPath) - if n := q.readerOffset % q.chunkFileSize; n > 0 { - q.readerOffset += (q.chunkFileSize - n) + if err := q.nextChunkFileForRead(); err != nil { + return dst, fmt.Errorf("cannot open next chunk file: %w", err) } - q.readerLocalOffset = 0 - q.readerPath = q.chunkFilePath(q.readerOffset) - r, err := filestream.Open(q.readerPath, true) - if err != nil { - return dst, fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err) - } - q.reader = r - if err := q.flushMetainfoLocked(); err != nil { - return dst, fmt.Errorf("cannot flush metainfo: %w", err) - } - fs.MustSyncPath(q.dir) } +again: // Read block len. header := headerBufPool.Get() header.B = bytesutil.Resize(header.B, 8) @@ -542,27 +502,73 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) { blockLen := encoding.UnmarshalUint64(header.B) headerBufPool.Put(header) if err != nil { - return dst, fmt.Errorf("cannot read header with size 8 bytes from %q: %w", q.readerPath, err) + logger.Errorf("skipping corrupted %q, since header with size 8 bytes cannot be read from it: %s", q.readerPath, err) + if err := q.skipBrokenChunkFile(); err != nil { + return dst, err + } + goto again } if blockLen > q.maxBlockSize { - return dst, fmt.Errorf("too big block size read from %q: %d bytes; cannot exceed %d bytes", q.readerPath, blockLen, q.maxBlockSize) + logger.Errorf("skipping corrupted %q, since too big block size is read from it: %d bytes; cannot exceed %d bytes", q.readerPath, blockLen, q.maxBlockSize) + if err := q.skipBrokenChunkFile(); err != nil { + return dst, err + } + goto again } // Read block contents. dstLen := len(dst) dst = bytesutil.Resize(dst, dstLen+int(blockLen)) if err := q.readFull(dst[dstLen:]); err != nil { - return dst, fmt.Errorf("cannot read block contents with size %d bytes from %q: %w", blockLen, q.readerPath, err) + logger.Errorf("skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s", q.readerPath, blockLen, err) + if err := q.skipBrokenChunkFile(); err != nil { + return dst[:dstLen], err + } + goto again } q.blocksRead.Inc() q.bytesRead.Add(int(blockLen)) - if err := q.flushMetainfoIfNeededLocked(false); err != nil { + if err := q.flushReaderMetainfoIfNeeded(); err != nil { return dst, err } return dst, nil } -func (q *Queue) write(buf []byte) error { +func (q *queue) skipBrokenChunkFile() error { + // Try to recover from broken chunk file by skipping it. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030 + q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize + if q.readerOffset >= q.writerOffset { + q.mustResetFiles() + return errEmptyQueue + } + return q.nextChunkFileForRead() +} + +var errEmptyQueue = fmt.Errorf("the queue is empty") + +func (q *queue) nextChunkFileForRead() error { + // Remove the current chunk and go to the next chunk. + q.reader.MustClose() + fs.MustRemoveAll(q.readerPath) + if n := q.readerOffset % q.chunkFileSize; n > 0 { + q.readerOffset += q.chunkFileSize - n + } + q.readerLocalOffset = 0 + q.readerPath = q.chunkFilePath(q.readerOffset) + r, err := filestream.Open(q.readerPath, true) + if err != nil { + return fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err) + } + q.reader = r + if err := q.flushMetainfo(); err != nil { + return fmt.Errorf("cannot flush metainfo: %w", err) + } + fs.MustSyncPath(q.dir) + return nil +} + +func (q *queue) write(buf []byte) error { bufLen := uint64(len(buf)) n, err := q.writer.Write(buf) if err != nil { @@ -576,7 +582,7 @@ func (q *Queue) write(buf []byte) error { return nil } -func (q *Queue) readFull(buf []byte) error { +func (q *queue) readFull(buf []byte) error { bufLen := uint64(len(buf)) if q.readerOffset+bufLen > q.writerFlushedOffset { q.writer.MustFlush(false) @@ -594,22 +600,32 @@ func (q *Queue) readFull(buf []byte) error { return nil } -func (q *Queue) flushMetainfoIfNeededLocked(flushData bool) error { +func (q *queue) flushReaderMetainfoIfNeeded() error { t := fasttime.UnixTimestamp() if t == q.lastMetainfoFlushTime { return nil } - if flushData { - q.writer.MustFlush(true) - } - if err := q.flushMetainfoLocked(); err != nil { + if err := q.flushMetainfo(); err != nil { return fmt.Errorf("cannot flush metainfo: %w", err) } q.lastMetainfoFlushTime = t return nil } -func (q *Queue) flushMetainfoLocked() error { +func (q *queue) flushWriterMetainfoIfNeeded() error { + t := fasttime.UnixTimestamp() + if t == q.lastMetainfoFlushTime { + return nil + } + q.writer.MustFlush(true) + if err := q.flushMetainfo(); err != nil { + return fmt.Errorf("cannot flush metainfo: %w", err) + } + q.lastMetainfoFlushTime = t + return nil +} + +func (q *queue) flushMetainfo() error { mi := &metainfo{ Name: q.name, ReaderOffset: q.readerOffset, diff --git a/lib/persistentqueue/persistentqueue_test.go b/lib/persistentqueue/persistentqueue_test.go index e4b83d540..e135eb415 100644 --- a/lib/persistentqueue/persistentqueue_test.go +++ b/lib/persistentqueue/persistentqueue_test.go @@ -5,16 +5,14 @@ import ( "io/ioutil" "os" "strconv" - "sync" "testing" - "time" ) func TestQueueOpenClose(t *testing.T) { path := "queue-open-close" mustDeleteDir(path) for i := 0; i < 3; i++ { - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) if n := q.GetPendingBytes(); n > 0 { t.Fatalf("pending bytes must be 0; got %d", n) } @@ -28,7 +26,7 @@ func TestQueueOpen(t *testing.T) { path := "queue-open-invalid-metainfo" mustCreateDir(path) mustCreateFile(path+"/metainfo.json", "foobarbaz") - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -38,7 +36,7 @@ func TestQueueOpen(t *testing.T) { mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(path+"/junk-file", "foobar") mustCreateDir(path + "/junk-dir") - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -47,7 +45,7 @@ func TestQueueOpen(t *testing.T) { mustCreateDir(path) mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(fmt.Sprintf("%s/%016X", path, 1234), "qwere") - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -56,7 +54,7 @@ func TestQueueOpen(t *testing.T) { mustCreateDir(path) mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(fmt.Sprintf("%s/%016X", path, 100*uint64(defaultChunkFileSize)), "asdf") - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -72,7 +70,7 @@ func TestQueueOpen(t *testing.T) { t.Fatalf("unexpected error: %s", err) } mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "adfsfd") - q := MustOpen(path, mi.Name, 0) + q := mustOpen(path, mi.Name, 0) q.MustClose() mustDeleteDir(path) }) @@ -86,7 +84,7 @@ func TestQueueOpen(t *testing.T) { if err := mi.WriteToFile(path + "/metainfo.json"); err != nil { t.Fatalf("unexpected error: %s", err) } - q := MustOpen(path, mi.Name, 0) + q := mustOpen(path, mi.Name, 0) q.MustClose() mustDeleteDir(path) }) @@ -94,7 +92,7 @@ func TestQueueOpen(t *testing.T) { path := "queue-open-metainfo-dir" mustCreateDir(path) mustCreateDir(path + "/metainfo.json") - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -110,7 +108,7 @@ func TestQueueOpen(t *testing.T) { t.Fatalf("unexpected error: %s", err) } mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf") - q := MustOpen(path, mi.Name, 0) + q := mustOpen(path, mi.Name, 0) q.MustClose() mustDeleteDir(path) }) @@ -119,7 +117,7 @@ func TestQueueOpen(t *testing.T) { mustCreateDir(path) mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdfdsf") - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -133,7 +131,7 @@ func TestQueueOpen(t *testing.T) { t.Fatalf("unexpected error: %s", err) } mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf") - q := MustOpen(path, "baz", 0) + q := mustOpen(path, "baz", 0) q.MustClose() mustDeleteDir(path) }) @@ -142,7 +140,7 @@ func TestQueueOpen(t *testing.T) { func TestQueueResetIfEmpty(t *testing.T) { path := "queue-reset-if-empty" mustDeleteDir(path) - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -154,14 +152,18 @@ func TestQueueResetIfEmpty(t *testing.T) { for i := 0; i < 10; i++ { q.MustWriteBlock(block) var ok bool - buf, ok = q.MustReadBlock(buf[:0]) + buf, ok = q.MustReadBlockNonblocking(buf[:0]) if !ok { - t.Fatalf("unexpected ok=false returned from MustReadBlock") + t.Fatalf("unexpected ok=false returned from MustReadBlockNonblocking") } } q.ResetIfEmpty() if n := q.GetPendingBytes(); n > 0 { - t.Fatalf("unexpected non-zer pending bytes after queue reset: %d", n) + t.Fatalf("unexpected non-zero pending bytes after queue reset: %d", n) + } + q.ResetIfEmpty() + if n := q.GetPendingBytes(); n > 0 { + t.Fatalf("unexpected non-zero pending bytes after queue reset: %d", n) } } } @@ -169,7 +171,7 @@ func TestQueueResetIfEmpty(t *testing.T) { func TestQueueWriteRead(t *testing.T) { path := "queue-write-read" mustDeleteDir(path) - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -188,9 +190,9 @@ func TestQueueWriteRead(t *testing.T) { var buf []byte var ok bool for _, block := range blocks { - buf, ok = q.MustReadBlock(buf[:0]) + buf, ok = q.MustReadBlockNonblocking(buf[:0]) if !ok { - t.Fatalf("unexpected ok=%v returned from MustReadBlock; want true", ok) + t.Fatalf("unexpected ok=%v returned from MustReadBlockNonblocking; want true", ok) } if string(buf) != string(block) { t.Fatalf("unexpected block read; got %q; want %q", buf, block) @@ -205,7 +207,7 @@ func TestQueueWriteRead(t *testing.T) { func TestQueueWriteCloseRead(t *testing.T) { path := "queue-write-close-read" mustDeleteDir(path) - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -222,16 +224,16 @@ func TestQueueWriteCloseRead(t *testing.T) { t.Fatalf("pending bytes must be greater than 0; got %d", n) } q.MustClose() - q = MustOpen(path, "foobar", 0) + q = mustOpen(path, "foobar", 0) if n := q.GetPendingBytes(); n <= 0 { t.Fatalf("pending bytes must be greater than 0; got %d", n) } var buf []byte var ok bool for _, block := range blocks { - buf, ok = q.MustReadBlock(buf[:0]) + buf, ok = q.MustReadBlockNonblocking(buf[:0]) if !ok { - t.Fatalf("unexpected ok=%v returned from MustReadBlock; want true", ok) + t.Fatalf("unexpected ok=%v returned from MustReadBlockNonblocking; want true", ok) } if string(buf) != string(block) { t.Fatalf("unexpected block read; got %q; want %q", buf, block) @@ -243,137 +245,12 @@ func TestQueueWriteCloseRead(t *testing.T) { } } -func TestQueueReadEmpty(t *testing.T) { - path := "queue-read-empty" - mustDeleteDir(path) - q := MustOpen(path, "foobar", 0) - defer mustDeleteDir(path) - - resultCh := make(chan error) - go func() { - data, ok := q.MustReadBlock(nil) - var err error - if ok { - err = fmt.Errorf("unexpected ok=%v returned from MustReadBlock; want false", ok) - } else if len(data) > 0 { - err = fmt.Errorf("unexpected non-empty data returned from MustReadBlock: %q", data) - } - resultCh <- err - }() - if n := q.GetPendingBytes(); n > 0 { - t.Fatalf("pending bytes must be 0; got %d", n) - } - q.MustClose() - select { - case err := <-resultCh: - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - case <-time.After(time.Second): - t.Fatalf("timeout") - } -} - -func TestQueueReadWriteConcurrent(t *testing.T) { - path := "queue-read-write-concurrent" - mustDeleteDir(path) - q := MustOpen(path, "foobar", 0) - defer mustDeleteDir(path) - - blocksMap := make(map[string]bool, 1000) - var blocksMapLock sync.Mutex - blocks := make([]string, 1000) - for i := 0; i < 1000; i++ { - block := fmt.Sprintf("block #%d", i) - blocksMap[block] = true - blocks[i] = block - } - - // Start block readers - var readersWG sync.WaitGroup - for workerID := 0; workerID < 10; workerID++ { - readersWG.Add(1) - go func() { - defer readersWG.Done() - for { - block, ok := q.MustReadBlock(nil) - if !ok { - return - } - blocksMapLock.Lock() - if !blocksMap[string(block)] { - panic(fmt.Errorf("unexpected block read: %q", block)) - } - delete(blocksMap, string(block)) - blocksMapLock.Unlock() - } - }() - } - - // Start block writers - blocksCh := make(chan string) - var writersWG sync.WaitGroup - for workerID := 0; workerID < 10; workerID++ { - writersWG.Add(1) - go func(workerID int) { - defer writersWG.Done() - for block := range blocksCh { - q.MustWriteBlock([]byte(block)) - } - }(workerID) - } - for _, block := range blocks { - blocksCh <- block - } - close(blocksCh) - - // Wait for block writers to finish - writersWG.Wait() - - // Notify readers that the queue is closed - q.MustClose() - - // Wait for block readers to finish - readersWG.Wait() - - // Read the remaining blocks in q. - q = MustOpen(path, "foobar", 0) - defer q.MustClose() - resultCh := make(chan error) - go func() { - for len(blocksMap) > 0 { - block, ok := q.MustReadBlock(nil) - if !ok { - resultCh <- fmt.Errorf("unexpected ok=false returned from MustReadBlock") - return - } - if !blocksMap[string(block)] { - resultCh <- fmt.Errorf("unexpected block read from the queue: %q", block) - return - } - delete(blocksMap, string(block)) - } - resultCh <- nil - }() - select { - case err := <-resultCh: - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - case <-time.After(5 * time.Second): - t.Fatalf("timeout") - } - if n := q.GetPendingBytes(); n > 0 { - t.Fatalf("pending bytes must be 0; got %d", n) - } -} - func TestQueueChunkManagementSimple(t *testing.T) { path := "queue-chunk-management-simple" mustDeleteDir(path) const chunkFileSize = 100 const maxBlockSize = 20 - q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) + q := mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0) defer mustDeleteDir(path) defer q.MustClose() var blocks []string @@ -386,7 +263,7 @@ func TestQueueChunkManagementSimple(t *testing.T) { t.Fatalf("unexpected zero number of bytes pending") } for _, block := range blocks { - data, ok := q.MustReadBlock(nil) + data, ok := q.MustReadBlockNonblocking(nil) if !ok { t.Fatalf("unexpected ok=false") } @@ -404,7 +281,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) { mustDeleteDir(path) const chunkFileSize = 100 const maxBlockSize = 20 - q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) + q := mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -415,13 +292,13 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) { q.MustWriteBlock([]byte(block)) blocks = append(blocks, block) q.MustClose() - q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) + q = mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0) } if n := q.GetPendingBytes(); n == 0 { t.Fatalf("unexpected zero number of bytes pending") } for _, block := range blocks { - data, ok := q.MustReadBlock(nil) + data, ok := q.MustReadBlockNonblocking(nil) if !ok { t.Fatalf("unexpected ok=false") } @@ -429,7 +306,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) { t.Fatalf("unexpected block read; got %q; want %q", data, block) } q.MustClose() - q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) + q = mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0) } if n := q.GetPendingBytes(); n != 0 { t.Fatalf("unexpected non-zero number of pending bytes: %d", n) @@ -440,7 +317,7 @@ func TestQueueLimitedSize(t *testing.T) { const maxPendingBytes = 1000 path := "queue-limited-size" mustDeleteDir(path) - q := MustOpen(path, "foobar", maxPendingBytes) + q := mustOpen(path, "foobar", maxPendingBytes) defer func() { q.MustClose() mustDeleteDir(path) @@ -456,7 +333,7 @@ func TestQueueLimitedSize(t *testing.T) { var buf []byte var ok bool for _, block := range blocks { - buf, ok = q.MustReadBlock(buf[:0]) + buf, ok = q.MustReadBlockNonblocking(buf[:0]) if !ok { t.Fatalf("unexpected ok=false") } @@ -473,7 +350,7 @@ func TestQueueLimitedSize(t *testing.T) { if n := q.GetPendingBytes(); n > maxPendingBytes { t.Fatalf("too many pending bytes; got %d; mustn't exceed %d", n, maxPendingBytes) } - buf, ok = q.MustReadBlock(buf[:0]) + buf, ok = q.MustReadBlockNonblocking(buf[:0]) if !ok { t.Fatalf("unexpected ok=false") } diff --git a/lib/persistentqueue/persistentqueue_timing_test.go b/lib/persistentqueue/persistentqueue_timing_test.go index 02e87513f..e7a3b7874 100644 --- a/lib/persistentqueue/persistentqueue_timing_test.go +++ b/lib/persistentqueue/persistentqueue_timing_test.go @@ -2,13 +2,14 @@ package persistentqueue import ( "fmt" + "sync" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) func BenchmarkQueueThroughputSerial(b *testing.B) { - const iterationsCount = 10 + const iterationsCount = 100 for _, blockSize := range []int{1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6} { block := make([]byte, blockSize) b.Run(fmt.Sprintf("block-size-%d", blockSize), func(b *testing.B) { @@ -16,7 +17,7 @@ func BenchmarkQueueThroughputSerial(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-queue-throughput-serial-%d", blockSize) mustDeleteDir(path) - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -29,7 +30,7 @@ func BenchmarkQueueThroughputSerial(b *testing.B) { } func BenchmarkQueueThroughputConcurrent(b *testing.B) { - const iterationsCount = 10 + const iterationsCount = 100 for _, blockSize := range []int{1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6} { block := make([]byte, blockSize) b.Run(fmt.Sprintf("block-size-%d", blockSize), func(b *testing.B) { @@ -37,28 +38,31 @@ func BenchmarkQueueThroughputConcurrent(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-queue-throughput-concurrent-%d", blockSize) mustDeleteDir(path) - q := MustOpen(path, "foobar", 0) + q := mustOpen(path, "foobar", 0) + var qLock sync.Mutex defer func() { q.MustClose() mustDeleteDir(path) }() b.RunParallel(func(pb *testing.PB) { for pb.Next() { + qLock.Lock() writeReadIteration(q, block, iterationsCount) + qLock.Unlock() } }) }) } } -func writeReadIteration(q *Queue, block []byte, iterationsCount int) { +func writeReadIteration(q *queue, block []byte, iterationsCount int) { for i := 0; i < iterationsCount; i++ { q.MustWriteBlock(block) } var ok bool bb := bbPool.Get() for i := 0; i < iterationsCount; i++ { - bb.B, ok = q.MustReadBlock(bb.B[:0]) + bb.B, ok = q.MustReadBlockNonblocking(bb.B[:0]) if !ok { panic(fmt.Errorf("unexpected ok=false")) }