all: locate throttled loggers via logger.WithThrottler() only once and then use them

This reduces contention on the logThrottlerRegistryMu mutex when logger.WithThrottler()
is called frequently from concurrent goroutines.
Aliaksandr Valialkin 2022-06-27 12:31:16 +03:00
parent 08de733924
commit 6c66804fd3
4 changed files with 30 additions and 20 deletions
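
For context, a minimal sketch of the pattern this commit applies in the four files below. The package name, the reportRejected* helpers and their log message are illustrative only and not part of the commit; the assumption is the repository's lib/logger package, where logger.WithThrottler() locates the named throttler through a registry guarded by logThrottlerRegistryMu.

package remotewrite // hypothetical package, for illustration only

import (
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

// Old pattern: every call locates the throttled logger through the shared
// registry, so each invocation takes logThrottlerRegistryMu.
func reportRejectedOld(err error) {
	logger.WithThrottler("remoteWriteRejected", 5*time.Second).Errorf("the block was rejected: %s", err)
}

// New pattern: the throttled logger is located once at package initialization
// and then reused, so the hot path never touches the registry mutex.
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)

func reportRejectedNew(err error) {
	remoteWriteRejectedLogger.Errorf("the block was rejected: %s", err)
}

Each diff hunk below performs exactly this hoisting for one throttled logger.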

View file

@@ -345,13 +345,12 @@ again:
 	if statusCode == 409 || statusCode == 400 {
 		body, err := ioutil.ReadAll(resp.Body)
 		_ = resp.Body.Close()
-		l := logger.WithThrottler("remoteWriteRejected", 5*time.Second)
 		if err != nil {
-			l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
+			remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
 				"failed to read response body: %s",
 				len(block), c.sanitizedURL, statusCode, err)
 		} else {
-			l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
+			remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
 				len(block), c.sanitizedURL, statusCode, string(body))
 		}
 		// Just drop block on 409 and 400 status codes like Prometheus does.
@@ -388,6 +387,8 @@ again:
 	goto again
 }
 
+var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
+
 type rateLimiter struct {
 	perSecondLimit int64

View file

@@ -61,9 +61,8 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 	}
 	if *dropSamplesOnOverload && atomic.LoadUint32(&sn.isReadOnly) == 0 {
 		sn.rowsDroppedOnOverload.Add(rows)
-		logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second).Warnf(
-			"some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+
-				"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
+		dropSamplesOnOverloadLogger.Warnf("some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+
+			"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
 		return nil
 	}
 	// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
@@ -73,6 +72,8 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 	return nil
 }
 
+var dropSamplesOnOverloadLogger = logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second)
+
 func (sn *storageNode) rerouteBufToOtherStorageNodes(buf []byte, rows int) error {
 	sn.brLock.Lock()
 again:
@@ -204,18 +205,16 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
 		if attempts > len(storageNodes) {
 			if i == 0 {
 				// The data wasn't replicated at all.
-				logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second).Warnf(
-					"cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
-						"re-trying to send the data soon", len(br.buf), br.rows)
+				cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
+					"re-trying to send the data soon", len(br.buf), br.rows)
 				return false
 			}
 			// The data is partially replicated, so just emit a warning and return true.
 			// We could retry sending the data again, but this may result in uncontrolled duplicate data.
 			// So it is better returning true.
 			rowsIncompletelyReplicatedTotal.Add(br.rows)
-			logger.WithThrottler("incompleteReplication", 5*time.Second).Warnf(
-				"cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
-					"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
+			incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
+				"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
 			return true
 		}
 		if idx >= len(storageNodes) {
@@ -239,6 +238,11 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
 	return true
 }
 
+var (
+	cannotReplicateLogger       = logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second)
+	incompleteReplicationLogger = logger.WithThrottler("incompleteReplication", 5*time.Second)
+)
+
 func (sn *storageNode) checkHealth() {
 	sn.bcLock.Lock()
 	defer sn.bcLock.Unlock()
@@ -296,9 +300,8 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
 		return false
 	}
 	// Couldn't flush buf to sn. Mark sn as broken.
-	logger.WithThrottler("cannotSendBufRows", 5*time.Second).Warnf(
-		"cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
-			"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
+	cannotSendBufsLogger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
+		"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
 	if err = sn.bc.Close(); err != nil {
 		logger.WithThrottler("cannotCloseStorageNodeConn", 5*time.Second).Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
 	}
@@ -309,6 +312,8 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
 	return false
 }
 
+var cannotSendBufsLogger = logger.WithThrottler("cannotSendBufRows", 5*time.Second)
+
 func sendToConn(bc *handshake.BufferedConn, buf []byte) error {
 	if len(buf) == 0 {
 		// Nothing to send
@@ -663,8 +668,7 @@ func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int {
 	if len(dst) < len(storageNodes) {
 		return dst
 	}
-	logger.WithThrottler("storageNodesUnavailable", 5*time.Second).Warnf(
-		"all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available")
+	noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available")
 	for {
 		time.Sleep(time.Second)
 		dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra)
@@ -675,6 +679,8 @@ func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int {
 	}
 }
 
+var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*time.Second)
+
 func getNotReadyStorageNodeIdxs(dst []int, snExtra *storageNode) []int {
 	dst = dst[:0]
 	for i, sn := range storageNodes {

View file

@@ -836,8 +836,7 @@ func (pt *partition) ForceMergeAllParts() error {
 	maxOutBytes := fs.MustGetFreeSpace(pt.bigPartsPath)
 	if newPartSize > maxOutBytes {
 		freeSpaceNeededBytes := newPartSize - maxOutBytes
-		logger.WithThrottler("forceMerge", time.Minute).Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes",
-			pt.name, freeSpaceNeededBytes)
+		forceMergeLogger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes)
 		return nil
 	}
@@ -848,6 +847,8 @@ func (pt *partition) ForceMergeAllParts() error {
 	return nil
 }
 
+var forceMergeLogger = logger.WithThrottler("forceMerge", time.Minute)
+
 func appendAllPartsToMerge(dst, src []*partWrapper) []*partWrapper {
 	for _, pw := range src {
 		if pw.isInMerge {

View file

@@ -1970,7 +1970,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 		atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
 	}
 	if firstWarn != nil {
-		logger.WithThrottler("storageAddRows", 5*time.Second).Warnf("warn occurred during rows addition: %s", firstWarn)
+		storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
 	}
 	dstMrs = dstMrs[:j]
 	rows = rows[:j]
@@ -1990,6 +1990,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 	return nil
 }
 
+var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
+
 func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error {
 	if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
 		atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)