VictoriaMetrics/lib/mergeset/part.go
Aliaksandr Valialkin eceaf13e5e lib/{storage,mergeset}: use time.Ticker instead of time.Timer where appropriate
It turned out that time.Timer was used in places where time.Ticker must be used instead.
This could result in blocked goroutines as in the https://github.com/VictoriaMetrics/VictoriaMetrics/issues/316 .
2020-02-13 13:10:07 +02:00

480 lines
11 KiB
Go

package mergeset
import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// getMaxCachedIndexBlocksPerPart returns the number of entries an
// indexBlockCache may hold before Put starts evicting items.
//
// The limit is computed once, on the first call, as
// memory.Allowed() / 1024 / 1024 / 4, with a fallback to 10
// when that quotient is zero.
func getMaxCachedIndexBlocksPerPart() int {
	maxCachedIndexBlocksPerPartOnce.Do(func() {
		maxCachedIndexBlocksPerPart = memory.Allowed() / 1024 / 1024 / 4
		if maxCachedIndexBlocksPerPart == 0 {
			maxCachedIndexBlocksPerPart = 10
		}
	})
	return maxCachedIndexBlocksPerPart
}

// maxCachedIndexBlocksPerPart is the lazily computed limit returned
// by getMaxCachedIndexBlocksPerPart.
var maxCachedIndexBlocksPerPart int

// maxCachedIndexBlocksPerPartOnce guards the one-time initialization
// of maxCachedIndexBlocksPerPart.
var maxCachedIndexBlocksPerPartOnce sync.Once
// getMaxCachedInmemoryBlocksPerPart returns the number of entries an
// inmemoryBlockCache may hold before Put starts evicting items.
//
// The limit is computed once, on the first call, as
// memory.Allowed() / 1024 / 1024 / 4, with a fallback to 10
// when that quotient is zero.
func getMaxCachedInmemoryBlocksPerPart() int {
	maxCachedInmemoryBlocksPerPartOnce.Do(func() {
		maxCachedInmemoryBlocksPerPart = memory.Allowed() / 1024 / 1024 / 4
		if maxCachedInmemoryBlocksPerPart == 0 {
			maxCachedInmemoryBlocksPerPart = 10
		}
	})
	return maxCachedInmemoryBlocksPerPart
}

// maxCachedInmemoryBlocksPerPart is the lazily computed limit returned
// by getMaxCachedInmemoryBlocksPerPart.
var maxCachedInmemoryBlocksPerPart int

