lib/persistentqueue: check that readerOffset doesnt exceed writerOffset after each readerOffset increase

This should help detecting the source of the panic from https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1981
This commit is contained in:
Aliaksandr Valialkin 2021-12-20 17:25:08 +02:00
parent ad388ecd78
commit 5a36e241f4
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -113,6 +113,9 @@ func (q *queue) mustResetFiles() {
// GetPendingBytes returns the number of pending bytes in the queue.
func (q *queue) GetPendingBytes() uint64 {
if q.readerOffset > q.writerOffset {
logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset)
}
n := q.writerOffset - q.readerOffset
return n
}
@ -555,6 +558,9 @@ func (q *queue) nextChunkFileForRead() error {
if n := q.readerOffset % q.chunkFileSize; n > 0 {
q.readerOffset += q.chunkFileSize - n
}
if err := q.checkReaderWriterOffsets(); err != nil {
return err
}
q.readerLocalOffset = 0
q.readerPath = q.chunkFilePath(q.readerOffset)
r, err := filestream.Open(q.readerPath, true)
@ -598,6 +604,14 @@ func (q *queue) readFull(buf []byte) error {
}
q.readerLocalOffset += bufLen
q.readerOffset += bufLen
return q.checkReaderWriterOffsets()
}
func (q *queue) checkReaderWriterOffsets() error {
if q.readerOffset > q.writerOffset {
return fmt.Errorf("readerOffset=%d cannot exceed writerOffset=%d; it is likely persistent queue files were corrupted on unclean shutdown",
q.readerOffset, q.writerOffset)
}
return nil
}