From b80d93d4b29457400bf59839d3e174b15ee43fd6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 14 Apr 2023 14:39:26 -0700 Subject: [PATCH] lib/fs: substitute ReadFullData with MustReadData Callers of ReadFullData() log the error and then exit. So let's log the error with the path to the filename and the call stack inside MustReadData(). This simplifies the code at callers' side, while leaving the debuggability at the same level. --- lib/fs/fs.go | 11 +++++---- lib/mergeset/block_stream_reader.go | 14 +++--------- lib/storage/block_stream_reader.go | 35 +++++++---------------------- 3 files changed, 16 insertions(+), 44 deletions(-) diff --git a/lib/fs/fs.go b/lib/fs/fs.go index e7fe8450a6..85c0d75c61 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -337,19 +337,18 @@ func MustCopyFile(srcPath, dstPath string) { MustSyncPath(dstPath) } -// ReadFullData reads len(data) bytes from r. -func ReadFullData(r io.Reader, data []byte) error { +// MustReadData reads len(data) bytes from r. +func MustReadData(r filestream.ReadCloser, data []byte) { n, err := io.ReadFull(r, data) if err != nil { if err == io.EOF { - return io.EOF + return } - return fmt.Errorf("cannot read %d bytes; read only %d bytes; error: %w", len(data), n, err) + logger.Panicf("FATAL: cannot read %d bytes from %s; read only %d bytes; error: %s", len(data), r.Path(), n, err) } if n != len(data) { - logger.Panicf("BUG: io.ReadFull read only %d bytes; must read %d bytes", n, len(data)) + logger.Panicf("BUG: io.ReadFull read only %d bytes from %s; must read %d bytes", n, r.Path(), len(data)) } - return nil } // MustWriteData writes data to w. diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index 2df51b6915..c2d4a3d3d1 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -234,16 +234,10 @@ func (bsr *blockStreamReader) Next() bool { bsr.bhIdx++ bsr.sb.itemsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize)) - if err := fs.ReadFullData(bsr.itemsReader, bsr.sb.itemsData); err != nil { - bsr.err = fmt.Errorf("cannot read compressed items block with size %d: %w", bsr.bh.itemsBlockSize, err) - return false - } + fs.MustReadData(bsr.itemsReader, bsr.sb.itemsData) bsr.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.lensData, int(bsr.bh.lensBlockSize)) - if err := fs.ReadFullData(bsr.lensReader, bsr.sb.lensData); err != nil { - bsr.err = fmt.Errorf("cannot read compressed lens block with size %d: %w", bsr.bh.lensBlockSize, err) - return false - } + fs.MustReadData(bsr.lensReader, bsr.sb.lensData) if err := bsr.Block.UnmarshalData(&bsr.sb, bsr.bh.firstItem, bsr.bh.commonPrefix, bsr.bh.itemsCount, bsr.bh.marshalType); err != nil { bsr.err = fmt.Errorf("cannot unmarshal inmemoryBlock from storageBlock with firstItem=%X, commonPrefix=%X, itemsCount=%d, marshalType=%d: %w", @@ -283,9 +277,7 @@ func (bsr *blockStreamReader) readNextBHS() error { // Read compressed index block. bsr.packedBuf = bytesutil.ResizeNoCopyMayOverallocate(bsr.packedBuf, int(mr.indexBlockSize)) - if err := fs.ReadFullData(bsr.indexReader, bsr.packedBuf); err != nil { - return fmt.Errorf("cannot read compressed index block with size %d: %w", mr.indexBlockSize, err) - } + fs.MustReadData(bsr.indexReader, bsr.packedBuf) // Unpack the compressed index block. var err error diff --git a/lib/storage/block_stream_reader.go b/lib/storage/block_stream_reader.go index 4f15ef6778..58e24c6c17 100644 --- a/lib/storage/block_stream_reader.go +++ b/lib/storage/block_stream_reader.go @@ -30,13 +30,9 @@ type blockStreamReader struct { ph partHeader - // Use io.Reader type for timestampsReader and valuesReader - // in order to remove I2I conversion in readBlock - // when passing them to fs.ReadFullData - timestampsReader io.Reader - valuesReader io.Reader - - indexReader filestream.ReadCloser + timestampsReader filestream.ReadCloser + valuesReader filestream.ReadCloser + indexReader filestream.ReadCloser mrs []metaindexRow @@ -68,11 +64,6 @@ type blockStreamReader struct { err error } -func (bsr *blockStreamReader) assertWriteClosers() { - _ = bsr.timestampsReader.(filestream.ReadCloser) - _ = bsr.valuesReader.(filestream.ReadCloser) -} - func (bsr *blockStreamReader) reset() { bsr.Block.Reset() @@ -128,8 +119,6 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { if err != nil { logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemoryPart: %s", err) } - - bsr.assertWriteClosers() } // InitFromFilePart initializes bsr from a file-based part on the given path. @@ -189,8 +178,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { bsr.indexReader = indexFile bsr.mrs = mrs - bsr.assertWriteClosers() - return nil } @@ -198,8 +185,8 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { // // It closes *Reader files passed to Init. func (bsr *blockStreamReader) MustClose() { - bsr.timestampsReader.(filestream.ReadCloser).MustClose() - bsr.valuesReader.(filestream.ReadCloser).MustClose() + bsr.timestampsReader.MustClose() + bsr.valuesReader.MustClose() bsr.indexReader.MustClose() bsr.reset() @@ -309,18 +296,14 @@ func (bsr *blockStreamReader) readBlock() error { bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...) } else { bsr.Block.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize)) - if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil { - return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err) - } + fs.MustReadData(bsr.timestampsReader, bsr.Block.timestampsData) bsr.prevTimestampsBlockOffset = bsr.timestampsBlockOffset bsr.prevTimestampsData = append(bsr.prevTimestampsData[:0], bsr.Block.timestampsData...) } // Read values data. bsr.Block.valuesData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize)) - if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil { - return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err) - } + fs.MustReadData(bsr.valuesReader, bsr.Block.valuesData) // Update offsets. if !usePrevTimestamps { @@ -353,9 +336,7 @@ func (bsr *blockStreamReader) readIndexBlock() error { // Read index block. bsr.compressedIndexData = bytesutil.ResizeNoCopyMayOverallocate(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize)) - if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil { - return fmt.Errorf("cannot read index block at offset %d: %w", bsr.indexBlockOffset, err) - } + fs.MustReadData(bsr.indexReader, bsr.compressedIndexData) tmpData, err := encoding.DecompressZSTD(bsr.indexData[:0], bsr.compressedIndexData) if err != nil { return fmt.Errorf("cannot decompress index block at offset %d: %w", bsr.indexBlockOffset, err)