package persistentqueue

import (


const (
	// MaxBlockSize is the maximum size of the block persistent queue can work with.
	//
	// It equals 32 MiB.
	MaxBlockSize = 32 << 20

	// defaultChunkFileSize is the chunk file size MustOpen passes to mustOpen.
	//
	// It has room for 16 blocks of MaxBlockSize bytes,
	// each prefixed by an 8-byte length header.
	defaultChunkFileSize = 16 * (MaxBlockSize + 8)
)

// chunkFileNameRegex matches chunk file names, which consist
// of exactly 16 upper-case hexadecimal digits.
var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$")

// Queue represents persistent queue.
//
// Blocks are stored in chunk files inside dir (see chunkFilePath),
// while the reader and writer positions are persisted
// in a metainfo file (see metainfoPath).
type Queue struct {
	// chunkFileSize is the size of a single chunk file in bytes.
	// Chunk-local offsets are derived from global offsets modulo this value.
	chunkFileSize   uint64
	// maxBlockSize is the maximum block size accepted by MustWriteBlock
	// and the maximum block length accepted when reading a block header.
	maxBlockSize    uint64
	// maxPendingBytes limits the number of pending bytes when it is greater than 0.
	// MustWriteBlock drains the oldest blocks in order to fit the limit.
	maxPendingBytes uint64

	// dir is the directory holding chunk files and the metainfo file.
	dir  string
	// name is the queue name.
	// NOTE(review): presumably matched against metainfo.Name on open — confirm.
	name string

	// flockF is the file returned by mustCreateFlockFile when the queue is opened.
	// It is closed in MustClose.
	flockF *os.File

	// mu protects all the fields below.
	mu sync.Mutex

	// cond is used for notifying blocked readers when new data has been added
	// or when MustClose is called.
	cond sync.Cond

	// reader reads from the chunk file at readerPath.
	reader            *filestream.Reader
	readerPath        string
	// readerOffset is the global read position across all the chunk files.
	readerOffset      uint64
	// readerLocalOffset is the read position inside the current chunk file.
	readerLocalOffset uint64

	// writer writes to the chunk file at writerPath.
	writer              *filestream.Writer
	writerPath          string
	// writerOffset is the global write position across all the chunk files.
	writerOffset        uint64
	// writerLocalOffset is the write position inside the current chunk file.
	writerLocalOffset   uint64
	// writerFlushedOffset is compared with the read position in readFull.
	// NOTE(review): presumably the offset up to which written data has been flushed — confirm.
	writerFlushedOffset uint64

	// lastMetainfoFlushTime is the fasttime.UnixTimestamp() value recorded
	// by the last successful flushMetainfoIfNeededLocked flush.
	lastMetainfoFlushTime uint64

	// mustStop is set by MustClose. MustReadBlock returns false when it is set,
	// while MustWriteBlock panics.
	mustStop bool

	// The metrics below are registered in tryOpeningQueue
	// under vm_persistentqueue_* names labeled with the queue path.
	blocksDropped *metrics.Counter
	bytesDropped  *metrics.Counter

	blocksWritten *metrics.Counter
	bytesWritten  *metrics.Counter

	blocksRead *metrics.Counter
	bytesRead  *metrics.Counter

	// bytesPending reports GetPendingBytes().
	bytesPending *metrics.Gauge

// ResetIfEmpty resets q if it is empty.
// This is needed in order to remove chunk file associated with empty q.
//
// The reset rewinds reader and writer offsets to zero, switches both the reader
// and the writer to the chunk file at offset 0 and persists the new offsets to metainfo.
// It panics if the chunk file cannot be created or opened, or if metainfo cannot be flushed.
func (q *Queue) ResetIfEmpty() {

	if q.readerOffset != q.writerOffset {
		// The queue isn't empty.
	if q.readerOffset < 16*1024*1024 {
		// The file is too small to drop. Leave it as is in order to reduce filesystem load.
	if q.readerPath != q.writerPath {
		// An empty queue must have its reader and writer in the same chunk file.
		logger.Panicf("BUG: readerPath=%q doesn't match writerPath=%q", q.readerPath, q.writerPath)


	// Rewind all the offsets to the queue start.
	q.writerOffset = 0
	q.writerLocalOffset = 0
	q.writerFlushedOffset = 0

	q.readerOffset = 0
	q.readerLocalOffset = 0

	// Create the chunk file for offset 0 and use it for writing.
	q.writerPath = q.chunkFilePath(q.writerOffset)
	w, err := filestream.Create(q.writerPath, false)
	if err != nil {
		logger.Panicf("FATAL: cannot create chunk file %q: %s", q.writerPath, err)
	q.writer = w

	// Read from the same chunk file.
	q.readerPath = q.writerPath
	r, err := filestream.Open(q.readerPath, true)
	if err != nil {
		logger.Panicf("FATAL: cannot open chunk file %q: %s", q.readerPath, err)
	q.reader = r

	// Persist the rewound offsets.
	if err := q.flushMetainfoLocked(); err != nil {
		logger.Panicf("FATAL: cannot flush metainfo: %s", err)

// GetPendingBytes returns the number of pending bytes in the queue.
//
// The result is the distance between the writer offset and the reader offset.
// NOTE(review): no locking around the offset reads is visible here — confirm.
func (q *Queue) GetPendingBytes() uint64 {
	n := q.writerOffset - q.readerOffset
	return n

// MustOpen opens persistent queue from the given path.
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
// The oldest data is deleted when queue size exceeds maxPendingBytes.
//
// The queue is opened with defaultChunkFileSize and MaxBlockSize.
func MustOpen(path, name string, maxPendingBytes int) *Queue {
	if maxPendingBytes < 0 {
		// Negative values are replaced with 0 before the conversion to uint64 below.
		maxPendingBytes = 0
	return mustOpen(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes))

// mustOpen opens the queue at the given path with explicit chunkFileSize and maxBlockSize.
//
// chunkFileSize must be able to hold at least one block of maxBlockSize bytes
// together with 8 extra bytes, and maxBlockSize must be non-zero; otherwise it panics.
// If the first tryOpeningQueue call fails, the error is logged and the call is repeated once.
// It panics if the second attempt fails too.
func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *Queue {
	// The chunkFileSize < 8 check keeps the chunkFileSize-8 subtraction from wrapping around.
	if chunkFileSize < 8 || chunkFileSize-8 < maxBlockSize {
		logger.Panicf("BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block", chunkFileSize, maxBlockSize)
	// maxBlockSize is unsigned, so this condition is equivalent to maxBlockSize == 0.
	if maxBlockSize <= 0 {
		logger.Panicf("BUG: maxBlockSize must be greater than 0; got %d", maxBlockSize)
	q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
	if err != nil {
		// NOTE(review): the log message mentions cleaning the queue up,
		// but no cleanup call is visible before the retry — confirm.
		logger.Errorf("cannot open persistent queue at %q: %s; cleaning it up and trying again", path, err)
		q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
		if err != nil {
			logger.Panicf("FATAL: %s", err)
	return q

// mustCreateFlockFile returns the file created by fs.CreateFlockFile for the given path.
//
// It panics if fs.CreateFlockFile returns an error.
func mustCreateFlockFile(path string) *os.File {
	f, err := fs.CreateFlockFile(path)
	if err != nil {
		logger.Panicf("FATAL: %s", err)
	return f

// tryOpeningQueue tries opening the persistent queue stored in the path directory.
//
// It registers queue metrics, creates the directory if needed, creates the flock file
// and reads metainfo. If metainfo cannot be read, it is written anew together with
// the initial chunk file. Then the directory is scanned for the chunk files containing
// mi.ReaderOffset and mi.WriterOffset, which are opened at the corresponding local offsets.
//
// An error is returned if the reader or the writer chunk file isn't found,
// or if the reader offset exceeds the writer offset.
//
// NOTE(review): several statements in this function look truncated in this view
// (e.g. the q.dir and q.cond.L assignments and the mi.Name handling) — confirm against the full file.
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) {
	// Protect from concurrent opens.
	var q Queue
	q.chunkFileSize = chunkFileSize
	q.maxBlockSize = maxBlockSize
	q.maxPendingBytes = maxPendingBytes
	q.dir = path = name
	q.cond.L = &

	// Register per-queue metrics labeled with the queue path.
	q.blocksDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_dropped_total{path=%q}`, path))
	q.bytesDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_dropped_total{path=%q}`, path))
	q.blocksWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_written_total{path=%q}`, path))
	q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path))
	q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path))
	q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path))
	q.bytesPending = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 {
		return float64(q.GetPendingBytes())

	// cleanOnError inspects the already opened reader and writer.
	// NOTE(review): its body isn't visible here; presumably it closes them — confirm.
	cleanOnError := func() {
		if q.reader != nil {
		if q.writer != nil {

	if err := fs.MkdirAllIfNotExist(path); err != nil {
		return nil, fmt.Errorf("cannot create directory %q: %w", path, err)
	q.flockF = mustCreateFlockFile(path)
	// The flock file is closed on return unless mustCloseFlockF is reset
	// to false just before the successful return at the end of the function.
	mustCloseFlockF := true
	defer func() {
		if mustCloseFlockF {
			_ = q.flockF.Close()

	// Read metainfo.
	var mi metainfo
	metainfoPath := q.metainfoPath()
	if err := mi.ReadFromFile(metainfoPath); err != nil {
		// A missing metainfo file isn't logged as an error.
		if !os.IsNotExist(err) {
			logger.Errorf("cannot read metainfo for persistent queue from %q: %s; re-creating %q", metainfoPath, err, path)

		// path contents is broken or missing. Re-create it from scratch.
		mi.Name =
		if err := mi.WriteToFile(metainfoPath); err != nil {
			return nil, fmt.Errorf("cannot create %q: %w", metainfoPath, err)

		// Create initial chunk file.
		filepath := q.chunkFilePath(0)
		if err := fs.WriteFileAtomically(filepath, nil); err != nil {
			return nil, fmt.Errorf("cannot create %q: %w", filepath, err)
	if mi.Name != {
		return nil, fmt.Errorf("unexpected queue name; got %q; want %q", mi.Name,

	// Locate reader and writer chunks in the path.
	fis, err := ioutil.ReadDir(path)
	if err != nil {
		return nil, fmt.Errorf("cannot read contents of the directory %q: %w", path, err)
	for _, fi := range fis {
		fname := fi.Name()
		filepath := path + "/" + fname
		if fi.IsDir() {
			logger.Errorf("skipping unknown directory %q", filepath)
		if fname == "metainfo.json" {
			// skip metainfo file
		if fname == "flock.lock" {
			// skip flock file
		if !chunkFileNameRegex.MatchString(fname) {
			logger.Errorf("skipping unknown file %q", filepath)
		// The chunk file name is the hex-encoded global offset of the chunk start.
		offset, err := strconv.ParseUint(fname, 16, 64)
		if err != nil {
			logger.Panicf("BUG: cannot parse hex %q: %s", fname, err)
		if offset%q.chunkFileSize != 0 {
			logger.Errorf("unexpected offset for chunk file %q: %d; it must be multiple of %d; removing the file", filepath, offset, q.chunkFileSize)
		// The chunk ends at or before the reader offset.
		if mi.ReaderOffset >= offset+q.chunkFileSize {
			logger.Errorf("unexpected chunk file found from the past: %q; removing it", filepath)
		// The chunk starts after the writer offset.
		if mi.WriterOffset < offset {
			logger.Errorf("unexpected chunk file found from the future: %q; removing it", filepath)
		if mi.ReaderOffset >= offset && mi.ReaderOffset < offset+q.chunkFileSize {
			// Found the chunk for reading
			if q.reader != nil {
				logger.Panicf("BUG: reader is already initialized with readerPath=%q, readerOffset=%d, readerLocalOffset=%d",
					q.readerPath, q.readerOffset, q.readerLocalOffset)
			q.readerPath = filepath
			q.readerOffset = mi.ReaderOffset
			q.readerLocalOffset = mi.ReaderOffset % q.chunkFileSize
			if fileSize := fs.MustFileSize(q.readerPath); fileSize < q.readerLocalOffset {
				logger.Errorf("chunk file %q size is too small for the given reader offset; file size %d bytes; reader offset: %d bytes; removing the file",
					q.readerPath, fileSize, q.readerLocalOffset)
			r, err := filestream.OpenReaderAt(q.readerPath, int64(q.readerLocalOffset), true)
			if err != nil {
				logger.Errorf("cannot open %q for reading at offset %d: %s; removing this file", q.readerPath, q.readerLocalOffset, err)
			q.reader = r
		if mi.WriterOffset >= offset && mi.WriterOffset < offset+q.chunkFileSize {
			// Found the chunk file for writing
			if q.writer != nil {
				logger.Panicf("BUG: writer is already initialized with writerPath=%q, writerOffset=%d, writerLocalOffset=%d",
					q.writerPath, q.writerOffset, q.writerLocalOffset)
			q.writerPath = filepath
			q.writerOffset = mi.WriterOffset
			q.writerLocalOffset = mi.WriterOffset % q.chunkFileSize
			q.writerFlushedOffset = mi.WriterOffset
			if fileSize := fs.MustFileSize(q.writerPath); fileSize != q.writerLocalOffset {
				if fileSize < q.writerLocalOffset {
					logger.Errorf("%q size (%d bytes) is smaller than the writer offset (%d bytes); removing the file",
						q.writerPath, fileSize, q.writerLocalOffset)
				logger.Warnf("%q size (%d bytes) is bigger than writer offset (%d bytes); "+
					"this may be the case on unclean shutdown (OOM, `kill -9`, hardware reset); trying to fix it by adjusting fileSize to %d",
					q.writerPath, fileSize, q.writerLocalOffset, q.writerLocalOffset)
			w, err := filestream.OpenWriterAt(q.writerPath, int64(q.writerLocalOffset), false)
			if err != nil {
				logger.Errorf("cannot open %q for writing at offset %d: %s; removing this file", q.writerPath, q.writerLocalOffset, err)
			q.writer = w
	// Both the reader and the writer chunk files must have been found during the scan.
	if q.reader == nil {
		return nil, fmt.Errorf("couldn't find chunk file for reading in %q", q.dir)
	if q.writer == nil {
		return nil, fmt.Errorf("couldn't find chunk file for writing in %q", q.dir)
	if q.readerOffset > q.writerOffset {
		return nil, fmt.Errorf("readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset)
	// Success: keep the flock file open for the lifetime of the queue.
	mustCloseFlockF = false
	return &q, nil

// MustClose closes q.
// It unblocks all the MustReadBlock calls.
// MustWriteBlock mustn't be called during and after the call to MustClose.
//
// It panics if metainfo cannot be flushed or if the flock file cannot be closed.
func (q *Queue) MustClose() {

	// Unblock goroutines blocked on cond in MustReadBlock.
	q.mustStop = true

	// Close writer.
	// NOTE(review): only the nil assignment is visible here; the close call isn't — confirm.
	q.writer = nil

	// Close reader.
	// NOTE(review): only the nil assignment is visible here; the close call isn't — confirm.
	q.reader = nil

	// Store metainfo
	if err := q.flushMetainfoLocked(); err != nil {
		logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err)

	// Close flockF
	if err := q.flockF.Close(); err != nil {
		logger.Panicf("FATAL: cannot close flock file: %s", err)
	q.flockF = nil

// chunkFilePath returns the path to the chunk file for the given offset.
//
// The file name is the offset formatted as 16 zero-padded upper-case hex digits inside q.dir.
func (q *Queue) chunkFilePath(offset uint64) string {
	return fmt.Sprintf("%s/%016X", q.dir, offset)

// metainfoPath returns the path to the metainfo.json file inside q.dir.
func (q *Queue) metainfoPath() string {
	return q.dir + "/metainfo.json"

// MustWriteBlock writes block to q.
// The block size cannot exceed MaxBlockSize.
// It is safe calling this function from concurrent goroutines.
//
// More precisely, it panics if len(block) exceeds q.maxBlockSize,
// which equals MaxBlockSize for queues opened via MustOpen.
// It also panics when called after MustClose or when the block cannot be written.
//
// If q.maxPendingBytes is greater than 0, the oldest blocks are read
// and discarded until the pending data fits the limit.
func (q *Queue) MustWriteBlock(block []byte) {
	if uint64(len(block)) > q.maxBlockSize {
		logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize)

	if q.mustStop {
		logger.Panicf("BUG: MustWriteBlock cannot be called after MustClose")
	if q.readerOffset > q.writerOffset {
		logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset)
	if q.maxPendingBytes > 0 {
		// Drain the oldest blocks until the number of pending bytes becomes enough for the block.
		// blockSize accounts for the 8-byte length header written in front of every block.
		blockSize := uint64(len(block) + 8)
		maxPendingBytes := q.maxPendingBytes
		// Reserve room for the new block; the unsigned subtraction is guarded against wrap-around.
		if blockSize < maxPendingBytes {
			maxPendingBytes -= blockSize
		} else {
			maxPendingBytes = 0
		// bb is a scratch buffer for the discarded blocks.
		bb := blockBufPool.Get()
		for q.writerOffset-q.readerOffset > maxPendingBytes {
			var err error
			bb.B, err = q.readBlockLocked(bb.B[:0])
			if err != nil {
				logger.Panicf("FATAL: cannot read the oldest block %s", err)
		if blockSize > q.maxPendingBytes {
			// The block is too big to put it into the queue. Drop it.
	if err := q.writeBlockLocked(block); err != nil {
		logger.Panicf("FATAL: %s", err)

	// Notify blocked reader if any.
	// NOTE(review): the original comment here read "See for details." — the reference
	// it pointed to is missing, as is the notification call itself; confirm against the full file.

// blockBufPool provides scratch buffers for the blocks discarded by MustWriteBlock.
var blockBufPool bytesutil.ByteBufferPool

// writeBlockLocked writes the block prefixed by its 8-byte length header to the current chunk file.
//
// A new chunk file is started when the current one has no room for a block of q.maxBlockSize bytes.
// Metainfo is flushed via flushMetainfoIfNeededLocked after the block is written.
// NOTE(review): the Locked suffix suggests the caller must hold q.mu — confirm.
func (q *Queue) writeBlockLocked(block []byte) error {
	if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
		// Finalize the current chunk and start new one.
		// There is no need to do fs.MustSyncPath(q.writerPath) here,
		// since MustClose already does this.
		// Round writerOffset up to the next multiple of chunkFileSize.
		if n := q.writerOffset % q.chunkFileSize; n > 0 {
			q.writerOffset += (q.chunkFileSize - n)
		q.writerFlushedOffset = q.writerOffset
		q.writerLocalOffset = 0
		q.writerPath = q.chunkFilePath(q.writerOffset)
		w, err := filestream.Create(q.writerPath, false)
		if err != nil {
			return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err)
		q.writer = w
		// Persist the offsets pointing to the new chunk.
		if err := q.flushMetainfoLocked(); err != nil {
			return fmt.Errorf("cannot flush metainfo: %w", err)

	// Write block len.
	blockLen := uint64(len(block))
	// NOTE(review): header isn't returned to headerBufPool in the visible code — confirm.
	header := headerBufPool.Get()
	header.B = encoding.MarshalUint64(header.B, blockLen)
	err := q.write(header.B)
	if err != nil {
		return fmt.Errorf("cannot write header with size 8 bytes to %q: %w", q.writerPath, err)

	// Write block contents.
	if err := q.write(block); err != nil {
		return fmt.Errorf("cannot write block contents with size %d bytes to %q: %w", len(block), q.writerPath, err)
	return q.flushMetainfoIfNeededLocked(true)

// MustReadBlock appends the next block from q to dst and returns the result.
// false is returned after MustClose call.
// It is safe calling this function from concurrent goroutines.
//
// It panics if the block cannot be read.
func (q *Queue) MustReadBlock(dst []byte) ([]byte, bool) {

	// Loop until the queue is stopped or there is pending data to read.
	// NOTE(review): the wait on q.cond and the loop exit aren't visible here — confirm.
	for {
		if q.mustStop {
			return dst, false
		if q.readerOffset > q.writerOffset {
			logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset)
		if q.readerOffset < q.writerOffset {

	data, err := q.readBlockLocked(dst)
	if err != nil {
		// Skip the current chunk, since it may be broken.
		// readerOffset is advanced to the start of the next chunk.
		q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize
		// The flush error is ignored, since the function panics on the next line anyway.
		_ = q.flushMetainfoLocked()
		logger.Panicf("FATAL: %s", err)
	return data, true

// readBlockLocked appends the next block from the current chunk file to dst and returns the result.
//
// It switches to the next chunk file when the current one cannot hold one more block
// of q.maxBlockSize bytes. An error is returned if the header or the block contents
// cannot be read, or if the block length read from the header exceeds q.maxBlockSize.
// NOTE(review): the Locked suffix suggests the caller must hold q.mu — confirm.
func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
	if q.readerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
		// Remove the current chunk and go to the next chunk.
		// Round readerOffset up to the next multiple of chunkFileSize.
		if n := q.readerOffset % q.chunkFileSize; n > 0 {
			q.readerOffset += (q.chunkFileSize - n)
		q.readerLocalOffset = 0
		q.readerPath = q.chunkFilePath(q.readerOffset)
		r, err := filestream.Open(q.readerPath, true)
		if err != nil {
			return dst, fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err)
		q.reader = r
		// Persist the offsets pointing to the new chunk.
		if err := q.flushMetainfoLocked(); err != nil {
			return dst, fmt.Errorf("cannot flush metainfo: %w", err)

	// Read block len.
	// NOTE(review): header isn't returned to headerBufPool in the visible code — confirm.
	header := headerBufPool.Get()
	header.B = bytesutil.Resize(header.B, 8)
	err := q.readFull(header.B)
	// blockLen is decoded before the error check; it is used only if err is nil.
	blockLen := encoding.UnmarshalUint64(header.B)
	if err != nil {
		return dst, fmt.Errorf("cannot read header with size 8 bytes from %q: %w", q.readerPath, err)
	if blockLen > q.maxBlockSize {
		return dst, fmt.Errorf("too big block size read from %q: %d bytes; cannot exceed %d bytes", q.readerPath, blockLen, q.maxBlockSize)

	// Read block contents.
	// dst is extended by blockLen bytes and the block is read into the added tail.
	dstLen := len(dst)
	dst = bytesutil.Resize(dst, dstLen+int(blockLen))
	if err := q.readFull(dst[dstLen:]); err != nil {
		return dst, fmt.Errorf("cannot read block contents with size %d bytes from %q: %w", blockLen, q.readerPath, err)
	if err := q.flushMetainfoIfNeededLocked(false); err != nil {
		return dst, err
	return dst, nil

// write writes buf to q.writer and advances both the local and the global writer offsets by len(buf).
//
// The offsets aren't advanced if the write fails or writes an unexpected number of bytes.
func (q *Queue) write(buf []byte) error {
	bufLen := uint64(len(buf))
	n, err := q.writer.Write(buf)
	if err != nil {
		return err
	if uint64(n) != bufLen {
		return fmt.Errorf("unexpected number of bytes written; got %d bytes; want %d bytes", n, bufLen)
	q.writerLocalOffset += bufLen
	q.writerOffset += bufLen
	return nil

// readFull fills buf with data from q.reader and advances both the local and the global reader offsets by len(buf).
//
// The offsets aren't advanced if the read fails.
func (q *Queue) readFull(buf []byte) error {
	bufLen := uint64(len(buf))
	// The requested range goes beyond writerFlushedOffset.
	// NOTE(review): only the offset update is visible here; presumably the writer
	// is flushed before the update so the data becomes readable — confirm.
	if q.readerOffset+bufLen > q.writerFlushedOffset {
		q.writerFlushedOffset = q.writerOffset
	n, err := io.ReadFull(q.reader, buf)
	if err != nil {
		return err
	if uint64(n) != bufLen {
		return fmt.Errorf("unexpected number of bytes read; got %d bytes; want %d bytes", n, bufLen)
	q.readerLocalOffset += bufLen
	q.readerOffset += bufLen
	return nil

// flushMetainfoIfNeededLocked flushes metainfo unless it has been already flushed
// at the current fasttime.UnixTimestamp() value.
//
// NOTE(review): the body of the flushData branch isn't visible here;
// presumably it flushes the pending written data before storing metainfo — confirm.
func (q *Queue) flushMetainfoIfNeededLocked(flushData bool) error {
	t := fasttime.UnixTimestamp()
	if t == q.lastMetainfoFlushTime {
		// Metainfo has been already flushed at this timestamp.
		return nil
	if flushData {
	if err := q.flushMetainfoLocked(); err != nil {
		return fmt.Errorf("cannot flush metainfo: %w", err)
	// Remember the flush time only after the successful flush.
	q.lastMetainfoFlushTime = t
	return nil

// flushMetainfoLocked writes the current reader and writer offsets to the metainfo file.
//
// NOTE(review): Name isn't populated in the visible struct literal — confirm against the full file.
func (q *Queue) flushMetainfoLocked() error {
	mi := &metainfo{
		ReaderOffset: q.readerOffset,
		WriterOffset: q.writerOffset,
	metainfoPath := q.metainfoPath()
	if err := mi.WriteToFile(metainfoPath); err != nil {
		return fmt.Errorf("cannot write metainfo to %q: %w", metainfoPath, err)
	return nil

// headerBufPool provides buffers for the 8-byte block length headers
// used by writeBlockLocked and readBlockLocked.
var headerBufPool bytesutil.ByteBufferPool

// metainfo is the queue state stored as JSON in the metainfo file.
type metainfo struct {
	// Name is the queue name.
	Name         string
	// ReaderOffset is the global reader offset. It cannot exceed WriterOffset.
	ReaderOffset uint64
	// WriterOffset is the global writer offset.
	WriterOffset uint64

// Reset zeroes reader and writer offsets in mi.
//
// Name isn't modified by the visible code.
func (mi *metainfo) Reset() {
	mi.ReaderOffset = 0
	mi.WriterOffset = 0

// WriteToFile marshals mi to JSON and writes it to the file at the given path with 0600 permissions.
func (mi *metainfo) WriteToFile(path string) error {
	data, err := json.Marshal(mi)
	if err != nil {
		return fmt.Errorf("cannot marshal persistent queue metainfo %#v: %w", mi, err)
	if err := ioutil.WriteFile(path, data, 0600); err != nil {
		return fmt.Errorf("cannot write persistent queue metainfo to %q: %w", path, err)
	return nil

// ReadFromFile reads JSON-encoded metainfo from the file at the given path into mi.
//
// The "file doesn't exist" error is returned unwrapped, so callers can detect it with os.IsNotExist.
// An error is also returned if the file cannot be parsed or if ReaderOffset exceeds WriterOffset.
func (mi *metainfo) ReadFromFile(path string) error {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			// Return the error as is; os.IsNotExist doesn't unwrap errors wrapped with %w.
			return err
		return fmt.Errorf("cannot read %q: %w", path, err)
	if err := json.Unmarshal(data, mi); err != nil {
		return fmt.Errorf("cannot unmarshal persistent queue metainfo from %q: %w", path, err)
	// Validate the offsets invariant.
	if mi.ReaderOffset > mi.WriterOffset {
		return fmt.Errorf("invalid data read from %q: readerOffset=%d cannot exceed writerOffset=%d", path, mi.ReaderOffset, mi.WriterOffset)
	return nil