vmagent: add error log for skipped data block when rejected by receiv… (#1956)

* vmagent: add error log for skipped data block when rejected by receiving side

Previously, rejected data blocks were silently dropped - only metrics were update.
From operational perspective, having an additional logging for such cases is preferable.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1911

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmagent: throttle log messages about skipped blocks

The new type of logger was added to logger pacakge.
This new type supposed to control number of logged messages
by time.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* lib/logger: make LogThrottler public, so its methods can be inspected by external packages

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2021-12-21 16:36:09 +02:00 committed by GitHub
parent 5dc9ab5829
commit 34fdc8881b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 0 deletions

View file

@ -295,6 +295,17 @@ again:
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
if statusCode == 409 || statusCode == 400 {
body, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
l := logger.WithThrottler("remoteWriteRejected", 5*time.Second)
if err != nil {
l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
"failed to read response body: %s",
len(block), c.sanitizedURL, statusCode, err)
} else {
l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
len(block), c.sanitizedURL, statusCode, string(body))
}
// Just drop block on 409 and 400 status codes like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149

72
lib/logger/throttler.go Normal file
View file

@ -0,0 +1,72 @@
package logger
import (
"sync"
"time"
)
var (
logThrottlerRegistryMu = sync.Mutex{}
logThrottlerRegistry = make(map[string]*LogThrottler)
)
// WithThrottler returns a logger throttled by time - only one message in throttle duration will be logged.
//
// New logger is created only once for each unique name passed.
// The function is thread-safe.
func WithThrottler(name string, throttle time.Duration) *LogThrottler {
logThrottlerRegistryMu.Lock()
defer logThrottlerRegistryMu.Unlock()
lt, ok := logThrottlerRegistry[name]
if ok {
return lt
}
lt = newLogThrottler(throttle)
lt.warnF = Warnf
lt.errorF = Errorf
logThrottlerRegistry[name] = lt
return lt
}
// LogThrottler is a logger, which throttles log messages passed to Warnf and Errorf.
//
// LogThrottler must be created via WithThrottler() call.
type LogThrottler struct {
ch chan struct{}
warnF func(format string, args ...interface{})
errorF func(format string, args ...interface{})
}
func newLogThrottler(throttle time.Duration) *LogThrottler {
lt := &LogThrottler{
ch: make(chan struct{}, 1),
}
go func() {
for {
<-lt.ch
time.Sleep(throttle)
}
}()
return lt
}
// Errorf logs error message.
func (lt *LogThrottler) Errorf(format string, args ...interface{}) {
select {
case lt.ch <- struct{}{}:
lt.errorF(format, args...)
default:
}
}
// Warnf logs warn message.
func (lt *LogThrottler) Warnf(format string, args ...interface{}) {
select {
case lt.ch <- struct{}{}:
lt.warnF(format, args...)
default:
}
}

View file

@ -0,0 +1,40 @@
package logger
import (
"testing"
"time"
)
func TestLoggerWithThrottler(t *testing.T) {
lName := "test"
lThrottle := 50 * time.Millisecond
lt := WithThrottler(lName, lThrottle)
var i int
lt.warnF = func(format string, args ...interface{}) {
i++
}
lt.Warnf("")
lt.Warnf("")
lt.Warnf("")
if i != 1 {
t.Fatalf("expected logger will be throttled to 1; got %d instead", i)
}
time.Sleep(lThrottle * 2) // wait to throttle to fade off
// the same logger supposed to be return for the same name
WithThrottler(lName, lThrottle).Warnf("")
if i != 2 {
t.Fatalf("expected logger to have 2 iterations; got %d instead", i)
}
logThrottlerRegistryMu.Lock()
registeredN := len(logThrottlerRegistry)
logThrottlerRegistryMu.Unlock()
if registeredN != 1 {
t.Fatalf("expected only 1 logger to be registered; got %d", registeredN)
}
}