From 2ee0dc27a6af02bc139c8ba81bd9bd71175a1423 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 28 Sep 2020 21:37:58 +0300
Subject: [PATCH] app/vmstorage: parallelize data processing obtained from a
 single connection from vminsert

Previously vmstorage could use only a single CPU core for processing data
received over a single connection from vminsert. Now all the available CPU
cores can be used for such processing. This should improve the maximum data
ingestion performance for a single vminsert->vmstorage connection.
---
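Notes:

The change follows a classic bounded worker-pool pattern: the goroutine
serving a vminsert connection now only reads length-prefixed packets and
hands each request buffer over a channel, while GOMAXPROCS background
goroutines unmarshal the rows and store them. A side effect is that
unmarshal and storage errors are logged instead of closing the connection,
since the workers can no longer return an error to the connection handler.
The sketch below is a minimal, self-contained illustration of the pattern;
the work type and the process function are hypothetical stand-ins, not the
VictoriaMetrics API:

	package main

	import (
		"fmt"
		"runtime"
		"sync"
	)

	// work is a hypothetical unit of work handed from a connection reader
	// to the background workers; it mirrors unmarshalWork in this patch.
	type work struct {
		reqBuf []byte
	}

	var (
		workCh    chan *work
		workersWG sync.WaitGroup
		workPool  sync.Pool
	)

	// startWorkers spawns one worker per CPU core, mirroring StartUnmarshalWorkers.
	func startWorkers() {
		gomaxprocs := runtime.GOMAXPROCS(-1)
		workCh = make(chan *work, gomaxprocs)
		workersWG.Add(gomaxprocs)
		for i := 0; i < gomaxprocs; i++ {
			go func() {
				defer workersWG.Done()
				for w := range workCh {
					process(w)
					w.reqBuf = w.reqBuf[:0] // reset before recycling
					workPool.Put(w)
				}
			}()
		}
	}

	// stopWorkers drains queued work and stops the workers,
	// mirroring StopUnmarshalWorkers.
	func stopWorkers() {
		close(workCh)
		workersWG.Wait()
	}

	// process stands in for unmarshalWork.Unmarshal plus Storage.AddRows.
	func process(w *work) {
		fmt.Printf("processing %d bytes\n", len(w.reqBuf))
	}

	func main() {
		startWorkers()
		for i := 0; i < 4; i++ {
			w, _ := workPool.Get().(*work)
			if w == nil {
				w = &work{}
			}
			// Fill the request buffer as the connection reader would.
			w.reqBuf = append(w.reqBuf, []byte("packet payload")...)
			// The channel capacity equals GOMAXPROCS, so a fast client
			// blocks here once all workers are busy; this gives natural
			// backpressure.
			workCh <- w
		}
		stopWorkers()
	}

Sizing the channel at GOMAXPROCS keeps the number of in-flight packets
bounded, so memory usage stays proportional to the number of cores rather
than to the ingestion rate.
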
 app/vmstorage/main.go             |   2 +
 app/vmstorage/transport/server.go | 162 ++++++++++++++++++++++--------
 2 files changed, 121 insertions(+), 43 deletions(-)

diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index 507457830..3d9efbbe7 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -68,6 +68,7 @@ func main() {
 
 	registerStorageMetrics(strg)
 
+	transport.StartUnmarshalWorkers()
 	srv, err := transport.NewServer(*vminsertAddr, *vmselectAddr, strg)
 	if err != nil {
 		logger.Fatalf("cannot create a server with vminsertAddr=%s, vmselectAddr=%s: %s", *vminsertAddr, *vmselectAddr, err)
@@ -94,6 +95,7 @@ func main() {
 	logger.Infof("gracefully shutting down the service")
 	startTime = time.Now()
 	srv.MustClose()
+	transport.StopUnmarshalWorkers()
 	logger.Infof("successfully shut down the service in %.3f seconds", time.Since(startTime).Seconds())
 
 	logger.Infof("gracefully closing the storage at %s", *storageDataPath)
diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go
index ea6f52fea..339b64f0a 100644
--- a/app/vmstorage/transport/server.go
+++ b/app/vmstorage/transport/server.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -294,17 +295,9 @@ func (s *Server) isStopping() bool {
 func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
 	sizeBuf := make([]byte, 8)
-	var buf []byte
-	var mrs []storage.MetricRow
-	lastMRsResetTime := fasttime.UnixTimestamp()
+	var reqBuf []byte
+	remoteAddr := bc.RemoteAddr().String()
 	for {
-		if fasttime.UnixTimestamp()-lastMRsResetTime > 10 {
-			// Periodically reset mrs in order to prevent from gradual memory usage growth
-			// when ceratin entries in mr contain too long labels.
-			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
-			mrs = nil
-			lastMRsResetTime = fasttime.UnixTimestamp()
-		}
 		if _, err := io.ReadFull(bc, sizeBuf); err != nil {
 			if err == io.EOF {
 				// Remote end gracefully closed the connection.
 				return nil
@@ -316,11 +309,11 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
 		if packetSize > consts.MaxInsertPacketSize {
 			return fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize)
 		}
-		buf = bytesutil.Resize(buf, int(packetSize))
-		if n, err := io.ReadFull(bc, buf); err != nil {
+		reqBuf = bytesutil.Resize(reqBuf, int(packetSize))
+		if n, err := io.ReadFull(bc, reqBuf); err != nil {
 			return fmt.Errorf("cannot read packet with size %d: %w; read only %d bytes", packetSize, err, n)
 		}
-		// Send `ack` to vminsert that we recevied the packet.
+		// Send `ack` to vminsert that the packet has been received.
 		deadline := time.Now().Add(5 * time.Second)
 		if err := bc.SetWriteDeadline(deadline); err != nil {
 			return fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %w", err)
@@ -334,36 +327,11 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
 		}
 		vminsertPacketsRead.Inc()
 
-		// Read metric rows from the packet.
-		mrs = mrs[:0]
-		tail := buf
-		for len(tail) > 0 {
-			if len(mrs) < cap(mrs) {
-				mrs = mrs[:len(mrs)+1]
-			} else {
-				mrs = append(mrs, storage.MetricRow{})
-			}
-			mr := &mrs[len(mrs)-1]
-			var err error
-			tail, err = mr.Unmarshal(tail)
-			if err != nil {
-				return fmt.Errorf("cannot unmarshal MetricRow: %w", err)
-			}
-			if len(mrs) >= 10000 {
-				// Store the collected mrs in order to reduce memory usage
-				// when too big number of mrs are sent in each packet.
-				// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490
-				vminsertMetricsRead.Add(len(mrs))
-				if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil {
-					return fmt.Errorf("cannot store metrics: %w", err)
-				}
-				mrs = mrs[:0]
-			}
-		}
-		vminsertMetricsRead.Add(len(mrs))
-		if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil {
-			return fmt.Errorf("cannot store metrics: %w", err)
-		}
+		uw := getUnmarshalWork()
+		uw.storage = s.storage
+		uw.remoteAddr = remoteAddr
+		uw.reqBuf, reqBuf = reqBuf, uw.reqBuf
+		unmarshalWorkCh <- uw
 	}
 }
 
@@ -372,6 +340,114 @@ var (
 	vminsertPacketsRead = metrics.NewCounter("vm_vminsert_packets_read_total")
 	vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")
 )
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+// StartUnmarshalWorkers starts workers for unmarshaling data obtained from vminsert connections.
+//
+// This function must be called before servers are created via NewServer.
+func StartUnmarshalWorkers() {
+	gomaxprocs := runtime.GOMAXPROCS(-1)
+	unmarshalWorkCh = make(chan *unmarshalWork, gomaxprocs)
+	unmarshalWorkersWG.Add(gomaxprocs)
+	for i := 0; i < gomaxprocs; i++ {
+		go func() {
+			defer unmarshalWorkersWG.Done()
+			for uw := range unmarshalWorkCh {
+				uw.Unmarshal()
+				putUnmarshalWork(uw)
+			}
+		}()
+	}
+}
+
+// StopUnmarshalWorkers stops unmarshal workers which were started with StartUnmarshalWorkers.
+//
+// This function must be called after Server.MustClose().
+func StopUnmarshalWorkers() {
+	close(unmarshalWorkCh)
+	unmarshalWorkersWG.Wait()
+}
+
+var (
+	unmarshalWorkCh    chan *unmarshalWork
+	unmarshalWorkersWG sync.WaitGroup
+)
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
+
+type unmarshalWork struct {
+	storage       *storage.Storage
+	remoteAddr    string
+	mrs           []storage.MetricRow
+	reqBuf        []byte
+	lastResetTime uint64
+}
+
+func (uw *unmarshalWork) reset() {
+	if (len(uw.mrs)*4 > cap(uw.mrs) || len(uw.reqBuf)*4 > cap(uw.reqBuf)) && fasttime.UnixTimestamp()-uw.lastResetTime > 10 {
+		// Periodically reset mrs and reqBuf in order to prevent gradual memory usage growth
+		// when certain entries in mrs contain too long labels.
+		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
+		uw.mrs = nil
+		uw.reqBuf = nil
+		uw.lastResetTime = fasttime.UnixTimestamp()
+	}
+	uw.storage = nil
+	uw.remoteAddr = ""
+	uw.mrs = uw.mrs[:0]
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+func (uw *unmarshalWork) Unmarshal() {
+	mrs := uw.mrs[:0]
+	tail := uw.reqBuf
+	for len(tail) > 0 {
+		if len(mrs) < cap(mrs) {
+			mrs = mrs[:len(mrs)+1]
+		} else {
+			mrs = append(mrs, storage.MetricRow{})
+		}
+		mr := &mrs[len(mrs)-1]
+		var err error
+		tail, err = mr.Unmarshal(tail)
+		if err != nil {
+			logger.Errorf("cannot unmarshal MetricRow obtained from %s: %s", uw.remoteAddr, err)
+			uw.mrs = mrs[:0]
+			return
+		}
+		if len(mrs) >= 10000 {
+			// Store the collected mrs in order to reduce memory usage
+			// when too many mrs are sent in each packet.
+			// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490
+			uw.mrs = mrs
+			uw.flushRows()
+			mrs = uw.mrs[:0]
+		}
+	}
+	uw.mrs = mrs
+	uw.flushRows()
+}
+
+func (uw *unmarshalWork) flushRows() {
+	vminsertMetricsRead.Add(len(uw.mrs))
+	err := uw.storage.AddRows(uw.mrs, uint8(*precisionBits))
+	uw.mrs = uw.mrs[:0]
+	if err != nil {
+		logger.Errorf("cannot store metrics obtained from %s: %s", uw.remoteAddr, err)
+	}
+}
+
 func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error {
 	ctx := &vmselectRequestCtx{
 		bc:      bc,
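
Note on the buffer handoff above: the single line
`uw.reqBuf, reqBuf = reqBuf, uw.reqBuf` in processVMInsertConn moves the
freshly read packet into the work item and simultaneously gives the recycled
buffer from the pool back to the connection reader, so neither side copies
data and neither side can race on the other's slice. A minimal sketch of the
idea, using a hypothetical job type in place of unmarshalWork:

	package main

	import "fmt"

	// job is a hypothetical work item owning a reusable request buffer,
	// standing in for unmarshalWork and its reqBuf field.
	type job struct {
		reqBuf []byte
	}

	func main() {
		// reqBuf belongs to the connection reader; j.reqBuf is a buffer
		// previously recycled through the pool by some worker.
		reqBuf := []byte("packet just read from the connection")
		j := &job{reqBuf: make([]byte, 0, 64)}

		// Swap instead of copy: the packet moves into the job, and the
		// reader takes ownership of the job's old buffer for its next
		// io.ReadFull call. After this line the reader and the worker
		// never touch the same slice.
		j.reqBuf, reqBuf = reqBuf, j.reqBuf

		fmt.Printf("worker will process %q\n", j.reqBuf)
		fmt.Printf("reader reuses a buffer with cap %d\n", cap(reqBuf))
	}

Since bytesutil.Resize reuses the slice capacity where possible, both buffers
quickly stabilize at the connection's typical packet size, so steady-state
ingestion allocates almost nothing.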