app/vminsert/netstorage: throttle warning logs, which can be too verbose when vminsert cannot send data to vmstorage nodes

Aliaksandr Valialkin 2022-02-07 15:38:54 +02:00
parent b4b40774ec
commit 755e26e67b

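The change replaces bare logger.Warnf calls with logger.WithThrottler(name, period).Warnf, so each call site emits at most one message per period. For context, here is a minimal sketch of how such a named throttler can work; it is an illustrative approximation only, and the identifiers logThrottler and withThrottler are invented here, not the actual lib/logger implementation:

```go
package main

import (
	"log"
	"sync"
	"time"
)

// logThrottler emits at most one message per interval.
type logThrottler struct {
	mu       sync.Mutex
	interval time.Duration
	lastLog  time.Time
}

var (
	throttlersMu sync.Mutex
	throttlers   = make(map[string]*logThrottler)
)

// withThrottler returns the throttler registered under name, creating
// it on first use so repeated calls from the same site share state.
func withThrottler(name string, interval time.Duration) *logThrottler {
	throttlersMu.Lock()
	defer throttlersMu.Unlock()
	lt := throttlers[name]
	if lt == nil {
		lt = &logThrottler{interval: interval}
		throttlers[name] = lt
	}
	return lt
}

// Warnf logs only if interval has elapsed since the previous message
// for this throttler; otherwise the message is dropped.
func (lt *logThrottler) Warnf(format string, args ...interface{}) {
	lt.mu.Lock()
	defer lt.mu.Unlock()
	if time.Since(lt.lastLog) < lt.interval {
		return // suppressed: too soon after the previous message
	}
	lt.lastLog = time.Now()
	log.Printf("WARN: "+format, args...)
}

func main() {
	// Only the first call prints; the remaining four are suppressed.
	for i := 0; i < 5; i++ {
		withThrottler("demo", 5*time.Second).Warnf("vmstorage %s cannot accept new rows now", "10.0.0.1:8400")
	}
}
```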

@@ -60,7 +60,9 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 	}
 	if *dropSamplesOnOverload {
 		sn.rowsDroppedOnOverload.Add(rows)
-		logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second).Warnf("some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
+		logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second).Warnf(
+			"some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+
+			"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
 		return nil
 	}
 	// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
@@ -192,7 +194,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
 }
 
 func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
-	usedStorageNodes := make(map[*storageNode]bool, replicas)
+	usedStorageNodes := make(map[*storageNode]struct{}, replicas)
 	for i := 0; i < replicas; i++ {
 		idx := snIdx + i
 		attempts := 0
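Besides the throttling itself, the hunk above switches usedStorageNodes from map[*storageNode]bool to map[*storageNode]struct{}, the idiomatic Go representation of a set. A small self-contained illustration of why:

```go
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	// struct{}{} occupies zero bytes per entry, while bool values
	// occupy one byte each, so map[T]struct{} is the canonical set.
	fmt.Println(unsafe.Sizeof(struct{}{}), unsafe.Sizeof(false)) // 0 1

	used := make(map[string]struct{})
	used["node-1"] = struct{}{}

	// Membership is tested with the comma-ok form instead of the value.
	if _, ok := used["node-1"]; ok {
		fmt.Println("node-1 already used")
	}
}
```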
@@ -201,16 +203,18 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
 			if attempts > len(storageNodes) {
 				if i == 0 {
 					// The data wasn't replicated at all.
-					logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
-						"re-trying to send the data soon", len(br.buf), br.rows)
+					logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second).Warnf(
+						"cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
+						"re-trying to send the data soon", len(br.buf), br.rows)
 					return false
 				}
 				// The data is partially replicated, so just emit a warning and return true.
 				// We could retry sending the data again, but this may result in uncontrolled duplicate data.
 				// So it is better returning true.
 				rowsIncompletelyReplicatedTotal.Add(br.rows)
-				logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
-					"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
+				logger.WithThrottler("incompleteReplication", 5*time.Second).Warnf(
+					"cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
+					"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
 				return true
 			}
 			if idx >= len(storageNodes) {
@@ -218,7 +222,7 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
 			}
 			sn := storageNodes[idx]
 			idx++
-			if usedStorageNodes[sn] {
+			if _, ok := usedStorageNodes[sn]; ok {
 				// The br has been already replicated to sn. Skip it.
 				continue
 			}
@@ -227,7 +231,7 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
 				continue
 			}
 			// Successfully sent data to sn.
-			usedStorageNodes[sn] = true
+			usedStorageNodes[sn] = struct{}{}
 			break
 		}
 	}
@@ -291,10 +295,11 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
 		return false
 	}
 	// Couldn't flush buf to sn. Mark sn as broken.
-	logger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
-		"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
+	logger.WithThrottler("cannotSendBufRows", 5*time.Second).Warnf(
+		"cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
+		"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
 	if err = sn.bc.Close(); err != nil {
-		logger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
+		logger.WithThrottler("cannotCloseStorageNodeConn", 5*time.Second).Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
 	}
 	sn.bc = nil
 	atomic.StoreUint32(&sn.broken, 1)
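The context lines above also show the broken-node flag being flipped with atomic.StoreUint32. A hedged sketch of that pattern; only the broken field appears in the diff, while markBroken and isBroken are hypothetical helpers added for illustration:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type storageNode struct {
	broken uint32 // accessed atomically; 1 means the node is unhealthy
}

func (sn *storageNode) markBroken()    { atomic.StoreUint32(&sn.broken, 1) }
func (sn *storageNode) isBroken() bool { return atomic.LoadUint32(&sn.broken) == 1 }

func main() {
	sn := &storageNode{}
	fmt.Println(sn.isBroken()) // false
	sn.markBroken()            // what sendBufRowsNonblocking does on send failure
	fmt.Println(sn.isBroken()) // true
}
```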
@@ -657,12 +662,13 @@ func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int {
 	if len(dst) < len(storageNodes) {
 		return dst
 	}
-	logger.Warnf("all the vmstorage nodes are unavailable; stopping data processing until at least a single node becomes available")
+	logger.WithThrottler("storageNodesUnavailable", 5*time.Second).Warnf(
+		"all the vmstorage nodes are unavailable; stopping data processing until at least a single node becomes available")
 	for {
 		time.Sleep(time.Second)
 		dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra)
 		if availableNodes := len(storageNodes) - len(dst); availableNodes > 0 {
-			logger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
+			logger.WithThrottler("storageNodesBecameAvailable", 5*time.Second).Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
 			return dst
 		}
 	}
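The last hunk pairs the throttled warning with a once-per-second poll loop that blocks until at least one vmstorage node is ready again. A generic sketch of that wait pattern, with waitForAvailable and check invented here for illustration:

```go
package main

import (
	"fmt"
	"time"
)

// waitForAvailable polls check once per second and returns as soon as
// it reports at least one available node, mirroring the loop shape in
// getNotReadyStorageNodeIdxsBlocking.
func waitForAvailable(check func() int) int {
	for {
		if n := check(); n > 0 {
			return n
		}
		time.Sleep(time.Second)
	}
}

func main() {
	start := time.Now()
	n := waitForAvailable(func() int {
		// Pretend nodes come back after two seconds.
		if time.Since(start) > 2*time.Second {
			return 3
		}
		return 0
	})
	fmt.Printf("%d vmstorage nodes became available, so continue data processing\n", n)
}
```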