diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index 899facbf7..f80157b6b 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -184,9 +184,11 @@ func (bsw *blockStreamWriter) MustClose() { } // WriteExternalBlock writes b to bsw and updates ph and rowsMerged. -func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) { +func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) { atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) - b.deduplicateSamplesDuringMerge() + if needDedup { + b.deduplicateSamplesDuringMerge() + } headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData) if usePrevTimestamps { diff --git a/lib/storage/block_stream_writer_timing_test.go b/lib/storage/block_stream_writer_timing_test.go index 8e51d06a2..a8b2c0d06 100644 --- a/lib/storage/block_stream_writer_timing_test.go +++ b/lib/storage/block_stream_writer_timing_test.go @@ -49,7 +49,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR bsw.InitFromInmemoryPart(&mp) for i := range ebsCopy { - bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged) + bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged, false) } bsw.MustClose() mp.Reset() diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 073361064..2795b2439 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -76,14 +76,14 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) { logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID) } - bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) + bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true) pendingBlock.CopyFrom(bsm.Block) continue } if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp { // Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block. // Write the pendingBlock and then deal with bsm.Block. - bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) + bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true) pendingBlock.CopyFrom(bsm.Block) continue } @@ -119,13 +119,13 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock] tmpBlock.values = tmpBlock.values[:maxRowsPerBlock] tmpBlock.fixupTimestamps() - bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged) + bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged, true) } if err := bsm.Error(); err != nil { return fmt.Errorf("cannot read block to be merged: %w", err) } if !pendingBlockIsEmpty { - bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) + bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true) } return nil } diff --git a/lib/storage/raw_row.go b/lib/storage/raw_row.go index b1d978d33..e3dcdfc9f 100644 --- a/lib/storage/raw_row.go +++ b/lib/storage/raw_row.go @@ -115,7 +115,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues) tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits) - rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged) + rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false) tsid = &r.TSID precisionBits = r.PrecisionBits @@ -125,7 +125,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues) tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits) - rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged) + rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false) if rowsMerged != uint64(len(rows)) { logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", rowsMerged, len(rows)) }