From 989d84cf3f3e2b210b0b9989ceda5dbc0ccad9ce Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 27 Apr 2020 09:32:08 +0300 Subject: [PATCH] app/{vminsert,vmstorage}: wait for `ack` from `vmstorage` after each packet sent to it from `vminsert` This should protect from possible data loss when `vmstorage` is stopped while the packet is sent from `vminsert`. This commit switches to new protocol between vminsert and vmstorage, which is incompatible with the previous protocol. So it is required that both vminsert and vmstorage nodes are updated. --- app/vminsert/netstorage/netstorage.go | 34 ++++++++++++++++++++------- app/vmstorage/transport/server.go | 18 +++++++++++--- lib/handshake/handshake.go | 2 +- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 66268a10f..c8d26e40d 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -3,6 +3,7 @@ package netstorage import ( "flag" "fmt" + "io" "sync" "time" @@ -119,32 +120,49 @@ func (sn *storageNode) sendBufLocked(buf []byte) error { return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err) } } + if err := sn.sendToConn(sn.bc, buf); err != nil { + sn.closeBrokenConn() + return err + } + return nil +} + +func (sn *storageNode) sendToConn(bc *handshake.BufferedConn, buf []byte) error { timeoutSeconds := len(buf) / 3e5 if timeoutSeconds < 60 { timeoutSeconds = 60 } timeout := time.Duration(timeoutSeconds) * time.Second deadline := time.Now().Add(timeout) - if err := sn.bc.SetWriteDeadline(deadline); err != nil { - sn.closeBrokenConn() + if err := bc.SetWriteDeadline(deadline); err != nil { return fmt.Errorf("cannot set write deadline to %s: %s", deadline, err) } // sizeBuf guarantees that the rows batch will be either fully // read or fully discarded on the vmstorage side. // sizeBuf is used for read optimization in vmstorage. sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf))) - if _, err := sn.bc.Write(sn.sizeBuf); err != nil { - sn.closeBrokenConn() + if _, err := bc.Write(sn.sizeBuf); err != nil { return fmt.Errorf("cannot write data size %d: %s", len(buf), err) } - if _, err := sn.bc.Write(buf); err != nil { - sn.closeBrokenConn() + if _, err := bc.Write(buf); err != nil { return fmt.Errorf("cannot write data with size %d: %s", len(buf), err) } - if err := sn.bc.Flush(); err != nil { - sn.closeBrokenConn() + if err := bc.Flush(); err != nil { return fmt.Errorf("cannot flush data with size %d: %s", len(buf), err) } + + // Wait for `ack` from vmstorage. + // This guarantees that the message has been fully received by vmstorage. + deadline = time.Now().Add(5 * time.Second) + if err := bc.SetReadDeadline(deadline); err != nil { + return fmt.Errorf("cannot set read deadline for reading `ack` to vmstorage: %s", err) + } + if _, err := io.ReadFull(bc, sn.sizeBuf[:1]); err != nil { + return fmt.Errorf("cannot read `ack` from vmstorage: %s", err) + } + if sn.sizeBuf[0] != 1 { + return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want %d", sn.sizeBuf[0], 1) + } return nil } diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 955a5c437..9589220d4 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -274,12 +274,12 @@ func (s *Server) isStopping() bool { return atomic.LoadUint64(&s.stopFlag) != 0 } -func (s *Server) processVMInsertConn(r io.Reader) error { +func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error { sizeBuf := make([]byte, 8) var buf []byte var mrs []storage.MetricRow for { - if _, err := io.ReadFull(r, sizeBuf); err != nil { + if _, err := io.ReadFull(bc, sizeBuf); err != nil { if err == io.EOF { // Remote end gracefully closed the connection. return nil @@ -291,9 +291,21 @@ func (s *Server) processVMInsertConn(r io.Reader) error { return fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize) } buf = bytesutil.Resize(buf, int(packetSize)) - if n, err := io.ReadFull(r, buf); err != nil { + if n, err := io.ReadFull(bc, buf); err != nil { return fmt.Errorf("cannot read packet with size %d: %s; read only %d bytes", packetSize, err, n) } + // Send `ack` to vminsert that we recevied the packet. + deadline := time.Now().Add(5 * time.Second) + if err := bc.SetWriteDeadline(deadline); err != nil { + return fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %s", err) + } + sizeBuf[0] = 1 + if _, err := bc.Write(sizeBuf[:1]); err != nil { + return fmt.Errorf("cannot send `ack` to vminsert: %s", err) + } + if err := bc.Flush(); err != nil { + return fmt.Errorf("cannot flush `ack` to vminsert: %s", err) + } vminsertPacketsRead.Inc() // Read metric rows from the packet. diff --git a/lib/handshake/handshake.go b/lib/handshake/handshake.go index 814611985..1a56fd626 100644 --- a/lib/handshake/handshake.go +++ b/lib/handshake/handshake.go @@ -8,7 +8,7 @@ import ( ) const ( - vminsertHello = "vminsert.01" + vminsertHello = "vminsert.02" vmselectHello = "vmselect.01" successResponse = "ok"