package storage import ( "container/heap" "fmt" "io" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // partitionSearch represents a search in the partition. type partitionSearch struct { // Block is the block found after NextBlock call. Block *Block // pt is a partition to search. pt *partition // pws hold parts snapshot for the given partition during Init call. // This snapshot is used for calling Part.PutParts on partitionSearch.MustClose. pws []*partWrapper psPool []partSearch psHeap partSearchHeap err error nextBlockNoop bool needClosing bool } func (pts *partitionSearch) reset() { pts.Block = nil pts.pt = nil for i := range pts.pws { pts.pws[i] = nil } pts.pws = pts.pws[:0] for i := range pts.psPool { pts.psPool[i].reset() } pts.psPool = pts.psPool[:0] for i := range pts.psHeap { pts.psHeap[i] = nil } pts.psHeap = pts.psHeap[:0] pts.err = nil pts.nextBlockNoop = false pts.needClosing = false } // Init initializes the search in the given partition for the given tsid and tr. // // MustClose must be called when partition search is done. func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) { if pts.needClosing { logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init") } pts.reset() pts.pt = pt pts.needClosing = true if len(tsids) == 0 { // Fast path - zero tsids. pts.err = io.EOF return } if pt.tr.MinTimestamp > tr.MaxTimestamp || pt.tr.MaxTimestamp < tr.MinTimestamp { // Fast path - the partition doesn't contain rows for the given time range. pts.err = io.EOF return } pts.pws = pt.GetParts(pts.pws[:0]) // Initialize psPool. if n := len(pts.pws) - cap(pts.psPool); n > 0 { pts.psPool = append(pts.psPool[:cap(pts.psPool)], make([]partSearch, n)...) } pts.psPool = pts.psPool[:len(pts.pws)] for i, pw := range pts.pws { pts.psPool[i].Init(pw.p, tsids, tr) } // Initialize the psHeap. var errors []error pts.psHeap = pts.psHeap[:0] for i := range pts.psPool { ps := &pts.psPool[i] if !ps.NextBlock() { if err := ps.Error(); err != nil { errors = append(errors, err) } continue } pts.psHeap = append(pts.psHeap, ps) } if len(errors) > 0 { // Return only the first error, since it has no sense in returning all errors. pts.err = fmt.Errorf("cannot initialize partition search: %s", errors[0]) return } if len(pts.psHeap) == 0 { pts.err = io.EOF return } heap.Init(&pts.psHeap) pts.Block = &pts.psHeap[0].Block pts.nextBlockNoop = true } // NextBlock advances to the next block. // // The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks // for the same TSID may contain overlapped time ranges. func (pts *partitionSearch) NextBlock() bool { if pts.err != nil { return false } if pts.nextBlockNoop { pts.nextBlockNoop = false return true } pts.err = pts.nextBlock() if pts.err != nil { if pts.err != io.EOF { pts.err = fmt.Errorf("cannot obtain the next block to search in the partition: %s", pts.err) } return false } return true } func (pts *partitionSearch) nextBlock() error { psMin := pts.psHeap[0] if psMin.NextBlock() { heap.Fix(&pts.psHeap, 0) pts.Block = &pts.psHeap[0].Block return nil } if err := psMin.Error(); err != nil { return err } heap.Pop(&pts.psHeap) if len(pts.psHeap) == 0 { return io.EOF } pts.Block = &pts.psHeap[0].Block return nil } func (pts *partitionSearch) Error() error { if pts.err == io.EOF { return nil } return pts.err } // MustClose closes the pts. func (pts *partitionSearch) MustClose() { if !pts.needClosing { logger.Panicf("BUG: missing Init call before the MustClose call") } pts.pt.PutParts(pts.pws) pts.reset() } type partSearchHeap []*partSearch func (psh *partSearchHeap) Len() int { return len(*psh) } func (psh *partSearchHeap) Less(i, j int) bool { x := *psh return x[i].Block.bh.Less(&x[j].Block.bh) } func (psh *partSearchHeap) Swap(i, j int) { x := *psh x[i], x[j] = x[j], x[i] } func (psh *partSearchHeap) Push(x interface{}) { *psh = append(*psh, x.(*partSearch)) } func (psh *partSearchHeap) Pop() interface{} { a := *psh v := a[len(a)-1] *psh = a[:len(a)-1] return v }