app/vmselect/netstorage: eliminate memory allocation for sortBlocksHeap arg when calling mergeSortBlocks()

This commit is contained in:
Aliaksandr Valialkin 2023-01-09 15:19:15 -08:00
parent e58fecb019
commit 9f02f5a05a
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 74 additions and 36 deletions

View file

@ -443,7 +443,12 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
// Wait until work is complete
samples := 0
sbs := make([]*sortBlock, 0, addrsLen)
sbh := getSortBlocksHeap()
sbs := sbh.sbs
if n := addrsLen - cap(sbs); n > 0 {
sbs = append(sbs[:cap(sbs)], make([]*sortBlock, n)...)
}
sbs = sbs[:0]
var firstErr error
for _, upw := range upws {
if err := <-upw.doneCh; err != nil && firstErr == nil {
@ -468,6 +473,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
}
putUnpackWork(upw)
}
sbh.sbs = sbs
// Shut down local workers
for _, workCh := range workChs {
@ -479,7 +485,8 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
return firstErr
}
dedupInterval := storage.GetDedupInterval()
mergeSortBlocks(dst, sbs, dedupInterval)
mergeSortBlocks(dst, sbh, dedupInterval)
putSortBlocksHeap(sbh)
return nil
}
@ -500,24 +507,25 @@ var sbPool sync.Pool
var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
func mergeSortBlocks(dst *Result, sbh *sortBlocksHeap, dedupInterval int64) {
// Skip empty sort blocks, since they cannot be passed to heap.Init.
src := sbh
sbh = sbh[:0]
for _, sb := range src {
sbs := sbh.sbs[:0]
for _, sb := range sbh.sbs {
if len(sb.Timestamps) == 0 {
putSortBlock(sb)
continue
}
sbh = append(sbh, sb)
sbs = append(sbs, sb)
}
if len(sbh) == 0 {
sbh.sbs = sbs
if sbh.Len() == 0 {
return
}
heap.Init(&sbh)
heap.Init(sbh)
for {
top := sbh[0]
if len(sbh) == 1 {
sbs := sbh.sbs
top := sbs[0]
if len(sbs) == 1 {
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...)
dst.Values = append(dst.Values, top.Values[top.NextIdx:]...)
putSortBlock(top)
@ -536,9 +544,9 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
dst.Values = append(dst.Values, top.Values[topNextIdx:top.NextIdx]...)
}
if top.NextIdx < len(top.Timestamps) {
heap.Fix(&sbh, 0)
heap.Fix(sbh, 0)
} else {
heap.Pop(&sbh)
heap.Pop(sbh)
putSortBlock(top)
}
}
@ -620,48 +628,73 @@ func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbfs []*tmpBlocksFile,
return nil
}
type sortBlocksHeap []*sortBlock
type sortBlocksHeap struct {
sbs []*sortBlock
}
func (sbh sortBlocksHeap) getNextBlock() *sortBlock {
if len(sbh) < 2 {
func (sbh *sortBlocksHeap) getNextBlock() *sortBlock {
sbs := sbh.sbs
if len(sbs) < 2 {
return nil
}
if len(sbh) < 3 {
return sbh[1]
if len(sbs) < 3 {
return sbs[1]
}
a := sbh[1]
b := sbh[2]
a := sbs[1]
b := sbs[2]
if a.Timestamps[a.NextIdx] <= b.Timestamps[b.NextIdx] {
return a
}
return b
}
func (sbh sortBlocksHeap) Len() int {
return len(sbh)
func (sbh *sortBlocksHeap) Len() int {
return len(sbh.sbs)
}
func (sbh sortBlocksHeap) Less(i, j int) bool {
a := sbh[i]
b := sbh[j]
func (sbh *sortBlocksHeap) Less(i, j int) bool {
sbs := sbh.sbs
a := sbs[i]
b := sbs[j]
return a.Timestamps[a.NextIdx] < b.Timestamps[b.NextIdx]
}
func (sbh sortBlocksHeap) Swap(i, j int) {
sbh[i], sbh[j] = sbh[j], sbh[i]
func (sbh *sortBlocksHeap) Swap(i, j int) {
sbs := sbh.sbs
sbs[i], sbs[j] = sbs[j], sbs[i]
}
func (sbh *sortBlocksHeap) Push(x interface{}) {
*sbh = append(*sbh, x.(*sortBlock))
sbh.sbs = append(sbh.sbs, x.(*sortBlock))
}
func (sbh *sortBlocksHeap) Pop() interface{} {
a := *sbh
v := a[len(a)-1]
*sbh = a[:len(a)-1]
sbs := sbh.sbs
v := sbs[len(sbs)-1]
sbs[len(sbs)-1] = nil
sbh.sbs = sbs[:len(sbs)-1]
return v
}
func getSortBlocksHeap() *sortBlocksHeap {
v := sbhPool.Get()
if v == nil {
return &sortBlocksHeap{}
}
return v.(*sortBlocksHeap)
}
func putSortBlocksHeap(sbh *sortBlocksHeap) {
sbs := sbh.sbs
for i := range sbs {
sbs[i] = nil
}
sbh.sbs = sbs[:0]
sbhPool.Put(sbh)
}
var sbhPool sync.Pool
// RegisterMetricNames registers metric names from mrs in the storage.
func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
qt = qt.NewChild("register metric names")

View file

@ -36,7 +36,10 @@ func TestMergeSortBlocks(t *testing.T) {
f := func(blocks []*sortBlock, dedupInterval int64, expectedResult *Result) {
t.Helper()
var result Result
mergeSortBlocks(&result, blocks, dedupInterval)
sbh := getSortBlocksHeap()
sbh.sbs = append(sbh.sbs[:0], blocks...)
mergeSortBlocks(&result, sbh, dedupInterval)
putSortBlocksHeap(sbh)
if !reflect.DeepEqual(result.Values, expectedResult.Values) {
t.Fatalf("unexpected values;\ngot\n%v\nwant\n%v", result.Values, expectedResult.Values)
}

View file

@ -90,16 +90,18 @@ func benchmarkMergeSortBlocks(b *testing.B, blocks []*sortBlock) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var result Result
sbs := make(sortBlocksHeap, len(blocks))
sbh := getSortBlocksHeap()
for pb.Next() {
result.reset()
for i, b := range blocks {
sbs := sbh.sbs[:0]
for _, b := range blocks {
sb := getSortBlock()
sb.Timestamps = b.Timestamps
sb.Values = b.Values
sbs[i] = sb
sbs = append(sbs, sb)
}
mergeSortBlocks(&result, sbs, dedupInterval)
sbh.sbs = sbs
mergeSortBlocks(&result, sbh, dedupInterval)
}
})
}