mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmagent: add -remoteWrite.maxDiskUsagePerURL
for limiting the maximum disk usage for each -remoteWrite.url
buffer
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/352
This commit is contained in:
parent
c31d640eb9
commit
76036c1897
9 changed files with 209 additions and 56 deletions
|
@ -28,7 +28,7 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
|
|||
* Can replicate collected metrics simultaneously to multiple remote storage systems.
|
||||
* Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
|
||||
are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as connection
|
||||
to remote storage is recovered.
|
||||
to remote storage is recovered. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`.
|
||||
* Uses lower amounts of RAM, CPU, disk IO and network bandwidth comparing to Prometheus.
|
||||
|
||||
|
||||
|
|
|
@ -122,10 +122,10 @@ func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQue
|
|||
hc: hc,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
c.requestDuration = metrics.NewHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue))
|
||||
c.requestsOKCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue))
|
||||
c.errorsCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.urlLabelValue))
|
||||
c.retriesCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.urlLabelValue))
|
||||
c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue))
|
||||
c.requestsOKCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue))
|
||||
c.errorsCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.urlLabelValue))
|
||||
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.urlLabelValue))
|
||||
for i := 0; i < concurrency; i++ {
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
|
|
|
@ -26,6 +26,10 @@ var (
|
|||
"isn't enough for sending high volume of collected data to remote storage")
|
||||
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
|
||||
"It is hidden by default, since it can contain sensistive auth info")
|
||||
maxPendingBytesPerURL = flag.Int("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
|
||||
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. "+
|
||||
"Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. "+
|
||||
"Disk usage is unlimited if the value is set to 0")
|
||||
)
|
||||
|
||||
var rwctxs []*remoteWriteCtx
|
||||
|
@ -130,11 +134,11 @@ type remoteWriteCtx struct {
|
|||
func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx {
|
||||
h := xxhash.Sum64([]byte(remoteWriteURL))
|
||||
path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
|
||||
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks)
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{url=%q, hash="%016X"}`, urlLabelValue, h), func() float64 {
|
||||
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL)
|
||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, path, urlLabelValue), func() float64 {
|
||||
return float64(fq.GetPendingBytes())
|
||||
})
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{url=%q}`, urlLabelValue), func() float64 {
|
||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, path, urlLabelValue), func() float64 {
|
||||
return float64(fq.GetInmemoryQueueLen())
|
||||
})
|
||||
c := newClient(remoteWriteURL, urlLabelValue, fq, *queues)
|
||||
|
@ -156,7 +160,7 @@ func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBloc
|
|||
prcs: prcs,
|
||||
pss: pss,
|
||||
|
||||
relabelMetricsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{url=%q}`, urlLabelValue)),
|
||||
relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, path, urlLabelValue)),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,8 +32,12 @@ type FastQueue struct {
|
|||
// MustOpenFastQueue opens persistent queue at the given path.
|
||||
//
|
||||
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
|
||||
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int) *FastQueue {
|
||||
pq := MustOpen(path, name)
|
||||
//
|
||||
// if maxPendingBytes is 0, then the queue size is unlimited.
|
||||
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
|
||||
// reaches maxPendingSize.
|
||||
func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue {
|
||||
pq := MustOpen(path, name, maxPendingBytes)
|
||||
fq := &FastQueue{
|
||||
pq: pq,
|
||||
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
||||
|
@ -150,5 +154,3 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
|||
fq.cond.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
var blockBufPool bytesutil.ByteBufferPool
|
||||
|
|
|
@ -11,7 +11,7 @@ func TestFastQueueOpenClose(t *testing.T) {
|
|||
path := "fast-queue-open-close"
|
||||
mustDeleteDir(path)
|
||||
for i := 0; i < 10; i++ {
|
||||
fq := MustOpenFastQueue(path, "foobar", 100)
|
||||
fq := MustOpenFastQueue(path, "foobar", 100, 0)
|
||||
fq.MustClose()
|
||||
}
|
||||
mustDeleteDir(path)
|
||||
|
@ -22,13 +22,19 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
|
|||
mustDeleteDir(path)
|
||||
|
||||
capacity := 100
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0)
|
||||
if n := fq.GetInmemoryQueueLen(); n != 0 {
|
||||
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
|
||||
}
|
||||
var blocks []string
|
||||
for i := 0; i < capacity; i++ {
|
||||
block := fmt.Sprintf("block %d", i)
|
||||
fq.MustWriteBlock([]byte(block))
|
||||
blocks = append(blocks, block)
|
||||
}
|
||||
if n := fq.GetInmemoryQueueLen(); n != capacity {
|
||||
t.Fatalf("unexpected size of inmemory queue; got %d; want %d", n, capacity)
|
||||
}
|
||||
for _, block := range blocks {
|
||||
buf, ok := fq.MustReadBlock(nil)
|
||||
if !ok {
|
||||
|
@ -47,7 +53,7 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
|
|||
mustDeleteDir(path)
|
||||
|
||||
capacity := 100
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0)
|
||||
if n := fq.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("the number of pending bytes must be 0; got %d", n)
|
||||
}
|
||||
|
@ -81,7 +87,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
|
|||
mustDeleteDir(path)
|
||||
|
||||
capacity := 100
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0)
|
||||
if n := fq.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("the number of pending bytes must be 0; got %d", n)
|
||||
}
|
||||
|
@ -91,7 +97,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
|
|||
fq.MustWriteBlock([]byte(block))
|
||||
blocks = append(blocks, block)
|
||||
fq.MustClose()
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity)
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0)
|
||||
}
|
||||
if n := fq.GetPendingBytes(); n == 0 {
|
||||
t.Fatalf("the number of pending bytes must be greater than 0")
|
||||
|
@ -105,7 +111,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
|
|||
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
|
||||
}
|
||||
fq.MustClose()
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity)
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0)
|
||||
}
|
||||
if n := fq.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("the number of pending bytes must be 0; got %d", n)
|
||||
|
@ -118,7 +124,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) {
|
|||
path := "fast-queue-read-unblock-by-close"
|
||||
mustDeleteDir(path)
|
||||
|
||||
fq := MustOpenFastQueue(path, "foorbar", 123)
|
||||
fq := MustOpenFastQueue(path, "foorbar", 123, 0)
|
||||
resultCh := make(chan error)
|
||||
go func() {
|
||||
data, ok := fq.MustReadBlock(nil)
|
||||
|
@ -148,7 +154,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
|
|||
path := "fast-queue-read-unblock-by-write"
|
||||
mustDeleteDir(path)
|
||||
|
||||
fq := MustOpenFastQueue(path, "foobar", 13)
|
||||
fq := MustOpenFastQueue(path, "foobar", 13, 0)
|
||||
block := fmt.Sprintf("foodsafdsaf sdf")
|
||||
resultCh := make(chan error)
|
||||
go func() {
|
||||
|
@ -180,7 +186,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
|
|||
path := "fast-queue-read-write-concurrent"
|
||||
mustDeleteDir(path)
|
||||
|
||||
fq := MustOpenFastQueue(path, "foobar", 5)
|
||||
fq := MustOpenFastQueue(path, "foobar", 5, 0)
|
||||
|
||||
var blocks []string
|
||||
blocksMap := make(map[string]bool)
|
||||
|
@ -244,7 +250,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
|
|||
readersWG.Wait()
|
||||
|
||||
// Collect the remaining data
|
||||
fq = MustOpenFastQueue(path, "foobar", 5)
|
||||
fq = MustOpenFastQueue(path, "foobar", 5, 0)
|
||||
resultCh := make(chan error)
|
||||
go func() {
|
||||
for len(blocksMap) > 0 {
|
||||
|
|
|
@ -15,7 +15,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
|
|||
b.SetBytes(int64(blockSize) * iterationsCount)
|
||||
path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize)
|
||||
mustDeleteDir(path)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0)
|
||||
defer func() {
|
||||
fq.MustClose()
|
||||
mustDeleteDir(path)
|
||||
|
@ -36,7 +36,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
|
|||
b.SetBytes(int64(blockSize) * iterationsCount)
|
||||
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
|
||||
mustDeleteDir(path)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2, 0)
|
||||
defer func() {
|
||||
fq.MustClose()
|
||||
mustDeleteDir(path)
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// MaxBlockSize is the maximum size of the block persistent queue can work with.
|
||||
|
@ -26,8 +27,9 @@ var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$")
|
|||
|
||||
// Queue represents persistent queue.
|
||||
type Queue struct {
|
||||
chunkFileSize uint64
|
||||
maxBlockSize uint64
|
||||
chunkFileSize uint64
|
||||
maxBlockSize uint64
|
||||
maxPendingBytes uint64
|
||||
|
||||
dir string
|
||||
name string
|
||||
|
@ -51,6 +53,15 @@ type Queue struct {
|
|||
writerFlushedOffset uint64
|
||||
|
||||
mustStop bool
|
||||
|
||||
blocksDropped *metrics.Counter
|
||||
bytesDropped *metrics.Counter
|
||||
|
||||
blocksWritten *metrics.Counter
|
||||
bytesWritten *metrics.Counter
|
||||
|
||||
blocksRead *metrics.Counter
|
||||
bytesRead *metrics.Counter
|
||||
}
|
||||
|
||||
// ResetIfEmpty resets q if it is empty.
|
||||
|
@ -111,22 +122,28 @@ func (q *Queue) GetPendingBytes() uint64 {
|
|||
}
|
||||
|
||||
// MustOpen opens persistent queue from the given path.
|
||||
func MustOpen(path, name string) *Queue {
|
||||
return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize)
|
||||
//
|
||||
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
|
||||
// The oldest data is deleted when queue size exceeds maxPendingBytes.
|
||||
func MustOpen(path, name string, maxPendingBytes int) *Queue {
|
||||
if maxPendingBytes < 0 {
|
||||
maxPendingBytes = 0
|
||||
}
|
||||
return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes))
|
||||
}
|
||||
|
||||
func mustOpen(path, name string, chunkFileSize, maxBlockSize uint64) *Queue {
|
||||
func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *Queue {
|
||||
if chunkFileSize < 8 || chunkFileSize-8 < maxBlockSize {
|
||||
logger.Panicf("BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block", chunkFileSize, maxBlockSize)
|
||||
}
|
||||
if maxBlockSize <= 0 {
|
||||
logger.Panicf("BUG: maxBlockSize must be greater than 0; got %d", maxBlockSize)
|
||||
}
|
||||
q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize)
|
||||
q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot open persistent queue at %q: %s; cleaning it up and trying again", path, err)
|
||||
fs.RemoveDirContents(path)
|
||||
q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize)
|
||||
q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: %s", err)
|
||||
}
|
||||
|
@ -134,14 +151,22 @@ func mustOpen(path, name string, chunkFileSize, maxBlockSize uint64) *Queue {
|
|||
return q
|
||||
}
|
||||
|
||||
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize uint64) (*Queue, error) {
|
||||
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) {
|
||||
var q Queue
|
||||
q.chunkFileSize = chunkFileSize
|
||||
q.maxBlockSize = maxBlockSize
|
||||
q.maxPendingBytes = maxPendingBytes
|
||||
q.dir = path
|
||||
q.name = name
|
||||
q.cond.L = &q.mu
|
||||
|
||||
q.blocksDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_dropped_total{path=%q}`, path))
|
||||
q.bytesDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_dropped_total{path=%q}`, path))
|
||||
q.blocksWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_written_total{path=%q}`, path))
|
||||
q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path))
|
||||
q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path))
|
||||
q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path))
|
||||
|
||||
cleanOnError := func() {
|
||||
if q.reader != nil {
|
||||
q.reader.MustClose()
|
||||
|
@ -333,6 +358,31 @@ func (q *Queue) MustWriteBlock(block []byte) {
|
|||
if q.readerOffset > q.writerOffset {
|
||||
logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset)
|
||||
}
|
||||
if q.maxPendingBytes > 0 {
|
||||
// Drain the oldest blocks until the number of pending bytes becomes enough for the block.
|
||||
blockSize := uint64(len(block) + 8)
|
||||
maxPendingBytes := q.maxPendingBytes
|
||||
if blockSize < maxPendingBytes {
|
||||
maxPendingBytes -= blockSize
|
||||
} else {
|
||||
maxPendingBytes = 0
|
||||
}
|
||||
bb := blockBufPool.Get()
|
||||
for q.writerOffset-q.readerOffset > maxPendingBytes {
|
||||
var err error
|
||||
bb.B, err = q.readBlockLocked(bb.B[:0])
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read the oldest block %s", err)
|
||||
}
|
||||
q.blocksDropped.Inc()
|
||||
q.bytesDropped.Add(len(bb.B))
|
||||
}
|
||||
blockBufPool.Put(bb)
|
||||
if blockSize > q.maxPendingBytes {
|
||||
// The block is too big to put it into the queue. Drop it.
|
||||
return
|
||||
}
|
||||
}
|
||||
mustNotifyReader := q.readerOffset == q.writerOffset
|
||||
if err := q.writeBlockLocked(block); err != nil {
|
||||
logger.Panicf("FATAL: %s", err)
|
||||
|
@ -342,6 +392,8 @@ func (q *Queue) MustWriteBlock(block []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
var blockBufPool bytesutil.ByteBufferPool
|
||||
|
||||
func (q *Queue) writeBlockLocked(block []byte) error {
|
||||
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
||||
// Finalize the current chunk and start new one.
|
||||
|
@ -376,6 +428,8 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
|||
if err := q.write(block); err != nil {
|
||||
return fmt.Errorf("cannot write block contents with size %d bytes to %q: %s", len(block), q.writerPath, err)
|
||||
}
|
||||
q.blocksWritten.Inc()
|
||||
q.bytesWritten.Add(len(block))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -447,6 +501,8 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
|
|||
if err := q.readFull(dst[dstLen:]); err != nil {
|
||||
return dst, fmt.Errorf("cannot read block contents with size %d bytes from %q: %s", blockLen, q.readerPath, err)
|
||||
}
|
||||
q.blocksRead.Inc()
|
||||
q.bytesRead.Add(int(blockLen))
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -13,7 +14,7 @@ func TestQueueOpenClose(t *testing.T) {
|
|||
path := "queue-open-close"
|
||||
mustDeleteDir(path)
|
||||
for i := 0; i < 3; i++ {
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
if n := q.GetPendingBytes(); n > 0 {
|
||||
t.Fatalf("pending bytes must be 0; got %d", n)
|
||||
}
|
||||
|
@ -27,7 +28,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
path := "queue-open-invalid-metainfo"
|
||||
mustCreateDir(path)
|
||||
mustCreateFile(path+"/metainfo.json", "foobarbaz")
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -37,7 +38,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
mustCreateEmptyMetainfo(path, "foobar")
|
||||
mustCreateFile(path+"/junk-file", "foobar")
|
||||
mustCreateDir(path + "/junk-dir")
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -46,7 +47,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
mustCreateDir(path)
|
||||
mustCreateEmptyMetainfo(path, "foobar")
|
||||
mustCreateFile(fmt.Sprintf("%s/%016X", path, 1234), "qwere")
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -55,7 +56,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
mustCreateDir(path)
|
||||
mustCreateEmptyMetainfo(path, "foobar")
|
||||
mustCreateFile(fmt.Sprintf("%s/%016X", path, 100*uint64(defaultChunkFileSize)), "asdf")
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -71,7 +72,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "adfsfd")
|
||||
q := MustOpen(path, mi.Name)
|
||||
q := MustOpen(path, mi.Name, 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -85,7 +86,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
q := MustOpen(path, mi.Name)
|
||||
q := MustOpen(path, mi.Name, 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -93,7 +94,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
path := "queue-open-metainfo-dir"
|
||||
mustCreateDir(path)
|
||||
mustCreateDir(path + "/metainfo.json")
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -109,7 +110,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
|
||||
q := MustOpen(path, mi.Name)
|
||||
q := MustOpen(path, mi.Name, 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -118,7 +119,7 @@ func TestQueueOpen(t *testing.T) {
|
|||
mustCreateDir(path)
|
||||
mustCreateEmptyMetainfo(path, "foobar")
|
||||
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdfdsf")
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
|
@ -132,16 +133,43 @@ func TestQueueOpen(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
|
||||
q := MustOpen(path, "baz")
|
||||
q := MustOpen(path, "baz", 0)
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
})
|
||||
}
|
||||
|
||||
func TestQueueResetIfEmpty(t *testing.T) {
|
||||
path := "queue-reset-if-empty"
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
defer func() {
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
}()
|
||||
|
||||
block := make([]byte, 1024*1024)
|
||||
var buf []byte
|
||||
for j := 0; j < 10; j++ {
|
||||
for i := 0; i < 10; i++ {
|
||||
q.MustWriteBlock(block)
|
||||
var ok bool
|
||||
buf, ok = q.MustReadBlock(buf[:0])
|
||||
if !ok {
|
||||
t.Fatalf("unexpected ok=false returned from MustReadBlock")
|
||||
}
|
||||
}
|
||||
q.ResetIfEmpty()
|
||||
if n := q.GetPendingBytes(); n > 0 {
|
||||
t.Fatalf("unexpected non-zer pending bytes after queue reset: %d", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueueWriteRead(t *testing.T) {
|
||||
path := "queue-write-read"
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
defer func() {
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
|
@ -177,7 +205,7 @@ func TestQueueWriteRead(t *testing.T) {
|
|||
func TestQueueWriteCloseRead(t *testing.T) {
|
||||
path := "queue-write-close-read"
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
defer func() {
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
|
@ -194,7 +222,7 @@ func TestQueueWriteCloseRead(t *testing.T) {
|
|||
t.Fatalf("pending bytes must be greater than 0; got %d", n)
|
||||
}
|
||||
q.MustClose()
|
||||
q = MustOpen(path, "foobar")
|
||||
q = MustOpen(path, "foobar", 0)
|
||||
if n := q.GetPendingBytes(); n <= 0 {
|
||||
t.Fatalf("pending bytes must be greater than 0; got %d", n)
|
||||
}
|
||||
|
@ -218,7 +246,7 @@ func TestQueueWriteCloseRead(t *testing.T) {
|
|||
func TestQueueReadEmpty(t *testing.T) {
|
||||
path := "queue-read-empty"
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
defer mustDeleteDir(path)
|
||||
|
||||
resultCh := make(chan error)
|
||||
|
@ -249,7 +277,7 @@ func TestQueueReadEmpty(t *testing.T) {
|
|||
func TestQueueReadWriteConcurrent(t *testing.T) {
|
||||
path := "queue-read-write-concurrent"
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
defer mustDeleteDir(path)
|
||||
|
||||
blocksMap := make(map[string]bool, 1000)
|
||||
|
@ -309,7 +337,7 @@ func TestQueueReadWriteConcurrent(t *testing.T) {
|
|||
readersWG.Wait()
|
||||
|
||||
// Read the remaining blocks in q.
|
||||
q = MustOpen(path, "foobar")
|
||||
q = MustOpen(path, "foobar", 0)
|
||||
defer q.MustClose()
|
||||
resultCh := make(chan error)
|
||||
go func() {
|
||||
|
@ -345,7 +373,7 @@ func TestQueueChunkManagementSimple(t *testing.T) {
|
|||
mustDeleteDir(path)
|
||||
const chunkFileSize = 100
|
||||
const maxBlockSize = 20
|
||||
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
|
||||
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
||||
defer mustDeleteDir(path)
|
||||
defer q.MustClose()
|
||||
var blocks []string
|
||||
|
@ -376,7 +404,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
|
|||
mustDeleteDir(path)
|
||||
const chunkFileSize = 100
|
||||
const maxBlockSize = 20
|
||||
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
|
||||
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
||||
defer func() {
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
|
@ -387,7 +415,7 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
|
|||
q.MustWriteBlock([]byte(block))
|
||||
blocks = append(blocks, block)
|
||||
q.MustClose()
|
||||
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
|
||||
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
||||
}
|
||||
if n := q.GetPendingBytes(); n == 0 {
|
||||
t.Fatalf("unexpected zero number of bytes pending")
|
||||
|
@ -401,13 +429,70 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
|
|||
t.Fatalf("unexpected block read; got %q; want %q", data, block)
|
||||
}
|
||||
q.MustClose()
|
||||
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize)
|
||||
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
||||
}
|
||||
if n := q.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("unexpected non-zero number of pending bytes: %d", n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueueLimitedSize(t *testing.T) {
|
||||
const maxPendingBytes = 1000
|
||||
path := "queue-limited-size"
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar", maxPendingBytes)
|
||||
defer func() {
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
}()
|
||||
|
||||
// Check that small blocks are successfully buffered and read
|
||||
var blocks []string
|
||||
for i := 0; i < 10; i++ {
|
||||
block := fmt.Sprintf("block_%d", i)
|
||||
q.MustWriteBlock([]byte(block))
|
||||
blocks = append(blocks, block)
|
||||
}
|
||||
var buf []byte
|
||||
var ok bool
|
||||
for _, block := range blocks {
|
||||
buf, ok = q.MustReadBlock(buf[:0])
|
||||
if !ok {
|
||||
t.Fatalf("unexpected ok=false")
|
||||
}
|
||||
if string(buf) != block {
|
||||
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that old blocks are dropped on queue size overflow
|
||||
for i := 0; i < maxPendingBytes; i++ {
|
||||
block := fmt.Sprintf("%d", i)
|
||||
q.MustWriteBlock([]byte(block))
|
||||
}
|
||||
if n := q.GetPendingBytes(); n > maxPendingBytes {
|
||||
t.Fatalf("too many pending bytes; got %d; mustn't exceed %d", n, maxPendingBytes)
|
||||
}
|
||||
buf, ok = q.MustReadBlock(buf[:0])
|
||||
if !ok {
|
||||
t.Fatalf("unexpected ok=false")
|
||||
}
|
||||
blockNum, err := strconv.Atoi(string(buf))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse block contents: %s", err)
|
||||
}
|
||||
if blockNum < 20 {
|
||||
t.Fatalf("too small block number: %d; it looks like it wasn't dropped", blockNum)
|
||||
}
|
||||
|
||||
// Try writing a block with too big size
|
||||
block := make([]byte, maxPendingBytes+1)
|
||||
q.MustWriteBlock(block)
|
||||
if n := q.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("unexpected non-empty queue after writing a block with too big size; queue size: %d bytes", n)
|
||||
}
|
||||
}
|
||||
|
||||
func mustCreateFile(path, contents string) {
|
||||
if err := ioutil.WriteFile(path, []byte(contents), 0600); err != nil {
|
||||
panic(fmt.Errorf("cannot create file %q with %d bytes contents: %s", path, len(contents), err))
|
||||
|
|
|
@ -16,7 +16,7 @@ func BenchmarkQueueThroughputSerial(b *testing.B) {
|
|||
b.SetBytes(int64(blockSize) * iterationsCount)
|
||||
path := fmt.Sprintf("bench-queue-throughput-serial-%d", blockSize)
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
defer func() {
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
|
@ -37,7 +37,7 @@ func BenchmarkQueueThroughputConcurrent(b *testing.B) {
|
|||
b.SetBytes(int64(blockSize) * iterationsCount)
|
||||
path := fmt.Sprintf("bench-queue-throughput-concurrent-%d", blockSize)
|
||||
mustDeleteDir(path)
|
||||
q := MustOpen(path, "foobar")
|
||||
q := MustOpen(path, "foobar", 0)
|
||||
defer func() {
|
||||
q.MustClose()
|
||||
mustDeleteDir(path)
|
||||
|
|
Loading…
Reference in a new issue