From c0b852d50d20e4a38bb84ca470768a577dda850d Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 14 Apr 2023 15:46:09 -0700
Subject: [PATCH] lib/{storage,mergeset}: convert InitFromFilePart to
 MustInitFromFilePart

Callers of InitFromFilePart log the error and exit. It is better to log
the error with the path to the part and the call stack directly inside
the MustInitFromFilePart() function. This simplifies the code on the
callers' side while retaining the same level of debuggability.
---
 lib/mergeset/block_stream_reader.go     | 20 +++----
 lib/mergeset/block_stream_writer.go     |  2 +-
 lib/mergeset/inmemory_part.go           |  6 +--
 lib/mergeset/merge_test.go              | 12 ++---
 lib/mergeset/part.go                    | 21 +++----
 lib/mergeset/part_header.go             | 14 +++--
 lib/mergeset/part_search_test.go        |  7 +--
 lib/mergeset/table.go                   | 54 +++++--------
 lib/storage/block_stream_reader.go      | 14 ++---
 lib/storage/block_stream_reader_test.go |  4 +-
 .../block_stream_reader_timing_test.go  |  2 +-
 lib/storage/block_stream_writer.go      |  4 +-
 .../block_stream_writer_timing_test.go  |  4 +-
 lib/storage/inmemory_part.go            |  2 +-
 lib/storage/inmemory_part_test.go       |  2 +-
 lib/storage/merge_test.go               |  6 +--
 lib/storage/merge_timing_test.go        |  4 +-
 lib/storage/part.go                     | 24 +++----
 lib/storage/part_header.go              | 20 +++---
 lib/storage/part_search_test.go         |  5 +-
 lib/storage/partition.go                | 46 +++++-----
 lib/storage/raw_row.go                  |  2 +-
 22 files changed, 93 insertions(+), 182 deletions(-)

diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go
index 8a4f729fe..e0cd81af5 100644
--- a/lib/mergeset/block_stream_reader.go
+++ b/lib/mergeset/block_stream_reader.go
@@ -17,7 +17,7 @@ type blockStreamReader struct {
 	// Block contains the current block if Next returned true.
 	Block inmemoryBlock
 
-	// isInmemoryBlock is set to true if bsr was initialized with InitFromInmemoryBlock().
+	// isInmemoryBlock is set to true if bsr was initialized with MustInitFromInmemoryBlock().
 	isInmemoryBlock bool
 
 	// The index of the current item in the Block, which is returned from CurrItem()
@@ -103,16 +103,16 @@ func (bsr *blockStreamReader) String() string {
 	return bsr.ph.String()
 }
 
-// InitFromInmemoryBlock initializes bsr from the given ib.
-func (bsr *blockStreamReader) InitFromInmemoryBlock(ib *inmemoryBlock) {
+// MustInitFromInmemoryBlock initializes bsr from the given ib.
+func (bsr *blockStreamReader) MustInitFromInmemoryBlock(ib *inmemoryBlock) {
 	bsr.reset()
 	bsr.Block.CopyFrom(ib)
 	bsr.Block.SortItems()
 	bsr.isInmemoryBlock = true
 }
 
-// InitFromInmemoryPart initializes bsr from the given mp.
-func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
+// MustInitFromInmemoryPart initializes bsr from the given mp.
+func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
 	bsr.reset()
 
 	var err error
@@ -134,18 +134,16 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
 	}
 }
 
-// InitFromFilePart initializes bsr from a file-based part on the given path.
+// MustInitFromFilePart initializes bsr from a file-based part on the given path.
 //
 // Part files are read without OS cache pollution, since the part is usually
 // deleted after the merge.
-func (bsr *blockStreamReader) InitFromFilePart(path string) error { +func (bsr *blockStreamReader) MustInitFromFilePart(path string) { bsr.reset() path = filepath.Clean(path) - if err := bsr.ph.ReadMetadata(path); err != nil { - return fmt.Errorf("cannot read metadata from %q: %w", path, err) - } + bsr.ph.MustReadMetadata(path) metaindexPath := filepath.Join(path, metaindexFilename) metaindexFile := filestream.MustOpen(metaindexPath, true) @@ -170,8 +168,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { bsr.indexReader = indexFile bsr.itemsReader = itemsFile bsr.lensReader = lensFile - - return nil } // MustClose closes the bsr. diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index df26e064a..8341b8318 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -60,7 +60,7 @@ func (bsw *blockStreamWriter) reset() { bsw.mrFirstItemCaught = false } -func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { +func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { bsw.reset() bsw.compressLevel = compressLevel diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index 1a8d1ade4..d798f4537 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type inmemoryPart struct { @@ -102,10 +101,7 @@ var inmemoryPartBytePool bytesutil.ByteBufferPool // It is unsafe re-using mp while the returned part is in use. 
func (mp *inmemoryPart) NewPart() *part { size := mp.size() - p, err := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData) - if err != nil { - logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err) - } + p := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData) return p } diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 2298a1991..0b628e1d9 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -32,14 +32,14 @@ func TestMultilevelMerge(t *testing.T) { // First level merge var dstIP1 inmemoryPart var bsw1 blockStreamWriter - bsw1.InitFromInmemoryPart(&dstIP1, -5) + bsw1.MustInitFromInmemoryPart(&dstIP1, -5) if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 1: %s", err) } var dstIP2 inmemoryPart var bsw2 blockStreamWriter - bsw2.InitFromInmemoryPart(&dstIP2, -5) + bsw2.MustInitFromInmemoryPart(&dstIP2, -5) if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 2: %s", err) } @@ -56,7 +56,7 @@ func TestMultilevelMerge(t *testing.T) { newTestBlockStreamReader(&dstIP1), newTestBlockStreamReader(&dstIP2), } - bsw.InitFromInmemoryPart(&dstIP, 1) + bsw.MustInitFromInmemoryPart(&dstIP, 1) if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge second level: %s", err) } @@ -76,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) { bsrs, _ := newTestInmemoryBlockStreamReaders(r, 20, 4000) var dstIP inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&dstIP, 1) + bsw.MustInitFromInmemoryPart(&dstIP, 1) ch := make(chan struct{}) var itemsMerged uint64 close(ch) @@ -125,7 +125,7 @@ func testMergeBlockStreamsSerial(r *rand.Rand, blocksToMerge, maxItemsPerBlock i var itemsMerged uint64 var dstIP inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&dstIP, -4) + bsw.MustInitFromInmemoryPart(&dstIP, -4) if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return fmt.Errorf("cannot merge block streams: %w", err) } @@ -204,6 +204,6 @@ func newTestInmemoryBlockStreamReaders(r *rand.Rand, blocksCount, maxItemsPerBlo func newTestBlockStreamReader(ip *inmemoryPart) *blockStreamReader { var bsr blockStreamReader - bsr.InitFromInmemoryPart(ip) + bsr.MustInitFromInmemoryPart(ip) return &bsr } diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 7ee86dffc..13e8c705b 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -1,7 +1,6 @@ package mergeset import ( - "fmt" "path/filepath" "sync" "unsafe" @@ -9,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) @@ -67,11 +67,9 @@ type part struct { lensFile fs.MustReadAtCloser } -func openFilePart(path string) (*part, error) { +func mustOpenFilePart(path string) *part { var ph partHeader - if err := ph.ReadMetadata(path); err != nil { - return nil, fmt.Errorf("cannot read part metadata: %w", err) - } + ph.MustReadMetadata(path) metaindexPath := filepath.Join(path, metaindexFilename) metaindexFile := filestream.MustOpen(metaindexPath, true) @@ -93,11 +91,10 @@ func 
openFilePart(path string) (*part, error) { return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile) } -func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) { - var errors []error +func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) *part { mrs, err := unmarshalMetaindexRows(nil, metaindexReader) if err != nil { - errors = append(errors, fmt.Errorf("cannot unmarshal metaindexRows: %w", err)) + logger.Panicf("FATAL: cannot unmarshal metaindexRows from %q: %s", path, err) } metaindexReader.MustClose() @@ -111,13 +108,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea p.lensFile = lensFile p.ph.CopyFrom(ph) - if len(errors) > 0 { - // Return only the first error, since it has no sense in returning all errors. - err := fmt.Errorf("error opening part %s: %w", p.path, errors[0]) - p.MustClose() - return nil, err - } - return &p, nil + return &p } func (p *part) MustClose() { diff --git a/lib/mergeset/part_header.go b/lib/mergeset/part_header.go index 23979c42f..28f539c95 100644 --- a/lib/mergeset/part_header.go +++ b/lib/mergeset/part_header.go @@ -78,39 +78,37 @@ func (ph *partHeader) CopyFrom(src *partHeader) { ph.lastItem = append(ph.lastItem[:0], src.lastItem...) } -func (ph *partHeader) ReadMetadata(partPath string) error { +func (ph *partHeader) MustReadMetadata(partPath string) { ph.Reset() // Read ph fields from metadata. metadataPath := filepath.Join(partPath, metadataFilename) metadata, err := os.ReadFile(metadataPath) if err != nil { - return fmt.Errorf("cannot read %q: %w", metadataPath, err) + logger.Panicf("FATAL: cannot read %q: %s", metadataPath, err) } var phj partHeaderJSON if err := json.Unmarshal(metadata, &phj); err != nil { - return fmt.Errorf("cannot parse %q: %w", metadataPath, err) + logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err) } if phj.ItemsCount <= 0 { - return fmt.Errorf("part %q cannot contain zero items", partPath) + logger.Panicf("FATAL: part %q cannot contain zero items", partPath) } ph.itemsCount = phj.ItemsCount if phj.BlocksCount <= 0 { - return fmt.Errorf("part %q cannot contain zero blocks", partPath) + logger.Panicf("FATAL: part %q cannot contain zero blocks", partPath) } if phj.BlocksCount > phj.ItemsCount { - return fmt.Errorf("the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d", + logger.Panicf("FATAL: the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d", partPath, phj.BlocksCount, phj.ItemsCount) } ph.blocksCount = phj.BlocksCount ph.firstItem = append(ph.firstItem[:0], phj.FirstItem...) ph.lastItem = append(ph.lastItem[:0], phj.LastItem...) 
- - return nil } func (ph *partHeader) MustWriteMetadata(partPath string) { diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index 72434a402..b61e9b552 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -151,7 +151,7 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri var itemsMerged uint64 var ip inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&ip, -3) + bsw.MustInitFromInmemoryPart(&ip, -3) if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return nil, nil, fmt.Errorf("cannot merge blocks: %w", err) } @@ -159,9 +159,6 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items)) } size := ip.size() - p, err := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData) - if err != nil { - return nil, nil, fmt.Errorf("cannot create part: %w", err) - } + p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData) return p, items, nil } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 10f2e97b0..96153d9a8 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -878,7 +878,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { continue } bsr := getBlockStreamReader() - bsr.InitFromInmemoryBlock(ib) + bsr.MustInitFromInmemoryBlock(ib) putInmemoryBlock(ib) bsrs = append(bsrs, bsr) } @@ -899,7 +899,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { compressLevel := getCompressLevel(outItemsCount) bsw := getBlockStreamWriter() mpDst := &inmemoryPart{} - bsw.InitFromInmemoryPart(mpDst, compressLevel) + bsw.MustInitFromInmemoryPart(mpDst, compressLevel) // Merge parts. // The merge shouldn't be interrupted by stopCh, @@ -1093,16 +1093,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal } // Prepare BlockStreamReaders for source parts. - bsrs, err := openBlockStreamReaders(pws) - if err != nil { - logger.Panicf("FATAL: cannot open source parts for merging: %s", err) - } - closeBlockStreamReaders := func() { - for _, bsr := range bsrs { - putBlockStreamReader(bsr) - } - bsrs = nil - } + bsrs := mustOpenBlockStreamReaders(pws) // Prepare BlockStreamWriter for destination part. srcSize := uint64(0) @@ -1118,7 +1109,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal var mpNew *inmemoryPart if dstPartType == partInmemory { mpNew = &inmemoryPart{} - bsw.InitFromInmemoryPart(mpNew, compressLevel) + bsw.MustInitFromInmemoryPart(mpNew, compressLevel) } else { nocache := srcItemsCount > maxItemsPerCachedPart() bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel) @@ -1127,7 +1118,9 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal // Merge source parts to destination part. 
ph, err := tb.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh) putBlockStreamWriter(bsw) - closeBlockStreamReaders() + for _, bsr := range bsrs { + putBlockStreamReader(bsr) + } if err != nil { tb.releasePartsToMerge(pws) return err @@ -1193,23 +1186,18 @@ func getDstPartType(pws []*partWrapper, isFinal bool) partType { return partInmemory } -func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) { +func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader { bsrs := make([]*blockStreamReader, 0, len(pws)) for _, pw := range pws { bsr := getBlockStreamReader() if pw.mp != nil { - bsr.InitFromInmemoryPart(pw.mp) + bsr.MustInitFromInmemoryPart(pw.mp) } else { - if err := bsr.InitFromFilePart(pw.p.path); err != nil { - for _, bsr := range bsrs { - putBlockStreamReader(bsr) - } - return nil, fmt.Errorf("cannot open source part for merging: %w", err) - } + bsr.MustInitFromFilePart(pw.p.path) } bsrs = append(bsrs, bsr) } - return bsrs, nil + return bsrs } func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { @@ -1251,10 +1239,7 @@ func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPar return pwNew } // Open the created part from disk. - pNew, err := openFilePart(dstPartPath) - if err != nil { - logger.Panicf("FATAL: cannot open the merged part: %s", err) - } + pNew := mustOpenFilePart(dstPartPath) pwNew := &partWrapper{ p: pNew, refCount: 1, @@ -1413,11 +1398,7 @@ func openParts(path string) ([]*partWrapper, error) { var pws []*partWrapper for _, partName := range partNames { partPath := filepath.Join(path, partName) - p, err := openFilePart(partPath) - if err != nil { - mustCloseParts(pws) - return nil, fmt.Errorf("cannot open part %q: %w", partPath, err) - } + p := mustOpenFilePart(partPath) pw := &partWrapper{ p: p, refCount: 1, @@ -1428,15 +1409,6 @@ func openParts(path string) ([]*partWrapper, error) { return pws, nil } -func mustCloseParts(pws []*partWrapper) { - for _, pw := range pws { - if pw.refCount != 1 { - logger.Panicf("BUG: unexpected refCount when closing part %q: %d; want 1", pw.p.path, pw.refCount) - } - pw.p.MustClose() - } -} - // CreateSnapshotAt creates tb snapshot in the given dstDir. // // Snapshot is created using linux hard links, so it is usually created very quickly. diff --git a/lib/storage/block_stream_reader.go b/lib/storage/block_stream_reader.go index 8087c2388..a8b61d84d 100644 --- a/lib/storage/block_stream_reader.go +++ b/lib/storage/block_stream_reader.go @@ -105,8 +105,8 @@ func (bsr *blockStreamReader) String() string { return bsr.ph.String() } -// InitFromInmemoryPart initializes bsr from the given mp. -func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { +// MustInitFromInmemoryPart initializes bsr from the given mp. +func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) { bsr.reset() bsr.ph = mp.ph @@ -121,18 +121,16 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { } } -// InitFromFilePart initializes bsr from a file-based part on the given path. +// MustInitFromFilePart initializes bsr from a file-based part on the given path. // // Files in the part are always read without OS cache pollution, // since they are usually deleted after the merge. 
-func (bsr *blockStreamReader) InitFromFilePart(path string) error { +func (bsr *blockStreamReader) MustInitFromFilePart(path string) { bsr.reset() path = filepath.Clean(path) - if err := bsr.ph.ReadMetadata(path); err != nil { - return fmt.Errorf("cannot parse path to part: %w", err) - } + bsr.ph.MustReadMetadata(path) timestampsPath := filepath.Join(path, timestampsFilename) timestampsFile := filestream.MustOpen(timestampsPath, true) @@ -156,8 +154,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { bsr.valuesReader = valuesFile bsr.indexReader = indexFile bsr.mrs = mrs - - return nil } // MustClose closes the bsr. diff --git a/lib/storage/block_stream_reader_test.go b/lib/storage/block_stream_reader_test.go index f1c1398bc..5a05449e4 100644 --- a/lib/storage/block_stream_reader_test.go +++ b/lib/storage/block_stream_reader_test.go @@ -106,7 +106,7 @@ func TestBlockStreamReaderReadConcurrent(t *testing.T) { func testBlockStreamReaderReadRows(mp *inmemoryPart, rows []rawRow) error { var bsr blockStreamReader - bsr.InitFromInmemoryPart(mp) + bsr.MustInitFromInmemoryPart(mp) rowsCount := 0 for bsr.NextBlock() { if err := bsr.Block.UnmarshalData(); err != nil { @@ -155,6 +155,6 @@ func newTestBlockStreamReader(t *testing.T, rows []rawRow) *blockStreamReader { var mp inmemoryPart mp.InitFromRows(rows) var bsr blockStreamReader - bsr.InitFromInmemoryPart(&mp) + bsr.MustInitFromInmemoryPart(&mp) return &bsr } diff --git a/lib/storage/block_stream_reader_timing_test.go b/lib/storage/block_stream_reader_timing_test.go index 7628fac7b..39ae16607 100644 --- a/lib/storage/block_stream_reader_timing_test.go +++ b/lib/storage/block_stream_reader_timing_test.go @@ -28,7 +28,7 @@ func benchmarkBlockStreamReader(b *testing.B, mp *inmemoryPart, readRows bool) { var bsr blockStreamReader blockNum := 0 for pb.Next() { - bsr.InitFromInmemoryPart(mp) + bsr.MustInitFromInmemoryPart(mp) for bsr.NextBlock() { if !readRows { continue diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index 6c319d7f8..0029534e7 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -66,8 +66,8 @@ func (bsw *blockStreamWriter) reset() { bsw.prevTimestampsBlockOffset = 0 } -// InitFromInmemoryPart initializes bsw from inmemory part. -func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { +// MustInitFromInmemoryPart initializes bsw from inmemory part. 
+func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { bsw.reset() bsw.compressLevel = compressLevel diff --git a/lib/storage/block_stream_writer_timing_test.go b/lib/storage/block_stream_writer_timing_test.go index 95ecbbe35..80df38cd6 100644 --- a/lib/storage/block_stream_writer_timing_test.go +++ b/lib/storage/block_stream_writer_timing_test.go @@ -47,7 +47,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR } } - bsw.InitFromInmemoryPart(&mp, -5) + bsw.MustInitFromInmemoryPart(&mp, -5) for i := range ebsCopy { bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged) } @@ -66,7 +66,7 @@ func newBenchBlocks(rows []rawRow) []Block { mp := newTestInmemoryPart(rows) var bsr blockStreamReader - bsr.InitFromInmemoryPart(mp) + bsr.MustInitFromInmemoryPart(mp) for bsr.NextBlock() { var eb Block eb.CopyFrom(&bsr.Block) diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index 2e3c48084..bc3b5b82a 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -73,7 +73,7 @@ func (mp *inmemoryPart) InitFromRows(rows []rawRow) { // // It is safe calling NewPart multiple times. // It is unsafe re-using mp while the returned part is in use. -func (mp *inmemoryPart) NewPart() (*part, error) { +func (mp *inmemoryPart) NewPart() *part { size := mp.size() return newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData) } diff --git a/lib/storage/inmemory_part_test.go b/lib/storage/inmemory_part_test.go index 7047fc9d4..1ed1ebb90 100644 --- a/lib/storage/inmemory_part_test.go +++ b/lib/storage/inmemory_part_test.go @@ -78,7 +78,7 @@ func testInmemoryPartInitFromRows(t *testing.T, rows []rawRow, blocksCount int) } var bsr blockStreamReader - bsr.InitFromInmemoryPart(&mp) + bsr.MustInitFromInmemoryPart(&mp) rowsCount := 0 blockNum := 0 diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index e572babca..35a6d7d94 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -369,7 +369,7 @@ func TestMergeForciblyStop(t *testing.T) { var mp inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&mp, -5) + bsw.MustInitFromInmemoryPart(&mp, -5) ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) @@ -392,7 +392,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc var mp inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&mp, -5) + bsw.MustInitFromInmemoryPart(&mp, -5) strg := newTestStorage() var rowsMerged, rowsDeleted uint64 @@ -418,7 +418,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc } var bsr1 blockStreamReader - bsr1.InitFromInmemoryPart(&mp) + bsr1.MustInitFromInmemoryPart(&mp) blocksCount := 0 rowsCount := 0 var prevTSID TSID diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index 983bbb69b..34ddde98f 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -38,10 +38,10 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } for pb.Next() { for i, mp := range mps { - bsrs[i].InitFromInmemoryPart(mp) + bsrs[i].MustInitFromInmemoryPart(mp) } mpOut.Reset() - bsw.InitFromInmemoryPart(&mpOut, -5) + bsw.MustInitFromInmemoryPart(&mpOut, -5) if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } diff --git 
a/lib/storage/part.go b/lib/storage/part.go index 3882bc79f..4efd74382 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -1,7 +1,6 @@ package storage import ( - "fmt" "path/filepath" "sync" "unsafe" @@ -9,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) @@ -45,14 +45,12 @@ type part struct { metaindex []metaindexRow } -// openFilePart opens file-based part from the given path. -func openFilePart(path string) (*part, error) { +// mustOpenFilePart opens file-based part from the given path. +func mustOpenFilePart(path string) *part { path = filepath.Clean(path) var ph partHeader - if err := ph.ReadMetadata(path); err != nil { - return nil, fmt.Errorf("cannot parse path to part: %w", err) - } + ph.MustReadMetadata(path) timestampsPath := filepath.Join(path, timestampsFilename) timestampsFile := fs.MustOpenReaderAt(timestampsPath) @@ -78,11 +76,10 @@ func openFilePart(path string) (*part, error) { // // The returned part calls MustClose on all the files passed to newPart // when calling part.MustClose. -func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.MustReadAtCloser) (*part, error) { - var errors []error +func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.MustReadAtCloser) *part { metaindex, err := unmarshalMetaindexRows(nil, metaindexReader) if err != nil { - errors = append(errors, fmt.Errorf("cannot unmarshal metaindex data: %w", err)) + logger.Panicf("FATAL: cannot unmarshal metaindex data from %q: %s", path, err) } metaindexReader.MustClose() @@ -95,14 +92,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea p.indexFile = indexFile p.metaindex = metaindex - if len(errors) > 0 { - // Return only the first error, since it has no sense in returning all errors. - err = fmt.Errorf("cannot initialize part %q: %w", &p, errors[0]) - p.MustClose() - return nil, err - } - - return &p, nil + return &p } // String returns human-readable representation of p. diff --git a/lib/storage/part_header.go b/lib/storage/part_header.go index cbab4c8ed..b54bf39d5 100644 --- a/lib/storage/part_header.go +++ b/lib/storage/part_header.go @@ -131,7 +131,7 @@ func (ph *partHeader) ParseFromPath(path string) error { return nil } -func (ph *partHeader) ReadMetadata(partPath string) error { +func (ph *partHeader) MustReadMetadata(partPath string) { ph.Reset() metadataPath := filepath.Join(partPath, metadataFilename) @@ -140,29 +140,29 @@ func (ph *partHeader) ReadMetadata(partPath string) error { if os.IsNotExist(err) { // This is a part created before v1.90.0. 
 			// Fall back to reading the metadata from the partPath itself.
-			return ph.ParseFromPath(partPath)
+			if err := ph.ParseFromPath(partPath); err != nil {
+				logger.Panicf("FATAL: cannot parse metadata from %q: %s", partPath, err)
+			}
+			return
 		}
-		return fmt.Errorf("cannot read %q: %w", metadataPath, err)
+		logger.Panicf("FATAL: cannot read %q: %s", metadataPath, err)
 	}
 	if err := json.Unmarshal(metadata, ph); err != nil {
-		return fmt.Errorf("cannot parse %q: %w", metadataPath, err)
+		logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err)
 	}
 
 	// Perform various checks
 	if ph.MinTimestamp > ph.MaxTimestamp {
-		return fmt.Errorf("minTimestamp cannot exceed maxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp)
+		logger.Panicf("FATAL: minTimestamp cannot exceed maxTimestamp at %q; got %d vs %d", metadataPath, ph.MinTimestamp, ph.MaxTimestamp)
 	}
 	if ph.RowsCount <= 0 {
-		return fmt.Errorf("rowsCount must be greater than 0; got %d", ph.RowsCount)
+		logger.Panicf("FATAL: rowsCount must be greater than 0 at %q; got %d", metadataPath, ph.RowsCount)
 	}
 	if ph.BlocksCount <= 0 {
-		return fmt.Errorf("blocksCount must be greater than 0; got %d", ph.BlocksCount)
+		logger.Panicf("FATAL: blocksCount must be greater than 0 at %q; got %d", metadataPath, ph.BlocksCount)
 	}
 	if ph.BlocksCount > ph.RowsCount {
-		return fmt.Errorf("blocksCount cannot be bigger than rowsCount; got blocksCount=%d, rowsCount=%d", ph.BlocksCount, ph.RowsCount)
+		logger.Panicf("FATAL: blocksCount cannot be bigger than rowsCount at %q; got blocksCount=%d, rowsCount=%d", metadataPath, ph.BlocksCount, ph.RowsCount)
 	}
-
-	return nil
 }
 
 func (ph *partHeader) MustWriteMetadata(partPath string) {
diff --git a/lib/storage/part_search_test.go b/lib/storage/part_search_test.go
index 6847225c9..67a82b014 100644
--- a/lib/storage/part_search_test.go
+++ b/lib/storage/part_search_test.go
@@ -1425,9 +1425,6 @@ func getTestExpectedRawBlocks(rowsOriginal []rawRow, tsids []TSID, tr TimeRange)
 
 func newTestPart(rows []rawRow) *part {
 	mp := newTestInmemoryPart(rows)
-	p, err := mp.NewPart()
-	if err != nil {
-		panic(fmt.Errorf("cannot create new part: %w", err))
-	}
+	p := mp.NewPart()
 	return p
 }
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 7f54859a1..b4cb31749 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -723,10 +723,7 @@ func (pt *partition) createInmemoryPart(rows []rawRow) *partWrapper {
 }
 
 func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.Time) *partWrapper {
-	p, err := mp.NewPart()
-	if err != nil {
-		logger.Panicf("BUG: cannot create part from %q: %s", &mp.ph, err)
-	}
+	p := mp.NewPart()
 	pw := &partWrapper{
 		p:                   p,
 		mp:                  mp,
@@ -1268,16 +1265,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
 	}
 
 	// Prepare BlockStreamReaders for source parts.
-	bsrs, err := openBlockStreamReaders(pws)
-	if err != nil {
-		logger.Panicf("FATAL: cannot open source parts for merging: %s", err)
-	}
-	closeBlockStreamReaders := func() {
-		for _, bsr := range bsrs {
-			putBlockStreamReader(bsr)
-		}
-		bsrs = nil
-	}
+	bsrs := mustOpenBlockStreamReaders(pws)
 
 	// Prepare BlockStreamWriter for destination part.
srcSize := uint64(0) @@ -1294,7 +1282,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi var mpNew *inmemoryPart if dstPartType == partInmemory { mpNew = getInmemoryPart() - bsw.InitFromInmemoryPart(mpNew, compressLevel) + bsw.MustInitFromInmemoryPart(mpNew, compressLevel) } else { if dstPartPath == "" { logger.Panicf("BUG: dstPartPath must be non-empty") @@ -1306,7 +1294,9 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi // Merge source parts to destination part. ph, err := pt.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh) putBlockStreamWriter(bsw) - closeBlockStreamReaders() + for _, bsr := range bsrs { + putBlockStreamReader(bsr) + } if err != nil { pt.releasePartsToMerge(pws) return err @@ -1401,23 +1391,18 @@ func (pt *partition) getDstPartPath(dstPartType partType, mergeIdx uint64) strin return dstPartPath } -func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) { +func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader { bsrs := make([]*blockStreamReader, 0, len(pws)) for _, pw := range pws { bsr := getBlockStreamReader() if pw.mp != nil { - bsr.InitFromInmemoryPart(pw.mp) + bsr.MustInitFromInmemoryPart(pw.mp) } else { - if err := bsr.InitFromFilePart(pw.p.path); err != nil { - for _, bsr := range bsrs { - putBlockStreamReader(bsr) - } - return nil, fmt.Errorf("cannot open source part for merging: %w", err) - } + bsr.MustInitFromFilePart(pw.p.path) } bsrs = append(bsrs, bsr) } - return bsrs, nil + return bsrs } func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { @@ -1476,10 +1461,7 @@ func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew * return pwNew } // Open the created part from disk. - pNew, err := openFilePart(dstPartPath) - if err != nil { - logger.Panicf("FATAL: cannot open merged part %s: %s", dstPartPath, err) - } + pNew := mustOpenFilePart(dstPartPath) pwNew := &partWrapper{ p: pNew, refCount: 1, @@ -1821,11 +1803,7 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) { var pws []*partWrapper for _, partName := range partNames { partPath := filepath.Join(path, partName) - p, err := openFilePart(partPath) - if err != nil { - mustCloseParts(pws) - return nil, fmt.Errorf("cannot open part %q: %w", partPath, err) - } + p := mustOpenFilePart(partPath) pw := &partWrapper{ p: p, refCount: 1, diff --git a/lib/storage/raw_row.go b/lib/storage/raw_row.go index 86aeb0533..0f267da03 100644 --- a/lib/storage/raw_row.go +++ b/lib/storage/raw_row.go @@ -89,7 +89,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR // Use the minimum compression level for first-level in-memory blocks, // since they are going to be re-compressed during subsequent merges. const compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4 - rrm.bsw.InitFromInmemoryPart(mp, compressLevel) + rrm.bsw.MustInitFromInmemoryPart(mp, compressLevel) ph := &mp.ph ph.Reset()
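
The Must* convention applied by this patch can be illustrated outside of the
VictoriaMetrics codebase. The sketch below is illustrative only and uses none
of the lib/{storage,mergeset} internals: mustPanicf is a hypothetical stand-in
for logger.Panicf (log the formatted message together with the call stack,
then terminate the process), and loadPartHeader is an invented placeholder
for the real metadata-reading work.

package main

import (
	"fmt"
	"log"
	"os"
	"runtime/debug"
)

// mustPanicf plays the role of logger.Panicf in the patch above: it logs
// the formatted message together with the current call stack, then
// terminates the process. The real logger.Panicf does more (log levels,
// output formatting); this helper is only an illustration.
func mustPanicf(format string, args ...any) {
	log.Printf("FATAL: "+format+"\nstack:\n%s", append(args, debug.Stack())...)
	os.Exit(1)
}

// loadPartHeader is an invented placeholder for the actual work of
// reading part metadata from disk.
func loadPartHeader(path string) error {
	if _, err := os.Stat(path); err != nil {
		return fmt.Errorf("cannot stat part: %w", err)
	}
	return nil
}

// Old style: the error is returned, so every caller has to wrap it,
// log it and decide how to exit on its own.
func initFromFilePart(path string) error {
	if err := loadPartHeader(path); err != nil {
		return fmt.Errorf("cannot init from file part %q: %w", path, err)
	}
	return nil
}

// New style: the failure is reported once, together with the part path
// and the call stack, directly inside the Must* function.
func mustInitFromFilePart(path string) {
	if err := loadPartHeader(path); err != nil {
		mustPanicf("cannot init from file part %q: %s", path, err)
	}
}

func main() {
	// Before the conversion every call site needed error-handling lines:
	if err := initFromFilePart("/path/to/part"); err != nil {
		log.Fatalf("FATAL: cannot open part: %s", err)
	}

	// After the conversion a call site is a single line:
	mustInitFromFilePart("/path/to/part")
}

The conversion is only appropriate where every caller would exit anyway.
Code that can recover from a failure keeps the error-returning form, which
is why functions such as mergePartsInternal still return an error in this
patch.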