This commit is contained in:
Aliaksandr Valialkin 2024-06-04 00:48:18 +02:00
parent ebb37263a1
commit 2c7606eb49
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 855 additions and 5 deletions

View file

@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* FEATURE: add [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe) for unpacking [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
* FEATURE: parse timestamps in [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) with nanosecond precision. * FEATURE: parse timestamps in [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) with nanosecond precision.
* FEATURE: return the last `N` matching logs from [`/select/logsql/query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) with the maximum timestamps if `limit=N` query arg is passed to it. Previously a random subset of matching logs could be returned, which could complicate investigation of the returned logs. * FEATURE: return the last `N` matching logs from [`/select/logsql/query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) with the maximum timestamps if `limit=N` query arg is passed to it. Previously a random subset of matching logs could be returned, which could complicate investigation of the returned logs.
* FEATURE: add [`drop_empty_fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe) for dropping [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. * FEATURE: add [`drop_empty_fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe) for dropping [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.

View file

@ -1173,8 +1173,9 @@ LogsQL supports the following pipes:
- [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`stats`](#stats-pipe) calculates various stats over the selected logs. - [`stats`](#stats-pipe) calculates various stats over the selected logs.
- [`uniq`](#uniq-pipe) returns unique log entires. - [`uniq`](#uniq-pipe) returns unique log entires.
- [`unpack_json`](#unpack_json-pipe) unpacks JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unpack_syslog`](#unpack_syslog-pipe) [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
### copy pipe ### copy pipe
@ -2127,7 +2128,7 @@ See also:
### unpack_json pipe ### unpack_json pipe
`| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) `| unpack_json from field_name` [pipe](#pipes) unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
into `k1`, ... `kN` output field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched. into `k1`, ... `kN` output field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -2194,6 +2195,7 @@ See also:
- [Conditional `unpack_json`](#conditional-unpack_json) - [Conditional `unpack_json`](#conditional-unpack_json)
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe) - [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
- [`unpack_syslog` pipe](#unpack_syslog-pipe)
- [`extract` pipe](#extract-pipe) - [`extract` pipe](#extract-pipe)
- [`unroll` pipe](#unroll-pipe) - [`unroll` pipe](#unroll-pipe)
- [`pack_json` pipe](#pack_json-pipe) - [`pack_json` pipe](#pack_json-pipe)
@ -2210,7 +2212,7 @@ _time:5m | unpack_json if (ip:"") from foo
### unpack_logfmt pipe ### unpack_logfmt pipe
`| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields `| unpack_logfmt from field_name` [pipe](#pipes) unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names
with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched. with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
@ -2257,7 +2259,7 @@ in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#mes
_time:5m | extract ' ip=<ip>' _time:5m | extract ' ip=<ip>'
``` ```
If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON, If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from logfmt,
by adding `result_prefix "prefix_name"` to `unpack_logfmt`. For example, the following query adds `foo_` prefix for all the unpacked fields by adding `result_prefix "prefix_name"` to `unpack_logfmt`. For example, the following query adds `foo_` prefix for all the unpacked fields
from `foo` field: from `foo` field:
@ -2278,6 +2280,7 @@ See also:
- [Conditional unpack_logfmt](#conditional-unpack_logfmt) - [Conditional unpack_logfmt](#conditional-unpack_logfmt)
- [`unpack_json` pipe](#unpack_json-pipe) - [`unpack_json` pipe](#unpack_json-pipe)
- [`unpack_syslog` pipe](#unpack_syslog-pipe)
- [`extract` pipe](#extract-pipe) - [`extract` pipe](#extract-pipe)
#### Conditional unpack_logfmt #### Conditional unpack_logfmt
@ -2291,6 +2294,86 @@ only if `ip` field in the current log entry isn't set or empty:
_time:5m | unpack_logfmt if (ip:"") from foo _time:5m | unpack_logfmt if (ip:"") from foo
``` ```
### unpack_syslog pipe
`| unpack_syslog from field_name` [pipe](#pipes) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message
from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following Syslog formats:
- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `<PRI>MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE`
- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE`
The following fields are unpacked:
- `priority` - it is obtained from `PRI`.
- `facility` - it is calculated as `PRI / 8`.
- `severity` - it is calculated as `PRI % 8`.
- `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164)
is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months.
- `hostname`
- `app_name`
- `proc_id`
- `msg_id`
- `message`
The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value
according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value then can be parsed to separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe).
For example, the following query unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
across logs for the last 5 minutes:
```logsql
_time:5m | unpack_syslog from _msg
```
The `from _json` part can be omitted when [syslog](https://en.wikipedia.org/wiki/Syslog) message is unpacked
from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
The following query is equivalent to the previous one:
```logsql
_time:5m | unpack_syslog
```
If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`:
```logsql
_time:5m | unpack_syslog keep_original_fields
```
If you want to make sure that the unpacked [syslog](https://en.wikipedia.org/wiki/Syslog) fields do not clash with the existing fields,
then specify common prefix for all the fields extracted from syslog, by adding `result_prefix "prefix_name"` to `unpack_syslog`.
For example, the following query adds `foo_` prefix for all the unpacked fields from `foo` field:
```logsql
_time:5m | unpack_syslog from foo result_prefix "foo_"
```
Performance tips:
- It is better from performance and resource usage PoV ingesting parsed [syslog](https://en.wikipedia.org/wiki/Syslog) messages into VictoriaLogs
according to the [supported data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
instead of ingesting unparsed syslog lines into VictoriaLogs and then parsing them at query time with [`unpack_syslog` pipe](#unpack_syslog-pipe).
- It is recommended using more specific [log filters](#filters) in order to reduce the number of log entries, which are passed to `unpack_syslog`.
See [general performance tips](#performance-tips) for details.
See also:
- [Conditional unpack_syslog](#conditional-unpack_syslog)
- [`unpack_json` pipe](#unpack_json-pipe)
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
- [`extract` pipe](#extract-pipe)
#### Conditional unpack_syslog
If the [`unpack_syslog` pipe](#unpack_syslog-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
then add `if (<filters>)` after `unpack_syslog`.
The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks syslog message fields from `foo` field
only if `hostname` field in the current log entry isn't set or empty:
```logsql
_time:5m | unpack_syslog if (hostname:"") from foo
```
### unroll pipe ### unroll pipe
`| unroll by (field1, ..., fieldN)` [pipe](#pipes) can be used for unrolling JSON arrays from `field1`, `fieldN` `| unroll by (field1, ..., fieldN)` [pipe](#pipes) can be used for unrolling JSON arrays from `field1`, `fieldN`

View file

@ -226,6 +226,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
} }
return pu, nil return pu, nil
case lex.isKeyword("unpack_syslog"):
pu, err := parsePipeUnpackSyslog(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'unpack_syslog' pipe: %w", err)
}
return pu, nil
case lex.isKeyword("unroll"): case lex.isKeyword("unroll"):
pu, err := parsePipeUnroll(lex) pu, err := parsePipeUnroll(lex)
if err != nil { if err != nil {

View file

@ -0,0 +1,146 @@
package logstorage
import (
"fmt"
"sync/atomic"
"time"
)
// pipeUnpackSyslog processes '| unpack_syslog ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
type pipeUnpackSyslog struct {
// fromField is the field to unpack syslog fields from
fromField string
// resultPrefix is prefix to add to unpacked field names
resultPrefix string
keepOriginalFields bool
// iff is an optional filter for skipping unpacking syslog
iff *ifFilter
}
func (pu *pipeUnpackSyslog) String() string {
s := "unpack_syslog"
if pu.iff != nil {
s += " " + pu.iff.String()
}
if !isMsgFieldName(pu.fromField) {
s += " from " + quoteTokenIfNeeded(pu.fromField)
}
if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
}
if pu.keepOriginalFields {
s += " keep_original_fields"
}
return s
}
func (pu *pipeUnpackSyslog) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUnpackPipe(pu.fromField, nil, pu.keepOriginalFields, false, pu.iff, neededFields, unneededFields)
}
func (pu *pipeUnpackSyslog) optimize() {
pu.iff.optimizeFilterIn()
}
func (pu *pipeUnpackSyslog) hasFilterInWithQuery() bool {
return pu.iff.hasFilterInWithQuery()
}
func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc)
if err != nil {
return nil, err
}
puNew := *pu
puNew.iff = iffNew
return &puNew, nil
}
func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
unpackSyslog := func(uctx *fieldsUnpackerContext, s string) {
year := currentYear.Load()
p := getSyslogParser(int(year))
p.parse(s)
for _, f := range p.fields {
uctx.addField(f.Name, f.Value)
}
putSyslogParser(p)
}
return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff)
}
var currentYear atomic.Int64
func init() {
year := time.Now().UTC().Year()
currentYear.Store(int64(year))
go func() {
for {
t := time.Now().UTC()
nextYear := time.Date(t.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC)
d := nextYear.Sub(t)
time.Sleep(d)
year := time.Now().UTC().Year()
currentYear.Store(int64(year))
}
}()
}
func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) {
if !lex.isKeyword("unpack_syslog") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_syslog")
}
lex.nextToken()
var iff *ifFilter
if lex.isKeyword("if") {
f, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
iff = f
}
fromField := "_msg"
if lex.isKeyword("from") {
lex.nextToken()
f, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
}
fromField = f
}
resultPrefix := ""
if lex.isKeyword("result_prefix") {
lex.nextToken()
p, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err)
}
resultPrefix = p
}
keepOriginalFields := false
if lex.isKeyword("keep_original_fields") {
lex.nextToken()
keepOriginalFields = true
}
pu := &pipeUnpackSyslog{
fromField: fromField,
resultPrefix: resultPrefix,
keepOriginalFields: keepOriginalFields,
iff: iff,
}
return pu, nil
}

View file

@ -0,0 +1,239 @@
package logstorage
import (
"testing"
)
func TestParsePipeUnpackSyslogSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`unpack_syslog`)
f(`unpack_syslog keep_original_fields`)
f(`unpack_syslog if (a:x)`)
f(`unpack_syslog if (a:x) keep_original_fields`)
f(`unpack_syslog from x`)
f(`unpack_syslog from x keep_original_fields`)
f(`unpack_syslog if (a:x) from x`)
f(`unpack_syslog from x result_prefix abc`)
f(`unpack_syslog if (a:x) from x result_prefix abc`)
f(`unpack_syslog result_prefix abc`)
f(`unpack_syslog if (a:x) result_prefix abc`)
}
func TestParsePipeUnpackSyslogFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`unpack_syslog foo`)
f(`unpack_syslog if`)
f(`unpack_syslog if (x:y) foobar`)
f(`unpack_syslog from`)
f(`unpack_syslog from x y`)
f(`unpack_syslog from x if`)
f(`unpack_syslog from x result_prefix`)
f(`unpack_syslog from x result_prefix a b`)
f(`unpack_syslog from x result_prefix a if`)
f(`unpack_syslog result_prefix`)
f(`unpack_syslog result_prefix a b`)
f(`unpack_syslog result_prefix a if`)
}
func TestPipeUnpackSyslog(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// no skip empty results
f("unpack_syslog", [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"foo", "321"},
},
}, [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"foo", "321"},
{"priority", "165"},
{"facility", "20"},
{"severity", "5"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"},
{"app_name", "appname"},
{"proc_id", "12345"},
{"msg_id", "ID47"},
{"message", "This is a test message with structured data"},
},
})
// keep original fields
f("unpack_syslog keep_original_fields", [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"foo", "321"},
{"app_name", "foobar"},
{"msg_id", "baz"},
},
}, [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"foo", "321"},
{"priority", "165"},
{"facility", "20"},
{"severity", "5"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"},
{"app_name", "foobar"},
{"proc_id", "12345"},
{"msg_id", "baz"},
{"message", "This is a test message with structured data"},
},
})
// unpack from other field
f("unpack_syslog from x", [][]Field{
{
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
},
}, [][]Field{
{
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"priority", "165"},
{"facility", "20"},
{"severity", "5"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"},
{"app_name", "appname"},
{"proc_id", "12345"},
{"msg_id", "ID47"},
{"message", "This is a test message with structured data"},
},
})
// failed if condition
f("unpack_syslog if (foo:bar)", [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
},
}, [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
},
})
// matched if condition
f("unpack_syslog if (appname)", [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
},
}, [][]Field{
{
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"priority", "165"},
{"facility", "20"},
{"severity", "5"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"},
{"app_name", "appname"},
{"proc_id", "12345"},
{"msg_id", "ID47"},
{"message", "This is a test message with structured data"},
},
})
// single row, unpack from missing field
f("unpack_syslog from x", [][]Field{
{
{"_msg", `foo=bar`},
},
}, [][]Field{
{
{"_msg", `foo=bar`},
},
})
// single row, unpack from non-syslog field
f("unpack_syslog from x", [][]Field{
{
{"x", `foobar`},
},
}, [][]Field{
{
{"x", `foobar`},
},
})
// multiple rows with distinct number of fields
f("unpack_syslog from x result_prefix qwe_", [][]Field{
{
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
},
{
{"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`},
{"y", `z=bar`},
},
}, [][]Field{
{
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"qwe_priority", "165"},
{"qwe_facility", "20"},
{"qwe_severity", "5"},
{"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"},
{"qwe_hostname", "mymachine.example.com"},
{"qwe_app_name", "appname"},
{"qwe_proc_id", "12345"},
{"qwe_msg_id", "ID47"},
{"qwe_message", "This is a test message with structured data"},
},
{
{"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`},
{"y", `z=bar`},
{"qwe_priority", "163"},
{"qwe_facility", "20"},
{"qwe_severity", "3"},
{"qwe_timestamp", "2024-12-13T18:21:43Z"},
{"qwe_hostname", "mymachine.example.com"},
{"qwe_app_name", "appname2"},
{"qwe_proc_id", "345"},
{"qwe_msg_id", "ID7"},
{"qwe_message", "foobar"},
},
})
}
func TestPipeUnpackSyslogUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("unpack_syslog", "*", "", "*", "")
f("unpack_syslog keep_original_fields", "*", "", "*", "")
f("unpack_syslog if (y:z) from x", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("unpack_syslog from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_syslog if (y:z) from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_syslog if (f1:z) from x", "*", "f1,f2", "*", "f2")
// all the needed fields, unneeded fields intersect with src
f("unpack_syslog from x", "*", "f2,x", "*", "f2")
f("unpack_syslog if (y:z) from x", "*", "f2,x", "*", "f2")
f("unpack_syslog if (f2:z) from x", "*", "f1,f2,x", "*", "f1")
// needed fields do not intersect with src
f("unpack_syslog from x", "f1,f2", "", "f1,f2,x", "")
f("unpack_syslog if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "")
f("unpack_syslog if (f1:z) from x", "f1,f2", "", "f1,f2,x", "")
// needed fields intersect with src
f("unpack_syslog from x", "f2,x", "", "f2,x", "")
f("unpack_syslog if (y:z) from x", "f2,x", "", "f2,x,y", "")
f("unpack_syslog if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "")
}

View file

@ -0,0 +1,308 @@
package logstorage
import (
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
func getSyslogParser(currentYear int) *syslogParser {
v := syslogParserPool.Get()
if v == nil {
v = &syslogParser{}
}
p := v.(*syslogParser)
p.currentYear = currentYear
return p
}
func putSyslogParser(p *syslogParser) {
p.reset()
syslogParserPool.Put(p)
}
var syslogParserPool sync.Pool
type syslogParser struct {
currentYear int
buf []byte
fields []Field
}
func (p *syslogParser) reset() {
p.currentYear = 0
p.resetFields()
}
func (p *syslogParser) resetFields() {
p.buf = p.buf[:0]
clear(p.fields)
p.fields = p.fields[:0]
}
func (p *syslogParser) addField(name, value string) {
p.fields = append(p.fields, Field{
Name: name,
Value: value,
})
}
func (p *syslogParser) parse(s string) {
p.resetFields()
if len(s) == 0 {
// Cannot parse syslog message
return
}
if s[0] != '<' {
p.parseNoHeader(s)
return
}
// parse priority
s = s[1:]
n := strings.IndexByte(s, '>')
if n < 0 {
// Cannot parse priority
return
}
priorityStr := s[:n]
s = s[n+1:]
p.addField("priority", priorityStr)
priority, ok := tryParseUint64(priorityStr)
if !ok {
// Cannot parse priority
return
}
facility := priority / 8
severity := priority % 8
bufLen := len(p.buf)
p.buf = marshalUint64String(p.buf, facility)
p.addField("facility", bytesutil.ToUnsafeString(p.buf[bufLen:]))
bufLen = len(p.buf)
p.buf = marshalUint64String(p.buf, severity)
p.addField("severity", bytesutil.ToUnsafeString(p.buf[bufLen:]))
p.parseNoHeader(s)
}
func (p *syslogParser) parseNoHeader(s string) {
if len(s) == 0 {
return
}
if strings.HasPrefix(s, "1 ") {
p.parseRFC5424(s[2:])
} else {
p.parseRFC3164(s)
}
}
func (p *syslogParser) parseRFC5424(s string) {
// See https://datatracker.ietf.org/doc/html/rfc5424
if len(s) == 0 {
return
}
// Parse timestamp
n := strings.IndexByte(s, ' ')
if n < 0 {
p.addField("timestamp", s)
return
}
p.addField("timestamp", s[:n])
s = s[n+1:]
// Parse hostname
n = strings.IndexByte(s, ' ')
if n < 0 {
p.addField("hostname", s)
return
}
p.addField("hostname", s[:n])
s = s[n+1:]
// Parse app-name
n = strings.IndexByte(s, ' ')
if n < 0 {
p.addField("app_name", s)
return
}
p.addField("app_name", s[:n])
s = s[n+1:]
// Parse procid
n = strings.IndexByte(s, ' ')
if n < 0 {
p.addField("proc_id", s)
return
}
p.addField("proc_id", s[:n])
s = s[n+1:]
// Parse msgID
n = strings.IndexByte(s, ' ')
if n < 0 {
p.addField("msg_id", s)
return
}
p.addField("msg_id", s[:n])
s = s[n+1:]
// Parse structured data
tail, ok := p.parseRFC5424SD(s)
if !ok {
return
}
s = tail
// Parse message
p.addField("message", s)
}
func (p *syslogParser) parseRFC5424SD(s string) (string, bool) {
if strings.HasPrefix(s, "- ") {
return s[2:], true
}
for {
tail, ok := p.parseRFC5424SDLine(s)
if !ok {
return tail, false
}
s = tail
if strings.HasPrefix(s, " ") {
s = s[1:]
return s, true
}
}
}
func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) {
if len(s) == 0 || s[0] != '[' {
return s, false
}
s = s[1:]
n := strings.IndexAny(s, " ]")
if n < 0 {
return s, false
}
sdID := s[:n]
s = s[n:]
// Parse structured data
i := 0
for i < len(s) && s[i] != ']' {
// skip whitespace
if s[i] != ' ' {
return s, false
}
i++
// Parse name
n := strings.IndexByte(s[i:], '=')
if n < 0 {
return s, false
}
i += n + 1
// Parse value
qp, err := strconv.QuotedPrefix(s[i:])
if err != nil {
return s, false
}
i += len(qp)
}
if i == len(s) {
return s, false
}
sdValue := strings.TrimSpace(s[:i])
p.addField(sdID, sdValue)
s = s[i+1:]
return s, true
}
func (p *syslogParser) parseRFC3164(s string) {
// See https://datatracker.ietf.org/doc/html/rfc3164
// Parse timestamp
n := len(time.Stamp)
if len(s) < n {
return
}
t, err := time.Parse(time.Stamp, s[:n])
if err != nil {
// TODO: fall back to parsing ISO8601 timestamp?
return
}
s = s[n:]
t = t.UTC()
t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() {
// Adjust time to the previous year
t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
bufLen := len(p.buf)
p.buf = marshalTimestampISO8601String(p.buf, t.UnixNano())
p.addField("timestamp", bytesutil.ToUnsafeString(p.buf[bufLen:]))
if len(s) == 0 || s[0] != ' ' {
// Missing space after the time field
return
}
s = s[1:]
// Parse hostname
n = strings.IndexByte(s, ' ')
if n < 0 {
p.addField("hostname", s)
return
}
p.addField("hostname", s[:n])
s = s[n+1:]
// Parse tag (aka app_name)
n = strings.IndexAny(s, "[: ")
if n < 0 {
p.addField("app_name", s)
return
}
p.addField("app_name", s[:n])
s = s[n:]
// Parse proc_id
if len(s) == 0 {
return
}
if s[0] == '[' {
s = s[1:]
n = strings.IndexByte(s, ']')
if n < 0 {
return
}
p.addField("proc_id", s[:n])
s = s[n+1:]
}
// Skip optional ': ' in front of message
s = strings.TrimPrefix(s, ":")
s = strings.TrimPrefix(s, " ")
if len(s) > 0 {
p.addField("message", s)
}
}

View file

@ -0,0 +1,67 @@
package logstorage
import (
"testing"
)
func TestSyslogParser(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
const currentYear = 2024
p := getSyslogParser(currentYear)
defer putSyslogParser(p)
p.parse(s)
result := MarshalFieldsToJSON(nil, p.fields)
if string(result) != resultExpected {
t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected)
}
}
// RFC 3164
f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
`{"priority":"165","facility":"20","severity":"5","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...",
`{"timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`)
f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...",
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`)
f("Jun 3 12:08:33 - - Starting Update the local ESM caches...",
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`)
// RFC 5424
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
`{"timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`)
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`)
// Incomplete RFC 3164
f("", `{}`)
f("Jun 3 12:08:33", `{"timestamp":"2024-06-03T12:08:33.000Z"}`)
f("Jun 3 12:08:33 abcd", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`)
f("Jun 3 12:08:33 abcd sudo", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`)
f("Jun 3 12:08:33 abcd sudo[123]", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`)
f("Jun 3 12:08:33 abcd sudo foobar", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`)
// Incomplete RFC 5424
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z"}`)
f(`<165>1 `,
`{"priority":"165","facility":"20","severity":"5"}`)
}