From 23e1de06ee7040fdd650b469b21fd059aa0a4083 Mon Sep 17 00:00:00 2001
From: Roman Khavronenko <hagen1778@gmail.com>
Date: Tue, 21 Dec 2021 16:36:09 +0200
Subject: [PATCH] =?UTF-8?q?vmagent:=20add=20error=20log=20for=20skipped=20?=
 =?UTF-8?q?data=20block=20when=20rejected=20by=20receiv=E2=80=A6=20(#1956)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* vmagent: add error log for skipped data block when rejected by receiving side

Previously, rejected data blocks were silently dropped - only metrics were update.
From operational perspective, having an additional logging for such cases is preferable.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1911

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmagent: throttle log messages about skipped blocks

The new type of logger was added to logger pacakge.
This new type supposed to control number of logged messages
by time.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* lib/logger: make LogThrottler public, so its methods can be inspected by external packages

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
---
 app/vmagent/remotewrite/client.go | 11 +++++
 lib/logger/throttler.go           | 72 +++++++++++++++++++++++++++++++
 lib/logger/throttler_test.go      | 40 +++++++++++++++++
 3 files changed, 123 insertions(+)
 create mode 100644 lib/logger/throttler.go
 create mode 100644 lib/logger/throttler_test.go

diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 5a4f5ea53e..0493758b8c 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -295,6 +295,17 @@ again:
 	}
 	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
 	if statusCode == 409 || statusCode == 400 {
+		body, err := ioutil.ReadAll(resp.Body)
+		_ = resp.Body.Close()
+		l := logger.WithThrottler("remoteWriteRejected", 5*time.Second)
+		if err != nil {
+			l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
+				"failed to read response body: %s",
+				len(block), c.sanitizedURL, statusCode, err)
+		} else {
+			l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
+				len(block), c.sanitizedURL, statusCode, string(body))
+		}
 		// Just drop block on 409 and 400 status codes like Prometheus does.
 		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
 		// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149
diff --git a/lib/logger/throttler.go b/lib/logger/throttler.go
new file mode 100644
index 0000000000..da1e9edabb
--- /dev/null
+++ b/lib/logger/throttler.go
@@ -0,0 +1,72 @@
+package logger
+
+import (
+	"sync"
+	"time"
+)
+
+var (
+	logThrottlerRegistryMu = sync.Mutex{}
+	logThrottlerRegistry   = make(map[string]*LogThrottler)
+)
+
+// WithThrottler returns a logger throttled by time - only one message in throttle duration will be logged.
+//
+// New logger is created only once for each unique name passed.
+// The function is thread-safe.
+func WithThrottler(name string, throttle time.Duration) *LogThrottler {
+	logThrottlerRegistryMu.Lock()
+	defer logThrottlerRegistryMu.Unlock()
+
+	lt, ok := logThrottlerRegistry[name]
+	if ok {
+		return lt
+	}
+
+	lt = newLogThrottler(throttle)
+	lt.warnF = Warnf
+	lt.errorF = Errorf
+	logThrottlerRegistry[name] = lt
+	return lt
+}
+
+// LogThrottler is a logger, which throttles log messages passed to Warnf and Errorf.
+//
+// LogThrottler must be created via WithThrottler() call.
+type LogThrottler struct {
+	ch chan struct{}
+
+	warnF  func(format string, args ...interface{})
+	errorF func(format string, args ...interface{})
+}
+
+func newLogThrottler(throttle time.Duration) *LogThrottler {
+	lt := &LogThrottler{
+		ch: make(chan struct{}, 1),
+	}
+	go func() {
+		for {
+			<-lt.ch
+			time.Sleep(throttle)
+		}
+	}()
+	return lt
+}
+
+// Errorf logs error message.
+func (lt *LogThrottler) Errorf(format string, args ...interface{}) {
+	select {
+	case lt.ch <- struct{}{}:
+		lt.errorF(format, args...)
+	default:
+	}
+}
+
+// Warnf logs warn message.
+func (lt *LogThrottler) Warnf(format string, args ...interface{}) {
+	select {
+	case lt.ch <- struct{}{}:
+		lt.warnF(format, args...)
+	default:
+	}
+}
diff --git a/lib/logger/throttler_test.go b/lib/logger/throttler_test.go
new file mode 100644
index 0000000000..53f3c81a6e
--- /dev/null
+++ b/lib/logger/throttler_test.go
@@ -0,0 +1,40 @@
+package logger
+
+import (
+	"testing"
+	"time"
+)
+
+func TestLoggerWithThrottler(t *testing.T) {
+	lName := "test"
+	lThrottle := 50 * time.Millisecond
+
+	lt := WithThrottler(lName, lThrottle)
+	var i int
+	lt.warnF = func(format string, args ...interface{}) {
+		i++
+	}
+
+	lt.Warnf("")
+	lt.Warnf("")
+	lt.Warnf("")
+
+	if i != 1 {
+		t.Fatalf("expected logger will be throttled to 1; got %d instead", i)
+	}
+
+	time.Sleep(lThrottle * 2) // wait to throttle to fade off
+	// the same logger supposed to be return for the same name
+	WithThrottler(lName, lThrottle).Warnf("")
+	if i != 2 {
+		t.Fatalf("expected logger to have 2 iterations; got %d instead", i)
+	}
+
+	logThrottlerRegistryMu.Lock()
+	registeredN := len(logThrottlerRegistry)
+	logThrottlerRegistryMu.Unlock()
+
+	if registeredN != 1 {
+		t.Fatalf("expected only 1 logger to be registered; got %d", registeredN)
+	}
+}