diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 8cf420d47..2b820d962 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -363,9 +363,6 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_search_delays_total`, func() float64 { return float64(m().SearchDelays) }) - metrics.NewGauge(`vm_big_merges_delays_total`, func() float64 { - return float64(tm().BigMergesDelays) - }) metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { return float64(m().SlowRowInserts) diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 4b3f36fc6..709ce6557 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" ) // PrepareBlockCallback can transform the passed items allocated at the given data. @@ -29,9 +28,9 @@ type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte) // // It also atomically adds the number of items merged to itemsMerged. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{}, - pl *pacelimiter.PaceLimiter, itemsMerged *uint64) error { + itemsMerged *uint64) error { bsm := bsmPool.Get().(*blockStreamMerger) - if err := bsm.Init(bsrs, prepareBlock, pl); err != nil { + if err := bsm.Init(bsrs, prepareBlock); err != nil { return fmt.Errorf("cannot initialize blockStreamMerger: %w", err) } err := bsm.Merge(bsw, ph, stopCh, itemsMerged) @@ -63,9 +62,6 @@ type blockStreamMerger struct { phFirstItemCaught bool - // optional pace limiter for merge process. - pl *pacelimiter.PaceLimiter - // This are auxiliary buffers used in flushIB // for consistency checks after prepareBlock call. firstItem []byte @@ -82,13 +78,11 @@ func (bsm *blockStreamMerger) reset() { bsm.ib.Reset() bsm.phFirstItemCaught = false - bsm.pl = nil } -func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, pl *pacelimiter.PaceLimiter) error { +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error { bsm.reset() bsm.prepareBlock = prepareBlock - bsm.pl = pl for _, bsr := range bsrs { if bsr.Next() { bsm.bsrHeap = append(bsm.bsrHeap, bsr) @@ -111,9 +105,6 @@ var errForciblyStopped = fmt.Errorf("forcibly stopped") func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error { again: - if bsm.pl != nil { - bsm.pl.WaitIfNeeded() - } if len(bsm.bsrHeap) == 0 { // Write the last (maybe incomplete) inmemoryBlock to bsw. bsm.flushIB(bsw, ph, itemsMerged) diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 971fcc298..a4d7cc058 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -7,8 +7,6 @@ import ( "sort" "testing" "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) func TestMergeBlockStreams(t *testing.T) { @@ -32,14 +30,14 @@ func TestMultilevelMerge(t *testing.T) { var dstIP1 inmemoryPart var bsw1 blockStreamWriter bsw1.InitFromInmemoryPart(&dstIP1) - if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 1: %s", err) } var dstIP2 inmemoryPart var bsw2 blockStreamWriter bsw2.InitFromInmemoryPart(&dstIP2) - if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 2: %s", err) } @@ -56,7 +54,7 @@ func TestMultilevelMerge(t *testing.T) { newTestBlockStreamReader(&dstIP2), } bsw.InitFromInmemoryPart(&dstIP) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge second level: %s", err) } if itemsMerged != uint64(len(items)) { @@ -78,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var itemsMerged uint64 close(ch) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, nil, &itemsMerged); err != errForciblyStopped { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped { t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped) } if itemsMerged != 0 { @@ -122,7 +120,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error { var dstIP inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&dstIP) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return fmt.Errorf("cannot merge block streams: %w", err) } if itemsMerged != uint64(len(items)) { diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index 9a3d7f9d9..37b87f13d 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -150,7 +150,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) { var ip inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&ip) - if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return nil, nil, fmt.Errorf("cannot merge blocks: %w", err) } if itemsMerged != uint64(len(items)) { diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 4f5d25374..f309c039d 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -635,11 +635,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe // Merge parts. // The merge shouldn't be interrupted by stopCh, // since it may be final after stopCh is closed. - // - // Prioritize merging of inmemory blocks over merging file parts. - storagepacelimiter.BigMerges.Inc() - err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, nil, &tb.itemsMerged) - storagepacelimiter.BigMerges.Dec() + err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged) if err != nil { logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) } @@ -801,7 +797,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP // Merge parts into a temporary location. var ph partHeader - err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, storagepacelimiter.BigMerges, &tb.itemsMerged) + err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged) putBlockStreamWriter(bsw) if err != nil { if err == errForciblyStopped { @@ -949,9 +945,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 { return freeSpace / uint64(mergeWorkersCount) / 4 } -var mergeWorkersCount = func() int { - return runtime.GOMAXPROCS(-1) -}() +var mergeWorkersCount = runtime.GOMAXPROCS(-1) func openParts(path string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. diff --git a/lib/pacelimiter/pacelimiter_test.go b/lib/pacelimiter/pacelimiter_test.go index 2b136e3b9..4c5bbf252 100644 --- a/lib/pacelimiter/pacelimiter_test.go +++ b/lib/pacelimiter/pacelimiter_test.go @@ -1,6 +1,7 @@ package pacelimiter import ( + "fmt" "runtime" "sync" "testing" @@ -73,6 +74,43 @@ func TestPacelimiter(t *testing.T) { } // Verify that the pl is unblocked now. pl.WaitIfNeeded() + + // Verify that negative count doesn't block pl. + pl.Dec() + pl.WaitIfNeeded() + if n := pl.DelaysTotal(); n == 0 { + t.Fatalf("expecting non-zero number of delays after subsequent pl.Dec()") + } + }) + t.Run("negative_count", func(t *testing.T) { + n := 10 + pl := New() + for i := 0; i < n; i++ { + pl.Dec() + } + + doneCh := make(chan error) + go func() { + defer close(doneCh) + for i := 0; i < n; i++ { + pl.Inc() + pl.WaitIfNeeded() + if n := pl.DelaysTotal(); n != 0 { + doneCh <- fmt.Errorf("expecting zero number of delays") + return + } + } + doneCh <- nil + }() + + select { + case err := <-doneCh: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } }) t.Run("concurrent_inc_dec", func(t *testing.T) { pl := New() diff --git a/lib/storage/block_stream_merger.go b/lib/storage/block_stream_merger.go index 436d12363..e41301e70 100644 --- a/lib/storage/block_stream_merger.go +++ b/lib/storage/block_stream_merger.go @@ -4,8 +4,6 @@ import ( "container/heap" "fmt" "io" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" ) // blockStreamMerger is used for merging block streams. @@ -18,9 +16,6 @@ type blockStreamMerger struct { // Whether the call to NextBlock must be no-op. nextBlockNoop bool - // Optional pace limiter for limiting the pace for NextBlock calls. - pl *pacelimiter.PaceLimiter - // The last error err error } @@ -32,14 +27,11 @@ func (bsm *blockStreamMerger) reset() { } bsm.bsrHeap = bsm.bsrHeap[:0] bsm.nextBlockNoop = false - bsm.pl = nil bsm.err = nil } // Init initializes bsm with the given bsrs. -// -// pl is an optional pace limiter, which allows limiting the pace for NextBlock calls. -func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.PaceLimiter) { +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) { bsm.reset() for _, bsr := range bsrs { if bsr.NextBlock() { @@ -60,7 +52,6 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.Pa heap.Init(&bsm.bsrHeap) bsm.Block = &bsm.bsrHeap[0].Block bsm.nextBlockNoop = true - bsm.pl = pl } // NextBlock stores the next block in bsm.Block. @@ -75,9 +66,6 @@ func (bsm *blockStreamMerger) NextBlock() bool { bsm.nextBlockNoop = false return true } - if bsm.pl != nil { - bsm.pl.WaitIfNeeded() - } bsm.err = bsm.nextBlock() switch bsm.err { diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 7c4a43508..403547b4e 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -16,11 +15,11 @@ import ( // // rowsMerged is atomically updated with the number of merged rows during the merge. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, - pl *pacelimiter.PaceLimiter, dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { + dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) - bsm.Init(bsrs, pl) + bsm.Init(bsrs) err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted) bsm.reset() bsmPool.Put(bsm) diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index b76bb09d4..8b4e5380d 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -3,8 +3,6 @@ package storage import ( "math/rand" "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) { @@ -366,7 +364,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped { t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped) } if rowsMerged != 0 { @@ -386,7 +384,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc bsw.InitFromInmemoryPart(&mp) var rowsMerged, rowsDeleted uint64 - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, storagepacelimiter.BigMerges, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { t.Fatalf("unexpected error in mergeBlockStreams: %s", err) } diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index fe426b60c..73a03ea29 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } mpOut.Reset() bsw.InitFromInmemoryPart(&mpOut) - if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 08ae49ef2..45e52345b 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -329,7 +329,6 @@ type partitionMetrics struct { SmallPartsRefCount uint64 SmallAssistedMerges uint64 - BigMergesDelays uint64 } // UpdateMetrics updates m with metrics from pt. @@ -388,8 +387,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted) m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges) - - m.BigMergesDelays = storagepacelimiter.BigMerges.DelaysTotal() } // AddRows adds the given rows to the partition pt. @@ -817,13 +814,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error { return nil } -var mergeWorkersCount = func() int { - n := runtime.GOMAXPROCS(-1) / 2 - if n <= 0 { - n = 1 - } - return n -}() +var mergeWorkersCount = runtime.GOMAXPROCS(-1) var ( bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount) @@ -935,10 +926,9 @@ func maxRowsByPath(path string) uint64 { // Calculate the maximum number of rows in the output merge part // by dividing the freeSpace by the number of concurrent // mergeWorkersCount for big parts. - // This assumes each row is compressed into 1 byte. Production - // simulation shows that each row usually occupies up to 0.5 bytes, - // so this is quite safe assumption. - maxRows := freeSpace / uint64(mergeWorkersCount) + // This assumes each row is compressed into 0.5 bytes + // according to production data. + maxRows := 2 * (freeSpace / uint64(mergeWorkersCount)) if maxRows > maxRowsPerBigPart { maxRows = maxRowsPerBigPart } @@ -1058,25 +1048,21 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro var ph partHeader rowsMerged := &pt.smallRowsMerged rowsDeleted := &pt.smallRowsDeleted - pl := storagepacelimiter.BigMerges if isBigPart { rowsMerged = &pt.bigRowsMerged rowsDeleted = &pt.bigRowsDeleted atomic.AddUint64(&pt.bigMergesCount, 1) atomic.AddUint64(&pt.activeBigMerges, 1) } else { - pl = nil atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) // Prioritize small merges over big merges. - storagepacelimiter.BigMerges.Inc() } - err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pl, dmis, rowsMerged, rowsDeleted) + err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted) if isBigPart { atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) } else { atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0)) - storagepacelimiter.BigMerges.Dec() } putBlockStreamWriter(bsw) if err != nil { diff --git a/lib/storagepacelimiter/storagepacelimiter.go b/lib/storagepacelimiter/storagepacelimiter.go index 4eda4d95f..e309e6c52 100644 --- a/lib/storagepacelimiter/storagepacelimiter.go +++ b/lib/storagepacelimiter/storagepacelimiter.go @@ -8,8 +8,3 @@ import ( // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 var Search = pacelimiter.New() - -// BigMerges limits the pace for big merges when there is at least a single in-flight small merge. -// -// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 -var BigMerges = pacelimiter.New()