app/vminsert/netstorage: move nodesHash from global state to storageNodesBucket

This should prevent from panics when the list of discovered vmstorage nodes changes.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3329
This commit is contained in:
Aliaksandr Valialkin 2022-11-09 11:36:38 +02:00
parent abf7e4e72f
commit 8540dd669b
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 62 additions and 50 deletions

View file

@ -19,7 +19,7 @@ import (
//
// InsertCtx.Reset must be called before the first usage.
type InsertCtx struct {
sns []*storageNode
snb *storageNodesBucket
Labels sortedLabels
MetricNameBuf []byte
@ -41,9 +41,9 @@ func (br *bufRows) reset() {
br.rows = 0
}
func (br *bufRows) pushTo(sns []*storageNode, sn *storageNode) error {
func (br *bufRows) pushTo(snb *storageNodesBucket, sn *storageNode) error {
bufLen := len(br.buf)
err := sn.push(sns, br.buf, br.rows)
err := sn.push(snb, br.buf, br.rows)
br.reset()
if err != nil {
return &httpserver.ErrorWithStatusCode{
@ -56,7 +56,7 @@ func (br *bufRows) pushTo(sns []*storageNode, sn *storageNode) error {
// Reset resets ctx.
func (ctx *InsertCtx) Reset() {
ctx.sns = getStorageNodes()
ctx.snb = getStorageNodesBucket()
for i := range ctx.Labels {
label := &ctx.Labels[i]
label.Name = nil
@ -66,7 +66,7 @@ func (ctx *InsertCtx) Reset() {
ctx.MetricNameBuf = ctx.MetricNameBuf[:0]
if ctx.bufRowss == nil {
ctx.bufRowss = make([]bufRows, len(ctx.sns))
ctx.bufRowss = make([]bufRows, len(ctx.snb.sns))
}
for i := range ctx.bufRowss {
ctx.bufRowss[i].reset()
@ -127,12 +127,12 @@ func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, time
// WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
br := &ctx.bufRowss[storageNodeIdx]
sns := ctx.sns
sn := sns[storageNodeIdx]
snb := ctx.snb
sn := snb.sns[storageNodeIdx]
bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
if len(bufNew) >= maxBufSizePerStorageNode {
// Send buf to sn, since it is too big.
if err := br.pushTo(sns, sn); err != nil {
if err := br.pushTo(snb, sn); err != nil {
return err
}
br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
@ -146,12 +146,14 @@ func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte
// FlushBufs flushes ctx bufs to remote storage nodes.
func (ctx *InsertCtx) FlushBufs() error {
var firstErr error
snb := ctx.snb
sns := snb.sns
for i := range ctx.bufRowss {
br := &ctx.bufRowss[i]
if len(br.buf) == 0 {
continue
}
if err := br.pushTo(ctx.sns, ctx.sns[i]); err != nil && firstErr == nil {
if err := br.pushTo(snb, sns[i]); err != nil && firstErr == nil {
firstErr = err
}
}
@ -162,7 +164,7 @@ func (ctx *InsertCtx) FlushBufs() error {
//
// The returned index must be passed to WriteDataPoint.
func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
if len(ctx.sns) == 1 {
if len(ctx.snb.sns) == 1 {
// Fast path - only a single storage node.
return 0
}
@ -179,7 +181,7 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) i
ctx.labelsBuf = buf
// Do not exclude unavailable storage nodes in order to properly account for rerouted rows in storageNode.push().
idx := nodesHash.getNodeIdx(h, nil)
idx := ctx.snb.nodesHash.getNodeIdx(h, nil)
return idx
}

View file

@ -50,7 +50,7 @@ func (sn *storageNode) isReady() bool {
// if sn is currently unavailable or overloaded.
//
// rows must match the number of rows in the buf.
func (sn *storageNode) push(sns []*storageNode, buf []byte, rows int) error {
func (sn *storageNode) push(snb *storageNodesBucket, buf []byte, rows int) error {
if len(buf) > maxBufSizePerStorageNode {
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode)
}
@ -66,7 +66,7 @@ func (sn *storageNode) push(sns []*storageNode, buf []byte, rows int) error {
return nil
}
// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
if err := sn.rerouteBufToOtherStorageNodes(sns, buf, rows); err != nil {
if err := sn.rerouteBufToOtherStorageNodes(snb, buf, rows); err != nil {
return fmt.Errorf("error when re-routing rows from %s: %w", sn.dialer.Addr(), err)
}
return nil
@ -74,7 +74,8 @@ func (sn *storageNode) push(sns []*storageNode, buf []byte, rows int) error {
var dropSamplesOnOverloadLogger = logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second)
func (sn *storageNode) rerouteBufToOtherStorageNodes(sns []*storageNode, buf []byte, rows int) error {
func (sn *storageNode) rerouteBufToOtherStorageNodes(snb *storageNodesBucket, buf []byte, rows int) error {
sns := snb.sns
sn.brLock.Lock()
again:
select {
@ -91,7 +92,7 @@ again:
}
sn.brLock.Unlock()
// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
rowsProcessed, err := rerouteRowsToReadyStorageNodes(sns, sn, buf)
rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
@ -111,7 +112,7 @@ again:
goto again
}
sn.brLock.Unlock()
rowsProcessed, err := rerouteRowsToFreeStorageNodes(sns, sn, buf)
rowsProcessed, err := rerouteRowsToFreeStorageNodes(snb, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
@ -125,11 +126,12 @@ var closedCh = func() <-chan struct{} {
return ch
}()
func (sn *storageNode) run(sns []*storageNode, snIdx int) {
func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
replicas := *replicationFactor
if replicas <= 0 {
replicas = 1
}
sns := snb.sns
if replicas > len(sns) {
replicas = len(sns)
}
@ -180,7 +182,7 @@ func (sn *storageNode) run(sns []*storageNode, snIdx int) {
continue
}
// Send br to replicas storage nodes starting from snIdx.
for !sendBufToReplicasNonblocking(sns, &br, snIdx, replicas) {
for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
t := timerpool.Get(200 * time.Millisecond)
select {
case <-sn.stopCh:
@ -195,8 +197,9 @@ func (sn *storageNode) run(sns []*storageNode, snIdx int) {
}
}
func sendBufToReplicasNonblocking(sns []*storageNode, br *bufRows, snIdx, replicas int) bool {
func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
sns := snb.sns
for i := 0; i < replicas; i++ {
idx := snIdx + i
attempts := 0
@ -460,8 +463,14 @@ type storageNode struct {
}
type storageNodesBucket struct {
ms *metrics.Set
sns []*storageNode
ms *metrics.Set
// nodesHash is used for consistently selecting a storage node by key.
nodesHash *consistentHash
// sns is a list of storage nodes.
sns []*storageNode
stopCh chan struct{}
wg *sync.WaitGroup
}
@ -477,14 +486,6 @@ func setStorageNodesBucket(snb *storageNodesBucket) {
storageNodes.Store(snb)
}
func getStorageNodes() []*storageNode {
snb := getStorageNodesBucket()
return snb.sns
}
// nodesHash is used for consistently selecting a storage node by key.
var nodesHash *consistentHash
// Init initializes vmstorage nodes' connections to the given addrs.
//
// hashSeed is used for changing the distribution of input time series among addrs.
@ -506,7 +507,7 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
logger.Panicf("BUG: addrs must be non-empty")
}
ms := metrics.NewSet()
nodesHash = newConsistentHash(addrs, hashSeed)
nodesHash := newConsistentHash(addrs, hashSeed)
sns := make([]*storageNode, 0, len(addrs))
stopCh := make(chan struct{})
for _, addr := range addrs {
@ -559,22 +560,25 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
}
metrics.RegisterSet(ms)
var wg sync.WaitGroup
snb := &storageNodesBucket{
ms: ms,
nodesHash: nodesHash,
sns: sns,
stopCh: stopCh,
wg: &wg,
}
for idx, sn := range sns {
wg.Add(1)
go func(sn *storageNode, idx int) {
sn.run(sns, idx)
sn.run(snb, idx)
wg.Done()
}(sn, idx)
}
metrics.RegisterSet(ms)
return &storageNodesBucket{
ms: ms,
sns: sns,
stopCh: stopCh,
wg: &wg,
}
return snb
}
func mustStopStorageNodes(snb *storageNodesBucket) {
@ -590,11 +594,13 @@ func mustStopStorageNodes(snb *storageNodesBucket) {
// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
//
// The function blocks until src is fully re-routed.
func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, src []byte) (int, error) {
func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
reroutesTotal.Inc()
rowsProcessed := 0
var idxsExclude, idxsExcludeNew []int
idxsExclude = getNotReadyStorageNodeIdxsBlocking(sns, idxsExclude[:0], nil)
nodesHash := snb.nodesHash
sns := snb.sns
idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
var mr storage.MetricRow
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
@ -614,7 +620,7 @@ func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, s
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking(sns, idxsExclude[:0], nil)
idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
}
if *disableRerouting {
if !sn.sendBufMayBlock(rowBuf) {
@ -637,7 +643,7 @@ func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, s
continue
}
// If the re-routing is enabled, then try sending the row to another storage node.
idxsExcludeNew = getNotReadyStorageNodeIdxs(sns, idxsExcludeNew[:0], sn)
idxsExcludeNew = getNotReadyStorageNodeIdxs(snb, idxsExcludeNew[:0], sn)
idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
snNew := sns[idx]
if snNew.trySendBuf(rowBuf, 1) {
@ -660,14 +666,16 @@ func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, s
//
// It is expected that snSource has no enough buffer for sending src.
// It is expected than *dsableRerouting isn't set when calling this function.
func rerouteRowsToFreeStorageNodes(sns []*storageNode, snSource *storageNode, src []byte) (int, error) {
func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
if *disableRerouting {
logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
}
reroutesTotal.Inc()
rowsProcessed := 0
var idxsExclude []int
idxsExclude = getNotReadyStorageNodeIdxs(sns, idxsExclude[:0], snSource)
nodesHash := snb.nodesHash
sns := snb.sns
idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
var mr storage.MetricRow
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
@ -694,7 +702,7 @@ func rerouteRowsToFreeStorageNodes(sns []*storageNode, snSource *storageNode, sr
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxs(sns, idxsExclude[:0], snSource)
idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
}
if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++
@ -710,15 +718,16 @@ func rerouteRowsToFreeStorageNodes(sns []*storageNode, snSource *storageNode, sr
return rowsProcessed, nil
}
func getNotReadyStorageNodeIdxsBlocking(sns []*storageNode, dst []int, snExtra *storageNode) []int {
dst = getNotReadyStorageNodeIdxs(sns, dst[:0], snExtra)
func getNotReadyStorageNodeIdxsBlocking(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
sns := snb.sns
if len(dst) < len(sns) {
return dst
}
noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available")
for {
time.Sleep(time.Second)
dst = getNotReadyStorageNodeIdxs(sns, dst[:0], snExtra)
dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
if availableNodes := len(sns) - len(dst); availableNodes > 0 {
storageNodesBecameAvailableLogger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
return dst
@ -730,9 +739,9 @@ var storageNodesBecameAvailableLogger = logger.WithThrottler("storageNodesBecame
var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*time.Second)
func getNotReadyStorageNodeIdxs(sns []*storageNode, dst []int, snExtra *storageNode) []int {
func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
dst = dst[:0]
for i, sn := range sns {
for i, sn := range snb.sns {
if sn == snExtra || !sn.isReady() {
dst = append(dst, i)
}

View file

@ -17,6 +17,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose `__meta_consul_partition` label for targets discovered via [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs) in the same way as [Prometheus 2.40 does](https://github.com/prometheus/prometheus/pull/11482).
* BUGFIX: [VictoriaMetrics enterprise](https://docs.victoriametrics.com/enterprise.html): fix a panic at `vminsert` when the discovered list of `vmstorage` nodes is changed during [automatic vmstorage discovery](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3329).
* BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320).