From dcf4d83ba84b4c5bfccfa7ac77548a7347b69616 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 4 Jun 2024 16:20:02 +0200 Subject: [PATCH] wip --- app/vlselect/logsql/logsql.go | 2 +- docs/VictoriaLogs/CHANGELOG.md | 2 ++ docs/VictoriaLogs/LogsQL.md | 17 ++++++++- lib/logstorage/pipe_format.go | 27 +++++++++++++-- lib/logstorage/pipe_format_test.go | 16 +++++++++ lib/logstorage/pipe_unpack_syslog_test.go | 6 ++++ lib/logstorage/syslog_parser.go | 5 +++ lib/logstorage/syslog_parser_test.go | 42 +++++++++++------------ 8 files changed, 92 insertions(+), 25 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index fcc16f24a..c2c4e8472 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -355,7 +355,7 @@ type row struct { } func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { - limitUpper := 2*limit + limitUpper := 2 * limit q.AddPipeLimit(uint64(limitUpper)) q.Optimize() rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 51ef8d8cc..91a7a148e 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add ability to format numeric fields into string representation of time, duration and ipv4 with [`format` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe). + ## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs) Released at 2024-06-04 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 67315b07a..2140b5a6a 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1554,6 +1554,18 @@ and stores it into `my_json` output field: _time:5m | format '{"_msg":,"stacktrace":}' as my_json ``` +Numeric fields can be transformed into the following string representation at `format` pipe: + +- [RFC3339 time](https://www.rfc-editor.org/rfc/rfc3339) - by adding `time:` in front of the corresponding field name + containing [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) in nanoseconds. + For example, `format "time="`. The timestamp can be converted into nanoseconds with the [`math` pipe](#math-pipe). + +- Human-readable duration - by adding `duration:` in front of the corresponding numeric field name containing duration in nanoseconds. + For example, `format "duration="`. The duration can be converted into nanoseconds with the [`math` pipe](#math-pipe). + +- IPv4 - by adding `ipv4:` in front of the corresponding field name containing `uint32` representation of the IPv4 address. + For example, `format "ip="`. + Add `keep_original_fields` to the end of `format ... as result_field` when the original non-empty value of the `result_field` must be preserved instead of overwriting it with the `format` results. For example, the following query adds formatted result to `foo` field only if it was missing or empty: @@ -2301,13 +2313,14 @@ _time:5m | unpack_logfmt if (ip:"") from foo from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following Syslog formats: - [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE` -- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE` +- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE` The following fields are unpacked: - `priority` - it is obtained from `PRI`. - `facility` - it is calculated as `PRI / 8`. - `severity` - it is calculated as `PRI % 8`. +- `format` - either `rfc3164` or `rfc5424` depending on which Syslog format is unpacked. - `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months. - `hostname` @@ -2316,6 +2329,8 @@ The following fields are unpacked: - `msg_id` - `message` +The `` part is optional. If it is missing, then `priority`, `facility` and `severity` fields aren't set. + The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value then can be parsed to separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe). diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index aa91a0677..3b2e70eaf 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "math" "strconv" "unsafe" @@ -185,9 +186,31 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult if step.field != "" { c := br.getColumnByName(step.field) v := c.getValueAtRow(br, rowIdx) - if step.fieldOpt == "q" { + switch step.fieldOpt { + case "q": b = strconv.AppendQuote(b, v) - } else { + case "time": + nsecs, ok := tryParseInt64(v) + if !ok { + b = append(b, v...) + continue + } + b = marshalTimestampRFC3339NanoString(b, nsecs) + case "duration": + nsecs, ok := tryParseInt64(v) + if !ok { + b = append(b, v...) + continue + } + b = marshalDurationString(b, nsecs) + case "ipv4": + ipNum, ok := tryParseUint64(v) + if !ok || ipNum > math.MaxUint32 { + b = append(b, v...) + continue + } + b = marshalIPv4String(b, uint32(ipNum)) + default: b = append(b, v...) } } diff --git a/lib/logstorage/pipe_format_test.go b/lib/logstorage/pipe_format_test.go index 8f74bd3e9..d1ff5cf07 100644 --- a/lib/logstorage/pipe_format_test.go +++ b/lib/logstorage/pipe_format_test.go @@ -47,6 +47,22 @@ func TestPipeFormat(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // format time, duration and ipv4 + f(`format 'time=, duration=, ip=' as x`, [][]Field{ + { + {"foo", `1717328141123456789`}, + {"bar", `210123456789`}, + {"baz", "1234567890"}, + }, + }, [][]Field{ + { + {"foo", `1717328141123456789`}, + {"bar", `210123456789`}, + {"baz", "1234567890"}, + {"x", "time=2024-06-02T11:35:41.123456789Z, duration=3m30.123456789s, ip=73.150.2.210"}, + }, + }) + // skip_empty_results f(`format '' as x skip_empty_results`, [][]Field{ { diff --git a/lib/logstorage/pipe_unpack_syslog_test.go b/lib/logstorage/pipe_unpack_syslog_test.go index ed1a1a7d4..6b7ddd948 100644 --- a/lib/logstorage/pipe_unpack_syslog_test.go +++ b/lib/logstorage/pipe_unpack_syslog_test.go @@ -62,6 +62,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "appname"}, @@ -86,6 +87,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "foobar"}, @@ -106,6 +108,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "appname"}, @@ -137,6 +140,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "appname"}, @@ -183,6 +187,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"qwe_priority", "165"}, {"qwe_facility", "20"}, {"qwe_severity", "5"}, + {"qwe_format", "rfc5424"}, {"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"}, {"qwe_hostname", "mymachine.example.com"}, {"qwe_app_name", "appname"}, @@ -196,6 +201,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"qwe_priority", "163"}, {"qwe_facility", "20"}, {"qwe_severity", "3"}, + {"qwe_format", "rfc5424"}, {"qwe_timestamp", "2024-12-13T18:21:43Z"}, {"qwe_hostname", "mymachine.example.com"}, {"qwe_app_name", "appname2"}, diff --git a/lib/logstorage/syslog_parser.go b/lib/logstorage/syslog_parser.go index 6bf6f0082..5e4f67fa7 100644 --- a/lib/logstorage/syslog_parser.go +++ b/lib/logstorage/syslog_parser.go @@ -110,6 +110,8 @@ func (p *syslogParser) parseNoHeader(s string) { func (p *syslogParser) parseRFC5424(s string) { // See https://datatracker.ietf.org/doc/html/rfc5424 + p.addField("format", "rfc5424") + if len(s) == 0 { return } @@ -242,6 +244,9 @@ func (p *syslogParser) parseRFC3164(s string) { if len(s) < n { return } + + p.addField("format", "rfc3164") + t, err := time.Parse(time.Stamp, s[:n]) if err != nil { // TODO: fall back to parsing ISO8601 timestamp? diff --git a/lib/logstorage/syslog_parser_test.go b/lib/logstorage/syslog_parser_test.go index 1a61a1fe6..a82154103 100644 --- a/lib/logstorage/syslog_parser_test.go +++ b/lib/logstorage/syslog_parser_test.go @@ -21,47 +21,47 @@ func TestSyslogParser(t *testing.T) { // RFC 3164 f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", - `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", - `{"priority":"165","facility":"20","severity":"5","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", - `{"timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", - `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", - `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) // RFC 5424 f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, - `{"timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + `{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) // Incomplete RFC 3164 f("", `{}`) - f("Jun 3 12:08:33", `{"timestamp":"2024-06-03T12:08:33.000Z"}`) - f("Jun 3 12:08:33 abcd", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) - f("Jun 3 12:08:33 abcd sudo", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) - f("Jun 3 12:08:33 abcd sudo[123]", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) - f("Jun 3 12:08:33 abcd sudo foobar", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) + f("Jun 3 12:08:33", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`) + f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) + f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) + f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) + f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) // Incomplete RFC 5424 f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`) f(`<165>1 `, - `{"priority":"165","facility":"20","severity":"5"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`) }