From 228d137936fcf57852070c454058da46aa16217a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 22 Jul 2020 21:57:44 +0300 Subject: [PATCH] lib/storage: reorder mergeBlockStreams() args in order to make them more consistent --- lib/storage/merge.go | 10 +++++----- lib/storage/merge_test.go | 4 ++-- lib/storage/merge_timing_test.go | 2 +- lib/storage/partition.go | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 32963af501..1420e56924 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -14,13 +14,13 @@ import ( // mergeBlockStreams returns immediately if stopCh is closed. // // rowsMerged is atomically updated with the number of merged rows during the merge. -func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, rowsMerged *uint64, - deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error { +func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, + deletedMetricIDs *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) bsm.Init(bsrs) - err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, rowsMerged, deletedMetricIDs, rowsDeleted) + err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, deletedMetricIDs, rowsMerged, rowsDeleted) bsm.reset() bsmPool.Put(bsm) bsw.MustClose() @@ -41,8 +41,8 @@ var bsmPool = &sync.Pool{ var errForciblyStopped = fmt.Errorf("forcibly stopped") -func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, rowsMerged *uint64, - deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error { +func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, + deletedMetricIDs *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { // Search for the first block to merge var pendingBlock *Block for bsm.NextBlock() { diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index 971919a7e0..8b4e5380d1 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -364,7 +364,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, &rowsMerged, nil, &rowsDeleted); err != errForciblyStopped { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped { t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped) } if rowsMerged != 0 { @@ -384,7 +384,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc bsw.InitFromInmemoryPart(&mp) var rowsMerged, rowsDeleted uint64 - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, &rowsMerged, nil, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { t.Fatalf("unexpected error in mergeBlockStreams: %s", err) } diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index d4ae7d8528..73a03ea297 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } mpOut.Reset() bsw.InitFromInmemoryPart(&mpOut) - if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, &rowsMerged, nil, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 3bf57a5e78..56a30c8d87 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1066,7 +1066,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro rowsDeleted = &pt.bigRowsDeleted } dmis := pt.getDeletedMetricIDs() - err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, rowsMerged, dmis, rowsDeleted) + err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted) putBlockStreamWriter(bsw) if err != nil { if err == errForciblyStopped {