app/vmselect/netstorage: marshal block outside tmpBlocksFile.WriteBlock

This allows re-using the destination buffer for marshaling in the outer loop.
This commit is contained in:
Aliaksandr Valialkin 2019-09-28 20:38:24 +03:00
parent b7ee2e7af2
commit c1d3705be0
3 changed files with 14 additions and 12 deletions

View file

@ -484,9 +484,12 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
tbf := getTmpBlocksFile()
m := make(map[string][]tmpBlockAddr)
blocksRead := 0
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
for sr.NextMetricBlock() {
blocksRead++
addr, err := tbf.WriteBlock(sr.MetricBlock.Block)
bb.B = storage.MarshalBlock(bb.B[:0], sr.MetricBlock.Block)
addr, err := tbf.WriteBlockData(bb.B)
if err != nil {
putTmpBlocksFile(tbf)
return nil, fmt.Errorf("cannot write data block #%d to temporary blocks file: %s", blocksRead, err)

View file

@ -82,22 +82,18 @@ func (addr tmpBlockAddr) String() string {
var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
// WriteBlock writes b to tbf.
// WriteBlockData writes b to tbf.
//
// It returns errors since the operation may fail on space shortage
// and this must be handled.
func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
bb.B = storage.MarshalBlock(bb.B[:0], b)
func (tbf *tmpBlocksFile) WriteBlockData(b []byte) (tmpBlockAddr, error) {
var addr tmpBlockAddr
addr.offset = tbf.offset
addr.size = len(bb.B)
addr.size = len(b)
tbf.offset += uint64(addr.size)
if len(tbf.buf)+len(bb.B) <= cap(tbf.buf) {
if len(tbf.buf)+len(b) <= cap(tbf.buf) {
// Fast path - the data fits tbf.buf
tbf.buf = append(tbf.buf, bb.B...)
tbf.buf = append(tbf.buf, b...)
return addr, nil
}
@ -111,7 +107,7 @@ func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
tmpBlocksFilesCreated.Inc()
}
_, err := tbf.f.Write(tbf.buf)
tbf.buf = append(tbf.buf[:0], bb.B...)
tbf.buf = append(tbf.buf[:0], b...)
if err != nil {
return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
}

View file

@ -77,9 +77,12 @@ func testTmpBlocksFile() error {
// Write blocks until their summary size exceeds `size`.
var addrs []tmpBlockAddr
var blocks []*storage.Block
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
for tbf.offset < uint64(size) {
b := createBlock()
addr, err := tbf.WriteBlock(b)
bb.B = storage.MarshalBlock(bb.B[:0], b)
addr, err := tbf.WriteBlockData(bb.B)
if err != nil {
return fmt.Errorf("cannot write block at offset %d: %s", tbf.offset, err)
}