app/vmstorage: parallelize processing of data obtained from a single vminsert connection

Previously vmstorage could use only a single CPU core for data processing from a single connection from vminsert.
Now all the CPU cores can be used for data processing from a single connection from vminsert.
This should improve the maximum data ingestion performance for a single vminsert->vmstorage connection.
This commit is contained in:
Aliaksandr Valialkin 2020-09-28 21:37:58 +03:00
parent 9d123eb22a
commit 2ee0dc27a6
2 changed files with 121 additions and 43 deletions

View file

@ -68,6 +68,7 @@ func main() {
registerStorageMetrics(strg)
transport.StartUnmarshalWorkers()
srv, err := transport.NewServer(*vminsertAddr, *vmselectAddr, strg)
if err != nil {
logger.Fatalf("cannot create a server with vminsertAddr=%s, vmselectAddr=%s: %s", *vminsertAddr, *vmselectAddr, err)
@ -94,6 +95,7 @@ func main() {
logger.Infof("gracefully shutting down the service")
startTime = time.Now()
srv.MustClose()
transport.StopUnmarshalWorkers()
logger.Infof("successfully shut down the service in %.3f seconds", time.Since(startTime).Seconds())
logger.Infof("gracefully closing the storage at %s", *storageDataPath)

View file

@ -7,6 +7,7 @@ import (
"io"
"net"
"net/http"
"runtime"
"sync"
"sync/atomic"
"time"
@ -294,17 +295,9 @@ func (s *Server) isStopping() bool {
func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
sizeBuf := make([]byte, 8)
var buf []byte
var mrs []storage.MetricRow
lastMRsResetTime := fasttime.UnixTimestamp()
var reqBuf []byte
remoteAddr := bc.RemoteAddr().String()
for {
if fasttime.UnixTimestamp()-lastMRsResetTime > 10 {
// Periodically reset mrs in order to prevent from gradual memory usage growth
// when ceratin entries in mr contain too long labels.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
mrs = nil
lastMRsResetTime = fasttime.UnixTimestamp()
}
if _, err := io.ReadFull(bc, sizeBuf); err != nil {
if err == io.EOF {
// Remote end gracefully closed the connection.
@ -316,11 +309,11 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
if packetSize > consts.MaxInsertPacketSize {
return fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize)
}
buf = bytesutil.Resize(buf, int(packetSize))
if n, err := io.ReadFull(bc, buf); err != nil {
reqBuf = bytesutil.Resize(reqBuf, int(packetSize))
if n, err := io.ReadFull(bc, reqBuf); err != nil {
return fmt.Errorf("cannot read packet with size %d: %w; read only %d bytes", packetSize, err, n)
}
// Send `ack` to vminsert that we recevied the packet.
// Send `ack` to vminsert that the packet has been received.
deadline := time.Now().Add(5 * time.Second)
if err := bc.SetWriteDeadline(deadline); err != nil {
return fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %w", err)
@ -334,9 +327,91 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
}
vminsertPacketsRead.Inc()
// Read metric rows from the packet.
mrs = mrs[:0]
tail := buf
uw := getUnmarshalWork()
uw.storage = s.storage
uw.remoteAddr = remoteAddr
uw.reqBuf, reqBuf = reqBuf, uw.reqBuf
unmarshalWorkCh <- uw
}
}
var (
// vminsertPacketsRead counts packets read from vminsert connections.
vminsertPacketsRead = metrics.NewCounter("vm_vminsert_packets_read_total")
// vminsertMetricsRead counts metric rows unmarshaled from vminsert packets and handed to the storage.
vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")
)
// getUnmarshalWork obtains an unmarshalWork from unmarshalWorkPool.
//
// A new zero-value unmarshalWork is allocated when the pool has nothing to hand out.
func getUnmarshalWork() *unmarshalWork {
	if pooled := unmarshalWorkPool.Get(); pooled != nil {
		return pooled.(*unmarshalWork)
	}
	return &unmarshalWork{}
}
// StartUnmarshalWorkers launches the goroutines that unmarshal data obtained from vminsert connections.
//
// The number of workers equals the current GOMAXPROCS setting; unmarshalWorkCh
// is created with the same capacity.
//
// This function must be called before servers are created via NewServer.
func StartUnmarshalWorkers() {
	// A negative argument only queries GOMAXPROCS without changing it.
	workersCount := runtime.GOMAXPROCS(-1)
	unmarshalWorkCh = make(chan *unmarshalWork, workersCount)
	unmarshalWorkersWG.Add(workersCount)

	// worker processes queued work until unmarshalWorkCh is closed.
	worker := func() {
		defer unmarshalWorkersWG.Done()
		for work := range unmarshalWorkCh {
			work.Unmarshal()
			putUnmarshalWork(work)
		}
	}
	for remaining := workersCount; remaining > 0; remaining-- {
		go worker()
	}
}
// StopUnmarshalWorkers stops unmarshal workers which were started with StartUnmarshalWorkers.
//
// It blocks until every worker has exited.
//
// This function must be called after Server.MustClose().
func StopUnmarshalWorkers() {
// Closing the channel terminates the `range` loop in every worker once the queued work is drained.
close(unmarshalWorkCh)
// Wait for all the workers to finish the work they have already picked up.
unmarshalWorkersWG.Wait()
}
var (
// unmarshalWorkCh delivers pending work to the unmarshal workers.
// It is created by StartUnmarshalWorkers and closed by StopUnmarshalWorkers.
unmarshalWorkCh chan *unmarshalWork
// unmarshalWorkersWG tracks running unmarshal workers, so StopUnmarshalWorkers can wait for them to exit.
unmarshalWorkersWG sync.WaitGroup
)
// putUnmarshalWork resets uw and returns it to unmarshalWorkPool for reuse.
//
// uw must not be accessed by the caller after this call.
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
// unmarshalWorkPool recycles unmarshalWork objects; see getUnmarshalWork and putUnmarshalWork.
var unmarshalWorkPool sync.Pool
// unmarshalWork is a unit of work passed to the unmarshal workers via unmarshalWorkCh.
//
// Instances are recycled through unmarshalWorkPool.
type unmarshalWork struct {
// storage is where the unmarshaled rows are added.
storage *storage.Storage
// remoteAddr identifies the data source in error log messages.
remoteAddr string
// mrs is a reusable buffer for the rows that are pending a flush to storage.
mrs []storage.MetricRow
// reqBuf holds the raw packet bytes to unmarshal.
reqBuf []byte
// lastResetTime is the fasttime.UnixTimestamp() value recorded when reset last dropped mrs and reqBuf.
lastResetTime uint64
}
// reset prepares uw for reuse: it drops the references to the storage and the remote address
// and truncates mrs and reqBuf to zero length.
//
// Additionally, the mrs and reqBuf buffers are released entirely when more than 10 seconds
// have passed since the previous release and at least one of them has more than a quarter
// of its capacity in use.
//
// NOTE(review): the stated goal is to limit gradual memory usage growth, yet the check fires
// when the buffers are more than 25% full rather than mostly unused - confirm that
// `len*4 > cap` (and not `len*4 < cap`) is intended.
func (uw *unmarshalWork) reset() {
	mrsInUse := len(uw.mrs)*4 > cap(uw.mrs)
	reqBufInUse := len(uw.reqBuf)*4 > cap(uw.reqBuf)
	if mrsInUse || reqBufInUse {
		if fasttime.UnixTimestamp()-uw.lastResetTime > 10 {
			// Periodically drop mrs and reqBuf in order to prevent gradual memory usage growth
			// when certain entries in mrs contain too long labels.
			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
			uw.mrs = nil
			uw.reqBuf = nil
			uw.lastResetTime = fasttime.UnixTimestamp()
		}
	}

	uw.storage = nil
	uw.remoteAddr = ""
	// Slicing a nil slice to [:0] is valid, so this works after the release above too.
	uw.mrs = uw.mrs[:0]
	uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) Unmarshal() {
mrs := uw.mrs[:0]
tail := uw.reqBuf
for len(tail) > 0 {
if len(mrs) < cap(mrs) {
mrs = mrs[:len(mrs)+1]
@ -347,30 +422,31 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
var err error
tail, err = mr.Unmarshal(tail)
if err != nil {
return fmt.Errorf("cannot unmarshal MetricRow: %w", err)
logger.Errorf("cannot unmarshal MetricRow obtained from %s: %s", uw.remoteAddr, err)
uw.mrs = mrs[:0]
return
}
if len(mrs) >= 10000 {
// Store the collected mrs in order to reduce memory usage
// when too big number of mrs are sent in each packet.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490
vminsertMetricsRead.Add(len(mrs))
if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil {
return fmt.Errorf("cannot store metrics: %w", err)
}
mrs = mrs[:0]
}
}
vminsertMetricsRead.Add(len(mrs))
if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil {
return fmt.Errorf("cannot store metrics: %w", err)
uw.mrs = mrs
uw.flushRows()
mrs = uw.mrs[:0]
}
}
uw.mrs = mrs
uw.flushRows()
}
var (
// vminsertPacketsRead counts packets read from vminsert connections.
vminsertPacketsRead = metrics.NewCounter("vm_vminsert_packets_read_total")
// vminsertMetricsRead counts metric rows unmarshaled from vminsert packets and handed to the storage.
vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")
)
// flushRows adds the rows collected in uw.mrs to uw.storage and then truncates uw.mrs
// to zero length, so its backing array can be reused.
//
// The rows are counted in vminsertMetricsRead before the storage call.
// A storage error is logged together with the remote address instead of being returned.
func (uw *unmarshalWork) flushRows() {
	rows := uw.mrs
	vminsertMetricsRead.Add(len(rows))
	addErr := uw.storage.AddRows(rows, uint8(*precisionBits))
	// The buffer is truncated regardless of whether the rows were stored successfully.
	uw.mrs = rows[:0]
	if addErr == nil {
		return
	}
	logger.Errorf("cannot store metrics obtained from %s: %s", uw.remoteAddr, addErr)
}
func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error {
ctx := &vmselectRequestCtx{