// maxCachedInmemoryBlocksPerPartOnce guards the one-time initialization
// of maxCachedInmemoryBlocksPerPart.
var maxCachedInmemoryBlocksPerPartOnce sync.Once
// part is an opened mergeset part: its header, the readers for its data
// files and the per-part block caches.
type part struct {
	// ph is a copy of the part header passed to newPart.
	ph partHeader

	// path is the part location as passed to newPart.
	path string

	// size is the part size as passed to newPart; openFilePart passes the sum
	// of the metaindex.bin, index.bin, items.bin and lens.bin file sizes.
	size uint64

	// mrs holds the metaindex rows unmarshaled by newPart.
	mrs []metaindexRow

	// Readers for the part files; openFilePart opens them from index.bin,
	// items.bin and lens.bin respectively. They are closed by MustClose.
	indexFile, itemsFile, lensFile fs.MustReadAtCloser

	// idxbCache caches index blocks of this part.
	idxbCache *indexBlockCache

	// ibCache caches inmemory blocks of this part.
	ibCache *inmemoryBlockCache
}
// openFilePart opens the part stored in the directory at path.
//
// The part header is parsed from the cleaned path, then metaindex.bin,
// index.bin, items.bin and lens.bin are opened in this order. If one of
// the files cannot be opened, the files opened so far are closed and an
// error is returned. The size of the resulting part is the sum of the
// sizes of the four files.
func openFilePart(path string) (*part, error) {
	partDir := filepath.Clean(path)

	var hdr partHeader
	if err := hdr.ParseFromPath(partDir); err != nil {
		return nil, fmt.Errorf("cannot parse path to part: %s", err)
	}

	metaindexName := partDir + "/metaindex.bin"
	metaindexReader, err := filestream.Open(metaindexName, true)
	if err != nil {
		return nil, fmt.Errorf("cannot open %q: %s", metaindexName, err)
	}
	totalSize := fs.MustFileSize(metaindexName)

	indexName := partDir + "/index.bin"
	indexReader, err := fs.OpenReaderAt(indexName)
	if err != nil {
		metaindexReader.MustClose()
		return nil, fmt.Errorf("cannot open %q: %s", indexName, err)
	}
	totalSize += fs.MustFileSize(indexName)

	itemsName := partDir + "/items.bin"
	itemsReader, err := fs.OpenReaderAt(itemsName)
	if err != nil {
		metaindexReader.MustClose()
		indexReader.MustClose()
		return nil, fmt.Errorf("cannot open %q: %s", itemsName, err)
	}
	totalSize += fs.MustFileSize(itemsName)

	lensName := partDir + "/lens.bin"
	lensReader, err := fs.OpenReaderAt(lensName)
	if err != nil {
		metaindexReader.MustClose()
		indexReader.MustClose()
		itemsReader.MustClose()
		return nil, fmt.Errorf("cannot open %q: %s", lensName, err)
	}
	totalSize += fs.MustFileSize(lensName)

	return newPart(&hdr, partDir, totalSize, metaindexReader, indexReader, itemsReader, lensReader)
}
// newPart builds a part from already opened readers.
//
// The metaindex rows are read from metaindexReader, which is always closed
// before returning. On success the returned part owns indexFile, itemsFile
// and lensFile. If the metaindex cannot be unmarshaled, these files and the
// freshly created caches are closed via MustClose and an error is returned.
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) {
	rows, err := unmarshalMetaindexRows(nil, metaindexReader)
	if err != nil {
		err = fmt.Errorf("cannot unmarshal metaindexRows: %s", err)
	}
	metaindexReader.MustClose()

	// The part is assembled even when err is set, so that MustClose
	// can release every resource through a single code path.
	p := &part{
		path:      path,
		size:      size,
		mrs:       rows,
		indexFile: indexFile,
		itemsFile: itemsFile,
		lensFile:  lensFile,
		idxbCache: newIndexBlockCache(),
		ibCache:   newInmemoryBlockCache(),
	}
	p.ph.CopyFrom(ph)

	if err != nil {
		wrapped := fmt.Errorf("error opening part %s: %s", p.path, err)
		p.MustClose()
		return nil, wrapped
	}
	return p, nil
}
// MustClose closes the index, items and lens files of the part
// and then shuts down both of its block caches.
func (p *part) MustClose() {
	files := []fs.MustReadAtCloser{p.indexFile, p.itemsFile, p.lensFile}
	for _, f := range files {
		f.MustClose()
	}

	p.idxbCache.MustClose()
	p.ibCache.MustClose()
}
// indexBlock is a list of block headers.
type indexBlock struct {
	bhs []blockHeader
}

// getIndexBlock returns an indexBlock taken from indexBlockPool.
//
// A new empty indexBlock is allocated when the pool has nothing to offer.
func getIndexBlock() *indexBlock {
	if v := indexBlockPool.Get(); v != nil {
		return v.(*indexBlock)
	}
	return &indexBlock{}
}

// putIndexBlock returns idxb to indexBlockPool.
//
// idxb.bhs is truncated to zero length first; its capacity is kept for reuse.
func putIndexBlock(idxb *indexBlock) {
	idxb.bhs = idxb.bhs[:0]
	indexBlockPool.Put(idxb)
}

