From 29bbab0ec9d19c23cfd4c43cc88660eec600a531 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 30 Jul 2020 19:57:25 +0300 Subject: [PATCH] lib/storage: remove prioritizing of merging small parts over merging big parts, since it doesn't work as expected The prioritizing could lead to big merge starvation, which could end up with too big a number of parts that must be merged into big parts. Multiple big merges may be initiated after the migration from v1.39.0 or v1.39.1. It is OK - these merges should be finished soon, which should return CPU and disk IO usage to normal levels. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/618 --- app/vmstorage/main.go | 3 -- lib/mergeset/merge.go | 15 ++------ lib/mergeset/merge_test.go | 12 +++---- lib/mergeset/part_search_test.go | 2 +- lib/mergeset/table.go | 12 ++----- lib/pacelimiter/pacelimiter_test.go | 38 ++++++++++++++++++++ lib/storage/block_stream_merger.go | 14 +------- lib/storage/merge.go | 5 ++- lib/storage/merge_test.go | 6 ++-- lib/storage/merge_timing_test.go | 2 +- lib/storage/partition.go | 24 +++---------- lib/storagepacelimiter/storagepacelimiter.go | 5 --- 12 files changed, 61 insertions(+), 77 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 8cf420d47..2b820d962 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -363,9 +363,6 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_search_delays_total`, func() float64 { return float64(m().SearchDelays) }) - metrics.NewGauge(`vm_big_merges_delays_total`, func() float64 { - return float64(tm().BigMergesDelays) - }) metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { return float64(m().SlowRowInserts) diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 4b3f36fc6..709ce6557 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -7,7 +7,6 @@ import ( "sync/atomic" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" ) // PrepareBlockCallback can transform the passed items allocated at the given data. @@ -29,9 +28,9 @@ type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte) // // It also atomically adds the number of items merged to itemsMerged. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{}, - pl *pacelimiter.PaceLimiter, itemsMerged *uint64) error { + itemsMerged *uint64) error { bsm := bsmPool.Get().(*blockStreamMerger) - if err := bsm.Init(bsrs, prepareBlock, pl); err != nil { + if err := bsm.Init(bsrs, prepareBlock); err != nil { return fmt.Errorf("cannot initialize blockStreamMerger: %w", err) } err := bsm.Merge(bsw, ph, stopCh, itemsMerged) @@ -63,9 +62,6 @@ type blockStreamMerger struct { phFirstItemCaught bool - // optional pace limiter for merge process. - pl *pacelimiter.PaceLimiter - // This are auxiliary buffers used in flushIB // for consistency checks after prepareBlock call. 
firstItem []byte @@ -82,13 +78,11 @@ func (bsm *blockStreamMerger) reset() { bsm.ib.Reset() bsm.phFirstItemCaught = false - bsm.pl = nil } -func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, pl *pacelimiter.PaceLimiter) error { +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error { bsm.reset() bsm.prepareBlock = prepareBlock - bsm.pl = pl for _, bsr := range bsrs { if bsr.Next() { bsm.bsrHeap = append(bsm.bsrHeap, bsr) @@ -111,9 +105,6 @@ var errForciblyStopped = fmt.Errorf("forcibly stopped") func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error { again: - if bsm.pl != nil { - bsm.pl.WaitIfNeeded() - } if len(bsm.bsrHeap) == 0 { // Write the last (maybe incomplete) inmemoryBlock to bsw. bsm.flushIB(bsw, ph, itemsMerged) diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 971fcc298..a4d7cc058 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -7,8 +7,6 @@ import ( "sort" "testing" "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) func TestMergeBlockStreams(t *testing.T) { @@ -32,14 +30,14 @@ func TestMultilevelMerge(t *testing.T) { var dstIP1 inmemoryPart var bsw1 blockStreamWriter bsw1.InitFromInmemoryPart(&dstIP1) - if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 1: %s", err) } var dstIP2 inmemoryPart var bsw2 blockStreamWriter bsw2.InitFromInmemoryPart(&dstIP2) - if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 2: %s", err) 
} @@ -56,7 +54,7 @@ func TestMultilevelMerge(t *testing.T) { newTestBlockStreamReader(&dstIP2), } bsw.InitFromInmemoryPart(&dstIP) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge second level: %s", err) } if itemsMerged != uint64(len(items)) { @@ -78,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var itemsMerged uint64 close(ch) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, nil, &itemsMerged); err != errForciblyStopped { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped { t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped) } if itemsMerged != 0 { @@ -122,7 +120,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error { var dstIP inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&dstIP) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return fmt.Errorf("cannot merge block streams: %w", err) } if itemsMerged != uint64(len(items)) { diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index 9a3d7f9d9..37b87f13d 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -150,7 +150,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) { var ip inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&ip) - if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return nil, nil, fmt.Errorf("cannot merge blocks: %w", err) } if itemsMerged != 
uint64(len(items)) { diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 4f5d25374..f309c039d 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -635,11 +635,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe // Merge parts. // The merge shouldn't be interrupted by stopCh, // since it may be final after stopCh is closed. - // - // Prioritize merging of inmemory blocks over merging file parts. - storagepacelimiter.BigMerges.Inc() - err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, nil, &tb.itemsMerged) - storagepacelimiter.BigMerges.Dec() + err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged) if err != nil { logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) } @@ -801,7 +797,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP // Merge parts into a temporary location. var ph partHeader - err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, storagepacelimiter.BigMerges, &tb.itemsMerged) + err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged) putBlockStreamWriter(bsw) if err != nil { if err == errForciblyStopped { @@ -949,9 +945,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 { return freeSpace / uint64(mergeWorkersCount) / 4 } -var mergeWorkersCount = func() int { - return runtime.GOMAXPROCS(-1) -}() +var mergeWorkersCount = runtime.GOMAXPROCS(-1) func openParts(path string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. diff --git a/lib/pacelimiter/pacelimiter_test.go b/lib/pacelimiter/pacelimiter_test.go index 2b136e3b9..4c5bbf252 100644 --- a/lib/pacelimiter/pacelimiter_test.go +++ b/lib/pacelimiter/pacelimiter_test.go @@ -1,6 +1,7 @@ package pacelimiter import ( + "fmt" "runtime" "sync" "testing" @@ -73,6 +74,43 @@ func TestPacelimiter(t *testing.T) { } // Verify that the pl is unblocked now. 
pl.WaitIfNeeded() + + // Verify that negative count doesn't block pl. + pl.Dec() + pl.WaitIfNeeded() + if n := pl.DelaysTotal(); n == 0 { + t.Fatalf("expecting non-zero number of delays after subsequent pl.Dec()") + } + }) + t.Run("negative_count", func(t *testing.T) { + n := 10 + pl := New() + for i := 0; i < n; i++ { + pl.Dec() + } + + doneCh := make(chan error) + go func() { + defer close(doneCh) + for i := 0; i < n; i++ { + pl.Inc() + pl.WaitIfNeeded() + if n := pl.DelaysTotal(); n != 0 { + doneCh <- fmt.Errorf("expecting zero number of delays") + return + } + } + doneCh <- nil + }() + + select { + case err := <-doneCh: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } }) t.Run("concurrent_inc_dec", func(t *testing.T) { pl := New() diff --git a/lib/storage/block_stream_merger.go b/lib/storage/block_stream_merger.go index 436d12363..e41301e70 100644 --- a/lib/storage/block_stream_merger.go +++ b/lib/storage/block_stream_merger.go @@ -4,8 +4,6 @@ import ( "container/heap" "fmt" "io" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" ) // blockStreamMerger is used for merging block streams. @@ -18,9 +16,6 @@ type blockStreamMerger struct { // Whether the call to NextBlock must be no-op. nextBlockNoop bool - // Optional pace limiter for limiting the pace for NextBlock calls. - pl *pacelimiter.PaceLimiter - // The last error err error } @@ -32,14 +27,11 @@ func (bsm *blockStreamMerger) reset() { } bsm.bsrHeap = bsm.bsrHeap[:0] bsm.nextBlockNoop = false - bsm.pl = nil bsm.err = nil } // Init initializes bsm with the given bsrs. -// -// pl is an optional pace limiter, which allows limiting the pace for NextBlock calls. 
-func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.PaceLimiter) { +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) { bsm.reset() for _, bsr := range bsrs { if bsr.NextBlock() { @@ -60,7 +52,6 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.Pa heap.Init(&bsm.bsrHeap) bsm.Block = &bsm.bsrHeap[0].Block bsm.nextBlockNoop = true - bsm.pl = pl } // NextBlock stores the next block in bsm.Block. @@ -75,9 +66,6 @@ func (bsm *blockStreamMerger) NextBlock() bool { bsm.nextBlockNoop = false return true } - if bsm.pl != nil { - bsm.pl.WaitIfNeeded() - } bsm.err = bsm.nextBlock() switch bsm.err { diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 7c4a43508..403547b4e 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -16,11 +15,11 @@ import ( // // rowsMerged is atomically updated with the number of merged rows during the merge. 
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, - pl *pacelimiter.PaceLimiter, dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { + dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) - bsm.Init(bsrs, pl) + bsm.Init(bsrs) err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted) bsm.reset() bsmPool.Put(bsm) diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index b76bb09d4..8b4e5380d 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -3,8 +3,6 @@ package storage import ( "math/rand" "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) { @@ -366,7 +364,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped { t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped) } if rowsMerged != 0 { @@ -386,7 +384,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc bsw.InitFromInmemoryPart(&mp) var rowsMerged, rowsDeleted uint64 - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, storagepacelimiter.BigMerges, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { t.Fatalf("unexpected error in mergeBlockStreams: %s", err) } diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index fe426b60c..73a03ea29 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -41,7 +41,7 @@ func 
benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } mpOut.Reset() bsw.InitFromInmemoryPart(&mpOut) - if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 08ae49ef2..45e52345b 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -329,7 +329,6 @@ type partitionMetrics struct { SmallPartsRefCount uint64 SmallAssistedMerges uint64 - BigMergesDelays uint64 } // UpdateMetrics updates m with metrics from pt. @@ -388,8 +387,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted) m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges) - - m.BigMergesDelays = storagepacelimiter.BigMerges.DelaysTotal() } // AddRows adds the given rows to the partition pt. @@ -817,13 +814,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error { return nil } -var mergeWorkersCount = func() int { - n := runtime.GOMAXPROCS(-1) / 2 - if n <= 0 { - n = 1 - } - return n -}() +var mergeWorkersCount = runtime.GOMAXPROCS(-1) var ( bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount) @@ -935,10 +926,9 @@ func maxRowsByPath(path string) uint64 { // Calculate the maximum number of rows in the output merge part // by dividing the freeSpace by the number of concurrent // mergeWorkersCount for big parts. - // This assumes each row is compressed into 1 byte. Production - // simulation shows that each row usually occupies up to 0.5 bytes, - // so this is quite safe assumption. - maxRows := freeSpace / uint64(mergeWorkersCount) + // This assumes each row is compressed into 0.5 bytes + // according to production data. 
+ maxRows := 2 * (freeSpace / uint64(mergeWorkersCount)) if maxRows > maxRowsPerBigPart { maxRows = maxRowsPerBigPart } @@ -1058,25 +1048,21 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro var ph partHeader rowsMerged := &pt.smallRowsMerged rowsDeleted := &pt.smallRowsDeleted - pl := storagepacelimiter.BigMerges if isBigPart { rowsMerged = &pt.bigRowsMerged rowsDeleted = &pt.bigRowsDeleted atomic.AddUint64(&pt.bigMergesCount, 1) atomic.AddUint64(&pt.activeBigMerges, 1) } else { - pl = nil atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) // Prioritize small merges over big merges. - storagepacelimiter.BigMerges.Inc() } - err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pl, dmis, rowsMerged, rowsDeleted) + err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted) if isBigPart { atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) } else { atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0)) - storagepacelimiter.BigMerges.Dec() } putBlockStreamWriter(bsw) if err != nil { diff --git a/lib/storagepacelimiter/storagepacelimiter.go b/lib/storagepacelimiter/storagepacelimiter.go index 4eda4d95f..e309e6c52 100644 --- a/lib/storagepacelimiter/storagepacelimiter.go +++ b/lib/storagepacelimiter/storagepacelimiter.go @@ -8,8 +8,3 @@ import ( // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 var Search = pacelimiter.New() - -// BigMerges limits the pace for big merges when there is at least a single in-flight small merge. -// -// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 -var BigMerges = pacelimiter.New()