From 6c66804fd36c93ebf09ceaab7cc4d117567c4a28 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 27 Jun 2022 12:31:16 +0300
Subject: [PATCH] all: locate throttled loggers via logger.WithThrottler() only
 once and then use them

This reduces the contention on logThrottlerRegistryMu mutex when
logger.WithThrottler() is called frequently from concurrent goroutines.
---
 app/vmagent/remotewrite/client.go     |  7 ++++---
 app/vminsert/netstorage/netstorage.go | 34 ++++++++++++++++++++--------------
 lib/storage/partition.go              |  5 +++--
 lib/storage/storage.go                |  4 +++-
 4 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index da6a8960b5..d556f6cb34 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -345,13 +345,12 @@ again:
 	if statusCode == 409 || statusCode == 400 {
 		body, err := ioutil.ReadAll(resp.Body)
 		_ = resp.Body.Close()
-		l := logger.WithThrottler("remoteWriteRejected", 5*time.Second)
 		if err != nil {
-			l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
+			remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
 				"failed to read response body: %s",
 				len(block), c.sanitizedURL, statusCode, err)
 		} else {
-			l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
+			remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
 				len(block), c.sanitizedURL, statusCode, string(body))
 		}
 		// Just drop block on 409 and 400 status codes like Prometheus does.
@@ -388,6 +387,8 @@ again:
 	goto again
 }
 
+var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
+
 type rateLimiter struct {
 	perSecondLimit int64
 
diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go
index e910407251..f029988d94 100644
--- a/app/vminsert/netstorage/netstorage.go
+++ b/app/vminsert/netstorage/netstorage.go
@@ -61,9 +61,8 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 	}
 	if *dropSamplesOnOverload && atomic.LoadUint32(&sn.isReadOnly) == 0 {
 		sn.rowsDroppedOnOverload.Add(rows)
-		logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second).Warnf(
-			"some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+
-			"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
+		dropSamplesOnOverloadLogger.Warnf("some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+
+			"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
 		return nil
 	}
 	// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
@@ -73,6 +72,8 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 	return nil
 }
 
+var dropSamplesOnOverloadLogger = logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second)
+
 func (sn *storageNode) rerouteBufToOtherStorageNodes(buf []byte, rows int) error {
 	sn.brLock.Lock()
 again:
@@ -204,18 +205,16 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
 			if attempts > len(storageNodes) {
 				if i == 0 {
 					// The data wasn't replicated at all.
- logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second).Warnf( - "cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+ - "re-trying to send the data soon", len(br.buf), br.rows) + cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+ + "re-trying to send the data soon", len(br.buf), br.rows) return false } // The data is partially replicated, so just emit a warning and return true. // We could retry sending the data again, but this may result in uncontrolled duplicate data. // So it is better returning true. rowsIncompletelyReplicatedTotal.Add(br.rows) - logger.WithThrottler("incompleteReplication", 5*time.Second).Warnf( - "cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+ - "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows) + incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+ + "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows) return true } if idx >= len(storageNodes) { @@ -239,6 +238,11 @@ func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool { return true } +var ( + cannotReplicateLogger = logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second) + incompleteReplicationLogger = logger.WithThrottler("incompleteReplication", 5*time.Second) +) + func (sn *storageNode) checkHealth() { sn.bcLock.Lock() defer sn.bcLock.Unlock() @@ -296,9 +300,8 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool { return false } // Couldn't flush buf to sn. Mark sn as broken. 
- logger.WithThrottler("cannotSendBufRows", 5*time.Second).Warnf( - "cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+ - "re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err) + cannotSendBufsLogger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+ + "re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err) if err = sn.bc.Close(); err != nil { logger.WithThrottler("cannotCloseStorageNodeConn", 5*time.Second).Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err) } @@ -309,6 +312,8 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool { return false } +var cannotSendBufsLogger = logger.WithThrottler("cannotSendBufRows", 5*time.Second) + func sendToConn(bc *handshake.BufferedConn, buf []byte) error { if len(buf) == 0 { // Nothing to send @@ -663,8 +668,7 @@ func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int { if len(dst) < len(storageNodes) { return dst } - logger.WithThrottler("storageNodesUnavailable", 5*time.Second).Warnf( - "all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available") + noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available") for { time.Sleep(time.Second) dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra) @@ -675,6 +679,8 @@ func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int { } } +var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*time.Second) + func getNotReadyStorageNodeIdxs(dst []int, snExtra *storageNode) []int { dst = dst[:0] for i, sn := range storageNodes { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index ddd13fc48e..969211c83e 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -836,8 +836,7 @@ func (pt *partition) ForceMergeAllParts() error { maxOutBytes := fs.MustGetFreeSpace(pt.bigPartsPath) if newPartSize > maxOutBytes { freeSpaceNeededBytes := newPartSize - maxOutBytes - logger.WithThrottler("forceMerge", time.Minute).Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", - pt.name, freeSpaceNeededBytes) + forceMergeLogger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes) return nil } @@ -848,6 +847,8 @@ func (pt *partition) ForceMergeAllParts() error { return nil } +var forceMergeLogger = logger.WithThrottler("forceMerge", time.Minute) + func appendAllPartsToMerge(dst, src []*partWrapper) []*partWrapper { for _, pw := range src { if pw.isInMerge { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 8dc866d52a..d007476759 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1970,7 +1970,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci atomic.AddUint64(&s.slowRowInserts, slowInsertsCount) } if firstWarn != nil { - logger.WithThrottler("storageAddRows", 5*time.Second).Warnf("warn occurred during rows addition: %s", firstWarn) + storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn) } dstMrs = dstMrs[:j] rows = rows[:j] @@ -1990,6 +1990,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci return nil } +var storageAddRowsLogger = 
logger.WithThrottler("storageAddRows", 5*time.Second) + func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error { if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) { atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)