Adds log suppression per caller (#908)

* Adds log suppression per caller
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/905

* fixes style and report message
This commit is contained in:
Nikolay 2020-11-19 13:17:23 +03:00 committed by GitHub
parent 2859a452d4
commit 09105ff49c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -10,7 +10,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
@ -130,16 +129,59 @@ func logLevelSkipframes(skipframes int, level, format string, args ...interface{
func errorsAndWarnsLoggedCleaner() {
for {
time.Sleep(time.Second)
atomic.StoreUint64(&errorsLogged, 0)
atomic.StoreUint64(&warnsLogged, 0)
logLimiter.reset()
}
}
var (
errorsLogged uint64
warnsLogged uint64
logLimiter = newLogLimit()
)
func newLogLimit() *logLimit {
return &logLimit{
s: make(map[string]uint64),
}
}
type logLimit struct {
mu sync.Mutex
s map[string]uint64
}
func (ll *logLimit) reset() {
ll.mu.Lock()
ll.s = make(map[string]uint64)
ll.mu.Unlock()
}
// needSuppress check if given string exceeds limit,
// when count equals limit, log message prefix returned.
func (ll *logLimit) needSuppress(file string, limit uint64) (bool, string) {
// fast path
var msg string
if limit == 0 {
return false, msg
}
ll.mu.Lock()
defer ll.mu.Unlock()
if n, ok := ll.s[file]; ok {
if n >= limit {
switch n {
// report only once
case limit:
msg = fmt.Sprintf("suppressing log message with rate limit=%d: ", limit)
default:
return true, msg
}
}
ll.s[file] = n + 1
} else {
ll.s[file] = 1
}
return false, msg
}
type logWriter struct {
}
@ -149,17 +191,6 @@ func (lw *logWriter) Write(p []byte) (int, error) {
}
func logMessage(level, msg string, skipframes int) {
// rate limit ERROR log messages
if level == "ERROR" {
if n := atomic.AddUint64(&errorsLogged, 1); *errorsPerSecondLimit > 0 && n > uint64(*errorsPerSecondLimit) {
return
}
}
if level == "WARN" {
if n := atomic.AddUint64(&warnsLogged, 1); *warnsPerSecondLimit > 0 && n > uint64(*warnsPerSecondLimit) {
return
}
}
timestamp := ""
if !*disableTimestamps {
@ -175,23 +206,39 @@ func logMessage(level, msg string, skipframes int) {
// Strip /VictoriaMetrics/ prefix
file = file[n+len("/VictoriaMetrics/"):]
}
location := fmt.Sprintf("%s:%d", file, line)
// rate limit ERROR and WARN log messages with given limit.
if level == "ERROR" || level == "WARN" {
limit := uint64(*errorsPerSecondLimit)
if level == "WARN" {
limit = uint64(*warnsPerSecondLimit)
}
ok, suppressMessage := logLimiter.needSuppress(location, limit)
if ok {
return
}
if len(suppressMessage) > 0 {
msg = suppressMessage + msg
}
}
for len(msg) > 0 && msg[len(msg)-1] == '\n' {
msg = msg[:len(msg)-1]
}
var logMsg string
switch *loggerFormat {
case "json":
caller := fmt.Sprintf("%s:%d", file, line)
if *disableTimestamps {
logMsg = fmt.Sprintf(`{"level":%q,"caller":%q,"msg":%q}`+"\n", levelLowercase, caller, msg)
logMsg = fmt.Sprintf(`{"level":%q,"caller":%q,"msg":%q}`+"\n", levelLowercase, location, msg)
} else {
logMsg = fmt.Sprintf(`{"ts":%q,"level":%q,"caller":%q,"msg":%q}`+"\n", timestamp, levelLowercase, caller, msg)
logMsg = fmt.Sprintf(`{"ts":%q,"level":%q,"caller":%q,"msg":%q}`+"\n", timestamp, levelLowercase, location, msg)
}
default:
if *disableTimestamps {
logMsg = fmt.Sprintf("%s\t%s:%d\t%s\n", levelLowercase, file, line, msg)
logMsg = fmt.Sprintf("%s\t%s\t%s\n", levelLowercase, location, msg)
} else {
logMsg = fmt.Sprintf("%s\t%s\t%s:%d\t%s\n", timestamp, levelLowercase, file, line, msg)
logMsg = fmt.Sprintf("%s\t%s\t%s\t%s\n", timestamp, levelLowercase, location, msg)
}
}
@ -201,7 +248,6 @@ func logMessage(level, msg string, skipframes int) {
mu.Unlock()
// Increment vm_log_messages_total
location := fmt.Sprintf("%s:%d", file, line)
counterName := fmt.Sprintf(`vm_log_messages_total{app_version=%q, level=%q, location=%q}`, buildinfo.Version, levelLowercase, location)
metrics.GetOrCreateCounter(counterName).Inc()