mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
add error handler for parsing prometheus text format to vmagent and v… (#3693)
* add error handler for parsing prometheus text format to vmagent and vminsert Signed-off-by: Artem Navoiev <tenmozes@gmail.com> * fix typo Signed-off-by: Artem Navoiev <tenmozes@gmail.com> * typo Signed-off-by: Artem Navoiev <tenmozes@gmail.com> * fix variables naming and error message Signed-off-by: Artem Navoiev <tenmozes@gmail.com> Signed-off-by: Artem Navoiev <tenmozes@gmail.com>
This commit is contained in:
parent
3536bef36e
commit
1cfa183c2b
5 changed files with 76 additions and 11 deletions
|
@ -1,12 +1,13 @@
|
||||||
package prometheusimport
|
package prometheusimport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||||
|
@ -30,17 +31,14 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
remoteAddr := httpserver.GetQuotedRemoteAddr(req)
|
||||||
|
uri := httpserver.GetRequestURI(req)
|
||||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||||
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
||||||
return insertRows(at, rows, extraLabels)
|
return insertRows(at, rows, extraLabels)
|
||||||
}, nil)
|
}, func(s string) {
|
||||||
}
|
logger.Errorf("error parsing prometheus text protocol, uri - %s, remote address - %q: %s", uri, remoteAddr, s)
|
||||||
|
})
|
||||||
// InsertHandlerForReader processes metrics from given reader with optional gzip format
|
|
||||||
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
|
|
||||||
return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error {
|
|
||||||
return insertRows(nil, rows, nil)
|
|
||||||
}, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||||
|
|
55
app/vmagent/prometheusimport/request_handler_test.go
Normal file
55
app/vmagent/prometheusimport/request_handler_test.go
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
package prometheusimport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"flag"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
srv *httptest.Server
|
||||||
|
testOutput *bytes.Buffer
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInsertHandler(t *testing.T) {
|
||||||
|
setUp()
|
||||||
|
defer tearDown()
|
||||||
|
req := httptest.NewRequest("POST", "/insert/0/api/v1/import/prometheus", bytes.NewBufferString(`{"foo":"bar"}
|
||||||
|
go_memstats_alloc_bytes_total 1`))
|
||||||
|
if err := InsertHandler(nil, req); err != nil {
|
||||||
|
t.Errorf("unxepected error %s", err)
|
||||||
|
}
|
||||||
|
if msg := "error parsing prometheus text protocol"; !strings.Contains(testOutput.String(), msg) {
|
||||||
|
t.Errorf("output %q should contain %q", testOutput.String(), msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setUp() {
|
||||||
|
srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
w.WriteHeader(204)
|
||||||
|
}))
|
||||||
|
flag.Parse()
|
||||||
|
remoteWriteFlag := "remoteWrite.url"
|
||||||
|
if err := flag.Lookup(remoteWriteFlag).Value.Set(srv.URL); err != nil {
|
||||||
|
log.Fatalf("unable to set %q with value %q, err: %v", remoteWriteFlag, srv.URL, err)
|
||||||
|
}
|
||||||
|
logger.Init()
|
||||||
|
common.StartUnmarshalWorkers()
|
||||||
|
remotewrite.Init()
|
||||||
|
testOutput = &bytes.Buffer{}
|
||||||
|
logger.SetOutputForTests(testOutput)
|
||||||
|
}
|
||||||
|
|
||||||
|
func tearDown() {
|
||||||
|
common.StopUnmarshalWorkers()
|
||||||
|
srv.Close()
|
||||||
|
logger.ResetOutputForTest()
|
||||||
|
}
|
|
@ -5,6 +5,8 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||||
|
@ -26,10 +28,14 @@ func InsertHandler(req *http.Request) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
remoteAddr := httpserver.GetQuotedRemoteAddr(req)
|
||||||
|
uri := httpserver.GetRequestURI(req)
|
||||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||||
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
||||||
return insertRows(rows, extraLabels)
|
return insertRows(rows, extraLabels)
|
||||||
}, nil)
|
}, func(s string) {
|
||||||
|
logger.Errorf("error parsing prometheus text protocol, uri - %s, remote address - %q: %s", uri, remoteAddr, s)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||||
|
|
|
@ -315,3 +315,9 @@ func shouldSkipLog(level string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetOutputForTests redefine output for logger. Use for Tests only. Call ResetOutputForTest to return output state to default
|
||||||
|
func SetOutputForTests(writer io.Writer) { output = writer }
|
||||||
|
|
||||||
|
// ResetOutputForTest set logger output to default value
|
||||||
|
func ResetOutputForTest() { output = os.Stderr }
|
||||||
|
|
|
@ -179,7 +179,7 @@ func (uw *unmarshalWork) Unmarshal() {
|
||||||
// Fill missing timestamps with the current timestamp.
|
// Fill missing timestamps with the current timestamp.
|
||||||
defaultTimestamp := uw.defaultTimestamp
|
defaultTimestamp := uw.defaultTimestamp
|
||||||
if defaultTimestamp <= 0 {
|
if defaultTimestamp <= 0 {
|
||||||
defaultTimestamp = int64(time.Now().UnixNano() / 1e6)
|
defaultTimestamp = time.Now().UnixNano() / 1e6
|
||||||
}
|
}
|
||||||
for i := range rows {
|
for i := range rows {
|
||||||
r := &rows[i]
|
r := &rows[i]
|
||||||
|
|
Loading…
Reference in a new issue