lib/{mergeset,storage}: fix uint64 counters alignment for 32-bit architectures (GOARCH=386, GOARCH=arm)

This commit is contained in:
Aliaksandr Valialkin 2020-01-14 22:47:04 +02:00
parent 7830c10eb2
commit 893b62c682
5 changed files with 43 additions and 52 deletions

View file

@ -6,7 +6,6 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -45,7 +44,7 @@ var (
maxCachedInmemoryBlocksPerPartOnce sync.Once
)
type partInternals struct {
type part struct {
ph partHeader
path string
@ -57,16 +56,9 @@ type partInternals struct {
indexFile fs.ReadAtCloser
itemsFile fs.ReadAtCloser
lensFile fs.ReadAtCloser
}
type part struct {
partInternals
// Align atomic counters inside caches by 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
_ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
idxbCache indexBlockCache
ibCache inmemoryBlockCache
idxbCache *indexBlockCache
ibCache *inmemoryBlockCache
}
func openFilePart(path string) (*part, error) {
@ -133,8 +125,8 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
p.lensFile = lensFile
p.ph.CopyFrom(ph)
p.idxbCache.Init()
p.ibCache.Init()
p.idxbCache = newIndexBlockCache()
p.ibCache = newInmemoryBlockCache()
if len(errors) > 0 {
// Return only the first error, since it makes no sense to return all the errors.
@ -188,21 +180,24 @@ type indexBlockCache struct {
}
type indexBlockCacheEntry struct {
idxb *indexBlock
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
lastAccessTime uint64
idxb *indexBlock
}
func (idxbc *indexBlockCache) Init() {
func newIndexBlockCache() *indexBlockCache {
var idxbc indexBlockCache
idxbc.m = make(map[uint64]indexBlockCacheEntry)
idxbc.requests = 0
idxbc.misses = 0
idxbc.cleanerStopCh = make(chan struct{})
idxbc.cleanerWG.Add(1)
go func() {
defer idxbc.cleanerWG.Done()
idxbc.cleaner()
}()
return &idxbc
}
func (idxbc *indexBlockCache) MustClose() {
@ -290,8 +285,8 @@ func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) bool {
// Store idxb in the cache.
idxbe := indexBlockCacheEntry{
idxb: idxb,
lastAccessTime: atomic.LoadUint64(&currentTimestamp),
idxb: idxb,
}
idxbc.m[k] = idxbe
idxbc.mu.Unlock()
@ -341,14 +336,17 @@ func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) {
}
type inmemoryBlockCacheEntry struct {
ib *inmemoryBlock
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
lastAccessTime uint64
ib *inmemoryBlock
}
func (ibc *inmemoryBlockCache) Init() {
func newInmemoryBlockCache() *inmemoryBlockCache {
var ibc inmemoryBlockCache
ibc.m = make(map[inmemoryBlockCacheKey]inmemoryBlockCacheEntry)
ibc.requests = 0
ibc.misses = 0
ibc.cleanerStopCh = make(chan struct{})
ibc.cleanerWG.Add(1)
@ -356,6 +354,7 @@ func (ibc *inmemoryBlockCache) Init() {
defer ibc.cleanerWG.Done()
ibc.cleaner()
}()
return &ibc
}
func (ibc *inmemoryBlockCache) MustClose() {
@ -444,8 +443,8 @@ func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) b
// Store ib in the cache.
ibe := inmemoryBlockCacheEntry{
ib: ib,
lastAccessTime: atomic.LoadUint64(&currentTimestamp),
ib: ib,
}
ibc.m[k] = ibe
ibc.mu.Unlock()

View file

@ -82,8 +82,8 @@ func (ps *partSearch) Init(p *part, shouldCacheBlock func(item []byte) bool) {
ps.reset()
ps.p = p
ps.idxbCache = &p.idxbCache
ps.ibCache = &p.ibCache
ps.idxbCache = p.idxbCache
ps.ibCache = p.ibCache
}
// Seek seeks for the first item greater or equal to k in ps.

View file

@ -6,7 +6,6 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -29,7 +28,8 @@ var (
maxCachedIndexBlocksPerPartOnce sync.Once
)
type partInternals struct {
// part represents a searchable part containing time series data.
type part struct {
ph partHeader
// Filesystem path to the part.
@ -45,16 +45,8 @@ type partInternals struct {
indexFile fs.ReadAtCloser
metaindex []metaindexRow
}
// part represents a searchable part containing time series data.
type part struct {
partInternals
// Align ibCache to 8 bytes in order to align internal counters on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
_ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
ibCache indexBlockCache
ibCache *indexBlockCache
}
// openFilePart opens file-based part from the given path.
@ -133,7 +125,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
return nil, err
}
p.ibCache.Init()
p.ibCache = newIndexBlockCache()
return &p, nil
}
@ -190,15 +182,18 @@ type indexBlockCache struct {
}
type indexBlockCacheEntry struct {
ib *indexBlock
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
lastAccessTime uint64
ib *indexBlock
}
func (ibc *indexBlockCache) Init() {
func newIndexBlockCache() *indexBlockCache {
var ibc indexBlockCache
ibc.m = make(map[uint64]indexBlockCacheEntry)
ibc.missesMap = make(map[uint64]uint64)
ibc.requests = 0
ibc.misses = 0
ibc.cleanerStopCh = make(chan struct{})
ibc.cleanerWG.Add(1)
@ -206,6 +201,7 @@ func (ibc *indexBlockCache) Init() {
defer ibc.cleanerWG.Done()
ibc.cleaner()
}()
return &ibc
}
func (ibc *indexBlockCache) MustClose(isBig bool) {
@ -320,8 +316,8 @@ func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) bool {
// Store frequently requested ib in the cache.
delete(ibc.missesMap, k)
ibe := indexBlockCacheEntry{
ib: ib,
lastAccessTime: atomic.LoadUint64(&currentTimestamp),
ib: ib,
}
ibc.m[k] = ibe
ibc.mu.Unlock()

View file

@ -88,7 +88,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool)
ps.tr = tr
ps.fetchData = fetchData
ps.metaindex = p.metaindex
ps.ibCache = &p.ibCache
ps.ibCache = p.ibCache
// Advance to the first tsid. There is no need in checking
// the returned result, since it will be checked in NextBlock.

View file

@ -110,10 +110,8 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
prefix := []*partWrapper{
{
p: &part{
partInternals: partInternals{
ph: partHeader{
RowsCount: 1234,
},
ph: partHeader{
RowsCount: 1234,
},
},
},
@ -146,10 +144,8 @@ func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
for _, rc := range rowsCount {
pw := &partWrapper{
p: &part{
partInternals: partInternals{
ph: partHeader{
RowsCount: rc,
},
ph: partHeader{
RowsCount: rc,
},
},
}