From ed73f8350b5710d27895facf50c84147c7a6af7e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 30 Oct 2024 14:13:56 +0100 Subject: [PATCH] app/vlinsert: allow specifying comma-separated list of fields containing log message via _msg_field query arg and VL-Msg-Field HTTP request header This msy be useful when ingesting logs from different sources, which store the log message in different fields. For example, `_msg_field=message,event.data,some_field` will get log message from the first non-empty field: `message`, `event.data` and `some_field`. --- app/vlinsert/elasticsearch/elasticsearch.go | 10 +++++----- app/vlinsert/elasticsearch/elasticsearch_test.go | 7 ++++--- .../elasticsearch/elasticsearch_timing_test.go | 4 ++-- app/vlinsert/insertutils/common_params.go | 12 +++++++++--- app/vlinsert/journald/journald.go | 5 +++-- app/vlinsert/journald/journald_test.go | 4 ++-- app/vlinsert/jsonline/jsonline.go | 10 +++++----- app/vlinsert/jsonline/jsonline_test.go | 5 +++-- app/vlinsert/syslog/syslog.go | 4 +++- docs/VictoriaLogs/CHANGELOG.md | 2 ++ docs/VictoriaLogs/data-ingestion/README.md | 8 ++++++++ docs/VictoriaLogs/keyConcepts.md | 3 ++- lib/logstorage/rows.go | 10 ++++++---- 13 files changed, 54 insertions(+), 30 deletions(-) diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index fff9f19be..b0e941cd2 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -103,7 +103,7 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { } lmp := cp.NewLogMessageProcessor() isGzip := r.Header.Get("Content-Encoding") == "gzip" - n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, lmp) + n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgFields, lmp) lmp.MustClose() if err != nil { logger.Warnf("cannot decode log message #%d in /_bulk request: %s, stream fields: %s", n, err, cp.StreamFields) @@ -133,7 +133,7 @@ var ( bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`) ) -func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, lmp insertutils.LogMessageProcessor) (int, error) { +func readBulkRequest(r io.Reader, isGzip bool, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (int, error) { // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html if isGzip { @@ -158,7 +158,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, lmp i n := 0 nCheckpoint := 0 for { - ok, err := readBulkLine(sc, timeField, msgField, lmp) + ok, err := readBulkLine(sc, timeField, msgFields, lmp) wcr.DecConcurrency() if err != nil || !ok { rowsIngestedTotal.Add(n - nCheckpoint) @@ -174,7 +174,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, lmp i var lineBufferPool bytesutil.ByteBufferPool -func readBulkLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils.LogMessageProcessor) (bool, error) { +func readBulkLine(sc *bufio.Scanner, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (bool, error) { var line []byte // Read the command, must be "create" or "index" @@ -219,7 +219,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils if ts == 0 { ts = time.Now().UnixNano() } - logstorage.RenameField(p.Fields, msgField, "_msg") + logstorage.RenameField(p.Fields, msgFields, "_msg") lmp.AddRow(ts, p.Fields) logstorage.PutJSONParser(p) diff --git a/app/vlinsert/elasticsearch/elasticsearch_test.go b/app/vlinsert/elasticsearch/elasticsearch_test.go index 4370fb669..a4c6c9686 100644 --- a/app/vlinsert/elasticsearch/elasticsearch_test.go +++ b/app/vlinsert/elasticsearch/elasticsearch_test.go @@ -15,7 +15,7 @@ func TestReadBulkRequest_Failure(t *testing.T) { tlp := &insertutils.TestLogMessageProcessor{} r := bytes.NewBufferString(data) - rows, err := readBulkRequest(r, false, "_time", "_msg", tlp) + rows, err := readBulkRequest(r, false, "_time", []string{"_msg"}, tlp) if err == nil { t.Fatalf("expecting non-empty error") } @@ -36,11 +36,12 @@ func TestReadBulkRequest_Success(t *testing.T) { f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { t.Helper() + msgFields := []string{"non_existing_foo", msgField, "non_exiting_bar"} tlp := &insertutils.TestLogMessageProcessor{} // Read the request without compression r := bytes.NewBufferString(data) - rows, err := readBulkRequest(r, false, timeField, msgField, tlp) + rows, err := readBulkRequest(r, false, timeField, msgFields, tlp) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -55,7 +56,7 @@ func TestReadBulkRequest_Success(t *testing.T) { tlp = &insertutils.TestLogMessageProcessor{} compressedData := compressData(data) r = bytes.NewBufferString(compressedData) - rows, err = readBulkRequest(r, true, timeField, msgField, tlp) + rows, err = readBulkRequest(r, true, timeField, msgFields, tlp) if err != nil { t.Fatalf("unexpected error: %s", err) } diff --git a/app/vlinsert/elasticsearch/elasticsearch_timing_test.go b/app/vlinsert/elasticsearch/elasticsearch_timing_test.go index a0b4e485b..8e592ddbf 100644 --- a/app/vlinsert/elasticsearch/elasticsearch_timing_test.go +++ b/app/vlinsert/elasticsearch/elasticsearch_timing_test.go @@ -32,7 +32,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) { dataBytes := bytesutil.ToUnsafeBytes(data) timeField := "@timestamp" - msgField := "message" + msgFields := []string{"message"} blp := &insertutils.BenchmarkLogMessageProcessor{} b.ReportAllocs() @@ -41,7 +41,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) { r := &bytes.Reader{} for pb.Next() { r.Reset(dataBytes) - _, err := readBulkRequest(r, isGzip, timeField, msgField, blp) + _, err := readBulkRequest(r, isGzip, timeField, msgFields, blp) if err != nil { panic(fmt.Errorf("unexpected error: %w", err)) } diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go index 733079ce0..12b9f1103 100644 --- a/app/vlinsert/insertutils/common_params.go +++ b/app/vlinsert/insertutils/common_params.go @@ -22,7 +22,7 @@ import ( type CommonParams struct { TenantID logstorage.TenantID TimeField string - MsgField string + MsgFields []string StreamFields []string IgnoreFields []string @@ -54,6 +54,10 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) { } else if msgf = r.Header.Get("VL-Msg-Field"); msgf != "" { msgField = msgf } + var msgFields []string + if msgField != "" { + msgFields = strings.Split(msgField, ",") + } streamFields := httputils.GetArray(r, "_stream_fields") if len(streamFields) == 0 { @@ -89,7 +93,7 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) { cp := &CommonParams{ TenantID: tenantID, TimeField: timeField, - MsgField: msgField, + MsgFields: msgFields, StreamFields: streamFields, IgnoreFields: ignoreFields, Debug: debug, @@ -106,7 +110,9 @@ func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams { cp := &CommonParams{ TenantID: tenantID, TimeField: "timestamp", - MsgField: "message", + MsgFields: []string{ + "message", + }, StreamFields: []string{ "hostname", "app_name", diff --git a/app/vlinsert/journald/journald.go b/app/vlinsert/journald/journald.go index c337daa54..1cf76c354 100644 --- a/app/vlinsert/journald/journald.go +++ b/app/vlinsert/journald/journald.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "regexp" + "slices" "strconv" "strings" "time" @@ -64,7 +65,7 @@ func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) { if len(cp.IgnoreFields) == 0 { cp.IgnoreFields = *journaldIgnoreFields } - cp.MsgField = "MESSAGE" + cp.MsgFields = []string{"MESSAGE"} return cp, nil } @@ -233,7 +234,7 @@ func parseJournaldRequest(data []byte, lmp insertutils.LogMessageProcessor, cp * continue } - if name == cp.MsgField { + if slices.Contains(cp.MsgFields, name) { name = "_msg" } diff --git a/app/vlinsert/journald/journald_test.go b/app/vlinsert/journald/journald_test.go index b3d3db93c..86fd0de1f 100644 --- a/app/vlinsert/journald/journald_test.go +++ b/app/vlinsert/journald/journald_test.go @@ -12,7 +12,7 @@ func TestPushJournaldOk(t *testing.T) { tlp := &insertutils.TestLogMessageProcessor{} cp := &insertutils.CommonParams{ TimeField: "__REALTIME_TIMESTAMP", - MsgField: "MESSAGE", + MsgFields: []string{"MESSAGE"}, } n, err := parseJournaldRequest([]byte(src), tlp, cp) if err != nil { @@ -48,7 +48,7 @@ func TestPushJournald_Failure(t *testing.T) { tlp := &insertutils.TestLogMessageProcessor{} cp := &insertutils.CommonParams{ TimeField: "__REALTIME_TIMESTAMP", - MsgField: "MESSAGE", + MsgFields: []string{"MESSAGE"}, } _, err := parseJournaldRequest([]byte(data), tlp, cp) if err == nil { diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 10df87367..1db2cf7ca 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -53,7 +53,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) { } lmp := cp.NewLogMessageProcessor() - err = processStreamInternal(reader, cp.TimeField, cp.MsgField, lmp) + err = processStreamInternal(reader, cp.TimeField, cp.MsgFields, lmp) lmp.MustClose() if err != nil { @@ -66,7 +66,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) { } } -func processStreamInternal(r io.Reader, timeField, msgField string, lmp insertutils.LogMessageProcessor) error { +func processStreamInternal(r io.Reader, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) @@ -79,7 +79,7 @@ func processStreamInternal(r io.Reader, timeField, msgField string, lmp insertut n := 0 for { - ok, err := readLine(sc, timeField, msgField, lmp) + ok, err := readLine(sc, timeField, msgFields, lmp) wcr.DecConcurrency() if err != nil { errorsTotal.Inc() @@ -93,7 +93,7 @@ func processStreamInternal(r io.Reader, timeField, msgField string, lmp insertut } } -func readLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils.LogMessageProcessor) (bool, error) { +func readLine(sc *bufio.Scanner, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (bool, error) { var line []byte for len(line) == 0 { if !sc.Scan() { @@ -116,7 +116,7 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils.Log if err != nil { return false, fmt.Errorf("cannot get timestamp: %w", err) } - logstorage.RenameField(p.Fields, msgField, "_msg") + logstorage.RenameField(p.Fields, msgFields, "_msg") lmp.AddRow(ts, p.Fields) logstorage.PutJSONParser(p) diff --git a/app/vlinsert/jsonline/jsonline_test.go b/app/vlinsert/jsonline/jsonline_test.go index 17dc0ad95..429a1e4f2 100644 --- a/app/vlinsert/jsonline/jsonline_test.go +++ b/app/vlinsert/jsonline/jsonline_test.go @@ -11,9 +11,10 @@ func TestProcessStreamInternal_Success(t *testing.T) { f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { t.Helper() + msgFields := []string{msgField} tlp := &insertutils.TestLogMessageProcessor{} r := bytes.NewBufferString(data) - if err := processStreamInternal(r, timeField, msgField, tlp); err != nil { + if err := processStreamInternal(r, timeField, msgFields, tlp); err != nil { t.Fatalf("unexpected error: %s", err) } @@ -42,7 +43,7 @@ func TestProcessStreamInternal_Failure(t *testing.T) { tlp := &insertutils.TestLogMessageProcessor{} r := bytes.NewBufferString(data) - if err := processStreamInternal(r, "time", "", tlp); err == nil { + if err := processStreamInternal(r, "time", nil, tlp); err == nil { t.Fatalf("expecting non-nil error") } } diff --git a/app/vlinsert/syslog/syslog.go b/app/vlinsert/syslog/syslog.go index a2481d1b1..83bf6944c 100644 --- a/app/vlinsert/syslog/syslog.go +++ b/app/vlinsert/syslog/syslog.go @@ -514,13 +514,15 @@ func processLine(line []byte, currentYear int, timezone *time.Location, useLocal } ts = nsecs } - logstorage.RenameField(p.Fields, "message", "_msg") + logstorage.RenameField(p.Fields, msgFields, "_msg") lmp.AddRow(ts, p.Fields) logstorage.PutSyslogParser(p) return nil } +var msgFields = []string{"message"} + var ( rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="syslog"}`) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 8e2037265..378108cc6 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: allow specifying a list of log fields, which contain log message, via `_msg_field` query arg and via `VL-Msg-Field` HTTP request header. For example, `_msg_field=message,event.message` instructs obtaining [message field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) from the first non-empty field out of the `message` and `event.message` fields. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters) for details. + * BUGFIX: fix `runtime error: index out of range [0] with length 0` panic during low-rate data ingestion. The panic has been introduced in [v0.38.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.38.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7391). ## [v0.38.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.38.0-victorialogs) diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index 0f912c4de..a717540a9 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -193,6 +193,10 @@ First defined parameter is used. [Query string](https://en.wikipedia.org/wiki/Qu - `_msg_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper. This is usually the `message` field for Filebeat and Logstash. + + The `_msg_field` arg may contain comma-separated list of field names. In this case the first non-empty field from the list + is treated as [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). + If the `_msg_field` parameter isn't set, then VictoriaLogs reads the log message from the `_msg` field. - `_time_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) @@ -225,6 +229,10 @@ VictoriaLogs accepts optional `AccountID` and `ProjectID` headers at [data inges - `VL-Msg-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper. This is usually the `message` field for Filebeat and Logstash. + + The `VL-Msg-Field` header may contain comma-separated list of field names. In this case the first non-empty field from the list + is treated as [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). + If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads the log message from the `_msg` field. - `VL-Time-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) diff --git a/docs/VictoriaLogs/keyConcepts.md b/docs/VictoriaLogs/keyConcepts.md index d1b38ba65..8eee6fbbf 100644 --- a/docs/VictoriaLogs/keyConcepts.md +++ b/docs/VictoriaLogs/keyConcepts.md @@ -128,9 +128,10 @@ log entry, which can be ingested into VictoriaLogs: ``` If the actual log message has other than `_msg` field name, then it is possible to specify the real log message field -via `_msg_field` query arg during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/). +via `_msg_field` query arg or via `VL-Msg-Field` HTTP header during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/). For example, if log message is located in the `event.original` field, then specify `_msg_field=event.original` query arg during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/). +See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters) for more details. ### Time field diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index 7f70b122e..dddeb896b 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "slices" "github.com/valyala/quicktemplate" @@ -118,14 +119,15 @@ func isLogfmtSpecialChar(c rune) bool { } } -// RenameField renames field with the oldName to newName in Fields -func RenameField(fields []Field, oldName, newName string) { - if oldName == "" { +// RenameField renames the first non-empty field with the name from oldNames list to newName in Fields +func RenameField(fields []Field, oldNames []string, newName string) { + if len(oldNames) == 0 { + // Nothing to rename return } for i := range fields { f := &fields[i] - if f.Name == oldName { + if f.Value != "" && slices.Contains(oldNames, f.Name) { f.Name = newName return }