// indexBlockPool is the pool of reusable indexBlock objects
// served by getIndexBlock and refilled by putIndexBlock.
var indexBlockPool sync.Pool
// indexBlockCache is a cache of indexBlock entries keyed by uint64.
//
// It is created by newIndexBlockCache, which also starts a background cleaner
// goroutine, and must be released with MustClose.
type indexBlockCache struct {
	// Atomically updated counters must go first in the struct, so they are properly
	// aligned to 8 bytes on 32-bit architectures.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

	// requests is incremented on every Get call.
	requests uint64
	// misses is incremented on every Get call that finds no entry.
	misses uint64
	// m holds the cached entries. Get, Put, Len and cleanByTimeout access it under mu.
	m map[uint64]*indexBlockCacheEntry
	mu sync.RWMutex
	// cleanerStopCh is closed by MustClose in order to stop the cleaner goroutine.
	cleanerStopCh chan struct{}
	// cleanerWG is used by MustClose for waiting until the cleaner goroutine exits.
	cleanerWG sync.WaitGroup
}
// indexBlockCacheEntry is a single item stored in indexBlockCache.
type indexBlockCacheEntry struct {
	// Atomically updated counters must go first in the struct, so they are properly
	// aligned to 8 bytes on 32-bit architectures.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

	// lastAccessTime is the currentTimestamp value (Unix seconds) recorded
	// when the entry was stored by Put or last returned by Get.
	lastAccessTime uint64
	// idxb is the cached index block.
	idxb *indexBlock
}
// newIndexBlockCache returns an empty indexBlockCache with a running
// background cleaner goroutine.
//
// The returned cache must be released with MustClose, which stops the goroutine.
func newIndexBlockCache() *indexBlockCache {
	cache := &indexBlockCache{
		m:             make(map[uint64]*indexBlockCacheEntry),
		cleanerStopCh: make(chan struct{}),
	}

	cache.cleanerWG.Add(1)
	go func() {
		defer cache.cleanerWG.Done()
		cache.cleaner()
	}()

	return cache
}
// MustClose stops the cleaner goroutine, adds the per-cache request and miss
// counters to the package-level totals and returns all the cached index
// blocks to the pool.
//
// It must be called only when no other goroutines access idxbc or the blocks
// obtained from it, since these blocks become available for reuse.
func (idxbc *indexBlockCache) MustClose() {
	// Stop the cleaner and wait until it exits.
	close(idxbc.cleanerStopCh)
	idxbc.cleanerWG.Wait()

	// Preserve the stats of the closed cache in the global counters.
	atomic.AddUint64(&indexBlockCacheRequests, idxbc.requests)
	atomic.AddUint64(&indexBlockCacheMisses, idxbc.misses)

	// Recycle the cached blocks and drop the map.
	cached := idxbc.m
	for _, entry := range cached {
		putIndexBlock(entry.idxb)
	}
	idxbc.m = nil
}
// cleaner calls cleanByTimeout every 5 seconds until cleanerStopCh is closed.
func (idxbc *indexBlockCache) cleaner() {
	const cleanInterval = 5 * time.Second

	t := time.NewTicker(cleanInterval)
	defer t.Stop()

	for {
		select {
		case <-idxbc.cleanerStopCh:
			return
		case <-t.C:
			idxbc.cleanByTimeout()
		}
	}
}
// cleanByTimeout removes the entries that have not been accessed
// for more than 10 minutes.
//
// The removed index blocks are not returned to the pool, since they may
// still be used by goroutines that obtained them via Get.
func (idxbc *indexBlockCache) cleanByTimeout() {
	// Entries idle for more than this number of seconds are evicted.
	const maxIdleSeconds = 10 * 60

	currentTime := atomic.LoadUint64(&currentTimestamp)
	idxbc.mu.Lock()
	for k, idxbe := range idxbc.m {
		lastAccessTime := atomic.LoadUint64(&idxbe.lastAccessTime)
		// currentTimestamp may advance after currentTime has been sampled above,
		// so a concurrent Get or Put may stamp an entry with a later second.
		// The unsigned subtraction below would wrap around for such an entry
		// and evict an item that has just been used, so skip it explicitly.
		if lastAccessTime >= currentTime {
			continue
		}
		if currentTime-lastAccessTime > maxIdleSeconds {
			delete(idxbc.m, k)
		}
	}
	idxbc.mu.Unlock()
}
// indexBlockCacheRequests accumulates the requests counters of closed
// indexBlockCache instances; indexBlockCache.MustClose adds to it atomically.
var indexBlockCacheRequests uint64

