diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 5446f3ceca..65367251a7 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): allow multiple sections with duplicate `username` but with different `password` values at `-auth.config` file.
 * FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
 * FEATURE: improve performance for heavy queries over big number of time series on systems with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896). Thanks to @zqyzyq for [the idea](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2).
+* FEATURE: improve performance for registering new time series in `indexdb` by up to 50%. Thanks to @ahfuzhang for [the issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2249).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): set `up` metric to `0` for partial scrapes in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously the `up` metric was set to `1` when at least a single metric has been scraped before the error. This aligns the behaviour of `vmagent` with Prometheus.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart all the scrape jobs during [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update) after `global` section is changed inside `-promscrape.config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884).
diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go
index 42436716de..ff49f23b03 100644
--- a/lib/mergeset/block_stream_reader.go
+++ b/lib/mergeset/block_stream_reader.go
@@ -17,6 +17,9 @@ type blockStreamReader struct {
 	// Block contains the current block if Next returned true.
 	Block inmemoryBlock
 
+	// isInmemoryBlock is set to true if bsr was initialized with InitFromInmemoryBlock().
+	isInmemoryBlock bool
+
 	// The index of the current item in the Block, which is returned from CurrItem()
 	currItemIdx int
 
@@ -67,6 +70,7 @@ type blockStreamReader struct {
 
 func (bsr *blockStreamReader) reset() {
 	bsr.Block.Reset()
+	bsr.isInmemoryBlock = false
 	bsr.currItemIdx = 0
 	bsr.path = ""
 	bsr.ph.Reset()
@@ -99,6 +103,14 @@ func (bsr *blockStreamReader) String() string {
 	return bsr.ph.String()
 }
 
+// InitFromInmemoryBlock initializes bsr from the given ib.
+func (bsr *blockStreamReader) InitFromInmemoryBlock(ib *inmemoryBlock) {
+	bsr.reset()
+	bsr.Block.CopyFrom(ib)
+	bsr.Block.SortItems()
+	bsr.isInmemoryBlock = true
+}
+
 // InitFromInmemoryPart initializes bsr from the given mp.
 func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
 	bsr.reset()
@@ -179,10 +191,11 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 //
 // It closes *Reader files passed to Init.
 func (bsr *blockStreamReader) MustClose() {
-	bsr.indexReader.MustClose()
-	bsr.itemsReader.MustClose()
-	bsr.lensReader.MustClose()
-
+	if !bsr.isInmemoryBlock {
+		bsr.indexReader.MustClose()
+		bsr.itemsReader.MustClose()
+		bsr.lensReader.MustClose()
+	}
 	bsr.reset()
 }
 
@@ -194,6 +207,10 @@ func (bsr *blockStreamReader) Next() bool {
 	if bsr.err != nil {
 		return false
 	}
+	if bsr.isInmemoryBlock {
+		bsr.err = io.EOF
+		return true
+	}
 
 	if bsr.bhIdx >= len(bsr.bhs) {
 		// The current index block is over. Try reading the next index block.
diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go
index 991d65f329..1236c18614 100644
--- a/lib/mergeset/encoding.go
+++ b/lib/mergeset/encoding.go
@@ -1,6 +1,7 @@
 package mergeset
 
 import (
+	"bytes"
 	"fmt"
 	"os"
 	"reflect"
@@ -36,7 +37,7 @@ func (it Item) Bytes(data []byte) []byte {
 	return data
 }
 
-// String returns string represetnation of it obtained from data.
+// String returns string representation of it obtained from data.
 //
 // The returned string representation belongs to data.
 func (it Item) String(data []byte) string {
@@ -56,7 +57,7 @@ func (ib *inmemoryBlock) Less(i, j int) bool {
 	a.Start += cpLen
 	b.Start += cpLen
 	data := ib.data
-	return string(a.Bytes(data)) < string(b.Bytes(data))
+	return a.String(data) < b.String(data)
 }
 
 func (ib *inmemoryBlock) Swap(i, j int) {
@@ -76,6 +77,23 @@ type inmemoryBlock struct {
 	items []Item
 }
 
+// CopyFrom copies src contents to ib.
+func (ib *inmemoryBlock) CopyFrom(src *inmemoryBlock) {
+	ib.commonPrefix = append(ib.commonPrefix[:0], src.commonPrefix...)
+	ib.data = append(ib.data[:0], src.data...)
+	ib.items = append(ib.items[:0], src.items...)
+}
+
+// SortItems sorts items in ib and updates the common prefix for them.
+func (ib *inmemoryBlock) SortItems() {
+	if !ib.isSorted() {
+		ib.updateCommonPrefixUnsorted()
+		sort.Sort(ib)
+	} else {
+		ib.updateCommonPrefixSorted()
+	}
+}
+
 func (ib *inmemoryBlock) SizeBytes() int {
 	return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof(Item{}))
 }
@@ -110,7 +128,11 @@ func (ib *inmemoryBlock) updateCommonPrefixUnsorted() {
 	data := ib.data
 	cp := items[0].Bytes(data)
 	for _, it := range items[1:] {
-		cpLen := commonPrefixLen(cp, it.Bytes(data))
+		item := it.Bytes(data)
+		if bytes.HasPrefix(item, cp) {
+			continue
+		}
+		cpLen := commonPrefixLen(cp, item)
 		if cpLen == 0 {
 			return
 		}
@@ -199,12 +221,7 @@ func (ib *inmemoryBlock) isSorted() bool {
 // - returns the number of items encoded including the first item.
 // - returns the marshal type used for the encoding.
 func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
-	if !ib.isSorted() {
-		ib.updateCommonPrefixUnsorted()
-		sort.Sort(ib)
-	} else {
-		ib.updateCommonPrefixSorted()
-	}
+	ib.SortItems()
 	return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
 }
 
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index eea3bf93c7..b9e9f8c5b8 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -718,23 +718,29 @@ func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {
 }
 
 func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
-	// Convert ibs into inmemoryPart's
-	mps := make([]*inmemoryPart, 0, len(ibs))
+	atomic.AddUint64(&tb.mergesCount, 1)
+	atomic.AddUint64(&tb.activeMerges, 1)
+	defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
+
+	// Prepare blockStreamReaders for source blocks.
+	bsrs := make([]*blockStreamReader, 0, len(ibs))
 	for _, ib := range ibs {
 		if len(ib.items) == 0 {
 			continue
 		}
-		mp := getInmemoryPart()
-		mp.Init(ib)
+		bsr := getBlockStreamReader()
+		bsr.InitFromInmemoryBlock(ib)
 		putInmemoryBlock(ib)
-		mps = append(mps, mp)
+		bsrs = append(bsrs, bsr)
 	}
-	if len(mps) == 0 {
+	if len(bsrs) == 0 {
 		return nil
 	}
-	if len(mps) == 1 {
+	if len(bsrs) == 1 {
 		// Nothing to merge. Just return a single inmemory part.
-		mp := mps[0]
+		mp := getInmemoryPart()
+		mp.Init(&bsrs[0].Block)
+		putBlockStreamReader(bsrs[0])
 		p := mp.NewPart()
 		return &partWrapper{
 			p:        p,
@@ -742,24 +748,6 @@
 			refCount: 1,
 		}
 	}
-	defer func() {
-		// Return source inmemoryParts to pool.
-		for _, mp := range mps {
-			putInmemoryPart(mp)
-		}
-	}()
-
-	atomic.AddUint64(&tb.mergesCount, 1)
-	atomic.AddUint64(&tb.activeMerges, 1)
-	defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
-
-	// Prepare blockStreamReaders for source parts.
-	bsrs := make([]*blockStreamReader, 0, len(mps))
-	for _, mp := range mps {
-		bsr := getBlockStreamReader()
-		bsr.InitFromInmemoryPart(mp)
-		bsrs = append(bsrs, bsr)
-	}
 
 	// Prepare blockStreamWriter for destination part.
 	bsw := getBlockStreamWriter()
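The heart of the patch is visible in `mergeInmemoryBlocks`: instead of marshaling every `inmemoryBlock` into a compressed `inmemoryPart` only for a `blockStreamReader` to decode it again during the merge, the reader now serves the sorted block directly via `InitFromInmemoryBlock`, and its `Next` method hands out the whole block exactly once before reporting end-of-stream through the stored `io.EOF`. Below is a minimal, self-contained sketch of that one-shot reader pattern; `memBlockReader`, `initFromItems`, and the plain `[]string` item representation are hypothetical stand-ins for illustration, not the actual `mergeset` types.

```go
package main

import (
	"fmt"
	"io"
	"sort"
)

// memBlockReader is a hypothetical, simplified analogue of blockStreamReader
// in "inmemory block" mode: it serves a single sorted block of items.
type memBlockReader struct {
	items []string
	err   error
}

// initFromItems mirrors InitFromInmemoryBlock: copy the source items into the
// reader's own buffer (reusing its capacity) and sort them, so the reader can
// take part in a streaming merge alongside other readers.
func (r *memBlockReader) initFromItems(items []string) {
	r.items = append(r.items[:0], items...)
	sort.Strings(r.items)
	r.err = nil
}

// Next returns true exactly once, exposing the whole block; it records io.EOF
// so every subsequent call returns false, just like the patched Next method.
func (r *memBlockReader) Next() bool {
	if r.err != nil {
		return false
	}
	r.err = io.EOF
	return true
}

// Error reports any error other than the expected end-of-stream marker.
func (r *memBlockReader) Error() error {
	if r.err == io.EOF {
		return nil
	}
	return r.err
}

func main() {
	var r memBlockReader
	r.initFromItems([]string{"c", "a", "b"})
	for r.Next() {
		fmt.Println(r.items) // [a b c]
	}
	fmt.Println("err:", r.Error()) // err: <nil>
}
```

The design choice this mirrors: skipping the `inmemoryPart` round-trip removes an encode/decode cycle per source block, which is where the `indexdb` registration speed-up claimed in the changelog entry comes from.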
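The `updateCommonPrefixUnsorted` change adds a fast path: once a candidate common prefix is known, a single `bytes.HasPrefix` call confirms that the next item cannot shrink it, so `commonPrefixLen` only runs when the prefix actually changes. Here is a standalone sketch of the same loop shape, with the item layout reduced to a plain `[][]byte` and `commonPrefixLen` re-implemented locally (the real code works on `Item` offsets into a shared `data` buffer):

```go
package main

import (
	"bytes"
	"fmt"
)

// commonPrefixLen returns the length of the longest common prefix of a and b
// (a local re-implementation of the mergeset helper of the same name).
func commonPrefixLen(a, b []byte) int {
	i := 0
	for i < len(a) && i < len(b) && a[i] == b[i] {
		i++
	}
	return i
}

// commonPrefix mirrors the patched updateCommonPrefixUnsorted loop: when the
// candidate prefix is already a prefix of the next item, the prefix cannot
// change, so the byte-by-byte recomputation is skipped entirely.
func commonPrefix(items [][]byte) []byte {
	if len(items) == 0 {
		return nil
	}
	cp := items[0]
	for _, item := range items[1:] {
		if bytes.HasPrefix(item, cp) {
			continue // fast path: cp stays as-is
		}
		cpLen := commonPrefixLen(cp, item)
		if cpLen == 0 {
			return nil
		}
		cp = cp[:cpLen] // the prefix only ever shrinks
	}
	return cp
}

func main() {
	items := [][]byte{
		[]byte("metric_requests_total"),
		[]byte("metric_requests_failed"),
		[]byte("metric_responses_total"),
	}
	fmt.Printf("common prefix: %q\n", commonPrefix(items)) // "metric_re"
}
```

The payoff is that `bytes.HasPrefix` is backed by the heavily optimized `bytes.Equal` comparison, whereas a hand-rolled prefix-length loop compares byte by byte; for blocks of similar sorted index entries the fast path hits on most iterations.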
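Finally, a note on the accounting lines the patch moves to the top of `mergeInmemoryBlocks`: `activeMerges` is a `uint64`, and `sync/atomic` offers no subtraction for unsigned types, so the deferred `atomic.AddUint64(&tb.activeMerges, ^uint64(0))` decrements by adding the two's complement of 1, exactly as the `sync/atomic` documentation suggests. One side effect worth noting is that, now placed above the early returns, `mergesCount` also counts degenerate calls that end up producing zero or one part. A tiny demonstration of the decrement idiom:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var activeMerges uint64

	atomic.AddUint64(&activeMerges, 1) // merge starts
	fmt.Println(activeMerges)          // 1

	// Per the sync/atomic docs, subtracting a positive constant c from a
	// uint64 is done with AddUint64(&x, ^uint64(c-1)); for c == 1 that is
	// ^uint64(0), whose wrap-around addition acts as a decrement.
	atomic.AddUint64(&activeMerges, ^uint64(0)) // merge finishes
	fmt.Println(activeMerges)                   // 0
}
```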