VictoriaMetrics/lib/storage/part.go
Aliaksandr Valialkin e1a6262302 lib/fs: replace fs.OpenReaderAt with fs.MustOpenReaderAt
All the callers for fs.OpenReaderAt expect that the file will be opened.
So it is better to log fatal error inside fs.MustOpenReaderAt instead of leaving this to the caller.
2020-11-23 09:55:41 +02:00

288 lines
6.8 KiB
Go

package storage
import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// getMaxCachedIndexBlocksPerPart returns the maximum number of index blocks
// that may be cached per part.
//
// The limit is computed only once, on the first call: it is the value returned
// by memory.Allowed() divided by 8*1024*1024, but never less than 16.
// Subsequent calls return the remembered value.
func getMaxCachedIndexBlocksPerPart() int {
	maxCachedIndexBlocksPerPartOnce.Do(func() {
		const minBlocks = 16
		limit := memory.Allowed() / (8 * 1024 * 1024)
		if limit < minBlocks {
			limit = minBlocks
		}
		maxCachedIndexBlocksPerPart = limit
	})
	return maxCachedIndexBlocksPerPart
}
var (
// maxCachedIndexBlocksPerPart holds the limit returned by getMaxCachedIndexBlocksPerPart.
//
// It is initialized lazily by getMaxCachedIndexBlocksPerPart on its first call.
maxCachedIndexBlocksPerPart int
// maxCachedIndexBlocksPerPartOnce guards the one-time initialization of maxCachedIndexBlocksPerPart.
maxCachedIndexBlocksPerPartOnce sync.Once
)
// part represents a searchable part containing time series data.
//
// A part is created with newPart (or openFilePart for file-based parts)
// and must be released with MustClose.
type part struct {
// Part header. openFilePart parses it from the part path.
ph partHeader
// Filesystem path to the part.
//
// Empty for in-memory part.
path string
// Total size in bytes of part data.
//
// For file-based parts this is the sum of sizes of timestamps.bin, values.bin,
// index.bin and metaindex.bin.
size uint64
// Readers for the part data. They are closed by MustClose.
//
// For file-based parts they are opened from timestamps.bin, values.bin
// and index.bin respectively.
timestampsFile fs.MustReadAtCloser
valuesFile fs.MustReadAtCloser
indexFile fs.MustReadAtCloser
// Metaindex rows unmarshaled by newPart from the metaindex reader.
metaindex []metaindexRow
// Cache for index blocks of this part. It is created by newPart and closed by MustClose.
ibCache *indexBlockCache
}
// openFilePart opens file-based part from the given path.
//
// The part header is parsed from the (cleaned) path itself. The part directory
// is expected to contain timestamps.bin, values.bin, index.bin and metaindex.bin.
//
// An error is returned if the path cannot be parsed into a part header,
// if metaindex.bin cannot be opened, or if newPart fails. The data files are
// opened with fs.MustOpenReaderAt, which does not return an error
// (NOTE(review): per its name and commit description it logs a fatal error
// on failure - confirm in lib/fs).
func openFilePart(path string) (*part, error) {
	path = filepath.Clean(path)

	var ph partHeader
	if err := ph.ParseFromPath(path); err != nil {
		return nil, fmt.Errorf("cannot parse path to part: %w", err)
	}

	// Build file paths with filepath.Join instead of manual "/" concatenation,
	// so the proper OS-specific separator is used and no duplicate separators appear.
	timestampsPath := filepath.Join(path, "timestamps.bin")
	timestampsFile := fs.MustOpenReaderAt(timestampsPath)
	timestampsSize := fs.MustFileSize(timestampsPath)

	valuesPath := filepath.Join(path, "values.bin")
	valuesFile := fs.MustOpenReaderAt(valuesPath)
	valuesSize := fs.MustFileSize(valuesPath)

	indexPath := filepath.Join(path, "index.bin")
	indexFile := fs.MustOpenReaderAt(indexPath)
	indexSize := fs.MustFileSize(indexPath)

	metaindexPath := filepath.Join(path, "metaindex.bin")
	metaindexFile, err := filestream.Open(metaindexPath, true)
	if err != nil {
		// Release the already opened files, since the part isn't going to be created.
		timestampsFile.MustClose()
		valuesFile.MustClose()
		indexFile.MustClose()
		return nil, fmt.Errorf("cannot open metaindex file: %w", err)
	}
	metaindexSize := fs.MustFileSize(metaindexPath)

	size := timestampsSize + valuesSize + indexSize + metaindexSize
	// newPart takes ownership of all the opened files, including on error.
	return newPart(&ph, path, size, metaindexFile, timestampsFile, valuesFile, indexFile)
}
// newPart returns new part initialized with the given arguments.
//
// The metaindex rows are read from metaindexReader, which is always closed
// before newPart returns.
//
// The returned part calls MustClose on all the files passed to newPart
// when calling part.MustClose. If the metaindex cannot be unmarshaled,
// the files are closed by newPart itself and an error is returned.
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.MustReadAtCloser) (*part, error) {
	metaindex, err := unmarshalMetaindexRows(nil, metaindexReader)
	if err != nil {
		err = fmt.Errorf("cannot unmarshal metaindex data: %w", err)
	}
	metaindexReader.MustClose()

	// The part is assembled even on error, so p.MustClose below
	// can release every resource handed over to newPart.
	p := &part{
		ph:             *ph,
		path:           path,
		size:           size,
		timestampsFile: timestampsFile,
		valuesFile:     valuesFile,
		indexFile:      indexFile,
		metaindex:      metaindex,
		ibCache:        newIndexBlockCache(),
	}
	if err == nil {
		return p, nil
	}

	err = fmt.Errorf("cannot initialize part %q: %w", p, err)
	p.MustClose()
	return nil, err
}
// String returns human-readable representation of p.
//
// It is the filesystem path when p has one; otherwise it is
// the string form of the part header.
func (p *part) String() string {
	if p.path == "" {
		return p.ph.String()
	}
	return p.path
}
// MustClose closes all the part files and then shuts down
// the index block cache of the part.
func (p *part) MustClose() {
	p.timestampsFile.MustClose()
	p.valuesFile.MustClose()
	p.indexFile.MustClose()

	// The cache is told whether the part holds more rows than a small part may contain.
	p.ibCache.MustClose(p.ph.RowsCount > maxRowsPerSmallPart())
}
// indexBlock is a list of block headers.
//
// Instances are obtained with getIndexBlock and recycled with putIndexBlock.
type indexBlock struct {
// bhs contains block headers belonging to the index block.
bhs []blockHeader
}
// getIndexBlock returns an indexBlock taken from indexBlockPool,
// or a freshly allocated one when the pool has nothing to offer.
//
// Return the block to the pool with putIndexBlock when it is no longer needed.
func getIndexBlock() *indexBlock {
	if v := indexBlockPool.Get(); v != nil {
		return v.(*indexBlock)
	}
	return &indexBlock{}
}
// putIndexBlock returns ib to indexBlockPool for later reuse by getIndexBlock.
//
// ib.bhs is truncated to zero length before ib is put into the pool,
// while its backing array is retained.
func putIndexBlock(ib *indexBlock) {
	ib.bhs = ib.bhs[:0]
	indexBlockPool.Put(ib)
}
// indexBlockPool is a pool of *indexBlock values shared by getIndexBlock and putIndexBlock.
var indexBlockPool sync.Pool
// indexBlockCache is a cache of index blocks keyed by uint64.
//
// It is created with newIndexBlockCache, which also starts a background
// cleaner goroutine, and must be released with MustClose.
type indexBlockCache struct {
// Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
//
// requests is incremented on every Get call; misses is incremented
// when Get doesn't find the requested key.
requests uint64
misses uint64
// m holds the cached entries. It is protected by mu.
m map[uint64]*indexBlockCacheEntry
mu sync.RWMutex
// cleanerStopCh is closed by MustClose in order to stop the cleaner goroutine.
cleanerStopCh chan struct{}
// cleanerWG is used by MustClose for waiting until the cleaner goroutine exits.
cleanerWG sync.WaitGroup
}
// indexBlockCacheEntry is a single item stored in indexBlockCache.
type indexBlockCacheEntry struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
//
// lastAccessTime is the fasttime.UnixTimestamp() value recorded when the entry
// was stored or most recently returned by Get.
lastAccessTime uint64
// ib is the cached index block.
ib *indexBlock
}
// newIndexBlockCache returns an empty indexBlockCache and starts
// its background cleaner goroutine.
//
// The returned cache must be closed with MustClose, which stops the goroutine.
func newIndexBlockCache() *indexBlockCache {
	ibc := &indexBlockCache{
		m:             make(map[uint64]*indexBlockCacheEntry),
		cleanerStopCh: make(chan struct{}),
	}

	// Register the cleaner in the wait group before starting it,
	// so MustClose can reliably wait for its exit.
	ibc.cleanerWG.Add(1)
	go func() {
		defer ibc.cleanerWG.Done()
		ibc.cleaner()
	}()

	return ibc
}
// MustClose stops the background cleaner, waits until it exits
// and returns all the cached index blocks to the pool.
//
// isBig is not used by this implementation.
//
// MustClose must be called only when no other goroutines access ibc.
func (ibc *indexBlockCache) MustClose(isBig bool) {
	close(ibc.cleanerStopCh)
	ibc.cleanerWG.Wait()

	// It is safe to return ibc.m items to the pool, since MustClose must
	// be called only when no other goroutines access ibc entries.
	for _, entry := range ibc.m {
		putIndexBlock(entry.ib)
	}
	ibc.m = nil
}
// cleaner runs cleanByTimeout every 5 seconds until cleanerStopCh is closed.
//
// It is started in a dedicated goroutine by newIndexBlockCache.
func (ibc *indexBlockCache) cleaner() {
	const cleanInterval = 5 * time.Second

	t := time.NewTicker(cleanInterval)
	defer t.Stop()

	stopCh := ibc.cleanerStopCh
	for {
		select {
		case <-stopCh:
			return
		case <-t.C:
			ibc.cleanByTimeout()
		}
	}
}
// cleanByTimeout removes cache entries, which weren't accessed for more than a minute.
//
// Removed entries aren't returned to indexBlockPool.
func (ibc *indexBlockCache) cleanByTimeout() {
	currentTime := fasttime.UnixTimestamp()

	ibc.mu.Lock()
	defer ibc.mu.Unlock()

	for k, ibe := range ibc.m {
		lastAccessTime := atomic.LoadUint64(&ibe.lastAccessTime)
		// Get updates lastAccessTime without holding ibc.mu, so the stored value may be
		// newer than currentTime sampled above. The timestamps are unsigned, so subtracting
		// a bigger value would wrap around to a huge number and the freshly accessed entry
		// would be evicted. Skip such entries explicitly.
		if lastAccessTime >= currentTime {
			continue
		}
		// Delete items accessed more than a minute ago.
		if currentTime-lastAccessTime > 60 {
			delete(ibc.m, k)
		}
	}
}
// Get returns the index block cached under the key k.
//
// It returns nil if there is no such block in the cache.
// Every call increments the requests counter; every unsuccessful
// lookup additionally increments the misses counter.
func (ibc *indexBlockCache) Get(k uint64) *indexBlock {
	atomic.AddUint64(&ibc.requests, 1)

	ibc.mu.RLock()
	entry := ibc.m[k]
	ibc.mu.RUnlock()

	if entry == nil {
		atomic.AddUint64(&ibc.misses, 1)
		return nil
	}

	// Refresh the access time only when it actually changes
	// (presumably for avoiding redundant writes to memory shared among goroutines).
	if now := fasttime.UnixTimestamp(); atomic.LoadUint64(&entry.lastAccessTime) != now {
		atomic.StoreUint64(&entry.lastAccessTime, now)
	}
	return entry.ib
}
// Put stores ib in the cache under the key k.
//
// If the cache already holds more than getMaxCachedIndexBlocksPerPart() entries,
// then 10% of the entries are dropped before ib is stored. The dropped entries are
// picked in map iteration order.
func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) {
	ibc.mu.Lock()

	// Clean superfluous cache entries.
	if len(ibc.m) > getMaxCachedIndexBlocksPerPart() {
		// Remove 10% of items from the cache.
		evictCount := int(float64(len(ibc.m)) * 0.1)
		evicted := 0
		for victim := range ibc.m {
			// Do not call putIndexBlock on ibc.m entries, since they may be used by concurrent goroutines.
			delete(ibc.m, victim)
			evicted++
			if evicted == evictCount {
				break
			}
		}
	}

	// Store frequently requested ib in the cache.
	ibc.m[k] = &indexBlockCacheEntry{
		lastAccessTime: fasttime.UnixTimestamp(),
		ib:             ib,
	}

	ibc.mu.Unlock()
}
// Requests returns the number of Get calls made against ibc so far.
//
// The counter is read atomically, so the method may be called concurrently with Get.
func (ibc *indexBlockCache) Requests() uint64 {
	return atomic.LoadUint64(&ibc.requests)
}
// Misses returns the number of Get calls, which didn't find the requested key in ibc.
//
// The counter is read atomically, so the method may be called concurrently with Get.
func (ibc *indexBlockCache) Misses() uint64 {
	return atomic.LoadUint64(&ibc.misses)
}
// Len returns the number of entries currently stored in ibc.
func (ibc *indexBlockCache) Len() uint64 {
	// A shared (read) lock is enough here, since the map is only inspected.
	// This matches Get and doesn't block concurrent readers.
	ibc.mu.RLock()
	n := uint64(len(ibc.m))
	ibc.mu.RUnlock()
	return n
}