// indexBlockCacheMisses accumulates the misses counters of closed
// indexBlockCache instances; indexBlockCache.MustClose adds to it atomically.
var indexBlockCacheMisses uint64
// Get returns the index block stored under the key k or nil if there is
// no such block in idxbc.
//
// Every call increments the requests counter; a call that finds nothing
// also increments the misses counter. A hit refreshes the last access time
// of the entry.
func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
	atomic.AddUint64(&idxbc.requests, 1)

	idxbc.mu.RLock()
	entry := idxbc.m[k]
	idxbc.mu.RUnlock()

	if entry == nil {
		atomic.AddUint64(&idxbc.misses, 1)
		return nil
	}

	// Write the timestamp only when it has changed since the last access.
	now := atomic.LoadUint64(&currentTimestamp)
	if atomic.LoadUint64(&entry.lastAccessTime) != now {
		atomic.StoreUint64(&entry.lastAccessTime, now)
	}
	return entry.idxb
}
// Put puts idxb under the key k into idxbc.
//
// If idxbc already holds more than getMaxCachedIndexBlocksPerPart() entries,
// then about 10% of them (but at least one) are evicted before idxb is stored.
// Map iteration order decides which entries are evicted.
//
// Returns true if the idxb has been put into idxbc. The current
// implementation always stores idxb, so it always returns true.
func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) bool {
	idxbc.mu.Lock()

	// Remove superfluous entries.
	if len(idxbc.m) > getMaxCachedIndexBlocksPerPart() {
		// Remove 10% of items from the cache.
		//
		// Remove at least one item: 10% of a cache with less than 10 entries
		// truncates to zero, and a zero countdown must not be allowed to run
		// below zero, since this would wipe the whole cache.
		evictCount := int(float64(len(idxbc.m)) * 0.1)
		if evictCount < 1 {
			evictCount = 1
		}
		for victim := range idxbc.m {
			// Do not return idxb to pool, since these entries may be used
			// by concurrent goroutines.
			delete(idxbc.m, victim)
			evictCount--
			if evictCount <= 0 {
				break
			}
		}
	}

	// Store idxb in the cache.
	idxbe := &indexBlockCacheEntry{
		lastAccessTime: atomic.LoadUint64(&currentTimestamp),
		idxb:           idxb,
	}
	idxbc.m[k] = idxbe
	idxbc.mu.Unlock()
	return true
}
// Len returns the number of entries currently stored in idxbc.
func (idxbc *indexBlockCache) Len() uint64 {
	idxbc.mu.RLock()
	defer idxbc.mu.RUnlock()
	return uint64(len(idxbc.m))
}

// Requests returns the number of Get calls served by idxbc.
func (idxbc *indexBlockCache) Requests() uint64 {
	return atomic.LoadUint64(&idxbc.requests)
}

// Misses returns the number of Get calls that found no entry in idxbc.
func (idxbc *indexBlockCache) Misses() uint64 {
	return atomic.LoadUint64(&idxbc.misses)
}
// inmemoryBlockCache is a cache of inmemoryBlock entries keyed by inmemoryBlockCacheKey.
//
// It is created by newInmemoryBlockCache, which also starts a background cleaner
// goroutine, and must be released with MustClose.
type inmemoryBlockCache struct {
	// Atomically updated counters must go first in the struct, so they are properly
	// aligned to 8 bytes on 32-bit architectures.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

	// requests is incremented on every Get call.
	requests uint64
	// misses is incremented on every Get call that finds no entry.
	misses uint64
	// m holds the cached entries. Get, Put, Len and cleanByTimeout access it under mu.
	m map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry
	mu sync.RWMutex
	// cleanerStopCh is closed by MustClose in order to stop the cleaner goroutine.
	cleanerStopCh chan struct{}
	// cleanerWG is used by MustClose for waiting until the cleaner goroutine exits.
	cleanerWG sync.WaitGroup
}
// inmemoryBlockCacheKey is the key type of the inmemoryBlockCache map.
type inmemoryBlockCacheKey struct {
	// firstItem is set by Init only for block headers with zero itemsBlockSize.
	firstItem string

	// itemsBlockOffset is copied by Init from the block header.
	itemsBlockOffset uint64
}

