From 3aeb1b96a29e135ddd127d2e8b430f8845db5a24 Mon Sep 17 00:00:00 2001 From: Zhu Jiekun Date: Thu, 7 Nov 2024 00:25:05 +0800 Subject: [PATCH] app/vlinsert/loki: properly parse json logs with structured metadata The Loki protocol supports an optional `metadata` object for each ingested line. It's added as the 3rd field of the (ts,msg,metadata) tuple. Previously, the Loki request JSON parser rejected a log line if the tuple size != 2. This commit allows the optional 3rd tuple field. It is parsed as a JSON object, and its entries are added as log metadata fields to the log message stream. Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431 --------- Co-authored-by: f41gh7 --- app/vlinsert/loki/loki_json.go | 34 ++++++++++++++++++++++++++--- app/vlinsert/loki/loki_json_test.go | 9 +++++++- docs/VictoriaLogs/CHANGELOG.md | 1 + 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go index 03890dea7..79630fd63 100644 --- a/app/vlinsert/loki/loki_json.go +++ b/app/vlinsert/loki/loki_json.go @@ -8,6 +8,9 @@ import ( "strconv" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastjson" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -15,8 +18,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" - "github.com/VictoriaMetrics/metrics" - "github.com/valyala/fastjson" ) var parserPool fastjson.ParserPool @@ -140,7 +141,7 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er if err != nil { return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line) } - if len(lineA) != 2 { + if len(lineA) < 2 { return rowsIngested, fmt.Errorf("unexpected 
number of values in `values` item array %q; got %d want 2", line, len(lineA)) } @@ -167,6 +168,33 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er Name: "_msg", Value: bytesutil.ToUnsafeString(msg), }) + + // parse structured metadata + if len(lineA) > 2 { + structuredMetadata, err := lineA[2].Object() + if err != nil { + return rowsIngested, fmt.Errorf("unexpected structured metadata type for %q; want JSON object", lineA[2]) + } + + structuredMetadata.Visit(func(k []byte, v *fastjson.Value) { + if err != nil { + return + } + vStr, errLocal := v.StringBytes() + if errLocal != nil { + err = fmt.Errorf("unexpected structuredMetadata label value type for %q:%q; want string", k, v) + return + } + + fields = append(fields, logstorage.Field{ + Name: bytesutil.ToUnsafeString(k), + Value: bytesutil.ToUnsafeString(vStr), + }) + }) + if err != nil { + return rowsIngested, fmt.Errorf("error when parsing line `structuredMetadata` object: %w", err) + } + } lmp.AddRow(ts, fields) } rowsIngested += len(lines) diff --git a/app/vlinsert/loki/loki_json_test.go b/app/vlinsert/loki/loki_json_test.go index 4ccb34cb9..01c52f658 100644 --- a/app/vlinsert/loki/loki_json_test.go +++ b/app/vlinsert/loki/loki_json_test.go @@ -45,13 +45,16 @@ func TestParseJSONRequest_Failure(t *testing.T) { // Invalid length of `values` individual item f(`{"streams":[{"values":[[]]}]}`) f(`{"streams":[{"values":[["123"]]}]}`) - f(`{"streams":[{"values":[["123","456","789"]]}]}`) // Invalid type for timestamp inside `values` individual item f(`{"streams":[{"values":[[123,"456"]}]}`) // Invalid type for log message f(`{"streams":[{"values":[["123",1234]]}]}`) + // invalid structured metadata type + f(`{"streams":[{"values":[["1577836800000000001", "foo bar", ["metadata_1", "md_value"]]]}]}`) + // structured metadata with unexpected value type + f(`{"streams":[{"values":[["1577836800000000001", "foo bar", {"metadata_1": 1}]] }]}`) } func TestParseJSONRequest_Success(t 
*testing.T) { @@ -116,4 +119,8 @@ func TestParseJSONRequest_Success(t *testing.T) { }`, []int64{1577836800000000001, 1577836900005000002, 1877836900005000002}, `{"foo":"bar","a":"b","_msg":"foo bar"} {"foo":"bar","a":"b","_msg":"abc"} {"x":"y","_msg":"yx"}`) + + // values with metadata + f(`{"streams":[{"values":[["1577836800000000001", "foo bar", {"metadata_1": "md_value"}]]}]}`, []int64{1577836800000000001}, `{"_msg":"foo bar","metadata_1":"md_value"}`) + f(`{"streams":[{"values":[["1577836800000000001", "foo bar", {}]]}]}`, []int64{1577836800000000001}, `{"_msg":"foo bar"}`) } diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index f8444a4d6..934cafe3a 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip * FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters). +* BUGFIX: Properly parse structured metadata when ingesting logs with Loki ingestion protocol. An issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details. ## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)