lib/mergeset: optimize mergeInmemoryBlocks() function

Do not spend CPU time on converting inmemoryBlock structs to inmemoryPart structs.
Just merge inmemoryBlock structs directly.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2249
Aliaksandr Valialkin 2022-07-27 23:47:18 +03:00
parent 3bbe9054d3
commit 962ed46583
4 changed files with 59 additions and 39 deletions
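
The win is in skipping a serialize/deserialize round trip: previously each source inmemoryBlock was marshaled (encoded and compressed) into an inmemoryPart only so the merge could immediately stream it back. The merge itself is just a k-way merge over sorted sources. Below is a minimal, self-contained sketch of that idea, using sorted string slices as stand-ins for inmemoryBlock structs; every name in it is illustrative, not the mergeset API:

```go
package main

import (
	"container/heap"
	"fmt"
)

// cursor tracks the next unread item in one sorted source block.
type cursor struct {
	items []string
	idx   int
}

// mergeHeap is a min-heap of cursors ordered by their current item.
type mergeHeap []*cursor

func (h mergeHeap) Len() int           { return len(h) }
func (h mergeHeap) Less(i, j int) bool { return h[i].items[h[i].idx] < h[j].items[h[j].idx] }
func (h mergeHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	c := old[len(old)-1]
	*h = old[:len(old)-1]
	return c
}

// mergeBlocks produces the sorted union of the source blocks directly,
// without first converting each block into an encoded "part".
func mergeBlocks(blocks [][]string) []string {
	h := &mergeHeap{}
	for _, b := range blocks {
		if len(b) > 0 {
			*h = append(*h, &cursor{items: b})
		}
	}
	heap.Init(h)
	var result []string
	for h.Len() > 0 {
		c := (*h)[0] // cursor with the smallest current item
		result = append(result, c.items[c.idx])
		c.idx++
		if c.idx < len(c.items) {
			heap.Fix(h, 0) // re-order the heap for the cursor's next item
		} else {
			heap.Pop(h) // this block is exhausted
		}
	}
	return result
}

func main() {
	fmt.Println(mergeBlocks([][]string{{"a", "d"}, {"b", "c"}, {"e"}}))
	// Output: [a b c d e]
}
```

The real code streams the result through a blockStreamWriter instead of materializing a slice, but the ordering logic is the same.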

docs/CHANGELOG.md

@@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): allow multiple sections with duplicate `username` but with different `password` values at `-auth.config` file.
 * FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
 * FEATURE: improve performance for heavy queries over big number of time series on systems with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896). Thanks to @zqyzyq for [the idea](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2).
+* FEATURE: improve performance for registering new time series in `indexdb` by up to 50%. Thanks to @ahfuzhang for [the issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2249).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): set `up` metric to `0` for partial scrapes in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously the `up` metric was set to `1` when at least a single metric has been scraped before the error. This aligns the behaviour of `vmagent` with Prometheus.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart all the scrape jobs during [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update) after `global` section is changed inside `-promscrape.config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884).

lib/mergeset/block_stream_reader.go

@@ -17,6 +17,9 @@ type blockStreamReader struct {
 	// Block contains the current block if Next returned true.
 	Block inmemoryBlock

+	// isInmemoryBlock is set to true if bsr was initialized with InitFromInmemoryBlock().
+	isInmemoryBlock bool
+
 	// The index of the current item in the Block, which is returned from CurrItem()
 	currItemIdx int
@@ -67,6 +70,7 @@ type blockStreamReader struct {
 func (bsr *blockStreamReader) reset() {
 	bsr.Block.Reset()
+	bsr.isInmemoryBlock = false
 	bsr.currItemIdx = 0
 	bsr.path = ""
 	bsr.ph.Reset()
@@ -99,6 +103,14 @@ func (bsr *blockStreamReader) String() string {
 	return bsr.ph.String()
 }

+// InitFromInmemoryBlock initializes bsr from the given ib.
+func (bsr *blockStreamReader) InitFromInmemoryBlock(ib *inmemoryBlock) {
+	bsr.reset()
+	bsr.Block.CopyFrom(ib)
+	bsr.Block.SortItems()
+	bsr.isInmemoryBlock = true
+}
+
 // InitFromInmemoryPart initializes bsr from the given mp.
 func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
 	bsr.reset()
@@ -179,10 +191,11 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 //
 // It closes *Reader files passed to Init.
 func (bsr *blockStreamReader) MustClose() {
-	bsr.indexReader.MustClose()
-	bsr.itemsReader.MustClose()
-	bsr.lensReader.MustClose()
+	if !bsr.isInmemoryBlock {
+		bsr.indexReader.MustClose()
+		bsr.itemsReader.MustClose()
+		bsr.lensReader.MustClose()
+	}
 	bsr.reset()
 }
@@ -194,6 +207,10 @@ func (bsr *blockStreamReader) Next() bool {
 	if bsr.err != nil {
 		return false
 	}
+	if bsr.isInmemoryBlock {
+		bsr.err = io.EOF
+		return true
+	}
 	if bsr.bhIdx >= len(bsr.bhs) {
 		// The current index block is over. Try reading the next index block.
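
With isInmemoryBlock set, the reader serves its single pre-loaded block through the existing iteration protocol: the first Next() call records io.EOF and returns true so the caller consumes bsr.Block once, and the following call returns false. A small runnable sketch of this one-shot iterator pattern, with a simplified stand-in type rather than the real blockStreamReader:

```go
package main

import (
	"fmt"
	"io"
)

// oneShotReader mimics the isInmemoryBlock path in Next():
// the single pre-loaded block is yielded exactly once.
type oneShotReader struct {
	block string // stand-in for the pre-loaded inmemoryBlock
	err   error
}

func (r *oneShotReader) Next() bool {
	if r.err != nil {
		// io.EOF was recorded on the previous call; iteration is over.
		return false
	}
	r.err = io.EOF
	return true // the caller may consume r.block now
}

func main() {
	r := &oneShotReader{block: "payload"}
	for r.Next() {
		fmt.Println(r.block)
	}
	// Prints "payload" exactly once.
}
```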

lib/mergeset/encoding.go

@@ -1,6 +1,7 @@
 package mergeset

 import (
+	"bytes"
 	"fmt"
 	"os"
 	"reflect"
@@ -36,7 +37,7 @@ func (it Item) Bytes(data []byte) []byte {
 	return data
 }

-// String returns string represetnation of it obtained from data.
+// String returns string representation of it obtained from data.
 //
 // The returned string representation belongs to data.
 func (it Item) String(data []byte) string {
@@ -56,7 +57,7 @@ func (ib *inmemoryBlock) Less(i, j int) bool {
 	a.Start += cpLen
 	b.Start += cpLen
 	data := ib.data
-	return string(a.Bytes(data)) < string(b.Bytes(data))
+	return a.String(data) < b.String(data)
 }

 func (ib *inmemoryBlock) Swap(i, j int) {
@@ -76,6 +77,21 @@ type inmemoryBlock struct {
 	items []Item
 }

+func (ib *inmemoryBlock) CopyFrom(src *inmemoryBlock) {
+	ib.commonPrefix = append(ib.commonPrefix[:0], src.commonPrefix...)
+	ib.data = append(ib.data[:0], src.data...)
+	ib.items = append(ib.items[:0], src.items...)
+}
+
+func (ib *inmemoryBlock) SortItems() {
+	if !ib.isSorted() {
+		ib.updateCommonPrefixUnsorted()
+		sort.Sort(ib)
+	} else {
+		ib.updateCommonPrefixSorted()
+	}
+}
+
 func (ib *inmemoryBlock) SizeBytes() int {
 	return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof(Item{}))
 }
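
CopyFrom writes through append(dst[:0], src...) on purpose: when the destination block comes from a pool, its previously grown buffers are reused instead of reallocated. A tiny runnable illustration of the idiom (the helper name is made up for the example):

```go
package main

import "fmt"

// copyReuse copies src into dst while reusing dst's existing capacity,
// the same append(dst[:0], src...) idiom CopyFrom applies to
// commonPrefix, data and items.
func copyReuse(dst, src []byte) []byte {
	return append(dst[:0], src...)
}

func main() {
	dst := make([]byte, 0, 64) // pooled buffer with spare capacity
	dst = copyReuse(dst, []byte("abc"))
	fmt.Println(string(dst), cap(dst)) // abc 64, i.e. no reallocation happened
}
```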
@@ -110,7 +126,11 @@ func (ib *inmemoryBlock) updateCommonPrefixUnsorted() {
 	data := ib.data
 	cp := items[0].Bytes(data)
 	for _, it := range items[1:] {
-		cpLen := commonPrefixLen(cp, it.Bytes(data))
+		item := it.Bytes(data)
+		if bytes.HasPrefix(item, cp) {
+			continue
+		}
+		cpLen := commonPrefixLen(cp, item)
 		if cpLen == 0 {
 			return
 		}
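
The fast path added here leans on the fact that the running common prefix only ever shrinks: if cp is already a prefix of the current item, commonPrefixLen(cp, item) would return len(cp) anyway, so the byte-by-byte scan can be skipped. A self-contained sketch of the loop; commonPrefixLen is re-implemented here for illustration since the real helper lives elsewhere in the package:

```go
package main

import (
	"bytes"
	"fmt"
)

// commonPrefixLen returns the length of the longest common prefix of a and b.
// (Illustrative reimplementation, not the mergeset helper.)
func commonPrefixLen(a, b []byte) int {
	i := 0
	for i < len(a) && i < len(b) && a[i] == b[i] {
		i++
	}
	return i
}

// commonPrefix mirrors the optimized loop: once cp is a prefix of an item,
// that item cannot shrink cp further, so the scan is skipped.
func commonPrefix(items [][]byte) []byte {
	cp := items[0]
	for _, item := range items[1:] {
		if bytes.HasPrefix(item, cp) {
			continue // fast path: cp stays unchanged
		}
		cp = cp[:commonPrefixLen(cp, item)]
		if len(cp) == 0 {
			break
		}
	}
	return cp
}

func main() {
	items := [][]byte{[]byte("foo_bar"), []byte("foo_baz"), []byte("foo_qux")}
	fmt.Printf("%q\n", commonPrefix(items)) // "foo_"
}
```

For inputs whose items share a long prefix, most iterations take the HasPrefix branch and never fall through to the scan.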
@@ -199,12 +219,7 @@ func (ib *inmemoryBlock) isSorted() bool {
 // - returns the number of items encoded including the first item.
 // - returns the marshal type used for the encoding.
 func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
-	if !ib.isSorted() {
-		ib.updateCommonPrefixUnsorted()
-		sort.Sort(ib)
-	} else {
-		ib.updateCommonPrefixSorted()
-	}
+	ib.SortItems()
 	return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
 }
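
The sort-or-skip logic formerly inlined here now lives in SortItems, shared with InitFromInmemoryBlock, so the O(n log n) sort runs only when the items are actually out of order. The shape of that check, sketched with plain strings instead of an inmemoryBlock (SortItems additionally maintains the common prefix, which this sketch omits):

```go
package main

import (
	"fmt"
	"sort"
)

// sortItems mirrors the SortItems structure: skip the O(n log n) sort
// when the data is already ordered.
func sortItems(items []string) {
	if sort.StringsAreSorted(items) {
		return // already sorted; only cheap bookkeeping is needed
	}
	sort.Strings(items)
}

func main() {
	a := []string{"b", "a", "c"}
	sortItems(a)
	fmt.Println(a) // [a b c]
}
```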

lib/mergeset/table.go

@@ -718,23 +718,28 @@ func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {
 }

 func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
-	// Convert ibs into inmemoryPart's
-	mps := make([]*inmemoryPart, 0, len(ibs))
+	atomic.AddUint64(&tb.mergesCount, 1)
+	atomic.AddUint64(&tb.activeMerges, 1)
+	defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
+
+	// Prepare blockStreamReaders for source blocks.
+	bsrs := make([]*blockStreamReader, 0, len(ibs))
 	for _, ib := range ibs {
 		if len(ib.items) == 0 {
 			continue
 		}
-		mp := getInmemoryPart()
-		mp.Init(ib)
+		bsr := getBlockStreamReader()
+		bsr.InitFromInmemoryBlock(ib)
 		putInmemoryBlock(ib)
-		mps = append(mps, mp)
+		bsrs = append(bsrs, bsr)
 	}
-	if len(mps) == 0 {
+	if len(bsrs) == 0 {
 		return nil
 	}
-	if len(mps) == 1 {
+	if len(bsrs) == 1 {
 		// Nothing to merge. Just return a single inmemory part.
-		mp := mps[0]
+		mp := getInmemoryPart()
+		mp.Init(&bsrs[0].Block)
 		p := mp.NewPart()
 		return &partWrapper{
 			p: p,
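
getBlockStreamReader() and putInmemoryBlock() follow the package's usual object-pooling pattern, which keeps per-merge allocations flat. A minimal sketch of that general shape with sync.Pool and a made-up reader type (the real helpers may be implemented differently; this only shows the pattern):

```go
package main

import (
	"fmt"
	"sync"
)

// reader stands in for blockStreamReader; the real type carries buffers
// that are worth recycling across merges.
type reader struct {
	initialized bool
}

var readerPool sync.Pool

// getReader takes a recycled object from the pool, or allocates a fresh one.
func getReader() *reader {
	if v := readerPool.Get(); v != nil {
		return v.(*reader)
	}
	return &reader{}
}

// putReader resets the object and returns it to the pool for reuse.
func putReader(r *reader) {
	r.initialized = false
	readerPool.Put(r)
}

func main() {
	r := getReader()
	r.initialized = true
	putReader(r)
	// The next get will usually hand back the recycled object
	// (sync.Pool gives no hard guarantee).
	fmt.Println(getReader() == r)
}
```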
@@ -742,24 +747,6 @@ func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
 			refCount: 1,
 		}
 	}
-	defer func() {
-		// Return source inmemoryParts to pool.
-		for _, mp := range mps {
-			putInmemoryPart(mp)
-		}
-	}()
-
-	atomic.AddUint64(&tb.mergesCount, 1)
-	atomic.AddUint64(&tb.activeMerges, 1)
-	defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
-
-	// Prepare blockStreamReaders for source parts.
-	bsrs := make([]*blockStreamReader, 0, len(mps))
-	for _, mp := range mps {
-		bsr := getBlockStreamReader()
-		bsr.InitFromInmemoryPart(mp)
-		bsrs = append(bsrs, bsr)
-	}
-
 	// Prepare blockStreamWriter for destination part.
 	bsw := getBlockStreamWriter()
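
The removed counter bookkeeping reappears near the top of the rewritten function. The decrement uses a wraparound trick: adding ^uint64(0) (all bits set, i.e. -1 in two's complement) to a uint64 subtracts one, which is exactly the decrement recipe documented in the sync/atomic package. A minimal demonstration:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var active uint64

	atomic.AddUint64(&active, 1) // merge started

	// ^uint64(0) is 0xFFFFFFFFFFFFFFFF, i.e. -1 in two's complement,
	// so the addition wraps around and decrements the counter.
	atomic.AddUint64(&active, ^uint64(0)) // merge finished

	fmt.Println(active) // 0
}
```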