app/vmagent: add -remoteWrite.maxDiskUsagePerURL for limiting the maximum disk usage for each -remoteWrite.url buffer

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/352
This commit is contained in:
Aliaksandr Valialkin 2020-03-03 19:48:46 +02:00
parent 808c17e250
commit f01d1bf4a8
9 changed files with 209 additions and 56 deletions

View file

@ -28,7 +28,7 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
* Can replicate collected metrics simultaneously to multiple remote storage systems.
* Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as connection
to remote storage is recovered.
to remote storage is recovered. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`.
* Uses lower amounts of RAM, CPU, disk IO and network bandwidth comparing to Prometheus.

View file

@ -122,10 +122,10 @@ func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQue
hc: hc,
stopCh: make(chan struct{}),
}
c.requestDuration = metrics.NewHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue))
c.requestsOKCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue))
c.errorsCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.urlLabelValue))
c.retriesCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.urlLabelValue))
c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue))
c.requestsOKCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue))
c.errorsCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.urlLabelValue))
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.urlLabelValue))
for i := 0; i < concurrency; i++ {
c.wg.Add(1)
go func() {

View file

@ -26,6 +26,10 @@ var (
"isn't enough for sending high volume of collected data to remote storage")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
"It is hidden by default, since it can contain sensistive auth info")
maxPendingBytesPerURL = flag.Int("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. "+
"Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. "+
"Disk usage is unlimited if the value is set to 0")
)
var rwctxs []*remoteWriteCtx
@ -130,11 +134,11 @@ type remoteWriteCtx struct {
func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx {
h := xxhash.Sum64([]byte(remoteWriteURL))
path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks)
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{url=%q, hash="%016X"}`, urlLabelValue, h), func() float64 {
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, path, urlLabelValue), func() float64 {
return float64(fq.GetPendingBytes())
})
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{url=%q}`, urlLabelValue), func() float64 {
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, path, urlLabelValue), func() float64 {
return float64(fq.GetInmemoryQueueLen())
})
c := newClient(remoteWriteURL, urlLabelValue, fq, *queues)
@ -156,7 +160,7 @@ func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBloc
prcs: prcs,
pss: pss,
relabelMetricsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{url=%q}`, urlLabelValue)),
relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, path, urlLabelValue)),
}
}

View file

@ -32,8 +32,12 @@ type FastQueue struct {
// MustOpenFastQueue opens persistent queue at the given path.
//
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int) *FastQueue {
pq := MustOpen(path, name)
//
// if maxPendingBytes is 0, then the queue size is unlimited.
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
// reaches maxPendingSize.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue {
pq := MustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
@ -150,5 +154,3 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.cond.Wait()
}
}
var blockBufPool bytesutil.ByteBufferPool

View file

@ -11,7 +11,7 @@ func TestFastQueueOpenClose(t *testing.T) {
path := "fast-queue-open-close"
mustDeleteDir(path)
for i := 0; i < 10; i++ {
fq := MustOpenFastQueue(path, "foobar", 100)
fq := MustOpenFastQueue(path, "foobar", 100, 0)
fq.MustClose()
}
mustDeleteDir(path)
@ -22,13 +22,19 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
mustDeleteDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity)
fq := MustOpenFastQueue(path, "foobar", capacity, 0)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
var blocks []string
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i)
fq.MustWriteBlock([]byte(block))
blocks = append(blocks, block)
}
if n := fq.GetInmemoryQueueLen(); n != capacity {
t.Fatalf("unexpected size of inmemory queue; got %d; want %d", n, capacity)
}
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
@ -47,7 +53,7 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
mustDeleteDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity)
fq := MustOpenFastQueue(path, "foobar", capacity, 0)
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
@ -81,7 +87,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
mustDeleteDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity)
fq := MustOpenFastQueue(path, "foobar", capacity, 0)
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
@ -91,7 +97,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
fq.MustWriteBlock([]byte(block))
blocks = append(blocks, block)
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity)
fq = MustOpenFastQueue(path, "foobar", capacity, 0)
}
if n := fq.GetPendingBytes(); n == 0 {
t.Fatalf("the number of pending bytes must be greater than 0")
@ -105,7 +111,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity)
fq = MustOpenFastQueue(path, "foobar", capacity, 0)
}
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
@ -118,7 +124,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) {
path := "fast-queue-read-unblock-by-close"
mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foorbar", 123)
fq := MustOpenFastQueue(path, "foorbar", 123, 0)
resultCh := make(chan error)
go func() {
data, ok := fq.MustReadBlock(nil)
@ -148,7 +154,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
path := "fast-queue-read-unblock-by-write"
mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", 13)
fq := MustOpenFastQueue(path, "foobar", 13, 0)
block := fmt.Sprintf("foodsafdsaf sdf")
resultCh := make(chan error)
go func() {
@ -180,7 +186,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
path := "fast-queue-read-write-concurrent"
mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", 5)
fq := MustOpenFastQueue(path, "foobar", 5, 0)
var blocks []string
blocksMap := make(map[string]bool)
@ -244,7 +250,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
readersWG.Wait()
// Collect the remaining data
fq = MustOpenFastQueue(path, "foobar", 5)
fq = MustOpenFastQueue(path, "foobar", 5, 0)
resultCh := make(chan error)
go func() {
for len(blocksMap) > 0 {

View file

@ -15,7 +15,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize)
mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0)
defer func() {
fq.MustClose()
mustDeleteDir(path)
@ -36,7 +36,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2, 0)
defer func() {
fq.MustClose()
mustDeleteDir(path)

View file

@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// MaxBlockSize is the maximum size of the block persistent queue can work with.
@ -26,8 +27,9 @@ var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$")
// Queue represents persistent queue.
type Queue struct {
chunkFileSize uint64
maxBlockSize uint64
chunkFileSize uint64
maxBlockSize uint64
maxPendingBytes uint64
dir string
name string
@ -51,6 +53,15 @@ type Queue struct {
writerFlushedOffset uint64
mustStop bool
blocksDropped *metrics.Counter
bytesDropped *metrics.Counter
blocksWritten *metrics.Counter
bytesWritten *metrics.Counter
blocksRead *metrics.Counter
bytesRead *metrics.Counter
}
// ResetIfEmpty resets q if it is empty.
@ -111,22 +122,28 @@ func (q *Queue) GetPendingBytes() uint64 {
}
// MustOpen opens persistent queue from the given path.
func MustOpen(path, name string) *Queue {
return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize)
//
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
// The oldest data is deleted when queue size exceeds maxPendingBytes.
func MustOpen(path, name string, maxPendingBytes int) *Queue {
if maxPendingBytes < 0 {
maxPendingBytes = 0
}
return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes))
}
func mustOpen(path, name string, chunkFileSize, maxBlockSize uint64) *Queue {
func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *Queue {
if chunkFileSize < 8 || chunkFileSize-8 < maxBlockSize {
logger.Panicf("BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block", chunkFileSize, maxBlockSize)
}
if maxBlockSize <= 0 {
logger.Panicf("BUG: maxBlockSize must be greater than 0; got %d", maxBlockSize)
}
q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize)
q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
if err != nil {
logger.Errorf("cannot open persistent queue at %q: %s; cleaning it up and trying again", path, err)
fs.RemoveDirContents(path)
q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize)
q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
if err != nil {
logger.Panicf("FATAL: %s", err)
}
@ -134,14 +151,22 @@ func mustOpen(path, name string, chunkFileSize, maxBlockSize uint64) *Queue {
return q
}
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize uint64) (*Queue, error) {
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) {
var q Queue
q.chunkFileSize = chunkFileSize
q.maxBlockSize = maxBlockSize
q.maxPendingBytes = maxPendingBytes
q.dir = path
q.name = name
q.cond.L = &q.mu
q.blocksDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_dropped_total{path=%q}`, path))
q.bytesDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_dropped_total{path=%q}`, path))
q.blocksWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_written_total{path=%q}`, path))
q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path))
q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path))
q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path))
cleanOnError := func() {
if q.reader != nil {
q.reader.MustClose()
@ -333,6 +358,31 @@ func (q *Queue) MustWriteBlock(block []byte) {
if q.readerOffset > q.writerOffset {
logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset)
}
if q.maxPendingBytes > 0 {
// Drain the oldest blocks until the number of pending bytes becomes enough for the block.
blockSize := uint64(len(block) + 8)
maxPendingBytes := q.maxPendingBytes
if blockSize < maxPendingBytes {
maxPendingBytes -= blockSize
} else {
maxPendingBytes = 0
}
bb := blockBufPool.Get()
for q.writerOffset-q.readerOffset > maxPendingBytes {
var err error
bb.B, err = q.readBlockLocked(bb.B[:0])
if err != nil {
logger.Panicf("FATAL: cannot read the oldest block %s", err)
}
q.blocksDropped.Inc()
q.bytesDropped.Add(len(bb.B))
}
blockBufPool.Put(bb)
if blockSize > q.maxPendingBytes {
// The block is too big to put it into the queue. Drop it.
return
}
}
mustNotifyReader := q.readerOffset == q.writerOffset
if err := q.writeBlockLocked(block); err != nil {
logger.Panicf("FATAL: %s", err)
@ -342,6 +392,8 @@ func (q *Queue) MustWriteBlock(block []byte) {
}
}
var blockBufPool bytesutil.ByteBufferPool
func (q *Queue) writeBlockLocked(block []byte) error {
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
// Finalize the current chunk and start new one.
@ -376,6 +428,8 @@ func (q *Queue) writeBlockLocked(block []byte) error {
if err := q.write(block); err != nil {
return fmt.Errorf("cannot write block contents with size %d bytes to %q: %s", len(block), q.writerPath, err)
}
q.blocksWritten.Inc()
q.bytesWritten.Add(len(block))
return nil
}
@ -447,6 +501,8 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
if err := q.readFull(dst[dstLen:]); err != nil {
return dst, fmt.Errorf("cannot read block contents with size %d bytes from %q: %s", blockLen, q.readerPath, err)
}
q.blocksRead.Inc()
q.bytesRead.Add(int(blockLen))
return dst, nil
}

View file

@ -4,6 +4,7 @@ import (
"fmt"
"io/ioutil"
"os"
"strconv"
"sync"
"testing"
"time"
@ -13,7 +14,7 @@ func TestQueueOpenClose(t *testing.T) {
path := "queue-open-close"
mustDeleteDir(path)
for i := 0; i < 3; i++ {
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
if n := q.GetPendingBytes(); n > 0 {
t.Fatalf("pending bytes must be 0; got %d", n)
}
@ -27,7 +28,7 @@ func TestQueueOpen(t *testing.T) {
path := "queue-open-invalid-metainfo"
mustCreateDir(path)
mustCreateFile(path+"/metainfo.json", "foobarbaz")
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -37,7 +38,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(path+"/junk-file", "foobar")
mustCreateDir(path + "/junk-dir")
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -46,7 +47,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(path)
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(fmt.Sprintf("%s/%016X", path, 1234), "qwere")
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -55,7 +56,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(path)
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(fmt.Sprintf("%s/%016X", path, 100*uint64(defaultChunkFileSize)), "asdf")
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -71,7 +72,7 @@ func TestQueueOpen(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "adfsfd")
q := MustOpen(path, mi.Name)
q := MustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
})
@ -85,7 +86,7 @@ func TestQueueOpen(t *testing.T) {
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
q := MustOpen(path, mi.Name)
q := MustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
})
@ -93,7 +94,7 @@ func TestQueueOpen(t *testing.T) {
path := "queue-open-metainfo-dir"
mustCreateDir(path)
mustCreateDir(path + "/metainfo.json")
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -109,7 +110,7 @@ func TestQueueOpen(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
q := MustOpen(path, mi.Name)
q := MustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
})
@ -118,7 +119,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(path)
mustCreateEmptyMetainfo(path, "foobar")
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdfdsf")
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
})
@ -132,16 +133,43 @@ func TestQueueOpen(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
q := MustOpen(path, "baz")
q := MustOpen(path, "baz", 0)
q.MustClose()
mustDeleteDir(path)
})
}
func TestQueueResetIfEmpty(t *testing.T) {
path := "queue-reset-if-empty"
mustDeleteDir(path)
q := MustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
}()
block := make([]byte, 1024*1024)
var buf []byte
for j := 0; j < 10; j++ {
for i := 0; i < 10; i++ {
q.MustWriteBlock(block)
var ok bool
buf, ok = q.MustReadBlock(buf[:0])
if !ok {
t.Fatalf("unexpected ok=false returned from MustReadBlock")
}
}
q.ResetIfEmpty()
if n := q.GetPendingBytes(); n > 0 {
t.Fatalf("unexpected non-zer pending bytes after queue reset: %d", n)
}
}
}
func TestQueueWriteRead(t *testing.T) {
path := "queue-write-read"
mustDeleteDir(path)
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -177,7 +205,7 @@ func TestQueueWriteRead(t *testing.T) {
func TestQueueWriteCloseRead(t *testing.T) {
path := "queue-write-close-read"
mustDeleteDir(path)
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -194,7 +222,7 @@ func TestQueueWriteCloseRead(t *testing.T) {
t.Fatalf("pending bytes must be greater than 0; got %d", n)
}
q.MustClose()
q = MustOpen(path, "foobar")
q = MustOpen(path, "foobar", 0)
if n := q.GetPendingBytes(); n <= 0 {
t.Fatalf("pending bytes must be greater than 0; got %d", n)
}
@ -218,7 +246,7 @@ func TestQueueWriteCloseRead(t *testing.T) {
func TestQueueReadEmpty(t *testing.T) {
path := "queue-read-empty"
mustDeleteDir(path)
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
defer mustDeleteDir(path)
resultCh := make(chan error)
@ -249,7 +277,7 @@ func TestQueueReadEmpty(t *testing.T) {
func TestQueueReadWriteConcurrent(t *testing.T) {
path := "queue-read-write-concurrent"
mustDeleteDir(path)
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
defer mustDeleteDir(path)
blocksMap := make(map[string]bool, 1000)
@ -309,7 +337,7 @@ func TestQueueReadWriteConcurrent(t *testing.T) {
readersWG.Wait()
// Read the remaining blocks in q.
q = MustOpen(path, "foobar")
q = MustOpen(path, "foobar", 0)
defer q.MustClose()
resultCh := make(chan error)
go func() {
@ -345,7 +373,7 @@ func TestQueueChunkManagementSimple(t *testing.T) {
mustDeleteDir(path)
const chunkFileSize = 100
const maxBlockSize = 20
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
defer mustDeleteDir(path)
defer q.MustClose()
var blocks []string
@ -376,7 +404,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
mustDeleteDir(path)
const chunkFileSize = 100
const maxBlockSize = 20
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -387,7 +415,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
q.MustWriteBlock([]byte(block))
blocks = append(blocks, block)
q.MustClose()
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
}
if n := q.GetPendingBytes(); n == 0 {
t.Fatalf("unexpected zero number of bytes pending")
@ -401,13 +429,70 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
t.Fatalf("unexpected block read; got %q; want %q", data, block)
}
q.MustClose()
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
}
if n := q.GetPendingBytes(); n != 0 {
t.Fatalf("unexpected non-zero number of pending bytes: %d", n)
}
}
func TestQueueLimitedSize(t *testing.T) {
const maxPendingBytes = 1000
path := "queue-limited-size"
mustDeleteDir(path)
q := MustOpen(path, "foobar", maxPendingBytes)
defer func() {
q.MustClose()
mustDeleteDir(path)
}()
// Check that small blocks are successfully buffered and read
var blocks []string
for i := 0; i < 10; i++ {
block := fmt.Sprintf("block_%d", i)
q.MustWriteBlock([]byte(block))
blocks = append(blocks, block)
}
var buf []byte
var ok bool
for _, block := range blocks {
buf, ok = q.MustReadBlock(buf[:0])
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != block {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
}
// Make sure that old blocks are dropped on queue size overflow
for i := 0; i < maxPendingBytes; i++ {
block := fmt.Sprintf("%d", i)
q.MustWriteBlock([]byte(block))
}
if n := q.GetPendingBytes(); n > maxPendingBytes {
t.Fatalf("too many pending bytes; got %d; mustn't exceed %d", n, maxPendingBytes)
}
buf, ok = q.MustReadBlock(buf[:0])
if !ok {
t.Fatalf("unexpected ok=false")
}
blockNum, err := strconv.Atoi(string(buf))
if err != nil {
t.Fatalf("cannot parse block contents: %s", err)
}
if blockNum < 20 {
t.Fatalf("too small block number: %d; it looks like it wasn't dropped", blockNum)
}
// Try writing a block with too big size
block := make([]byte, maxPendingBytes+1)
q.MustWriteBlock(block)
if n := q.GetPendingBytes(); n != 0 {
t.Fatalf("unexpected non-empty queue after writing a block with too big size; queue size: %d bytes", n)
}
}
func mustCreateFile(path, contents string) {
if err := ioutil.WriteFile(path, []byte(contents), 0600); err != nil {
panic(fmt.Errorf("cannot create file %q with %d bytes contents: %s", path, len(contents), err))

View file

@ -16,7 +16,7 @@ func BenchmarkQueueThroughputSerial(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-queue-throughput-serial-%d", blockSize)
mustDeleteDir(path)
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
@ -37,7 +37,7 @@ func BenchmarkQueueThroughputConcurrent(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-queue-throughput-concurrent-%d", blockSize)
mustDeleteDir(path)
q := MustOpen(path, "foobar")
q := MustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)