diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 7af7933ce..3bbb9c519 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -113,6 +113,9 @@ func (q *queue) mustResetFiles() { // GetPendingBytes returns the number of pending bytes in the queue. func (q *queue) GetPendingBytes() uint64 { + if q.readerOffset > q.writerOffset { + logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset) + } n := q.writerOffset - q.readerOffset return n } @@ -555,6 +558,9 @@ func (q *queue) nextChunkFileForRead() error { if n := q.readerOffset % q.chunkFileSize; n > 0 { q.readerOffset += q.chunkFileSize - n } + if err := q.checkReaderWriterOffsets(); err != nil { + return err + } q.readerLocalOffset = 0 q.readerPath = q.chunkFilePath(q.readerOffset) r, err := filestream.Open(q.readerPath, true) @@ -598,6 +604,14 @@ func (q *queue) readFull(buf []byte) error { } q.readerLocalOffset += bufLen q.readerOffset += bufLen + return q.checkReaderWriterOffsets() +} + +func (q *queue) checkReaderWriterOffsets() error { + if q.readerOffset > q.writerOffset { + return fmt.Errorf("readerOffset=%d cannot exceed writerOffset=%d; it is likely persistent queue files were corrupted on unclean shutdown", + q.readerOffset, q.writerOffset) + } return nil }