all: locate throttled loggers via logger.WithThrottler() only once and then use them

This reduces the contention on logThrottlerRegistryMu mutex when logger.WithThrottler()
is called frequently from concurrent goroutines.
This commit is contained in:
Aliaksandr Valialkin 2022-06-27 12:31:16 +03:00
parent 45f20ad1aa
commit 134751e43e
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 10 additions and 6 deletions

View file

@ -345,13 +345,12 @@ again:
if statusCode == 409 || statusCode == 400 {
body, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
l := logger.WithThrottler("remoteWriteRejected", 5*time.Second)
if err != nil {
l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
"failed to read response body: %s",
len(block), c.sanitizedURL, statusCode, err)
} else {
l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
len(block), c.sanitizedURL, statusCode, string(body))
}
// Just drop block on 409 and 400 status codes like Prometheus does.
@ -388,6 +387,8 @@ again:
goto again
}
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
type rateLimiter struct {
perSecondLimit int64

View file

@ -836,8 +836,7 @@ func (pt *partition) ForceMergeAllParts() error {
maxOutBytes := fs.MustGetFreeSpace(pt.bigPartsPath)
if newPartSize > maxOutBytes {
freeSpaceNeededBytes := newPartSize - maxOutBytes
logger.WithThrottler("forceMerge", time.Minute).Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes",
pt.name, freeSpaceNeededBytes)
forceMergeLogger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes)
return nil
}
@ -848,6 +847,8 @@ func (pt *partition) ForceMergeAllParts() error {
return nil
}
var forceMergeLogger = logger.WithThrottler("forceMerge", time.Minute)
func appendAllPartsToMerge(dst, src []*partWrapper) []*partWrapper {
for _, pw := range src {
if pw.isInMerge {

View file

@ -1861,7 +1861,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
}
if firstWarn != nil {
logger.WithThrottler("storageAddRows", 5*time.Second).Warnf("warn occurred during rows addition: %s", firstWarn)
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
}
dstMrs = dstMrs[:j]
rows = rows[:j]
@ -1881,6 +1881,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
return nil
}
var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error {
if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)