Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)

commit f81b480905 (parent 275335c181)

lib/mergeset: consistently use atomic.* types instead of atomic.* function calls on ordinary types

See ea9e2b19a5

12 changed files with 94 additions and 94 deletions
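The whole change follows one mechanical pattern. Below is a minimal standalone sketch (not code from this commit) contrasting the old atomic.* function calls on a plain integer with the atomic.Uint64 type available since Go 1.19:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// Old style: a plain uint64 updated through atomic.* functions.
	// Every access must remember to go through sync/atomic, and on
	// 32-bit platforms the variable must be 8-byte aligned manually.
	var oldCounter uint64
	atomic.AddUint64(&oldCounter, 1)
	fmt.Println(atomic.LoadUint64(&oldCounter)) // 1

	// New style (Go 1.19+): atomic.Uint64 enforces atomic access and
	// correct alignment by construction; its zero value is ready to use.
	var newCounter atomic.Uint64
	newCounter.Add(1)
	fmt.Println(newCounter.Load()) // 1
}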
@@ -81,7 +81,7 @@ func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
 		partitionName: partitionName,
 		s:             s,
 	}
-	isReadOnly := uint32(0)
+	var isReadOnly atomic.Bool
 	idb.tb = mergeset.MustOpenTable(path, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)
 	return idb
 }

@@ -28,7 +28,7 @@ type PrepareBlockCallback func(data []byte, items []Item) ([]byte, []Item)
 //
 // It also atomically adds the number of items merged to itemsMerged.
 func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{},
-	itemsMerged *uint64) error {
+	itemsMerged *atomic.Uint64) error {
 	bsm := bsmPool.Get().(*blockStreamMerger)
 	if err := bsm.Init(bsrs, prepareBlock); err != nil {
 		return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)

@@ -100,7 +100,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock Prepa

 var errForciblyStopped = fmt.Errorf("forcibly stopped")

-func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error {
+func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *atomic.Uint64) error {
 again:
 	if len(bsm.bsrHeap) == 0 {
 		// Write the last (maybe incomplete) inmemoryBlock to bsw.

@@ -163,14 +163,14 @@ again:
 	goto again
 }

-func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) {
+func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *atomic.Uint64) {
 	items := bsm.ib.items
 	data := bsm.ib.data
 	if len(items) == 0 {
 		// Nothing to flush.
 		return
 	}
-	atomic.AddUint64(itemsMerged, uint64(len(items)))
+	itemsMerged.Add(uint64(len(items)))
 	if bsm.prepareBlock != nil {
 		bsm.firstItem = append(bsm.firstItem[:0], items[0].String(data)...)
 		bsm.lastItem = append(bsm.lastItem[:0], items[len(items)-1].String(data)...)
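mergeBlockStreams above now threads an *atomic.Uint64 through to the merge code instead of a bare *uint64. A minimal sketch of the same parameter-passing pattern, updated concurrently and read once at the end (names are illustrative, not from the repository):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// mergeBatch stands in for a merge worker: it reports how many items
// it processed by adding to the shared counter.
func mergeBatch(items []int, itemsMerged *atomic.Uint64) {
	// ... merge work would happen here ...
	itemsMerged.Add(uint64(len(items)))
}

func main() {
	var itemsMerged atomic.Uint64
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mergeBatch(make([]int, 100), &itemsMerged)
		}()
	}
	wg.Wait()
	fmt.Println(itemsMerged.Load()) // 400
}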
@@ -6,6 +6,7 @@ import (
 	"math/rand"
 	"reflect"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"time"
 )

@@ -27,7 +28,7 @@ func TestMultilevelMerge(t *testing.T) {

 	// Prepare blocks to merge.
 	bsrs, items := newTestInmemoryBlockStreamReaders(r, 10, 4000)
-	var itemsMerged uint64
+	var itemsMerged atomic.Uint64

 	// First level merge
 	var dstIP1 inmemoryPart

@@ -44,12 +45,12 @@ func TestMultilevelMerge(t *testing.T) {
 		t.Fatalf("cannot merge first level part 2: %s", err)
 	}

-	if itemsMerged != uint64(len(items)) {
-		t.Fatalf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items))
+	if n := itemsMerged.Load(); n != uint64(len(items)) {
+		t.Fatalf("unexpected itemsMerged; got %d; want %d", n, len(items))
 	}

 	// Second level merge (aka final merge)
-	itemsMerged = 0
+	itemsMerged.Store(0)
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
 	bsrsTop := []*blockStreamReader{

@@ -60,8 +61,8 @@ func TestMultilevelMerge(t *testing.T) {
 	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge second level: %s", err)
 	}
-	if itemsMerged != uint64(len(items)) {
-		t.Fatalf("unexpected itemsMerged after final merge; got %d; want %d", itemsMerged, len(items))
+	if n := itemsMerged.Load(); n != uint64(len(items)) {
+		t.Fatalf("unexpected itemsMerged after final merge; got %d; want %d", n, len(items))
 	}

 	// Verify the resulting part (dstIP) contains all the items

@@ -78,13 +79,13 @@ func TestMergeForciblyStop(t *testing.T) {
 	var bsw blockStreamWriter
 	bsw.MustInitFromInmemoryPart(&dstIP, 1)
 	ch := make(chan struct{})
-	var itemsMerged uint64
+	var itemsMerged atomic.Uint64
 	close(ch)
 	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); !errors.Is(err, errForciblyStopped) {
 		t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped)
 	}
-	if itemsMerged != 0 {
-		t.Fatalf("unexpected itemsMerged; got %d; want %d", itemsMerged, 0)
+	if n := itemsMerged.Load(); n != 0 {
+		t.Fatalf("unexpected itemsMerged; got %d; want %d", n, 0)
 	}
 }

@@ -122,15 +123,15 @@ func testMergeBlockStreamsSerial(r *rand.Rand, blocksToMerge, maxItemsPerBlock i
 	bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksToMerge, maxItemsPerBlock)

 	// Merge blocks.
-	var itemsMerged uint64
+	var itemsMerged atomic.Uint64
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
 	bsw.MustInitFromInmemoryPart(&dstIP, -4)
 	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return fmt.Errorf("cannot merge block streams: %w", err)
 	}
-	if itemsMerged != uint64(len(items)) {
-		return fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items))
+	if n := itemsMerged.Load(); n != uint64(len(items)) {
+		return fmt.Errorf("unexpected itemsMerged; got %d; want %d", n, len(items))
 	}

 	// Verify the resulting part (dstIP) contains all the items

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"time"
 )

@@ -148,15 +149,15 @@ func testPartSearchSerial(r *rand.Rand, p *part, items []string) error {
 func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []string, error) {
 	bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksCount, maxItemsPerBlock)

-	var itemsMerged uint64
+	var itemsMerged atomic.Uint64
 	var ip inmemoryPart
 	var bsw blockStreamWriter
 	bsw.MustInitFromInmemoryPart(&ip, -3)
 	if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return nil, nil, fmt.Errorf("cannot merge blocks: %w", err)
 	}
-	if itemsMerged != uint64(len(items)) {
-		return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items))
+	if n := itemsMerged.Load(); n != uint64(len(items)) {
+		return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", n, len(items))
 	}
 	size := ip.size()
 	p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
@@ -83,33 +83,29 @@ func maxItemsPerCachedPart() uint64 {

 // Table represents mergeset table.
 type Table struct {
-	// Atomically updated counters must go first in the struct, so they are properly
-	// aligned to 8 bytes on 32-bit architectures.
-	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
-
-	activeInmemoryMerges uint64
-	activeFileMerges     uint64
+	activeInmemoryMerges atomic.Int64
+	activeFileMerges     atomic.Int64

-	inmemoryMergesCount uint64
-	fileMergesCount     uint64
+	inmemoryMergesCount atomic.Uint64
+	fileMergesCount     atomic.Uint64

-	inmemoryItemsMerged uint64
-	fileItemsMerged     uint64
+	inmemoryItemsMerged atomic.Uint64
+	fileItemsMerged     atomic.Uint64

-	itemsAdded          uint64
-	itemsAddedSizeBytes uint64
+	itemsAdded          atomic.Uint64
+	itemsAddedSizeBytes atomic.Uint64

-	inmemoryPartsLimitReachedCount uint64
+	inmemoryPartsLimitReachedCount atomic.Uint64

-	mergeIdx uint64
+	mergeIdx atomic.Uint64

 	path string

 	flushCallback func()
-	needFlushCallbackCall uint32
+	needFlushCallbackCall atomic.Bool

 	prepareBlock PrepareBlockCallback
-	isReadOnly *uint32
+	isReadOnly *atomic.Bool

 	// rawItems contains recently added items that haven't been converted to parts yet.
 	//
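The deleted comment at the top of the struct is worth a note: with plain uint64 fields, 64-bit atomic operations required the fields to be manually 8-byte aligned on 32-bit platforms, hence "counters must go first". The atomic.Int64/atomic.Uint64 types carry that alignment guarantee themselves, so field order no longer matters. A small, hypothetical demonstration:

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// counters is a toy struct: with a plain uint64 counter after a bool,
// 32-bit builds could mis-align the counter and panic in atomic ops.
// atomic.Uint64 forces 8-byte alignment regardless of field order.
type counters struct {
	flag bool
	n    atomic.Uint64
}

func main() {
	var c counters
	fmt.Println(unsafe.Offsetof(c.n)%8 == 0) // true on 32- and 64-bit builds
	c.n.Add(1)
	fmt.Println(c.n.Load()) // 1
}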
@@ -353,7 +349,7 @@ func (pw *partWrapper) decRef() {
 // to persistent storage.
 //
 // The table is created if it doesn't exist yet.
-func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) *Table {
+func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *atomic.Bool) *Table {
 	path = filepath.Clean(path)

 	// Create a directory for the table if it doesn't exist yet.

@@ -363,7 +359,6 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC
 	pws := mustOpenParts(path)

 	tb := &Table{
-		mergeIdx:      uint64(time.Now().UnixNano()),
 		path:          path,
 		flushCallback: flushCallback,
 		prepareBlock:  prepareBlock,

@@ -372,6 +367,7 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC
 		inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
 		stopCh:               make(chan struct{}),
 	}
+	tb.mergeIdx.Store(uint64(time.Now().UnixNano()))
 	tb.rawItems.init()
 	tb.startBackgroundWorkers()

@@ -464,7 +460,7 @@ func (tb *Table) startFlushCallbackWorker() {
 			tb.wg.Done()
 			return
 		case <-tc.C:
-			if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
+			if tb.needFlushCallbackCall.CompareAndSwap(true, false) {
 				tb.flushCallback()
 			}
 		}

@@ -588,19 +584,19 @@ func (tm *TableMetrics) TotalItemsCount() uint64 {

 // UpdateMetrics updates m with metrics from tb.
 func (tb *Table) UpdateMetrics(m *TableMetrics) {
-	m.ActiveInmemoryMerges += atomic.LoadUint64(&tb.activeInmemoryMerges)
-	m.ActiveFileMerges += atomic.LoadUint64(&tb.activeFileMerges)
+	m.ActiveInmemoryMerges += uint64(tb.activeInmemoryMerges.Load())
+	m.ActiveFileMerges += uint64(tb.activeFileMerges.Load())

-	m.InmemoryMergesCount += atomic.LoadUint64(&tb.inmemoryMergesCount)
-	m.FileMergesCount += atomic.LoadUint64(&tb.fileMergesCount)
+	m.InmemoryMergesCount += tb.inmemoryMergesCount.Load()
+	m.FileMergesCount += tb.fileMergesCount.Load()

-	m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged)
-	m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged)
+	m.InmemoryItemsMerged += tb.inmemoryItemsMerged.Load()
+	m.FileItemsMerged += tb.fileItemsMerged.Load()

-	m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded)
-	m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes)
+	m.ItemsAdded += tb.itemsAdded.Load()
+	m.ItemsAddedSizeBytes += tb.itemsAddedSizeBytes.Load()

-	m.InmemoryPartsLimitReachedCount += atomic.LoadUint64(&tb.inmemoryPartsLimitReachedCount)
+	m.InmemoryPartsLimitReachedCount += tb.inmemoryPartsLimitReachedCount.Load()

 	m.PendingItems += uint64(tb.rawItems.Len())

@@ -644,12 +640,12 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
 // It logs the ignored items, so users could notice and fix the issue.
 func (tb *Table) AddItems(items [][]byte) {
 	tb.rawItems.addItems(tb, items)
-	atomic.AddUint64(&tb.itemsAdded, uint64(len(items)))
+	tb.itemsAdded.Add(uint64(len(items)))
 	n := 0
 	for _, item := range items {
 		n += len(item)
 	}
-	atomic.AddUint64(&tb.itemsAddedSizeBytes, uint64(n))
+	tb.itemsAddedSizeBytes.Add(uint64(n))
 }

 // getParts appends parts snapshot to dst and returns it.

@@ -890,7 +886,7 @@ func (tb *Table) addToInmemoryParts(pw *partWrapper, isFinal bool) {
 	select {
 	case tb.inmemoryPartsLimitCh <- struct{}{}:
 	default:
-		atomic.AddUint64(&tb.inmemoryPartsLimitReachedCount, 1)
+		tb.inmemoryPartsLimitReachedCount.Add(1)
 		select {
 		case tb.inmemoryPartsLimitCh <- struct{}{}:
 		case <-tb.stopCh:
@@ -906,10 +902,10 @@ func (tb *Table) addToInmemoryParts(pw *partWrapper, isFinal bool) {
 	if isFinal {
 		tb.flushCallback()
 	} else {
-		// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
-		// at fast path when needFlushCallbackCall is already set to 1.
-		if atomic.LoadUint32(&tb.needFlushCallbackCall) == 0 {
-			atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
+		// Use Load in front of CompareAndSwap in order to avoid slow inter-CPU synchronization
+		// at fast path when needFlushCallbackCall is already set to true.
+		if !tb.needFlushCallbackCall.Load() {
+			tb.needFlushCallbackCall.CompareAndSwap(false, true)
 		}
 	}
 }
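The updated comment keeps the underlying idiom: Load is a read that can be satisfied from a shared cache line, while CompareAndSwap always takes the line exclusively even when it fails, so checking first keeps the hot path cheap once the flag is set. A minimal sketch with an illustrative flag name:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var dirty atomic.Bool

// markDirty sets the flag at most once. The Load fast path skips the
// CompareAndSwap (a write that bounces the cache line between CPUs)
// whenever the flag is already set.
func markDirty() {
	if !dirty.Load() {
		dirty.CompareAndSwap(false, true)
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			markDirty()
		}()
	}
	wg.Wait()
	fmt.Println(dirty.Load()) // true
}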
@@ -1072,7 +1068,7 @@ func (tb *Table) NotifyReadWriteMode() {

 func (tb *Table) inmemoryPartsMerger() {
 	for {
-		if atomic.LoadUint32(tb.isReadOnly) != 0 {
+		if tb.isReadOnly.Load() {
 			return
 		}
 		maxOutBytes := tb.getMaxFilePartSize()

@@ -1105,7 +1101,7 @@ func (tb *Table) inmemoryPartsMerger() {

 func (tb *Table) filePartsMerger() {
 	for {
-		if atomic.LoadUint32(tb.isReadOnly) != 0 {
+		if tb.isReadOnly.Load() {
 			return
 		}
 		maxOutBytes := tb.getMaxFilePartSize()
@@ -1303,9 +1299,9 @@ func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {

 func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
 	var ph partHeader
-	var itemsMerged *uint64
-	var mergesCount *uint64
-	var activeMerges *uint64
+	var itemsMerged *atomic.Uint64
+	var mergesCount *atomic.Uint64
+	var activeMerges *atomic.Int64
 	switch dstPartType {
 	case partInmemory:
 		itemsMerged = &tb.inmemoryItemsMerged

@@ -1318,10 +1314,10 @@ func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter,
 	default:
 		logger.Panicf("BUG: unknown partType=%d", dstPartType)
 	}
-	atomic.AddUint64(activeMerges, 1)
+	activeMerges.Add(1)
 	err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, itemsMerged)
-	atomic.AddUint64(activeMerges, ^uint64(0))
-	atomic.AddUint64(mergesCount, 1)
+	activeMerges.Add(-1)
+	mergesCount.Add(1)
 	if err != nil {
 		return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err)
 	}
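One subtlety in this hunk: the old code decremented an unsigned counter with atomic.AddUint64(activeMerges, ^uint64(0)), relying on wraparound addition of 2^64-1, because AddUint64 takes no negative delta. Moving the field to atomic.Int64 lets the code say Add(-1) directly. The equivalence, as a tiny demo:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// Old idiom: ^uint64(0) is all bits set, i.e. 2^64-1; adding it
	// to a uint64 wraps around, which decrements the value by one.
	var u uint64 = 5
	atomic.AddUint64(&u, ^uint64(0))
	fmt.Println(u) // 4

	// New idiom: a signed atomic accepts the negative delta directly.
	var n atomic.Int64
	n.Store(5)
	n.Add(-1)
	fmt.Println(n.Load()) // 4
}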
@@ -1462,7 +1458,7 @@ func getCompressLevel(itemsCount uint64) int {
 }

 func (tb *Table) nextMergeIdx() uint64 {
-	return atomic.AddUint64(&tb.mergeIdx, 1)
+	return tb.mergeIdx.Add(1)
 }

 func mustOpenParts(path string) []*partWrapper {

@@ -41,7 +41,7 @@ func TestTableSearchSerial(t *testing.T) {

 	func() {
 		// Re-open the table and verify the search works.
-		var isReadOnly uint32
+		var isReadOnly atomic.Bool
 		tb := MustOpenTable(path, nil, nil, &isReadOnly)
 		defer tb.MustClose()
 		if err := testTableSearchSerial(tb, items); err != nil {

@@ -75,7 +75,7 @@ func TestTableSearchConcurrent(t *testing.T) {

 	// Re-open the table and verify the search works.
 	func() {
-		var isReadOnly uint32
+		var isReadOnly atomic.Bool
 		tb := MustOpenTable(path, nil, nil, &isReadOnly)
 		defer tb.MustClose()
 		if err := testTableSearchConcurrent(tb, items); err != nil {

@@ -145,11 +145,11 @@ func testTableSearchSerial(tb *Table, items []string) error {
 }

 func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string, error) {
-	var flushes uint64
+	var flushes atomic.Uint64
 	flushCallback := func() {
-		atomic.AddUint64(&flushes, 1)
+		flushes.Add(1)
 	}
-	var isReadOnly uint32
+	var isReadOnly atomic.Bool
 	tb := MustOpenTable(path, flushCallback, nil, &isReadOnly)
 	items := make([]string, itemsCount)
 	for i := 0; i < itemsCount; i++ {

@@ -158,7 +158,7 @@ func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string,
 		items[i] = item
 	}
 	tb.DebugFlush()
-	if itemsCount > 0 && atomic.LoadUint64(&flushes) == 0 {
+	if itemsCount > 0 && flushes.Load() == 0 {
 		return nil, nil, fmt.Errorf("unexpeted zero flushes for itemsCount=%d", itemsCount)
 	}

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
+	"sync/atomic"
 	"testing"
 )

@@ -34,7 +35,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {

 	// Force finishing pending merges
 	tb.MustClose()
-	var isReadOnly uint32
+	var isReadOnly atomic.Bool
 	tb = MustOpenTable(path, nil, nil, &isReadOnly)
 	defer tb.MustClose()

@@ -20,7 +20,7 @@ func TestTableOpenClose(t *testing.T) {
 	}()

 	// Create a new table
-	var isReadOnly uint32
+	var isReadOnly atomic.Bool
 	tb := MustOpenTable(path, nil, nil, &isReadOnly)

 	// Close it

@@ -39,7 +39,7 @@ func TestTableAddItemsTooLongItem(t *testing.T) {
 		t.Fatalf("cannot remove %q: %s", path, err)
 	}

-	var isReadOnly uint32
+	var isReadOnly atomic.Bool
 	tb := MustOpenTable(path, nil, nil, &isReadOnly)
 	tb.AddItems([][]byte{make([]byte, maxInmemoryBlockSize+1)})
 	tb.MustClose()

@@ -56,11 +56,11 @@ func TestTableAddItemsSerial(t *testing.T) {
 		_ = os.RemoveAll(path)
 	}()

-	var flushes uint64
+	var flushes atomic.Uint64
 	flushCallback := func() {
-		atomic.AddUint64(&flushes, 1)
+		flushes.Add(1)
 	}
-	var isReadOnly uint32
+	var isReadOnly atomic.Bool
 	tb := MustOpenTable(path, flushCallback, nil, &isReadOnly)

 	const itemsCount = 10e3

@@ -68,7 +68,7 @@ func TestTableAddItemsSerial(t *testing.T) {

 	// Verify items count after pending items flush.
 	tb.DebugFlush()
-	if atomic.LoadUint64(&flushes) == 0 {
+	if flushes.Load() == 0 {
 		t.Fatalf("unexpected zero flushes")
 	}

@@ -109,7 +109,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
 		t.Fatalf("cannot remove %q: %s", path, err)
 	}

-	var isReadOnly uint32
+	var isReadOnly atomic.Bool
 	tb := MustOpenTable(path, nil, nil, &isReadOnly)

 	// Write a lot of items into the table, so background merges would start.

@@ -185,14 +185,14 @@ func TestTableAddItemsConcurrent(t *testing.T) {
 		_ = os.RemoveAll(path)
 	}()

-	var flushes uint64
+	var flushes atomic.Uint64
 	flushCallback := func() {
-		atomic.AddUint64(&flushes, 1)
+		flushes.Add(1)
 	}
 	prepareBlock := func(data []byte, items []Item) ([]byte, []Item) {
 		return data, items
 	}
-	var isReadOnly uint32
+	var isReadOnly atomic.Bool
 	tb := MustOpenTable(path, flushCallback, prepareBlock, &isReadOnly)

 	const itemsCount = 10e3

@@ -200,7 +200,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {

 	// Verify items count after pending items flush.
 	tb.DebugFlush()
-	if atomic.LoadUint64(&flushes) == 0 {
+	if flushes.Load() == 0 {
 		t.Fatalf("unexpected zero flushes")
 	}

@@ -254,7 +254,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
 	t.Helper()

 	for i := 0; i < 10; i++ {
-		var isReadOnly uint32
+		var isReadOnly atomic.Bool
 		tb := MustOpenTable(path, nil, nil, &isReadOnly)
 		var m TableMetrics
 		tb.UpdateMetrics(&m)
@@ -132,7 +132,7 @@ func getTagFiltersCacheSize() int {
 //
 // The last segment of the path should contain unique hex value which
 // will be then used as indexDB.generation
-func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
+func mustOpenIndexDB(path string, s *Storage, isReadOnly *atomic.Bool) *indexDB {
 	if s == nil {
 		logger.Panicf("BUG: Storage must be nin-nil")
 	}

@@ -519,7 +519,7 @@ func TestIndexDBOpenClose(t *testing.T) {
 	var s Storage
 	tableName := nextIndexDBTableName()
 	for i := 0; i < 5; i++ {
-		var isReadOnly uint32
+		var isReadOnly atomic.Bool
 		db := mustOpenIndexDB(tableName, &s, &isReadOnly)
 		db.MustClose()
 	}

@@ -604,7 +604,7 @@ func (pt *partition) NotifyReadWriteMode() {

 func (pt *partition) inmemoryPartsMerger() {
 	for {
-		if atomic.LoadUint32(&pt.s.isReadOnly) != 0 {
+		if pt.s.isReadOnly.Load() {
 			return
 		}
 		maxOutBytes := pt.getMaxBigPartSize()

@@ -637,7 +637,7 @@ func (pt *partition) inmemoryPartsMerger() {

 func (pt *partition) smallPartsMerger() {
 	for {
-		if atomic.LoadUint32(&pt.s.isReadOnly) != 0 {
+		if pt.s.isReadOnly.Load() {
 			return
 		}
 		maxOutBytes := pt.getMaxBigPartSize()

@@ -670,7 +670,7 @@ func (pt *partition) smallPartsMerger() {

 func (pt *partition) bigPartsMerger() {
 	for {
-		if atomic.LoadUint32(&pt.s.isReadOnly) != 0 {
+		if pt.s.isReadOnly.Load() {
 			return
 		}
 		maxOutBytes := pt.getMaxBigPartSize()

@@ -149,7 +149,8 @@ type Storage struct {
 	deletedMetricIDs           atomic.Pointer[uint64set.Set]
 	deletedMetricIDsUpdateLock sync.Mutex

-	isReadOnly uint32
+	// isReadOnly is set to true when the storage is in read-only mode.
+	isReadOnly atomic.Bool
 }

 // MustOpenStorage opens storage on the given path with the given retentionMsecs.

@@ -650,7 +651,7 @@ var freeDiskSpaceLimitBytes uint64

 // IsReadOnly returns information is storage in read only mode
 func (s *Storage) IsReadOnly() bool {
-	return atomic.LoadUint32(&s.isReadOnly) == 1
+	return s.isReadOnly.Load()
 }

 func (s *Storage) startFreeDiskSpaceWatcher() {

@@ -659,18 +660,18 @@ func (s *Storage) startFreeDiskSpaceWatcher() {
 		if freeSpaceBytes < freeDiskSpaceLimitBytes {
 			// Switch the storage to readonly mode if there is no enough free space left at s.path
 			//
-			// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+			// Use Load in front of CompareAndSwap in order to avoid slow inter-CPU synchronization
 			// when the storage is already in read-only mode.
-			if atomic.LoadUint32(&s.isReadOnly) == 0 && atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) {
+			if !s.isReadOnly.Load() && s.isReadOnly.CompareAndSwap(false, true) {
 				// log notification only on state change
 				logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
 					s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
 			}
 			return
 		}
-		// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+		// Use Load in front of CompareAndSwap in order to avoid slow inter-CPU synchronization
 		// when the storage isn't in read-only mode.
-		if atomic.LoadUint32(&s.isReadOnly) == 1 && atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
+		if s.isReadOnly.Load() && s.isReadOnly.CompareAndSwap(true, false) {
 			s.notifyReadWriteMode()
 			logger.Warnf("switching the storage at %s to read-write mode, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
 				s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
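A recurring shape across these files: the owner declares the atomic.Bool and hands out a pointer (isReadOnly *atomic.Bool), so background workers observe mode switches made elsewhere without extra locking. A minimal sketch of that sharing, with illustrative names:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// merger polls a read-only flag owned by another component; sharing
// a *atomic.Bool lets both sides see the same state safely.
func merger(isReadOnly *atomic.Bool, done chan<- struct{}) {
	for !isReadOnly.Load() {
		time.Sleep(time.Millisecond) // pretend to merge parts
	}
	close(done) // stop once the owner flips the flag
}

func main() {
	var isReadOnly atomic.Bool
	done := make(chan struct{})
	go merger(&isReadOnly, done)
	isReadOnly.Store(true) // owner switches to read-only mode
	<-done
	fmt.Println("merger stopped")
}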