diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go index f2170da62..0077a19a0 100644 --- a/app/vlinsert/insertutils/common_params.go +++ b/app/vlinsert/insertutils/common_params.go @@ -2,7 +2,9 @@ package insertutils import ( "flag" + "fmt" "net/http" + "strconv" "strings" "sync" "time" @@ -31,6 +33,7 @@ type CommonParams struct { MsgFields []string StreamFields []string IgnoreFields []string + ExtraFields []logstorage.Field Debug bool DebugRequestURI string @@ -45,48 +48,25 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) { return nil, err } - // Extract time field name from _time_field query arg or header timeField := "_time" - if tf := r.FormValue("_time_field"); tf != "" { - timeField = tf - } else if tf = r.Header.Get("VL-Time-Field"); tf != "" { + if tf := httputils.GetRequestValue(r, "_time_field", "VL-Time-Field"); tf != "" { timeField = tf } - // Extract message field name from _msg_field query arg or header - msgField := "" - if msgf := r.FormValue("_msg_field"); msgf != "" { - msgField = msgf - } else if msgf = r.Header.Get("VL-Msg-Field"); msgf != "" { - msgField = msgf - } - var msgFields []string - if msgField != "" { - msgFields = strings.Split(msgField, ",") + msgFields := httputils.GetArray(r, "_msg_field", "VL-Msg-Field") + streamFields := httputils.GetArray(r, "_stream_fields", "VL-Stream-Fields") + ignoreFields := httputils.GetArray(r, "ignore_fields", "VL-Ignore-Fields") + + extraFields, err := getExtraFields(r) + if err != nil { + return nil, err } - streamFields := httputils.GetArray(r, "_stream_fields") - if len(streamFields) == 0 { - if sf := r.Header.Get("VL-Stream-Fields"); len(sf) > 0 { - streamFields = strings.Split(sf, ",") - } - } - ignoreFields := httputils.GetArray(r, "ignore_fields") - if len(ignoreFields) == 0 { - if f := r.Header.Get("VL-Ignore-Fields"); len(f) > 0 { - ignoreFields = strings.Split(f, ",") - } - } - - debug := httputils.GetBool(r, "debug") - if !debug { - if dh := r.Header.Get("VL-Debug"); len(dh) > 0 { - hv := strings.ToLower(dh) - switch hv { - case "", "0", "f", "false", "no": - default: - debug = true - } + debug := false + if dv := httputils.GetRequestValue(r, "debug", "VL-Debug"); dv != "" { + debug, err = strconv.ParseBool(dv) + if err != nil { + return nil, fmt.Errorf("cannot parse debug=%q: %w", dv, err) } } debugRequestURI := "" @@ -102,6 +82,7 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) { MsgFields: msgFields, StreamFields: streamFields, IgnoreFields: ignoreFields, + ExtraFields: extraFields, Debug: debug, DebugRequestURI: debugRequestURI, DebugRemoteAddr: debugRemoteAddr, @@ -110,6 +91,26 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) { return cp, nil } +func getExtraFields(r *http.Request) ([]logstorage.Field, error) { + efs := httputils.GetArray(r, "extra_fields", "VL-Extra-Fields") + if len(efs) == 0 { + return nil, nil + } + + extraFields := make([]logstorage.Field, len(efs)) + for i, ef := range efs { + n := strings.Index(ef, "=") + if n <= 0 || n == len(ef)-1 { + return nil, fmt.Errorf(`invalid extra_field format: %q; must be in the form "field=value"`, ef) + } + extraFields[i] = logstorage.Field{ + Name: ef[:n], + Value: ef[n+1:], + } + } + return extraFields, nil +} + // GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID. func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams { // See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe @@ -146,8 +147,6 @@ type logMessageProcessor struct { stopCh chan struct{} lastFlushTime time.Time - tmpFields []logstorage.Field - cp *CommonParams lr *logstorage.LogRows } @@ -190,17 +189,6 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel return } - if *defaultMsgValue != "" && !hasMsgField(fields) { - // The log entry doesn't contain mandatory _msg field. Add _msg field with default value then - // according to https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field . - lmp.tmpFields = append(lmp.tmpFields[:0], fields...) - lmp.tmpFields = append(lmp.tmpFields, logstorage.Field{ - Name: "_msg", - Value: *defaultMsgValue, - }) - fields = lmp.tmpFields - } - lmp.lr.MustAdd(lmp.cp.TenantID, timestamp, fields) if lmp.cp.Debug { s := lmp.lr.GetRowString(0) @@ -214,15 +202,6 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel } } -func hasMsgField(fields []logstorage.Field) bool { - for _, f := range fields { - if f.Name == "_msg" { - return len(f.Value) > 0 - } - } - return false -} - // flushLocked must be called under locked lmp.mu. func (lmp *logMessageProcessor) flushLocked() { lmp.lastFlushTime = time.Now() @@ -244,7 +223,7 @@ func (lmp *logMessageProcessor) MustClose() { // // MustClose() must be called on the returned LogMessageProcessor when it is no longer needed. func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor { - lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) + lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.ExtraFields, *defaultMsgValue) lmp := &logMessageProcessor{ cp: cp, lr: lr, diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 5c3119782..13068027a 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters). + ## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs) Released at 2024-10-31 diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index 63d77fd8c..4be5d9a2e 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -189,66 +189,78 @@ HTTP query string parameters have priority over HTTP Headers. #### HTTP Query string parameters -List of supported [Query string](https://en.wikipedia.org/wiki/Query_string) parameters: +All the [HTTP-based data ingestion protocols](#http-apis) support the following [HTTP query string](https://en.wikipedia.org/wiki/Query_string) args: -- `_msg_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper. +- `_msg_field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + containing [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). This is usually the `message` field for Filebeat and Logstash. The `_msg_field` arg may contain comma-separated list of field names. In this case the first non-empty field from the list is treated as [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). - If the `_msg_field` parameter isn't set, then VictoriaLogs reads the log message from the `_msg` field. + If the `_msg_field` arg isn't set, then VictoriaLogs reads the log message from the `_msg` field. If the `_msg` field is empty, + then it is set to `-defaultMsgValue` command-line flag value. -- `_time_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - with the [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) generated by the log shipper. +- `_time_field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + containing [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This is usually the `@timestamp` field for Filebeat and Logstash. - If the `_time_field` parameter isn't set, then VictoriaLogs reads the timestamp from the `_time` field. - If this field doesn't exist, then the current timestamp is used. -- `_stream_fields` - it should contain comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, - which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) collected the log shipper. - If the `_stream_fields` parameter isn't set, then all the ingested logs are written to default log stream - `{}`. + If the `_time_field` arg isn't set, then VictoriaLogs reads the timestamp from the `_time` field. If this field doesn't exist, then the current timestamp is used. -- `ignore_fields` - this parameter may contain the list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, +- `_stream_fields` - comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, + which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). + + If the `_stream_fields` arg isn't set, then all the ingested logs are written to default log stream - `{}`. + +- `ignore_fields` - an optional comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, which must be ignored during data ingestion. -- `debug` - if this parameter is set to `1`, then the ingested logs aren't stored in VictoriaLogs. Instead, +- `extra_fields` - an optional comma-separated list [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), + which must be added to all the ingested logs. The format of every `extra_fields` entry is `field_name=field_value`. + If the log entry contains fields from the `extra_fields`, then they are overwritten by the values specified in `extra_fields`. + +- `debug` - if this arg is set to `1`, then the ingested logs aren't stored in VictoriaLogs. Instead, the ingested data is logged by VictoriaLogs, so it can be investigated later. See also [HTTP headers](#http-headers). #### HTTP headers -List of supported [HTTP Headers](https://en.wikipedia.org/wiki/List_of_HTTP_header_fields) parameters: +All the [HTTP-based data ingestion protocols](#http-apis) support the following [HTTP Headers](https://en.wikipedia.org/wiki/List_of_HTTP_header_fields) +additionally to [HTTP query args](#http-query-string-parameters): -- `AccountID` - may contain the needed accountID of tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details. +- `AccountID` - accountID of the tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details. -- `ProjectID`- may contain the projectID needed of tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details. - VictoriaLogs accepts optional `AccountID` and `ProjectID` headers at [data ingestion HTTP APIs](#http-apis). +- `ProjectID`- projectID of the tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details. -- `VL-Msg-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper. +- `VL-Msg-Field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + containing [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). This is usually the `message` field for Filebeat and Logstash. The `VL-Msg-Field` header may contain comma-separated list of field names. In this case the first non-empty field from the list is treated as [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). - If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads the log message from the `_msg` field. + If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads log message from the `_msg` field. If the `_msg` field is empty, + then it is set to `-defaultMsgValue` command-line flag value. -- `VL-Time-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - with the [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) generated by the log shipper. +- `VL-Time-Field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + containing [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This is usually the `@timestamp` field for Filebeat and Logstash. - If the `VL-Time-Field` header isn't set, then VictoriaLogs reads the timestamp from the `_time` field. - If this field doesn't exist, then the current timestamp is used. -- `VL-Stream-Fields` - it should contain comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, - which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) collected the log shipper. + If the `VL-Time-Field` header isn't set, then VictoriaLogs reads the timestamp from the `_time` field. If this field doesn't exist, then the current timestamp is used. + +- `VL-Stream-Fields` - comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, + which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). + If the `VL-Stream-Fields` header isn't set, then all the ingested logs are written to default log stream - `{}`. -- `VL-Ignore-Fields` - this parameter may contain the list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, +- `VL-Ignore-Fields` - an optional comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names, which must be ignored during data ingestion. +- `VL-Extra-Field` - an optional comma-separated list of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), + which must be added to all the ingested logs. The format of every `extra_fields` entry is `field_name=field_value`. + If the log entry contains fields from the `extra_fields`, then they are overwritten by the values specified in `extra_fields`. + - `VL-Debug` - if this parameter is set to `1`, then the ingested logs aren't stored in VictoriaLogs. Instead, the ingested data is logged by VictoriaLogs, so it can be investigated later. diff --git a/lib/httputils/array.go b/lib/httputils/array.go index 5d61f3f7e..ca5c5b7d4 100644 --- a/lib/httputils/array.go +++ b/lib/httputils/array.go @@ -5,11 +5,20 @@ import ( "strings" ) -// GetArray returns an array of comma-separated values from r arg with the argKey name. -func GetArray(r *http.Request, argKey string) []string { - v := r.FormValue(argKey) +// GetArray returns an array of comma-separated values from r with the argKey quey arg or with headerKey header. +func GetArray(r *http.Request, argKey, headerKey string) []string { + v := GetRequestValue(r, argKey, headerKey) if v == "" { return nil } return strings.Split(v, ",") } + +// GetRequestValue returns r value for the given argKey query arg or for the given headerKey header. +func GetRequestValue(r *http.Request, argKey, headerKey string) string { + v := r.FormValue(argKey) + if v == "" { + v = r.Header.Get(headerKey) + } + return v +} diff --git a/lib/logstorage/filter_stream_id_test.go b/lib/logstorage/filter_stream_id_test.go index 78fc357fc..c6628f4b8 100644 --- a/lib/logstorage/filter_stream_id_test.go +++ b/lib/logstorage/filter_stream_id_test.go @@ -84,7 +84,7 @@ func testFilterMatchForStreamID(t *testing.T, f filter, expectedRowIdxs []int) { func generateTestLogStreams(s *Storage, tenantID TenantID, getMsgValue func(int) string, rowsCount, streamsCount int) { streamFields := []string{"host", "app"} - lr := GetLogRows(streamFields, nil) + lr := GetLogRows(streamFields, nil, nil, "") var fields []Field for i := range rowsCount { fields = append(fields[:0], Field{ diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go index 90f288280..017abca1d 100644 --- a/lib/logstorage/filter_test.go +++ b/lib/logstorage/filter_test.go @@ -249,7 +249,7 @@ func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) { "job", "instance", } - lr := GetLogRows(streamTags, nil) + lr := GetLogRows(streamTags, nil, nil, "") var fields []Field for i := range columns[0].values { // Add stream tags diff --git a/lib/logstorage/filter_time_test.go b/lib/logstorage/filter_time_test.go index 0ad7b013e..ffb1ed2c6 100644 --- a/lib/logstorage/filter_time_test.go +++ b/lib/logstorage/filter_time_test.go @@ -123,7 +123,7 @@ func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, ex } func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int64, getValue func(rowIdx int) string) { - lr := GetLogRows(nil, nil) + lr := GetLogRows(nil, nil, nil, "") var fields []Field for i, timestamp := range timestamps { fields = append(fields[:0], Field{ diff --git a/lib/logstorage/inmemory_part_test.go b/lib/logstorage/inmemory_part_test.go index c2f62c99c..ce17fa149 100644 --- a/lib/logstorage/inmemory_part_test.go +++ b/lib/logstorage/inmemory_part_test.go @@ -22,7 +22,7 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) { // make a copy of lr - it is used for comapring the results later, // since lr may be modified by inmemoryPart.mustInitFromRows() - lrOrig := GetLogRows(nil, nil) + lrOrig := GetLogRows(nil, nil, nil, "") for i, timestamp := range lr.timestamps { if timestamp < minTimestampExpected { minTimestampExpected = timestamp @@ -72,7 +72,7 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) { } } - f(GetLogRows(nil, nil), 0, 0) + f(GetLogRows(nil, nil, nil, ""), 0, 0) // Check how inmemoryPart works with a single stream f(newTestLogRows(1, 1, 0), 1, 0.7) @@ -108,7 +108,7 @@ func TestInmemoryPartInitFromBlockStreamReaders(t *testing.T) { maxTimestampExpected := int64(math.MinInt64) // make a copy of rrss in order to compare the results after merge. - lrOrig := GetLogRows(nil, nil) + lrOrig := GetLogRows(nil, nil, nil, "") for _, lr := range lrs { uncompressedSizeBytesExpected += uncompressedRowsSizeBytes(lr.rows) rowsCountExpected += len(lr.timestamps) @@ -188,8 +188,8 @@ func TestInmemoryPartInitFromBlockStreamReaders(t *testing.T) { // Check empty readers f(nil, 0, 0) - f([]*LogRows{GetLogRows(nil, nil)}, 0, 0) - f([]*LogRows{GetLogRows(nil, nil), GetLogRows(nil, nil)}, 0, 0) + f([]*LogRows{GetLogRows(nil, nil, nil, "")}, 0, 0) + f([]*LogRows{GetLogRows(nil, nil, nil, ""), GetLogRows(nil, nil, nil, "")}, 0, 0) // Check merge with a single reader f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.7) @@ -235,7 +235,7 @@ func newTestLogRows(streams, rowsPerStream int, seed int64) *LogRows { streamTags := []string{ "some-stream-tag", } - lr := GetLogRows(streamTags, nil) + lr := GetLogRows(streamTags, nil, nil, "") rng := rand.New(rand.NewSource(seed)) var fields []Field for i := 0; i < streams; i++ { @@ -322,7 +322,7 @@ func checkEqualRows(lrResult, lrOrig *LogRows) error { // // This function is for testing and debugging purposes only. func (mp *inmemoryPart) readLogRows(sbu *stringsBlockUnmarshaler, vd *valuesDecoder) *LogRows { - lr := GetLogRows(nil, nil) + lr := GetLogRows(nil, nil, nil, "") bsr := getBlockStreamReader() defer putBlockStreamReader(bsr) bsr.MustInitFromInmemoryPart(mp) diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go index bf5553b84..2b642c199 100644 --- a/lib/logstorage/log_rows.go +++ b/lib/logstorage/log_rows.go @@ -3,16 +3,14 @@ package logstorage import ( "sort" "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // LogRows holds a set of rows needed for Storage.MustAddRows // // LogRows must be obtained via GetLogRows() type LogRows struct { - // buf holds all the bytes referred by items in LogRows - buf []byte + // a holds all the bytes referred by items in LogRows + a arena // fieldsBuf holds all the fields referred by items in LogRows fieldsBuf []Field @@ -37,6 +35,15 @@ type LogRows struct { // ignoreFields contains names for log fields, which must be skipped during data ingestion ignoreFields map[string]struct{} + + // extraFields contains extra fields to add to all the logs at MustAdd(). + extraFields []Field + + // extraStreamFields contains extraFields, which must be treated as stream fields. + extraStreamFields []Field + + // defaultMsgValue contains default value for missing _msg field + defaultMsgValue string } type sortedFields []Field @@ -79,11 +86,16 @@ func (lr *LogRows) Reset() { for k := range ifs { delete(ifs, k) } + + lr.extraFields = nil + lr.extraStreamFields = lr.extraStreamFields[:0] + + lr.defaultMsgValue = "" } // ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows(). func (lr *LogRows) ResetKeepSettings() { - lr.buf = lr.buf[:0] + lr.a.reset() fb := lr.fieldsBuf for i := range fb { @@ -116,7 +128,7 @@ func (lr *LogRows) ResetKeepSettings() { // NeedFlush returns true if lr contains too much data, so it must be flushed to the storage. func (lr *LogRows) NeedFlush() bool { - return len(lr.buf) > (maxUncompressedBlockSize/8)*7 + return len(lr.a.b) > (maxUncompressedBlockSize/8)*7 } // MustAdd adds a log entry with the given args to lr. @@ -126,15 +138,16 @@ func (lr *LogRows) NeedFlush() bool { // // field names longer than MaxFieldNameSize are automatically truncated to MaxFieldNameSize length. func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) { - // Compose StreamTags from fields according to lr.streamFields - sfs := lr.streamFields + // Compose StreamTags from fields according to lr.streamFields and lr.extraStreamFields st := GetStreamTags() - for i := range fields { - f := &fields[i] - if _, ok := sfs[f.Name]; ok { + for _, f := range fields { + if _, ok := lr.streamFields[f.Name]; ok { st.Add(f.Name, f.Value) } } + for _, f := range lr.extraStreamFields { + st.Add(f.Name, f.Value) + } // Marshal StreamTags bb := bbPool.Get() @@ -152,23 +165,45 @@ func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) { } func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field, streamTagsCanonical []byte) { - buf := lr.buf - bufLen := len(buf) - buf = append(buf, streamTagsCanonical...) + streamTagsCanonicalCopy := lr.a.copyBytes(streamTagsCanonical) + lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, streamTagsCanonicalCopy) - lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, buf[bufLen:]) lr.streamIDs = append(lr.streamIDs, sid) lr.timestamps = append(lr.timestamps, timestamp) - // Store all the fields - ifs := lr.ignoreFields + fieldsLen := len(lr.fieldsBuf) + hasMsgField := lr.addFieldsInternal(fields, lr.ignoreFields) + if lr.addFieldsInternal(lr.extraFields, nil) { + hasMsgField = true + } + + // Add optional default _msg field + if !hasMsgField && lr.defaultMsgValue != "" { + value := lr.a.copyString(lr.defaultMsgValue) + lr.fieldsBuf = append(lr.fieldsBuf, Field{ + Value: value, + }) + } + + // Sort fields by name + lr.sf = lr.fieldsBuf[fieldsLen:] + sort.Sort(&lr.sf) + + // Add log row with sorted fields to lr.rows + lr.rows = append(lr.rows, lr.sf) +} + +func (lr *LogRows) addFieldsInternal(fields []Field, ignoreFields map[string]struct{}) bool { + if len(fields) == 0 { + return false + } + fb := lr.fieldsBuf - fieldsLen := len(fb) + hasMsgField := false for i := range fields { f := &fields[i] - if _, ok := ifs[f.Name]; ok { - // Skip fields from the ifs map + if _, ok := ignoreFields[f.Name]; ok { continue } if f.Value == "" { @@ -179,26 +214,20 @@ func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field fb = append(fb, Field{}) dstField := &fb[len(fb)-1] - bufLen = len(buf) fieldName := f.Name if len(fieldName) > MaxFieldNameSize { fieldName = fieldName[:MaxFieldNameSize] } - if fieldName != "_msg" { - buf = append(buf, fieldName...) + if fieldName == "_msg" { + fieldName = "" + hasMsgField = true } - dstField.Name = bytesutil.ToUnsafeString(buf[bufLen:]) - - bufLen = len(buf) - buf = append(buf, f.Value...) - dstField.Value = bytesutil.ToUnsafeString(buf[bufLen:]) + dstField.Name = lr.a.copyString(fieldName) + dstField.Value = lr.a.copyString(f.Value) } - lr.sf = fb[fieldsLen:] - sort.Sort(&lr.sf) - lr.rows = append(lr.rows, lr.sf) - lr.fieldsBuf = fb - lr.buf = buf + + return hasMsgField } // GetRowString returns string representation of the row with the given idx. @@ -225,9 +254,11 @@ func (lr *LogRows) GetRowString(idx int) string { // // streamFields is a set of field names, which must be associated with the stream. // ignoreFields is a set of field names, which must be ignored during data ingestion. +// extraFields is a set of fields, which must be added to all the logs passed to MustAdd(). +// defaultMsgValue is the default value to store in non-existing or empty _msg. // // Return back it to the pool with PutLogRows() when it is no longer needed. -func GetLogRows(streamFields, ignoreFields []string) *LogRows { +func GetLogRows(streamFields, ignoreFields []string, extraFields []Field, defaultMsgValue string) *LogRows { v := logRowsPool.Get() if v == nil { v = &LogRows{} @@ -244,6 +275,14 @@ func GetLogRows(streamFields, ignoreFields []string) *LogRows { sfs[f] = struct{}{} } + // Initialize extraStreamFields + for _, f := range extraFields { + if _, ok := sfs[f.Name]; ok { + lr.extraStreamFields = append(lr.extraStreamFields, f) + delete(sfs, f.Name) + } + } + // Initialize ignoreFields ifs := lr.ignoreFields if ifs == nil { @@ -253,8 +292,17 @@ func GetLogRows(streamFields, ignoreFields []string) *LogRows { for _, f := range ignoreFields { if f != "" { ifs[f] = struct{}{} + delete(sfs, f) } } + for _, f := range extraFields { + // Extra fields must orverride the existing fields for the sake of consistency and security, + // so the client won't be able to override them. + ifs[f.Name] = struct{}{} + } + + lr.extraFields = extraFields + lr.defaultMsgValue = defaultMsgValue return lr } diff --git a/lib/logstorage/log_rows_test.go b/lib/logstorage/log_rows_test.go new file mode 100644 index 000000000..d7f112701 --- /dev/null +++ b/lib/logstorage/log_rows_test.go @@ -0,0 +1,145 @@ +package logstorage + +import ( + "reflect" + "testing" +) + +func TestLogRows_DefaultMsgValue(t *testing.T) { + type opts struct { + rows []string + + streamFields []string + ignoreFields []string + extraFields []Field + defaultMsgValue string + + resultExpected []string + } + + f := func(o opts) { + t.Helper() + + lr := GetLogRows(o.streamFields, o.ignoreFields, o.extraFields, o.defaultMsgValue) + defer PutLogRows(lr) + + tid := TenantID{ + AccountID: 123, + ProjectID: 456, + } + + p := GetJSONParser() + defer PutJSONParser(p) + for i, r := range o.rows { + if err := p.ParseLogMessage([]byte(r)); err != nil { + t.Fatalf("unexpected error when parsing %q: %s", r, err) + } + timestamp := int64(i)*1_000 + 1 + lr.MustAdd(tid, timestamp, p.Fields) + } + + var result []string + for i := range o.rows { + s := lr.GetRowString(i) + result = append(result, s) + } + if !reflect.DeepEqual(result, o.resultExpected) { + t.Fatalf("unexpected result\ngot\n%v\nwant\n%v", result, o.resultExpected) + } + } + + var o opts + + f(o) + + // default options + o = opts{ + rows: []string{ + `{"foo":"bar"}`, + `{}`, + `{"foo":"bar","a":"b"}`, + }, + resultExpected: []string{ + `{"_stream":"{}","_time":"1970-01-01T00:00:00.000000001Z","foo":"bar"}`, + `{"_stream":"{}","_time":"1970-01-01T00:00:00.000001001Z"}`, + `{"_stream":"{}","_time":"1970-01-01T00:00:00.000002001Z","a":"b","foo":"bar"}`, + }, + } + f(o) + + // stream fields + o = opts{ + rows: []string{ + `{"x":"y","foo":"bar"}`, + `{"x":"y","foo":"bar","abc":"de"}`, + `{}`, + }, + streamFields: []string{"foo", "abc"}, + resultExpected: []string{ + `{"_stream":"{foo=\"bar\"}","_time":"1970-01-01T00:00:00.000000001Z","foo":"bar","x":"y"}`, + `{"_stream":"{abc=\"de\",foo=\"bar\"}","_time":"1970-01-01T00:00:00.000001001Z","abc":"de","foo":"bar","x":"y"}`, + `{"_stream":"{}","_time":"1970-01-01T00:00:00.000002001Z"}`, + }, + } + f(o) + + // ignore fields + o = opts{ + rows: []string{ + `{"x":"y","foo":"bar"}`, + `{"x":"y"}`, + `{}`, + }, + streamFields: []string{"foo", "abc", "x"}, + ignoreFields: []string{"foo"}, + resultExpected: []string{ + `{"_stream":"{x=\"y\"}","_time":"1970-01-01T00:00:00.000000001Z","x":"y"}`, + `{"_stream":"{x=\"y\"}","_time":"1970-01-01T00:00:00.000001001Z","x":"y"}`, + `{"_stream":"{}","_time":"1970-01-01T00:00:00.000002001Z"}`, + }, + } + f(o) + + // extra fields + o = opts{ + rows: []string{ + `{"x":"y","foo":"bar"}`, + `{}`, + }, + streamFields: []string{"foo", "abc", "x"}, + ignoreFields: []string{"foo"}, + extraFields: []Field{ + { + Name: "foo", + Value: "test", + }, + { + Name: "abc", + Value: "1234", + }, + }, + resultExpected: []string{ + `{"_stream":"{abc=\"1234\",foo=\"test\",x=\"y\"}","_time":"1970-01-01T00:00:00.000000001Z","abc":"1234","foo":"test","x":"y"}`, + `{"_stream":"{abc=\"1234\",foo=\"test\"}","_time":"1970-01-01T00:00:00.000001001Z","abc":"1234","foo":"test"}`, + }, + } + f(o) + + // default _msg value + o = opts{ + rows: []string{ + `{"x":"y","foo":"bar"}`, + `{"_msg":"ppp"}`, + `{"abc":"ppp"}`, + }, + streamFields: []string{"abc", "x"}, + defaultMsgValue: "qwert", + resultExpected: []string{ + `{"_msg":"qwert","_stream":"{x=\"y\"}","_time":"1970-01-01T00:00:00.000000001Z","foo":"bar","x":"y"}`, + `{"_msg":"ppp","_stream":"{}","_time":"1970-01-01T00:00:00.000001001Z"}`, + `{"_msg":"qwert","_stream":"{abc=\"ppp\"}","_time":"1970-01-01T00:00:00.000002001Z","abc":"ppp"}`, + }, + } + f(o) + +} diff --git a/lib/logstorage/log_rows_timing_test.go b/lib/logstorage/log_rows_timing_test.go index 55a726e1b..32023e120 100644 --- a/lib/logstorage/log_rows_timing_test.go +++ b/lib/logstorage/log_rows_timing_test.go @@ -52,7 +52,7 @@ func BenchmarkLogRowsMustAdd(b *testing.B) { } func benchmarkLogRowsMustAdd(rows [][]Field, streamFields []string) { - lr := GetLogRows(streamFields, nil) + lr := GetLogRows(streamFields, nil, nil, "") var tid TenantID for i, fields := range rows { tid.AccountID = uint32(i) diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index a5de2b88e..7b2f75cb2 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -549,7 +549,7 @@ func (s *Storage) MustAddRows(lr *LogRows) { } lrPart := m[day] if lrPart == nil { - lrPart = GetLogRows(nil, nil) + lrPart = GetLogRows(nil, nil, nil, "") m[day] = lrPart } lrPart.mustAddInternal(lr.streamIDs[i], ts, lr.rows[i], lr.streamTagsCanonicals[i]) diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 35530309e..8e5375b9a 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -47,7 +47,7 @@ func TestStorageRunQuery(t *testing.T) { for j := 0; j < streamsPerTenant; j++ { streamIDValue := fmt.Sprintf("stream_id=%d", j) for k := 0; k < blocksPerStream; k++ { - lr := GetLogRows(streamTags, nil) + lr := GetLogRows(streamTags, nil, nil, "") for m := 0; m < rowsPerBlock; m++ { timestamp := baseTimestamp + int64(m)*1e9 + int64(k) // Append stream fields @@ -774,7 +774,7 @@ func TestStorageSearch(t *testing.T) { allTenantIDs = append(allTenantIDs, tenantID) for j := 0; j < streamsPerTenant; j++ { for k := 0; k < blocksPerStream; k++ { - lr := GetLogRows(streamTags, nil) + lr := GetLogRows(streamTags, nil, nil, "") for m := 0; m < rowsPerBlock; m++ { timestamp := baseTimestamp + int64(m)*1e9 + int64(k) // Append stream fields