// Init fills ibck from bh.
//
// itemsBlockOffset is always copied from bh. firstItem is copied from bh only
// when bh.itemsBlockSize is zero; otherwise it is reset to an empty string.
//
// NOTE(review): presumably blocks with zero itemsBlockSize can share the same
// offset, so the first item is needed to tell them apart - confirm against
// the code that writes block headers.
func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) {
	var first string
	if bh.itemsBlockSize == 0 {
		first = string(bh.firstItem)
	}
	ibck.firstItem = first
	ibck.itemsBlockOffset = bh.itemsBlockOffset
}
// inmemoryBlockCacheEntry is a single item stored in inmemoryBlockCache.
type inmemoryBlockCacheEntry struct {
	// Atomically updated counters must go first in the struct, so they are properly
	// aligned to 8 bytes on 32-bit architectures.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

	// lastAccessTime is the currentTimestamp value (Unix seconds) recorded
	// when the entry was stored by Put or last returned by Get.
	lastAccessTime uint64
	// ib is the cached inmemory block.
	ib *inmemoryBlock
}
// newInmemoryBlockCache returns an empty inmemoryBlockCache with a running
// background cleaner goroutine.
//
// The returned cache must be released with MustClose, which stops the goroutine.
func newInmemoryBlockCache() *inmemoryBlockCache {
	cache := &inmemoryBlockCache{
		m:             make(map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry),
		cleanerStopCh: make(chan struct{}),
	}

	cache.cleanerWG.Add(1)
	go func() {
		defer cache.cleanerWG.Done()
		cache.cleaner()
	}()

	return cache
}
// MustClose stops the cleaner goroutine, adds the per-cache request and miss
// counters to the package-level totals and returns all the cached inmemory
// blocks to the pool.
//
// It must be called only when no other goroutines access ibc or the blocks
// obtained from it, since these blocks become available for reuse.
func (ibc *inmemoryBlockCache) MustClose() {
	// Stop the cleaner and wait until it exits.
	close(ibc.cleanerStopCh)
	ibc.cleanerWG.Wait()

	// Preserve the stats of the closed cache in the global counters.
	atomic.AddUint64(&inmemoryBlockCacheRequests, ibc.requests)
	atomic.AddUint64(&inmemoryBlockCacheMisses, ibc.misses)

	// Recycle the cached blocks and drop the map.
	cached := ibc.m
	for _, entry := range cached {
		putInmemoryBlock(entry.ib)
	}
	ibc.m = nil
}
// cleaner calls cleanByTimeout every 5 seconds until cleanerStopCh is closed.
func (ibc *inmemoryBlockCache) cleaner() {
	const cleanInterval = 5 * time.Second

	t := time.NewTicker(cleanInterval)
	defer t.Stop()

	for {
		select {
		case <-ibc.cleanerStopCh:
			return
		case <-t.C:
			ibc.cleanByTimeout()
		}
	}
}
// cleanByTimeout removes the entries that have not been accessed
// for more than 10 minutes.
//
// The removed inmemory blocks are not returned to the pool, since they may
// still be used by goroutines that obtained them via Get.
func (ibc *inmemoryBlockCache) cleanByTimeout() {
	// Entries idle for more than this number of seconds are evicted.
	const maxIdleSeconds = 10 * 60

	currentTime := atomic.LoadUint64(&currentTimestamp)
	ibc.mu.Lock()
	for k, ibe := range ibc.m {
		lastAccessTime := atomic.LoadUint64(&ibe.lastAccessTime)
		// currentTimestamp may advance after currentTime has been sampled above,
		// so a concurrent Get or Put may stamp an entry with a later second.
		// The unsigned subtraction below would wrap around for such an entry
		// and evict an item that has just been used, so skip it explicitly.
		if lastAccessTime >= currentTime {
			continue
		}
		if currentTime-lastAccessTime > maxIdleSeconds {
			delete(ibc.m, k)
		}
	}
	ibc.mu.Unlock()
}
// inmemoryBlockCacheRequests accumulates the requests counters of closed
// inmemoryBlockCache instances; inmemoryBlockCache.MustClose adds to it atomically.
var inmemoryBlockCacheRequests uint64

