VictoriaMetrics/lib/mergeset/part.go
Aliaksandr Valialkin 97b1e11612
lib/mergeset: consistently use OS-independent separator in file paths
This is needed for Windows support, which uses `\` instead of `/` as file separator

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70
2023-03-25 14:34:33 -07:00

package mergeset

import (
	"fmt"
	"path/filepath"
	"sync"
	"unsafe"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)

var idxbCache = blockcache.NewCache(getMaxIndexBlocksCacheSize)
var ibCache = blockcache.NewCache(getMaxInmemoryBlocksCacheSize)

// SetIndexBlocksCacheSize overrides the default size of indexdb/indexBlock cache
func SetIndexBlocksCacheSize(size int) {
	maxIndexBlockCacheSize = size
}

func getMaxIndexBlocksCacheSize() int {
	maxIndexBlockCacheSizeOnce.Do(func() {
		if maxIndexBlockCacheSize <= 0 {
			maxIndexBlockCacheSize = int(0.10 * float64(memory.Allowed()))
		}
	})
	return maxIndexBlockCacheSize
}

var (
	maxIndexBlockCacheSize     int
	maxIndexBlockCacheSizeOnce sync.Once
)

// SetDataBlocksCacheSize overrides the default size of indexdb/dataBlocks cache
func SetDataBlocksCacheSize(size int) {
	maxInmemoryBlockCacheSize = size
}

func getMaxInmemoryBlocksCacheSize() int {
	maxInmemoryBlockCacheSizeOnce.Do(func() {
		if maxInmemoryBlockCacheSize <= 0 {
			maxInmemoryBlockCacheSize = int(0.25 * float64(memory.Allowed()))
		}
	})
	return maxInmemoryBlockCacheSize
}

var (
	maxInmemoryBlockCacheSize     int
	maxInmemoryBlockCacheSizeOnce sync.Once
)
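
// A hypothetical embedding application could override both defaults at
// startup via the exported setters above; the 256 MiB / 512 MiB caps below
// are arbitrary illustration values:
//
//	mergeset.SetIndexBlocksCacheSize(256 * 1024 * 1024)
//	mergeset.SetDataBlocksCacheSize(512 * 1024 * 1024)
//
// With the setters left alone (or given a non-positive size), the limits are
// derived lazily from memory.Allowed(): 10% for index blocks and 25% for
// in-memory data blocks, i.e. roughly 410 MiB and 1 GiB on a host where
// memory.Allowed() reports 4 GiB.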

type part struct {
	ph        partHeader
	path      string
	size      uint64
	mrs       []metaindexRow
	indexFile fs.MustReadAtCloser
	itemsFile fs.MustReadAtCloser
	lensFile  fs.MustReadAtCloser
}

func openFilePart(path string) (*part, error) {
	var ph partHeader
	if err := ph.ReadMetadata(path); err != nil {
		return nil, fmt.Errorf("cannot read part metadata: %w", err)
	}

	metaindexPath := filepath.Join(path, metaindexFilename)
	metaindexFile, err := filestream.Open(metaindexPath, true)
	if err != nil {
		return nil, fmt.Errorf("cannot open %q: %w", metaindexPath, err)
	}
	metaindexSize := fs.MustFileSize(metaindexPath)

	indexPath := filepath.Join(path, indexFilename)
	indexFile := fs.MustOpenReaderAt(indexPath)
	indexSize := fs.MustFileSize(indexPath)

	itemsPath := filepath.Join(path, itemsFilename)
	itemsFile := fs.MustOpenReaderAt(itemsPath)
	itemsSize := fs.MustFileSize(itemsPath)

	lensPath := filepath.Join(path, lensFilename)
	lensFile := fs.MustOpenReaderAt(lensPath)
	lensSize := fs.MustFileSize(lensPath)

	size := metaindexSize + indexSize + itemsSize + lensSize
	return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
}
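
// A part directory is expected to hold the four files referenced above
// (metaindex, index, items and lens) alongside the metadata consumed by
// ph.ReadMetadata. Since openFilePart is unexported, a caller inside this
// package would open and later release a part roughly as follows (partDir is
// an assumed path):
//
//	p, err := openFilePart(partDir)
//	if err != nil {
//		return fmt.Errorf("cannot open part at %q: %w", partDir, err)
//	}
//	defer p.MustClose()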

func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) {
	var errors []error
	mrs, err := unmarshalMetaindexRows(nil, metaindexReader)
	if err != nil {
		errors = append(errors, fmt.Errorf("cannot unmarshal metaindexRows: %w", err))
	}
	metaindexReader.MustClose()

	var p part
	p.path = path
	p.size = size
	p.mrs = mrs
	p.indexFile = indexFile
	p.itemsFile = itemsFile
	p.lensFile = lensFile
	p.ph.CopyFrom(ph)

	if len(errors) > 0 {
		// Return only the first error, since it makes little sense to return all of them.
		err := fmt.Errorf("error opening part %s: %w", p.path, errors[0])
		p.MustClose()
		return nil, err
	}
	return &p, nil
}
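
// MustClose closes the index, items and lens files backing p and drops any
// blocks cached for p from idxbCache and ibCache. p must not be used after
// this call.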
func (p *part) MustClose() {
	p.indexFile.MustClose()
	p.itemsFile.MustClose()
	p.lensFile.MustClose()

	idxbCache.RemoveBlocksForPart(p)
	ibCache.RemoveBlocksForPart(p)
}
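
// indexBlock holds a set of blockHeaders together with the buffer backing
// the data they refer to.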
type indexBlock struct {
	bhs []blockHeader

	// The buffer for holding the data referred to by bhs
	buf []byte
}
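
// SizeBytes returns the in-memory size of idxb: the struct itself plus the
// SizeBytes of every blockHeader allocated for bhs. The slice is walked up to
// its capacity, so spare, not-yet-used headers are accounted for as well.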
func (idxb *indexBlock) SizeBytes() int {
	bhs := idxb.bhs[:cap(idxb.bhs)]
	n := int(unsafe.Sizeof(*idxb))
	for i := range bhs {
		n += bhs[i].SizeBytes()
	}
	return n
}