diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index 571860e25..84ede2e1b 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -1,12 +1,13 @@ package prometheusimport import ( - "io" "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -30,17 +31,14 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } + remoteAddr := httpserver.GetQuotedRemoteAddr(req) + uri := httpserver.GetRequestURI(req) isGzipped := req.Header.Get("Content-Encoding") == "gzip" return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) - }, nil) -} - -// InsertHandlerForReader processes metrics from given reader with optional gzip format -func InsertHandlerForReader(r io.Reader, isGzipped bool) error { - return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error { - return insertRows(nil, rows, nil) - }, nil) + }, func(s string) { + logger.Errorf("error parsing prometheus text protocol, uri - %s, remote address - %q: %s", uri, remoteAddr, s) + }) } func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { diff --git a/app/vmagent/prometheusimport/request_handler_test.go b/app/vmagent/prometheusimport/request_handler_test.go new file mode 100644 index 000000000..6883da2b6 --- /dev/null +++ b/app/vmagent/prometheusimport/request_handler_test.go @@ -0,0 +1,55 @@ +package prometheusimport + +import ( + "bytes" + "flag" + "log" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" +) + +var ( + srv *httptest.Server + testOutput *bytes.Buffer +) + +func TestInsertHandler(t *testing.T) { + setUp() + defer tearDown() + req := httptest.NewRequest("POST", "/insert/0/api/v1/import/prometheus", bytes.NewBufferString(`{"foo":"bar"} +go_memstats_alloc_bytes_total 1`)) + if err := InsertHandler(nil, req); err != nil { + t.Errorf("unxepected error %s", err) + } + if msg := "error parsing prometheus text protocol"; !strings.Contains(testOutput.String(), msg) { + t.Errorf("output %q should contain %q", testOutput.String(), msg) + } +} + +func setUp() { + srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(204) + })) + flag.Parse() + remoteWriteFlag := "remoteWrite.url" + if err := flag.Lookup(remoteWriteFlag).Value.Set(srv.URL); err != nil { + log.Fatalf("unable to set %q with value %q, err: %v", remoteWriteFlag, srv.URL, err) + } + logger.Init() + common.StartUnmarshalWorkers() + remotewrite.Init() + testOutput = &bytes.Buffer{} + logger.SetOutputForTests(testOutput) +} + +func tearDown() { + common.StopUnmarshalWorkers() + srv.Close() + logger.ResetOutputForTest() +} diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index 0afccf45b..12b1fe90e 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -5,6 +5,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -26,10 +28,14 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } + remoteAddr := httpserver.GetQuotedRemoteAddr(req) + uri := httpserver.GetRequestURI(req) isGzipped := req.Header.Get("Content-Encoding") == "gzip" return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(rows, extraLabels) - }, nil) + }, func(s string) { + logger.Errorf("error parsing prometheus text protocol, uri - %s, remote address - %q: %s", uri, remoteAddr, s) + }) } func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { diff --git a/lib/logger/logger.go b/lib/logger/logger.go index 97539b29e..3b973a81c 100644 --- a/lib/logger/logger.go +++ b/lib/logger/logger.go @@ -315,3 +315,9 @@ func shouldSkipLog(level string) bool { return false } } + +// SetOutputForTests redefine output for logger. Use for Tests only. Call ResetOutputForTest to return output state to default +func SetOutputForTests(writer io.Writer) { output = writer } + +// ResetOutputForTest set logger output to default value +func ResetOutputForTest() { output = os.Stderr } diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index b7e8a2bff..0a0b55477 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -179,7 +179,7 @@ func (uw *unmarshalWork) Unmarshal() { // Fill missing timestamps with the current timestamp. defaultTimestamp := uw.defaultTimestamp if defaultTimestamp <= 0 { - defaultTimestamp = int64(time.Now().UnixNano() / 1e6) + defaultTimestamp = time.Now().UnixNano() / 1e6 } for i := range rows { r := &rows[i]