Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
lib/storage: postpone reading data from blocks during search
This eliminates the need to store block data in temporary files on single-node VictoriaMetrics during heavy queries that touch a large number of time series over long time ranges. This improves single-node VictoriaMetrics performance on heavy queries by up to 2x.
commit e933cbac16
parent 23a310cc68
13 changed files with 132 additions and 130 deletions
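
The consumer-side flow, pieced together from the diff below, looks roughly like this. It is only a sketch: the function name drainSearch and its parameter names are illustrative, while Search.Init (now without fetchData), NextMetricBlock, MetricBlockRef and BlockRef.MustReadBlock are the API this commit introduces.

package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"

// drainSearch sketches how a caller iterates search results after this change.
// strg, tfss, tr and maxMetrics are assumed to be prepared by the caller.
func drainSearch(strg *storage.Storage, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int) error {
	var sr storage.Search
	sr.Init(strg, tfss, tr, maxMetrics) // fetchData is no longer an Init argument
	defer sr.MustClose()

	var mb storage.MetricBlock
	for sr.NextMetricBlock() {
		mb.MetricName = append(mb.MetricName[:0], sr.MetricBlockRef.MetricName...)
		// Block data is read only here, on demand, instead of during the search itself.
		sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block, true)
		// ... process mb ...
	}
	return sr.Error()
}

Per the MustReadBlock doc comment in the diff, passing fetchData=false reads only the block header, so callers that merely count or skip blocks never touch the timestamps and values data.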
@@ -819,7 +819,7 @@ type tmpBlocksFileWrapper struct {

 func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
 	bb := tmpBufPool.Get()
-	bb.B = storage.MarshalBlock(bb.B[:0], mb.Block)
+	bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
 	tbfw.mu.Lock()
 	addr, err := tbfw.tbf.WriteBlockData(bb.B)
 	tmpBufPool.Put(bb)
@@ -1450,6 +1450,7 @@ func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *

 	// Read response. It may consist of multiple MetricBlocks.
 	blocksRead := 0
+	var mb storage.MetricBlock
 	for {
 		buf, err = readBytes(buf[:0], bc, maxMetricBlockSize)
 		if err != nil {
@@ -1459,8 +1460,6 @@ func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *
 			// Reached the end of the response
 			return blocksRead, nil
 		}
-		var mb storage.MetricBlock
-		mb.Block = &storage.Block{}
 		tail, err := mb.Unmarshal(buf)
 		if err != nil {
 			return blocksRead, fmt.Errorf("cannot unmarshal MetricBlock #%d: %s", blocksRead, err)

@@ -351,6 +351,7 @@ type vmselectRequestCtx struct {
 	sq   storage.SearchQuery
 	tfss []*storage.TagFilters
 	sr   storage.Search
+	mb   storage.MetricBlock
 }

 func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
@@ -783,7 +784,7 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
 		MinTimestamp: ctx.sq.MinTimestamp,
 		MaxTimestamp: ctx.sq.MaxTimestamp,
 	}
-	ctx.sr.Init(s.storage, ctx.tfss, tr, fetchData, *maxMetricsPerSearch)
+	ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch)
 	defer ctx.sr.MustClose()
 	if err := ctx.sr.Error(); err != nil {
 		return ctx.writeErrorMessage(err)
@@ -796,12 +797,13 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {

 	// Send found blocks to vmselect.
 	for ctx.sr.NextMetricBlock() {
-		mb := ctx.sr.MetricBlock
+		ctx.mb.MetricName = ctx.sr.MetricBlockRef.MetricName
+		ctx.sr.MetricBlockRef.BlockRef.MustReadBlock(&ctx.mb.Block, fetchData)

 		vmselectMetricBlocksRead.Inc()
-		vmselectMetricRowsRead.Add(mb.Block.RowsCount())
+		vmselectMetricRowsRead.Add(ctx.mb.Block.RowsCount())

-		ctx.dataBuf = mb.Marshal(ctx.dataBuf[:0])
+		ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
 		if err := ctx.writeDataBufBytes(); err != nil {
 			return fmt.Errorf("cannot send MetricBlock: %s", err)
 		}

@@ -94,7 +94,6 @@ func OpenReaderAt(path string) (*ReaderAt, error) {
 		}
 		r.mmapData = data
 	}
-	r.MustFadviseSequentialRead(false)
 	readersCount.Inc()
 	return &r, nil
 }

@@ -15,8 +15,8 @@ import (
 // partSearch represents blocks stream for the given search args
 // passed to Init.
 type partSearch struct {
-	// Block contains the found block after NextBlock call.
-	Block Block
+	// BlockRef contains the reference to the found block after NextBlock call.
+	BlockRef BlockRef

 	// p is the part to search.
 	p *part
@@ -30,9 +30,6 @@ type partSearch struct {
 	// tr is a time range to search.
 	tr TimeRange

-	// Skip populating timestampsData and valuesData in Block if fetchData=false.
-	fetchData bool
-
 	metaindex []metaindexRow

 	ibCache *indexBlockCache
@@ -49,11 +46,10 @@ type partSearch struct {
 }

 func (ps *partSearch) reset() {
-	ps.Block.Reset()
+	ps.BlockRef.reset()
 	ps.p = nil
 	ps.tsids = nil
 	ps.tsidIdx = 0
-	ps.fetchData = true
 	ps.metaindex = nil
 	ps.ibCache = nil
 	ps.bhs = nil
@@ -74,7 +70,7 @@ var isInTest = func() bool {
 //
 // tsids must be sorted.
 // tsids cannot be modified after the Init call, since it is owned by ps.
-func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
+func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
 	ps.reset()
 	ps.p = p

@@ -86,7 +82,6 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool)
 		ps.tsids = tsids
 	}
 	ps.tr = tr
-	ps.fetchData = fetchData
 	ps.metaindex = p.metaindex
 	ps.ibCache = p.ibCache

@@ -95,7 +90,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool)
 	ps.nextTSID()
 }

-// NextBlock advances to the next Block.
+// NextBlock advances to the next BlockRef.
 //
 // Returns true on success.
 //
@@ -130,7 +125,7 @@ func (ps *partSearch) nextTSID() bool {
 		ps.err = io.EOF
 		return false
 	}
-	ps.Block.bh.TSID = ps.tsids[ps.tsidIdx]
+	ps.BlockRef.bh.TSID = ps.tsids[ps.tsidIdx]
 	ps.tsidIdx++
 	return true
 }
@@ -139,20 +134,20 @@ func (ps *partSearch) nextBHS() bool {
 	for len(ps.metaindex) > 0 {
 		// Optimization: skip tsid values smaller than the minimum value
 		// from ps.metaindex.
-		for ps.Block.bh.TSID.Less(&ps.metaindex[0].TSID) {
+		for ps.BlockRef.bh.TSID.Less(&ps.metaindex[0].TSID) {
 			if !ps.nextTSID() {
 				return false
 			}
 		}
-		// Invariant: ps.Block.bh.TSID >= ps.metaindex[0].TSID
+		// Invariant: ps.BlockRef.bh.TSID >= ps.metaindex[0].TSID

-		ps.metaindex = skipSmallMetaindexRows(ps.metaindex, &ps.Block.bh.TSID)
-		// Invariant: len(ps.metaindex) > 0 && ps.Block.bh.TSID >= ps.metaindex[0].TSID
+		ps.metaindex = skipSmallMetaindexRows(ps.metaindex, &ps.BlockRef.bh.TSID)
+		// Invariant: len(ps.metaindex) > 0 && ps.BlockRef.bh.TSID >= ps.metaindex[0].TSID

 		mr := &ps.metaindex[0]
 		ps.metaindex = ps.metaindex[1:]
-		if ps.Block.bh.TSID.Less(&mr.TSID) {
-			logger.Panicf("BUG: invariant violation: ps.Block.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", &ps.Block.bh.TSID, &mr.TSID)
+		if ps.BlockRef.bh.TSID.Less(&mr.TSID) {
+			logger.Panicf("BUG: invariant violation: ps.BlockRef.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", &ps.BlockRef.bh.TSID, &mr.TSID)
 		}

 		if mr.MaxTimestamp < ps.tr.MinTimestamp {
@@ -165,7 +160,7 @@ func (ps *partSearch) nextBHS() bool {
 		}

 		// Found the index block which may contain the required data
-		// for the ps.Block.bh.TSID and the given timestamp range.
+		// for the ps.BlockRef.bh.TSID and the given timestamp range.
 		if ps.indexBlockReuse != nil {
 			putIndexBlock(ps.indexBlockReuse)
 			ps.indexBlockReuse = nil
@@ -249,15 +244,15 @@ func (ps *partSearch) searchBHS() bool {
 		bh := &ps.bhs[i]

 	nextTSID:
-		if bh.TSID.Less(&ps.Block.bh.TSID) {
+		if bh.TSID.Less(&ps.BlockRef.bh.TSID) {
 			// Skip blocks with small tsid values.
 			continue
 		}

-		// Invariant: ps.Block.bh.TSID <= bh.TSID
+		// Invariant: ps.BlockRef.bh.TSID <= bh.TSID

-		if bh.TSID.MetricID != ps.Block.bh.TSID.MetricID {
-			// ps.Block.bh.TSID < bh.TSID: no more blocks with the given tsid.
+		if bh.TSID.MetricID != ps.BlockRef.bh.TSID.MetricID {
+			// ps.BlockRef.bh.TSID < bh.TSID: no more blocks with the given tsid.
 			// Proceed to the next (bigger) tsid.
 			if !ps.nextTSID() {
 				return false
@@ -284,7 +279,7 @@ func (ps *partSearch) searchBHS() bool {

 		// Found the tsid block with the matching timestamp range.
 		// Read it.
-		ps.readBlock(bh)
+		ps.BlockRef.init(ps.p, bh)

 		ps.bhs = ps.bhs[i+1:]
 		return true
@@ -293,17 +288,3 @@ func (ps *partSearch) searchBHS() bool {
 	ps.bhs = nil
 	return false
 }
-
-func (ps *partSearch) readBlock(bh *blockHeader) {
-	ps.Block.Reset()
-	ps.Block.bh = *bh
-	if !ps.fetchData {
-		return
-	}
-
-	ps.Block.timestampsData = bytesutil.Resize(ps.Block.timestampsData[:0], int(bh.TimestampsBlockSize))
-	ps.p.timestampsFile.MustReadAt(ps.Block.timestampsData, int64(bh.TimestampsBlockOffset))
-
-	ps.Block.valuesData = bytesutil.Resize(ps.Block.valuesData[:0], int(bh.ValuesBlockSize))
-	ps.p.valuesFile.MustReadAt(ps.Block.valuesData, int64(bh.ValuesBlockOffset))
-}

@@ -1247,11 +1247,11 @@ func testPartSearch(t *testing.T, p *part, tsids []TSID, tr TimeRange, expectedR

 func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks []rawBlock) error {
 	var ps partSearch
-	ps.Init(p, tsids, tr, true)
+	ps.Init(p, tsids, tr)
 	var bs []Block
 	for ps.NextBlock() {
 		var b Block
-		b.CopyFrom(&ps.Block)
+		ps.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := ps.Error(); err != nil {

@@ -10,8 +10,8 @@ import (

 // partitionSearch represents a search in the partition.
 type partitionSearch struct {
-	// Block is the block found after NextBlock call.
-	Block *Block
+	// BlockRef is the block found after NextBlock call.
+	BlockRef *BlockRef

 	// pt is a partition to search.
 	pt *partition
@@ -30,7 +30,7 @@ type partitionSearch struct {
 }

 func (pts *partitionSearch) reset() {
-	pts.Block = nil
+	pts.BlockRef = nil
 	pts.pt = nil

 	for i := range pts.pws {
@@ -59,7 +59,7 @@ func (pts *partitionSearch) reset() {
 // tsids cannot be modified after the Init call, since it is owned by pts.
 //
 /// MustClose must be called when partition search is done.
-func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
+func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
 	if pts.needClosing {
 		logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
 	}
@@ -88,7 +88,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetc
 	}
 	pts.psPool = pts.psPool[:len(pts.pws)]
 	for i, pw := range pts.pws {
-		pts.psPool[i].Init(pw.p, tsids, tr, fetchData)
+		pts.psPool[i].Init(pw.p, tsids, tr)
 	}

 	// Initialize the psHeap.
@@ -114,7 +114,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetc
 		return
 	}
 	heap.Init(&pts.psHeap)
-	pts.Block = &pts.psHeap[0].Block
+	pts.BlockRef = &pts.psHeap[0].BlockRef
 	pts.nextBlockNoop = true
 }

@@ -145,7 +145,7 @@ func (pts *partitionSearch) nextBlock() error {
 	psMin := pts.psHeap[0]
 	if psMin.NextBlock() {
 		heap.Fix(&pts.psHeap, 0)
-		pts.Block = &pts.psHeap[0].Block
+		pts.BlockRef = &pts.psHeap[0].BlockRef
 		return nil
 	}

@@ -159,7 +159,7 @@ func (pts *partitionSearch) nextBlock() error {
 		return io.EOF
 	}

-	pts.Block = &pts.psHeap[0].Block
+	pts.BlockRef = &pts.psHeap[0].BlockRef
 	return nil
 }

@@ -188,7 +188,7 @@ func (psh *partSearchHeap) Len() int {

 func (psh *partSearchHeap) Less(i, j int) bool {
 	x := *psh
-	return x[i].Block.bh.Less(&x[j].Block.bh)
+	return x[i].BlockRef.bh.Less(&x[j].BlockRef.bh)
 }

 func (psh *partSearchHeap) Swap(i, j int) {

@@ -240,10 +240,10 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp

 	bs := []Block{}
 	var pts partitionSearch
-	pts.Init(pt, tsids, tr, true)
+	pts.Init(pt, tsids, tr)
 	for pts.NextBlock() {
 		var b Block
-		b.CopyFrom(pts.Block)
+		pts.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := pts.Error(); err != nil {
@@ -265,18 +265,9 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 	}

 	// verify that empty tsids returns empty result
-	pts.Init(pt, []TSID{}, tr, true)
+	pts.Init(pt, []TSID{}, tr)
 	if pts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
-	}
-	if err := pts.Error(); err != nil {
-		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
-	}
-	pts.MustClose()
-
-	pts.Init(pt, []TSID{}, tr, false)
-	if pts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
+		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.BlockRef)
 	}
 	if err := pts.Error(); err != nil {
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)

@@ -9,17 +9,64 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )

-// MetricBlock is a time series block for a single metric.
-type MetricBlock struct {
-	// MetricName is metric name for the given Block.
-	MetricName []byte
-
-	// Block is a block for the given MetricName
-	Block *Block
+// BlockRef references a Block.
+//
+// BlockRef is valid only until the corresponding Search is valid,
+// i.e. it becomes invalid after Search.MustClose is called.
+type BlockRef struct {
+	p  *part
+	bh blockHeader
+}
+
+func (br *BlockRef) reset() {
+	br.p = nil
+	br.bh = blockHeader{}
+}
+
+func (br *BlockRef) init(p *part, bh *blockHeader) {
+	br.p = p
+	br.bh = *bh
+}
+
+// MustReadBlock reads block from br to dst.
+//
+// if fetchData is false, then only block header is read, otherwise all the data is read.
+func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) {
+	dst.Reset()
+	dst.bh = br.bh
+	if !fetchData {
+		return
+	}
+
+	dst.timestampsData = bytesutil.Resize(dst.timestampsData[:0], int(br.bh.TimestampsBlockSize))
+	br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset))
+
+	dst.valuesData = bytesutil.Resize(dst.valuesData[:0], int(br.bh.ValuesBlockSize))
+	br.p.valuesFile.MustReadAt(dst.valuesData, int64(br.bh.ValuesBlockOffset))
+}
+
+// MetricBlockRef contains reference to time series block for a single metric.
+type MetricBlockRef struct {
+	// The metric name
+	MetricName []byte
+
+	// The block reference. Call BlockRef.MustReadBlock in order to obtain the block.
+	BlockRef *BlockRef
+}
+
+// MetricBlock is a time series block for a single metric.
+type MetricBlock struct {
+	// MetricName is metric name for the given Block.
+	MetricName []byte
+
+	// Block is a block for the given MetricName
+	Block Block
 }

 // Marshal marshals MetricBlock to dst
 func (mb *MetricBlock) Marshal(dst []byte) []byte {
 	dst = encoding.MarshalBytes(dst, mb.MetricName)
-	return MarshalBlock(dst, mb.Block)
+	return MarshalBlock(dst, &mb.Block)
 }

 // MarshalBlock marshals b to dst.
@@ -34,11 +81,7 @@ func MarshalBlock(dst []byte, b *Block) []byte {

 // Unmarshal unmarshals MetricBlock from src
 func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
-	if mb.Block == nil {
-		logger.Panicf("BUG: MetricBlock.Block must be non-nil when calling Unmarshal!")
-	} else {
-		mb.Block.Reset()
-	}
+	mb.Block.Reset()
 	tail, mn, err := encoding.UnmarshalBytes(src)
 	if err != nil {
 		return tail, fmt.Errorf("cannot unmarshal MetricName: %s", err)
@@ -46,7 +89,7 @@ func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
 	mb.MetricName = append(mb.MetricName[:0], mn...)
 	src = tail

-	return UnmarshalBlock(mb.Block, src)
+	return UnmarshalBlock(&mb.Block, src)
 }

 // UnmarshalBlock unmarshal Block from src to dst.
@@ -78,8 +121,8 @@ func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) {

 // Search is a search for time series.
 type Search struct {
-	// MetricBlock is updated with each Search.NextMetricBlock call.
-	MetricBlock MetricBlock
+	// MetricBlockRef is updated with each Search.NextMetricBlock call.
+	MetricBlockRef MetricBlockRef

 	storage *Storage

@@ -91,8 +134,8 @@ type Search struct {
 }

 func (s *Search) reset() {
-	s.MetricBlock.MetricName = s.MetricBlock.MetricName[:0]
-	s.MetricBlock.Block = nil
+	s.MetricBlockRef.MetricName = s.MetricBlockRef.MetricName[:0]
+	s.MetricBlockRef.BlockRef = nil

 	s.storage = nil
 	s.ts.reset()
@@ -103,7 +146,7 @@ func (s *Search) reset() {
 // Init initializes s from the given storage, tfss and tr.
 //
 // MustClose must be called when the search is done.
-func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchData bool, maxMetrics int) {
+func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) {
 	if s.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -118,7 +161,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchD
 	// It is ok to call Init on error from storage.searchTSIDs.
 	// Init must be called before returning because it will fail
 	// on Seach.MustClose otherwise.
-	s.ts.Init(storage.tb, tsids, tr, fetchData)
+	s.ts.Init(storage.tb, tsids, tr)

 	if err != nil {
 		s.err = err
@@ -145,15 +188,15 @@ func (s *Search) Error() error {
 	return s.err
 }

-// NextMetricBlock proceeds to the next MetricBlock.
+// NextMetricBlock proceeds to the next MetricBlockRef.
 func (s *Search) NextMetricBlock() bool {
 	if s.err != nil {
 		return false
 	}
 	for s.ts.NextBlock() {
-		tsid := &s.ts.Block.bh.TSID
+		tsid := &s.ts.BlockRef.bh.TSID
 		var err error
-		s.MetricBlock.MetricName, err = s.storage.searchMetricName(s.MetricBlock.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID)
+		s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID)
 		if err != nil {
 			if err == io.EOF {
 				// Skip missing metricName for tsid.MetricID.
@@ -163,7 +206,7 @@ func (s *Search) NextMetricBlock() bool {
 			s.err = err
 			return false
 		}
-		s.MetricBlock.Block = s.ts.Block
+		s.MetricBlockRef.BlockRef = s.ts.BlockRef
 		return true
 	}
 	if err := s.ts.Error(); err != nil {

@@ -215,15 +215,20 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
 		expectedMrs = append(expectedMrs, *mr)
 	}

+	type metricBlock struct {
+		MetricName []byte
+		Block      *Block
+	}
+
 	// Search
-	s.Init(st, []*TagFilters{tfs}, tr, true, 1e5)
-	var mbs []MetricBlock
+	s.Init(st, []*TagFilters{tfs}, tr, 1e5)
+	var mbs []metricBlock
 	for s.NextMetricBlock() {
 		var b Block
-		b.CopyFrom(s.MetricBlock.Block)
+		s.MetricBlockRef.BlockRef.MustReadBlock(&b, true)

-		var mb MetricBlock
-		mb.MetricName = append(mb.MetricName, s.MetricBlock.MetricName...)
+		var mb metricBlock
+		mb.MetricName = append(mb.MetricName, s.MetricBlockRef.MetricName...)
 		mb.Block = &b
 		mbs = append(mbs, mb)
 	}

@@ -623,24 +623,13 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
 		MaxTimestamp: 2e10,
 	}
 	metricBlocksCount := func(tfs *TagFilters) int {
-		// Verify the number of blocks with fetchData=true
+		// Verify the number of blocks
 		n := 0
-		sr.Init(s, []*TagFilters{tfs}, tr, true, 1e5)
+		sr.Init(s, []*TagFilters{tfs}, tr, 1e5)
 		for sr.NextMetricBlock() {
 			n++
 		}
 		sr.MustClose()
-
-		// Make sure the number of blocks with fetchData=false is the same.
-		m := 0
-		sr.Init(s, []*TagFilters{tfs}, tr, false, 1e5)
-		for sr.NextMetricBlock() {
-			m++
-		}
-		sr.MustClose()
-		if n != m {
-			return -1
-		}
 		return n
 	}
 	for i := 0; i < metricsCount; i++ {

@@ -11,7 +11,7 @@ import (

 // tableSearch performs searches in the table.
 type tableSearch struct {
-	Block *Block
+	BlockRef *BlockRef

 	tb *table

@@ -29,7 +29,7 @@ type tableSearch struct {
 }

 func (ts *tableSearch) reset() {
-	ts.Block = nil
+	ts.BlockRef = nil
 	ts.tb = nil

 	for i := range ts.ptws {
@@ -58,7 +58,7 @@ func (ts *tableSearch) reset() {
 // tsids cannot be modified after the Init call, since it is owned by ts.
 //
 // MustClose must be called then the tableSearch is done.
-func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
+func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
 	if ts.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -89,7 +89,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData boo
 	}
 	ts.ptsPool = ts.ptsPool[:len(ts.ptws)]
 	for i, ptw := range ts.ptws {
-		ts.ptsPool[i].Init(ptw.pt, tsids, tr, fetchData)
+		ts.ptsPool[i].Init(ptw.pt, tsids, tr)
 	}

 	// Initialize the ptsHeap.
@@ -115,13 +115,13 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData boo
 		return
 	}
 	heap.Init(&ts.ptsHeap)
-	ts.Block = ts.ptsHeap[0].Block
+	ts.BlockRef = ts.ptsHeap[0].BlockRef
 	ts.nextBlockNoop = true
 }

 // NextBlock advances to the next block.
 //
-// The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks
+// The blocks are sorted by (TSID, MinTimestamp). Two subsequent blocks
 // for the same TSID may contain overlapped time ranges.
 func (ts *tableSearch) NextBlock() bool {
 	if ts.err != nil {
@@ -146,7 +146,7 @@ func (ts *tableSearch) nextBlock() error {
 	ptsMin := ts.ptsHeap[0]
 	if ptsMin.NextBlock() {
 		heap.Fix(&ts.ptsHeap, 0)
-		ts.Block = ts.ptsHeap[0].Block
+		ts.BlockRef = ts.ptsHeap[0].BlockRef
 		return nil
 	}

@@ -160,7 +160,7 @@ func (ts *tableSearch) nextBlock() error {
 		return io.EOF
 	}

-	ts.Block = ts.ptsHeap[0].Block
+	ts.BlockRef = ts.ptsHeap[0].BlockRef
 	return nil
 }

@@ -192,7 +192,7 @@ func (ptsh *partitionSearchHeap) Len() int {

 func (ptsh *partitionSearchHeap) Less(i, j int) bool {
 	x := *ptsh
-	return x[i].Block.bh.Less(&x[j].Block.bh)
+	return x[i].BlockRef.bh.Less(&x[j].BlockRef.bh)
 }

 func (ptsh *partitionSearchHeap) Swap(i, j int) {

@@ -251,10 +251,10 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []

 	bs := []Block{}
 	var ts tableSearch
-	ts.Init(tb, tsids, tr, true)
+	ts.Init(tb, tsids, tr)
 	for ts.NextBlock() {
 		var b Block
-		b.CopyFrom(ts.Block)
+		ts.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := ts.Error(); err != nil {
@@ -276,23 +276,14 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 	}

 	// verify that empty tsids returns empty result
-	ts.Init(tb, []TSID{}, tr, true)
+	ts.Init(tb, []TSID{}, tr)
 	if ts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.Block)
+		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.BlockRef)
 	}
 	if err := ts.Error(); err != nil {
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
 	}
 	ts.MustClose()

-	ts.Init(tb, []TSID{}, tr, false)
-	if ts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list with fetchData=false: %+v", ts.Block)
-	}
-	if err := ts.Error(); err != nil {
-		return fmt.Errorf("unexpected error on empty tsids list with fetchData=false: %s", err)
-	}
-	ts.MustClose()
-
 	return nil
 }

@@ -127,12 +127,14 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int,
 	b.RunParallel(func(pb *testing.PB) {
 		var ts tableSearch
 		tsids := make([]TSID, tsidsSearch)
+		var tmpBlock Block
 		for pb.Next() {
 			for i := range tsids {
 				tsids[i].MetricID = 1 + uint64(i)
 			}
-			ts.Init(tb, tsids, tr, fetchData)
+			ts.Init(tb, tsids, tr)
 			for ts.NextBlock() {
+				ts.BlockRef.MustReadBlock(&tmpBlock, fetchData)
 			}
 			ts.MustClose()
 		}