diff --git a/lib/logger/logger.go b/lib/logger/logger.go index 2dd657923..2764120ec 100644 --- a/lib/logger/logger.go +++ b/lib/logger/logger.go @@ -10,7 +10,6 @@ import ( "runtime" "strings" "sync" - "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" @@ -130,16 +129,59 @@ func logLevelSkipframes(skipframes int, level, format string, args ...interface{ func errorsAndWarnsLoggedCleaner() { for { time.Sleep(time.Second) - atomic.StoreUint64(&errorsLogged, 0) - atomic.StoreUint64(&warnsLogged, 0) + logLimiter.reset() } } var ( - errorsLogged uint64 - warnsLogged uint64 + logLimiter = newLogLimit() ) +func newLogLimit() *logLimit { + return &logLimit{ + s: make(map[string]uint64), + } +} + +type logLimit struct { + mu sync.Mutex + s map[string]uint64 +} + +func (ll *logLimit) reset() { + ll.mu.Lock() + ll.s = make(map[string]uint64) + ll.mu.Unlock() +} + +// needSuppress check if given string exceeds limit, +// when count equals limit, log message prefix returned. +func (ll *logLimit) needSuppress(file string, limit uint64) (bool, string) { + // fast path + var msg string + if limit == 0 { + return false, msg + } + ll.mu.Lock() + defer ll.mu.Unlock() + + if n, ok := ll.s[file]; ok { + if n >= limit { + switch n { + // report only once + case limit: + msg = fmt.Sprintf("suppressing log message with rate limit=%d: ", limit) + default: + return true, msg + } + } + ll.s[file] = n + 1 + } else { + ll.s[file] = 1 + } + return false, msg +} + type logWriter struct { } @@ -149,17 +191,6 @@ func (lw *logWriter) Write(p []byte) (int, error) { } func logMessage(level, msg string, skipframes int) { - // rate limit ERROR log messages - if level == "ERROR" { - if n := atomic.AddUint64(&errorsLogged, 1); *errorsPerSecondLimit > 0 && n > uint64(*errorsPerSecondLimit) { - return - } - } - if level == "WARN" { - if n := atomic.AddUint64(&warnsLogged, 1); *warnsPerSecondLimit > 0 && n > uint64(*warnsPerSecondLimit) { - return - } - } timestamp := "" if !*disableTimestamps { @@ -175,23 +206,39 @@ func logMessage(level, msg string, skipframes int) { // Strip /VictoriaMetrics/ prefix file = file[n+len("/VictoriaMetrics/"):] } + location := fmt.Sprintf("%s:%d", file, line) + + // rate limit ERROR and WARN log messages with given limit. + if level == "ERROR" || level == "WARN" { + limit := uint64(*errorsPerSecondLimit) + if level == "WARN" { + limit = uint64(*warnsPerSecondLimit) + } + ok, suppressMessage := logLimiter.needSuppress(location, limit) + if ok { + return + } + if len(suppressMessage) > 0 { + msg = suppressMessage + msg + } + } + for len(msg) > 0 && msg[len(msg)-1] == '\n' { msg = msg[:len(msg)-1] } var logMsg string switch *loggerFormat { case "json": - caller := fmt.Sprintf("%s:%d", file, line) if *disableTimestamps { - logMsg = fmt.Sprintf(`{"level":%q,"caller":%q,"msg":%q}`+"\n", levelLowercase, caller, msg) + logMsg = fmt.Sprintf(`{"level":%q,"caller":%q,"msg":%q}`+"\n", levelLowercase, location, msg) } else { - logMsg = fmt.Sprintf(`{"ts":%q,"level":%q,"caller":%q,"msg":%q}`+"\n", timestamp, levelLowercase, caller, msg) + logMsg = fmt.Sprintf(`{"ts":%q,"level":%q,"caller":%q,"msg":%q}`+"\n", timestamp, levelLowercase, location, msg) } default: if *disableTimestamps { - logMsg = fmt.Sprintf("%s\t%s:%d\t%s\n", levelLowercase, file, line, msg) + logMsg = fmt.Sprintf("%s\t%s\t%s\n", levelLowercase, location, msg) } else { - logMsg = fmt.Sprintf("%s\t%s\t%s:%d\t%s\n", timestamp, levelLowercase, file, line, msg) + logMsg = fmt.Sprintf("%s\t%s\t%s\t%s\n", timestamp, levelLowercase, location, msg) } } @@ -201,7 +248,6 @@ func logMessage(level, msg string, skipframes int) { mu.Unlock() // Increment vm_log_messages_total - location := fmt.Sprintf("%s:%d", file, line) counterName := fmt.Sprintf(`vm_log_messages_total{app_version=%q, level=%q, location=%q}`, buildinfo.Version, levelLowercase, location) metrics.GetOrCreateCounter(counterName).Inc()