app/vlinsert: allow specifying comma-separated list of fields containing log message via _msg_field query arg and VL-Msg-Field HTTP request header

This may be useful when ingesting logs from different sources, which store the log message in different fields.
For example, `_msg_field=message,event.data,some_field` will get log message from the first non-empty field:
`message`, `event.data` and `some_field`.
This commit is contained in:
Aliaksandr Valialkin 2024-10-30 14:13:56 +01:00
parent 102e9d4f4e
commit ed73f8350b
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
13 changed files with 54 additions and 30 deletions

View file

@ -103,7 +103,7 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor()
isGzip := r.Header.Get("Content-Encoding") == "gzip" isGzip := r.Header.Get("Content-Encoding") == "gzip"
n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, lmp) n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgFields, lmp)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
logger.Warnf("cannot decode log message #%d in /_bulk request: %s, stream fields: %s", n, err, cp.StreamFields) logger.Warnf("cannot decode log message #%d in /_bulk request: %s, stream fields: %s", n, err, cp.StreamFields)
@ -133,7 +133,7 @@ var (
bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`) bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
) )
func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, lmp insertutils.LogMessageProcessor) (int, error) { func readBulkRequest(r io.Reader, isGzip bool, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (int, error) {
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
if isGzip { if isGzip {
@ -158,7 +158,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, lmp i
n := 0 n := 0
nCheckpoint := 0 nCheckpoint := 0
for { for {
ok, err := readBulkLine(sc, timeField, msgField, lmp) ok, err := readBulkLine(sc, timeField, msgFields, lmp)
wcr.DecConcurrency() wcr.DecConcurrency()
if err != nil || !ok { if err != nil || !ok {
rowsIngestedTotal.Add(n - nCheckpoint) rowsIngestedTotal.Add(n - nCheckpoint)
@ -174,7 +174,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, lmp i
var lineBufferPool bytesutil.ByteBufferPool var lineBufferPool bytesutil.ByteBufferPool
func readBulkLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils.LogMessageProcessor) (bool, error) { func readBulkLine(sc *bufio.Scanner, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (bool, error) {
var line []byte var line []byte
// Read the command, must be "create" or "index" // Read the command, must be "create" or "index"
@ -219,7 +219,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils
if ts == 0 { if ts == 0 {
ts = time.Now().UnixNano() ts = time.Now().UnixNano()
} }
logstorage.RenameField(p.Fields, msgField, "_msg") logstorage.RenameField(p.Fields, msgFields, "_msg")
lmp.AddRow(ts, p.Fields) lmp.AddRow(ts, p.Fields)
logstorage.PutJSONParser(p) logstorage.PutJSONParser(p)

View file

@ -15,7 +15,7 @@ func TestReadBulkRequest_Failure(t *testing.T) {
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
r := bytes.NewBufferString(data) r := bytes.NewBufferString(data)
rows, err := readBulkRequest(r, false, "_time", "_msg", tlp) rows, err := readBulkRequest(r, false, "_time", []string{"_msg"}, tlp)
if err == nil { if err == nil {
t.Fatalf("expecting non-empty error") t.Fatalf("expecting non-empty error")
} }
@ -36,11 +36,12 @@ func TestReadBulkRequest_Success(t *testing.T) {
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper() t.Helper()
msgFields := []string{"non_existing_foo", msgField, "non_exiting_bar"}
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
// Read the request without compression // Read the request without compression
r := bytes.NewBufferString(data) r := bytes.NewBufferString(data)
rows, err := readBulkRequest(r, false, timeField, msgField, tlp) rows, err := readBulkRequest(r, false, timeField, msgFields, tlp)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
@ -55,7 +56,7 @@ func TestReadBulkRequest_Success(t *testing.T) {
tlp = &insertutils.TestLogMessageProcessor{} tlp = &insertutils.TestLogMessageProcessor{}
compressedData := compressData(data) compressedData := compressData(data)
r = bytes.NewBufferString(compressedData) r = bytes.NewBufferString(compressedData)
rows, err = readBulkRequest(r, true, timeField, msgField, tlp) rows, err = readBulkRequest(r, true, timeField, msgFields, tlp)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }

View file

@ -32,7 +32,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
dataBytes := bytesutil.ToUnsafeBytes(data) dataBytes := bytesutil.ToUnsafeBytes(data)
timeField := "@timestamp" timeField := "@timestamp"
msgField := "message" msgFields := []string{"message"}
blp := &insertutils.BenchmarkLogMessageProcessor{} blp := &insertutils.BenchmarkLogMessageProcessor{}
b.ReportAllocs() b.ReportAllocs()
@ -41,7 +41,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
r := &bytes.Reader{} r := &bytes.Reader{}
for pb.Next() { for pb.Next() {
r.Reset(dataBytes) r.Reset(dataBytes)
_, err := readBulkRequest(r, isGzip, timeField, msgField, blp) _, err := readBulkRequest(r, isGzip, timeField, msgFields, blp)
if err != nil { if err != nil {
panic(fmt.Errorf("unexpected error: %w", err)) panic(fmt.Errorf("unexpected error: %w", err))
} }

View file

@ -22,7 +22,7 @@ import (
type CommonParams struct { type CommonParams struct {
TenantID logstorage.TenantID TenantID logstorage.TenantID
TimeField string TimeField string
MsgField string MsgFields []string
StreamFields []string StreamFields []string
IgnoreFields []string IgnoreFields []string
@ -54,6 +54,10 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
} else if msgf = r.Header.Get("VL-Msg-Field"); msgf != "" { } else if msgf = r.Header.Get("VL-Msg-Field"); msgf != "" {
msgField = msgf msgField = msgf
} }
var msgFields []string
if msgField != "" {
msgFields = strings.Split(msgField, ",")
}
streamFields := httputils.GetArray(r, "_stream_fields") streamFields := httputils.GetArray(r, "_stream_fields")
if len(streamFields) == 0 { if len(streamFields) == 0 {
@ -89,7 +93,7 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
cp := &CommonParams{ cp := &CommonParams{
TenantID: tenantID, TenantID: tenantID,
TimeField: timeField, TimeField: timeField,
MsgField: msgField, MsgFields: msgFields,
StreamFields: streamFields, StreamFields: streamFields,
IgnoreFields: ignoreFields, IgnoreFields: ignoreFields,
Debug: debug, Debug: debug,
@ -106,7 +110,9 @@ func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams {
cp := &CommonParams{ cp := &CommonParams{
TenantID: tenantID, TenantID: tenantID,
TimeField: "timestamp", TimeField: "timestamp",
MsgField: "message", MsgFields: []string{
"message",
},
StreamFields: []string{ StreamFields: []string{
"hostname", "hostname",
"app_name", "app_name",

View file

@ -8,6 +8,7 @@ import (
"io" "io"
"net/http" "net/http"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -64,7 +65,7 @@ func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) {
if len(cp.IgnoreFields) == 0 { if len(cp.IgnoreFields) == 0 {
cp.IgnoreFields = *journaldIgnoreFields cp.IgnoreFields = *journaldIgnoreFields
} }
cp.MsgField = "MESSAGE" cp.MsgFields = []string{"MESSAGE"}
return cp, nil return cp, nil
} }
@ -233,7 +234,7 @@ func parseJournaldRequest(data []byte, lmp insertutils.LogMessageProcessor, cp *
continue continue
} }
if name == cp.MsgField { if slices.Contains(cp.MsgFields, name) {
name = "_msg" name = "_msg"
} }

View file

@ -12,7 +12,7 @@ func TestPushJournaldOk(t *testing.T) {
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
cp := &insertutils.CommonParams{ cp := &insertutils.CommonParams{
TimeField: "__REALTIME_TIMESTAMP", TimeField: "__REALTIME_TIMESTAMP",
MsgField: "MESSAGE", MsgFields: []string{"MESSAGE"},
} }
n, err := parseJournaldRequest([]byte(src), tlp, cp) n, err := parseJournaldRequest([]byte(src), tlp, cp)
if err != nil { if err != nil {
@ -48,7 +48,7 @@ func TestPushJournald_Failure(t *testing.T) {
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
cp := &insertutils.CommonParams{ cp := &insertutils.CommonParams{
TimeField: "__REALTIME_TIMESTAMP", TimeField: "__REALTIME_TIMESTAMP",
MsgField: "MESSAGE", MsgFields: []string{"MESSAGE"},
} }
_, err := parseJournaldRequest([]byte(data), tlp, cp) _, err := parseJournaldRequest([]byte(data), tlp, cp)
if err == nil { if err == nil {

View file

@ -53,7 +53,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) {
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor()
err = processStreamInternal(reader, cp.TimeField, cp.MsgField, lmp) err = processStreamInternal(reader, cp.TimeField, cp.MsgFields, lmp)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
@ -66,7 +66,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
func processStreamInternal(r io.Reader, timeField, msgField string, lmp insertutils.LogMessageProcessor) error { func processStreamInternal(r io.Reader, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
@ -79,7 +79,7 @@ func processStreamInternal(r io.Reader, timeField, msgField string, lmp insertut
n := 0 n := 0
for { for {
ok, err := readLine(sc, timeField, msgField, lmp) ok, err := readLine(sc, timeField, msgFields, lmp)
wcr.DecConcurrency() wcr.DecConcurrency()
if err != nil { if err != nil {
errorsTotal.Inc() errorsTotal.Inc()
@ -93,7 +93,7 @@ func processStreamInternal(r io.Reader, timeField, msgField string, lmp insertut
} }
} }
func readLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils.LogMessageProcessor) (bool, error) { func readLine(sc *bufio.Scanner, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (bool, error) {
var line []byte var line []byte
for len(line) == 0 { for len(line) == 0 {
if !sc.Scan() { if !sc.Scan() {
@ -116,7 +116,7 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils.Log
if err != nil { if err != nil {
return false, fmt.Errorf("cannot get timestamp: %w", err) return false, fmt.Errorf("cannot get timestamp: %w", err)
} }
logstorage.RenameField(p.Fields, msgField, "_msg") logstorage.RenameField(p.Fields, msgFields, "_msg")
lmp.AddRow(ts, p.Fields) lmp.AddRow(ts, p.Fields)
logstorage.PutJSONParser(p) logstorage.PutJSONParser(p)

View file

@ -11,9 +11,10 @@ func TestProcessStreamInternal_Success(t *testing.T) {
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper() t.Helper()
msgFields := []string{msgField}
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
r := bytes.NewBufferString(data) r := bytes.NewBufferString(data)
if err := processStreamInternal(r, timeField, msgField, tlp); err != nil { if err := processStreamInternal(r, timeField, msgFields, tlp); err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
@ -42,7 +43,7 @@ func TestProcessStreamInternal_Failure(t *testing.T) {
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
r := bytes.NewBufferString(data) r := bytes.NewBufferString(data)
if err := processStreamInternal(r, "time", "", tlp); err == nil { if err := processStreamInternal(r, "time", nil, tlp); err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }
} }

View file

@ -514,13 +514,15 @@ func processLine(line []byte, currentYear int, timezone *time.Location, useLocal
} }
ts = nsecs ts = nsecs
} }
logstorage.RenameField(p.Fields, "message", "_msg") logstorage.RenameField(p.Fields, msgFields, "_msg")
lmp.AddRow(ts, p.Fields) lmp.AddRow(ts, p.Fields)
logstorage.PutSyslogParser(p) logstorage.PutSyslogParser(p)
return nil return nil
} }
var msgFields = []string{"message"}
var ( var (
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="syslog"}`) rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="syslog"}`)

View file

@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* FEATURE: allow specifying a list of log fields, which contain log message, via `_msg_field` query arg and via `VL-Msg-Field` HTTP request header. For example, `_msg_field=message,event.message` instructs obtaining [message field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) from the first non-empty field out of the `message` and `event.message` fields. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters) for details.
* BUGFIX: fix `runtime error: index out of range [0] with length 0` panic during low-rate data ingestion. The panic has been introduced in [v0.38.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.38.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7391). * BUGFIX: fix `runtime error: index out of range [0] with length 0` panic during low-rate data ingestion. The panic has been introduced in [v0.38.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.38.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7391).
## [v0.38.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.38.0-victorialogs) ## [v0.38.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.38.0-victorialogs)

View file

@ -193,6 +193,10 @@ First defined parameter is used. [Query string](https://en.wikipedia.org/wiki/Qu
- `_msg_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - `_msg_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper. with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper.
This is usually the `message` field for Filebeat and Logstash. This is usually the `message` field for Filebeat and Logstash.
The `_msg_field` arg may contain a comma-separated list of field names. In this case the first non-empty field from the list
is treated as the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
If the `_msg_field` parameter isn't set, then VictoriaLogs reads the log message from the `_msg` field. If the `_msg_field` parameter isn't set, then VictoriaLogs reads the log message from the `_msg` field.
- `_time_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - `_time_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
@ -225,6 +229,10 @@ VictoriaLogs accepts optional `AccountID` and `ProjectID` headers at [data inges
- `VL-Msg-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - `VL-Msg-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper. with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper.
This is usually the `message` field for Filebeat and Logstash. This is usually the `message` field for Filebeat and Logstash.
The `VL-Msg-Field` header may contain a comma-separated list of field names. In this case the first non-empty field from the list
is treated as the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads the log message from the `_msg` field. If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads the log message from the `_msg` field.
- `VL-Time-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - `VL-Time-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)

View file

@ -128,9 +128,10 @@ log entry, which can be ingested into VictoriaLogs:
``` ```
If the actual log message has other than `_msg` field name, then it is possible to specify the real log message field If the actual log message has other than `_msg` field name, then it is possible to specify the real log message field
via `_msg_field` query arg during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/). via `_msg_field` query arg or via `VL-Msg-Field` HTTP header during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/).
For example, if log message is located in the `event.original` field, then specify `_msg_field=event.original` query arg For example, if log message is located in the `event.original` field, then specify `_msg_field=event.original` query arg
during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/). during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/).
See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters) for more details.
### Time field ### Time field

View file

@ -2,6 +2,7 @@ package logstorage
import ( import (
"fmt" "fmt"
"slices"
"github.com/valyala/quicktemplate" "github.com/valyala/quicktemplate"
@ -118,14 +119,15 @@ func isLogfmtSpecialChar(c rune) bool {
} }
} }
// RenameField renames field with the oldName to newName in Fields // RenameField renames the first non-empty field with the name from oldNames list to newName in Fields
func RenameField(fields []Field, oldName, newName string) { func RenameField(fields []Field, oldNames []string, newName string) {
if oldName == "" { if len(oldNames) == 0 {
// Nothing to rename
return return
} }
for i := range fields { for i := range fields {
f := &fields[i] f := &fields[i]
if f.Name == oldName { if f.Value != "" && slices.Contains(oldNames, f.Name) {
f.Name = newName f.Name = newName
return return
} }