app/{vminsert,vmselect}/netstorage: allow calling Init()+MustStop() in a loop

Previously a netstorage.MustStop() call didn't free up all the resources,
so a subsequent call to netstorage.Init() would panic.

This allows writing tests that call netstorage.Init() + netstorage.MustStop() in a loop.

Aliaksandr Valialkin 2022-10-25 14:41:56 +03:00
parent 596251eb87
commit 4f53147ed4
17 changed files with 454 additions and 202 deletions

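In short, this commit makes the following pattern safe. A minimal sketch mirroring the tests added below (the import path is assumed from the repository layout):

package netstorage_test

import (
    "runtime"
    "testing"

    "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
)

// Init()+MustStop() may now be called repeatedly, since MustStop()
// releases all the resources acquired by Init().
func TestInitStopLoop(t *testing.T) {
    for i := 0; i < 3; i++ {
        netstorage.Init([]string{"host1", "host2"}, 0)
        runtime.Gosched() // let the per-node goroutines start before stopping
        netstorage.MustStop()
    }
}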

@ -93,7 +93,7 @@ func main() {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1672
hashSeed = 0xabcdef0123456789
}
netstorage.InitStorageNodes(*storageNodes, hashSeed)
netstorage.Init(*storageNodes, hashSeed)
logger.Infof("successfully initialized netstorage in %.3f seconds", time.Since(startTime).Seconds())
relabel.Init()
@ -158,7 +158,7 @@ func main() {
logger.Infof("shutting down neststorage...")
startTime = time.Now()
netstorage.Stop()
netstorage.MustStop()
logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds())
fs.MustStopDirRemover()


@ -19,6 +19,7 @@ import (
//
// InsertCtx.Reset must be called before the first usage.
type InsertCtx struct {
sns []*storageNode
Labels sortedLabels
MetricNameBuf []byte
@ -40,9 +41,9 @@ func (br *bufRows) reset() {
br.rows = 0
}
func (br *bufRows) pushTo(sn *storageNode) error {
func (br *bufRows) pushTo(sns []*storageNode, sn *storageNode) error {
bufLen := len(br.buf)
err := sn.push(br.buf, br.rows)
err := sn.push(sns, br.buf, br.rows)
br.reset()
if err != nil {
return &httpserver.ErrorWithStatusCode{
@ -55,6 +56,7 @@ func (br *bufRows) pushTo(sn *storageNode) error {
// Reset resets ctx.
func (ctx *InsertCtx) Reset() {
ctx.sns = getStorageNodes()
for i := range ctx.Labels {
label := &ctx.Labels[i]
label.Name = nil
@ -64,7 +66,7 @@ func (ctx *InsertCtx) Reset() {
ctx.MetricNameBuf = ctx.MetricNameBuf[:0]
if ctx.bufRowss == nil {
ctx.bufRowss = make([]bufRows, len(storageNodes))
ctx.bufRowss = make([]bufRows, len(ctx.sns))
}
for i := range ctx.bufRowss {
ctx.bufRowss[i].reset()
@ -125,11 +127,12 @@ func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, time
// WriteDataPointExt writes the given metricNameRaw with (timestamp, value) to ctx buffer with the given storageNodeIdx.
func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
br := &ctx.bufRowss[storageNodeIdx]
sn := storageNodes[storageNodeIdx]
sns := ctx.sns
sn := sns[storageNodeIdx]
bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
if len(bufNew) >= maxBufSizePerStorageNode {
// Send buf to storageNode, since it is too big.
if err := br.pushTo(sn); err != nil {
// Send buf to sn, since it is too big.
if err := br.pushTo(sns, sn); err != nil {
return err
}
br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
@ -148,7 +151,7 @@ func (ctx *InsertCtx) FlushBufs() error {
if len(br.buf) == 0 {
continue
}
if err := br.pushTo(storageNodes[i]); err != nil && firstErr == nil {
if err := br.pushTo(ctx.sns, ctx.sns[i]); err != nil && firstErr == nil {
firstErr = err
}
}
@ -159,7 +162,7 @@ func (ctx *InsertCtx) FlushBufs() error {
//
// The returned index must be passed to WriteDataPoint.
func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
if len(storageNodes) == 1 {
if len(ctx.sns) == 1 {
// Fast path - only a single storage node.
return 0
}


@ -43,14 +43,14 @@ func (sn *storageNode) isReady() bool {
// push pushes buf to sn internal bufs.
//
// This function doesn't block on fast path.
// It may block only if storageNodes cannot handle the incoming ingestion rate.
// It may block only if storage nodes cannot handle the incoming ingestion rate.
// This blocking provides backpressure to the caller.
//
// The function falls back to sending data to other vmstorage nodes
// if sn is currently unavailable or overloaded.
//
// rows must match the number of rows in the buf.
func (sn *storageNode) push(buf []byte, rows int) error {
func (sn *storageNode) push(sns []*storageNode, buf []byte, rows int) error {
if len(buf) > maxBufSizePerStorageNode {
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode)
}
@ -66,7 +66,7 @@ func (sn *storageNode) push(buf []byte, rows int) error {
return nil
}
// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
if err := sn.rerouteBufToOtherStorageNodes(buf, rows); err != nil {
if err := sn.rerouteBufToOtherStorageNodes(sns, buf, rows); err != nil {
return fmt.Errorf("error when re-routing rows from %s: %w", sn.dialer.Addr(), err)
}
return nil
@ -74,24 +74,24 @@ func (sn *storageNode) push(buf []byte, rows int) error {
var dropSamplesOnOverloadLogger = logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second)
func (sn *storageNode) rerouteBufToOtherStorageNodes(buf []byte, rows int) error {
func (sn *storageNode) rerouteBufToOtherStorageNodes(sns []*storageNode, buf []byte, rows int) error {
sn.brLock.Lock()
again:
select {
case <-storageNodesStopCh:
case <-sn.stopCh:
sn.brLock.Unlock()
return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
default:
}
if !sn.isReady() {
if len(storageNodes) == 1 {
if len(sns) == 1 {
// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
rowsProcessed, err := rerouteRowsToReadyStorageNodes(sn, buf)
rowsProcessed, err := rerouteRowsToReadyStorageNodes(sns, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
@ -106,12 +106,12 @@ again:
return nil
}
// Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes.
if *disableRerouting || len(storageNodes) == 1 {
if *disableRerouting || len(sns) == 1 {
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
rowsProcessed, err := rerouteRowsToFreeStorageNodes(sn, buf)
rowsProcessed, err := rerouteRowsToFreeStorageNodes(sns, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
@ -125,19 +125,19 @@ var closedCh = func() <-chan struct{} {
return ch
}()
func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
func (sn *storageNode) run(sns []*storageNode, snIdx int) {
replicas := *replicationFactor
if replicas <= 0 {
replicas = 1
}
if replicas > len(storageNodes) {
replicas = len(storageNodes)
if replicas > len(sns) {
replicas = len(sns)
}
sn.readOnlyCheckerWG.Add(1)
go func() {
defer sn.readOnlyCheckerWG.Done()
sn.readOnlyChecker(stopCh)
sn.readOnlyChecker()
}()
defer sn.readOnlyCheckerWG.Wait()
@ -157,7 +157,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
waitCh = closedCh
}
select {
case <-stopCh:
case <-sn.stopCh:
mustStop = true
// Make sure the sn.buf is flushed last time before returning
// in order to send the remaining bits of data.
@ -179,11 +179,11 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
// Nothing to send.
continue
}
// Send br to replicas storageNodes starting from snIdx.
for !sendBufToReplicasNonblocking(&br, snIdx, replicas) {
// Send br to replicas storage nodes starting from snIdx.
for !sendBufToReplicasNonblocking(sns, &br, snIdx, replicas) {
t := timerpool.Get(200 * time.Millisecond)
select {
case <-stopCh:
case <-sn.stopCh:
timerpool.Put(t)
return
case <-t.C:
@ -195,14 +195,14 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
}
}
func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
func sendBufToReplicasNonblocking(sns []*storageNode, br *bufRows, snIdx, replicas int) bool {
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
for i := 0; i < replicas; i++ {
idx := snIdx + i
attempts := 0
for {
attempts++
if attempts > len(storageNodes) {
if attempts > len(sns) {
if i == 0 {
// The data wasn't replicated at all.
cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
@ -217,10 +217,10 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
return true
}
if idx >= len(storageNodes) {
idx %= len(storageNodes)
if idx >= len(sns) {
idx %= len(sns)
}
sn := storageNodes[idx]
sn := sns[idx]
idx++
if _, ok := usedStorageNodes[sn]; ok {
// The br has been already replicated to sn. Skip it.
@ -423,6 +423,8 @@ type storageNode struct {
dialer *netutil.TCPDialer
stopCh chan struct{}
// last error during dial.
lastDialErr error
@ -457,99 +459,129 @@ type storageNode struct {
sendDurationSeconds *metrics.FloatCounter
}
func getStorageNodes() []*storageNode {
v := storageNodes.Load()
snb := v.(*storageNodesBucket)
return snb.sns
}
type storageNodesBucket struct {
ms *metrics.Set
sns []*storageNode
stopCh chan struct{}
wg *sync.WaitGroup
}
// storageNodes contains a list of vmstorage node clients.
var storageNodes []*storageNode
var storageNodesWG sync.WaitGroup
var storageNodesStopCh = make(chan struct{})
var storageNodes atomic.Value
// nodesHash is used for consistently selecting a storage node by key.
var nodesHash *consistentHash
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
// Init initializes vmstorage nodes' connections to the given addrs.
//
// hashSeed is used for changing the distribution of input time series among addrs.
func InitStorageNodes(addrs []string, hashSeed uint64) {
//
// Call MustStop when the initialized vmstorage connections are no longer needed.
func Init(addrs []string, hashSeed uint64) {
if len(addrs) == 0 {
logger.Panicf("BUG: addrs must be non-empty")
}
ms := metrics.NewSet()
nodesHash = newConsistentHash(addrs, hashSeed)
storageNodes = storageNodes[:0]
sns := make([]*storageNode, 0, len(addrs))
stopCh := make(chan struct{})
for _, addr := range addrs {
if _, _, err := net.SplitHostPort(addr); err != nil {
// Automatically add missing port.
addr += ":8400"
}
sn := &storageNode{
dialer: netutil.NewTCPDialer("vminsert", addr, *vmstorageDialTimeout),
dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout),
dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
connectionErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
rowsPushed: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
rowsSent: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
rowsDroppedOnOverload: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_dropped_on_overload_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
sendDurationSeconds: metrics.NewFloatCounter(fmt.Sprintf(`vm_rpc_send_duration_seconds_total{name="vminsert", addr=%q}`, addr)),
stopCh: stopCh,
dialErrors: ms.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
handshakeErrors: ms.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
connectionErrors: ms.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
rowsPushed: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
rowsSent: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
rowsDroppedOnOverload: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_dropped_on_overload_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedFromHere: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedToHere: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
sendDurationSeconds: ms.NewFloatCounter(fmt.Sprintf(`vm_rpc_send_duration_seconds_total{name="vminsert", addr=%q}`, addr)),
}
sn.brCond = sync.NewCond(&sn.brLock)
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
sn.brLock.Lock()
n := sn.br.rows
sn.brLock.Unlock()
return float64(n)
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 {
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 {
sn.brLock.Lock()
n := len(sn.br.buf)
sn.brLock.Unlock()
return float64(n)
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_reachable{name="vminsert", addr=%q}`, addr), func() float64 {
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_reachable{name="vminsert", addr=%q}`, addr), func() float64 {
if atomic.LoadUint32(&sn.broken) != 0 {
return 0
}
return 1
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_read_only{name="vminsert", addr=%q}`, addr), func() float64 {
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_read_only{name="vminsert", addr=%q}`, addr), func() float64 {
return float64(atomic.LoadUint32(&sn.isReadOnly))
})
storageNodes = append(storageNodes, sn)
sns = append(sns, sn)
}
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(storageNodes)
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(sns)
if maxBufSizePerStorageNode > consts.MaxInsertPacketSizeForVMInsert {
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
}
for idx, sn := range storageNodes {
storageNodesWG.Add(1)
var wg sync.WaitGroup
for idx, sn := range sns {
wg.Add(1)
go func(sn *storageNode, idx int) {
sn.run(storageNodesStopCh, idx)
storageNodesWG.Done()
sn.run(sns, idx)
wg.Done()
}(sn, idx)
}
metrics.RegisterSet(ms)
storageNodes.Store(&storageNodesBucket{
ms: ms,
sns: sns,
stopCh: stopCh,
wg: &wg,
})
}
// Stop gracefully stops netstorage.
func Stop() {
close(storageNodesStopCh)
for _, sn := range storageNodes {
// MustStop stops netstorage.
func MustStop() {
v := storageNodes.Load()
snb := v.(*storageNodesBucket)
storageNodes.Store(&storageNodesBucket{})
close(snb.stopCh)
for _, sn := range snb.sns {
sn.brCond.Broadcast()
}
storageNodesWG.Wait()
snb.wg.Wait()
metrics.UnregisterSet(snb.ms)
snb.ms.UnregisterAllMetrics()
}
// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
//
// The function blocks until src is fully re-routed.
func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, error) {
func rerouteRowsToReadyStorageNodes(sns []*storageNode, snSource *storageNode, src []byte) (int, error) {
reroutesTotal.Inc()
rowsProcessed := 0
var idxsExclude, idxsExcludeNew []int
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil)
idxsExclude = getNotReadyStorageNodeIdxsBlocking(sns, idxsExclude[:0], nil)
var mr storage.MetricRow
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
@ -564,12 +596,12 @@ func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, err
var sn *storageNode
for {
idx := nodesHash.getNodeIdx(h, idxsExclude)
sn = storageNodes[idx]
sn = sns[idx]
if sn.isReady() {
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil)
idxsExclude = getNotReadyStorageNodeIdxsBlocking(sns, idxsExclude[:0], nil)
}
if *disableRerouting {
if !sn.sendBufMayBlock(rowBuf) {
@ -592,9 +624,9 @@ func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, err
continue
}
// If the re-routing is enabled, then try sending the row to another storage node.
idxsExcludeNew = getNotReadyStorageNodeIdxs(idxsExcludeNew[:0], sn)
idxsExcludeNew = getNotReadyStorageNodeIdxs(sns, idxsExcludeNew[:0], sn)
idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
snNew := storageNodes[idx]
snNew := sns[idx]
if snNew.trySendBuf(rowBuf, 1) {
rowsProcessed++
if snNew != snSource {
@ -615,14 +647,14 @@ func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, err
//
// It is expected that snSource doesn't have enough buffer for sending src.
// It is expected that *disableRerouting isn't set when calling this function.
func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, error) {
func rerouteRowsToFreeStorageNodes(sns []*storageNode, snSource *storageNode, src []byte) (int, error) {
if *disableRerouting {
logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
}
reroutesTotal.Inc()
rowsProcessed := 0
var idxsExclude []int
idxsExclude = getNotReadyStorageNodeIdxs(idxsExclude[:0], snSource)
idxsExclude = getNotReadyStorageNodeIdxs(sns, idxsExclude[:0], snSource)
var mr storage.MetricRow
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
@ -644,12 +676,12 @@ func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, erro
var sn *storageNode
for {
idx := nodesHash.getNodeIdx(h, idxsExclude)
sn = storageNodes[idx]
sn = sns[idx]
if sn.isReady() {
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxs(idxsExclude[:0], snSource)
idxsExclude = getNotReadyStorageNodeIdxs(sns, idxsExclude[:0], snSource)
}
if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++
@ -665,16 +697,16 @@ func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, erro
return rowsProcessed, nil
}
func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int {
dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra)
if len(dst) < len(storageNodes) {
func getNotReadyStorageNodeIdxsBlocking(sns []*storageNode, dst []int, snExtra *storageNode) []int {
dst = getNotReadyStorageNodeIdxs(sns, dst[:0], snExtra)
if len(dst) < len(sns) {
return dst
}
noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing until at least a single node becomes available")
for {
time.Sleep(time.Second)
dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra)
if availableNodes := len(storageNodes) - len(dst); availableNodes > 0 {
dst = getNotReadyStorageNodeIdxs(sns, dst[:0], snExtra)
if availableNodes := len(sns) - len(dst); availableNodes > 0 {
storageNodesBecameAvailableLogger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
return dst
}
@ -685,9 +717,9 @@ var storageNodesBecameAvailableLogger = logger.WithThrottler("storageNodesBecame
var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*time.Second)
func getNotReadyStorageNodeIdxs(dst []int, snExtra *storageNode) []int {
func getNotReadyStorageNodeIdxs(sns []*storageNode, dst []int, snExtra *storageNode) []int {
dst = dst[:0]
for i, sn := range storageNodes {
for i, sn := range sns {
if sn == snExtra || !sn.isReady() {
dst = append(dst, i)
}
@ -711,7 +743,7 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
sn.brLock.Lock()
for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
select {
case <-storageNodesStopCh:
case <-sn.stopCh:
sn.brLock.Unlock()
return false
default:
@ -724,12 +756,12 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
return true
}
func (sn *storageNode) readOnlyChecker(stop <-chan struct{}) {
func (sn *storageNode) readOnlyChecker() {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
for {
select {
case <-stop:
case <-sn.stopCh:
return
case <-ticker.C:
sn.checkReadOnlyMode()
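
Taken together, the vminsert changes above move all per-Init state (the node list, the per-node stop channel, the WaitGroup and the metrics.Set) into a single storageNodesBucket stored in an atomic.Value. MustStop() then detaches the bucket atomically before tearing it down, which is what makes repeated Init()+MustStop() safe. A simplified, self-contained sketch of the pattern (names follow the diff; networking, metrics and error handling omitted):

package main

import (
    "sync"
    "sync/atomic"
)

type storageNode struct {
    stopCh chan struct{}
}

type storageNodesBucket struct {
    sns    []*storageNode
    stopCh chan struct{}
    wg     *sync.WaitGroup
}

var storageNodes atomic.Value

func getStorageNodes() []*storageNode {
    return storageNodes.Load().(*storageNodesBucket).sns
}

func Init(addrs []string) {
    stopCh := make(chan struct{})
    sns := make([]*storageNode, 0, len(addrs))
    wg := &sync.WaitGroup{}
    for range addrs {
        sn := &storageNode{stopCh: stopCh}
        sns = append(sns, sn)
        wg.Add(1)
        go func(sn *storageNode) {
            defer wg.Done()
            <-sn.stopCh // the real run() loop selects on sn.stopCh instead
        }(sn)
    }
    storageNodes.Store(&storageNodesBucket{sns: sns, stopCh: stopCh, wg: wg})
}

func MustStop() {
    snb := storageNodes.Load().(*storageNodesBucket)
    storageNodes.Store(&storageNodesBucket{}) // detach the bucket before teardown
    close(snb.stopCh)                         // wakes every goroutine selecting on it
    snb.wg.Wait()                             // wait until all node goroutines exit
}

func main() {
    for i := 0; i < 3; i++ { // now safe to run in a loop
        Init([]string{"host1", "host2"})
        MustStop()
    }
}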


@ -0,0 +1,14 @@
package netstorage
import (
"runtime"
"testing"
)
func TestInitStopNodes(t *testing.T) {
for i := 0; i < 3; i++ {
Init([]string{"host1", "host2"}, 0)
runtime.Gosched()
MustStop()
}
}


@ -90,7 +90,7 @@ func main() {
logger.Fatalf("found equal addresses of storage nodes in the -storageNodes flag: %q", duplicatedAddr)
}
netstorage.InitStorageNodes(*storageNodes)
netstorage.Init(*storageNodes)
logger.Infof("started netstorage in %.3f seconds", time.Since(startTime).Seconds())
if len(*cacheDataPath) > 0 {
@ -137,7 +137,7 @@ func main() {
logger.Infof("shutting down neststorage...")
startTime = time.Now()
netstorage.Stop()
netstorage.MustStop()
if len(*cacheDataPath) > 0 {
promql.StopRollupResultCache()
}


@ -648,21 +648,22 @@ func (sbh *sortBlocksHeap) Pop() interface{} {
func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
qt = qt.NewChild("register metric names")
defer qt.Done()
sns := getStorageNodes()
// Split mrs among available vmstorage nodes.
mrsPerNode := make([][]storage.MetricRow, len(storageNodes))
mrsPerNode := make([][]storage.MetricRow, len(sns))
for _, mr := range mrs {
idx := 0
if len(storageNodes) > 1 {
if len(sns) > 1 {
// There is no need in using the same hash as for time series distribution in vminsert,
// since RegisterMetricNames is used only in Graphite Tags API.
h := xxhash.Sum64(mr.MetricNameRaw)
idx = int(h % uint64(len(storageNodes)))
idx = int(h % uint64(len(sns)))
}
mrsPerNode[idx] = append(mrsPerNode[idx], mr)
}
// Push mrs to storage nodes in parallel.
snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.registerMetricNamesRequests.Inc()
err := sn.registerMetricNames(qt, mrsPerNode[workerID], deadline)
if err != nil {
@ -693,7 +694,8 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
deletedCount int
err error
}
snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.deleteSeriesRequests.Inc()
deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
if err != nil {
@ -734,7 +736,8 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
labelNames []string
err error
}
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.labelNamesRequests.Inc()
labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
if err != nil {
@ -836,7 +839,8 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
labelValues []string
err error
}
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.labelValuesRequests.Inc()
labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
if err != nil {
@ -918,7 +922,8 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
suffixes []string
err error
}
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.tagValueSuffixesRequests.Inc()
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
@ -982,7 +987,8 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
status *storage.TSDBStatus
err error
}
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.tsdbStatusRequests.Inc()
status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
if err != nil {
@ -1087,7 +1093,8 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
n uint64
err error
}
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.seriesCountRequests.Inc()
n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
if err != nil {
@ -1122,8 +1129,8 @@ type tmpBlocksFileWrapper struct {
orderedMetricNamess [][]string
}
func newTmpBlocksFileWrapper() *tmpBlocksFileWrapper {
n := len(storageNodes)
func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
n := len(sns)
tbfs := make([]*tmpBlocksFile, n)
for i := range tbfs {
tbfs[i] = getTmpBlocksFile()
@ -1210,8 +1217,9 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
blocksRead := newPerNodeCounter()
samples := newPerNodeCounter()
sns := getStorageNodes()
blocksRead := newPerNodeCounter(sns)
samples := newPerNodeCounter(sns)
processBlock := func(mb *storage.MetricBlock, workerID uint) error {
mn := metricNamePool.Get().(*storage.MetricName)
if err := mn.Unmarshal(mb.MetricName); err != nil {
@ -1226,7 +1234,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
samples.Add(workerID, uint64(mb.Block.RowsCount()))
return nil
}
_, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
_, err := processBlocks(qt, sns, true, sq, processBlock, deadline)
qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead.GetTotal(), samples.GetTotal(), err)
if err != nil {
return fmt.Errorf("error occured during export: %w", err)
@ -1250,7 +1258,8 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
metricNames []string
err error
}
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.searchMetricNamesRequests.Inc()
metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
if err != nil {
@ -1303,10 +1312,11 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
tbfw := newTmpBlocksFileWrapper()
blocksRead := newPerNodeCounter()
samples := newPerNodeCounter()
maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(storageNodes))
sns := getStorageNodes()
tbfw := newTmpBlocksFileWrapper(sns)
blocksRead := newPerNodeCounter(sns)
samples := newPerNodeCounter(sns)
maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(sns))
processBlock := func(mb *storage.MetricBlock, workerID uint) error {
blocksRead.Add(workerID, 1)
n := samples.Add(workerID, uint64(mb.Block.RowsCount()))
@ -1320,7 +1330,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
}
return nil
}
isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline)
isPartial, err := processBlocks(qt, sns, denyPartialResponse, sq, processBlock, deadline)
if err != nil {
closeTmpBlockFiles(tbfw.tbfs)
return nil, false, fmt.Errorf("error occurred during search: %w", err)
@ -1348,10 +1358,16 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
// ProcessBlocks calls processBlock per each block matching the given sq.
func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) {
sns := getStorageNodes()
return processBlocks(qt, sns, denyPartialResponse, sq, processBlock, deadline)
}
func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool, sq *storage.SearchQuery,
processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) {
requestData := sq.Marshal(nil)
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
// Make sure that processBlock is no longer called after the exit from processBlocks() function.
// Use per-worker WaitGroup instead of a shared WaitGroup in order to avoid inter-CPU contention,
// which may significantly slow down the rate of processBlock calls on multi-CPU systems.
type wgStruct struct {
@ -1370,7 +1386,7 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
// 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(wgStruct{})%128]byte
}
wgs := make([]wgWithPadding, len(storageNodes))
wgs := make([]wgWithPadding, len(sns))
f := func(mb *storage.MetricBlock, workerID uint) error {
muwg := &wgs[workerID]
muwg.mu.Lock()
@ -1386,7 +1402,7 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
}
// Send the query to all the storage nodes in parallel.
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.searchRequests.Inc()
err := sn.processSearchQuery(qt, requestData, f, workerID, deadline)
if err != nil {
@ -1401,7 +1417,7 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
errP := result.(*error)
return *errP
})
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
// Make sure that processBlock is no longer called after the exit from processBlocks() function.
for i := range wgs {
muwg := &wgs[i]
muwg.mu.Lock()
@ -1420,11 +1436,13 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
type storageNodesRequest struct {
denyPartialResponse bool
resultsCh chan interface{}
sns []*storageNode
}
func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{}) *storageNodesRequest {
resultsCh := make(chan interface{}, len(storageNodes))
for idx, sn := range storageNodes {
func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool,
f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{}) *storageNodesRequest {
resultsCh := make(chan interface{}, len(sns))
for idx, sn := range sns {
qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
go func(workerID uint, sn *storageNode) {
result := f(qtChild, workerID, sn)
@ -1435,11 +1453,13 @@ func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool,
return &storageNodesRequest{
denyPartialResponse: denyPartialResponse,
resultsCh: resultsCh,
sns: sns,
}
}
func (snr *storageNodesRequest) collectAllResults(f func(result interface{}) error) error {
for i := 0; i < len(storageNodes); i++ {
sns := snr.sns
for i := 0; i < len(sns); i++ {
result := <-snr.resultsCh
if err := f(result); err != nil {
// Immediately return the error to the caller without waiting for responses from other vmstorage nodes -
@ -1453,7 +1473,8 @@ func (snr *storageNodesRequest) collectAllResults(f func(result interface{}) err
func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) {
var errsPartial []error
resultsCollected := 0
for i := 0; i < len(storageNodes); i++ {
sns := snr.sns
for i := 0; i < len(sns); i++ {
// There is no need in timer here, since all the goroutines executing the f function
// passed to startStorageNodesRequest must finish before the deadline.
result := <-snr.resultsCh
@ -1475,13 +1496,13 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
continue
}
resultsCollected++
if resultsCollected > len(storageNodes)-*replicationFactor {
if resultsCollected > len(sns)-*replicationFactor {
// There is no need in waiting for the remaining results,
// because the collected results contain all the data according to the given -replicationFactor.
// This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
//
// It is expected that cap(snr.resultsCh) == len(storageNodes), otherwise goroutine leak is possible.
// It is expected that cap(snr.resultsCh) == len(sns), otherwise goroutine leak is possible.
return false, nil
}
}
@ -1490,7 +1511,7 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
// is smaller than the -replicationFactor.
return false, nil
}
if len(errsPartial) == len(storageNodes) {
if len(errsPartial) == len(sns) {
// All the vmstorage nodes returned error.
// Return only the first error, since it has no sense in returning all errors.
return false, errsPartial[0]
@ -1501,7 +1522,7 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
partialResultsCounter.Inc()
// Do not return the error, since it may spam logs on busy vmselect
// serving high amounts of requests.
partialErrorsLogger.Warnf("%d out of %d vmstorage nodes were unavailable during the query; a sample error: %s", len(errsPartial), len(storageNodes), errsPartial[0])
partialErrorsLogger.Warnf("%d out of %d vmstorage nodes were unavailable during the query; a sample error: %s", len(errsPartial), len(sns), errsPartial[0])
return true, nil
}
@ -2346,14 +2367,28 @@ func readUint64(bc *handshake.BufferedConn) (uint64, error) {
return n, nil
}
var storageNodes []*storageNode
func getStorageNodes() []*storageNode {
v := storageNodes.Load()
snb := v.(*storageNodesBucket)
return snb.sns
}
// InitStorageNodes initializes storage nodes' connections to the given addrs.
func InitStorageNodes(addrs []string) {
type storageNodesBucket struct {
ms *metrics.Set
sns []*storageNode
}
var storageNodes atomic.Value
// Init initializes storage nodes' connections to the given addrs.
//
// MustStop must be called when the initialized connections are no longer needed.
func Init(addrs []string) {
if len(addrs) == 0 {
logger.Panicf("BUG: addrs must be non-empty")
}
sns := make([]*storageNode, 0, len(addrs))
ms := metrics.NewSet()
for _, addr := range addrs {
if _, _, err := net.SplitHostPort(addr); err != nil {
// Automatically add missing port.
@ -2361,39 +2396,50 @@ func InitStorageNodes(addrs []string) {
}
sn := &storageNode{
// There is no need in requests compression, since they are usually very small.
connPool: netutil.NewConnPool("vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout),
connPool: netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout),
concurrentQueries: metrics.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
registerMetricNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
registerMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
registerMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
metricBlocksRead: ms.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
metricRowsRead: ms.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
}
storageNodes = append(storageNodes, sn)
sns = append(sns, sn)
}
metrics.RegisterSet(ms)
storageNodes.Store(&storageNodesBucket{
sns: sns,
ms: ms,
})
}
// Stop gracefully stops netstorage.
func Stop() {
// Nothing to do at the moment.
// MustStop gracefully stops netstorage.
func MustStop() {
snb := storageNodes.Load().(*storageNodesBucket)
storageNodes.Store(&storageNodesBucket{})
for _, sn := range snb.sns {
sn.connPool.MustStop()
}
metrics.UnregisterSet(snb.ms)
snb.ms.UnregisterAllMetrics()
}
var (
@ -2434,9 +2480,9 @@ type perNodeCounter struct {
ns []uint64WithPadding
}
func newPerNodeCounter() *perNodeCounter {
func newPerNodeCounter(sns []*storageNode) *perNodeCounter {
return &perNodeCounter{
ns: make([]uint64WithPadding, len(storageNodes)),
ns: make([]uint64WithPadding, len(sns)),
}
}
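
On the vmselect side each public API now takes a single snapshot via getStorageNodes() and passes it explicitly through startStorageNodesRequest() and the helpers, so one request always works against one consistent node set even if the bucket is swapped concurrently by Init()/MustStop(). A self-contained sketch of that per-request snapshot pattern (stub types; not the real API):

package main

import "fmt"

type storageNode struct{ addr string }

// Stub standing in for the atomic.Value-backed snapshot from the diff above.
func getStorageNodes() []*storageNode {
    return []*storageNode{{addr: "host1:8401"}, {addr: "host2:8401"}}
}

// labelNames queries every node from a single snapshot. cap(resultsCh) must
// equal len(sns), otherwise late responders could leak goroutines.
func labelNames() []string {
    sns := getStorageNodes() // one snapshot per request
    resultsCh := make(chan string, len(sns))
    for _, sn := range sns {
        go func(sn *storageNode) {
            resultsCh <- sn.addr // a real worker would run the RPC here
        }(sn)
    }
    names := make([]string, 0, len(sns))
    for i := 0; i < len(sns); i++ {
        names = append(names, <-resultsCh)
    }
    return names
}

func main() {
    fmt.Println(labelNames())
}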


@ -2,9 +2,18 @@ package netstorage
import (
"reflect"
"runtime"
"testing"
)
func TestInitStopNodes(t *testing.T) {
for i := 0; i < 3; i++ {
Init([]string{"host1", "host2"})
runtime.Gosched()
MustStop()
}
}
func TestMergeSortBlocks(t *testing.T) {
f := func(blocks []*sortBlock, dedupInterval int64, expectedResult *Result) {
t.Helper()

go.mod

@ -10,7 +10,7 @@ require (
// Do not use the original github.com/valyala/fasthttp because of issues
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.1.0
github.com/VictoriaMetrics/metrics v1.22.2
github.com/VictoriaMetrics/metrics v1.23.0
github.com/VictoriaMetrics/metricsql v0.45.0
github.com/aws/aws-sdk-go-v2 v1.17.0
github.com/aws/aws-sdk-go-v2/config v1.17.9

go.sum

@ -95,8 +95,8 @@ github.com/VictoriaMetrics/fastcache v1.12.0/go.mod h1:tjiYeEfYXCqacuvYw/7UoDIeJ
github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a/ueoLdLL0=
github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ=
github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/VictoriaMetrics/metrics v1.22.2 h1:A6LsNidYwkAHetxsvNFaUWjtzu5ltdgNEoS6i7Bn+6I=
github.com/VictoriaMetrics/metrics v1.22.2/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/VictoriaMetrics/metrics v1.23.0 h1:WzfqyzCaxUZip+OBbg1+lV33WChDSu4ssYII3nxtpeA=
github.com/VictoriaMetrics/metrics v1.23.0/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/VictoriaMetrics/metricsql v0.45.0 h1:kVQHnkDJm4qyJ8f5msTclmwqAtlUdPbbEJ7zoa/FTNs=
github.com/VictoriaMetrics/metricsql v0.45.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=


@ -26,20 +26,20 @@ type connMetrics struct {
conns *metrics.Counter
}
func (cm *connMetrics) init(group, name, addr string) {
cm.readCalls = metrics.NewCounter(fmt.Sprintf(`%s_read_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.readBytes = metrics.NewCounter(fmt.Sprintf(`%s_read_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.readErrors = metrics.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="read"}`, group, name, addr))
cm.readTimeouts = metrics.NewCounter(fmt.Sprintf(`%s_read_timeouts_total{name=%q, addr=%q}`, group, name, addr))
func (cm *connMetrics) init(ms *metrics.Set, group, name, addr string) {
cm.readCalls = ms.NewCounter(fmt.Sprintf(`%s_read_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.readBytes = ms.NewCounter(fmt.Sprintf(`%s_read_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.readErrors = ms.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="read"}`, group, name, addr))
cm.readTimeouts = ms.NewCounter(fmt.Sprintf(`%s_read_timeouts_total{name=%q, addr=%q}`, group, name, addr))
cm.writeCalls = metrics.NewCounter(fmt.Sprintf(`%s_write_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.writtenBytes = metrics.NewCounter(fmt.Sprintf(`%s_written_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.writeErrors = metrics.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="write"}`, group, name, addr))
cm.writeTimeouts = metrics.NewCounter(fmt.Sprintf(`%s_write_timeouts_total{name=%q, addr=%q}`, group, name, addr))
cm.writeCalls = ms.NewCounter(fmt.Sprintf(`%s_write_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.writtenBytes = ms.NewCounter(fmt.Sprintf(`%s_written_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.writeErrors = ms.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="write"}`, group, name, addr))
cm.writeTimeouts = ms.NewCounter(fmt.Sprintf(`%s_write_timeouts_total{name=%q, addr=%q}`, group, name, addr))
cm.closeErrors = metrics.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="close"}`, group, name, addr))
cm.closeErrors = ms.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="close"}`, group, name, addr))
cm.conns = metrics.NewCounter(fmt.Sprintf(`%s_conns{name=%q, addr=%q}`, group, name, addr))
cm.conns = ms.NewCounter(fmt.Sprintf(`%s_conns{name=%q, addr=%q}`, group, name, addr))
}
type statConn struct {


@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
@ -27,6 +28,8 @@ type ConnPool struct {
conns []connWithTimestamp
isStopped bool
// lastDialError contains the last error seen when dialing remote addr.
// When it is non-nil and conns is empty, then ConnPool.Get() returns this error.
// This reduces the time needed for dialing unavailable remote storage systems.
@ -41,12 +44,14 @@ type connWithTimestamp struct {
// NewConnPool creates a new connection pool for the given addr.
//
// Name is used in exported metrics.
// Name is used in metrics registered at ms.
// handshakeFunc is used for handshaking after the connection establishing.
// The compression is disabled if compressionLevel <= 0.
func NewConnPool(name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration) *ConnPool {
//
// Call ConnPool.MustStop when the returned ConnPool is no longer needed.
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration) *ConnPool {
cp := &ConnPool{
d: NewTCPDialer(name, addr, dialTimeout),
d: NewTCPDialer(ms, name, addr, dialTimeout),
concurrentDialsCh: make(chan struct{}, 8),
name: name,
@ -54,13 +59,13 @@ func NewConnPool(name, addr string, handshakeFunc handshake.Func, compressionLev
compressionLevel: compressionLevel,
}
cp.checkAvailability(true)
_ = metrics.NewGauge(fmt.Sprintf(`vm_tcpdialer_conns_idle{name=%q, addr=%q}`, name, addr), func() float64 {
_ = ms.NewGauge(fmt.Sprintf(`vm_tcpdialer_conns_idle{name=%q, addr=%q}`, name, addr), func() float64 {
cp.mu.Lock()
n := len(cp.conns)
cp.mu.Unlock()
return float64(n)
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_tcpdialer_addr_available{name=%q, addr=%q}`, name, addr), func() float64 {
_ = ms.NewGauge(fmt.Sprintf(`vm_tcpdialer_addr_available{name=%q, addr=%q}`, name, addr), func() float64 {
cp.mu.Lock()
isAvailable := len(cp.conns) > 0 || cp.lastDialError == nil
cp.mu.Unlock()
@ -75,6 +80,40 @@ func NewConnPool(name, addr string, handshakeFunc handshake.Func, compressionLev
return cp
}
// MustStop frees up resources occupied by cp.
//
// ConnPool.Get() immediately returns an error after MustStop call.
// ConnPool.Put() immediately closes the connection returned to the pool.
func (cp *ConnPool) MustStop() {
cp.mu.Lock()
isStopped := cp.isStopped
cp.isStopped = true
for _, c := range cp.conns {
_ = c.bc.Close()
}
cp.conns = nil
cp.mu.Unlock()
if isStopped {
logger.Panicf("BUG: MustStop is called multiple times")
}
connPoolsMu.Lock()
cpDeleted := false
for i, cpTmp := range connPools {
if cpTmp == cp {
connPoolsNew := append(connPools[:i], connPools[i+1:]...)
connPools[len(connPools)-1] = nil
connPools = connPoolsNew
cpDeleted = true
break
}
}
connPoolsMu.Unlock()
if !cpDeleted {
logger.Panicf("BUG: couldn't find the ConnPool in connPools")
}
}
// Addr returns the address where connections are established.
func (cp *ConnPool) Addr() string {
return cp.d.addr
@ -138,6 +177,10 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
cp.mu.Lock()
defer cp.mu.Unlock()
if cp.isStopped {
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
}
if len(cp.conns) == 0 {
return nil, cp.lastDialError
}
@ -159,10 +202,14 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
return
}
cp.mu.Lock()
cp.conns = append(cp.conns, connWithTimestamp{
bc: bc,
lastActiveTime: fasttime.UnixTimestamp(),
})
if cp.isStopped {
_ = bc.Close()
} else {
cp.conns = append(cp.conns, connWithTimestamp{
bc: bc,
lastActiveTime: fasttime.UnixTimestamp(),
})
}
cp.mu.Unlock()
}
@ -188,8 +235,12 @@ func (cp *ConnPool) closeIdleConns() {
func (cp *ConnPool) checkAvailability(force bool) {
cp.mu.Lock()
isStopped := cp.isStopped
hasDialError := cp.lastDialError != nil
cp.mu.Unlock()
if isStopped {
return
}
if hasDialError || force {
bc, _ := cp.dialAndHandshake()
if bc != nil {
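
The resulting ConnPool lifecycle, mirroring the test added below (the address is illustrative; compressionLevel 0 disables compression):

package main

import (
    "time"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
    "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
    "github.com/VictoriaMetrics/metrics"
)

func main() {
    ms := metrics.NewSet()
    cp := netutil.NewConnPool(ms, "vmselect", "host1:8401", handshake.VMSelectClient, 0, 5*time.Second)

    // ... use cp.Get() and cp.Put(bc) as before ...

    cp.MustStop() // closes pooled conns; subsequent Get() fails fast
    ms.UnregisterAllMetrics()
}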


@ -0,0 +1,61 @@
package netutil
import (
"fmt"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/metrics"
)
func TestConnPoolStartStopSerial(t *testing.T) {
ms := metrics.NewSet()
testConnPoolStartStop(t, "foobar", ms)
ms.UnregisterAllMetrics()
}
func TestConnPoolStartStopConcurrent(t *testing.T) {
ms := metrics.NewSet()
concurrency := 5
ch := make(chan struct{})
for i := 0; i < concurrency; i++ {
name := fmt.Sprintf("foobar_%d", i)
go func() {
testConnPoolStartStop(t, name, ms)
ch <- struct{}{}
}()
}
tc := time.NewTimer(time.Second * 5)
for i := 0; i < concurrency; i++ {
select {
case <-tc.C:
t.Fatalf("timeout")
case <-ch:
}
}
tc.Stop()
ms.UnregisterAllMetrics()
}
func testConnPoolStartStop(t *testing.T, name string, ms *metrics.Set) {
dialTimeout := 5 * time.Second
compressLevel := 1
var cps []*ConnPool
for i := 0; i < 5; i++ {
addr := fmt.Sprintf("host-%d", i)
cp := NewConnPool(ms, name, addr, handshake.VMSelectClient, compressLevel, dialTimeout)
cps = append(cps, cp)
}
for _, cp := range cps {
cp.MustStop()
// Make sure that Get works properly after MustStop()
c, err := cp.Get()
if err == nil {
t.Fatalf("expecting non-nil error after MustStop()")
}
if c != nil {
t.Fatalf("expecting nil conn after MustStop()")
}
}
}


@ -12,7 +12,7 @@ import (
//
// The name is used in metric tags for the returned dialer.
// The name must be unique among dialers.
func NewTCPDialer(name, addr string, dialTimeout time.Duration) *TCPDialer {
func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration) *TCPDialer {
d := &TCPDialer{
d: &net.Dialer{
Timeout: dialTimeout,
@ -23,10 +23,10 @@ func NewTCPDialer(name, addr string, dialTimeout time.Duration) *TCPDialer {
addr: addr,
dials: metrics.NewCounter(fmt.Sprintf(`vm_tcpdialer_dials_total{name=%q, addr=%q}`, name, addr)),
dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_tcpdialer_errors_total{name=%q, addr=%q, type="dial"}`, name, addr)),
dials: ms.NewCounter(fmt.Sprintf(`vm_tcpdialer_dials_total{name=%q, addr=%q}`, name, addr)),
dialErrors: ms.NewCounter(fmt.Sprintf(`vm_tcpdialer_errors_total{name=%q, addr=%q, type="dial"}`, name, addr)),
}
d.connMetrics.init("vm_tcpdialer", name, addr)
d.connMetrics.init(ms, "vm_tcpdialer", name, addr)
return d
}


@ -16,8 +16,7 @@ var enableTCP6 = flag.Bool("enableTCP6", false, "Whether to enable IPv6 for list
// NewTCPListener returns new TCP listener for the given addr and optional tlsConfig.
//
// name is used for exported metrics. Each listener in the program must have
// distinct name.
// name is used for metrics registered in ms. Each listener in the program must have a distinct name.
func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, error) {
network := GetTCPNetwork()
ln, err := net.Listen(network, addr)
@ -27,13 +26,14 @@ func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, err
if tlsConfig != nil {
ln = tls.NewListener(ln, tlsConfig)
}
ms := metrics.GetDefaultSet()
tln := &TCPListener{
Listener: ln,
accepts: metrics.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)),
acceptErrors: metrics.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)),
accepts: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)),
acceptErrors: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)),
}
tln.connMetrics.init("vm_tcplistener", name, addr)
tln.connMetrics.init(ms, "vm_tcplistener", name, addr)
return tln, err
}


@ -22,6 +22,7 @@ import (
type namedMetric struct {
name string
metric metric
isAux bool
}
type metric interface {
@ -49,6 +50,8 @@ func RegisterSet(s *Set) {
}
// UnregisterSet stops exporting metrics for the given s via global WritePrometheus() call.
//
// Call s.UnregisterAllMetrics() after unregistering s if it is no longer used.
func UnregisterSet(s *Set) {
registeredSetsLock.Lock()
delete(registeredSets, s)
@ -180,11 +183,23 @@ func WriteFDMetrics(w io.Writer) {
}
// UnregisterMetric removes metric with the given name from default set.
//
// See also UnregisterAllMetrics.
func UnregisterMetric(name string) bool {
return defaultSet.UnregisterMetric(name)
}
// ListMetricNames returns a list of all the metric names from default set.
// UnregisterAllMetrics unregisters all the metrics from default set.
func UnregisterAllMetrics() {
defaultSet.UnregisterAllMetrics()
}
// ListMetricNames returns sorted list of all the metric names from default set.
func ListMetricNames() []string {
return defaultSet.ListMetricNames()
}
// GetDefaultSet returns the default metrics set.
func GetDefaultSet() *Set {
return defaultSet
}


@ -336,7 +336,7 @@ func (s *Set) NewSummaryExt(name string, window time.Duration, quantiles []float
// checks in tests
defer s.mu.Unlock()
s.mustRegisterLocked(name, sm)
s.mustRegisterLocked(name, sm, false)
registerSummaryLocked(sm)
s.registerSummaryQuantilesLocked(name, sm)
s.summaries = append(s.summaries, sm)
@ -420,7 +420,7 @@ func (s *Set) registerSummaryQuantilesLocked(name string, sm *Summary) {
sm: sm,
idx: i,
}
s.mustRegisterLocked(quantileValueName, qv)
s.mustRegisterLocked(quantileValueName, qv, true)
}
}
@ -432,18 +432,19 @@ func (s *Set) registerMetric(name string, m metric) {
// defer will unlock in case of panic
// checks in test
defer s.mu.Unlock()
s.mustRegisterLocked(name, m)
s.mustRegisterLocked(name, m, false)
}
// mustRegisterLocked registers given metric with
// the given name. Panics if the given name was
// already registered before.
func (s *Set) mustRegisterLocked(name string, m metric) {
// mustRegisterLocked registers given metric with the given name.
//
// Panics if the given name was already registered before.
func (s *Set) mustRegisterLocked(name string, m metric, isAux bool) {
nm, ok := s.m[name]
if !ok {
nm = &namedMetric{
name: name,
metric: m,
isAux: isAux,
}
s.m[name] = nm
s.a = append(s.a, nm)
@ -465,8 +466,16 @@ func (s *Set) UnregisterMetric(name string) bool {
if !ok {
return false
}
m := nm.metric
if nm.isAux {
// Do not allow deleting auxiliary metrics such as summary_metric{quantile="..."}
// Such metrics must be deleted via parent metric name, e.g. summary_metric .
return false
}
return s.unregisterMetricLocked(nm)
}
func (s *Set) unregisterMetricLocked(nm *namedMetric) bool {
name := nm.name
delete(s.m, name)
deleteFromList := func(metricName string) {
@ -482,9 +491,9 @@ func (s *Set) UnregisterMetric(name string) bool {
// remove metric from s.a
deleteFromList(name)
sm, ok := m.(*Summary)
sm, ok := nm.metric.(*Summary)
if !ok {
// There is no need in cleaning up summary.
// There is no need in cleaning up non-summary metrics.
return true
}
@ -511,13 +520,25 @@ func (s *Set) UnregisterMetric(name string) bool {
return true
}
// ListMetricNames returns a list of all the metrics in s.
// UnregisterAllMetrics de-registers all metrics registered in s.
func (s *Set) UnregisterAllMetrics() {
metricNames := s.ListMetricNames()
for _, name := range metricNames {
s.UnregisterMetric(name)
}
}
// ListMetricNames returns sorted list of all the metrics in s.
func (s *Set) ListMetricNames() []string {
s.mu.Lock()
defer s.mu.Unlock()
var list []string
for name := range s.m {
list = append(list, name)
metricNames := make([]string, 0, len(s.m))
for _, nm := range s.m {
if nm.isAux {
continue
}
metricNames = append(metricNames, nm.name)
}
return list
sort.Strings(metricNames)
return metricNames
}
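
The new isAux flag ensures that auxiliary series generated for summaries (the per-quantile metrics) can only be removed through their parent summary, which makes Set.UnregisterAllMetrics() safe to drive via ListMetricNames(). A small sketch of the resulting behavior, assuming github.com/VictoriaMetrics/metrics v1.23.0 (the metric names are illustrative):

package main

import (
    "fmt"

    "github.com/VictoriaMetrics/metrics"
)

func main() {
    s := metrics.NewSet()
    _ = s.NewSummary(`request_duration_seconds`)
    _ = s.NewCounter(`requests_total`)

    // Quantile series such as request_duration_seconds{quantile="0.5"} are
    // auxiliary: ListMetricNames() skips them, and they cannot be deleted
    // directly, only via the parent summary.
    fmt.Println(s.ListMetricNames())                                            // sorted, aux series excluded
    fmt.Println(s.UnregisterMetric(`request_duration_seconds{quantile="0.5"}`)) // false

    s.UnregisterAllMetrics()
    fmt.Println(len(s.ListMetricNames())) // 0
}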

vendor/modules.txt

@ -62,7 +62,7 @@ github.com/VictoriaMetrics/fastcache
github.com/VictoriaMetrics/fasthttp
github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.22.2
# github.com/VictoriaMetrics/metrics v1.23.0
## explicit; go 1.15
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.45.0