mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-01 15:33:35 +00:00
Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
This commit is contained in:
commit
9d3eb3f4b8
7 changed files with 20 additions and 32 deletions
|
@ -184,10 +184,9 @@ type unpackWorkItem struct {
|
|||
}
|
||||
|
||||
type unpackWork struct {
|
||||
ws []unpackWorkItem
|
||||
fetchData bool
|
||||
sbs []*sortBlock
|
||||
doneCh chan error
|
||||
ws []unpackWorkItem
|
||||
sbs []*sortBlock
|
||||
doneCh chan error
|
||||
}
|
||||
|
||||
func (upw *unpackWork) reset() {
|
||||
|
@ -198,7 +197,6 @@ func (upw *unpackWork) reset() {
|
|||
w.tr = storage.TimeRange{}
|
||||
}
|
||||
upw.ws = upw.ws[:0]
|
||||
upw.fetchData = false
|
||||
sbs := upw.sbs
|
||||
for i := range sbs {
|
||||
sbs[i] = nil
|
||||
|
@ -212,7 +210,7 @@ func (upw *unpackWork) reset() {
|
|||
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
|
||||
for _, w := range upw.ws {
|
||||
sb := getSortBlock()
|
||||
if err := sb.unpackFrom(tmpBlock, w.br, w.tr, upw.fetchData); err != nil {
|
||||
if err := sb.unpackFrom(tmpBlock, w.br, w.tr); err != nil {
|
||||
putSortBlock(sb)
|
||||
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
||||
return
|
||||
|
@ -260,22 +258,23 @@ var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
|
|||
// Unpack unpacks pts to dst.
|
||||
func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error {
|
||||
dst.reset()
|
||||
|
||||
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
||||
}
|
||||
if !fetchData {
|
||||
// Do not spend resources on data reading and unpacking.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Feed workers with work
|
||||
brsLen := len(pts.brs)
|
||||
upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
|
||||
upw := getUnpackWork()
|
||||
upw.fetchData = fetchData
|
||||
for _, br := range pts.brs {
|
||||
if len(upw.ws) >= unpackBatchSize {
|
||||
unpackWorkCh <- upw
|
||||
upws = append(upws, upw)
|
||||
upw = getUnpackWork()
|
||||
upw.fetchData = fetchData
|
||||
}
|
||||
upw.ws = append(upw.ws, unpackWorkItem{
|
||||
br: br,
|
||||
|
@ -427,13 +426,11 @@ func (sb *sortBlock) reset() {
|
|||
sb.NextIdx = 0
|
||||
}
|
||||
|
||||
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange, fetchData bool) error {
|
||||
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange) error {
|
||||
tmpBlock.Reset()
|
||||
br.MustReadBlock(tmpBlock, fetchData)
|
||||
if fetchData {
|
||||
if err := tmpBlock.UnmarshalData(); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal block: %w", err)
|
||||
}
|
||||
br.MustReadBlock(tmpBlock)
|
||||
if err := tmpBlock.UnmarshalData(); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal block: %w", err)
|
||||
}
|
||||
timestamps := tmpBlock.Timestamps()
|
||||
|
||||
|
|
|
@ -1251,7 +1251,7 @@ func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks
|
|||
var bs []Block
|
||||
for ps.NextBlock() {
|
||||
var b Block
|
||||
ps.BlockRef.MustReadBlock(&b, true)
|
||||
ps.BlockRef.MustReadBlock(&b)
|
||||
bs = append(bs, b)
|
||||
}
|
||||
if err := ps.Error(); err != nil {
|
||||
|
|
|
@ -243,7 +243,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
|
|||
pts.Init(pt, tsids, tr)
|
||||
for pts.NextBlock() {
|
||||
var b Block
|
||||
pts.BlockRef.MustReadBlock(&b, true)
|
||||
pts.BlockRef.MustReadBlock(&b)
|
||||
bs = append(bs, b)
|
||||
}
|
||||
if err := pts.Error(); err != nil {
|
||||
|
|
|
@ -31,14 +31,9 @@ func (br *BlockRef) init(p *part, bh *blockHeader) {
|
|||
}
|
||||
|
||||
// MustReadBlock reads block from br to dst.
|
||||
//
|
||||
// if fetchData is false, then only block header is read, otherwise all the data is read.
|
||||
func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) {
|
||||
func (br *BlockRef) MustReadBlock(dst *Block) {
|
||||
dst.Reset()
|
||||
dst.bh = br.bh
|
||||
if !fetchData {
|
||||
return
|
||||
}
|
||||
|
||||
dst.timestampsData = bytesutil.Resize(dst.timestampsData[:0], int(br.bh.TimestampsBlockSize))
|
||||
br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset))
|
||||
|
|
|
@ -222,7 +222,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
|
|||
var mbs []metricBlock
|
||||
for s.NextMetricBlock() {
|
||||
var b Block
|
||||
s.MetricBlockRef.BlockRef.MustReadBlock(&b, true)
|
||||
s.MetricBlockRef.BlockRef.MustReadBlock(&b)
|
||||
|
||||
var mb metricBlock
|
||||
mb.MetricName = append(mb.MetricName, s.MetricBlockRef.MetricName...)
|
||||
|
|
|
@ -254,7 +254,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
|
|||
ts.Init(tb, tsids, tr)
|
||||
for ts.NextBlock() {
|
||||
var b Block
|
||||
ts.BlockRef.MustReadBlock(&b, true)
|
||||
ts.BlockRef.MustReadBlock(&b)
|
||||
bs = append(bs, b)
|
||||
}
|
||||
if err := ts.Error(); err != nil {
|
||||
|
|
|
@ -26,11 +26,7 @@ func BenchmarkTableSearch(b *testing.B) {
|
|||
b.Run(fmt.Sprintf("tsidsCount_%d", tsidsCount), func(b *testing.B) {
|
||||
for _, tsidsSearch := range []int{1, 1e1, 1e2, 1e3, 1e4} {
|
||||
b.Run(fmt.Sprintf("tsidsSearch_%d", tsidsSearch), func(b *testing.B) {
|
||||
for _, fetchData := range []bool{true, false} {
|
||||
b.Run(fmt.Sprintf("fetchData_%v", fetchData), func(b *testing.B) {
|
||||
benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch, fetchData)
|
||||
})
|
||||
}
|
||||
benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
@ -107,7 +103,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
|
|||
tb.MustClose()
|
||||
}
|
||||
|
||||
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, fetchData bool) {
|
||||
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
|
||||
startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
|
||||
rowsPerInsert := getMaxRawRowsPerPartition()
|
||||
|
||||
|
@ -134,7 +130,7 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int,
|
|||
}
|
||||
ts.Init(tb, tsids, tr)
|
||||
for ts.NextBlock() {
|
||||
ts.BlockRef.MustReadBlock(&tmpBlock, fetchData)
|
||||
ts.BlockRef.MustReadBlock(&tmpBlock)
|
||||
}
|
||||
ts.MustClose()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue