diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index cadea642d..340895664 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -19,7 +19,7 @@ import ( // // InsertCtx.Reset must be called before the first usage. type InsertCtx struct { - sns []*storageNode + snb *storageNodesBucket Labels sortedLabels MetricNameBuf []byte @@ -41,9 +41,9 @@ func (br *bufRows) reset() { br.rows = 0 } -func (br *bufRows) pushTo(sns []*storageNode, sn *storageNode) error { +func (br *bufRows) pushTo(snb *storageNodesBucket, sn *storageNode) error { bufLen := len(br.buf) - err := sn.push(sns, br.buf, br.rows) + err := sn.push(snb, br.buf, br.rows) br.reset() if err != nil { return &httpserver.ErrorWithStatusCode{ @@ -56,7 +56,7 @@ func (br *bufRows) pushTo(sns []*storageNode, sn *storageNode) error { // Reset resets ctx. func (ctx *InsertCtx) Reset() { - ctx.sns = getStorageNodes() + ctx.snb = getStorageNodesBucket() for i := range ctx.Labels { label := &ctx.Labels[i] label.Name = nil @@ -66,7 +66,7 @@ func (ctx *InsertCtx) Reset() { ctx.MetricNameBuf = ctx.MetricNameBuf[:0] if ctx.bufRowss == nil { - ctx.bufRowss = make([]bufRows, len(ctx.sns)) + ctx.bufRowss = make([]bufRows, len(ctx.snb.sns)) } for i := range ctx.bufRowss { ctx.bufRowss[i].reset() @@ -127,12 +127,12 @@ func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, time // WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx. func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error { br := &ctx.bufRowss[storageNodeIdx] - sns := ctx.sns - sn := sns[storageNodeIdx] + snb := ctx.snb + sn := snb.sns[storageNodeIdx] bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) if len(bufNew) >= maxBufSizePerStorageNode { // Send buf to sn, since it is too big. - if err := br.pushTo(sns, sn); err != nil { + if err := br.pushTo(snb, sn); err != nil { return err } br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value) @@ -146,12 +146,14 @@ func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte // FlushBufs flushes ctx bufs to remote storage nodes. func (ctx *InsertCtx) FlushBufs() error { var firstErr error + snb := ctx.snb + sns := snb.sns for i := range ctx.bufRowss { br := &ctx.bufRowss[i] if len(br.buf) == 0 { continue } - if err := br.pushTo(ctx.sns, ctx.sns[i]); err != nil && firstErr == nil { + if err := br.pushTo(snb, sns[i]); err != nil && firstErr == nil { firstErr = err } } @@ -162,7 +164,7 @@ func (ctx *InsertCtx) FlushBufs() error { // // The returned index must be passed to WriteDataPoint. func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int { - if len(ctx.sns) == 1 { + if len(ctx.snb.sns) == 1 { // Fast path - only a single storage node. return 0 } @@ -179,7 +181,7 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) i ctx.labelsBuf = buf // Do not exclude unavailable storage nodes in order to properly account for rerouted rows in storageNode.push(). - idx := nodesHash.getNodeIdx(h, nil) + idx := ctx.snb.nodesHash.getNodeIdx(h, nil) return idx } diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index b972850e3..502883d5f 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -50,7 +50,7 @@ func (sn *storageNode) isReady() bool { // if sn is currently unavailable or overloaded. // // rows must match the number of rows in the buf. -func (sn *storageNode) push(sns []*storageNode, buf []byte, rows int) error { +func (sn *storageNode) push(snb *storageNodesBucket, buf []byte, rows int) error { if len(buf) > maxBufSizePerStorageNode { logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode) } @@ -66,7 +66,7 @@ func (sn *storageNode) push(sns []*storageNode, buf []byte, rows int) error { return nil } // Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes. - if err := sn.rerouteBufToOtherStorageNodes(sns, buf, rows); err != nil { + if err := sn.rerouteBufToOtherStorageNodes(snb, buf, rows); err != nil { return fmt.Errorf("error when re-routing rows from %s: %w", sn.dialer.Addr(), err) } return nil @@ -74,7 +74,8 @@ func (sn *storageNode) push(sns []*storageNode, buf []byte, rows int) error { var dropSamplesOnOverloadLogger = logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second) -func (sn *storageNode) rerouteBufToOtherStorageNodes(sns []*storageNode, buf []byte, rows int) error { +func (sn *storageNode) rerouteBufToOtherStorageNodes(snb *storageNodesBucket, buf []byte, rows int) error { + sns := snb.sns sn.brLock.Lock() again: select { @@ -91,7 +92,7 @@ again: } sn.brLock.Unlock() // The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set. - rowsProcessed, err := rerouteRowsToReadyStorageNodes(sns, sn, buf) + rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf) rows -= rowsProcessed if err != nil { return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err) @@ -111,7 +112,7 @@ again: goto again } sn.brLock.Unlock() - rowsProcessed, err := rerouteRowsToFreeStorageNodes(sns, sn, buf) + rowsProcessed, err := rerouteRowsToFreeStorageNodes(snb, sn, buf) rows -= rowsProcessed if err != nil { return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err) @@ -125,11 +126,12 @@ var closedCh = func() <-chan struct{} { return ch }() -func (sn *storageNode) run(sns []*storageNode, snIdx int) { +func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) { replicas := *replicationFactor if replicas <= 0 { replicas = 1 } + sns := snb.sns if replicas > len(sns) { replicas = len(sns) } @@ -180,7 +182,7 @@ func (sn *storageNode) run(sns []*storageNode, snIdx int) { continue } // Send br to replicas storage nodes starting from snIdx. - for !sendBufToReplicasNonblocking(sns, &br, snIdx, replicas) { + for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) { t := timerpool.Get(200 * time.Millisecond) select { case <-sn.stopCh: @@ -195,8 +197,9 @@ func (sn *storageNode) run(sns []*storageNode, snIdx int) { } } -func sendBufToReplicasNonblocking(sns []*storageNode, br *bufRows, snIdx, replicas int) bool { +func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool { usedStorageNodes := make(map[*storageNode]struct{}, replicas) + sns := snb.sns for i := 0; i < replicas; i++ { idx := snIdx + i attempts := 0 @@ -460,8 +463,14 @@ type storageNode struct { } type storageNodesBucket struct { - ms *metrics.Set - sns []*storageNode + ms *metrics.Set + + // nodesHash is used for consistently selecting a storage node by key. + nodesHash *consistentHash + + // sns is a list of storage nodes. + sns []*storageNode + stopCh chan struct{} wg *sync.WaitGroup } @@ -477,14 +486,6 @@ func setStorageNodesBucket(snb *storageNodesBucket) { storageNodes.Store(snb) } -func getStorageNodes() []*storageNode { - snb := getStorageNodesBucket() - return snb.sns -} - -// nodesHash is used for consistently selecting a storage node by key. -var nodesHash *consistentHash - // Init initializes vmstorage nodes' connections to the given addrs. // // hashSeed is used for changing the distribution of input time series among addrs. @@ -506,7 +507,7 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket { logger.Panicf("BUG: addrs must be non-empty") } ms := metrics.NewSet() - nodesHash = newConsistentHash(addrs, hashSeed) + nodesHash := newConsistentHash(addrs, hashSeed) sns := make([]*storageNode, 0, len(addrs)) stopCh := make(chan struct{}) for _, addr := range addrs { @@ -559,22 +560,25 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket { maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert } + metrics.RegisterSet(ms) var wg sync.WaitGroup + snb := &storageNodesBucket{ + ms: ms, + nodesHash: nodesHash, + sns: sns, + stopCh: stopCh, + wg: &wg, + } + for idx, sn := range sns { wg.Add(1) go func(sn *storageNode, idx int) { - sn.run(sns, idx) + sn.run(snb, idx) wg.Done() }(sn, idx) } - metrics.RegisterSet(ms) - return &storageNodesBucket{ - ms: ms, - sns: sns, - stopCh: stopCh, - wg: &wg, - } + return snb } func mustStopStorageNodes(snb *storageNodesBucket) { @@ -590,11 +594,13 @@ func mustStopStorageNodes(snb *storageNodesBucket) { // rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes. // // The function blocks until src is fully re-routed. -func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, src []byte) (int, error) { +func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) { reroutesTotal.Inc() rowsProcessed := 0 var idxsExclude, idxsExcludeNew []int - idxsExclude = getNotReadyStorageNodeIdxsBlocking(sns, idxsExclude[:0], nil) + nodesHash := snb.nodesHash + sns := snb.sns + idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil) var mr storage.MetricRow for len(src) > 0 { tail, err := mr.UnmarshalX(src) @@ -614,7 +620,7 @@ func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, s break } // re-generate idxsExclude list, since sn must be put there. - idxsExclude = getNotReadyStorageNodeIdxsBlocking(sns, idxsExclude[:0], nil) + idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil) } if *disableRerouting { if !sn.sendBufMayBlock(rowBuf) { @@ -637,7 +643,7 @@ func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, s continue } // If the re-routing is enabled, then try sending the row to another storage node. - idxsExcludeNew = getNotReadyStorageNodeIdxs(sns, idxsExcludeNew[:0], sn) + idxsExcludeNew = getNotReadyStorageNodeIdxs(snb, idxsExcludeNew[:0], sn) idx := nodesHash.getNodeIdx(h, idxsExcludeNew) snNew := sns[idx] if snNew.trySendBuf(rowBuf, 1) { @@ -660,14 +666,16 @@ func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, s // // It is expected that snSource has no enough buffer for sending src. // It is expected than *dsableRerouting isn't set when calling this function. -func rerouteRowsToFreeStorageNodes(sns []*storageNode, snSource *storageNode, src []byte) (int, error) { +func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) { if *disableRerouting { logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes") } reroutesTotal.Inc() rowsProcessed := 0 var idxsExclude []int - idxsExclude = getNotReadyStorageNodeIdxs(sns, idxsExclude[:0], snSource) + nodesHash := snb.nodesHash + sns := snb.sns + idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource) var mr storage.MetricRow for len(src) > 0 { tail, err := mr.UnmarshalX(src) @@ -694,7 +702,7 @@ func rerouteRowsToFreeStorageNodes(sns []*storageNode, snSource *storageNode, sr break } // re-generate idxsExclude list, since sn must be put there. - idxsExclude = getNotReadyStorageNodeIdxs(sns, idxsExclude[:0], snSource) + idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource) } if sn.trySendBuf(rowBuf, 1) { rowsProcessed++ @@ -710,15 +718,16 @@ func rerouteRowsToFreeStorageNodes(sns []*storageNode, snSource *storageNode, sr return rowsProcessed, nil } -func getNotReadyStorageNodeIdxsBlocking(sns []*storageNode, dst []int, snExtra *storageNode) []int { - dst = getNotReadyStorageNodeIdxs(sns, dst[:0], snExtra) +func getNotReadyStorageNodeIdxsBlocking(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int { + dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra) + sns := snb.sns if len(dst) < len(sns) { return dst } noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available") for { time.Sleep(time.Second) - dst = getNotReadyStorageNodeIdxs(sns, dst[:0], snExtra) + dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra) if availableNodes := len(sns) - len(dst); availableNodes > 0 { storageNodesBecameAvailableLogger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes) return dst @@ -730,9 +739,9 @@ var storageNodesBecameAvailableLogger = logger.WithThrottler("storageNodesBecame var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*time.Second) -func getNotReadyStorageNodeIdxs(sns []*storageNode, dst []int, snExtra *storageNode) []int { +func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int { dst = dst[:0] - for i, sn := range sns { + for i, sn := range snb.sns { if sn == snExtra || !sn.isReady() { dst = append(dst, i) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ef37279e0..c66b86fb4 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -17,6 +17,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose `__meta_consul_partition` label for targets discovered via [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs) in the same way as [Prometheus 2.40 does](https://github.com/prometheus/prometheus/pull/11482). +* BUGFIX: [VictoriaMetrics enterprise](https://docs.victoriametrics.com/enterprise.html): fix a panic at `vminsert` when the discovered list of `vmstorage` nodes is changed during [automatic vmstorage discovery](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3329). * BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320).