mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect/netstorage: eliminate memory allocation for sortBlocksHeap arg when calling mergeSortBlocks()
This commit is contained in:
parent
1f9d605988
commit
c38a10e143
3 changed files with 74 additions and 36 deletions
|
@ -426,7 +426,12 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
|
||||
// Wait until work is complete
|
||||
samples := 0
|
||||
sbs := make([]*sortBlock, 0, brsLen)
|
||||
sbh := getSortBlocksHeap()
|
||||
sbs := sbh.sbs
|
||||
if n := brsLen - cap(sbs); n > 0 {
|
||||
sbs = append(sbs[:cap(sbs)], make([]*sortBlock, n)...)
|
||||
}
|
||||
sbs = sbs[:0]
|
||||
var firstErr error
|
||||
for _, upw := range upws {
|
||||
if err := <-upw.doneCh; err != nil && firstErr == nil {
|
||||
|
@ -451,6 +456,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
}
|
||||
putUnpackWork(upw)
|
||||
}
|
||||
sbh.sbs = sbs
|
||||
|
||||
// Shut down local workers
|
||||
for _, workCh := range workChs {
|
||||
|
@ -462,7 +468,8 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
return firstErr
|
||||
}
|
||||
dedupInterval := storage.GetDedupInterval()
|
||||
mergeSortBlocks(dst, sbs, dedupInterval)
|
||||
mergeSortBlocks(dst, sbh, dedupInterval)
|
||||
putSortBlocksHeap(sbh)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -483,24 +490,25 @@ var sbPool sync.Pool
|
|||
|
||||
var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)
|
||||
|
||||
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
|
||||
func mergeSortBlocks(dst *Result, sbh *sortBlocksHeap, dedupInterval int64) {
|
||||
// Skip empty sort blocks, since they cannot be passed to heap.Init.
|
||||
src := sbh
|
||||
sbh = sbh[:0]
|
||||
for _, sb := range src {
|
||||
sbs := sbh.sbs[:0]
|
||||
for _, sb := range sbh.sbs {
|
||||
if len(sb.Timestamps) == 0 {
|
||||
putSortBlock(sb)
|
||||
continue
|
||||
}
|
||||
sbh = append(sbh, sb)
|
||||
sbs = append(sbs, sb)
|
||||
}
|
||||
if len(sbh) == 0 {
|
||||
sbh.sbs = sbs
|
||||
if sbh.Len() == 0 {
|
||||
return
|
||||
}
|
||||
heap.Init(&sbh)
|
||||
heap.Init(sbh)
|
||||
for {
|
||||
top := sbh[0]
|
||||
if len(sbh) == 1 {
|
||||
sbs := sbh.sbs
|
||||
top := sbs[0]
|
||||
if len(sbs) == 1 {
|
||||
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...)
|
||||
dst.Values = append(dst.Values, top.Values[top.NextIdx:]...)
|
||||
putSortBlock(top)
|
||||
|
@ -519,9 +527,9 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
|
|||
dst.Values = append(dst.Values, top.Values[topNextIdx:top.NextIdx]...)
|
||||
}
|
||||
if top.NextIdx < len(top.Timestamps) {
|
||||
heap.Fix(&sbh, 0)
|
||||
heap.Fix(sbh, 0)
|
||||
} else {
|
||||
heap.Pop(&sbh)
|
||||
heap.Pop(sbh)
|
||||
putSortBlock(top)
|
||||
}
|
||||
}
|
||||
|
@ -604,48 +612,73 @@ func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, br
|
|||
return nil
|
||||
}
|
||||
|
||||
type sortBlocksHeap []*sortBlock
|
||||
type sortBlocksHeap struct {
|
||||
sbs []*sortBlock
|
||||
}
|
||||
|
||||
func (sbh sortBlocksHeap) getNextBlock() *sortBlock {
|
||||
if len(sbh) < 2 {
|
||||
func (sbh *sortBlocksHeap) getNextBlock() *sortBlock {
|
||||
sbs := sbh.sbs
|
||||
if len(sbs) < 2 {
|
||||
return nil
|
||||
}
|
||||
if len(sbh) < 3 {
|
||||
return sbh[1]
|
||||
if len(sbs) < 3 {
|
||||
return sbs[1]
|
||||
}
|
||||
a := sbh[1]
|
||||
b := sbh[2]
|
||||
a := sbs[1]
|
||||
b := sbs[2]
|
||||
if a.Timestamps[a.NextIdx] <= b.Timestamps[b.NextIdx] {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (sbh sortBlocksHeap) Len() int {
|
||||
return len(sbh)
|
||||
func (sbh *sortBlocksHeap) Len() int {
|
||||
return len(sbh.sbs)
|
||||
}
|
||||
|
||||
func (sbh sortBlocksHeap) Less(i, j int) bool {
|
||||
a := sbh[i]
|
||||
b := sbh[j]
|
||||
func (sbh *sortBlocksHeap) Less(i, j int) bool {
|
||||
sbs := sbh.sbs
|
||||
a := sbs[i]
|
||||
b := sbs[j]
|
||||
return a.Timestamps[a.NextIdx] < b.Timestamps[b.NextIdx]
|
||||
}
|
||||
|
||||
func (sbh sortBlocksHeap) Swap(i, j int) {
|
||||
sbh[i], sbh[j] = sbh[j], sbh[i]
|
||||
func (sbh *sortBlocksHeap) Swap(i, j int) {
|
||||
sbs := sbh.sbs
|
||||
sbs[i], sbs[j] = sbs[j], sbs[i]
|
||||
}
|
||||
|
||||
func (sbh *sortBlocksHeap) Push(x interface{}) {
|
||||
*sbh = append(*sbh, x.(*sortBlock))
|
||||
sbh.sbs = append(sbh.sbs, x.(*sortBlock))
|
||||
}
|
||||
|
||||
func (sbh *sortBlocksHeap) Pop() interface{} {
|
||||
a := *sbh
|
||||
v := a[len(a)-1]
|
||||
*sbh = a[:len(a)-1]
|
||||
sbs := sbh.sbs
|
||||
v := sbs[len(sbs)-1]
|
||||
sbs[len(sbs)-1] = nil
|
||||
sbh.sbs = sbs[:len(sbs)-1]
|
||||
return v
|
||||
}
|
||||
|
||||
func getSortBlocksHeap() *sortBlocksHeap {
|
||||
v := sbhPool.Get()
|
||||
if v == nil {
|
||||
return &sortBlocksHeap{}
|
||||
}
|
||||
return v.(*sortBlocksHeap)
|
||||
}
|
||||
|
||||
func putSortBlocksHeap(sbh *sortBlocksHeap) {
|
||||
sbs := sbh.sbs
|
||||
for i := range sbs {
|
||||
sbs[i] = nil
|
||||
}
|
||||
sbh.sbs = sbs[:0]
|
||||
sbhPool.Put(sbh)
|
||||
}
|
||||
|
||||
var sbhPool sync.Pool
|
||||
|
||||
// DeleteSeries deletes time series matching the given tagFilterss.
|
||||
func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
|
||||
qt = qt.NewChild("delete series: %s", sq)
|
||||
|
|
|
@ -9,7 +9,10 @@ func TestMergeSortBlocks(t *testing.T) {
|
|||
f := func(blocks []*sortBlock, dedupInterval int64, expectedResult *Result) {
|
||||
t.Helper()
|
||||
var result Result
|
||||
mergeSortBlocks(&result, blocks, dedupInterval)
|
||||
sbh := getSortBlocksHeap()
|
||||
sbh.sbs = append(sbh.sbs[:0], blocks...)
|
||||
mergeSortBlocks(&result, sbh, dedupInterval)
|
||||
putSortBlocksHeap(sbh)
|
||||
if !reflect.DeepEqual(result.Values, expectedResult.Values) {
|
||||
t.Fatalf("unexpected values;\ngot\n%v\nwant\n%v", result.Values, expectedResult.Values)
|
||||
}
|
||||
|
|
|
@ -90,16 +90,18 @@ func benchmarkMergeSortBlocks(b *testing.B, blocks []*sortBlock) {
|
|||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var result Result
|
||||
sbs := make(sortBlocksHeap, len(blocks))
|
||||
sbh := getSortBlocksHeap()
|
||||
for pb.Next() {
|
||||
result.reset()
|
||||
for i, b := range blocks {
|
||||
sbs := sbh.sbs[:0]
|
||||
for _, b := range blocks {
|
||||
sb := getSortBlock()
|
||||
sb.Timestamps = b.Timestamps
|
||||
sb.Values = b.Values
|
||||
sbs[i] = sb
|
||||
sbs = append(sbs, sb)
|
||||
}
|
||||
mergeSortBlocks(&result, sbs, dedupInterval)
|
||||
sbh.sbs = sbs
|
||||
mergeSortBlocks(&result, sbh, dedupInterval)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue