From 1cfa183c2bba07f914f091967dcd920054b7a12b Mon Sep 17 00:00:00 2001 From: Artem Navoiev Date: Tue, 24 Jan 2023 08:14:34 +0200 Subject: [PATCH] =?UTF-8?q?add=20error=20handler=20for=20parsing=20prometh?= =?UTF-8?q?eus=20text=20format=20to=20vmagent=20and=20v=E2=80=A6=20(#3693)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add error handler for parsing prometheus text format to vmagent and vminsert Signed-off-by: Artem Navoiev * fix typo Signed-off-by: Artem Navoiev * typo Signed-off-by: Artem Navoiev * fix variables naming and error message Signed-off-by: Artem Navoiev Signed-off-by: Artem Navoiev --- .../prometheusimport/request_handler.go | 16 +++--- .../prometheusimport/request_handler_test.go | 55 +++++++++++++++++++ .../prometheusimport/request_handler.go | 8 ++- lib/logger/logger.go | 6 ++ lib/protoparser/prometheus/streamparser.go | 2 +- 5 files changed, 76 insertions(+), 11 deletions(-) create mode 100644 app/vmagent/prometheusimport/request_handler_test.go diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index 571860e25c..84ede2e1b3 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -1,12 +1,13 @@ package prometheusimport import ( - "io" "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -30,17 +31,14 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } + remoteAddr := httpserver.GetQuotedRemoteAddr(req) + uri := httpserver.GetRequestURI(req) isGzipped := req.Header.Get("Content-Encoding") == "gzip" return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) - }, nil) -} - -// InsertHandlerForReader processes metrics from given reader with optional gzip format -func InsertHandlerForReader(r io.Reader, isGzipped bool) error { - return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error { - return insertRows(nil, rows, nil) - }, nil) + }, func(s string) { + logger.Errorf("error parsing prometheus text protocol, uri - %s, remote address - %q: %s", uri, remoteAddr, s) + }) } func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { diff --git a/app/vmagent/prometheusimport/request_handler_test.go b/app/vmagent/prometheusimport/request_handler_test.go new file mode 100644 index 0000000000..6883da2b61 --- /dev/null +++ b/app/vmagent/prometheusimport/request_handler_test.go @@ -0,0 +1,55 @@ +package prometheusimport + +import ( + "bytes" + "flag" + "log" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" +) + +var ( + srv *httptest.Server + testOutput *bytes.Buffer +) + +func TestInsertHandler(t *testing.T) { + setUp() + defer tearDown() + req := httptest.NewRequest("POST", "/insert/0/api/v1/import/prometheus", bytes.NewBufferString(`{"foo":"bar"} +go_memstats_alloc_bytes_total 1`)) + if err := InsertHandler(nil, req); err != nil { + t.Errorf("unxepected error %s", err) + } + if msg := "error parsing prometheus text protocol"; !strings.Contains(testOutput.String(), msg) { + t.Errorf("output %q should contain %q", testOutput.String(), msg) + } +} + +func setUp() { + srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(204) + })) + flag.Parse() + remoteWriteFlag := "remoteWrite.url" + if err := flag.Lookup(remoteWriteFlag).Value.Set(srv.URL); err != nil { + log.Fatalf("unable to set %q with value %q, err: %v", remoteWriteFlag, srv.URL, err) + } + logger.Init() + common.StartUnmarshalWorkers() + remotewrite.Init() + testOutput = &bytes.Buffer{} + logger.SetOutputForTests(testOutput) +} + +func tearDown() { + common.StopUnmarshalWorkers() + srv.Close() + logger.ResetOutputForTest() +} diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index 0afccf45b9..12b1fe90e4 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -5,6 +5,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -26,10 +28,14 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } + remoteAddr := httpserver.GetQuotedRemoteAddr(req) + uri := httpserver.GetRequestURI(req) isGzipped := req.Header.Get("Content-Encoding") == "gzip" return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(rows, extraLabels) - }, nil) + }, func(s string) { + logger.Errorf("error parsing prometheus text protocol, uri - %s, remote address - %q: %s", uri, remoteAddr, s) + }) } func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { diff --git a/lib/logger/logger.go b/lib/logger/logger.go index 97539b29ec..3b973a81c9 100644 --- a/lib/logger/logger.go +++ b/lib/logger/logger.go @@ -315,3 +315,9 @@ func shouldSkipLog(level string) bool { return false } } + +// SetOutputForTests redefine output for logger. Use for Tests only. Call ResetOutputForTest to return output state to default +func SetOutputForTests(writer io.Writer) { output = writer } + +// ResetOutputForTest set logger output to default value +func ResetOutputForTest() { output = os.Stderr } diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index b7e8a2bff9..0a0b554779 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -179,7 +179,7 @@ func (uw *unmarshalWork) Unmarshal() { // Fill missing timestamps with the current timestamp. defaultTimestamp := uw.defaultTimestamp if defaultTimestamp <= 0 { - defaultTimestamp = int64(time.Now().UnixNano() / 1e6) + defaultTimestamp = time.Now().UnixNano() / 1e6 } for i := range rows { r := &rows[i]