mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/{vminsert,vmstorage}: wait for ack from vmstorage after each packet sent to it from vminsert
This should protect against possible data loss when `vmstorage` is stopped while a packet is being sent from `vminsert`. This commit switches to a new protocol between vminsert and vmstorage, which is incompatible with the previous protocol, so both vminsert and vmstorage nodes must be updated together.
This commit is contained in:
parent
e933cbac16
commit
989d84cf3f
3 changed files with 42 additions and 12 deletions
|
@ -3,6 +3,7 @@ package netstorage
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -119,32 +120,49 @@ func (sn *storageNode) sendBufLocked(buf []byte) error {
|
|||
return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err)
|
||||
}
|
||||
}
|
||||
if err := sn.sendToConn(sn.bc, buf); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) sendToConn(bc *handshake.BufferedConn, buf []byte) error {
|
||||
timeoutSeconds := len(buf) / 3e5
|
||||
if timeoutSeconds < 60 {
|
||||
timeoutSeconds = 60
|
||||
}
|
||||
timeout := time.Duration(timeoutSeconds) * time.Second
|
||||
deadline := time.Now().Add(timeout)
|
||||
if err := sn.bc.SetWriteDeadline(deadline); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
if err := bc.SetWriteDeadline(deadline); err != nil {
|
||||
return fmt.Errorf("cannot set write deadline to %s: %s", deadline, err)
|
||||
}
|
||||
// sizeBuf guarantees that the rows batch will be either fully
|
||||
// read or fully discarded on the vmstorage side.
|
||||
// sizeBuf is used for read optimization in vmstorage.
|
||||
sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf)))
|
||||
if _, err := sn.bc.Write(sn.sizeBuf); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
if _, err := bc.Write(sn.sizeBuf); err != nil {
|
||||
return fmt.Errorf("cannot write data size %d: %s", len(buf), err)
|
||||
}
|
||||
if _, err := sn.bc.Write(buf); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
if _, err := bc.Write(buf); err != nil {
|
||||
return fmt.Errorf("cannot write data with size %d: %s", len(buf), err)
|
||||
}
|
||||
if err := sn.bc.Flush(); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
if err := bc.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot flush data with size %d: %s", len(buf), err)
|
||||
}
|
||||
|
||||
// Wait for `ack` from vmstorage.
|
||||
// This guarantees that the message has been fully received by vmstorage.
|
||||
deadline = time.Now().Add(5 * time.Second)
|
||||
if err := bc.SetReadDeadline(deadline); err != nil {
|
||||
return fmt.Errorf("cannot set read deadline for reading `ack` to vmstorage: %s", err)
|
||||
}
|
||||
if _, err := io.ReadFull(bc, sn.sizeBuf[:1]); err != nil {
|
||||
return fmt.Errorf("cannot read `ack` from vmstorage: %s", err)
|
||||
}
|
||||
if sn.sizeBuf[0] != 1 {
|
||||
return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want %d", sn.sizeBuf[0], 1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -274,12 +274,12 @@ func (s *Server) isStopping() bool {
|
|||
return atomic.LoadUint64(&s.stopFlag) != 0
|
||||
}
|
||||
|
||||
func (s *Server) processVMInsertConn(r io.Reader) error {
|
||||
func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
|
||||
sizeBuf := make([]byte, 8)
|
||||
var buf []byte
|
||||
var mrs []storage.MetricRow
|
||||
for {
|
||||
if _, err := io.ReadFull(r, sizeBuf); err != nil {
|
||||
if _, err := io.ReadFull(bc, sizeBuf); err != nil {
|
||||
if err == io.EOF {
|
||||
// Remote end gracefully closed the connection.
|
||||
return nil
|
||||
|
@ -291,9 +291,21 @@ func (s *Server) processVMInsertConn(r io.Reader) error {
|
|||
return fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize)
|
||||
}
|
||||
buf = bytesutil.Resize(buf, int(packetSize))
|
||||
if n, err := io.ReadFull(r, buf); err != nil {
|
||||
if n, err := io.ReadFull(bc, buf); err != nil {
|
||||
return fmt.Errorf("cannot read packet with size %d: %s; read only %d bytes", packetSize, err, n)
|
||||
}
|
||||
// Send `ack` to vminsert to confirm that we received the packet.
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
if err := bc.SetWriteDeadline(deadline); err != nil {
|
||||
return fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %s", err)
|
||||
}
|
||||
sizeBuf[0] = 1
|
||||
if _, err := bc.Write(sizeBuf[:1]); err != nil {
|
||||
return fmt.Errorf("cannot send `ack` to vminsert: %s", err)
|
||||
}
|
||||
if err := bc.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot flush `ack` to vminsert: %s", err)
|
||||
}
|
||||
vminsertPacketsRead.Inc()
|
||||
|
||||
// Read metric rows from the packet.
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
vminsertHello = "vminsert.01"
|
||||
vminsertHello = "vminsert.02"
|
||||
vmselectHello = "vmselect.01"
|
||||
|
||||
successResponse = "ok"
|
||||
|
|
Loading…
Reference in a new issue