Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
lib/fs: substitute ReadFullData with MustReadData
Callers of ReadFullData() log the error and then exit, so log the error together with the file path and the call stack inside MustReadData() instead. This simplifies the code on the callers' side while keeping debuggability at the same level.
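For context, the Must prefix follows the common Go convention of a helper that terminates the process on failure instead of returning an error, so callers need no error handling. Below is a minimal, self-contained sketch of the idea using only the standard library; pathedReader, namedReader and mustReadData are illustrative names, not the actual VictoriaMetrics API:

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// pathedReader is a hypothetical stand-in for a reader that can report
// the path of the file it reads from, so fatal messages can name the file.
type pathedReader interface {
	io.Reader
	Path() string
}

type namedReader struct {
	io.Reader
	path string
}

func (r *namedReader) Path() string { return r.path }

// mustReadData fills data from r or terminates the process with a message
// naming the file; a clean EOF before any bytes were read returns silently,
// mirroring the special case in the diff below.
func mustReadData(r pathedReader, data []byte) {
	n, err := io.ReadFull(r, data)
	if err != nil {
		if err == io.EOF {
			return
		}
		log.Fatalf("FATAL: cannot read %d bytes from %s; read only %d bytes; error: %s",
			len(data), r.Path(), n, err)
	}
}

func main() {
	r := &namedReader{Reader: bytes.NewReader([]byte("hello")), path: "/tmp/example.bin"}
	buf := make([]byte, 5)
	mustReadData(r, buf) // terminates the process on a short read or I/O error
	fmt.Printf("read %q\n", buf)
}

In the actual diff, the same role is played by fs.MustReadData together with logger.Panicf.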
Parent: bd6de6406a
Commit: f341b7b3f8
3 changed files with 16 additions and 44 deletions
lib/fs/fs.go (11 changed lines)

@@ -337,19 +337,18 @@ func MustCopyFile(srcPath, dstPath string) {
 	MustSyncPath(dstPath)
 }
 
-// ReadFullData reads len(data) bytes from r.
-func ReadFullData(r io.Reader, data []byte) error {
+// MustReadData reads len(data) bytes from r.
+func MustReadData(r filestream.ReadCloser, data []byte) {
 	n, err := io.ReadFull(r, data)
 	if err != nil {
 		if err == io.EOF {
-			return io.EOF
+			return
 		}
-		return fmt.Errorf("cannot read %d bytes; read only %d bytes; error: %w", len(data), n, err)
+		logger.Panicf("FATAL: cannot read %d bytes from %s; read only %d bytes; error: %s", len(data), r.Path(), n, err)
 	}
 	if n != len(data) {
-		logger.Panicf("BUG: io.ReadFull read only %d bytes; must read %d bytes", n, len(data))
+		logger.Panicf("BUG: io.ReadFull read only %d bytes from %s; must read %d bytes", n, r.Path(), len(data))
 	}
-	return nil
 }
 
 // MustWriteData writes data to w.
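The commit message mentions logging the call stack. The logger.Panicf implementation is not part of this diff, so the following is only a hedged sketch of what such a fatal helper can look like with the standard library; fatalf is a hypothetical stand-in, not the lib/logger API:

package main

import (
	"fmt"
	"os"
	"runtime/debug"
)

// fatalf prints the formatted message together with the current call stack
// and terminates the process, so callers never see an error value.
func fatalf(format string, args ...any) {
	fmt.Fprintf(os.Stderr, format+"\n", args...)
	os.Stderr.Write(debug.Stack()) // include the call stack for debuggability
	os.Exit(1)
}

func main() {
	fatalf("FATAL: cannot read %d bytes from %s", 128, "/tmp/example.bin")
}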
@@ -234,16 +234,10 @@ func (bsr *blockStreamReader) Next() bool {
 	bsr.bhIdx++
 
 	bsr.sb.itemsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize))
-	if err := fs.ReadFullData(bsr.itemsReader, bsr.sb.itemsData); err != nil {
-		bsr.err = fmt.Errorf("cannot read compressed items block with size %d: %w", bsr.bh.itemsBlockSize, err)
-		return false
-	}
+	fs.MustReadData(bsr.itemsReader, bsr.sb.itemsData)
 
 	bsr.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.lensData, int(bsr.bh.lensBlockSize))
-	if err := fs.ReadFullData(bsr.lensReader, bsr.sb.lensData); err != nil {
-		bsr.err = fmt.Errorf("cannot read compressed lens block with size %d: %w", bsr.bh.lensBlockSize, err)
-		return false
-	}
+	fs.MustReadData(bsr.lensReader, bsr.sb.lensData)
 
 	if err := bsr.Block.UnmarshalData(&bsr.sb, bsr.bh.firstItem, bsr.bh.commonPrefix, bsr.bh.itemsCount, bsr.bh.marshalType); err != nil {
 		bsr.err = fmt.Errorf("cannot unmarshal inmemoryBlock from storageBlock with firstItem=%X, commonPrefix=%X, itemsCount=%d, marshalType=%d: %w",

@@ -283,9 +277,7 @@ func (bsr *blockStreamReader) readNextBHS() error {
 
 	// Read compressed index block.
 	bsr.packedBuf = bytesutil.ResizeNoCopyMayOverallocate(bsr.packedBuf, int(mr.indexBlockSize))
-	if err := fs.ReadFullData(bsr.indexReader, bsr.packedBuf); err != nil {
-		return fmt.Errorf("cannot read compressed index block with size %d: %w", mr.indexBlockSize, err)
-	}
+	fs.MustReadData(bsr.indexReader, bsr.packedBuf)
 
 	// Unpack the compressed index block.
 	var err error
@@ -30,13 +30,9 @@ type blockStreamReader struct {
 
 	ph partHeader
 
-	// Use io.Reader type for timestampsReader and valuesReader
-	// in order to remove I2I conversion in readBlock
-	// when passing them to fs.ReadFullData
-	timestampsReader io.Reader
-	valuesReader io.Reader
-
-	indexReader filestream.ReadCloser
+	timestampsReader filestream.ReadCloser
+	valuesReader filestream.ReadCloser
+	indexReader filestream.ReadCloser
 
 	mrs []metaindexRow
 
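The removed comment explains why the old fields were typed io.Reader (to avoid an interface-to-interface conversion when passing them to fs.ReadFullData); the price was the type assertions removed further down in this diff. A minimal sketch of that trade-off, assuming a readCloser interface with Path and MustClose methods roughly like what filestream.ReadCloser appears to expose here; all names below are illustrative:

package main

import (
	"fmt"
	"io"
	"strings"
)

// readCloser mirrors the shape suggested by this diff: a reader that also
// knows its path and can be force-closed.
type readCloser interface {
	io.Reader
	Path() string
	MustClose()
}

type stringReadCloser struct {
	io.Reader
	path string
}

func (r *stringReadCloser) Path() string { return r.path }
func (r *stringReadCloser) MustClose()   {}

// oldStyle stores the field as io.Reader, so closing it needs a type assertion.
type oldStyle struct {
	timestampsReader io.Reader
}

func (o *oldStyle) mustClose() {
	o.timestampsReader.(readCloser).MustClose() // panics if the assertion fails
}

// newStyle stores the narrower interface, so no assertion is needed and
// Path() is available for error messages.
type newStyle struct {
	timestampsReader readCloser
}

func (n *newStyle) mustClose() {
	n.timestampsReader.MustClose()
}

func main() {
	rc := &stringReadCloser{Reader: strings.NewReader("data"), path: "/tmp/timestamps.bin"}
	o := &oldStyle{timestampsReader: rc}
	n := &newStyle{timestampsReader: rc}
	o.mustClose()
	n.mustClose()
	fmt.Println("closed", rc.Path(), "both ways")
}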
@@ -68,11 +64,6 @@ type blockStreamReader struct {
 	err error
 }
 
-func (bsr *blockStreamReader) assertWriteClosers() {
-	_ = bsr.timestampsReader.(filestream.ReadCloser)
-	_ = bsr.valuesReader.(filestream.ReadCloser)
-}
-
 func (bsr *blockStreamReader) reset() {
 	bsr.Block.Reset()
 

@@ -128,8 +119,6 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
 	if err != nil {
 		logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemoryPart: %s", err)
 	}
-
-	bsr.assertWriteClosers()
 }
 
 // InitFromFilePart initializes bsr from a file-based part on the given path.

@@ -189,8 +178,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 	bsr.indexReader = indexFile
 	bsr.mrs = mrs
 
-	bsr.assertWriteClosers()
-
 	return nil
 }
 

@@ -198,8 +185,8 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 //
 // It closes *Reader files passed to Init.
 func (bsr *blockStreamReader) MustClose() {
-	bsr.timestampsReader.(filestream.ReadCloser).MustClose()
-	bsr.valuesReader.(filestream.ReadCloser).MustClose()
+	bsr.timestampsReader.MustClose()
+	bsr.valuesReader.MustClose()
 	bsr.indexReader.MustClose()
 
 	bsr.reset()
@@ -309,18 +296,14 @@ func (bsr *blockStreamReader) readBlock() error {
 		bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...)
 	} else {
 		bsr.Block.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
-		if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil {
-			return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err)
-		}
+		fs.MustReadData(bsr.timestampsReader, bsr.Block.timestampsData)
 		bsr.prevTimestampsBlockOffset = bsr.timestampsBlockOffset
 		bsr.prevTimestampsData = append(bsr.prevTimestampsData[:0], bsr.Block.timestampsData...)
 	}
 
 	// Read values data.
 	bsr.Block.valuesData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize))
-	if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil {
-		return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err)
-	}
+	fs.MustReadData(bsr.valuesReader, bsr.Block.valuesData)
 
 	// Update offsets.
 	if !usePrevTimestamps {

@@ -353,9 +336,7 @@ func (bsr *blockStreamReader) readIndexBlock() error {
 
 	// Read index block.
 	bsr.compressedIndexData = bytesutil.ResizeNoCopyMayOverallocate(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize))
-	if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil {
-		return fmt.Errorf("cannot read index block at offset %d: %w", bsr.indexBlockOffset, err)
-	}
+	fs.MustReadData(bsr.indexReader, bsr.compressedIndexData)
 	tmpData, err := encoding.DecompressZSTD(bsr.indexData[:0], bsr.compressedIndexData)
 	if err != nil {
 		return fmt.Errorf("cannot decompress index block at offset %d: %w", bsr.indexBlockOffset, err)
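Taken together, the caller-side pattern in these hunks is an error-handling branch collapsing into a single call. A hedged, self-contained sketch of that before/after shape; readFullData, mustReadData, readBlockOld and readBlockNew are illustrative stand-ins, not the actual blockStreamReader API:

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// readFullData mirrors the old API shape: it returns an error for the caller to handle.
func readFullData(r io.Reader, data []byte) error {
	if _, err := io.ReadFull(r, data); err != nil {
		return err
	}
	return nil
}

// mustReadData mirrors the new API shape: it terminates the process itself on failure.
func mustReadData(r io.Reader, data []byte) {
	if _, err := io.ReadFull(r, data); err != nil {
		log.Fatalf("FATAL: cannot read %d bytes: %s", len(data), err)
	}
}

// readBlockOld shows the old caller: every read needs an error branch.
func readBlockOld(r io.Reader, size int) ([]byte, error) {
	buf := make([]byte, size)
	if err := readFullData(r, buf); err != nil {
		return nil, fmt.Errorf("cannot read block with size %d: %w", size, err)
	}
	return buf, nil
}

// readBlockNew shows the new caller: the read either succeeds or stops the process.
func readBlockNew(r io.Reader, size int) []byte {
	buf := make([]byte, size)
	mustReadData(r, buf)
	return buf
}

func main() {
	old, err := readBlockOld(bytes.NewReader([]byte("abcd")), 4)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("old style read %q\n", old)
	fmt.Printf("new style read %q\n", readBlockNew(bytes.NewReader([]byte("efgh")), 4))
}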