lib/storage: skip deduplication when creating inmemory data blocks

The deduplication will be performed later during merging such blocks.
This commit is contained in:
Aliaksandr Valialkin 2021-02-09 02:24:59 +02:00
parent 3b146a9976
commit fda61e8e96
4 changed files with 11 additions and 9 deletions

View file

@ -184,9 +184,11 @@ func (bsw *blockStreamWriter) MustClose() {
}
// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) {
atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
b.deduplicateSamplesDuringMerge()
if needDedup {
b.deduplicateSamplesDuringMerge()
}
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData)
if usePrevTimestamps {

View file

@ -49,7 +49,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR
bsw.InitFromInmemoryPart(&mp)
for i := range ebsCopy {
bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged)
bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged, false)
}
bsw.MustClose()
mp.Reset()

View file

@ -76,14 +76,14 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) {
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID)
}
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
pendingBlock.CopyFrom(bsm.Block)
continue
}
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp {
// Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block.
// Write the pendingBlock and then deal with bsm.Block.
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
pendingBlock.CopyFrom(bsm.Block)
continue
}
@ -119,13 +119,13 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock]
tmpBlock.values = tmpBlock.values[:maxRowsPerBlock]
tmpBlock.fixupTimestamps()
bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged)
bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged, true)
}
if err := bsm.Error(); err != nil {
return fmt.Errorf("cannot read block to be merged: %w", err)
}
if !pendingBlockIsEmpty {
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
}
return nil
}

View file

@ -115,7 +115,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false)
tsid = &r.TSID
precisionBits = r.PrecisionBits
@ -125,7 +125,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false)
if rowsMerged != uint64(len(rows)) {
logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", rowsMerged, len(rows))
}