diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index 8a4f729fe..e0cd81af5 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -17,7 +17,7 @@ type blockStreamReader struct { // Block contains the current block if Next returned true. Block inmemoryBlock - // isInmemoryBlock is set to true if bsr was initialized with InitFromInmemoryBlock(). + // isInmemoryBlock is set to true if bsr was initialized with MustInitFromInmemoryBlock(). isInmemoryBlock bool // The index of the current item in the Block, which is returned from CurrItem() @@ -103,16 +103,16 @@ func (bsr *blockStreamReader) String() string { return bsr.ph.String() } -// InitFromInmemoryBlock initializes bsr from the given ib. -func (bsr *blockStreamReader) InitFromInmemoryBlock(ib *inmemoryBlock) { +// MustInitFromInmemoryBlock initializes bsr from the given ib. +func (bsr *blockStreamReader) MustInitFromInmemoryBlock(ib *inmemoryBlock) { bsr.reset() bsr.Block.CopyFrom(ib) bsr.Block.SortItems() bsr.isInmemoryBlock = true } -// InitFromInmemoryPart initializes bsr from the given mp. -func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { +// MustInitFromInmemoryPart initializes bsr from the given mp. +func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) { bsr.reset() var err error @@ -134,18 +134,16 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { } } -// InitFromFilePart initializes bsr from a file-based part on the given path. +// MustInitFromFilePart initializes bsr from a file-based part on the given path. // // Part files are read without OS cache pollution, since the part is usually // deleted after the merge. -func (bsr *blockStreamReader) InitFromFilePart(path string) error { +func (bsr *blockStreamReader) MustInitFromFilePart(path string) { bsr.reset() path = filepath.Clean(path) - if err := bsr.ph.ReadMetadata(path); err != nil { - return fmt.Errorf("cannot read metadata from %q: %w", path, err) - } + bsr.ph.MustReadMetadata(path) metaindexPath := filepath.Join(path, metaindexFilename) metaindexFile := filestream.MustOpen(metaindexPath, true) @@ -170,8 +168,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { bsr.indexReader = indexFile bsr.itemsReader = itemsFile bsr.lensReader = lensFile - - return nil } // MustClose closes the bsr. diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index df26e064a..8341b8318 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -60,7 +60,7 @@ func (bsw *blockStreamWriter) reset() { bsw.mrFirstItemCaught = false } -func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { +func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { bsw.reset() bsw.compressLevel = compressLevel diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index 1a8d1ade4..d798f4537 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type inmemoryPart struct { @@ -102,10 +101,7 @@ var inmemoryPartBytePool bytesutil.ByteBufferPool // It is unsafe re-using mp while the returned part is in use. func (mp *inmemoryPart) NewPart() *part { size := mp.size() - p, err := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData) - if err != nil { - logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err) - } + p := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData) return p } diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 2298a1991..0b628e1d9 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -32,14 +32,14 @@ func TestMultilevelMerge(t *testing.T) { // First level merge var dstIP1 inmemoryPart var bsw1 blockStreamWriter - bsw1.InitFromInmemoryPart(&dstIP1, -5) + bsw1.MustInitFromInmemoryPart(&dstIP1, -5) if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 1: %s", err) } var dstIP2 inmemoryPart var bsw2 blockStreamWriter - bsw2.InitFromInmemoryPart(&dstIP2, -5) + bsw2.MustInitFromInmemoryPart(&dstIP2, -5) if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 2: %s", err) } @@ -56,7 +56,7 @@ func TestMultilevelMerge(t *testing.T) { newTestBlockStreamReader(&dstIP1), newTestBlockStreamReader(&dstIP2), } - bsw.InitFromInmemoryPart(&dstIP, 1) + bsw.MustInitFromInmemoryPart(&dstIP, 1) if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge second level: %s", err) } @@ -76,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) { bsrs, _ := newTestInmemoryBlockStreamReaders(r, 20, 4000) var dstIP inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&dstIP, 1) + bsw.MustInitFromInmemoryPart(&dstIP, 1) ch := make(chan struct{}) var itemsMerged uint64 close(ch) @@ -125,7 +125,7 @@ func testMergeBlockStreamsSerial(r *rand.Rand, blocksToMerge, maxItemsPerBlock i var itemsMerged uint64 var dstIP inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&dstIP, -4) + bsw.MustInitFromInmemoryPart(&dstIP, -4) if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return fmt.Errorf("cannot merge block streams: %w", err) } @@ -204,6 +204,6 @@ func newTestInmemoryBlockStreamReaders(r *rand.Rand, blocksCount, maxItemsPerBlo func newTestBlockStreamReader(ip *inmemoryPart) *blockStreamReader { var bsr blockStreamReader - bsr.InitFromInmemoryPart(ip) + bsr.MustInitFromInmemoryPart(ip) return &bsr } diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 7ee86dffc..13e8c705b 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -1,7 +1,6 @@ package mergeset import ( - "fmt" "path/filepath" "sync" "unsafe" @@ -9,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) @@ -67,11 +67,9 @@ type part struct { lensFile fs.MustReadAtCloser } -func openFilePart(path string) (*part, error) { +func mustOpenFilePart(path string) *part { var ph partHeader - if err := ph.ReadMetadata(path); err != nil { - return nil, fmt.Errorf("cannot read part metadata: %w", err) - } + ph.MustReadMetadata(path) metaindexPath := filepath.Join(path, metaindexFilename) metaindexFile := filestream.MustOpen(metaindexPath, true) @@ -93,11 +91,10 @@ func openFilePart(path string) (*part, error) { return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile) } -func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) { - var errors []error +func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) *part { mrs, err := unmarshalMetaindexRows(nil, metaindexReader) if err != nil { - errors = append(errors, fmt.Errorf("cannot unmarshal metaindexRows: %w", err)) + logger.Panicf("FATAL: cannot unmarshal metaindexRows from %q: %s", path, err) } metaindexReader.MustClose() @@ -111,13 +108,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea p.lensFile = lensFile p.ph.CopyFrom(ph) - if len(errors) > 0 { - // Return only the first error, since it has no sense in returning all errors. - err := fmt.Errorf("error opening part %s: %w", p.path, errors[0]) - p.MustClose() - return nil, err - } - return &p, nil + return &p } func (p *part) MustClose() { diff --git a/lib/mergeset/part_header.go b/lib/mergeset/part_header.go index 23979c42f..28f539c95 100644 --- a/lib/mergeset/part_header.go +++ b/lib/mergeset/part_header.go @@ -78,39 +78,37 @@ func (ph *partHeader) CopyFrom(src *partHeader) { ph.lastItem = append(ph.lastItem[:0], src.lastItem...) } -func (ph *partHeader) ReadMetadata(partPath string) error { +func (ph *partHeader) MustReadMetadata(partPath string) { ph.Reset() // Read ph fields from metadata. metadataPath := filepath.Join(partPath, metadataFilename) metadata, err := os.ReadFile(metadataPath) if err != nil { - return fmt.Errorf("cannot read %q: %w", metadataPath, err) + logger.Panicf("FATAL: cannot read %q: %s", metadataPath, err) } var phj partHeaderJSON if err := json.Unmarshal(metadata, &phj); err != nil { - return fmt.Errorf("cannot parse %q: %w", metadataPath, err) + logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err) } if phj.ItemsCount <= 0 { - return fmt.Errorf("part %q cannot contain zero items", partPath) + logger.Panicf("FATAL: part %q cannot contain zero items", partPath) } ph.itemsCount = phj.ItemsCount if phj.BlocksCount <= 0 { - return fmt.Errorf("part %q cannot contain zero blocks", partPath) + logger.Panicf("FATAL: part %q cannot contain zero blocks", partPath) } if phj.BlocksCount > phj.ItemsCount { - return fmt.Errorf("the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d", + logger.Panicf("FATAL: the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d", partPath, phj.BlocksCount, phj.ItemsCount) } ph.blocksCount = phj.BlocksCount ph.firstItem = append(ph.firstItem[:0], phj.FirstItem...) ph.lastItem = append(ph.lastItem[:0], phj.LastItem...) - - return nil } func (ph *partHeader) MustWriteMetadata(partPath string) { diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index 72434a402..b61e9b552 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -151,7 +151,7 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri var itemsMerged uint64 var ip inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&ip, -3) + bsw.MustInitFromInmemoryPart(&ip, -3) if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return nil, nil, fmt.Errorf("cannot merge blocks: %w", err) } @@ -159,9 +159,6 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items)) } size := ip.size() - p, err := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData) - if err != nil { - return nil, nil, fmt.Errorf("cannot create part: %w", err) - } + p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData) return p, items, nil } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 10f2e97b0..96153d9a8 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -878,7 +878,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { continue } bsr := getBlockStreamReader() - bsr.InitFromInmemoryBlock(ib) + bsr.MustInitFromInmemoryBlock(ib) putInmemoryBlock(ib) bsrs = append(bsrs, bsr) } @@ -899,7 +899,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { compressLevel := getCompressLevel(outItemsCount) bsw := getBlockStreamWriter() mpDst := &inmemoryPart{} - bsw.InitFromInmemoryPart(mpDst, compressLevel) + bsw.MustInitFromInmemoryPart(mpDst, compressLevel) // Merge parts. // The merge shouldn't be interrupted by stopCh, @@ -1093,16 +1093,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal } // Prepare BlockStreamReaders for source parts. - bsrs, err := openBlockStreamReaders(pws) - if err != nil { - logger.Panicf("FATAL: cannot open source parts for merging: %s", err) - } - closeBlockStreamReaders := func() { - for _, bsr := range bsrs { - putBlockStreamReader(bsr) - } - bsrs = nil - } + bsrs := mustOpenBlockStreamReaders(pws) // Prepare BlockStreamWriter for destination part. srcSize := uint64(0) @@ -1118,7 +1109,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal var mpNew *inmemoryPart if dstPartType == partInmemory { mpNew = &inmemoryPart{} - bsw.InitFromInmemoryPart(mpNew, compressLevel) + bsw.MustInitFromInmemoryPart(mpNew, compressLevel) } else { nocache := srcItemsCount > maxItemsPerCachedPart() bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel) @@ -1127,7 +1118,9 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal // Merge source parts to destination part. ph, err := tb.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh) putBlockStreamWriter(bsw) - closeBlockStreamReaders() + for _, bsr := range bsrs { + putBlockStreamReader(bsr) + } if err != nil { tb.releasePartsToMerge(pws) return err @@ -1193,23 +1186,18 @@ func getDstPartType(pws []*partWrapper, isFinal bool) partType { return partInmemory } -func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) { +func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader { bsrs := make([]*blockStreamReader, 0, len(pws)) for _, pw := range pws { bsr := getBlockStreamReader() if pw.mp != nil { - bsr.InitFromInmemoryPart(pw.mp) + bsr.MustInitFromInmemoryPart(pw.mp) } else { - if err := bsr.InitFromFilePart(pw.p.path); err != nil { - for _, bsr := range bsrs { - putBlockStreamReader(bsr) - } - return nil, fmt.Errorf("cannot open source part for merging: %w", err) - } + bsr.MustInitFromFilePart(pw.p.path) } bsrs = append(bsrs, bsr) } - return bsrs, nil + return bsrs } func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { @@ -1251,10 +1239,7 @@ func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPar return pwNew } // Open the created part from disk. - pNew, err := openFilePart(dstPartPath) - if err != nil { - logger.Panicf("FATAL: cannot open the merged part: %s", err) - } + pNew := mustOpenFilePart(dstPartPath) pwNew := &partWrapper{ p: pNew, refCount: 1, @@ -1413,11 +1398,7 @@ func openParts(path string) ([]*partWrapper, error) { var pws []*partWrapper for _, partName := range partNames { partPath := filepath.Join(path, partName) - p, err := openFilePart(partPath) - if err != nil { - mustCloseParts(pws) - return nil, fmt.Errorf("cannot open part %q: %w", partPath, err) - } + p := mustOpenFilePart(partPath) pw := &partWrapper{ p: p, refCount: 1, @@ -1428,15 +1409,6 @@ func openParts(path string) ([]*partWrapper, error) { return pws, nil } -func mustCloseParts(pws []*partWrapper) { - for _, pw := range pws { - if pw.refCount != 1 { - logger.Panicf("BUG: unexpected refCount when closing part %q: %d; want 1", pw.p.path, pw.refCount) - } - pw.p.MustClose() - } -} - // CreateSnapshotAt creates tb snapshot in the given dstDir. // // Snapshot is created using linux hard links, so it is usually created very quickly. diff --git a/lib/storage/block_stream_reader.go b/lib/storage/block_stream_reader.go index 8087c2388..a8b61d84d 100644 --- a/lib/storage/block_stream_reader.go +++ b/lib/storage/block_stream_reader.go @@ -105,8 +105,8 @@ func (bsr *blockStreamReader) String() string { return bsr.ph.String() } -// InitFromInmemoryPart initializes bsr from the given mp. -func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { +// MustInitFromInmemoryPart initializes bsr from the given mp. +func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) { bsr.reset() bsr.ph = mp.ph @@ -121,18 +121,16 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { } } -// InitFromFilePart initializes bsr from a file-based part on the given path. +// MustInitFromFilePart initializes bsr from a file-based part on the given path. // // Files in the part are always read without OS cache pollution, // since they are usually deleted after the merge. -func (bsr *blockStreamReader) InitFromFilePart(path string) error { +func (bsr *blockStreamReader) MustInitFromFilePart(path string) { bsr.reset() path = filepath.Clean(path) - if err := bsr.ph.ReadMetadata(path); err != nil { - return fmt.Errorf("cannot parse path to part: %w", err) - } + bsr.ph.MustReadMetadata(path) timestampsPath := filepath.Join(path, timestampsFilename) timestampsFile := filestream.MustOpen(timestampsPath, true) @@ -156,8 +154,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { bsr.valuesReader = valuesFile bsr.indexReader = indexFile bsr.mrs = mrs - - return nil } // MustClose closes the bsr. diff --git a/lib/storage/block_stream_reader_test.go b/lib/storage/block_stream_reader_test.go index f1c1398bc..5a05449e4 100644 --- a/lib/storage/block_stream_reader_test.go +++ b/lib/storage/block_stream_reader_test.go @@ -106,7 +106,7 @@ func TestBlockStreamReaderReadConcurrent(t *testing.T) { func testBlockStreamReaderReadRows(mp *inmemoryPart, rows []rawRow) error { var bsr blockStreamReader - bsr.InitFromInmemoryPart(mp) + bsr.MustInitFromInmemoryPart(mp) rowsCount := 0 for bsr.NextBlock() { if err := bsr.Block.UnmarshalData(); err != nil { @@ -155,6 +155,6 @@ func newTestBlockStreamReader(t *testing.T, rows []rawRow) *blockStreamReader { var mp inmemoryPart mp.InitFromRows(rows) var bsr blockStreamReader - bsr.InitFromInmemoryPart(&mp) + bsr.MustInitFromInmemoryPart(&mp) return &bsr } diff --git a/lib/storage/block_stream_reader_timing_test.go b/lib/storage/block_stream_reader_timing_test.go index 7628fac7b..39ae16607 100644 --- a/lib/storage/block_stream_reader_timing_test.go +++ b/lib/storage/block_stream_reader_timing_test.go @@ -28,7 +28,7 @@ func benchmarkBlockStreamReader(b *testing.B, mp *inmemoryPart, readRows bool) { var bsr blockStreamReader blockNum := 0 for pb.Next() { - bsr.InitFromInmemoryPart(mp) + bsr.MustInitFromInmemoryPart(mp) for bsr.NextBlock() { if !readRows { continue diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index 6c319d7f8..0029534e7 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -66,8 +66,8 @@ func (bsw *blockStreamWriter) reset() { bsw.prevTimestampsBlockOffset = 0 } -// InitFromInmemoryPart initializes bsw from inmemory part. -func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { +// MustInitFromInmemoryPart initializes bsw from inmemory part. +func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) { bsw.reset() bsw.compressLevel = compressLevel diff --git a/lib/storage/block_stream_writer_timing_test.go b/lib/storage/block_stream_writer_timing_test.go index 95ecbbe35..80df38cd6 100644 --- a/lib/storage/block_stream_writer_timing_test.go +++ b/lib/storage/block_stream_writer_timing_test.go @@ -47,7 +47,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR } } - bsw.InitFromInmemoryPart(&mp, -5) + bsw.MustInitFromInmemoryPart(&mp, -5) for i := range ebsCopy { bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged) } @@ -66,7 +66,7 @@ func newBenchBlocks(rows []rawRow) []Block { mp := newTestInmemoryPart(rows) var bsr blockStreamReader - bsr.InitFromInmemoryPart(mp) + bsr.MustInitFromInmemoryPart(mp) for bsr.NextBlock() { var eb Block eb.CopyFrom(&bsr.Block) diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index 2e3c48084..bc3b5b82a 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -73,7 +73,7 @@ func (mp *inmemoryPart) InitFromRows(rows []rawRow) { // // It is safe calling NewPart multiple times. // It is unsafe re-using mp while the returned part is in use. -func (mp *inmemoryPart) NewPart() (*part, error) { +func (mp *inmemoryPart) NewPart() *part { size := mp.size() return newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData) } diff --git a/lib/storage/inmemory_part_test.go b/lib/storage/inmemory_part_test.go index 7047fc9d4..1ed1ebb90 100644 --- a/lib/storage/inmemory_part_test.go +++ b/lib/storage/inmemory_part_test.go @@ -78,7 +78,7 @@ func testInmemoryPartInitFromRows(t *testing.T, rows []rawRow, blocksCount int) } var bsr blockStreamReader - bsr.InitFromInmemoryPart(&mp) + bsr.MustInitFromInmemoryPart(&mp) rowsCount := 0 blockNum := 0 diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index e572babca..35a6d7d94 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -369,7 +369,7 @@ func TestMergeForciblyStop(t *testing.T) { var mp inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&mp, -5) + bsw.MustInitFromInmemoryPart(&mp, -5) ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) @@ -392,7 +392,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc var mp inmemoryPart var bsw blockStreamWriter - bsw.InitFromInmemoryPart(&mp, -5) + bsw.MustInitFromInmemoryPart(&mp, -5) strg := newTestStorage() var rowsMerged, rowsDeleted uint64 @@ -418,7 +418,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc } var bsr1 blockStreamReader - bsr1.InitFromInmemoryPart(&mp) + bsr1.MustInitFromInmemoryPart(&mp) blocksCount := 0 rowsCount := 0 var prevTSID TSID diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index 983bbb69b..34ddde98f 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -38,10 +38,10 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } for pb.Next() { for i, mp := range mps { - bsrs[i].InitFromInmemoryPart(mp) + bsrs[i].MustInitFromInmemoryPart(mp) } mpOut.Reset() - bsw.InitFromInmemoryPart(&mpOut, -5) + bsw.MustInitFromInmemoryPart(&mpOut, -5) if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } diff --git a/lib/storage/part.go b/lib/storage/part.go index 3882bc79f..4efd74382 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -1,7 +1,6 @@ package storage import ( - "fmt" "path/filepath" "sync" "unsafe" @@ -9,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) @@ -45,14 +45,12 @@ type part struct { metaindex []metaindexRow } -// openFilePart opens file-based part from the given path. -func openFilePart(path string) (*part, error) { +// mustOpenFilePart opens file-based part from the given path. +func mustOpenFilePart(path string) *part { path = filepath.Clean(path) var ph partHeader - if err := ph.ReadMetadata(path); err != nil { - return nil, fmt.Errorf("cannot parse path to part: %w", err) - } + ph.MustReadMetadata(path) timestampsPath := filepath.Join(path, timestampsFilename) timestampsFile := fs.MustOpenReaderAt(timestampsPath) @@ -78,11 +76,10 @@ func openFilePart(path string) (*part, error) { // // The returned part calls MustClose on all the files passed to newPart // when calling part.MustClose. -func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.MustReadAtCloser) (*part, error) { - var errors []error +func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.MustReadAtCloser) *part { metaindex, err := unmarshalMetaindexRows(nil, metaindexReader) if err != nil { - errors = append(errors, fmt.Errorf("cannot unmarshal metaindex data: %w", err)) + logger.Panicf("FATAL: cannot unmarshal metaindex data from %q: %s", path, err) } metaindexReader.MustClose() @@ -95,14 +92,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea p.indexFile = indexFile p.metaindex = metaindex - if len(errors) > 0 { - // Return only the first error, since it has no sense in returning all errors. - err = fmt.Errorf("cannot initialize part %q: %w", &p, errors[0]) - p.MustClose() - return nil, err - } - - return &p, nil + return &p } // String returns human-readable representation of p. diff --git a/lib/storage/part_header.go b/lib/storage/part_header.go index cbab4c8ed..b54bf39d5 100644 --- a/lib/storage/part_header.go +++ b/lib/storage/part_header.go @@ -131,7 +131,7 @@ func (ph *partHeader) ParseFromPath(path string) error { return nil } -func (ph *partHeader) ReadMetadata(partPath string) error { +func (ph *partHeader) MustReadMetadata(partPath string) { ph.Reset() metadataPath := filepath.Join(partPath, metadataFilename) @@ -140,29 +140,29 @@ func (ph *partHeader) ReadMetadata(partPath string) error { if os.IsNotExist(err) { // This is a part created before v1.90.0. // Fall back to reading the metadata from the partPath itsel - return ph.ParseFromPath(partPath) + if err := ph.ParseFromPath(partPath); err != nil { + logger.Panicf("FATAL: cannot parse metadata from %q: %s", partPath, err) + } } - return fmt.Errorf("cannot read %q: %w", metadataPath, err) + logger.Panicf("FATAL: cannot read %q: %s", metadataPath, err) } if err := json.Unmarshal(metadata, ph); err != nil { - return fmt.Errorf("cannot parse %q: %w", metadataPath, err) + logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err) } // Perform various checks if ph.MinTimestamp > ph.MaxTimestamp { - return fmt.Errorf("minTimestamp cannot exceed maxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp) + logger.Panicf("FATAL: minTimestamp cannot exceed maxTimestamp at %q; got %d vs %d", metadataPath, ph.MinTimestamp, ph.MaxTimestamp) } if ph.RowsCount <= 0 { - return fmt.Errorf("rowsCount must be greater than 0; got %d", ph.RowsCount) + logger.Panicf("FATAL: rowsCount must be greater than 0 at %q; got %d", metadataPath, ph.RowsCount) } if ph.BlocksCount <= 0 { - return fmt.Errorf("blocksCount must be greater than 0; got %d", ph.BlocksCount) + logger.Panicf("FATAL: blocksCount must be greater than 0 at %q; got %d", metadataPath, ph.BlocksCount) } if ph.BlocksCount > ph.RowsCount { - return fmt.Errorf("blocksCount cannot be bigger than rowsCount; got blocksCount=%d, rowsCount=%d", ph.BlocksCount, ph.RowsCount) + logger.Panicf("FATAL: blocksCount cannot be bigger than rowsCount at %q; got blocksCount=%d, rowsCount=%d", metadataPath, ph.BlocksCount, ph.RowsCount) } - - return nil } func (ph *partHeader) MustWriteMetadata(partPath string) { diff --git a/lib/storage/part_search_test.go b/lib/storage/part_search_test.go index 6847225c9..67a82b014 100644 --- a/lib/storage/part_search_test.go +++ b/lib/storage/part_search_test.go @@ -1425,9 +1425,6 @@ func getTestExpectedRawBlocks(rowsOriginal []rawRow, tsids []TSID, tr TimeRange) func newTestPart(rows []rawRow) *part { mp := newTestInmemoryPart(rows) - p, err := mp.NewPart() - if err != nil { - panic(fmt.Errorf("cannot create new part: %w", err)) - } + p := mp.NewPart() return p } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 7f54859a1..b4cb31749 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -723,10 +723,7 @@ func (pt *partition) createInmemoryPart(rows []rawRow) *partWrapper { } func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.Time) *partWrapper { - p, err := mp.NewPart() - if err != nil { - logger.Panicf("BUG: cannot create part from %q: %s", &mp.ph, err) - } + p := mp.NewPart() pw := &partWrapper{ p: p, mp: mp, @@ -1268,16 +1265,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi } // Prepare BlockStreamReaders for source parts. - bsrs, err := openBlockStreamReaders(pws) - if err != nil { - logger.Panicf("FATAL: cannot open source parts for merging: %s", err) - } - closeBlockStreamReaders := func() { - for _, bsr := range bsrs { - putBlockStreamReader(bsr) - } - bsrs = nil - } + bsrs := mustOpenBlockStreamReaders(pws) // Prepare BlockStreamWriter for destination part. srcSize := uint64(0) @@ -1294,7 +1282,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi var mpNew *inmemoryPart if dstPartType == partInmemory { mpNew = getInmemoryPart() - bsw.InitFromInmemoryPart(mpNew, compressLevel) + bsw.MustInitFromInmemoryPart(mpNew, compressLevel) } else { if dstPartPath == "" { logger.Panicf("BUG: dstPartPath must be non-empty") @@ -1306,7 +1294,9 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi // Merge source parts to destination part. ph, err := pt.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh) putBlockStreamWriter(bsw) - closeBlockStreamReaders() + for _, bsr := range bsrs { + putBlockStreamReader(bsr) + } if err != nil { pt.releasePartsToMerge(pws) return err @@ -1401,23 +1391,18 @@ func (pt *partition) getDstPartPath(dstPartType partType, mergeIdx uint64) strin return dstPartPath } -func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) { +func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader { bsrs := make([]*blockStreamReader, 0, len(pws)) for _, pw := range pws { bsr := getBlockStreamReader() if pw.mp != nil { - bsr.InitFromInmemoryPart(pw.mp) + bsr.MustInitFromInmemoryPart(pw.mp) } else { - if err := bsr.InitFromFilePart(pw.p.path); err != nil { - for _, bsr := range bsrs { - putBlockStreamReader(bsr) - } - return nil, fmt.Errorf("cannot open source part for merging: %w", err) - } + bsr.MustInitFromFilePart(pw.p.path) } bsrs = append(bsrs, bsr) } - return bsrs, nil + return bsrs } func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { @@ -1476,10 +1461,7 @@ func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew * return pwNew } // Open the created part from disk. - pNew, err := openFilePart(dstPartPath) - if err != nil { - logger.Panicf("FATAL: cannot open merged part %s: %s", dstPartPath, err) - } + pNew := mustOpenFilePart(dstPartPath) pwNew := &partWrapper{ p: pNew, refCount: 1, @@ -1821,11 +1803,7 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) { var pws []*partWrapper for _, partName := range partNames { partPath := filepath.Join(path, partName) - p, err := openFilePart(partPath) - if err != nil { - mustCloseParts(pws) - return nil, fmt.Errorf("cannot open part %q: %w", partPath, err) - } + p := mustOpenFilePart(partPath) pw := &partWrapper{ p: p, refCount: 1, diff --git a/lib/storage/raw_row.go b/lib/storage/raw_row.go index 86aeb0533..0f267da03 100644 --- a/lib/storage/raw_row.go +++ b/lib/storage/raw_row.go @@ -89,7 +89,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR // Use the minimum compression level for first-level in-memory blocks, // since they are going to be re-compressed during subsequent merges. const compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4 - rrm.bsw.InitFromInmemoryPart(mp, compressLevel) + rrm.bsw.MustInitFromInmemoryPart(mp, compressLevel) ph := &mp.ph ph.Reset()