app/vmselect/netstorage: marshal block outside tmpBlocksFile.WriteBlock

This also allows marshaling outside lock, thus reducing the amount of work under the lock.
This commit is contained in:
Aliaksandr Valialkin 2019-09-28 20:38:24 +03:00
parent e92e39eddf
commit 946ca438a6
3 changed files with 19 additions and 19 deletions

View file

@ -701,16 +701,17 @@ type tmpBlocksFileWrapper struct {
}
func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
bb := tmpBufPool.Get()
bb.B = storage.MarshalBlock(bb.B[:0], mb.Block)
tbfw.mu.Lock()
defer tbfw.mu.Unlock()
addr, err := tbfw.tbf.WriteBlock(mb.Block)
if err != nil {
return err
addr, err := tbfw.tbf.WriteBlockData(bb.B)
tmpBufPool.Put(bb)
if err == nil {
metricName := mb.MetricName
tbfw.m[string(metricName)] = append(tbfw.m[string(metricName)], addr)
}
metricName := mb.MetricName
tbfw.m[string(metricName)] = append(tbfw.m[string(metricName)], addr)
return nil
tbfw.mu.Unlock()
return err
}
// ProcessSearchQuery performs sq on storage nodes until the given deadline.

View file

@ -82,22 +82,18 @@ func (addr tmpBlockAddr) String() string {
var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
// WriteBlock writes b to tbf.
// WriteBlockData writes b to tbf.
//
// It returns errors since the operation may fail on space shortage
// and this must be handled.
func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
bb.B = storage.MarshalBlock(bb.B[:0], b)
func (tbf *tmpBlocksFile) WriteBlockData(b []byte) (tmpBlockAddr, error) {
var addr tmpBlockAddr
addr.offset = tbf.offset
addr.size = len(bb.B)
addr.size = len(b)
tbf.offset += uint64(addr.size)
if len(tbf.buf)+len(bb.B) <= cap(tbf.buf) {
if len(tbf.buf)+len(b) <= cap(tbf.buf) {
// Fast path - the data fits tbf.buf
tbf.buf = append(tbf.buf, bb.B...)
tbf.buf = append(tbf.buf, b...)
return addr, nil
}
@ -111,7 +107,7 @@ func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
tmpBlocksFilesCreated.Inc()
}
_, err := tbf.f.Write(tbf.buf)
tbf.buf = append(tbf.buf[:0], bb.B...)
tbf.buf = append(tbf.buf[:0], b...)
if err != nil {
return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
}

View file

@ -77,9 +77,12 @@ func testTmpBlocksFile() error {
// Write blocks until their summary size exceeds `size`.
var addrs []tmpBlockAddr
var blocks []*storage.Block
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
for tbf.offset < uint64(size) {
b := createBlock()
addr, err := tbf.WriteBlock(b)
bb.B = storage.MarshalBlock(bb.B[:0], b)
addr, err := tbf.WriteBlockData(bb.B)
if err != nil {
return fmt.Errorf("cannot write block at offset %d: %s", tbf.offset, err)
}