diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index fa1d2db3f..4fb8db01f 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 
 ## tip
 
+* FEATURE: add [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe) for unpacking [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 * FEATURE: parse timestamps in [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) with nanosecond precision.
 * FEATURE: return the last `N` matching logs from [`/select/logsql/query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) with the maximum timestamps if `limit=N` query arg is passed to it. Previously a random subset of matching logs could be returned, which could complicate investigation of the returned logs.
 * FEATURE: add [`drop_empty_fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe) for dropping [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index edffeb18e..9403735d4 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1173,8 +1173,9 @@ LogsQL supports the following pipes:
 - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`stats`](#stats-pipe) calculates various stats over the selected logs.
 - [`uniq`](#uniq-pipe) returns unique log entries.
-- [`unpack_json`](#unpack_json-pipe) unpacks JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 
 ### copy pipe
 
@@ -2127,7 +2128,7 @@ See also:
 
 ### unpack_json pipe
 
-`| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+`| unpack_json from field_name` [pipe](#pipes) unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 into `k1`, ... `kN` output field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
 
 Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -2194,6 +2195,7 @@ See also:
 
 - [Conditional `unpack_json`](#conditional-unpack_json)
 - [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
+- [`unpack_syslog` pipe](#unpack_syslog-pipe)
 - [`extract` pipe](#extract-pipe)
 - [`unroll` pipe](#unroll-pipe)
 - [`pack_json` pipe](#pack_json-pipe)
@@ -2210,7 +2212,7 @@ _time:5m | unpack_json if (ip:"") from foo
 
 ### unpack_logfmt pipe
 
-`| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
+`| unpack_logfmt from field_name` [pipe](#pipes) unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
 from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
@@ -2257,7 +2259,7 @@ in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
 
 ```logsql
 _time:5m | extract ' ip=<ip>'
 ```
 
-If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON,
+If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from logfmt,
 by adding `result_prefix "prefix_name"` to `unpack_logfmt`.
For example, the following query adds `foo_` prefix for all the unpacked fields from `foo` field: @@ -2278,6 +2280,7 @@ See also: - [Conditional unpack_logfmt](#conditional-unpack_logfmt) - [`unpack_json` pipe](#unpack_json-pipe) +- [`unpack_syslog` pipe](#unpack_syslog-pipe) - [`extract` pipe](#extract-pipe) #### Conditional unpack_logfmt @@ -2291,6 +2294,86 @@ only if `ip` field in the current log entry isn't set or empty: _time:5m | unpack_logfmt if (ip:"") from foo ``` +### unpack_syslog pipe + +`| unpack_syslog from field_name` [pipe](#pipes) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message +from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following Syslog formats: + +- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE` +- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE` + +The following fields are unpacked: + +- `priority` - it is obtained from `PRI`. +- `facility` - it is calculated as `PRI / 8`. +- `severity` - it is calculated as `PRI % 8`. +- `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) + is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months. +- `hostname` +- `app_name` +- `proc_id` +- `msg_id` +- `message` + +The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value +according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value then can be parsed to separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe). 
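The `facility` and `severity` derivation from `PRI` described above can be sketched in Go. This is a standalone illustration of the arithmetic, not VictoriaLogs code; the `splitPRI` helper name is made up:

```go
package main

import "fmt"

// splitPRI illustrates how the syslog PRI value encodes two numbers:
// the facility in the upper bits (PRI / 8) and the severity in the
// lower three bits (PRI % 8), per RFC 5424, section 6.2.1.
func splitPRI(pri uint64) (facility, severity uint64) {
	return pri / 8, pri % 8
}

func main() {
	// A <165> header means facility 20 (local4) and severity 5 (notice).
	facility, severity := splitPRI(165)
	fmt.Println(facility, severity) // 20 5
}
```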
+
+For example, the following query unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
+across logs for the last 5 minutes:
+
+```logsql
+_time:5m | unpack_syslog from _msg
+```
+
+The `from _msg` part can be omitted when the [syslog](https://en.wikipedia.org/wiki/Syslog) message is unpacked
+from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
+The following query is equivalent to the previous one:
+
+```logsql
+_time:5m | unpack_syslog
+```
+
+If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`:
+
+```logsql
+_time:5m | unpack_syslog keep_original_fields
+```
+
+If you want to make sure that the unpacked [syslog](https://en.wikipedia.org/wiki/Syslog) fields do not clash with the existing fields,
+then specify common prefix for all the fields extracted from syslog, by adding `result_prefix "prefix_name"` to `unpack_syslog`.
+For example, the following query adds `foo_` prefix for all the unpacked fields from `foo` field:
+
+```logsql
+_time:5m | unpack_syslog from foo result_prefix "foo_"
+```
+
+Performance tips:
+
+- It is better from performance and resource usage PoV to ingest parsed [syslog](https://en.wikipedia.org/wiki/Syslog) messages into VictoriaLogs
+  according to the [supported data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+  instead of ingesting unparsed syslog lines into VictoriaLogs and then parsing them at query time with [`unpack_syslog` pipe](#unpack_syslog-pipe).
+
+- It is recommended using more specific [log filters](#filters) in order to reduce the number of log entries, which are passed to `unpack_syslog`.
+  See [general performance tips](#performance-tips) for details.
+
+See also:
+
+- [Conditional unpack_syslog](#conditional-unpack_syslog)
+- [`unpack_json` pipe](#unpack_json-pipe)
+- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
+- [`extract` pipe](#extract-pipe)
+
+#### Conditional unpack_syslog
+
+If the [`unpack_syslog` pipe](#unpack_syslog-pipe) mustn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
+then add `if (<filters>)` after `unpack_syslog`.
+The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks syslog message fields from `foo` field
+only if `hostname` field in the current log entry isn't set or empty:
+
+```logsql
+_time:5m | unpack_syslog if (hostname:"") from foo
+```
+
 ### unroll pipe
 
 `| unroll by (field1, ..., fieldN)` [pipe](#pipes) can be used for unrolling JSON arrays from `field1`, ..., `fieldN`
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index 4f4000718..ff2178025 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -226,6 +226,12 @@ func parsePipe(lex *lexer) (pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
 		}
 		return pu, nil
+	case lex.isKeyword("unpack_syslog"):
+		pu, err := parsePipeUnpackSyslog(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'unpack_syslog' pipe: %w", err)
+		}
+		return pu, nil
 	case lex.isKeyword("unroll"):
 		pu, err := parsePipeUnroll(lex)
 		if err != nil {
diff --git a/lib/logstorage/pipe_unpack_syslog.go b/lib/logstorage/pipe_unpack_syslog.go
new file mode 100644
index 000000000..f693739c3
--- /dev/null
+++ b/lib/logstorage/pipe_unpack_syslog.go
@@ -0,0 +1,146 @@
+package logstorage
+
+import (
+	"fmt"
+	"sync/atomic"
+	"time"
+)
+
+// pipeUnpackSyslog processes '| unpack_syslog ...' pipe.
+// +// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe +type pipeUnpackSyslog struct { + // fromField is the field to unpack syslog fields from + fromField string + + // resultPrefix is prefix to add to unpacked field names + resultPrefix string + + keepOriginalFields bool + + // iff is an optional filter for skipping unpacking syslog + iff *ifFilter +} + +func (pu *pipeUnpackSyslog) String() string { + s := "unpack_syslog" + if pu.iff != nil { + s += " " + pu.iff.String() + } + if !isMsgFieldName(pu.fromField) { + s += " from " + quoteTokenIfNeeded(pu.fromField) + } + if pu.resultPrefix != "" { + s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) + } + if pu.keepOriginalFields { + s += " keep_original_fields" + } + return s +} + +func (pu *pipeUnpackSyslog) updateNeededFields(neededFields, unneededFields fieldsSet) { + updateNeededFieldsForUnpackPipe(pu.fromField, nil, pu.keepOriginalFields, false, pu.iff, neededFields, unneededFields) +} + +func (pu *pipeUnpackSyslog) optimize() { + pu.iff.optimizeFilterIn() +} + +func (pu *pipeUnpackSyslog) hasFilterInWithQuery() bool { + return pu.iff.hasFilterInWithQuery() +} + +func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + puNew := *pu + puNew.iff = iffNew + return &puNew, nil +} + +func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + unpackSyslog := func(uctx *fieldsUnpackerContext, s string) { + year := currentYear.Load() + p := getSyslogParser(int(year)) + + p.parse(s) + for _, f := range p.fields { + uctx.addField(f.Name, f.Value) + } + + putSyslogParser(p) + } + + return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff) +} + +var 
currentYear atomic.Int64 + +func init() { + year := time.Now().UTC().Year() + currentYear.Store(int64(year)) + go func() { + for { + t := time.Now().UTC() + nextYear := time.Date(t.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC) + d := nextYear.Sub(t) + time.Sleep(d) + year := time.Now().UTC().Year() + currentYear.Store(int64(year)) + } + }() +} + +func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) { + if !lex.isKeyword("unpack_syslog") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_syslog") + } + lex.nextToken() + + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + fromField := "_msg" + if lex.isKeyword("from") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'from' field name: %w", err) + } + fromField = f + } + + resultPrefix := "" + if lex.isKeyword("result_prefix") { + lex.nextToken() + p, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err) + } + resultPrefix = p + } + + keepOriginalFields := false + if lex.isKeyword("keep_original_fields") { + lex.nextToken() + keepOriginalFields = true + } + + pu := &pipeUnpackSyslog{ + fromField: fromField, + resultPrefix: resultPrefix, + keepOriginalFields: keepOriginalFields, + iff: iff, + } + + return pu, nil +} diff --git a/lib/logstorage/pipe_unpack_syslog_test.go b/lib/logstorage/pipe_unpack_syslog_test.go new file mode 100644 index 000000000..ed1a1a7d4 --- /dev/null +++ b/lib/logstorage/pipe_unpack_syslog_test.go @@ -0,0 +1,239 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeUnpackSyslogSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`unpack_syslog`) + f(`unpack_syslog keep_original_fields`) + f(`unpack_syslog if (a:x)`) + f(`unpack_syslog if (a:x) keep_original_fields`) + f(`unpack_syslog 
from x`) + f(`unpack_syslog from x keep_original_fields`) + f(`unpack_syslog if (a:x) from x`) + f(`unpack_syslog from x result_prefix abc`) + f(`unpack_syslog if (a:x) from x result_prefix abc`) + f(`unpack_syslog result_prefix abc`) + f(`unpack_syslog if (a:x) result_prefix abc`) +} + +func TestParsePipeUnpackSyslogFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`unpack_syslog foo`) + f(`unpack_syslog if`) + f(`unpack_syslog if (x:y) foobar`) + f(`unpack_syslog from`) + f(`unpack_syslog from x y`) + f(`unpack_syslog from x if`) + f(`unpack_syslog from x result_prefix`) + f(`unpack_syslog from x result_prefix a b`) + f(`unpack_syslog from x result_prefix a if`) + f(`unpack_syslog result_prefix`) + f(`unpack_syslog result_prefix a b`) + f(`unpack_syslog result_prefix a if`) +} + +func TestPipeUnpackSyslog(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // no skip empty results + f("unpack_syslog", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "appname"}, + {"proc_id", "12345"}, + {"msg_id", "ID47"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // keep original fields + f("unpack_syslog keep_original_fields", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + {"app_name", 
"foobar"}, + {"msg_id", "baz"}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "foobar"}, + {"proc_id", "12345"}, + {"msg_id", "baz"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // unpack from other field + f("unpack_syslog from x", [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }, [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "appname"}, + {"proc_id", "12345"}, + {"msg_id", "ID47"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // failed if condition + f("unpack_syslog if (foo:bar)", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }) + + // matched if condition + f("unpack_syslog if (appname)", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"priority", "165"}, + 
{"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "appname"}, + {"proc_id", "12345"}, + {"msg_id", "ID47"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // single row, unpack from missing field + f("unpack_syslog from x", [][]Field{ + { + {"_msg", `foo=bar`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar`}, + }, + }) + + // single row, unpack from non-syslog field + f("unpack_syslog from x", [][]Field{ + { + {"x", `foobar`}, + }, + }, [][]Field{ + { + {"x", `foobar`}, + }, + }) + + // multiple rows with distinct number of fields + f("unpack_syslog from x result_prefix qwe_", [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + { + {"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`}, + {"y", `z=bar`}, + }, + }, [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"qwe_priority", "165"}, + {"qwe_facility", "20"}, + {"qwe_severity", "5"}, + {"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"qwe_hostname", "mymachine.example.com"}, + {"qwe_app_name", "appname"}, + {"qwe_proc_id", "12345"}, + {"qwe_msg_id", "ID47"}, + {"qwe_message", "This is a test message with structured data"}, + }, + { + {"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`}, + {"y", `z=bar`}, + {"qwe_priority", "163"}, + {"qwe_facility", "20"}, + {"qwe_severity", "3"}, + {"qwe_timestamp", "2024-12-13T18:21:43Z"}, + {"qwe_hostname", "mymachine.example.com"}, + {"qwe_app_name", "appname2"}, + {"qwe_proc_id", "345"}, + {"qwe_msg_id", "ID7"}, + {"qwe_message", "foobar"}, + }, + }) +} + +func TestPipeUnpackSyslogUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, 
neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("unpack_syslog", "*", "", "*", "") + f("unpack_syslog keep_original_fields", "*", "", "*", "") + f("unpack_syslog if (y:z) from x", "*", "", "*", "") + + // all the needed fields, unneeded fields do not intersect with src + f("unpack_syslog from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_syslog if (y:z) from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_syslog if (f1:z) from x", "*", "f1,f2", "*", "f2") + + // all the needed fields, unneeded fields intersect with src + f("unpack_syslog from x", "*", "f2,x", "*", "f2") + f("unpack_syslog if (y:z) from x", "*", "f2,x", "*", "f2") + f("unpack_syslog if (f2:z) from x", "*", "f1,f2,x", "*", "f1") + + // needed fields do not intersect with src + f("unpack_syslog from x", "f1,f2", "", "f1,f2,x", "") + f("unpack_syslog if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "") + f("unpack_syslog if (f1:z) from x", "f1,f2", "", "f1,f2,x", "") + + // needed fields intersect with src + f("unpack_syslog from x", "f2,x", "", "f2,x", "") + f("unpack_syslog if (y:z) from x", "f2,x", "", "f2,x,y", "") + f("unpack_syslog if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "") +} diff --git a/lib/logstorage/syslog_parser.go b/lib/logstorage/syslog_parser.go new file mode 100644 index 000000000..6bf6f0082 --- /dev/null +++ b/lib/logstorage/syslog_parser.go @@ -0,0 +1,308 @@ +package logstorage + +import ( + "strconv" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +func getSyslogParser(currentYear int) *syslogParser { + v := syslogParserPool.Get() + if v == nil { + v = &syslogParser{} + } + p := v.(*syslogParser) + p.currentYear = currentYear + return p +} + +func putSyslogParser(p *syslogParser) { + p.reset() + 
syslogParserPool.Put(p) +} + +var syslogParserPool sync.Pool + +type syslogParser struct { + currentYear int + + buf []byte + fields []Field +} + +func (p *syslogParser) reset() { + p.currentYear = 0 + p.resetFields() +} + +func (p *syslogParser) resetFields() { + p.buf = p.buf[:0] + + clear(p.fields) + p.fields = p.fields[:0] +} + +func (p *syslogParser) addField(name, value string) { + p.fields = append(p.fields, Field{ + Name: name, + Value: value, + }) +} + +func (p *syslogParser) parse(s string) { + p.resetFields() + + if len(s) == 0 { + // Cannot parse syslog message + return + } + + if s[0] != '<' { + p.parseNoHeader(s) + return + } + + // parse priority + s = s[1:] + n := strings.IndexByte(s, '>') + if n < 0 { + // Cannot parse priority + return + } + priorityStr := s[:n] + s = s[n+1:] + + p.addField("priority", priorityStr) + priority, ok := tryParseUint64(priorityStr) + if !ok { + // Cannot parse priority + return + } + facility := priority / 8 + severity := priority % 8 + + bufLen := len(p.buf) + p.buf = marshalUint64String(p.buf, facility) + p.addField("facility", bytesutil.ToUnsafeString(p.buf[bufLen:])) + + bufLen = len(p.buf) + p.buf = marshalUint64String(p.buf, severity) + p.addField("severity", bytesutil.ToUnsafeString(p.buf[bufLen:])) + + p.parseNoHeader(s) +} + +func (p *syslogParser) parseNoHeader(s string) { + if len(s) == 0 { + return + } + if strings.HasPrefix(s, "1 ") { + p.parseRFC5424(s[2:]) + } else { + p.parseRFC3164(s) + } +} + +func (p *syslogParser) parseRFC5424(s string) { + // See https://datatracker.ietf.org/doc/html/rfc5424 + + if len(s) == 0 { + return + } + + // Parse timestamp + n := strings.IndexByte(s, ' ') + if n < 0 { + p.addField("timestamp", s) + return + } + p.addField("timestamp", s[:n]) + s = s[n+1:] + + // Parse hostname + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("hostname", s) + return + } + p.addField("hostname", s[:n]) + s = s[n+1:] + + // Parse app-name + n = strings.IndexByte(s, ' ') + if n < 0 { + 
p.addField("app_name", s) + return + } + p.addField("app_name", s[:n]) + s = s[n+1:] + + // Parse procid + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("proc_id", s) + return + } + p.addField("proc_id", s[:n]) + s = s[n+1:] + + // Parse msgID + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("msg_id", s) + return + } + p.addField("msg_id", s[:n]) + s = s[n+1:] + + // Parse structured data + tail, ok := p.parseRFC5424SD(s) + if !ok { + return + } + s = tail + + // Parse message + p.addField("message", s) +} + +func (p *syslogParser) parseRFC5424SD(s string) (string, bool) { + if strings.HasPrefix(s, "- ") { + return s[2:], true + } + + for { + tail, ok := p.parseRFC5424SDLine(s) + if !ok { + return tail, false + } + s = tail + if strings.HasPrefix(s, " ") { + s = s[1:] + return s, true + } + } +} + +func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) { + if len(s) == 0 || s[0] != '[' { + return s, false + } + s = s[1:] + + n := strings.IndexAny(s, " ]") + if n < 0 { + return s, false + } + sdID := s[:n] + s = s[n:] + + // Parse structured data + i := 0 + for i < len(s) && s[i] != ']' { + // skip whitespace + if s[i] != ' ' { + return s, false + } + i++ + + // Parse name + n := strings.IndexByte(s[i:], '=') + if n < 0 { + return s, false + } + i += n + 1 + + // Parse value + qp, err := strconv.QuotedPrefix(s[i:]) + if err != nil { + return s, false + } + i += len(qp) + } + if i == len(s) { + return s, false + } + + sdValue := strings.TrimSpace(s[:i]) + p.addField(sdID, sdValue) + s = s[i+1:] + return s, true +} + +func (p *syslogParser) parseRFC3164(s string) { + // See https://datatracker.ietf.org/doc/html/rfc3164 + + // Parse timestamp + n := len(time.Stamp) + if len(s) < n { + return + } + t, err := time.Parse(time.Stamp, s[:n]) + if err != nil { + // TODO: fall back to parsing ISO8601 timestamp? 
+ return + } + s = s[n:] + + t = t.UTC() + t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() { + // Adjust time to the previous year + t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + } + + bufLen := len(p.buf) + p.buf = marshalTimestampISO8601String(p.buf, t.UnixNano()) + p.addField("timestamp", bytesutil.ToUnsafeString(p.buf[bufLen:])) + + if len(s) == 0 || s[0] != ' ' { + // Missing space after the time field + return + } + s = s[1:] + + // Parse hostname + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("hostname", s) + return + } + p.addField("hostname", s[:n]) + s = s[n+1:] + + // Parse tag (aka app_name) + n = strings.IndexAny(s, "[: ") + if n < 0 { + p.addField("app_name", s) + return + } + p.addField("app_name", s[:n]) + s = s[n:] + + // Parse proc_id + if len(s) == 0 { + return + } + if s[0] == '[' { + s = s[1:] + n = strings.IndexByte(s, ']') + if n < 0 { + return + } + p.addField("proc_id", s[:n]) + s = s[n+1:] + } + + // Skip optional ': ' in front of message + s = strings.TrimPrefix(s, ":") + s = strings.TrimPrefix(s, " ") + + if len(s) > 0 { + p.addField("message", s) + } +} diff --git a/lib/logstorage/syslog_parser_test.go b/lib/logstorage/syslog_parser_test.go new file mode 100644 index 000000000..1a61a1fe6 --- /dev/null +++ b/lib/logstorage/syslog_parser_test.go @@ -0,0 +1,67 @@ +package logstorage + +import ( + "testing" +) + +func TestSyslogParser(t *testing.T) { + f := func(s, resultExpected string) { + t.Helper() + + const currentYear = 2024 + p := getSyslogParser(currentYear) + defer putSyslogParser(p) + + p.parse(s) + result := MarshalFieldsToJSON(nil, p.fields) + if string(result) != resultExpected { + t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected) + } + } + + // RFC 3164 + f("Jun 3 12:08:33 abcd systemd[1]: 
Starting Update the local ESM caches...", + `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", + `{"priority":"165","facility":"20","severity":"5","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", + `{"timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) + f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", + `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) + f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", + `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) + + // RFC 5424 + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, + `{"timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with 
structured data.`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) + f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) + + // Incomplete RFC 3164 + f("", `{}`) + f("Jun 3 12:08:33", `{"timestamp":"2024-06-03T12:08:33.000Z"}`) + f("Jun 3 12:08:33 abcd", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) + f("Jun 3 12:08:33 abcd sudo", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) + f("Jun 3 12:08:33 abcd sudo[123]", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) + f("Jun 3 12:08:33 abcd sudo foobar", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) + + // Incomplete RFC 5424 + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) + 
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z"}`) + f(`<165>1 `, + `{"priority":"165","facility":"20","severity":"5"}`) +}