mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/{storage,mergeset}: convert InitFromFilePart to MustInitFromFilePart
Callers of InitFromFilePart log the error and exit. It is better to log the error with the path to the part and the call stack directly inside the MustInitFromFilePart() function. This simplifies the code at callers' side while leaving the same level of debuggability.
This commit is contained in:
parent
9183a439c7
commit
c0b852d50d
22 changed files with 93 additions and 182 deletions
|
@ -17,7 +17,7 @@ type blockStreamReader struct {
|
|||
// Block contains the current block if Next returned true.
|
||||
Block inmemoryBlock
|
||||
|
||||
// isInmemoryBlock is set to true if bsr was initialized with InitFromInmemoryBlock().
|
||||
// isInmemoryBlock is set to true if bsr was initialized with MustInitFromInmemoryBlock().
|
||||
isInmemoryBlock bool
|
||||
|
||||
// The index of the current item in the Block, which is returned from CurrItem()
|
||||
|
@ -103,16 +103,16 @@ func (bsr *blockStreamReader) String() string {
|
|||
return bsr.ph.String()
|
||||
}
|
||||
|
||||
// InitFromInmemoryBlock initializes bsr from the given ib.
|
||||
func (bsr *blockStreamReader) InitFromInmemoryBlock(ib *inmemoryBlock) {
|
||||
// MustInitFromInmemoryBlock initializes bsr from the given ib.
|
||||
func (bsr *blockStreamReader) MustInitFromInmemoryBlock(ib *inmemoryBlock) {
|
||||
bsr.reset()
|
||||
bsr.Block.CopyFrom(ib)
|
||||
bsr.Block.SortItems()
|
||||
bsr.isInmemoryBlock = true
|
||||
}
|
||||
|
||||
// InitFromInmemoryPart initializes bsr from the given mp.
|
||||
func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
|
||||
// MustInitFromInmemoryPart initializes bsr from the given mp.
|
||||
func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
|
||||
bsr.reset()
|
||||
|
||||
var err error
|
||||
|
@ -134,18 +134,16 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
|
|||
}
|
||||
}
|
||||
|
||||
// InitFromFilePart initializes bsr from a file-based part on the given path.
|
||||
// MustInitFromFilePart initializes bsr from a file-based part on the given path.
|
||||
//
|
||||
// Part files are read without OS cache pollution, since the part is usually
|
||||
// deleted after the merge.
|
||||
func (bsr *blockStreamReader) InitFromFilePart(path string) error {
|
||||
func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
|
||||
bsr.reset()
|
||||
|
||||
path = filepath.Clean(path)
|
||||
|
||||
if err := bsr.ph.ReadMetadata(path); err != nil {
|
||||
return fmt.Errorf("cannot read metadata from %q: %w", path, err)
|
||||
}
|
||||
bsr.ph.MustReadMetadata(path)
|
||||
|
||||
metaindexPath := filepath.Join(path, metaindexFilename)
|
||||
metaindexFile := filestream.MustOpen(metaindexPath, true)
|
||||
|
@ -170,8 +168,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
|
|||
bsr.indexReader = indexFile
|
||||
bsr.itemsReader = itemsFile
|
||||
bsr.lensReader = lensFile
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MustClose closes the bsr.
|
||||
|
|
|
@ -60,7 +60,7 @@ func (bsw *blockStreamWriter) reset() {
|
|||
bsw.mrFirstItemCaught = false
|
||||
}
|
||||
|
||||
func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
|
||||
func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
|
||||
bsw.reset()
|
||||
|
||||
bsw.compressLevel = compressLevel
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
type inmemoryPart struct {
|
||||
|
@ -102,10 +101,7 @@ var inmemoryPartBytePool bytesutil.ByteBufferPool
|
|||
// It is unsafe re-using mp while the returned part is in use.
|
||||
func (mp *inmemoryPart) NewPart() *part {
|
||||
size := mp.size()
|
||||
p, err := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
|
||||
}
|
||||
p := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
|
||||
return p
|
||||
}
|
||||
|
||||
|
|
|
@ -32,14 +32,14 @@ func TestMultilevelMerge(t *testing.T) {
|
|||
// First level merge
|
||||
var dstIP1 inmemoryPart
|
||||
var bsw1 blockStreamWriter
|
||||
bsw1.InitFromInmemoryPart(&dstIP1, -5)
|
||||
bsw1.MustInitFromInmemoryPart(&dstIP1, -5)
|
||||
if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
|
||||
t.Fatalf("cannot merge first level part 1: %s", err)
|
||||
}
|
||||
|
||||
var dstIP2 inmemoryPart
|
||||
var bsw2 blockStreamWriter
|
||||
bsw2.InitFromInmemoryPart(&dstIP2, -5)
|
||||
bsw2.MustInitFromInmemoryPart(&dstIP2, -5)
|
||||
if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
|
||||
t.Fatalf("cannot merge first level part 2: %s", err)
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ func TestMultilevelMerge(t *testing.T) {
|
|||
newTestBlockStreamReader(&dstIP1),
|
||||
newTestBlockStreamReader(&dstIP2),
|
||||
}
|
||||
bsw.InitFromInmemoryPart(&dstIP, 1)
|
||||
bsw.MustInitFromInmemoryPart(&dstIP, 1)
|
||||
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
|
||||
t.Fatalf("cannot merge second level: %s", err)
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) {
|
|||
bsrs, _ := newTestInmemoryBlockStreamReaders(r, 20, 4000)
|
||||
var dstIP inmemoryPart
|
||||
var bsw blockStreamWriter
|
||||
bsw.InitFromInmemoryPart(&dstIP, 1)
|
||||
bsw.MustInitFromInmemoryPart(&dstIP, 1)
|
||||
ch := make(chan struct{})
|
||||
var itemsMerged uint64
|
||||
close(ch)
|
||||
|
@ -125,7 +125,7 @@ func testMergeBlockStreamsSerial(r *rand.Rand, blocksToMerge, maxItemsPerBlock i
|
|||
var itemsMerged uint64
|
||||
var dstIP inmemoryPart
|
||||
var bsw blockStreamWriter
|
||||
bsw.InitFromInmemoryPart(&dstIP, -4)
|
||||
bsw.MustInitFromInmemoryPart(&dstIP, -4)
|
||||
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
|
||||
return fmt.Errorf("cannot merge block streams: %w", err)
|
||||
}
|
||||
|
@ -204,6 +204,6 @@ func newTestInmemoryBlockStreamReaders(r *rand.Rand, blocksCount, maxItemsPerBlo
|
|||
|
||||
func newTestBlockStreamReader(ip *inmemoryPart) *blockStreamReader {
|
||||
var bsr blockStreamReader
|
||||
bsr.InitFromInmemoryPart(ip)
|
||||
bsr.MustInitFromInmemoryPart(ip)
|
||||
return &bsr
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package mergeset
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
@ -9,6 +8,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
)
|
||||
|
||||
|
@ -67,11 +67,9 @@ type part struct {
|
|||
lensFile fs.MustReadAtCloser
|
||||
}
|
||||
|
||||
func openFilePart(path string) (*part, error) {
|
||||
func mustOpenFilePart(path string) *part {
|
||||
var ph partHeader
|
||||
if err := ph.ReadMetadata(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot read part metadata: %w", err)
|
||||
}
|
||||
ph.MustReadMetadata(path)
|
||||
|
||||
metaindexPath := filepath.Join(path, metaindexFilename)
|
||||
metaindexFile := filestream.MustOpen(metaindexPath, true)
|
||||
|
@ -93,11 +91,10 @@ func openFilePart(path string) (*part, error) {
|
|||
return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
|
||||
}
|
||||
|
||||
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) {
|
||||
var errors []error
|
||||
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) *part {
|
||||
mrs, err := unmarshalMetaindexRows(nil, metaindexReader)
|
||||
if err != nil {
|
||||
errors = append(errors, fmt.Errorf("cannot unmarshal metaindexRows: %w", err))
|
||||
logger.Panicf("FATAL: cannot unmarshal metaindexRows from %q: %s", path, err)
|
||||
}
|
||||
metaindexReader.MustClose()
|
||||
|
||||
|
@ -111,13 +108,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
|
|||
p.lensFile = lensFile
|
||||
|
||||
p.ph.CopyFrom(ph)
|
||||
if len(errors) > 0 {
|
||||
// Return only the first error, since it has no sense in returning all errors.
|
||||
err := fmt.Errorf("error opening part %s: %w", p.path, errors[0])
|
||||
p.MustClose()
|
||||
return nil, err
|
||||
}
|
||||
return &p, nil
|
||||
return &p
|
||||
}
|
||||
|
||||
func (p *part) MustClose() {
|
||||
|
|
|
@ -78,39 +78,37 @@ func (ph *partHeader) CopyFrom(src *partHeader) {
|
|||
ph.lastItem = append(ph.lastItem[:0], src.lastItem...)
|
||||
}
|
||||
|
||||
func (ph *partHeader) ReadMetadata(partPath string) error {
|
||||
func (ph *partHeader) MustReadMetadata(partPath string) {
|
||||
ph.Reset()
|
||||
|
||||
// Read ph fields from metadata.
|
||||
metadataPath := filepath.Join(partPath, metadataFilename)
|
||||
metadata, err := os.ReadFile(metadataPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read %q: %w", metadataPath, err)
|
||||
logger.Panicf("FATAL: cannot read %q: %s", metadataPath, err)
|
||||
}
|
||||
|
||||
var phj partHeaderJSON
|
||||
if err := json.Unmarshal(metadata, &phj); err != nil {
|
||||
return fmt.Errorf("cannot parse %q: %w", metadataPath, err)
|
||||
logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err)
|
||||
}
|
||||
|
||||
if phj.ItemsCount <= 0 {
|
||||
return fmt.Errorf("part %q cannot contain zero items", partPath)
|
||||
logger.Panicf("FATAL: part %q cannot contain zero items", partPath)
|
||||
}
|
||||
ph.itemsCount = phj.ItemsCount
|
||||
|
||||
if phj.BlocksCount <= 0 {
|
||||
return fmt.Errorf("part %q cannot contain zero blocks", partPath)
|
||||
logger.Panicf("FATAL: part %q cannot contain zero blocks", partPath)
|
||||
}
|
||||
if phj.BlocksCount > phj.ItemsCount {
|
||||
return fmt.Errorf("the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d",
|
||||
logger.Panicf("FATAL: the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d",
|
||||
partPath, phj.BlocksCount, phj.ItemsCount)
|
||||
}
|
||||
ph.blocksCount = phj.BlocksCount
|
||||
|
||||
ph.firstItem = append(ph.firstItem[:0], phj.FirstItem...)
|
||||
ph.lastItem = append(ph.lastItem[:0], phj.LastItem...)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ph *partHeader) MustWriteMetadata(partPath string) {
|
||||
|
|
|
@ -151,7 +151,7 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri
|
|||
var itemsMerged uint64
|
||||
var ip inmemoryPart
|
||||
var bsw blockStreamWriter
|
||||
bsw.InitFromInmemoryPart(&ip, -3)
|
||||
bsw.MustInitFromInmemoryPart(&ip, -3)
|
||||
if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot merge blocks: %w", err)
|
||||
}
|
||||
|
@ -159,9 +159,6 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri
|
|||
return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items))
|
||||
}
|
||||
size := ip.size()
|
||||
p, err := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot create part: %w", err)
|
||||
}
|
||||
p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
||||
return p, items, nil
|
||||
}
|
||||
|
|
|
@ -878,7 +878,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
|
|||
continue
|
||||
}
|
||||
bsr := getBlockStreamReader()
|
||||
bsr.InitFromInmemoryBlock(ib)
|
||||
bsr.MustInitFromInmemoryBlock(ib)
|
||||
putInmemoryBlock(ib)
|
||||
bsrs = append(bsrs, bsr)
|
||||
}
|
||||
|
@ -899,7 +899,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
|
|||
compressLevel := getCompressLevel(outItemsCount)
|
||||
bsw := getBlockStreamWriter()
|
||||
mpDst := &inmemoryPart{}
|
||||
bsw.InitFromInmemoryPart(mpDst, compressLevel)
|
||||
bsw.MustInitFromInmemoryPart(mpDst, compressLevel)
|
||||
|
||||
// Merge parts.
|
||||
// The merge shouldn't be interrupted by stopCh,
|
||||
|
@ -1093,16 +1093,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
|
|||
}
|
||||
|
||||
// Prepare BlockStreamReaders for source parts.
|
||||
bsrs, err := openBlockStreamReaders(pws)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open source parts for merging: %s", err)
|
||||
}
|
||||
closeBlockStreamReaders := func() {
|
||||
for _, bsr := range bsrs {
|
||||
putBlockStreamReader(bsr)
|
||||
}
|
||||
bsrs = nil
|
||||
}
|
||||
bsrs := mustOpenBlockStreamReaders(pws)
|
||||
|
||||
// Prepare BlockStreamWriter for destination part.
|
||||
srcSize := uint64(0)
|
||||
|
@ -1118,7 +1109,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
|
|||
var mpNew *inmemoryPart
|
||||
if dstPartType == partInmemory {
|
||||
mpNew = &inmemoryPart{}
|
||||
bsw.InitFromInmemoryPart(mpNew, compressLevel)
|
||||
bsw.MustInitFromInmemoryPart(mpNew, compressLevel)
|
||||
} else {
|
||||
nocache := srcItemsCount > maxItemsPerCachedPart()
|
||||
bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel)
|
||||
|
@ -1127,7 +1118,9 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
|
|||
// Merge source parts to destination part.
|
||||
ph, err := tb.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh)
|
||||
putBlockStreamWriter(bsw)
|
||||
closeBlockStreamReaders()
|
||||
for _, bsr := range bsrs {
|
||||
putBlockStreamReader(bsr)
|
||||
}
|
||||
if err != nil {
|
||||
tb.releasePartsToMerge(pws)
|
||||
return err
|
||||
|
@ -1193,23 +1186,18 @@ func getDstPartType(pws []*partWrapper, isFinal bool) partType {
|
|||
return partInmemory
|
||||
}
|
||||
|
||||
func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
|
||||
func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {
|
||||
bsrs := make([]*blockStreamReader, 0, len(pws))
|
||||
for _, pw := range pws {
|
||||
bsr := getBlockStreamReader()
|
||||
if pw.mp != nil {
|
||||
bsr.InitFromInmemoryPart(pw.mp)
|
||||
bsr.MustInitFromInmemoryPart(pw.mp)
|
||||
} else {
|
||||
if err := bsr.InitFromFilePart(pw.p.path); err != nil {
|
||||
for _, bsr := range bsrs {
|
||||
putBlockStreamReader(bsr)
|
||||
}
|
||||
return nil, fmt.Errorf("cannot open source part for merging: %w", err)
|
||||
}
|
||||
bsr.MustInitFromFilePart(pw.p.path)
|
||||
}
|
||||
bsrs = append(bsrs, bsr)
|
||||
}
|
||||
return bsrs, nil
|
||||
return bsrs
|
||||
}
|
||||
|
||||
func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
|
||||
|
@ -1251,10 +1239,7 @@ func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPar
|
|||
return pwNew
|
||||
}
|
||||
// Open the created part from disk.
|
||||
pNew, err := openFilePart(dstPartPath)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open the merged part: %s", err)
|
||||
}
|
||||
pNew := mustOpenFilePart(dstPartPath)
|
||||
pwNew := &partWrapper{
|
||||
p: pNew,
|
||||
refCount: 1,
|
||||
|
@ -1413,11 +1398,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
var pws []*partWrapper
|
||||
for _, partName := range partNames {
|
||||
partPath := filepath.Join(path, partName)
|
||||
p, err := openFilePart(partPath)
|
||||
if err != nil {
|
||||
mustCloseParts(pws)
|
||||
return nil, fmt.Errorf("cannot open part %q: %w", partPath, err)
|
||||
}
|
||||
p := mustOpenFilePart(partPath)
|
||||
pw := &partWrapper{
|
||||
p: p,
|
||||
refCount: 1,
|
||||
|
@ -1428,15 +1409,6 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
return pws, nil
|
||||
}
|
||||
|
||||
func mustCloseParts(pws []*partWrapper) {
|
||||
for _, pw := range pws {
|
||||
if pw.refCount != 1 {
|
||||
logger.Panicf("BUG: unexpected refCount when closing part %q: %d; want 1", pw.p.path, pw.refCount)
|
||||
}
|
||||
pw.p.MustClose()
|
||||
}
|
||||
}
|
||||
|
||||
// CreateSnapshotAt creates tb snapshot in the given dstDir.
|
||||
//
|
||||
// Snapshot is created using linux hard links, so it is usually created very quickly.
|
||||
|
|
|
@ -105,8 +105,8 @@ func (bsr *blockStreamReader) String() string {
|
|||
return bsr.ph.String()
|
||||
}
|
||||
|
||||
// InitFromInmemoryPart initializes bsr from the given mp.
|
||||
func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
|
||||
// MustInitFromInmemoryPart initializes bsr from the given mp.
|
||||
func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
|
||||
bsr.reset()
|
||||
|
||||
bsr.ph = mp.ph
|
||||
|
@ -121,18 +121,16 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
|
|||
}
|
||||
}
|
||||
|
||||
// InitFromFilePart initializes bsr from a file-based part on the given path.
|
||||
// MustInitFromFilePart initializes bsr from a file-based part on the given path.
|
||||
//
|
||||
// Files in the part are always read without OS cache pollution,
|
||||
// since they are usually deleted after the merge.
|
||||
func (bsr *blockStreamReader) InitFromFilePart(path string) error {
|
||||
func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
|
||||
bsr.reset()
|
||||
|
||||
path = filepath.Clean(path)
|
||||
|
||||
if err := bsr.ph.ReadMetadata(path); err != nil {
|
||||
return fmt.Errorf("cannot parse path to part: %w", err)
|
||||
}
|
||||
bsr.ph.MustReadMetadata(path)
|
||||
|
||||
timestampsPath := filepath.Join(path, timestampsFilename)
|
||||
timestampsFile := filestream.MustOpen(timestampsPath, true)
|
||||
|
@ -156,8 +154,6 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
|
|||
bsr.valuesReader = valuesFile
|
||||
bsr.indexReader = indexFile
|
||||
bsr.mrs = mrs
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MustClose closes the bsr.
|
||||
|
|
|
@ -106,7 +106,7 @@ func TestBlockStreamReaderReadConcurrent(t *testing.T) {
|
|||
|
||||
func testBlockStreamReaderReadRows(mp *inmemoryPart, rows []rawRow) error {
|
||||
var bsr blockStreamReader
|
||||
bsr.InitFromInmemoryPart(mp)
|
||||
bsr.MustInitFromInmemoryPart(mp)
|
||||
rowsCount := 0
|
||||
for bsr.NextBlock() {
|
||||
if err := bsr.Block.UnmarshalData(); err != nil {
|
||||
|
@ -155,6 +155,6 @@ func newTestBlockStreamReader(t *testing.T, rows []rawRow) *blockStreamReader {
|
|||
var mp inmemoryPart
|
||||
mp.InitFromRows(rows)
|
||||
var bsr blockStreamReader
|
||||
bsr.InitFromInmemoryPart(&mp)
|
||||
bsr.MustInitFromInmemoryPart(&mp)
|
||||
return &bsr
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ func benchmarkBlockStreamReader(b *testing.B, mp *inmemoryPart, readRows bool) {
|
|||
var bsr blockStreamReader
|
||||
blockNum := 0
|
||||
for pb.Next() {
|
||||
bsr.InitFromInmemoryPart(mp)
|
||||
bsr.MustInitFromInmemoryPart(mp)
|
||||
for bsr.NextBlock() {
|
||||
if !readRows {
|
||||
continue
|
||||
|
|
|
@ -66,8 +66,8 @@ func (bsw *blockStreamWriter) reset() {
|
|||
bsw.prevTimestampsBlockOffset = 0
|
||||
}
|
||||
|
||||
// InitFromInmemoryPart initializes bsw from inmemory part.
|
||||
func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
|
||||
// MustInitFromInmemoryPart initializes bsw from inmemory part.
|
||||
func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
|
||||
bsw.reset()
|
||||
|
||||
bsw.compressLevel = compressLevel
|
||||
|
|
|
@ -47,7 +47,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR
|
|||
}
|
||||
}
|
||||
|
||||
bsw.InitFromInmemoryPart(&mp, -5)
|
||||
bsw.MustInitFromInmemoryPart(&mp, -5)
|
||||
for i := range ebsCopy {
|
||||
bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged)
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ func newBenchBlocks(rows []rawRow) []Block {
|
|||
|
||||
mp := newTestInmemoryPart(rows)
|
||||
var bsr blockStreamReader
|
||||
bsr.InitFromInmemoryPart(mp)
|
||||
bsr.MustInitFromInmemoryPart(mp)
|
||||
for bsr.NextBlock() {
|
||||
var eb Block
|
||||
eb.CopyFrom(&bsr.Block)
|
||||
|
|
|
@ -73,7 +73,7 @@ func (mp *inmemoryPart) InitFromRows(rows []rawRow) {
|
|||
//
|
||||
// It is safe calling NewPart multiple times.
|
||||
// It is unsafe re-using mp while the returned part is in use.
|
||||
func (mp *inmemoryPart) NewPart() (*part, error) {
|
||||
func (mp *inmemoryPart) NewPart() *part {
|
||||
size := mp.size()
|
||||
return newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData)
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ func testInmemoryPartInitFromRows(t *testing.T, rows []rawRow, blocksCount int)
|
|||
}
|
||||
|
||||
var bsr blockStreamReader
|
||||
bsr.InitFromInmemoryPart(&mp)
|
||||
bsr.MustInitFromInmemoryPart(&mp)
|
||||
|
||||
rowsCount := 0
|
||||
blockNum := 0
|
||||
|
|
|
@ -369,7 +369,7 @@ func TestMergeForciblyStop(t *testing.T) {
|
|||
|
||||
var mp inmemoryPart
|
||||
var bsw blockStreamWriter
|
||||
bsw.InitFromInmemoryPart(&mp, -5)
|
||||
bsw.MustInitFromInmemoryPart(&mp, -5)
|
||||
ch := make(chan struct{})
|
||||
var rowsMerged, rowsDeleted uint64
|
||||
close(ch)
|
||||
|
@ -392,7 +392,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
|
|||
var mp inmemoryPart
|
||||
|
||||
var bsw blockStreamWriter
|
||||
bsw.InitFromInmemoryPart(&mp, -5)
|
||||
bsw.MustInitFromInmemoryPart(&mp, -5)
|
||||
|
||||
strg := newTestStorage()
|
||||
var rowsMerged, rowsDeleted uint64
|
||||
|
@ -418,7 +418,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
|
|||
}
|
||||
|
||||
var bsr1 blockStreamReader
|
||||
bsr1.InitFromInmemoryPart(&mp)
|
||||
bsr1.MustInitFromInmemoryPart(&mp)
|
||||
blocksCount := 0
|
||||
rowsCount := 0
|
||||
var prevTSID TSID
|
||||
|
|
|
@ -38,10 +38,10 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
|
|||
}
|
||||
for pb.Next() {
|
||||
for i, mp := range mps {
|
||||
bsrs[i].InitFromInmemoryPart(mp)
|
||||
bsrs[i].MustInitFromInmemoryPart(mp)
|
||||
}
|
||||
mpOut.Reset()
|
||||
bsw.InitFromInmemoryPart(&mpOut, -5)
|
||||
bsw.MustInitFromInmemoryPart(&mpOut, -5)
|
||||
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
|
||||
panic(fmt.Errorf("cannot merge block streams: %w", err))
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
@ -9,6 +8,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
)
|
||||
|
||||
|
@ -45,14 +45,12 @@ type part struct {
|
|||
metaindex []metaindexRow
|
||||
}
|
||||
|
||||
// openFilePart opens file-based part from the given path.
|
||||
func openFilePart(path string) (*part, error) {
|
||||
// mustOpenFilePart opens file-based part from the given path.
|
||||
func mustOpenFilePart(path string) *part {
|
||||
path = filepath.Clean(path)
|
||||
|
||||
var ph partHeader
|
||||
if err := ph.ReadMetadata(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse path to part: %w", err)
|
||||
}
|
||||
ph.MustReadMetadata(path)
|
||||
|
||||
timestampsPath := filepath.Join(path, timestampsFilename)
|
||||
timestampsFile := fs.MustOpenReaderAt(timestampsPath)
|
||||
|
@ -78,11 +76,10 @@ func openFilePart(path string) (*part, error) {
|
|||
//
|
||||
// The returned part calls MustClose on all the files passed to newPart
|
||||
// when calling part.MustClose.
|
||||
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.MustReadAtCloser) (*part, error) {
|
||||
var errors []error
|
||||
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.MustReadAtCloser) *part {
|
||||
metaindex, err := unmarshalMetaindexRows(nil, metaindexReader)
|
||||
if err != nil {
|
||||
errors = append(errors, fmt.Errorf("cannot unmarshal metaindex data: %w", err))
|
||||
logger.Panicf("FATAL: cannot unmarshal metaindex data from %q: %s", path, err)
|
||||
}
|
||||
metaindexReader.MustClose()
|
||||
|
||||
|
@ -95,14 +92,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
|
|||
p.indexFile = indexFile
|
||||
p.metaindex = metaindex
|
||||
|
||||
if len(errors) > 0 {
|
||||
// Return only the first error, since it has no sense in returning all errors.
|
||||
err = fmt.Errorf("cannot initialize part %q: %w", &p, errors[0])
|
||||
p.MustClose()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &p, nil
|
||||
return &p
|
||||
}
|
||||
|
||||
// String returns human-readable representation of p.
|
||||
|
|
|
@ -131,7 +131,7 @@ func (ph *partHeader) ParseFromPath(path string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ph *partHeader) ReadMetadata(partPath string) error {
|
||||
func (ph *partHeader) MustReadMetadata(partPath string) {
|
||||
ph.Reset()
|
||||
|
||||
metadataPath := filepath.Join(partPath, metadataFilename)
|
||||
|
@ -140,29 +140,29 @@ func (ph *partHeader) ReadMetadata(partPath string) error {
|
|||
if os.IsNotExist(err) {
|
||||
// This is a part created before v1.90.0.
|
||||
// Fall back to reading the metadata from the partPath itsel
|
||||
return ph.ParseFromPath(partPath)
|
||||
if err := ph.ParseFromPath(partPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot parse metadata from %q: %s", partPath, err)
|
||||
}
|
||||
return fmt.Errorf("cannot read %q: %w", metadataPath, err)
|
||||
}
|
||||
logger.Panicf("FATAL: cannot read %q: %s", metadataPath, err)
|
||||
}
|
||||
if err := json.Unmarshal(metadata, ph); err != nil {
|
||||
return fmt.Errorf("cannot parse %q: %w", metadataPath, err)
|
||||
logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err)
|
||||
}
|
||||
|
||||
// Perform various checks
|
||||
if ph.MinTimestamp > ph.MaxTimestamp {
|
||||
return fmt.Errorf("minTimestamp cannot exceed maxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp)
|
||||
logger.Panicf("FATAL: minTimestamp cannot exceed maxTimestamp at %q; got %d vs %d", metadataPath, ph.MinTimestamp, ph.MaxTimestamp)
|
||||
}
|
||||
if ph.RowsCount <= 0 {
|
||||
return fmt.Errorf("rowsCount must be greater than 0; got %d", ph.RowsCount)
|
||||
logger.Panicf("FATAL: rowsCount must be greater than 0 at %q; got %d", metadataPath, ph.RowsCount)
|
||||
}
|
||||
if ph.BlocksCount <= 0 {
|
||||
return fmt.Errorf("blocksCount must be greater than 0; got %d", ph.BlocksCount)
|
||||
logger.Panicf("FATAL: blocksCount must be greater than 0 at %q; got %d", metadataPath, ph.BlocksCount)
|
||||
}
|
||||
if ph.BlocksCount > ph.RowsCount {
|
||||
return fmt.Errorf("blocksCount cannot be bigger than rowsCount; got blocksCount=%d, rowsCount=%d", ph.BlocksCount, ph.RowsCount)
|
||||
logger.Panicf("FATAL: blocksCount cannot be bigger than rowsCount at %q; got blocksCount=%d, rowsCount=%d", metadataPath, ph.BlocksCount, ph.RowsCount)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ph *partHeader) MustWriteMetadata(partPath string) {
|
||||
|
|
|
@ -1425,9 +1425,6 @@ func getTestExpectedRawBlocks(rowsOriginal []rawRow, tsids []TSID, tr TimeRange)
|
|||
|
||||
func newTestPart(rows []rawRow) *part {
|
||||
mp := newTestInmemoryPart(rows)
|
||||
p, err := mp.NewPart()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("cannot create new part: %w", err))
|
||||
}
|
||||
p := mp.NewPart()
|
||||
return p
|
||||
}
|
||||
|
|
|
@ -723,10 +723,7 @@ func (pt *partition) createInmemoryPart(rows []rawRow) *partWrapper {
|
|||
}
|
||||
|
||||
func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.Time) *partWrapper {
|
||||
p, err := mp.NewPart()
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot create part from %q: %s", &mp.ph, err)
|
||||
}
|
||||
p := mp.NewPart()
|
||||
pw := &partWrapper{
|
||||
p: p,
|
||||
mp: mp,
|
||||
|
@ -1268,16 +1265,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
}
|
||||
|
||||
// Prepare BlockStreamReaders for source parts.
|
||||
bsrs, err := openBlockStreamReaders(pws)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open source parts for merging: %s", err)
|
||||
}
|
||||
closeBlockStreamReaders := func() {
|
||||
for _, bsr := range bsrs {
|
||||
putBlockStreamReader(bsr)
|
||||
}
|
||||
bsrs = nil
|
||||
}
|
||||
bsrs := mustOpenBlockStreamReaders(pws)
|
||||
|
||||
// Prepare BlockStreamWriter for destination part.
|
||||
srcSize := uint64(0)
|
||||
|
@ -1294,7 +1282,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
var mpNew *inmemoryPart
|
||||
if dstPartType == partInmemory {
|
||||
mpNew = getInmemoryPart()
|
||||
bsw.InitFromInmemoryPart(mpNew, compressLevel)
|
||||
bsw.MustInitFromInmemoryPart(mpNew, compressLevel)
|
||||
} else {
|
||||
if dstPartPath == "" {
|
||||
logger.Panicf("BUG: dstPartPath must be non-empty")
|
||||
|
@ -1306,7 +1294,9 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
// Merge source parts to destination part.
|
||||
ph, err := pt.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh)
|
||||
putBlockStreamWriter(bsw)
|
||||
closeBlockStreamReaders()
|
||||
for _, bsr := range bsrs {
|
||||
putBlockStreamReader(bsr)
|
||||
}
|
||||
if err != nil {
|
||||
pt.releasePartsToMerge(pws)
|
||||
return err
|
||||
|
@ -1401,23 +1391,18 @@ func (pt *partition) getDstPartPath(dstPartType partType, mergeIdx uint64) strin
|
|||
return dstPartPath
|
||||
}
|
||||
|
||||
func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
|
||||
func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {
|
||||
bsrs := make([]*blockStreamReader, 0, len(pws))
|
||||
for _, pw := range pws {
|
||||
bsr := getBlockStreamReader()
|
||||
if pw.mp != nil {
|
||||
bsr.InitFromInmemoryPart(pw.mp)
|
||||
bsr.MustInitFromInmemoryPart(pw.mp)
|
||||
} else {
|
||||
if err := bsr.InitFromFilePart(pw.p.path); err != nil {
|
||||
for _, bsr := range bsrs {
|
||||
putBlockStreamReader(bsr)
|
||||
}
|
||||
return nil, fmt.Errorf("cannot open source part for merging: %w", err)
|
||||
}
|
||||
bsr.MustInitFromFilePart(pw.p.path)
|
||||
}
|
||||
bsrs = append(bsrs, bsr)
|
||||
}
|
||||
return bsrs, nil
|
||||
return bsrs
|
||||
}
|
||||
|
||||
func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
|
||||
|
@ -1476,10 +1461,7 @@ func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *
|
|||
return pwNew
|
||||
}
|
||||
// Open the created part from disk.
|
||||
pNew, err := openFilePart(dstPartPath)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open merged part %s: %s", dstPartPath, err)
|
||||
}
|
||||
pNew := mustOpenFilePart(dstPartPath)
|
||||
pwNew := &partWrapper{
|
||||
p: pNew,
|
||||
refCount: 1,
|
||||
|
@ -1821,11 +1803,7 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) {
|
|||
var pws []*partWrapper
|
||||
for _, partName := range partNames {
|
||||
partPath := filepath.Join(path, partName)
|
||||
p, err := openFilePart(partPath)
|
||||
if err != nil {
|
||||
mustCloseParts(pws)
|
||||
return nil, fmt.Errorf("cannot open part %q: %w", partPath, err)
|
||||
}
|
||||
p := mustOpenFilePart(partPath)
|
||||
pw := &partWrapper{
|
||||
p: p,
|
||||
refCount: 1,
|
||||
|
|
|
@ -89,7 +89,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
|
|||
// Use the minimum compression level for first-level in-memory blocks,
|
||||
// since they are going to be re-compressed during subsequent merges.
|
||||
const compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
||||
rrm.bsw.InitFromInmemoryPart(mp, compressLevel)
|
||||
rrm.bsw.MustInitFromInmemoryPart(mp, compressLevel)
|
||||
|
||||
ph := &mp.ph
|
||||
ph.Reset()
|
||||
|
|
Loading…
Reference in a new issue