mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/storage: reorder mergeBlockStreams() args in order to make them more consistent
This commit is contained in:
parent
e4303d3d21
commit
228d137936
4 changed files with 9 additions and 9 deletions
|
@ -14,13 +14,13 @@ import (
|
||||||
// mergeBlockStreams returns immediately if stopCh is closed.
|
// mergeBlockStreams returns immediately if stopCh is closed.
|
||||||
//
|
//
|
||||||
// rowsMerged is atomically updated with the number of merged rows during the merge.
|
// rowsMerged is atomically updated with the number of merged rows during the merge.
|
||||||
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, rowsMerged *uint64,
|
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
|
||||||
deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error {
|
deletedMetricIDs *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
|
||||||
ph.Reset()
|
ph.Reset()
|
||||||
|
|
||||||
bsm := bsmPool.Get().(*blockStreamMerger)
|
bsm := bsmPool.Get().(*blockStreamMerger)
|
||||||
bsm.Init(bsrs)
|
bsm.Init(bsrs)
|
||||||
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, rowsMerged, deletedMetricIDs, rowsDeleted)
|
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, deletedMetricIDs, rowsMerged, rowsDeleted)
|
||||||
bsm.reset()
|
bsm.reset()
|
||||||
bsmPool.Put(bsm)
|
bsmPool.Put(bsm)
|
||||||
bsw.MustClose()
|
bsw.MustClose()
|
||||||
|
@ -41,8 +41,8 @@ var bsmPool = &sync.Pool{
|
||||||
|
|
||||||
var errForciblyStopped = fmt.Errorf("forcibly stopped")
|
var errForciblyStopped = fmt.Errorf("forcibly stopped")
|
||||||
|
|
||||||
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, rowsMerged *uint64,
|
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
|
||||||
deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error {
|
deletedMetricIDs *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
|
||||||
// Search for the first block to merge
|
// Search for the first block to merge
|
||||||
var pendingBlock *Block
|
var pendingBlock *Block
|
||||||
for bsm.NextBlock() {
|
for bsm.NextBlock() {
|
||||||
|
|
|
@ -364,7 +364,7 @@ func TestMergeForciblyStop(t *testing.T) {
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
var rowsMerged, rowsDeleted uint64
|
var rowsMerged, rowsDeleted uint64
|
||||||
close(ch)
|
close(ch)
|
||||||
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, &rowsMerged, nil, &rowsDeleted); err != errForciblyStopped {
|
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped {
|
||||||
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
|
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
|
||||||
}
|
}
|
||||||
if rowsMerged != 0 {
|
if rowsMerged != 0 {
|
||||||
|
@ -384,7 +384,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
|
||||||
bsw.InitFromInmemoryPart(&mp)
|
bsw.InitFromInmemoryPart(&mp)
|
||||||
|
|
||||||
var rowsMerged, rowsDeleted uint64
|
var rowsMerged, rowsDeleted uint64
|
||||||
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, &rowsMerged, nil, &rowsDeleted); err != nil {
|
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
|
||||||
t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
|
t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
|
||||||
}
|
}
|
||||||
mpOut.Reset()
|
mpOut.Reset()
|
||||||
bsw.InitFromInmemoryPart(&mpOut)
|
bsw.InitFromInmemoryPart(&mpOut)
|
||||||
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, &rowsMerged, nil, &rowsDeleted); err != nil {
|
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
|
||||||
panic(fmt.Errorf("cannot merge block streams: %w", err))
|
panic(fmt.Errorf("cannot merge block streams: %w", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1066,7 +1066,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
||||||
rowsDeleted = &pt.bigRowsDeleted
|
rowsDeleted = &pt.bigRowsDeleted
|
||||||
}
|
}
|
||||||
dmis := pt.getDeletedMetricIDs()
|
dmis := pt.getDeletedMetricIDs()
|
||||||
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, rowsMerged, dmis, rowsDeleted)
|
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted)
|
||||||
putBlockStreamWriter(bsw)
|
putBlockStreamWriter(bsw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errForciblyStopped {
|
if err == errForciblyStopped {
|
||||||
|
|
Loading…
Reference in a new issue