lib/handshake: use a json payload for metadata exchange

Update the handshake to use an arbitrary JSON payload to transfer metadata.
Handshake sends the metadata length first as an uint64 and then the metadata itself.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2024-07-05 12:40:46 +04:00
parent 20b9c8007b
commit 96a62a275a
No known key found for this signature in database
GPG key ID: 932B34D6FE062023

View file

@ -1,13 +1,17 @@
package handshake
import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
"unsafe"
"github.com/valyala/fastjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
@ -69,6 +73,10 @@ func VMSelectServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn,
// The TCP healthcheck is performed by opening and then immediately closing the connection.
var ErrIgnoreHealthcheck = fmt.Errorf("TCP healthcheck - ignore it")
type handshakeMetadata struct {
NodeID uint64 `json:"nodeId"`
}
func genericServer(c net.Conn, msg string, compressionLevel int, id uint64) (*BufferedConn, error) {
if err := readMessage(c, msg); err != nil {
if errors.Is(err, io.EOF) {
@ -94,11 +102,8 @@ func genericServer(c net.Conn, msg string, compressionLevel int, id uint64) (*Bu
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
}
if err := writeNodeID(c, id); err != nil {
return nil, fmt.Errorf("cannot write nodeID: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on nodeID: %w", err)
if err := writeMetadata(c, id); err != nil {
return nil, fmt.Errorf("cannot write metadata: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nil
@ -124,16 +129,13 @@ func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn,
if err := writeMessage(c, successResponse); err != nil {
return nil, 0, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
nodeID, err := readNodeID(c)
metadata, err := readMetadata(c)
if err != nil {
return nil, 0, fmt.Errorf("cannot read nodeID: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, 0, fmt.Errorf("cannot write success response on nodeID: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nodeID, nil
return bc, metadata.NodeID, nil
}
func writeIsCompressed(c net.Conn, isCompressed bool) error {
@ -186,17 +188,78 @@ func readMessage(c net.Conn, msg string) error {
return nil
}
func readNodeID(c net.Conn) (uint64, error) {
buf, err := readData(c, int(unsafe.Sizeof(uint64(0))))
var metadataParserPool fastjson.ParserPool
func readMetadata(c net.Conn) (*handshakeMetadata, error) {
metaLenBuf, err := readData(c, int(unsafe.Sizeof(uint64(0))))
if err != nil {
return 0, err
}
return encoding.UnmarshalUint64(buf), nil
return nil, fmt.Errorf("cannot read metadata length: %w", err)
}
func writeNodeID(c net.Conn, id uint64) error {
buf := encoding.MarshalUint64(nil, id)
return writeMessage(c, string(buf[:]))
metaLen := int(encoding.UnmarshalUint64(metaLenBuf))
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on metadata length: %w", err)
}
metaBuf, err := readData(c, metaLen)
if err != nil {
return nil, fmt.Errorf("cannot read metadata: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on metadata: %w", err)
}
parser := metadataParserPool.Get()
defer metadataParserPool.Put(parser)
v, err := parser.ParseBytes(metaBuf)
if err != nil {
return nil, fmt.Errorf("cannot parse metadata: %w", err)
}
return &handshakeMetadata{
NodeID: v.GetUint64("nodeId"),
}, nil
}
var (
metadataCache sync.Map
)
func getMetadataBytes(id uint64) ([]byte, error) {
m, ok := metadataCache.Load(id)
if !ok {
metadata := handshakeMetadata{
NodeID: id,
}
var err error
m, err = json.Marshal(metadata)
if err != nil {
return nil, fmt.Errorf("cannot marshal metadata: %w", err)
}
metadataCache.Store(id, m)
}
metaV := m.([]byte)
return metaV, nil
}
func writeMetadata(c net.Conn, id uint64) error {
meta, err := getMetadataBytes(id)
if err != nil {
return fmt.Errorf("cannot obtain metadata bytes: %w", err)
}
metaLen := len(meta)
if err := writeMessage(c, string(encoding.MarshalUint64(nil, uint64(metaLen)))); err != nil {
return fmt.Errorf("cannot write metadata length: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return fmt.Errorf("cannot read success response on metadata length: %w", err)
}
if err := writeMessage(c, string(meta[:])); err != nil {
return fmt.Errorf("cannot write metadata: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return fmt.Errorf("cannot read success response on metadata: %w", err)
}
return nil
}
func readData(c net.Conn, dataLen int) ([]byte, error) {