lib/storage: remove prioritizing of merging small parts over merging big parts, since it doesn't work as expected
The prioritizing could lead to starvation of big merges, which could result in an excessive number of parts awaiting merges into big parts. Multiple big merges may be initiated after migrating from v1.39.0 or v1.39.1. This is OK: these merges should finish soon, after which CPU and disk IO usage return to normal levels.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/618
commit 29bbab0ec9 (parent 96039dcb40)
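To make the removed mechanism concrete, here is a minimal, self-contained Go sketch of a pace limiter with the Inc/Dec/WaitIfNeeded/DelaysTotal API that appears throughout the diff below. This is an illustration under assumed semantics, not the actual lib/pacelimiter implementation: small merges bracket their work with Inc()/Dec(), while big merges call WaitIfNeeded() per processed block and are paused whenever the counter is positive. That is the starvation hazard named in the commit message: a steady stream of small merges keeps the counter above zero, so a big merge can be delayed indefinitely.

package main

import (
	"fmt"
	"sync"
	"time"
)

// PaceLimiter blocks WaitIfNeeded callers while its counter is positive.
// It is a hypothetical stand-in for lib/pacelimiter's PaceLimiter.
type PaceLimiter struct {
	mu          sync.Mutex
	cond        *sync.Cond
	n           int    // higher-priority tasks (small merges) in flight
	delaysTotal uint64 // how many times a caller had to wait
}

func New() *PaceLimiter {
	pl := &PaceLimiter{}
	pl.cond = sync.NewCond(&pl.mu)
	return pl
}

// Inc is called when a higher-priority task starts.
func (pl *PaceLimiter) Inc() {
	pl.mu.Lock()
	pl.n++
	pl.mu.Unlock()
}

// Dec is called when a higher-priority task finishes. A negative count
// must not block anybody (see the negative_count test added below).
func (pl *PaceLimiter) Dec() {
	pl.mu.Lock()
	pl.n--
	if pl.n <= 0 {
		pl.cond.Broadcast() // nothing left to wait for; wake all waiters
	}
	pl.mu.Unlock()
}

// WaitIfNeeded blocks while the counter is positive. A big merge calls
// this for every block it processes, so it makes no progress at all
// while small merges keep the counter above zero.
func (pl *PaceLimiter) WaitIfNeeded() {
	pl.mu.Lock()
	if pl.n > 0 {
		pl.delaysTotal++
		for pl.n > 0 {
			pl.cond.Wait()
		}
	}
	pl.mu.Unlock()
}

// DelaysTotal is the number the removed vm_big_merges_delays_total
// gauge used to export.
func (pl *PaceLimiter) DelaysTotal() uint64 {
	pl.mu.Lock()
	defer pl.mu.Unlock()
	return pl.delaysTotal
}

func main() {
	pl := New()
	pl.Inc() // a small merge starts
	done := make(chan struct{})
	go func() {
		pl.WaitIfNeeded() // a big merge parks here until the small merge ends
		close(done)
	}()
	time.Sleep(10 * time.Millisecond) // let the big merge reach WaitIfNeeded
	pl.Dec()                          // the small merge finishes
	<-done
	fmt.Println("big merge delays:", pl.DelaysTotal()) // typically prints 1
}

The commit deletes the BigMerges instance of this mechanism (the Search limiter in lib/storagepacelimiter remains), so big merges are no longer paused while small merges run.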
12 changed files with 61 additions and 77 deletions
@@ -363,9 +363,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_search_delays_total`, func() float64 {
 		return float64(m().SearchDelays)
 	})
-	metrics.NewGauge(`vm_big_merges_delays_total`, func() float64 {
-		return float64(tm().BigMergesDelays)
-	})

 	metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
 		return float64(m().SlowRowInserts)
@@ -7,7 +7,6 @@ import (
 	"sync/atomic"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
 )

 // PrepareBlockCallback can transform the passed items allocated at the given data.
@@ -29,9 +28,9 @@ type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte)
 //
 // It also atomically adds the number of items merged to itemsMerged.
 func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{},
-	pl *pacelimiter.PaceLimiter, itemsMerged *uint64) error {
+	itemsMerged *uint64) error {
 	bsm := bsmPool.Get().(*blockStreamMerger)
-	if err := bsm.Init(bsrs, prepareBlock, pl); err != nil {
+	if err := bsm.Init(bsrs, prepareBlock); err != nil {
 		return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)
 	}
 	err := bsm.Merge(bsw, ph, stopCh, itemsMerged)
@@ -63,9 +62,6 @@ type blockStreamMerger struct {

 	phFirstItemCaught bool

-	// optional pace limiter for merge process.
-	pl *pacelimiter.PaceLimiter
-
 	// This are auxiliary buffers used in flushIB
 	// for consistency checks after prepareBlock call.
 	firstItem []byte
@@ -82,13 +78,11 @@ func (bsm *blockStreamMerger) reset() {
 	bsm.ib.Reset()

 	bsm.phFirstItemCaught = false
-	bsm.pl = nil
 }

-func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, pl *pacelimiter.PaceLimiter) error {
+func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error {
 	bsm.reset()
 	bsm.prepareBlock = prepareBlock
-	bsm.pl = pl
 	for _, bsr := range bsrs {
 		if bsr.Next() {
 			bsm.bsrHeap = append(bsm.bsrHeap, bsr)
@@ -111,9 +105,6 @@ var errForciblyStopped = fmt.Errorf("forcibly stopped")

 func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error {
 again:
-	if bsm.pl != nil {
-		bsm.pl.WaitIfNeeded()
-	}
 	if len(bsm.bsrHeap) == 0 {
 		// Write the last (maybe incomplete) inmemoryBlock to bsw.
 		bsm.flushIB(bsw, ph, itemsMerged)
@@ -7,8 +7,6 @@ import (
 	"sort"
 	"testing"
 	"time"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 )

 func TestMergeBlockStreams(t *testing.T) {
@@ -32,14 +30,14 @@ func TestMultilevelMerge(t *testing.T) {
 	var dstIP1 inmemoryPart
 	var bsw1 blockStreamWriter
 	bsw1.InitFromInmemoryPart(&dstIP1)
-	if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, nil, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 1: %s", err)
 	}

 	var dstIP2 inmemoryPart
 	var bsw2 blockStreamWriter
 	bsw2.InitFromInmemoryPart(&dstIP2)
-	if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 2: %s", err)
 	}
@@ -56,7 +54,7 @@ func TestMultilevelMerge(t *testing.T) {
 		newTestBlockStreamReader(&dstIP2),
 	}
 	bsw.InitFromInmemoryPart(&dstIP)
-	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge second level: %s", err)
 	}
 	if itemsMerged != uint64(len(items)) {
@@ -78,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) {
 	ch := make(chan struct{})
 	var itemsMerged uint64
 	close(ch)
-	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, nil, &itemsMerged); err != errForciblyStopped {
+	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped {
 		t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped)
 	}
 	if itemsMerged != 0 {
@@ -122,7 +120,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error {
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
 	bsw.InitFromInmemoryPart(&dstIP)
-	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return fmt.Errorf("cannot merge block streams: %w", err)
 	}
 	if itemsMerged != uint64(len(items)) {
@@ -150,7 +150,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
 	var ip inmemoryPart
 	var bsw blockStreamWriter
 	bsw.InitFromInmemoryPart(&ip)
-	if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, nil, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return nil, nil, fmt.Errorf("cannot merge blocks: %w", err)
 	}
 	if itemsMerged != uint64(len(items)) {
@@ -635,11 +635,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrapper {
 	// Merge parts.
 	// The merge shouldn't be interrupted by stopCh,
 	// since it may be final after stopCh is closed.
-	//
-	// Prioritize merging of inmemory blocks over merging file parts.
-	storagepacelimiter.BigMerges.Inc()
-	err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, nil, &tb.itemsMerged)
-	storagepacelimiter.BigMerges.Dec()
+	err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged)
 	if err != nil {
 		logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
 	}
@@ -801,7 +797,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP

 	// Merge parts into a temporary location.
 	var ph partHeader
-	err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, storagepacelimiter.BigMerges, &tb.itemsMerged)
+	err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged)
 	putBlockStreamWriter(bsw)
 	if err != nil {
 		if err == errForciblyStopped {
@@ -949,9 +945,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 {
 	return freeSpace / uint64(mergeWorkersCount) / 4
 }

-var mergeWorkersCount = func() int {
-	return runtime.GOMAXPROCS(-1)
-}()
+var mergeWorkersCount = runtime.GOMAXPROCS(-1)

 func openParts(path string) ([]*partWrapper, error) {
 	// The path can be missing after restoring from backup, so create it if needed.
@@ -1,6 +1,7 @@
 package pacelimiter

 import (
+	"fmt"
 	"runtime"
 	"sync"
 	"testing"
@@ -73,6 +74,43 @@ func TestPacelimiter(t *testing.T) {
 		}
 		// Verify that the pl is unblocked now.
 		pl.WaitIfNeeded()
+
+		// Verify that negative count doesn't block pl.
+		pl.Dec()
+		pl.WaitIfNeeded()
+		if n := pl.DelaysTotal(); n == 0 {
+			t.Fatalf("expecting non-zero number of delays after subsequent pl.Dec()")
+		}
+	})
+	t.Run("negative_count", func(t *testing.T) {
+		n := 10
+		pl := New()
+		for i := 0; i < n; i++ {
+			pl.Dec()
+		}
+
+		doneCh := make(chan error)
+		go func() {
+			defer close(doneCh)
+			for i := 0; i < n; i++ {
+				pl.Inc()
+				pl.WaitIfNeeded()
+				if n := pl.DelaysTotal(); n != 0 {
+					doneCh <- fmt.Errorf("expecting zero number of delays")
+					return
+				}
+			}
+			doneCh <- nil
+		}()
+
+		select {
+		case err := <-doneCh:
+			if err != nil {
+				t.Fatalf("unexpected error: %s", err)
+			}
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout")
+		}
 	})
 	t.Run("concurrent_inc_dec", func(t *testing.T) {
 		pl := New()
@@ -4,8 +4,6 @@ import (
 	"container/heap"
 	"fmt"
 	"io"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
 )

 // blockStreamMerger is used for merging block streams.
@@ -18,9 +16,6 @@ type blockStreamMerger struct {
 	// Whether the call to NextBlock must be no-op.
 	nextBlockNoop bool

-	// Optional pace limiter for limiting the pace for NextBlock calls.
-	pl *pacelimiter.PaceLimiter
-
 	// The last error
 	err error
 }
@@ -32,14 +27,11 @@ func (bsm *blockStreamMerger) reset() {
 	}
 	bsm.bsrHeap = bsm.bsrHeap[:0]
 	bsm.nextBlockNoop = false
-	bsm.pl = nil
 	bsm.err = nil
 }

 // Init initializes bsm with the given bsrs.
-//
-// pl is an optional pace limiter, which allows limiting the pace for NextBlock calls.
-func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.PaceLimiter) {
+func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) {
 	bsm.reset()
 	for _, bsr := range bsrs {
 		if bsr.NextBlock() {
@@ -60,7 +52,6 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.PaceLimiter) {
 	heap.Init(&bsm.bsrHeap)
 	bsm.Block = &bsm.bsrHeap[0].Block
 	bsm.nextBlockNoop = true
-	bsm.pl = pl
 }

 // NextBlock stores the next block in bsm.Block.
@@ -75,9 +66,6 @@ func (bsm *blockStreamMerger) NextBlock() bool {
 		bsm.nextBlockNoop = false
 		return true
 	}
-	if bsm.pl != nil {
-		bsm.pl.WaitIfNeeded()
-	}

 	bsm.err = bsm.nextBlock()
 	switch bsm.err {
@@ -6,7 +6,6 @@ import (

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
@@ -16,11 +15,11 @@ import (
 //
 // rowsMerged is atomically updated with the number of merged rows during the merge.
 func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
-	pl *pacelimiter.PaceLimiter, dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
+	dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
 	ph.Reset()

 	bsm := bsmPool.Get().(*blockStreamMerger)
-	bsm.Init(bsrs, pl)
+	bsm.Init(bsrs)
 	err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
 	bsm.reset()
 	bsmPool.Put(bsm)
@@ -3,8 +3,6 @@ package storage
 import (
 	"math/rand"
 	"testing"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 )

 func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) {
@@ -366,7 +364,7 @@ func TestMergeForciblyStop(t *testing.T) {
 	ch := make(chan struct{})
 	var rowsMerged, rowsDeleted uint64
 	close(ch)
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped {
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped {
 		t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
 	}
 	if rowsMerged != 0 {
@@ -386,7 +384,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
 	bsw.InitFromInmemoryPart(&mp)

 	var rowsMerged, rowsDeleted uint64
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, storagepacelimiter.BigMerges, nil, &rowsMerged, &rowsDeleted); err != nil {
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
 		t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
 	}
@@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
 		}
 		mpOut.Reset()
 		bsw.InitFromInmemoryPart(&mpOut)
-		if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
+		if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
 			panic(fmt.Errorf("cannot merge block streams: %w", err))
 		}
 	}
@@ -329,7 +329,6 @@ type partitionMetrics struct {
 	SmallPartsRefCount uint64

 	SmallAssistedMerges uint64
-	BigMergesDelays uint64
 }

 // UpdateMetrics updates m with metrics from pt.
@@ -388,8 +387,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 	m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted)

 	m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
-
-	m.BigMergesDelays = storagepacelimiter.BigMerges.DelaysTotal()
 }

 // AddRows adds the given rows to the partition pt.
@@ -817,13 +814,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
 	return nil
 }

-var mergeWorkersCount = func() int {
-	n := runtime.GOMAXPROCS(-1) / 2
-	if n <= 0 {
-		n = 1
-	}
-	return n
-}()
+var mergeWorkersCount = runtime.GOMAXPROCS(-1)

 var (
 	bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
@@ -935,10 +926,9 @@ func maxRowsByPath(path string) uint64 {
 	// Calculate the maximum number of rows in the output merge part
 	// by dividing the freeSpace by the number of concurrent
 	// mergeWorkersCount for big parts.
-	// This assumes each row is compressed into 1 byte. Production
-	// simulation shows that each row usually occupies up to 0.5 bytes,
-	// so this is quite safe assumption.
-	maxRows := freeSpace / uint64(mergeWorkersCount)
+	// This assumes each row is compressed into 0.5 bytes
+	// according to production data.
+	maxRows := 2 * (freeSpace / uint64(mergeWorkersCount))
 	if maxRows > maxRowsPerBigPart {
 		maxRows = maxRowsPerBigPart
 	}
@@ -1058,25 +1048,21 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) error {
 	var ph partHeader
 	rowsMerged := &pt.smallRowsMerged
 	rowsDeleted := &pt.smallRowsDeleted
-	pl := storagepacelimiter.BigMerges
 	if isBigPart {
 		rowsMerged = &pt.bigRowsMerged
 		rowsDeleted = &pt.bigRowsDeleted
 		atomic.AddUint64(&pt.bigMergesCount, 1)
 		atomic.AddUint64(&pt.activeBigMerges, 1)
 	} else {
-		pl = nil
 		atomic.AddUint64(&pt.smallMergesCount, 1)
 		atomic.AddUint64(&pt.activeSmallMerges, 1)
 		// Prioritize small merges over big merges.
-		storagepacelimiter.BigMerges.Inc()
 	}
-	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pl, dmis, rowsMerged, rowsDeleted)
+	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted)
 	if isBigPart {
 		atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
 	} else {
 		atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0))
-		storagepacelimiter.BigMerges.Dec()
 	}
 	putBlockStreamWriter(bsw)
 	if err != nil {
@@ -8,8 +8,3 @@ import (
 //
 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
 var Search = pacelimiter.New()
-
-// BigMerges limits the pace for big merges when there is at least a single in-flight small merge.
-//
-// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
-var BigMerges = pacelimiter.New()