app/vmselect/netstorage: reduce memory usage when the time range from the query touches a big number of samples per each time series

This commit is contained in:
Aliaksandr Valialkin 2020-09-15 21:06:04 +03:00
parent 27cd5555e6
commit 03dfccfbed

View file

@ -207,10 +207,10 @@ func (upw *unpackWork) reset() {
} }
} }
func (upw *unpackWork) unpack() { func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
for _, w := range upw.ws { for _, w := range upw.ws {
sb := getSortBlock() sb := getSortBlock()
if err := sb.unpackFrom(upw.tbf, w.addr, w.tr, upw.fetchData, upw.at); err != nil { if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr, upw.fetchData, upw.at); err != nil {
putSortBlock(sb) putSortBlock(sb)
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err) upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
return return
@ -244,8 +244,9 @@ func init() {
} }
func unpackWorker() { func unpackWorker() {
var tmpBlock storage.Block
for upw := range unpackWorkCh { for upw := range unpackWorkCh {
upw.unpack() upw.unpack(&tmpBlock)
} }
} }
@ -381,30 +382,26 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`) var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
type sortBlock struct { type sortBlock struct {
// b is used as a temporary storage for unpacked rows before they
// go to Timestamps and Values.
b storage.Block
Timestamps []int64 Timestamps []int64
Values []float64 Values []float64
NextIdx int NextIdx int
} }
func (sb *sortBlock) reset() { func (sb *sortBlock) reset() {
sb.b.Reset()
sb.Timestamps = sb.Timestamps[:0] sb.Timestamps = sb.Timestamps[:0]
sb.Values = sb.Values[:0] sb.Values = sb.Values[:0]
sb.NextIdx = 0 sb.NextIdx = 0
} }
func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool, at *auth.Token) error { func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
tmpBlock.Reset()
tbf.MustReadBlockAt(&sb.b, addr) tbf.MustReadBlockAt(tmpBlock, addr)
if fetchData { if fetchData {
if err := sb.b.UnmarshalData(); err != nil { if err := tmpBlock.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block: %w", err) return fmt.Errorf("cannot unmarshal block: %w", err)
} }
} }
timestamps := sb.b.Timestamps() timestamps := tmpBlock.Timestamps()
// Skip timestamps smaller than tr.MinTimestamp. // Skip timestamps smaller than tr.MinTimestamp.
i := 0 i := 0
@ -417,16 +414,16 @@ func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storag
for j > i && timestamps[j-1] > tr.MaxTimestamp { for j > i && timestamps[j-1] > tr.MaxTimestamp {
j-- j--
} }
skippedRows := sb.b.RowsCount() - (j - i) skippedRows := tmpBlock.RowsCount() - (j - i)
metricRowsSkipped.Add(skippedRows) metricRowsSkipped.Add(skippedRows)
// Copy the remaining values. // Copy the remaining values.
if i == j { if i == j {
return nil return nil
} }
values := sb.b.Values() values := tmpBlock.Values()
sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...) sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...)
sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], sb.b.Scale()) sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], tmpBlock.Scale())
return nil return nil
} }