From dde9ceed07adbdec18b3cc21095e18f25f9950db Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 21 Jun 2023 19:39:22 -0700
Subject: [PATCH] app/vlinsert/jsonline: code prettifying

---
 app/vlinsert/common/flags.go                |   8 --
 app/vlinsert/elasticsearch/elasticsearch.go |  78 ++---------
 app/vlinsert/insertutils/common_params.go   |  91 +++++++++++
 app/vlinsert/insertutils/flags.go           |  10 ++
 app/vlinsert/jsonline/jsonline.go           | 143 ++++----------
 app/vlinsert/main.go                        |   6 +-
 docs/VictoriaLogs/data-ingestion/README.md  |  68 ++++++++--
 docs/VictoriaLogs/data-ingestion/Vector.md  |   2 +-
 lib/logjson/parser.go                       |  15 ++
 lib/logstorage/storage.go                   |   2 +-
 10 files changed, 218 insertions(+), 205 deletions(-)
 delete mode 100644 app/vlinsert/common/flags.go
 create mode 100644 app/vlinsert/insertutils/common_params.go
 create mode 100644 app/vlinsert/insertutils/flags.go

diff --git a/app/vlinsert/common/flags.go b/app/vlinsert/common/flags.go
deleted file mode 100644
index 7c29adeea..000000000
--- a/app/vlinsert/common/flags.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package common
-
-import "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
-
-var (
-	// MaxLineSizeBytes is the maximum size of a single line, which can be read by /insert/* handlers
-	MaxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers")
-)
diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go
index e02722a78..d951564c6 100644
--- a/app/vlinsert/elasticsearch/elasticsearch.go
+++ b/app/vlinsert/elasticsearch/elasticsearch.go
@@ -11,16 +11,15 @@ import (
 	"strings"
 	"time"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
-	pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -84,54 +83,15 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 	startTime := time.Now()
 	bulkRequestsTotal.Inc()
 
-	// Extract tenantID
-	tenantID, err := logstorage.GetTenantIDFromRequest(r)
+	cp, err := insertutils.GetCommonParams(r)
 	if err != nil {
 		httpserver.Errorf(w, r, "%s", err)
 		return true
 	}
-
-	// Extract time field name from _time_field query arg
-	var timeField = "_time"
-	if tf := r.FormValue("_time_field"); tf != "" {
-		timeField = tf
-	}
-
-	// Extract message field name from _msg_field query arg
-	var msgField = ""
-	if msgf := r.FormValue("_msg_field"); msgf != "" {
-		msgField = msgf
-	}
-
-	streamFields := httputils.GetArray(r, "_stream_fields")
-	ignoreFields := httputils.GetArray(r, "ignore_fields")
-
-	isDebug := httputils.GetBool(r, "debug")
-	debugRequestURI := ""
-	debugRemoteAddr := ""
-	if isDebug {
-		debugRequestURI = httpserver.GetRequestURI(r)
-		debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
-	}
-
-	lr := logstorage.GetLogRows(streamFields, ignoreFields)
-	processLogMessage := func(timestamp int64, fields []logstorage.Field) {
-		lr.MustAdd(tenantID, timestamp, fields)
-		if isDebug {
-			s := lr.GetRowString(0)
-			lr.ResetKeepSettings()
-			logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s)
-			rowsDroppedTotal.Inc()
-			return
-		}
-		if lr.NeedFlush() {
-			vlstorage.MustAddRows(lr)
-			lr.ResetKeepSettings()
-		}
-	}
-
+	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
+	processLogMessage := cp.GetProcessLogMessageFunc(lr)
 	isGzip := r.Header.Get("Content-Encoding") == "gzip"
-	n, err := readBulkRequest(r.Body, isGzip, timeField, msgField, processLogMessage)
+	n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, processLogMessage)
 	if err != nil {
 		logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
 		return true
@@ -152,7 +112,6 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 
 var (
 	bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
-	rowsDroppedTotal  = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/elasticsearch/_bulk",reason="debug"}`)
 )
 
 func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
@@ -161,11 +120,11 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 	// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 
 	if isGzip {
-		zr, err := pc.GetGzipReader(r)
+		zr, err := common.GetGzipReader(r)
 		if err != nil {
 			return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
 		}
-		defer pc.PutGzipReader(zr)
+		defer common.PutGzipReader(zr)
 		r = zr
 	}
 
@@ -175,7 +134,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 	lb := lineBufferPool.Get()
 	defer lineBufferPool.Put(lb)
-	lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN())
+	lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
 	sc := bufio.NewScanner(wcr)
 	sc.Buffer(lb.B, len(lb.B))
@@ -211,7 +170,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
 		if err := sc.Err(); err != nil {
 			if errors.Is(err, bufio.ErrTooLong) {
 				return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
-					common.MaxLineSizeBytes.IntN())
+					insertutils.MaxLineSizeBytes.IntN())
 			}
 			return false, err
 		}
@@ -228,7 +187,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
 	if !sc.Scan() {
 		if err := sc.Err(); err != nil {
 			if errors.Is(err, bufio.ErrTooLong) {
-				return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", common.MaxLineSizeBytes.IntN())
+				return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", insertutils.MaxLineSizeBytes.IntN())
 			}
 			return false, err
 		}
@@ -244,7 +203,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
 	if err != nil {
 		return false, fmt.Errorf("cannot parse timestamp: %w", err)
 	}
-	updateMessageFieldName(msgField, p.Fields)
+	p.RenameField(msgField, "_msg")
 	processLogMessage(timestamp, p.Fields)
 	logjson.PutParser(p)
 	return true, nil
@@ -266,19 +225,6 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
 	return time.Now().UnixNano(), nil
 }
 
-func updateMessageFieldName(msgField string, fields []logstorage.Field) {
-	if msgField == "" {
-		return
-	}
-	for i := range fields {
-		f := &fields[i]
-		if f.Name == msgField {
-			f.Name = "_msg"
-			return
-		}
-	}
-}
-
 func parseElasticsearchTimestamp(s string) (int64, error) {
 	if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
 		// Try parsing timestamp in milliseconds
diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go
new file mode 100644
index 000000000..97bf5c280
--- /dev/null
+++ b/app/vlinsert/insertutils/common_params.go
@@ -0,0 +1,91 @@
+package insertutils
+
+import (
+	"net/http"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+// CommonParams contains common HTTP parameters used by log ingestion APIs.
+//
+// See https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters
+type CommonParams struct {
+	TenantID     logstorage.TenantID
+	TimeField    string
+	MsgField     string
+	StreamFields []string
+	IgnoreFields []string
+
+	Debug           bool
+	DebugRequestURI string
+	DebugRemoteAddr string
+}
+
+// GetCommonParams returns CommonParams from r.
+func GetCommonParams(r *http.Request) (*CommonParams, error) {
+	// Extract tenantID
+	tenantID, err := logstorage.GetTenantIDFromRequest(r)
+	if err != nil {
+		return nil, err
+	}
+
+	// Extract time field name from _time_field query arg
+	var timeField = "_time"
+	if tf := r.FormValue("_time_field"); tf != "" {
+		timeField = tf
+	}
+
+	// Extract message field name from _msg_field query arg
+	var msgField = ""
+	if msgf := r.FormValue("_msg_field"); msgf != "" {
+		msgField = msgf
+	}
+
+	streamFields := httputils.GetArray(r, "_stream_fields")
+	ignoreFields := httputils.GetArray(r, "ignore_fields")
+
+	debug := httputils.GetBool(r, "debug")
+	debugRequestURI := ""
+	debugRemoteAddr := ""
+	if debug {
+		debugRequestURI = httpserver.GetRequestURI(r)
+		debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
+	}
+
+	cp := &CommonParams{
+		TenantID:        tenantID,
+		TimeField:       timeField,
+		MsgField:        msgField,
+		StreamFields:    streamFields,
+		IgnoreFields:    ignoreFields,
+		Debug:           debug,
+		DebugRequestURI: debugRequestURI,
+		DebugRemoteAddr: debugRemoteAddr,
+	}
+	return cp, nil
+}
+
+// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
+func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
+	return func(timestamp int64, fields []logstorage.Field) {
+		lr.MustAdd(cp.TenantID, timestamp, fields)
+		if cp.Debug {
+			s := lr.GetRowString(0)
+			lr.ResetKeepSettings()
+			logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
+			rowsDroppedTotal.Inc()
+			return
+		}
+		if lr.NeedFlush() {
+			vlstorage.MustAddRows(lr)
+			lr.ResetKeepSettings()
+		}
+	}
+}
+
+var rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)
diff --git a/app/vlinsert/insertutils/flags.go b/app/vlinsert/insertutils/flags.go
new file mode 100644
index 000000000..8f070488c
--- /dev/null
+++ b/app/vlinsert/insertutils/flags.go
@@ -0,0 +1,10 @@
+package insertutils
+
+import (
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
+)
+
+var (
+	// MaxLineSizeBytes is the maximum size of a single line, which can be read by /insert/* handlers
+	MaxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers")
+)
diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go
index de49156c7..373bc0129 100644
--- a/app/vlinsert/jsonline/jsonline.go
+++ b/app/vlinsert/jsonline/jsonline.go
@@ -4,92 +4,48 @@ import (
 	"bufio"
 	"errors"
 	"fmt"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
-	"math"
 	"net/http"
-	"strconv"
 	"time"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
-	pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 	"github.com/VictoriaMetrics/metrics"
 )
 
 // RequestHandler processes jsonline insert requests
-func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
+func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	w.Header().Add("Content-Type", "application/json")
-	if path != "/" {
-		return false
-	}
-	if method := r.Method; method != "POST" {
+	if r.Method != "POST" {
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		return true
 	}
 	requestsTotal.Inc()
 
-	// Extract tenantID
-	tenantID, err := logstorage.GetTenantIDFromRequest(r)
+	cp, err := insertutils.GetCommonParams(r)
 	if err != nil {
 		httpserver.Errorf(w, r, "%s", err)
 		return true
 	}
-
-	// Extract time field name from _time_field query arg
-	var timeField = "_time"
-	if tf := r.FormValue("_time_field"); tf != "" {
-		timeField = tf
-	}
-
-	// Extract message field name from _msg_field query arg
-	var msgField = ""
-	if msgf := r.FormValue("_msg_field"); msgf != "" {
-		msgField = msgf
-	}
-
-	streamFields := httputils.GetArray(r, "_stream_fields")
-	ignoreFields := httputils.GetArray(r, "ignore_fields")
-
-	isDebug := httputils.GetBool(r, "debug")
-	debugRequestURI := ""
-	debugRemoteAddr := ""
-	if isDebug {
-		debugRequestURI = httpserver.GetRequestURI(r)
-		debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
-	}
-
-	lr := logstorage.GetLogRows(streamFields, ignoreFields)
-	processLogMessage := func(timestamp int64, fields []logstorage.Field) {
-		lr.MustAdd(tenantID, timestamp, fields)
-		if isDebug {
-			s := lr.GetRowString(0)
-			lr.ResetKeepSettings()
-			logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s)
-			rowsDroppedTotal.Inc()
-			return
-		}
-		if lr.NeedFlush() {
-			vlstorage.MustAddRows(lr)
-			lr.ResetKeepSettings()
-		}
-	}
+	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
+	processLogMessage := cp.GetProcessLogMessageFunc(lr)
 
 	reader := r.Body
 	if r.Header.Get("Content-Encoding") == "gzip" {
-		zr, err := pc.GetGzipReader(reader)
+		zr, err := common.GetGzipReader(reader)
 		if err != nil {
-			//return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
+			logger.Errorf("cannot read gzipped jsonline request: %s", err)
 			return true
 		}
-		defer pc.PutGzipReader(zr)
+		defer common.PutGzipReader(zr)
 		reader = zr
 	}
@@ -99,16 +55,17 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 	lb := lineBufferPool.Get()
 	defer lineBufferPool.Put(lb)
-	lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN())
+	lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
 	sc := bufio.NewScanner(wcr)
 	sc.Buffer(lb.B, len(lb.B))
 
 	n := 0
 	for {
-		ok, err := readLine(sc, timeField, msgField, processLogMessage)
+		ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage)
 		wcr.DecConcurrency()
 		if err != nil {
 			logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
+			break
 		}
 		if !ok {
 			break
@@ -124,30 +81,29 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 }
 
 func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
-	if !sc.Scan() {
-		if err := sc.Err(); err != nil {
-			if errors.Is(err, bufio.ErrTooLong) {
-				return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, common.MaxLineSizeBytes.IntN())
+	var line []byte
+	for len(line) == 0 {
+		if !sc.Scan() {
+			if err := sc.Err(); err != nil {
+				if errors.Is(err, bufio.ErrTooLong) {
+					return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, insertutils.MaxLineSizeBytes.IntN())
+				}
+				return false, err
 			}
-			return false, err
+			return false, nil
 		}
-		return false, nil
+		line = sc.Bytes()
 	}
-	line := sc.Bytes()
 
 	p := logjson.GetParser()
-
 	if err := p.ParseLogMessage(line); err != nil {
-		invalidJSONLineLogger.Warnf("cannot parse json-encoded log entry: %s", err)
-		return true, nil
+		return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
 	}
-
 	timestamp, err := extractTimestampFromFields(timeField, p.Fields)
 	if err != nil {
-		invalidTimestampLogger.Warnf("skipping the log entry because cannot parse timestamp: %s", err)
-		return true, nil
+		return false, fmt.Errorf("cannot parse timestamp: %w", err)
 	}
-	updateMessageFieldName(msgField, p.Fields)
+	p.RenameField(msgField, "_msg")
 	processLogMessage(timestamp, p.Fields)
 	logjson.PutParser(p)
 	return true, nil
@@ -159,7 +115,7 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
 		if f.Name != timeField {
 			continue
 		}
-		timestamp, err := parseTimestamp(f.Value)
+		timestamp, err := parseISO8601Timestamp(f.Value)
 		if err != nil {
 			return 0, err
 		}
@@ -169,42 +125,7 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
 	return time.Now().UnixNano(), nil
 }
 
-func updateMessageFieldName(msgField string, fields []logstorage.Field) {
-	if msgField == "" {
-		return
-	}
-	for i := range fields {
-		f := &fields[i]
-		if f.Name == msgField {
-			f.Name = "_msg"
-			return
-		}
-	}
-}
-
-func parseTimestamp(s string) (int64, error) {
-	if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
-		// Try parsing timestamp in milliseconds
-		n, err := strconv.ParseInt(s, 10, 64)
-		if err != nil {
-			return 0, fmt.Errorf("cannot parse timestamp in milliseconds from %q: %w", s, err)
-		}
-		if n > int64(math.MaxInt64)/1e6 {
-			return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
-		}
-		if n < int64(math.MinInt64)/1e6 {
-			return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
-		}
-		n *= 1e6
-		return n, nil
-	}
-	if len(s) == len("YYYY-MM-DD") {
-		t, err := time.Parse("2006-01-02", s)
-		if err != nil {
-			return 0, fmt.Errorf("cannot parse date %q: %w", s, err)
-		}
-		return t.UnixNano(), nil
-	}
+func parseISO8601Timestamp(s string) (int64, error) {
 	t, err := time.Parse(time.RFC3339, s)
 	if err != nil {
 		return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
@@ -217,10 +138,4 @@ var lineBufferPool bytesutil.ByteBufferPool
 
 var (
 	requestsTotal     = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
 	rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
-	rowsDroppedTotal  = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/jsonline",reason="debug"}`)
-)
-
-var (
-	invalidTimestampLogger = logger.WithThrottler("invalidTimestampLogger", 5*time.Second)
-	invalidJSONLineLogger  = logger.WithThrottler("invalidJSONLineLogger", 5*time.Second)
 )
diff --git a/app/vlinsert/main.go b/app/vlinsert/main.go
index 5f5c9495d..7aa7254a1 100644
--- a/app/vlinsert/main.go
+++ b/app/vlinsert/main.go
@@ -25,13 +25,13 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	path = strings.TrimPrefix(path, "/insert")
 	path = strings.ReplaceAll(path, "//", "/")
 
+	if path == "/jsonline" {
+		return jsonline.RequestHandler(w, r)
+	}
 	switch {
 	case strings.HasPrefix(path, "/elasticsearch/"):
 		path = strings.TrimPrefix(path, "/elasticsearch")
 		return elasticsearch.RequestHandler(path, w, r)
-	case strings.HasPrefix(path, "/jsonline"):
-		path = strings.TrimPrefix(path, "/jsonline")
-		return jsonline.RequestHandler(path, w, r)
 	default:
 		return false
 	}
diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md
index 36e5b378b..3259e5215 100644
--- a/docs/VictoriaLogs/data-ingestion/README.md
+++ b/docs/VictoriaLogs/data-ingestion/README.md
@@ -30,7 +30,7 @@ VictoriaLogs accepts logs in [Elasticsearch bulk API](https://www.elastic.co/gui
 / [OpenSearch Bulk API](http://opensearch.org/docs/1.2/opensearch/rest-api/document-apis/bulk/) format
 at `http://localhost:9428/insert/elasticsearch/_bulk` endpoint.
 
-The following command pushes a single log line to Elasticsearch bulk API at VictoriaLogs:
+The following command pushes a single log line to VictoriaLogs:
 
 ```bash
 echo '{"create":{}}
@@ -38,7 +38,14 @@ echo '{"create":{}}
 ' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:9428/insert/elasticsearch/_bulk
 ```
 
-The following command verifies that the data has been successfully pushed to VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it:
+It is possible to push thousands of log lines in a single request to this API.
+
+See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields,
+which must be present in the ingested log messages.
+
+The API accepts various HTTP parameters, which can change the data ingestion behavior - see [these docs](#http-parameters) for details.
+
+The following command verifies that the data has been successfully ingested into VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it:
 
 ```bash
 curl http://localhost:9428/select/logsql/query -d 'query=host.name:host123'
@@ -50,20 +57,57 @@ The command should return the following response:
 {"_msg":"cannot open file","_stream":"{}","_time":"2023-06-21T04:24:24Z","host.name":"host123"}
 ```
 
+See also:
+
+- [How to debug data ingestion](#troubleshooting).
+- [HTTP parameters, which can be passed to the API](#http-parameters).
+- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying.html).
+
 ### JSON stream API
 
-VictoriaLogs supports HTTP API on `/insert/jsonline` endpoint for data ingestion where
-body contains a JSON object in each line (separated by `\n`).
+VictoriaLogs accepts a JSON line stream, aka [ndjson](http://ndjson.org/), at the `http://localhost:9428/insert/jsonline` endpoint.
 
-Here is an example:
+The following command pushes multiple log lines to VictoriaLogs:
 
- ```http request
- POST http://localhost:9428/insert/jsonline/?_stream_fields=stream&_msg_field=log&_time_field=date
- Content-Type: application/jsonl
- { "log": { "level": "info", "message": "hello world" }, "date": "2023‐06‐20T15:31:23Z", "stream": "stream1" }
- { "log": { "level": "error", "message": "oh no!" }, "date": "2023‐06‐20T15:32:10Z", "stream": "stream1" }
- { "log": { "level": "info", "message": "hello world" }, "date": "2023‐06‐20T15:35:11Z", "stream": "stream2" }
- ```
+```bash
+echo '{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:31:23Z", "stream": "stream1" }
+{ "log": { "level": "error", "message": "oh no!" }, "date": "2023-06-20T15:32:10.567Z", "stream": "stream1" }
+{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:35:11.567890+02:00", "stream": "stream2" }
+' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \
+  'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message'
+```
+
+It is possible to push an unlimited number of log lines in a single request to this API.
+
+The [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) must be
+in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`.
+An optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`.
+A timezone can be specified instead of the `Z` suffix - `2023-06-20T15:32:10+02:00`.
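+
+Under the hood, the timestamp is parsed with Go's `time.Parse(time.RFC3339, ...)` (see `parseISO8601Timestamp` in this patch),
+so all three documented forms can be sanity-checked with a standalone Go sketch like the one below
+(an illustration accompanying this patch, not code shipped with VictoriaLogs):
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+func main() {
+	// The three timestamp forms documented above, all covered by time.RFC3339.
+	for _, s := range []string{
+		"2023-06-20T15:32:10Z",      // no fractional seconds
+		"2023-06-20T15:32:10.123Z",  // fractional seconds after the dot
+		"2023-06-20T15:32:10+02:00", // timezone offset instead of the Z suffix
+	} {
+		t, err := time.Parse(time.RFC3339, s)
+		fmt.Println(t.UnixNano(), err)
+	}
+}
+```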
+
+See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields,
+which must be present in the ingested log messages.
+
+The API accepts various HTTP parameters, which can change the data ingestion behavior - see [these docs](#http-parameters) for details.
+
+The following command verifies that the data has been successfully ingested into VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it:
+
+```bash
+curl http://localhost:9428/select/logsql/query -d 'query=log.level:*'
+```
+
+The command should return the following response:
+
+```bash
+{"_msg":"hello world","_stream":"{stream=\"stream2\"}","_time":"2023-06-20T13:35:11.56789Z","log.level":"info"}
+{"_msg":"hello world","_stream":"{stream=\"stream1\"}","_time":"2023-06-20T15:31:23Z","log.level":"info"}
+{"_msg":"oh no!","_stream":"{stream=\"stream1\"}","_time":"2023-06-20T15:32:10.567Z","log.level":"error"}
+```
+
+See also:
+
+- [How to debug data ingestion](#troubleshooting).
+- [HTTP parameters, which can be passed to the API](#http-parameters).
+- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying.html).
 
 ### HTTP parameters
 
diff --git a/docs/VictoriaLogs/data-ingestion/Vector.md b/docs/VictoriaLogs/data-ingestion/Vector.md
index 18fde4a31..9843d33e4 100644
--- a/docs/VictoriaLogs/data-ingestion/Vector.md
+++ b/docs/VictoriaLogs/data-ingestion/Vector.md
@@ -133,5 +133,5 @@ See also:
 
 - [Data ingestion troubleshooting](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting).
 - [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying/).
-- [Elasticsearch output docs for Vector.dev](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/).
+- [Elasticsearch output docs for Vector](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/).
 - [Docker-compose demo for Filebeat integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/vector-docker).
diff --git a/lib/logjson/parser.go b/lib/logjson/parser.go
index 6bc8919d5..aa669181a 100644
--- a/lib/logjson/parser.go
+++ b/lib/logjson/parser.go
@@ -84,6 +84,21 @@ func (p *Parser) ParseLogMessage(msg []byte) error {
 	return nil
 }
 
+// RenameField renames the field with the given oldName to newName in p.Fields
+func (p *Parser) RenameField(oldName, newName string) {
+	if oldName == "" {
+		return
+	}
+	fields := p.Fields
+	for i := range fields {
+		f := &fields[i]
+		if f.Name == oldName {
+			f.Name = newName
+			return
+		}
+	}
+}
+
 func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
 	o := v.GetObject()
 	o.Visit(func(k []byte, v *fastjson.Value) {
diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go
index bcc93cd69..53a45d15e 100644
--- a/lib/logstorage/storage.go
+++ b/lib/logstorage/storage.go
@@ -457,7 +457,7 @@ type TimeFormatter int64
 func (tf *TimeFormatter) String() string {
 	ts := int64(*tf)
 	t := time.Unix(0, ts).UTC()
-	return t.Format(time.RFC3339)
+	return t.Format(time.RFC3339Nano)
 }
 
 func (s *Storage) getPartitionForDay(day int64) *partitionWrapper {
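
For reviewers who want to see the new `logjson.Parser.RenameField` helper in action, here is a minimal
sketch. It assumes compilation inside the VictoriaMetrics repo module and is an illustration, not part
of the patch:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
)

func main() {
	// Obtain a pooled parser, as the ingestion handlers in this patch do.
	p := logjson.GetParser()
	defer logjson.PutParser(p)

	if err := p.ParseLogMessage([]byte(`{"level":"info","message":"hello world"}`)); err != nil {
		panic(err)
	}

	// Map the user-supplied message field onto the canonical _msg field,
	// mirroring the p.RenameField(msgField, "_msg") calls in readLine and readBulkLine.
	p.RenameField("message", "_msg")

	for _, f := range p.Fields {
		fmt.Printf("%s=%q\n", f.Name, f.Value)
	}
}
```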