From 96a62a275a410f03c2fec2c62a440d1afc874048 Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Fri, 5 Jul 2024 12:40:46 +0400 Subject: [PATCH] lib/handshake: use a json payload for metadata exchange Update the handshake to use an arbitrary JSON payload to transfer metadata. Handshake sends the metadata length first as an uint64 and then the metadata itself. Signed-off-by: Zakhar Bessarab --- lib/handshake/handshake.go | 97 +++++++++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/lib/handshake/handshake.go b/lib/handshake/handshake.go index d836450f6b..00230540ec 100644 --- a/lib/handshake/handshake.go +++ b/lib/handshake/handshake.go @@ -1,13 +1,17 @@ package handshake import ( + "encoding/json" "errors" "fmt" "io" "net" + "sync" "time" "unsafe" + "github.com/valyala/fastjson" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" ) @@ -69,6 +73,10 @@ func VMSelectServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, // The TCP healthcheck is performed by opening and then immediately closing the connection. var ErrIgnoreHealthcheck = fmt.Errorf("TCP healthcheck - ignore it") +type handshakeMetadata struct { + NodeID uint64 `json:"nodeId"` +} + func genericServer(c net.Conn, msg string, compressionLevel int, id uint64) (*BufferedConn, error) { if err := readMessage(c, msg); err != nil { if errors.Is(err, io.EOF) { @@ -94,11 +102,8 @@ func genericServer(c net.Conn, msg string, compressionLevel int, id uint64) (*Bu if err := readMessage(c, successResponse); err != nil { return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err) } - if err := writeNodeID(c, id); err != nil { - return nil, fmt.Errorf("cannot write nodeID: %w", err) - } - if err := readMessage(c, successResponse); err != nil { - return nil, fmt.Errorf("cannot read success response on nodeID: %w", err) + if err := writeMetadata(c, id); err != nil { + return nil, fmt.Errorf("cannot write metadata: %w", err) } bc := newBufferedConn(c, compressionLevel, isRemoteCompressed) return bc, nil @@ -124,16 +129,13 @@ func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, if err := writeMessage(c, successResponse); err != nil { return nil, 0, fmt.Errorf("cannot write success response on isCompressed: %w", err) } - nodeID, err := readNodeID(c) + metadata, err := readMetadata(c) if err != nil { return nil, 0, fmt.Errorf("cannot read nodeID: %w", err) } - if err := writeMessage(c, successResponse); err != nil { - return nil, 0, fmt.Errorf("cannot write success response on nodeID: %w", err) - } bc := newBufferedConn(c, compressionLevel, isRemoteCompressed) - return bc, nodeID, nil + return bc, metadata.NodeID, nil } func writeIsCompressed(c net.Conn, isCompressed bool) error { @@ -186,17 +188,78 @@ func readMessage(c net.Conn, msg string) error { return nil } -func readNodeID(c net.Conn) (uint64, error) { - buf, err := readData(c, int(unsafe.Sizeof(uint64(0)))) +var metadataParserPool fastjson.ParserPool + +func readMetadata(c net.Conn) (*handshakeMetadata, error) { + metaLenBuf, err := readData(c, int(unsafe.Sizeof(uint64(0)))) if err != nil { - return 0, err + return nil, fmt.Errorf("cannot read metadata length: %w", err) } - return encoding.UnmarshalUint64(buf), nil + + metaLen := int(encoding.UnmarshalUint64(metaLenBuf)) + if err := writeMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot write success response on metadata length: %w", err) + } + metaBuf, err := readData(c, metaLen) + if err != nil { + return nil, fmt.Errorf("cannot read metadata: %w", err) + } + if err := writeMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot write success response on metadata: %w", err) + } + parser := metadataParserPool.Get() + defer metadataParserPool.Put(parser) + v, err := parser.ParseBytes(metaBuf) + if err != nil { + return nil, fmt.Errorf("cannot parse metadata: %w", err) + } + + return &handshakeMetadata{ + NodeID: v.GetUint64("nodeId"), + }, nil } -func writeNodeID(c net.Conn, id uint64) error { - buf := encoding.MarshalUint64(nil, id) - return writeMessage(c, string(buf[:])) +var ( + metadataCache sync.Map +) + +func getMetadataBytes(id uint64) ([]byte, error) { + m, ok := metadataCache.Load(id) + if !ok { + metadata := handshakeMetadata{ + NodeID: id, + } + var err error + m, err = json.Marshal(metadata) + if err != nil { + return nil, fmt.Errorf("cannot marshal metadata: %w", err) + } + metadataCache.Store(id, m) + } + metaV := m.([]byte) + return metaV, nil +} + +func writeMetadata(c net.Conn, id uint64) error { + meta, err := getMetadataBytes(id) + if err != nil { + return fmt.Errorf("cannot obtain metadata bytes: %w", err) + } + metaLen := len(meta) + if err := writeMessage(c, string(encoding.MarshalUint64(nil, uint64(metaLen)))); err != nil { + return fmt.Errorf("cannot write metadata length: %w", err) + } + if err := readMessage(c, successResponse); err != nil { + return fmt.Errorf("cannot read success response on metadata length: %w", err) + } + if err := writeMessage(c, string(meta[:])); err != nil { + return fmt.Errorf("cannot write metadata: %w", err) + } + if err := readMessage(c, successResponse); err != nil { + return fmt.Errorf("cannot read success response on metadata: %w", err) + } + + return nil } func readData(c net.Conn, dataLen int) ([]byte, error) {