From d9d759bc9069a67314a193b7bef3a399fc5f4bf0 Mon Sep 17 00:00:00 2001 From: Alexander Marshalov <_@marshalov.org> Date: Wed, 21 Jun 2023 15:31:28 +0200 Subject: [PATCH] jsonline support for data ingestion in vlinsert (#4487) added json lines / json stream format for ingestion to vlinsert --- app/vlinsert/common/flags.go | 8 + app/vlinsert/elasticsearch/elasticsearch.go | 18 +- app/vlinsert/jsonline/jsonline.go | 229 ++++++++++++++++++++ app/vlinsert/jsonline/jsonline_test.go | 70 ++++++ app/vlinsert/main.go | 4 + docs/VictoriaLogs/data-ingestion/README.md | 12 +- 6 files changed, 329 insertions(+), 12 deletions(-) create mode 100644 app/vlinsert/common/flags.go create mode 100644 app/vlinsert/jsonline/jsonline.go create mode 100644 app/vlinsert/jsonline/jsonline_test.go diff --git a/app/vlinsert/common/flags.go b/app/vlinsert/common/flags.go new file mode 100644 index 0000000000..7c29adeea9 --- /dev/null +++ b/app/vlinsert/common/flags.go @@ -0,0 +1,8 @@ +package common + +import "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + +var ( + // MaxLineSizeBytes is the maximum size of a single line, which can be read by /insert/* handlers + MaxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers") +) diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index ed72cd0cfb..e02722a78d 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -11,24 +11,20 @@ import ( "strings" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) -var ( - maxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers") -) - // RequestHandler processes ElasticSearch insert requests func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { w.Header().Add("Content-Type", "application/json") @@ -165,11 +161,11 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html if isGzip { - zr, err := common.GetGzipReader(r) + zr, err := pc.GetGzipReader(r) if err != nil { return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err) } - defer common.PutGzipReader(zr) + defer pc.PutGzipReader(zr) r = zr } @@ -179,7 +175,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, lb := lineBufferPool.Get() defer lineBufferPool.Put(lb) - lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, maxLineSizeBytes.IntN()) + lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN()) sc := bufio.NewScanner(wcr) sc.Buffer(lb.B, len(lb.B)) @@ -215,7 +211,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, if err := sc.Err(); err != nil { if errors.Is(err, bufio.ErrTooLong) { return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`, - maxLineSizeBytes.IntN()) + common.MaxLineSizeBytes.IntN()) } return false, err } @@ -232,7 
+228,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, if !sc.Scan() { if err := sc.Err(); err != nil { if errors.Is(err, bufio.ErrTooLong) { - return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", maxLineSizeBytes.IntN()) + return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", common.MaxLineSizeBytes.IntN()) } return false, err } diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go new file mode 100644 index 0000000000..b9d57a03a5 --- /dev/null +++ b/app/vlinsert/jsonline/jsonline.go @@ -0,0 +1,229 @@ +package jsonline + +import ( + "bufio" + "errors" + "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson" + "math" + "net/http" + "strconv" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +// RequestHandler processes jsonline insert requests +func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { + w.Header().Add("Content-Type", "application/json") + + if path != "/" { + return false + } + if method := r.Method; method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + return true + } + + requestsTotal.Inc() + + // Extract tenantID + tenantID, err := logstorage.GetTenantIDFromRequest(r) + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return true + } + + // Extract time field name from 
_time_field query arg + var timeField = "_time" + if tf := r.FormValue("_time_field"); tf != "" { + timeField = tf + } + + // Extract message field name from _msg_field query arg + var msgField = "" + if msgf := r.FormValue("_msg_field"); msgf != "" { + msgField = msgf + } + + streamFields := httputils.GetArray(r, "_stream_fields") + ignoreFields := httputils.GetArray(r, "ignore_fields") + + isDebug := httputils.GetBool(r, "debug") + debugRequestURI := "" + debugRemoteAddr := "" + if isDebug { + debugRequestURI = httpserver.GetRequestURI(r) + debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r) + } + + lr := logstorage.GetLogRows(streamFields, ignoreFields) + processLogMessage := func(timestamp int64, fields []logstorage.Field) { + lr.MustAdd(tenantID, timestamp, fields) + if isDebug { + s := lr.GetRowString(0) + lr.ResetKeepSettings() + logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s) + rowsDroppedTotal.Inc() + return + } + if lr.NeedFlush() { + vlstorage.MustAddRows(lr) + lr.ResetKeepSettings() + } + } + + reader := r.Body + if r.Header.Get("Content-Encoding") == "gzip" { + zr, err := pc.GetGzipReader(reader) + if err != nil { + httpserver.Errorf(w, r, "cannot read gzipped jsonline request: %s", err) + return true + } + defer pc.PutGzipReader(zr) + reader = zr + } + + wcr := writeconcurrencylimiter.GetReader(reader) + defer writeconcurrencylimiter.PutReader(wcr) + + lb := lineBufferPool.Get() + defer lineBufferPool.Put(lb) + + lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN()) + sc := bufio.NewScanner(wcr) + sc.Buffer(lb.B, len(lb.B)) + + n := 0 + for { + ok, err := readLine(sc, timeField, msgField, processLogMessage) + wcr.DecConcurrency() + if err != nil { + logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err) + } + if !ok { + break + } + n++ + rowsIngestedTotal.Inc() + } + + vlstorage.MustAddRows(lr) + logstorage.PutLogRows(lr) + + 
return true +} + +// readLine reads one JSON line from sc and passes the parsed entry to processLogMessage; it returns false once scanning stops. +func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) { + if !sc.Scan() { + if err := sc.Err(); err != nil { + if errors.Is(err, bufio.ErrTooLong) { + return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, common.MaxLineSizeBytes.IntN()) + } + return false, err + } + return false, nil + } + + line := sc.Bytes() + p := logjson.GetParser() + // Return the parser to the pool on every exit path, including lines skipped as invalid. + defer logjson.PutParser(p) + + if err := p.ParseLogMessage(line); err != nil { + invalidJSONLineLogger.Warnf("cannot parse json-encoded log entry: %s", err) + return true, nil + } + + timestamp, err := extractTimestampFromFields(timeField, p.Fields) + if err != nil { + invalidTimestampLogger.Warnf("skipping the log entry because cannot parse timestamp: %s", err) + return true, nil + } + updateMessageFieldName(msgField, p.Fields) + processLogMessage(timestamp, p.Fields) + return true, nil +} + +func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) { + for i := range fields { + f := &fields[i] + if f.Name != timeField { + continue + } + timestamp, err := parseTimestamp(f.Value) + if err != nil { + return 0, err + } + f.Value = "" + return timestamp, nil + } + return time.Now().UnixNano(), nil +} + +func updateMessageFieldName(msgField string, fields []logstorage.Field) { + if msgField == "" { + return + } + for i := range fields { + f := &fields[i] + if f.Name == msgField { + f.Name = "_msg" + return + } + } +} + +func parseTimestamp(s string) (int64, error) { + if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' { + // Try parsing timestamp in milliseconds + n, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0, fmt.Errorf("cannot parse timestamp in milliseconds from %q: %w", s, err) + } + if n > int64(math.MaxInt64)/1e6 { + return 0, fmt.Errorf("too big timestamp in milliseconds: %d; 
mustn't exceed %d", n, int64(math.MaxInt64)/1e6) + } + if n < int64(math.MinInt64)/1e6 { + return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6) + } + n *= 1e6 + return n, nil + } + if len(s) == len("YYYY-MM-DD") { + t, err := time.Parse("2006-01-02", s) + if err != nil { + return 0, fmt.Errorf("cannot parse date %q: %w", s, err) + } + return t.UnixNano(), nil + } + t, err := time.Parse(time.RFC3339, s) + if err != nil { + return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err) + } + return t.UnixNano(), nil +} + +var lineBufferPool bytesutil.ByteBufferPool + +var ( + requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`) + rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`) + rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/jsonline",reason="debug"}`) +) + +var ( + invalidTimestampLogger = logger.WithThrottler("invalidTimestampLogger", 5*time.Second) + invalidJSONLineLogger = logger.WithThrottler("invalidJSONLineLogger", 5*time.Second) + llll = logger.WithThrottler("llll", 2*time.Second) +) diff --git a/app/vlinsert/jsonline/jsonline_test.go b/app/vlinsert/jsonline/jsonline_test.go new file mode 100644 index 0000000000..86a917491e --- /dev/null +++ b/app/vlinsert/jsonline/jsonline_test.go @@ -0,0 +1,70 @@ +package jsonline + +import ( + "bufio" + "bytes" + "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "reflect" + "strings" + "testing" +) + +func TestReadBulkRequestSuccess(t *testing.T) { + f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { + t.Helper() + + var timestamps []int64 + var result string + processLogMessage := func(timestamp int64, fields []logstorage.Field) { + timestamps = append(timestamps, timestamp) + + a := make([]string, len(fields)) + for i, f := range fields { + a[i] = fmt.Sprintf("%q:%q", f.Name, 
f.Value) + } + s := "{" + strings.Join(a, ",") + "}\n" + result += s + } + + // Read the request without compression + r := bytes.NewBufferString(data) + sc := bufio.NewScanner(r) + rows := 0 + for { + ok, err := readLine(sc, timeField, msgField, processLogMessage) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !ok { + break + } + rows++ + } + if rows != rowsExpected { + t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected) + } + + if !reflect.DeepEqual(timestamps, timestampsExpected) { + t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected) + } + if result != resultExpected { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected) + } + } + + // Verify non-empty data + data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"} +{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"} +{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"} +` + timeField := "@timestamp" + msgField := "message" + rowsExpected := 3 + timestampsExpected := []int64{1686026891735000000, 1686026892735000000, 1686026893735000000} + resultExpected := `{"@timestamp":"","log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"} +{"@timestamp":"","_msg":"baz"} +{"_msg":"xyz","@timestamp":"","x":"y"} +` + f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected) +} diff --git a/app/vlinsert/main.go b/app/vlinsert/main.go index 64157229f8..5f5c9495d2 100644 --- a/app/vlinsert/main.go +++ b/app/vlinsert/main.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline" ) // Init initializes vlinsert @@ -28,6 +29,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { case strings.HasPrefix(path, "/elasticsearch/"): path = strings.TrimPrefix(path, 
"/elasticsearch") return elasticsearch.RequestHandler(path, w, r) + case strings.HasPrefix(path, "/jsonline"): + path = strings.TrimPrefix(path, "/jsonline") + return jsonline.RequestHandler(path, w, r) default: return false } diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index 9915820bb2..b9902adc02 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -45,8 +45,18 @@ The command should return the following response: ### JSON stream API -TODO: document JSON stream API +VictoriaLogs accepts logs via HTTP API at the `/insert/jsonline` endpoint, where each line +of the request body contains a single JSON object (lines are separated by `\n`). +Here is an example: + + ```http request + POST http://localhost:9428/insert/jsonline/?_stream_fields=stream&_msg_field=log.message&_time_field=date + Content-Type: application/jsonl + { "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:31:23Z", "stream": "stream1" } + { "log": { "level": "error", "message": "oh no!" }, "date": "2023-06-20T15:32:10Z", "stream": "stream1" } + { "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:35:11Z", "stream": "stream2" } + ``` ### HTTP parameters