// inmemoryBlockCacheMisses accumulates the misses counters of closed
// inmemoryBlockCache instances; inmemoryBlockCache.MustClose adds to it atomically.
var inmemoryBlockCacheMisses uint64
// Get returns the inmemory block stored under the key k or nil if there is
// no such block in ibc.
//
// Every call increments the requests counter; a call that finds nothing
// also increments the misses counter. A hit refreshes the last access time
// of the entry.
func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
	atomic.AddUint64(&ibc.requests, 1)

	ibc.mu.RLock()
	entry := ibc.m[k]
	ibc.mu.RUnlock()

	if entry == nil {
		atomic.AddUint64(&ibc.misses, 1)
		return nil
	}

	// Write the timestamp only when it has changed since the last access.
	now := atomic.LoadUint64(&currentTimestamp)
	if atomic.LoadUint64(&entry.lastAccessTime) != now {
		atomic.StoreUint64(&entry.lastAccessTime, now)
	}
	return entry.ib
}
// Put puts ib under key k into ibc.
//
// If ibc already holds more than getMaxCachedInmemoryBlocksPerPart() entries,
// then about 10% of them (but at least one) are evicted before ib is stored.
// Map iteration order decides which entries are evicted.
//
// Returns true if ib was put into ibc. The current implementation
// always stores ib, so it always returns true.
func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) bool {
	ibc.mu.Lock()

	// Clean superfluous entries in cache.
	if len(ibc.m) > getMaxCachedInmemoryBlocksPerPart() {
		// Remove 10% of items from the cache.
		//
		// Remove at least one item: 10% of a cache with less than 10 entries
		// truncates to zero, and a zero countdown must not be allowed to run
		// below zero, since this would wipe the whole cache.
		evictCount := int(float64(len(ibc.m)) * 0.1)
		if evictCount < 1 {
			evictCount = 1
		}
		for victim := range ibc.m {
			// Do not call putInmemoryBlock on the evicted block, since it
			// may be used by concurrent goroutines.
			delete(ibc.m, victim)
			evictCount--
			if evictCount <= 0 {
				break
			}
		}
	}

	// Store ib in the cache.
	ibe := &inmemoryBlockCacheEntry{
		lastAccessTime: atomic.LoadUint64(&currentTimestamp),
		ib:             ib,
	}
	ibc.m[k] = ibe
	ibc.mu.Unlock()
	return true
}
// Len returns the number of entries currently stored in ibc.
func (ibc *inmemoryBlockCache) Len() uint64 {
	ibc.mu.RLock()
	defer ibc.mu.RUnlock()
	return uint64(len(ibc.m))
}

// Requests returns the number of Get calls served by ibc.
func (ibc *inmemoryBlockCache) Requests() uint64 {
	return atomic.LoadUint64(&ibc.requests)
}

// Misses returns the number of Get calls that found no entry in ibc.
func (ibc *inmemoryBlockCache) Misses() uint64 {
	return atomic.LoadUint64(&ibc.misses)
}
// init starts a background goroutine that refreshes currentTimestamp
// once per second for the whole lifetime of the process.
func init() {
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for tm := range ticker.C {
			atomic.StoreUint64(&currentTimestamp, uint64(tm.Unix()))
		}
	}()
}

// currentTimestamp is a coarse Unix timestamp in seconds, which is cheaper
// to read than calling time.Now on every cache access. It must be read
// with atomic.LoadUint64.
//
// It is seeded with the wall-clock time at package initialization, so the
// readers never observe zero before the first ticker tick. Otherwise cache
// entries stored during the first second after start would get a zero
// last access time and would be dropped by the next cleanup as if they
// had been idle for decades.
var currentTimestamp = uint64(time.Now().Unix())