mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect/netstorage: do not spend CPU time on unpacking empty blocks during /api/v1/series
calls
This commit is contained in:
parent
24ca30bf66
commit
23bdc1f107
1 changed files with 12 additions and 15 deletions
|
@ -177,10 +177,9 @@ type unpackWorkItem struct {
|
|||
}
|
||||
|
||||
type unpackWork struct {
|
||||
ws []unpackWorkItem
|
||||
fetchData bool
|
||||
sbs []*sortBlock
|
||||
doneCh chan error
|
||||
ws []unpackWorkItem
|
||||
sbs []*sortBlock
|
||||
doneCh chan error
|
||||
}
|
||||
|
||||
func (upw *unpackWork) reset() {
|
||||
|
@ -191,7 +190,6 @@ func (upw *unpackWork) reset() {
|
|||
w.tr = storage.TimeRange{}
|
||||
}
|
||||
upw.ws = upw.ws[:0]
|
||||
upw.fetchData = false
|
||||
sbs := upw.sbs
|
||||
for i := range sbs {
|
||||
sbs[i] = nil
|
||||
|
@ -205,7 +203,7 @@ func (upw *unpackWork) reset() {
|
|||
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
|
||||
for _, w := range upw.ws {
|
||||
sb := getSortBlock()
|
||||
if err := sb.unpackFrom(tmpBlock, w.br, w.tr, upw.fetchData); err != nil {
|
||||
if err := sb.unpackFrom(tmpBlock, w.br, w.tr); err != nil {
|
||||
putSortBlock(sb)
|
||||
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
||||
return
|
||||
|
@ -253,22 +251,23 @@ var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
|
|||
// Unpack unpacks pts to dst.
|
||||
func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error {
|
||||
dst.reset()
|
||||
|
||||
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
||||
}
|
||||
if !fetchData {
|
||||
// Do not spend resources on data reading and unpacking.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Feed workers with work
|
||||
brsLen := len(pts.brs)
|
||||
upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
|
||||
upw := getUnpackWork()
|
||||
upw.fetchData = fetchData
|
||||
for _, br := range pts.brs {
|
||||
if len(upw.ws) >= unpackBatchSize {
|
||||
unpackWorkCh <- upw
|
||||
upws = append(upws, upw)
|
||||
upw = getUnpackWork()
|
||||
upw.fetchData = fetchData
|
||||
}
|
||||
upw.ws = append(upw.ws, unpackWorkItem{
|
||||
br: br,
|
||||
|
@ -385,13 +384,11 @@ func (sb *sortBlock) reset() {
|
|||
sb.NextIdx = 0
|
||||
}
|
||||
|
||||
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange, fetchData bool) error {
|
||||
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange) error {
|
||||
tmpBlock.Reset()
|
||||
br.MustReadBlock(tmpBlock, fetchData)
|
||||
if fetchData {
|
||||
if err := tmpBlock.UnmarshalData(); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal block: %w", err)
|
||||
}
|
||||
br.MustReadBlock(tmpBlock, true)
|
||||
if err := tmpBlock.UnmarshalData(); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal block: %w", err)
|
||||
}
|
||||
timestamps := tmpBlock.Timestamps()
|
||||
|
||||
|
|
Loading…
Reference in a new issue