mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/cluster: communicate node IDs when performing a handshake
Send a node ID of vmstorage as a part of vmselect and vminsert handshakes. Use vmstorage node ID as an identifier for consistent hashing at vminsert. Cluster native endpoints calculate vminsert and vmselect node IDs as a hash of all underlying storage node IDs, so that it will also remain consistent in case of address changes. Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
parent
41e217423f
commit
84184b707a
15 changed files with 213 additions and 61 deletions
|
@ -5,6 +5,8 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
|
@ -12,7 +14,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative/stream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -25,7 +26,7 @@ var (
|
|||
func InsertHandler(c net.Conn) error {
|
||||
// There is no need in response compression, since
|
||||
// lower-level vminsert sends only small packets to upper-level vminsert.
|
||||
bc, err := handshake.VMInsertServer(c, 0)
|
||||
bc, err := handshake.VMInsertServer(c, 0, netstorage.GetNodeID())
|
||||
if err != nil {
|
||||
if errors.Is(err, handshake.ErrIgnoreHealthcheck) {
|
||||
return nil
|
||||
|
|
|
@ -1,9 +1,5 @@
|
|||
package netstorage
|
||||
|
||||
import (
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// See the following docs:
|
||||
// - https://www.eecs.umich.edu/techreports/cse/96/CSE-TR-316-96.pdf
|
||||
// - https://github.com/dgryski/go-rendezvous
|
||||
|
@ -13,14 +9,10 @@ type consistentHash struct {
|
|||
nodeHashes []uint64
|
||||
}
|
||||
|
||||
func newConsistentHash(nodes []string, hashSeed uint64) *consistentHash {
|
||||
nodeHashes := make([]uint64, len(nodes))
|
||||
for i, node := range nodes {
|
||||
nodeHashes[i] = xxhash.Sum64([]byte(node))
|
||||
}
|
||||
func newConsistentHash(ids []uint64, hashSeed uint64) *consistentHash {
|
||||
return &consistentHash{
|
||||
hashSeed: hashSeed,
|
||||
nodeHashes: nodeHashes,
|
||||
nodeHashes: ids,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,16 +4,18 @@ import (
|
|||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
func TestConsistentHash(t *testing.T) {
|
||||
r := rand.New(rand.NewSource(1))
|
||||
|
||||
nodes := []string{
|
||||
"node1",
|
||||
"node2",
|
||||
"node3",
|
||||
"node4",
|
||||
nodes := []uint64{
|
||||
xxhash.Sum64String("node1"),
|
||||
xxhash.Sum64String("node2"),
|
||||
xxhash.Sum64String("node3"),
|
||||
xxhash.Sum64String("node4"),
|
||||
}
|
||||
rh := newConsistentHash(nodes, 0)
|
||||
|
||||
|
|
|
@ -4,16 +4,19 @@ import (
|
|||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
func BenchmarkConsistentHash(b *testing.B) {
|
||||
nodes := []string{
|
||||
"node1",
|
||||
"node2",
|
||||
"node3",
|
||||
"node4",
|
||||
nodes := []uint64{
|
||||
xxhash.Sum64String("node1"),
|
||||
xxhash.Sum64String("node2"),
|
||||
xxhash.Sum64String("node3"),
|
||||
xxhash.Sum64String("node4"),
|
||||
}
|
||||
rh := newConsistentHash(nodes, 0)
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchKeys)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
|
|
|
@ -6,10 +6,14 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
|
@ -21,8 +25,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -398,7 +400,8 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
|
|||
if *disableRPCCompression {
|
||||
compressionLevel = 0
|
||||
}
|
||||
bc, err := handshake.VMInsertClient(c, compressionLevel)
|
||||
bc, id, err := handshake.VMInsertClient(c, compressionLevel)
|
||||
sn.id.CompareAndSwap(0, id)
|
||||
if err != nil {
|
||||
_ = c.Close()
|
||||
sn.handshakeErrors.Inc()
|
||||
|
@ -407,6 +410,15 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
|
|||
return bc, nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) getID() uint64 {
|
||||
// Ensure that the id is populated
|
||||
if sn.id.Load() == 0 {
|
||||
sn.checkHealth()
|
||||
}
|
||||
|
||||
return sn.id.Load()
|
||||
}
|
||||
|
||||
// storageNode is a client sending data to vmstorage node.
|
||||
type storageNode struct {
|
||||
// isBroken is set to true if the given vmstorage node is temporarily unhealthy.
|
||||
|
@ -473,6 +485,9 @@ type storageNode struct {
|
|||
// The total duration spent for sending data to vmstorage node.
|
||||
// This metric is useful for determining the saturation of vminsert->vmstorage link.
|
||||
sendDurationSeconds *metrics.FloatCounter
|
||||
|
||||
// id is a unique identifier for the storage node.
|
||||
id atomic.Uint64
|
||||
}
|
||||
|
||||
type storageNodesBucket struct {
|
||||
|
@ -515,14 +530,39 @@ func MustStop() {
|
|||
mustStopStorageNodes(snb)
|
||||
}
|
||||
|
||||
var (
|
||||
nodeID uint64
|
||||
nodeIDOnce sync.Once
|
||||
)
|
||||
|
||||
// GetNodeID returns unique identifier for underlying storage nodes.
|
||||
func GetNodeID() uint64 {
|
||||
nodeIDOnce.Do(func() {
|
||||
snb := getStorageNodesBucket()
|
||||
snIDs := make([]uint64, 0, len(snb.sns))
|
||||
for _, sn := range snb.sns {
|
||||
snIDs = append(snIDs, sn.getID())
|
||||
}
|
||||
slices.Sort(snIDs)
|
||||
idsM := make([]byte, 0)
|
||||
for _, id := range snIDs {
|
||||
idsM = encoding.MarshalUint64(idsM, id)
|
||||
}
|
||||
|
||||
nodeID = xxhash.Sum64(idsM)
|
||||
})
|
||||
|
||||
return nodeID
|
||||
}
|
||||
|
||||
func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
|
||||
if len(addrs) == 0 {
|
||||
logger.Panicf("BUG: addrs must be non-empty")
|
||||
}
|
||||
ms := metrics.NewSet()
|
||||
nodesHash := newConsistentHash(addrs, hashSeed)
|
||||
sns := make([]*storageNode, 0, len(addrs))
|
||||
stopCh := make(chan struct{})
|
||||
nodeIDs := make([]uint64, 0, len(addrs))
|
||||
for _, addr := range addrs {
|
||||
if _, _, err := net.SplitHostPort(addr); err != nil {
|
||||
// Automatically add missing port.
|
||||
|
@ -568,8 +608,10 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
|
|||
}
|
||||
return 0
|
||||
})
|
||||
nodeIDs = append(nodeIDs, sn.getID())
|
||||
sns = append(sns, sn)
|
||||
}
|
||||
nodesHash := newConsistentHash(nodeIDs, hashSeed)
|
||||
|
||||
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(sns)
|
||||
if maxBufSizePerStorageNode > consts.MaxInsertPacketSizeForVMInsert {
|
||||
|
|
|
@ -31,7 +31,9 @@ var (
|
|||
|
||||
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from netstorage.
|
||||
func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
|
||||
api := &vmstorageAPI{}
|
||||
api := &vmstorageAPI{
|
||||
nodeID: netstorage.GetNodeID(),
|
||||
}
|
||||
limits := vmselectapi.Limits{
|
||||
MaxLabelNames: *maxTagKeys,
|
||||
MaxLabelValues: *maxTagValues,
|
||||
|
@ -45,7 +47,9 @@ func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
|
|||
}
|
||||
|
||||
// vmstorageAPI impelements vmselectapi.API
|
||||
type vmstorageAPI struct{}
|
||||
type vmstorageAPI struct {
|
||||
nodeID uint64
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
||||
|
@ -112,6 +116,10 @@ func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []stora
|
|||
return netstorage.RegisterMetricNames(qt, mrs, dl)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetID() uint64 {
|
||||
return api.nodeID
|
||||
}
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
workCh chan workItem
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -2107,6 +2108,9 @@ type storageNode struct {
|
|||
|
||||
// The number of list tenants errors to storageNode.
|
||||
tenantsErrors *metrics.Counter
|
||||
|
||||
// id is the unique identifier for the storageNode.
|
||||
id uint64
|
||||
}
|
||||
|
||||
func (sn *storageNode) registerMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
|
||||
|
@ -2954,6 +2958,31 @@ func getStorageNodes() []*storageNode {
|
|||
return snb.sns
|
||||
}
|
||||
|
||||
var (
|
||||
nodeID uint64
|
||||
nodeIDOnce sync.Once
|
||||
)
|
||||
|
||||
// GetNodeID returns unique identifier for underlying storage nodes.
|
||||
func GetNodeID() uint64 {
|
||||
nodeIDOnce.Do(func() {
|
||||
snb := getStorageNodesBucket()
|
||||
snIDs := make([]uint64, 0, len(snb.sns))
|
||||
for _, sn := range snb.sns {
|
||||
snIDs = append(snIDs, sn.id)
|
||||
}
|
||||
slices.Sort(snIDs)
|
||||
idsM := make([]byte, 0)
|
||||
for _, id := range snIDs {
|
||||
idsM = encoding.MarshalUint64(idsM, id)
|
||||
}
|
||||
|
||||
nodeID = xxhash.Sum64(idsM)
|
||||
})
|
||||
|
||||
return nodeID
|
||||
}
|
||||
|
||||
// Init initializes storage nodes' connections to the given addrs.
|
||||
//
|
||||
// MustStop must be called when the initialized connections are no longer needed.
|
||||
|
@ -3015,6 +3044,7 @@ func newStorageNode(ms *metrics.Set, group *storageNodesGroup, addr string) *sto
|
|||
sn := &storageNode{
|
||||
group: group,
|
||||
connPool: connPool,
|
||||
id: connPool.GetTargetNodeID(),
|
||||
|
||||
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ func (s *VMInsertServer) run() {
|
|||
// There is no need in response compression, since
|
||||
// vmstorage sends only small packets to vminsert.
|
||||
compressionLevel := 0
|
||||
bc, err := handshake.VMInsertServer(c, compressionLevel)
|
||||
bc, err := handshake.VMInsertServer(c, compressionLevel, s.storage.GetID())
|
||||
if err != nil {
|
||||
if s.isStopping() {
|
||||
// c is stopped inside VMInsertServer.MustStop
|
||||
|
|
|
@ -195,6 +195,10 @@ func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQue
|
|||
return tfss, nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetID() uint64 {
|
||||
return api.s.GetID()
|
||||
}
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
sr storage.Search
|
||||
|
|
|
@ -6,6 +6,9 @@ import (
|
|||
"io"
|
||||
"net"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -15,17 +18,22 @@ const (
|
|||
successResponse = "ok"
|
||||
)
|
||||
|
||||
// Func must perform handshake on the given c using the given compressionLevel.
|
||||
// ClientFunc must perform handshake on the given c using the given compressionLevel.
|
||||
//
|
||||
// It must return BufferedConn wrapper for c on successful handshake.
|
||||
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
|
||||
type ClientFunc func(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error)
|
||||
|
||||
// ServerFunc must perform handshake on the given c using the given compressionLevel and id.
|
||||
//
|
||||
// It must return BufferedConn wrapper for c on successful handshake.
|
||||
type ServerFunc func(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error)
|
||||
|
||||
// VMInsertClient performs client-side handshake for vminsert protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error) {
|
||||
return genericClient(c, vminsertHello, compressionLevel)
|
||||
}
|
||||
|
||||
|
@ -34,8 +42,8 @@ func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
|||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericServer(c, vminsertHello, compressionLevel)
|
||||
func VMInsertServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error) {
|
||||
return genericServer(c, vminsertHello, compressionLevel, id)
|
||||
}
|
||||
|
||||
// VMSelectClient performs client-side handshake for vmselect protocol.
|
||||
|
@ -43,7 +51,7 @@ func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
|||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error) {
|
||||
return genericClient(c, vmselectHello, compressionLevel)
|
||||
}
|
||||
|
||||
|
@ -52,8 +60,8 @@ func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
|||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericServer(c, vmselectHello, compressionLevel)
|
||||
func VMSelectServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error) {
|
||||
return genericServer(c, vmselectHello, compressionLevel, id)
|
||||
}
|
||||
|
||||
// ErrIgnoreHealthcheck means the TCP healthckeck, which must be ignored.
|
||||
|
@ -61,7 +69,7 @@ func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
|||
// The TCP healthcheck is performed by opening and then immediately closing the connection.
|
||||
var ErrIgnoreHealthcheck = fmt.Errorf("TCP healthcheck - ignore it")
|
||||
|
||||
func genericServer(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
|
||||
func genericServer(c net.Conn, msg string, compressionLevel int, id uint64) (*BufferedConn, error) {
|
||||
if err := readMessage(c, msg); err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
// This is TCP healthcheck, which must be ignored in order to prevent from logs pollution.
|
||||
|
@ -86,32 +94,46 @@ func genericServer(c net.Conn, msg string, compressionLevel int) (*BufferedConn,
|
|||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
if err := writeNodeID(c, id); err != nil {
|
||||
return nil, fmt.Errorf("cannot write nodeID: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on nodeID: %w", err)
|
||||
}
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
|
||||
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, uint64, error) {
|
||||
if err := writeMessage(c, msg); err != nil {
|
||||
return nil, fmt.Errorf("cannot write hello: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot write hello: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot read success response after sending hello: %w", err)
|
||||
}
|
||||
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
|
||||
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
isRemoteCompressed, err := readIsCompressed(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
nodeID, err := readNodeID(c)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot read nodeID: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot write success response on nodeID: %w", err)
|
||||
}
|
||||
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
return bc, nodeID, nil
|
||||
}
|
||||
|
||||
func writeIsCompressed(c net.Conn, isCompressed bool) error {
|
||||
|
@ -164,6 +186,19 @@ func readMessage(c net.Conn, msg string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func readNodeID(c net.Conn) (uint64, error) {
|
||||
buf, err := readData(c, int(unsafe.Sizeof(uint64(0))))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return encoding.UnmarshalUint64(buf), nil
|
||||
}
|
||||
|
||||
func writeNodeID(c net.Conn, id uint64) error {
|
||||
buf := encoding.MarshalUint64(nil, id)
|
||||
return writeMessage(c, string(buf[:]))
|
||||
}
|
||||
|
||||
func readData(c net.Conn, dataLen int) ([]byte, error) {
|
||||
if err := c.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
|
||||
return nil, fmt.Errorf("cannot set read deadline: %w", err)
|
||||
|
|
|
@ -15,22 +15,26 @@ func TestVMSelectHandshake(t *testing.T) {
|
|||
testHandshake(t, VMSelectClient, VMSelectServer)
|
||||
}
|
||||
|
||||
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
|
||||
func testHandshake(t *testing.T, clientFunc ClientFunc, serverFunc ServerFunc) {
|
||||
t.Helper()
|
||||
|
||||
c, s := net.Pipe()
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
bcs, err := serverFunc(s, 3)
|
||||
bcs, err := serverFunc(s, 3, 1)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on outer handshake: %w", err)
|
||||
return
|
||||
}
|
||||
bcc, err := clientFunc(bcs, 3)
|
||||
bcc, id, err := clientFunc(bcs, 3)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on inner handshake: %w", err)
|
||||
return
|
||||
}
|
||||
if id != 1 {
|
||||
ch <- fmt.Errorf("unexpected id; got %d; want 1", id)
|
||||
return
|
||||
}
|
||||
if bcc == nil {
|
||||
ch <- fmt.Errorf("expecting non-nil conn")
|
||||
return
|
||||
|
@ -38,11 +42,15 @@ func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
|
|||
ch <- nil
|
||||
}()
|
||||
|
||||
bcc, err := clientFunc(c, 0)
|
||||
bcc, id, err := clientFunc(c, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error on outer handshake: %s", err)
|
||||
}
|
||||
bcs, err := serverFunc(bcc, 0)
|
||||
if id != 1 {
|
||||
t.Fatalf("unexpected id; got %d; want 2", id)
|
||||
}
|
||||
|
||||
bcs, err := serverFunc(bcc, 0, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("error on inner handshake: %s", err)
|
||||
}
|
||||
|
|
|
@ -3,12 +3,14 @@ package netutil
|
|||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// ConnPool is a connection pool with ZSTD-compressed connections.
|
||||
|
@ -23,8 +25,9 @@ type ConnPool struct {
|
|||
concurrentDialsCh chan struct{}
|
||||
|
||||
name string
|
||||
handshakeFunc handshake.Func
|
||||
handshakeFunc handshake.ClientFunc
|
||||
compressionLevel int
|
||||
nodeID atomic.Uint64
|
||||
|
||||
conns []connWithTimestamp
|
||||
|
||||
|
@ -49,7 +52,7 @@ type connWithTimestamp struct {
|
|||
// The compression is disabled if compressionLevel <= 0.
|
||||
//
|
||||
// Call ConnPool.MustStop when the returned ConnPool is no longer needed.
|
||||
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout, userTimeout time.Duration) *ConnPool {
|
||||
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.ClientFunc, compressionLevel int, dialTimeout, userTimeout time.Duration) *ConnPool {
|
||||
cp := &ConnPool{
|
||||
d: NewTCPDialer(ms, name, addr, dialTimeout, userTimeout),
|
||||
concurrentDialsCh: make(chan struct{}, 8),
|
||||
|
@ -163,7 +166,8 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bc, err := cp.handshakeFunc(c, cp.compressionLevel)
|
||||
bc, id, err := cp.handshakeFunc(c, cp.compressionLevel)
|
||||
cp.nodeID.CompareAndSwap(0, id)
|
||||
if err != nil {
|
||||
// Do not put handshake error to cp.lastDialError, because handshake
|
||||
// is perfomed on an already established connection.
|
||||
|
@ -249,6 +253,16 @@ func (cp *ConnPool) checkAvailability(force bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// GetTargetNodeID returns the nodeID of the target server.
|
||||
func (cp *ConnPool) GetTargetNodeID() uint64 {
|
||||
// Ensure that nodeID is initialized.
|
||||
if cp.nodeID.Load() == 0 {
|
||||
cp.checkAvailability(true)
|
||||
}
|
||||
|
||||
return cp.nodeID.Load()
|
||||
}
|
||||
|
||||
func init() {
|
||||
go func() {
|
||||
for {
|
||||
|
|
|
@ -249,12 +249,16 @@ func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxD
|
|||
isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname))
|
||||
fs.MustMkdirIfNotExist(metadataDir)
|
||||
s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB)
|
||||
nodeIDFileF := filepath.Join(metadataDir, nodeIDFilename)
|
||||
if fs.IsPathExist(nodeIDFileF) {
|
||||
r, err := os.Open(nodeIDFileF)
|
||||
nodeIDPath := filepath.Join(metadataDir, nodeIDFilename)
|
||||
if fs.IsPathExist(nodeIDPath) {
|
||||
r, err := os.Open(nodeIDPath)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open nodeID file %q: %s", nodeIDPath, err)
|
||||
}
|
||||
|
||||
nodeID, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read nodeID from %q: %s", nodeIDFileF, err)
|
||||
logger.Panicf("FATAL: cannot read nodeID from %q: %s", nodeIDPath, err)
|
||||
}
|
||||
s.nodeID = encoding.UnmarshalUint64(nodeID)
|
||||
} else {
|
||||
|
@ -1638,6 +1642,11 @@ func (s *Storage) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uin
|
|||
return s.idb().GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
|
||||
}
|
||||
|
||||
// GetID returns a unique identifier for the storage node.
|
||||
func (s *Storage) GetID() uint64 {
|
||||
return s.nodeID
|
||||
}
|
||||
|
||||
// MetricRow is a metric to insert into storage.
|
||||
type MetricRow struct {
|
||||
// MetricNameRaw contains raw metric name, which must be decoded
|
||||
|
|
|
@ -38,6 +38,9 @@ type API interface {
|
|||
|
||||
// Tenants returns list of tenants in the storage on the given tr.
|
||||
Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)
|
||||
|
||||
// GetID returns the ID of the node.
|
||||
GetID() uint64
|
||||
}
|
||||
|
||||
// BlockIterator must iterate through series blocks found by VMSelect.InitSearch.
|
||||
|
|
|
@ -10,6 +10,8 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -20,7 +22,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// Server processes vmselect requests.
|
||||
|
@ -193,7 +194,7 @@ func (s *Server) run() {
|
|||
if s.disableResponseCompression {
|
||||
compressionLevel = 0
|
||||
}
|
||||
bc, err := handshake.VMSelectServer(c, compressionLevel)
|
||||
bc, err := handshake.VMSelectServer(c, compressionLevel, s.api.GetID())
|
||||
if err != nil {
|
||||
if s.isStopping() {
|
||||
// c is closed inside Server.MustStop
|
||||
|
|
Loading…
Reference in a new issue