app/vlinsert: implement the ability to add extra fields to the ingested logs

This can be done via extra_fields query arg or via VL-Extra-Fields HTTP header.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7354#issuecomment-2448671445

(cherry picked from commit 4478e48eb6)
This commit is contained in:
Aliaksandr Valialkin 2024-11-01 20:06:15 +01:00 committed by Zakhar Bessarab
parent 21d2c417da
commit fced48d540
No known key found for this signature in database
GPG key ID: 932B34D6FE062023
13 changed files with 332 additions and 137 deletions

View file

@ -2,7 +2,9 @@ package insertutils
import (
"flag"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -31,6 +33,7 @@ type CommonParams struct {
MsgFields []string
StreamFields []string
IgnoreFields []string
ExtraFields []logstorage.Field
Debug bool
DebugRequestURI string
@ -45,48 +48,25 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
return nil, err
}
// Extract time field name from _time_field query arg or header
timeField := "_time"
if tf := r.FormValue("_time_field"); tf != "" {
timeField = tf
} else if tf = r.Header.Get("VL-Time-Field"); tf != "" {
if tf := httputils.GetRequestValue(r, "_time_field", "VL-Time-Field"); tf != "" {
timeField = tf
}
// Extract message field name from _msg_field query arg or header
msgField := ""
if msgf := r.FormValue("_msg_field"); msgf != "" {
msgField = msgf
} else if msgf = r.Header.Get("VL-Msg-Field"); msgf != "" {
msgField = msgf
}
var msgFields []string
if msgField != "" {
msgFields = strings.Split(msgField, ",")
msgFields := httputils.GetArray(r, "_msg_field", "VL-Msg-Field")
streamFields := httputils.GetArray(r, "_stream_fields", "VL-Stream-Fields")
ignoreFields := httputils.GetArray(r, "ignore_fields", "VL-Ignore-Fields")
extraFields, err := getExtraFields(r)
if err != nil {
return nil, err
}
streamFields := httputils.GetArray(r, "_stream_fields")
if len(streamFields) == 0 {
if sf := r.Header.Get("VL-Stream-Fields"); len(sf) > 0 {
streamFields = strings.Split(sf, ",")
}
}
ignoreFields := httputils.GetArray(r, "ignore_fields")
if len(ignoreFields) == 0 {
if f := r.Header.Get("VL-Ignore-Fields"); len(f) > 0 {
ignoreFields = strings.Split(f, ",")
}
}
debug := httputils.GetBool(r, "debug")
if !debug {
if dh := r.Header.Get("VL-Debug"); len(dh) > 0 {
hv := strings.ToLower(dh)
switch hv {
case "", "0", "f", "false", "no":
default:
debug = true
}
debug := false
if dv := httputils.GetRequestValue(r, "debug", "VL-Debug"); dv != "" {
debug, err = strconv.ParseBool(dv)
if err != nil {
return nil, fmt.Errorf("cannot parse debug=%q: %w", dv, err)
}
}
debugRequestURI := ""
@ -102,6 +82,7 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
MsgFields: msgFields,
StreamFields: streamFields,
IgnoreFields: ignoreFields,
ExtraFields: extraFields,
Debug: debug,
DebugRequestURI: debugRequestURI,
DebugRemoteAddr: debugRemoteAddr,
@ -110,6 +91,26 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
return cp, nil
}
// getExtraFields parses the `extra_fields` query arg or the `VL-Extra-Fields`
// HTTP header from r into a list of logstorage fields.
//
// Every entry must have the form "name=value" with non-empty name and value;
// otherwise an error is returned. A nil slice with nil error is returned
// when no extra fields are supplied.
func getExtraFields(r *http.Request) ([]logstorage.Field, error) {
	items := httputils.GetArray(r, "extra_fields", "VL-Extra-Fields")
	if len(items) == 0 {
		return nil, nil
	}
	fields := make([]logstorage.Field, len(items))
	for i, item := range items {
		name, value, ok := strings.Cut(item, "=")
		if !ok || name == "" || value == "" {
			return nil, fmt.Errorf(`invalid extra_field format: %q; must be in the form "field=value"`, item)
		}
		fields[i] = logstorage.Field{
			Name:  name,
			Value: value,
		}
	}
	return fields, nil
}
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams {
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
@ -146,8 +147,6 @@ type logMessageProcessor struct {
stopCh chan struct{}
lastFlushTime time.Time
tmpFields []logstorage.Field
cp *CommonParams
lr *logstorage.LogRows
}
@ -190,17 +189,6 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel
return
}
if *defaultMsgValue != "" && !hasMsgField(fields) {
// The log entry doesn't contain mandatory _msg field. Add _msg field with default value then
// according to https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field .
lmp.tmpFields = append(lmp.tmpFields[:0], fields...)
lmp.tmpFields = append(lmp.tmpFields, logstorage.Field{
Name: "_msg",
Value: *defaultMsgValue,
})
fields = lmp.tmpFields
}
lmp.lr.MustAdd(lmp.cp.TenantID, timestamp, fields)
if lmp.cp.Debug {
s := lmp.lr.GetRowString(0)
@ -214,15 +202,6 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel
}
}
// hasMsgField reports whether fields contains a `_msg` field with a non-empty value.
//
// Only the first `_msg` entry encountered is inspected.
func hasMsgField(fields []logstorage.Field) bool {
	for i := range fields {
		if fields[i].Name != "_msg" {
			continue
		}
		return fields[i].Value != ""
	}
	return false
}
// flushLocked must be called under locked lmp.mu.
func (lmp *logMessageProcessor) flushLocked() {
lmp.lastFlushTime = time.Now()
@ -244,7 +223,7 @@ func (lmp *logMessageProcessor) MustClose() {
//
// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.
func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor {
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.ExtraFields, *defaultMsgValue)
lmp := &logMessageProcessor{
cp: cp,
lr: lr,

View file

@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)
Released at 2024-10-31

View file

@ -189,66 +189,78 @@ HTTP query string parameters have priority over HTTP Headers.
#### HTTP Query string parameters
List of supported [Query string](https://en.wikipedia.org/wiki/Query_string) parameters:
All the [HTTP-based data ingestion protocols](#http-apis) support the following [HTTP query string](https://en.wikipedia.org/wiki/Query_string) args:
- `_msg_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper.
- `_msg_field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
containing [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
This is usually the `message` field for Filebeat and Logstash.
The `_msg_field` arg may contain comma-separated list of field names. In this case the first non-empty field from the list
is treated as [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
If the `_msg_field` parameter isn't set, then VictoriaLogs reads the log message from the `_msg` field.
If the `_msg_field` arg isn't set, then VictoriaLogs reads the log message from the `_msg` field. If the `_msg` field is empty,
then it is set to `-defaultMsgValue` command-line flag value.
- `_time_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) generated by the log shipper.
- `_time_field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
containing [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field).
This is usually the `@timestamp` field for Filebeat and Logstash.
If the `_time_field` parameter isn't set, then VictoriaLogs reads the timestamp from the `_time` field.
If this field doesn't exist, then the current timestamp is used.
- `_stream_fields` - it should contain comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) collected the log shipper.
If the `_stream_fields` parameter isn't set, then all the ingested logs are written to default log stream - `{}`.
If the `_time_field` arg isn't set, then VictoriaLogs reads the timestamp from the `_time` field. If this field doesn't exist, then the current timestamp is used.
- `ignore_fields` - this parameter may contain the list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
- `_stream_fields` - comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
If the `_stream_fields` arg isn't set, then all the ingested logs are written to default log stream - `{}`.
- `ignore_fields` - an optional comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which must be ignored during data ingestion.
- `debug` - if this parameter is set to `1`, then the ingested logs aren't stored in VictoriaLogs. Instead,
- `extra_fields` - an optional comma-separated list of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
which must be added to all the ingested logs. The format of every `extra_fields` entry is `field_name=field_value`.
If the log entry contains fields from the `extra_fields`, then they are overwritten by the values specified in `extra_fields`.
- `debug` - if this arg is set to `1`, then the ingested logs aren't stored in VictoriaLogs. Instead,
the ingested data is logged by VictoriaLogs, so it can be investigated later.
See also [HTTP headers](#http-headers).
#### HTTP headers
List of supported [HTTP Headers](https://en.wikipedia.org/wiki/List_of_HTTP_header_fields) parameters:
All the [HTTP-based data ingestion protocols](#http-apis) support the following [HTTP Headers](https://en.wikipedia.org/wiki/List_of_HTTP_header_fields)
additionally to [HTTP query args](#http-query-string-parameters):
- `AccountID` - may contain the needed accountID of tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details.
- `AccountID` - accountID of the tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details.
- `ProjectID`- may contain the projectID needed of tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details.
VictoriaLogs accepts optional `AccountID` and `ProjectID` headers at [data ingestion HTTP APIs](#http-apis).
- `ProjectID` - projectID of the tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details.
- `VL-Msg-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper.
- `VL-Msg-Field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
containing [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
This is usually the `message` field for Filebeat and Logstash.
The `VL-Msg-Field` header may contain comma-separated list of field names. In this case the first non-empty field from the list
is treated as [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads the log message from the `_msg` field.
If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads log message from the `_msg` field. If the `_msg` field is empty,
then it is set to `-defaultMsgValue` command-line flag value.
- `VL-Time-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) generated by the log shipper.
- `VL-Time-Field` - the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
containing [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field).
This is usually the `@timestamp` field for Filebeat and Logstash.
If the `VL-Time-Field` header isn't set, then VictoriaLogs reads the timestamp from the `_time` field.
If this field doesn't exist, then the current timestamp is used.
- `VL-Stream-Fields` - it should contain comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) collected the log shipper.
If the `VL-Time-Field` header isn't set, then VictoriaLogs reads the timestamp from the `_time` field. If this field doesn't exist, then the current timestamp is used.
- `VL-Stream-Fields` - comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
If the `VL-Stream-Fields` header isn't set, then all the ingested logs are written to default log stream - `{}`.
- `VL-Ignore-Fields` - this parameter may contain the list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
- `VL-Ignore-Fields` - an optional comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which must be ignored during data ingestion.
- `VL-Extra-Fields` - an optional comma-separated list of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
which must be added to all the ingested logs. The format of every `extra_fields` entry is `field_name=field_value`.
If the log entry contains fields from the `extra_fields`, then they are overwritten by the values specified in `extra_fields`.
- `VL-Debug` - if this parameter is set to `1`, then the ingested logs aren't stored in VictoriaLogs. Instead,
the ingested data is logged by VictoriaLogs, so it can be investigated later.

View file

@ -5,11 +5,20 @@ import (
"strings"
)
// GetArray returns an array of comma-separated values from r arg with the argKey name.
func GetArray(r *http.Request, argKey string) []string {
v := r.FormValue(argKey)
// GetArray returns an array of comma-separated values from r with the argKey query arg or with headerKey header.
func GetArray(r *http.Request, argKey, headerKey string) []string {
v := GetRequestValue(r, argKey, headerKey)
if v == "" {
return nil
}
return strings.Split(v, ",")
}
// GetRequestValue returns r value for the given argKey query arg or for the given headerKey header.
func GetRequestValue(r *http.Request, argKey, headerKey string) string {
v := r.FormValue(argKey)
if v == "" {
v = r.Header.Get(headerKey)
}
return v
}

View file

@ -84,7 +84,7 @@ func testFilterMatchForStreamID(t *testing.T, f filter, expectedRowIdxs []int) {
func generateTestLogStreams(s *Storage, tenantID TenantID, getMsgValue func(int) string, rowsCount, streamsCount int) {
streamFields := []string{"host", "app"}
lr := GetLogRows(streamFields, nil)
lr := GetLogRows(streamFields, nil, nil, "")
var fields []Field
for i := range rowsCount {
fields = append(fields[:0], Field{

View file

@ -249,7 +249,7 @@ func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {
"job",
"instance",
}
lr := GetLogRows(streamTags, nil)
lr := GetLogRows(streamTags, nil, nil, "")
var fields []Field
for i := range columns[0].values {
// Add stream tags

View file

@ -123,7 +123,7 @@ func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, ex
}
func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int64, getValue func(rowIdx int) string) {
lr := GetLogRows(nil, nil)
lr := GetLogRows(nil, nil, nil, "")
var fields []Field
for i, timestamp := range timestamps {
fields = append(fields[:0], Field{

View file

@ -22,7 +22,7 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
// make a copy of lr - it is used for comparing the results later,
// since lr may be modified by inmemoryPart.mustInitFromRows()
lrOrig := GetLogRows(nil, nil)
lrOrig := GetLogRows(nil, nil, nil, "")
for i, timestamp := range lr.timestamps {
if timestamp < minTimestampExpected {
minTimestampExpected = timestamp
@ -72,7 +72,7 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
}
}
f(GetLogRows(nil, nil), 0, 0)
f(GetLogRows(nil, nil, nil, ""), 0, 0)
// Check how inmemoryPart works with a single stream
f(newTestLogRows(1, 1, 0), 1, 0.7)
@ -108,7 +108,7 @@ func TestInmemoryPartInitFromBlockStreamReaders(t *testing.T) {
maxTimestampExpected := int64(math.MinInt64)
// make a copy of rrss in order to compare the results after merge.
lrOrig := GetLogRows(nil, nil)
lrOrig := GetLogRows(nil, nil, nil, "")
for _, lr := range lrs {
uncompressedSizeBytesExpected += uncompressedRowsSizeBytes(lr.rows)
rowsCountExpected += len(lr.timestamps)
@ -188,8 +188,8 @@ func TestInmemoryPartInitFromBlockStreamReaders(t *testing.T) {
// Check empty readers
f(nil, 0, 0)
f([]*LogRows{GetLogRows(nil, nil)}, 0, 0)
f([]*LogRows{GetLogRows(nil, nil), GetLogRows(nil, nil)}, 0, 0)
f([]*LogRows{GetLogRows(nil, nil, nil, "")}, 0, 0)
f([]*LogRows{GetLogRows(nil, nil, nil, ""), GetLogRows(nil, nil, nil, "")}, 0, 0)
// Check merge with a single reader
f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.7)
@ -235,7 +235,7 @@ func newTestLogRows(streams, rowsPerStream int, seed int64) *LogRows {
streamTags := []string{
"some-stream-tag",
}
lr := GetLogRows(streamTags, nil)
lr := GetLogRows(streamTags, nil, nil, "")
rng := rand.New(rand.NewSource(seed))
var fields []Field
for i := 0; i < streams; i++ {
@ -322,7 +322,7 @@ func checkEqualRows(lrResult, lrOrig *LogRows) error {
//
// This function is for testing and debugging purposes only.
func (mp *inmemoryPart) readLogRows(sbu *stringsBlockUnmarshaler, vd *valuesDecoder) *LogRows {
lr := GetLogRows(nil, nil)
lr := GetLogRows(nil, nil, nil, "")
bsr := getBlockStreamReader()
defer putBlockStreamReader(bsr)
bsr.MustInitFromInmemoryPart(mp)

View file

@ -3,16 +3,14 @@ package logstorage
import (
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// LogRows holds a set of rows needed for Storage.MustAddRows
//
// LogRows must be obtained via GetLogRows()
type LogRows struct {
// buf holds all the bytes referred by items in LogRows
buf []byte
// a holds all the bytes referred by items in LogRows
a arena
// fieldsBuf holds all the fields referred by items in LogRows
fieldsBuf []Field
@ -37,6 +35,15 @@ type LogRows struct {
// ignoreFields contains names for log fields, which must be skipped during data ingestion
ignoreFields map[string]struct{}
// extraFields contains extra fields to add to all the logs at MustAdd().
extraFields []Field
// extraStreamFields contains extraFields, which must be treated as stream fields.
extraStreamFields []Field
// defaultMsgValue contains default value for missing _msg field
defaultMsgValue string
}
type sortedFields []Field
@ -79,11 +86,16 @@ func (lr *LogRows) Reset() {
for k := range ifs {
delete(ifs, k)
}
lr.extraFields = nil
lr.extraStreamFields = lr.extraStreamFields[:0]
lr.defaultMsgValue = ""
}
// ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
func (lr *LogRows) ResetKeepSettings() {
lr.buf = lr.buf[:0]
lr.a.reset()
fb := lr.fieldsBuf
for i := range fb {
@ -116,7 +128,7 @@ func (lr *LogRows) ResetKeepSettings() {
// NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
func (lr *LogRows) NeedFlush() bool {
return len(lr.buf) > (maxUncompressedBlockSize/8)*7
return len(lr.a.b) > (maxUncompressedBlockSize/8)*7
}
// MustAdd adds a log entry with the given args to lr.
@ -126,15 +138,16 @@ func (lr *LogRows) NeedFlush() bool {
//
// field names longer than MaxFieldNameSize are automatically truncated to MaxFieldNameSize length.
func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) {
// Compose StreamTags from fields according to lr.streamFields
sfs := lr.streamFields
// Compose StreamTags from fields according to lr.streamFields and lr.extraStreamFields
st := GetStreamTags()
for i := range fields {
f := &fields[i]
if _, ok := sfs[f.Name]; ok {
for _, f := range fields {
if _, ok := lr.streamFields[f.Name]; ok {
st.Add(f.Name, f.Value)
}
}
for _, f := range lr.extraStreamFields {
st.Add(f.Name, f.Value)
}
// Marshal StreamTags
bb := bbPool.Get()
@ -152,23 +165,45 @@ func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) {
}
func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field, streamTagsCanonical []byte) {
buf := lr.buf
bufLen := len(buf)
buf = append(buf, streamTagsCanonical...)
streamTagsCanonicalCopy := lr.a.copyBytes(streamTagsCanonical)
lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, streamTagsCanonicalCopy)
lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, buf[bufLen:])
lr.streamIDs = append(lr.streamIDs, sid)
lr.timestamps = append(lr.timestamps, timestamp)
// Store all the fields
ifs := lr.ignoreFields
fieldsLen := len(lr.fieldsBuf)
hasMsgField := lr.addFieldsInternal(fields, lr.ignoreFields)
if lr.addFieldsInternal(lr.extraFields, nil) {
hasMsgField = true
}
// Add optional default _msg field
if !hasMsgField && lr.defaultMsgValue != "" {
value := lr.a.copyString(lr.defaultMsgValue)
lr.fieldsBuf = append(lr.fieldsBuf, Field{
Value: value,
})
}
// Sort fields by name
lr.sf = lr.fieldsBuf[fieldsLen:]
sort.Sort(&lr.sf)
// Add log row with sorted fields to lr.rows
lr.rows = append(lr.rows, lr.sf)
}
func (lr *LogRows) addFieldsInternal(fields []Field, ignoreFields map[string]struct{}) bool {
if len(fields) == 0 {
return false
}
fb := lr.fieldsBuf
fieldsLen := len(fb)
hasMsgField := false
for i := range fields {
f := &fields[i]
if _, ok := ifs[f.Name]; ok {
// Skip fields from the ifs map
if _, ok := ignoreFields[f.Name]; ok {
continue
}
if f.Value == "" {
@ -179,26 +214,20 @@ func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field
fb = append(fb, Field{})
dstField := &fb[len(fb)-1]
bufLen = len(buf)
fieldName := f.Name
if len(fieldName) > MaxFieldNameSize {
fieldName = fieldName[:MaxFieldNameSize]
}
if fieldName != "_msg" {
buf = append(buf, fieldName...)
if fieldName == "_msg" {
fieldName = ""
hasMsgField = true
}
dstField.Name = bytesutil.ToUnsafeString(buf[bufLen:])
bufLen = len(buf)
buf = append(buf, f.Value...)
dstField.Value = bytesutil.ToUnsafeString(buf[bufLen:])
dstField.Name = lr.a.copyString(fieldName)
dstField.Value = lr.a.copyString(f.Value)
}
lr.sf = fb[fieldsLen:]
sort.Sort(&lr.sf)
lr.rows = append(lr.rows, lr.sf)
lr.fieldsBuf = fb
lr.buf = buf
return hasMsgField
}
// GetRowString returns string representation of the row with the given idx.
@ -225,9 +254,11 @@ func (lr *LogRows) GetRowString(idx int) string {
//
// streamFields is a set of field names, which must be associated with the stream.
// ignoreFields is a set of field names, which must be ignored during data ingestion.
// extraFields is a set of fields, which must be added to all the logs passed to MustAdd().
// defaultMsgValue is the default value to store in non-existing or empty _msg.
//
// Return back it to the pool with PutLogRows() when it is no longer needed.
func GetLogRows(streamFields, ignoreFields []string) *LogRows {
func GetLogRows(streamFields, ignoreFields []string, extraFields []Field, defaultMsgValue string) *LogRows {
v := logRowsPool.Get()
if v == nil {
v = &LogRows{}
@ -244,6 +275,14 @@ func GetLogRows(streamFields, ignoreFields []string) *LogRows {
sfs[f] = struct{}{}
}
// Initialize extraStreamFields
for _, f := range extraFields {
if _, ok := sfs[f.Name]; ok {
lr.extraStreamFields = append(lr.extraStreamFields, f)
delete(sfs, f.Name)
}
}
// Initialize ignoreFields
ifs := lr.ignoreFields
if ifs == nil {
@ -253,8 +292,17 @@ func GetLogRows(streamFields, ignoreFields []string) *LogRows {
for _, f := range ignoreFields {
if f != "" {
ifs[f] = struct{}{}
delete(sfs, f)
}
}
for _, f := range extraFields {
// Extra fields must override the existing fields for the sake of consistency and security,
// so the client won't be able to override them.
ifs[f.Name] = struct{}{}
}
lr.extraFields = extraFields
lr.defaultMsgValue = defaultMsgValue
return lr
}

View file

@ -0,0 +1,145 @@
package logstorage
import (
"reflect"
"testing"
)
// TestLogRows_DefaultMsgValue verifies how GetLogRows() options affect ingested rows:
// stream fields selection, ignored fields, extra fields injection and the
// default `_msg` value substitution for rows without a non-empty `_msg` field.
func TestLogRows_DefaultMsgValue(t *testing.T) {
	// opts describes a single test case: input rows (JSON log entries),
	// the GetLogRows() settings to apply, and the expected per-row strings.
	type opts struct {
		rows []string

		streamFields    []string
		ignoreFields    []string
		extraFields     []Field
		defaultMsgValue string

		resultExpected []string
	}

	f := func(o opts) {
		t.Helper()

		lr := GetLogRows(o.streamFields, o.ignoreFields, o.extraFields, o.defaultMsgValue)
		defer PutLogRows(lr)

		tid := TenantID{
			AccountID: 123,
			ProjectID: 456,
		}

		p := GetJSONParser()
		defer PutJSONParser(p)
		for i, r := range o.rows {
			if err := p.ParseLogMessage([]byte(r)); err != nil {
				t.Fatalf("unexpected error when parsing %q: %s", r, err)
			}
			// Give every row a distinct, deterministic timestamp so the
			// expected `_time` values below are stable.
			timestamp := int64(i)*1_000 + 1
			lr.MustAdd(tid, timestamp, p.Fields)
		}

		var result []string
		for i := range o.rows {
			s := lr.GetRowString(i)
			result = append(result, s)
		}
		if !reflect.DeepEqual(result, o.resultExpected) {
			t.Fatalf("unexpected result\ngot\n%v\nwant\n%v", result, o.resultExpected)
		}
	}

	// An empty opts value (no rows) must produce an empty result.
	var o opts

	f(o)

	// default options
	o = opts{
		rows: []string{
			`{"foo":"bar"}`,
			`{}`,
			`{"foo":"bar","a":"b"}`,
		},
		resultExpected: []string{
			`{"_stream":"{}","_time":"1970-01-01T00:00:00.000000001Z","foo":"bar"}`,
			`{"_stream":"{}","_time":"1970-01-01T00:00:00.000001001Z"}`,
			`{"_stream":"{}","_time":"1970-01-01T00:00:00.000002001Z","a":"b","foo":"bar"}`,
		},
	}
	f(o)

	// stream fields
	o = opts{
		rows: []string{
			`{"x":"y","foo":"bar"}`,
			`{"x":"y","foo":"bar","abc":"de"}`,
			`{}`,
		},
		streamFields: []string{"foo", "abc"},
		resultExpected: []string{
			`{"_stream":"{foo=\"bar\"}","_time":"1970-01-01T00:00:00.000000001Z","foo":"bar","x":"y"}`,
			`{"_stream":"{abc=\"de\",foo=\"bar\"}","_time":"1970-01-01T00:00:00.000001001Z","abc":"de","foo":"bar","x":"y"}`,
			`{"_stream":"{}","_time":"1970-01-01T00:00:00.000002001Z"}`,
		},
	}
	f(o)

	// ignore fields; an ignored field must be dropped from both the row
	// and the stream, even when it is listed in streamFields.
	o = opts{
		rows: []string{
			`{"x":"y","foo":"bar"}`,
			`{"x":"y"}`,
			`{}`,
		},
		streamFields: []string{"foo", "abc", "x"},
		ignoreFields: []string{"foo"},
		resultExpected: []string{
			`{"_stream":"{x=\"y\"}","_time":"1970-01-01T00:00:00.000000001Z","x":"y"}`,
			`{"_stream":"{x=\"y\"}","_time":"1970-01-01T00:00:00.000001001Z","x":"y"}`,
			`{"_stream":"{}","_time":"1970-01-01T00:00:00.000002001Z"}`,
		},
	}
	f(o)

	// extra fields; extra fields must override same-name ingested fields
	// and participate in the stream when listed in streamFields.
	o = opts{
		rows: []string{
			`{"x":"y","foo":"bar"}`,
			`{}`,
		},
		streamFields: []string{"foo", "abc", "x"},
		ignoreFields: []string{"foo"},
		extraFields: []Field{
			{
				Name:  "foo",
				Value: "test",
			},
			{
				Name:  "abc",
				Value: "1234",
			},
		},
		resultExpected: []string{
			`{"_stream":"{abc=\"1234\",foo=\"test\",x=\"y\"}","_time":"1970-01-01T00:00:00.000000001Z","abc":"1234","foo":"test","x":"y"}`,
			`{"_stream":"{abc=\"1234\",foo=\"test\"}","_time":"1970-01-01T00:00:00.000001001Z","abc":"1234","foo":"test"}`,
		},
	}
	f(o)

	// default _msg value; it must be applied only to rows without a non-empty `_msg` field.
	o = opts{
		rows: []string{
			`{"x":"y","foo":"bar"}`,
			`{"_msg":"ppp"}`,
			`{"abc":"ppp"}`,
		},
		streamFields:    []string{"abc", "x"},
		defaultMsgValue: "qwert",
		resultExpected: []string{
			`{"_msg":"qwert","_stream":"{x=\"y\"}","_time":"1970-01-01T00:00:00.000000001Z","foo":"bar","x":"y"}`,
			`{"_msg":"ppp","_stream":"{}","_time":"1970-01-01T00:00:00.000001001Z"}`,
			`{"_msg":"qwert","_stream":"{abc=\"ppp\"}","_time":"1970-01-01T00:00:00.000002001Z","abc":"ppp"}`,
		},
	}
	f(o)
}

View file

@ -52,7 +52,7 @@ func BenchmarkLogRowsMustAdd(b *testing.B) {
}
func benchmarkLogRowsMustAdd(rows [][]Field, streamFields []string) {
lr := GetLogRows(streamFields, nil)
lr := GetLogRows(streamFields, nil, nil, "")
var tid TenantID
for i, fields := range rows {
tid.AccountID = uint32(i)

View file

@ -549,7 +549,7 @@ func (s *Storage) MustAddRows(lr *LogRows) {
}
lrPart := m[day]
if lrPart == nil {
lrPart = GetLogRows(nil, nil)
lrPart = GetLogRows(nil, nil, nil, "")
m[day] = lrPart
}
lrPart.mustAddInternal(lr.streamIDs[i], ts, lr.rows[i], lr.streamTagsCanonicals[i])

View file

@ -47,7 +47,7 @@ func TestStorageRunQuery(t *testing.T) {
for j := 0; j < streamsPerTenant; j++ {
streamIDValue := fmt.Sprintf("stream_id=%d", j)
for k := 0; k < blocksPerStream; k++ {
lr := GetLogRows(streamTags, nil)
lr := GetLogRows(streamTags, nil, nil, "")
for m := 0; m < rowsPerBlock; m++ {
timestamp := baseTimestamp + int64(m)*1e9 + int64(k)
// Append stream fields
@ -774,7 +774,7 @@ func TestStorageSearch(t *testing.T) {
allTenantIDs = append(allTenantIDs, tenantID)
for j := 0; j < streamsPerTenant; j++ {
for k := 0; k < blocksPerStream; k++ {
lr := GetLogRows(streamTags, nil)
lr := GetLogRows(streamTags, nil, nil, "")
for m := 0; m < rowsPerBlock; m++ {
timestamp := baseTimestamp + int64(m)*1e9 + int64(k)
// Append stream fields