From f01d1bf4a88f35355942d69e63e72d9c639fb9b2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 3 Mar 2020 19:48:46 +0200 Subject: [PATCH] app/vmagent: add `-remoteWrite.maxDiskUsagePerURL` for limiting the maximum disk usage for each `-remoteWrite.url` buffer Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/352 --- app/vmagent/README.md | 2 +- app/vmagent/remotewrite/client.go | 8 +- app/vmagent/remotewrite/remotewrite.go | 12 +- lib/persistentqueue/fastqueue.go | 10 +- lib/persistentqueue/fastqueue_test.go | 26 ++-- lib/persistentqueue/fastqueue_timing_test.go | 4 +- lib/persistentqueue/persistentqueue.go | 72 ++++++++-- lib/persistentqueue/persistentqueue_test.go | 127 +++++++++++++++--- .../persistentqueue_timing_test.go | 4 +- 9 files changed, 209 insertions(+), 56 deletions(-) diff --git a/app/vmagent/README.md b/app/vmagent/README.md index d34a35bd1..1530b02cb 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -28,7 +28,7 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did * Can replicate collected metrics simultaneously to multiple remote storage systems. * Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as connection - to remote storage is recovered. + to remote storage is recovered. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`. * Uses lower amounts of RAM, CPU, disk IO and network bandwidth comparing to Prometheus. 
diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index e9a8a916e..cb1083dae 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -122,10 +122,10 @@ func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQue hc: hc, stopCh: make(chan struct{}), } - c.requestDuration = metrics.NewHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue)) - c.requestsOKCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue)) - c.errorsCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.urlLabelValue)) - c.retriesCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.urlLabelValue)) + c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue)) + c.requestsOKCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue)) + c.errorsCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.urlLabelValue)) + c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.urlLabelValue)) for i := 0; i < concurrency; i++ { c.wg.Add(1) go func() { diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 81279e60d..ed2f5fb08 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -26,6 +26,10 @@ var ( "isn't enough for sending high volume of collected data to remote storage") showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. 
"+ "It is hidden by default, since it can contain sensistive auth info") + maxPendingBytesPerURL = flag.Int("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+ + "for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. "+ + "Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. "+ + "Disk usage is unlimited if the value is set to 0") ) var rwctxs []*remoteWriteCtx @@ -130,11 +134,11 @@ type remoteWriteCtx struct { func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx { h := xxhash.Sum64([]byte(remoteWriteURL)) path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h) - fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks) - _ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{url=%q, hash="%016X"}`, urlLabelValue, h), func() float64 { + fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL) + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, path, urlLabelValue), func() float64 { return float64(fq.GetPendingBytes()) }) - _ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{url=%q}`, urlLabelValue), func() float64 { + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, path, urlLabelValue), func() float64 { return float64(fq.GetInmemoryQueueLen()) }) c := newClient(remoteWriteURL, urlLabelValue, fq, *queues) @@ -156,7 +160,7 @@ func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBloc prcs: prcs, pss: pss, - relabelMetricsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{url=%q}`, urlLabelValue)), + 
relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, path, urlLabelValue)), } } diff --git a/lib/persistentqueue/fastqueue.go index 7f578d7ce..898ef9d9d 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -32,8 +32,12 @@ type FastQueue struct { // MustOpenFastQueue opens persistent queue at the given path. // // It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence. -func MustOpenFastQueue(path, name string, maxInmemoryBlocks int) *FastQueue { - pq := MustOpen(path, name) +// +// If maxPendingBytes is 0, then the queue size is unlimited. +// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue +// reaches maxPendingBytes. +func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue { + pq := MustOpen(path, name, maxPendingBytes) fq := &FastQueue{ pq: pq, ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), @@ -150,5 +154,3 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { fq.cond.Wait() } } - -var blockBufPool bytesutil.ByteBufferPool diff --git a/lib/persistentqueue/fastqueue_test.go index 289885a8c..1b5f727a9 100644 --- a/lib/persistentqueue/fastqueue_test.go +++ b/lib/persistentqueue/fastqueue_test.go @@ -11,7 +11,7 @@ func TestFastQueueOpenClose(t *testing.T) { path := "fast-queue-open-close" mustDeleteDir(path) for i := 0; i < 10; i++ { - fq := MustOpenFastQueue(path, "foobar", 100) + fq := MustOpenFastQueue(path, "foobar", 100, 0) fq.MustClose() } mustDeleteDir(path) @@ -22,13 +22,19 @@ func TestFastQueueWriteReadInmemory(t *testing.T) { mustDeleteDir(path) capacity := 100 - fq := MustOpenFastQueue(path, "foobar", capacity) + fq := MustOpenFastQueue(path, "foobar", capacity, 0) + if n := fq.GetInmemoryQueueLen(); n != 0 { + t.Fatalf("unexpected 
non-zero inmemory queue size: %d", n) + } var blocks []string for i := 0; i < capacity; i++ { block := fmt.Sprintf("block %d", i) fq.MustWriteBlock([]byte(block)) blocks = append(blocks, block) } + if n := fq.GetInmemoryQueueLen(); n != capacity { + t.Fatalf("unexpected size of inmemory queue; got %d; want %d", n, capacity) + } for _, block := range blocks { buf, ok := fq.MustReadBlock(nil) if !ok { @@ -47,7 +53,7 @@ func TestFastQueueWriteReadMixed(t *testing.T) { mustDeleteDir(path) capacity := 100 - fq := MustOpenFastQueue(path, "foobar", capacity) + fq := MustOpenFastQueue(path, "foobar", capacity, 0) if n := fq.GetPendingBytes(); n != 0 { t.Fatalf("the number of pending bytes must be 0; got %d", n) } @@ -81,7 +87,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) { mustDeleteDir(path) capacity := 100 - fq := MustOpenFastQueue(path, "foobar", capacity) + fq := MustOpenFastQueue(path, "foobar", capacity, 0) if n := fq.GetPendingBytes(); n != 0 { t.Fatalf("the number of pending bytes must be 0; got %d", n) } @@ -91,7 +97,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) { fq.MustWriteBlock([]byte(block)) blocks = append(blocks, block) fq.MustClose() - fq = MustOpenFastQueue(path, "foobar", capacity) + fq = MustOpenFastQueue(path, "foobar", capacity, 0) } if n := fq.GetPendingBytes(); n == 0 { t.Fatalf("the number of pending bytes must be greater than 0") @@ -105,7 +111,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) { t.Fatalf("unexpected block read; got %q; want %q", buf, block) } fq.MustClose() - fq = MustOpenFastQueue(path, "foobar", capacity) + fq = MustOpenFastQueue(path, "foobar", capacity, 0) } if n := fq.GetPendingBytes(); n != 0 { t.Fatalf("the number of pending bytes must be 0; got %d", n) @@ -118,7 +124,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) { path := "fast-queue-read-unblock-by-close" mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foorbar", 123) + fq := MustOpenFastQueue(path, "foorbar", 123, 0) resultCh := 
make(chan error) go func() { data, ok := fq.MustReadBlock(nil) @@ -148,7 +154,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) { path := "fast-queue-read-unblock-by-write" mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", 13) + fq := MustOpenFastQueue(path, "foobar", 13, 0) block := fmt.Sprintf("foodsafdsaf sdf") resultCh := make(chan error) go func() { @@ -180,7 +186,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) { path := "fast-queue-read-write-concurrent" mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", 5) + fq := MustOpenFastQueue(path, "foobar", 5, 0) var blocks []string blocksMap := make(map[string]bool) @@ -244,7 +250,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) { readersWG.Wait() // Collect the remaining data - fq = MustOpenFastQueue(path, "foobar", 5) + fq = MustOpenFastQueue(path, "foobar", 5, 0) resultCh := make(chan error) go func() { for len(blocksMap) > 0 { diff --git a/lib/persistentqueue/fastqueue_timing_test.go b/lib/persistentqueue/fastqueue_timing_test.go index 78e959479..991846ed5 100644 --- a/lib/persistentqueue/fastqueue_timing_test.go +++ b/lib/persistentqueue/fastqueue_timing_test.go @@ -15,7 +15,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize) mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", iterationsCount*2) + fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0) defer func() { fq.MustClose() mustDeleteDir(path) @@ -36,7 +36,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize) mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2) + fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2, 0) defer func() { fq.MustClose() 
mustDeleteDir(path) diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index e54f99802..56e13aada 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" ) // MaxBlockSize is the maximum size of the block persistent queue can work with. @@ -26,8 +27,9 @@ var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$") // Queue represents persistent queue. type Queue struct { - chunkFileSize uint64 - maxBlockSize uint64 + chunkFileSize uint64 + maxBlockSize uint64 + maxPendingBytes uint64 dir string name string @@ -51,6 +53,15 @@ type Queue struct { writerFlushedOffset uint64 mustStop bool + + blocksDropped *metrics.Counter + bytesDropped *metrics.Counter + + blocksWritten *metrics.Counter + bytesWritten *metrics.Counter + + blocksRead *metrics.Counter + bytesRead *metrics.Counter } // ResetIfEmpty resets q if it is empty. @@ -111,22 +122,28 @@ func (q *Queue) GetPendingBytes() uint64 { } // MustOpen opens persistent queue from the given path. -func MustOpen(path, name string) *Queue { - return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize) +// +// If maxPendingBytes is greater than 0, then the max queue size is limited by this value. +// The oldest data is deleted when queue size exceeds maxPendingBytes. 
+func MustOpen(path, name string, maxPendingBytes int) *Queue { + if maxPendingBytes < 0 { + maxPendingBytes = 0 + } + return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes)) } -func mustOpen(path, name string, chunkFileSize, maxBlockSize uint64) *Queue { +func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *Queue { if chunkFileSize < 8 || chunkFileSize-8 < maxBlockSize { logger.Panicf("BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block", chunkFileSize, maxBlockSize) } if maxBlockSize <= 0 { logger.Panicf("BUG: maxBlockSize must be greater than 0; got %d", maxBlockSize) } - q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize) + q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes) if err != nil { logger.Errorf("cannot open persistent queue at %q: %s; cleaning it up and trying again", path, err) fs.RemoveDirContents(path) - q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize) + q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes) if err != nil { logger.Panicf("FATAL: %s", err) } @@ -134,14 +151,22 @@ func mustOpen(path, name string, chunkFileSize, maxBlockSize uint64) *Queue { return q } -func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize uint64) (*Queue, error) { +func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) { var q Queue q.chunkFileSize = chunkFileSize q.maxBlockSize = maxBlockSize + q.maxPendingBytes = maxPendingBytes q.dir = path q.name = name q.cond.L = &q.mu + q.blocksDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_dropped_total{path=%q}`, path)) + q.bytesDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_dropped_total{path=%q}`, path)) + q.blocksWritten = 
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_written_total{path=%q}`, path)) + q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path)) + q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path)) + q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path)) + cleanOnError := func() { if q.reader != nil { q.reader.MustClose() @@ -333,6 +358,31 @@ func (q *Queue) MustWriteBlock(block []byte) { if q.readerOffset > q.writerOffset { logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset) } + if q.maxPendingBytes > 0 { + // Drain the oldest blocks until the number of pending bytes becomes enough for the block. + blockSize := uint64(len(block) + 8) + maxPendingBytes := q.maxPendingBytes + if blockSize < maxPendingBytes { + maxPendingBytes -= blockSize + } else { + maxPendingBytes = 0 + } + bb := blockBufPool.Get() + for q.writerOffset-q.readerOffset > maxPendingBytes { + var err error + bb.B, err = q.readBlockLocked(bb.B[:0]) + if err != nil { + logger.Panicf("FATAL: cannot read the oldest block %s", err) + } + q.blocksDropped.Inc() + q.bytesDropped.Add(len(bb.B)) + } + blockBufPool.Put(bb) + if blockSize > q.maxPendingBytes { + // The block is too big to put it into the queue. Drop it. + return + } + } mustNotifyReader := q.readerOffset == q.writerOffset if err := q.writeBlockLocked(block); err != nil { logger.Panicf("FATAL: %s", err) @@ -342,6 +392,8 @@ func (q *Queue) MustWriteBlock(block []byte) { } } +var blockBufPool bytesutil.ByteBufferPool + func (q *Queue) writeBlockLocked(block []byte) error { if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize { // Finalize the current chunk and start new one. 
@@ -376,6 +428,8 @@ func (q *Queue) writeBlockLocked(block []byte) error { if err := q.write(block); err != nil { return fmt.Errorf("cannot write block contents with size %d bytes to %q: %s", len(block), q.writerPath, err) } + q.blocksWritten.Inc() + q.bytesWritten.Add(len(block)) return nil } @@ -447,6 +501,8 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) { if err := q.readFull(dst[dstLen:]); err != nil { return dst, fmt.Errorf("cannot read block contents with size %d bytes from %q: %s", blockLen, q.readerPath, err) } + q.blocksRead.Inc() + q.bytesRead.Add(int(blockLen)) return dst, nil } diff --git a/lib/persistentqueue/persistentqueue_test.go b/lib/persistentqueue/persistentqueue_test.go index 75e2fbc3e..7a18af4ab 100644 --- a/lib/persistentqueue/persistentqueue_test.go +++ b/lib/persistentqueue/persistentqueue_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "os" + "strconv" "sync" "testing" "time" @@ -13,7 +14,7 @@ func TestQueueOpenClose(t *testing.T) { path := "queue-open-close" mustDeleteDir(path) for i := 0; i < 3; i++ { - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) if n := q.GetPendingBytes(); n > 0 { t.Fatalf("pending bytes must be 0; got %d", n) } @@ -27,7 +28,7 @@ func TestQueueOpen(t *testing.T) { path := "queue-open-invalid-metainfo" mustCreateDir(path) mustCreateFile(path+"/metainfo.json", "foobarbaz") - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -37,7 +38,7 @@ func TestQueueOpen(t *testing.T) { mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(path+"/junk-file", "foobar") mustCreateDir(path + "/junk-dir") - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -46,7 +47,7 @@ func TestQueueOpen(t *testing.T) { mustCreateDir(path) mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(fmt.Sprintf("%s/%016X", path, 1234), "qwere") - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) 
q.MustClose() mustDeleteDir(path) }) @@ -55,7 +56,7 @@ func TestQueueOpen(t *testing.T) { mustCreateDir(path) mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(fmt.Sprintf("%s/%016X", path, 100*uint64(defaultChunkFileSize)), "asdf") - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -71,7 +72,7 @@ func TestQueueOpen(t *testing.T) { t.Fatalf("unexpected error: %s", err) } mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "adfsfd") - q := MustOpen(path, mi.Name) + q := MustOpen(path, mi.Name, 0) q.MustClose() mustDeleteDir(path) }) @@ -85,7 +86,7 @@ func TestQueueOpen(t *testing.T) { if err := mi.WriteToFile(path + "/metainfo.json"); err != nil { t.Fatalf("unexpected error: %s", err) } - q := MustOpen(path, mi.Name) + q := MustOpen(path, mi.Name, 0) q.MustClose() mustDeleteDir(path) }) @@ -93,7 +94,7 @@ func TestQueueOpen(t *testing.T) { path := "queue-open-metainfo-dir" mustCreateDir(path) mustCreateDir(path + "/metainfo.json") - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -109,7 +110,7 @@ func TestQueueOpen(t *testing.T) { t.Fatalf("unexpected error: %s", err) } mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf") - q := MustOpen(path, mi.Name) + q := MustOpen(path, mi.Name, 0) q.MustClose() mustDeleteDir(path) }) @@ -118,7 +119,7 @@ func TestQueueOpen(t *testing.T) { mustCreateDir(path) mustCreateEmptyMetainfo(path, "foobar") mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdfdsf") - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) q.MustClose() mustDeleteDir(path) }) @@ -132,16 +133,43 @@ func TestQueueOpen(t *testing.T) { t.Fatalf("unexpected error: %s", err) } mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf") - q := MustOpen(path, "baz") + q := MustOpen(path, "baz", 0) q.MustClose() mustDeleteDir(path) }) } +func TestQueueResetIfEmpty(t *testing.T) { + path := "queue-reset-if-empty" + mustDeleteDir(path) + q := 
MustOpen(path, "foobar", 0) + defer func() { + q.MustClose() + mustDeleteDir(path) + }() + + block := make([]byte, 1024*1024) + var buf []byte + for j := 0; j < 10; j++ { + for i := 0; i < 10; i++ { + q.MustWriteBlock(block) + var ok bool + buf, ok = q.MustReadBlock(buf[:0]) + if !ok { + t.Fatalf("unexpected ok=false returned from MustReadBlock") + } + } + q.ResetIfEmpty() + if n := q.GetPendingBytes(); n > 0 { + t.Fatalf("unexpected non-zer pending bytes after queue reset: %d", n) + } + } +} + func TestQueueWriteRead(t *testing.T) { path := "queue-write-read" mustDeleteDir(path) - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -177,7 +205,7 @@ func TestQueueWriteRead(t *testing.T) { func TestQueueWriteCloseRead(t *testing.T) { path := "queue-write-close-read" mustDeleteDir(path) - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -194,7 +222,7 @@ func TestQueueWriteCloseRead(t *testing.T) { t.Fatalf("pending bytes must be greater than 0; got %d", n) } q.MustClose() - q = MustOpen(path, "foobar") + q = MustOpen(path, "foobar", 0) if n := q.GetPendingBytes(); n <= 0 { t.Fatalf("pending bytes must be greater than 0; got %d", n) } @@ -218,7 +246,7 @@ func TestQueueWriteCloseRead(t *testing.T) { func TestQueueReadEmpty(t *testing.T) { path := "queue-read-empty" mustDeleteDir(path) - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) defer mustDeleteDir(path) resultCh := make(chan error) @@ -249,7 +277,7 @@ func TestQueueReadEmpty(t *testing.T) { func TestQueueReadWriteConcurrent(t *testing.T) { path := "queue-read-write-concurrent" mustDeleteDir(path) - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) defer mustDeleteDir(path) blocksMap := make(map[string]bool, 1000) @@ -309,7 +337,7 @@ func TestQueueReadWriteConcurrent(t *testing.T) { readersWG.Wait() // Read the remaining blocks in q. 
- q = MustOpen(path, "foobar") + q = MustOpen(path, "foobar", 0) defer q.MustClose() resultCh := make(chan error) go func() { @@ -345,7 +373,7 @@ func TestQueueChunkManagementSimple(t *testing.T) { mustDeleteDir(path) const chunkFileSize = 100 const maxBlockSize = 20 - q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize) + q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) defer mustDeleteDir(path) defer q.MustClose() var blocks []string @@ -376,7 +404,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) { mustDeleteDir(path) const chunkFileSize = 100 const maxBlockSize = 20 - q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize) + q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -387,7 +415,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) { q.MustWriteBlock([]byte(block)) blocks = append(blocks, block) q.MustClose() - q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize) + q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) } if n := q.GetPendingBytes(); n == 0 { t.Fatalf("unexpected zero number of bytes pending") @@ -401,13 +429,70 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) { t.Fatalf("unexpected block read; got %q; want %q", data, block) } q.MustClose() - q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize) + q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0) } if n := q.GetPendingBytes(); n != 0 { t.Fatalf("unexpected non-zero number of pending bytes: %d", n) } } +func TestQueueLimitedSize(t *testing.T) { + const maxPendingBytes = 1000 + path := "queue-limited-size" + mustDeleteDir(path) + q := MustOpen(path, "foobar", maxPendingBytes) + defer func() { + q.MustClose() + mustDeleteDir(path) + }() + + // Check that small blocks are successfully buffered and read + var blocks []string + for i := 0; i < 10; i++ { + block := fmt.Sprintf("block_%d", i) + q.MustWriteBlock([]byte(block)) + blocks = append(blocks, 
block) + } + var buf []byte + var ok bool + for _, block := range blocks { + buf, ok = q.MustReadBlock(buf[:0]) + if !ok { + t.Fatalf("unexpected ok=false") + } + if string(buf) != block { + t.Fatalf("unexpected block read; got %q; want %q", buf, block) + } + } + + // Make sure that old blocks are dropped on queue size overflow + for i := 0; i < maxPendingBytes; i++ { + block := fmt.Sprintf("%d", i) + q.MustWriteBlock([]byte(block)) + } + if n := q.GetPendingBytes(); n > maxPendingBytes { + t.Fatalf("too many pending bytes; got %d; mustn't exceed %d", n, maxPendingBytes) + } + buf, ok = q.MustReadBlock(buf[:0]) + if !ok { + t.Fatalf("unexpected ok=false") + } + blockNum, err := strconv.Atoi(string(buf)) + if err != nil { + t.Fatalf("cannot parse block contents: %s", err) + } + if blockNum < 20 { + t.Fatalf("too small block number: %d; it looks like it wasn't dropped", blockNum) + } + + // Try writing a block with too big size + block := make([]byte, maxPendingBytes+1) + q.MustWriteBlock(block) + if n := q.GetPendingBytes(); n != 0 { + t.Fatalf("unexpected non-empty queue after writing a block with too big size; queue size: %d bytes", n) + } +} + func mustCreateFile(path, contents string) { if err := ioutil.WriteFile(path, []byte(contents), 0600); err != nil { panic(fmt.Errorf("cannot create file %q with %d bytes contents: %s", path, len(contents), err)) diff --git a/lib/persistentqueue/persistentqueue_timing_test.go b/lib/persistentqueue/persistentqueue_timing_test.go index 853ee2125..02e87513f 100644 --- a/lib/persistentqueue/persistentqueue_timing_test.go +++ b/lib/persistentqueue/persistentqueue_timing_test.go @@ -16,7 +16,7 @@ func BenchmarkQueueThroughputSerial(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-queue-throughput-serial-%d", blockSize) mustDeleteDir(path) - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path) @@ -37,7 +37,7 @@ func 
BenchmarkQueueThroughputConcurrent(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-queue-throughput-concurrent-%d", blockSize) mustDeleteDir(path) - q := MustOpen(path, "foobar") + q := MustOpen(path, "foobar", 0) defer func() { q.MustClose() mustDeleteDir(path)