mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
590160ddbb
The added helper functions - SetLength() and ExtendCapacity() - replace error-prone code with simple function calls.
202 lines
4.2 KiB
Go
202 lines
4.2 KiB
Go
package storage
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
)
|
|
|
|
// partitionSearch represents a search in the partition.
|
|
type partitionSearch struct {
|
|
// BlockRef is the block found after NextBlock call.
|
|
BlockRef *BlockRef
|
|
|
|
// pt is a partition to search.
|
|
pt *partition
|
|
|
|
// pws hold parts snapshot for the given partition during Init call.
|
|
// This snapshot is used for calling Part.PutParts on partitionSearch.MustClose.
|
|
pws []*partWrapper
|
|
|
|
psPool []partSearch
|
|
psHeap partSearchHeap
|
|
|
|
err error
|
|
|
|
nextBlockNoop bool
|
|
needClosing bool
|
|
}
|
|
|
|
func (pts *partitionSearch) reset() {
|
|
pts.BlockRef = nil
|
|
pts.pt = nil
|
|
|
|
for i := range pts.pws {
|
|
pts.pws[i] = nil
|
|
}
|
|
pts.pws = pts.pws[:0]
|
|
|
|
for i := range pts.psPool {
|
|
pts.psPool[i].reset()
|
|
}
|
|
pts.psPool = pts.psPool[:0]
|
|
|
|
for i := range pts.psHeap {
|
|
pts.psHeap[i] = nil
|
|
}
|
|
pts.psHeap = pts.psHeap[:0]
|
|
|
|
pts.err = nil
|
|
pts.nextBlockNoop = false
|
|
pts.needClosing = false
|
|
}
|
|
|
|
// Init initializes the search in the given partition for the given tsid and tr.
|
|
//
|
|
// tsids must be sorted.
|
|
// tsids cannot be modified after the Init call, since it is owned by pts.
|
|
//
|
|
// MustClose must be called when partition search is done.
|
|
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
|
|
if pts.needClosing {
|
|
logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
|
|
}
|
|
|
|
pts.reset()
|
|
pts.pt = pt
|
|
pts.needClosing = true
|
|
|
|
if len(tsids) == 0 {
|
|
// Fast path - zero tsids.
|
|
pts.err = io.EOF
|
|
return
|
|
}
|
|
|
|
if pt.tr.MinTimestamp > tr.MaxTimestamp || pt.tr.MaxTimestamp < tr.MinTimestamp {
|
|
// Fast path - the partition doesn't contain rows for the given time range.
|
|
pts.err = io.EOF
|
|
return
|
|
}
|
|
|
|
pts.pws = pt.GetParts(pts.pws[:0], true)
|
|
|
|
// Initialize psPool.
|
|
pts.psPool = slicesutil.SetLength(pts.psPool, len(pts.pws))
|
|
for i, pw := range pts.pws {
|
|
pts.psPool[i].Init(pw.p, tsids, tr)
|
|
}
|
|
|
|
// Initialize the psHeap.
|
|
pts.psHeap = pts.psHeap[:0]
|
|
for i := range pts.psPool {
|
|
ps := &pts.psPool[i]
|
|
if !ps.NextBlock() {
|
|
if err := ps.Error(); err != nil {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
pts.err = fmt.Errorf("cannot initialize partition search: %w", err)
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
pts.psHeap = append(pts.psHeap, ps)
|
|
}
|
|
if len(pts.psHeap) == 0 {
|
|
pts.err = io.EOF
|
|
return
|
|
}
|
|
heap.Init(&pts.psHeap)
|
|
pts.BlockRef = &pts.psHeap[0].BlockRef
|
|
pts.nextBlockNoop = true
|
|
}
|
|
|
|
// NextBlock advances to the next block.
|
|
//
|
|
// The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks
|
|
// for the same TSID may contain overlapped time ranges.
|
|
func (pts *partitionSearch) NextBlock() bool {
|
|
if pts.err != nil {
|
|
return false
|
|
}
|
|
if pts.nextBlockNoop {
|
|
pts.nextBlockNoop = false
|
|
return true
|
|
}
|
|
|
|
pts.err = pts.nextBlock()
|
|
if pts.err != nil {
|
|
if pts.err != io.EOF {
|
|
pts.err = fmt.Errorf("cannot obtain the next block to search in the partition: %w", pts.err)
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (pts *partitionSearch) nextBlock() error {
|
|
psMin := pts.psHeap[0]
|
|
if psMin.NextBlock() {
|
|
heap.Fix(&pts.psHeap, 0)
|
|
pts.BlockRef = &pts.psHeap[0].BlockRef
|
|
return nil
|
|
}
|
|
|
|
if err := psMin.Error(); err != nil {
|
|
return err
|
|
}
|
|
|
|
heap.Pop(&pts.psHeap)
|
|
|
|
if len(pts.psHeap) == 0 {
|
|
return io.EOF
|
|
}
|
|
|
|
pts.BlockRef = &pts.psHeap[0].BlockRef
|
|
return nil
|
|
}
|
|
|
|
func (pts *partitionSearch) Error() error {
|
|
if pts.err == io.EOF {
|
|
return nil
|
|
}
|
|
return pts.err
|
|
}
|
|
|
|
// MustClose closes the pts.
|
|
func (pts *partitionSearch) MustClose() {
|
|
if !pts.needClosing {
|
|
logger.Panicf("BUG: missing Init call before the MustClose call")
|
|
}
|
|
|
|
pts.pt.PutParts(pts.pws)
|
|
pts.reset()
|
|
}
|
|
|
|
// partSearchHeap is a slice of partSearch pointers, which is used
// with container/heap via the methods defined on its pointer type.
type partSearchHeap []*partSearch
|
|
|
|
func (psh *partSearchHeap) Len() int {
|
|
return len(*psh)
|
|
}
|
|
|
|
func (psh *partSearchHeap) Less(i, j int) bool {
|
|
x := *psh
|
|
return x[i].BlockRef.bh.Less(&x[j].BlockRef.bh)
|
|
}
|
|
|
|
func (psh *partSearchHeap) Swap(i, j int) {
|
|
x := *psh
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func (psh *partSearchHeap) Push(x interface{}) {
|
|
*psh = append(*psh, x.(*partSearch))
|
|
}
|
|
|
|
func (psh *partSearchHeap) Pop() interface{} {
|
|
a := *psh
|
|
v := a[len(a)-1]
|
|
*psh = a[:len(a)-1]
|
|
return v
|
|
}
|