app/vlinsert/syslog: allow changing the default set of log fields to use as stream fields during syslog data ingestion

Thanks to @AndrewChubatiuk for the initial implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7488
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7480

See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields
This commit is contained in:
Aliaksandr Valialkin 2024-11-08 21:20:58 +01:00
parent cd60a4c589
commit 4f0bec6f03
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 76 additions and 19 deletions

View file

@ -112,19 +112,22 @@ func getExtraFields(r *http.Request) ([]logstorage.Field, error) {
} }
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID. // GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
func GetCommonParamsForSyslog(tenantID logstorage.TenantID, ignoreFields []string, extraFields []logstorage.Field) *CommonParams { func GetCommonParamsForSyslog(tenantID logstorage.TenantID, streamFields, ignoreFields []string, extraFields []logstorage.Field) *CommonParams {
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe // See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
if streamFields == nil {
streamFields = []string{
"hostname",
"app_name",
"proc_id",
}
}
cp := &CommonParams{ cp := &CommonParams{
TenantID: tenantID, TenantID: tenantID,
TimeField: "timestamp", TimeField: "timestamp",
MsgFields: []string{ MsgFields: []string{
"message", "message",
}, },
StreamFields: []string{ StreamFields: streamFields,
"hostname",
"app_name",
"proc_id",
},
IgnoreFields: ignoreFields, IgnoreFields: ignoreFields,
ExtraFields: extraFields, ExtraFields: extraFields,
} }

View file

@ -37,6 +37,11 @@ var (
syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+ syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+
"For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") "For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
streamFieldsTCP = flagutil.NewArrayString("syslog.streamFields.tcp", "Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.tcp. "+
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields`)
streamFieldsUDP = flagutil.NewArrayString("syslog.streamFields.udp", "Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.udp. "+
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields`)
ignoreFieldsTCP = flagutil.NewArrayString("syslog.ignoreFields.tcp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.tcp. "+ ignoreFieldsTCP = flagutil.NewArrayString("syslog.ignoreFields.tcp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.tcp. "+
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields`) `See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields`)
ignoreFieldsUDP = flagutil.NewArrayString("syslog.ignoreFields.udp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. "+ ignoreFieldsUDP = flagutil.NewArrayString("syslog.ignoreFields.udp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. "+
@ -173,8 +178,14 @@ func runUDPListener(addr string, argIdx int) {
useLocalTimestamp := useLocalTimestampUDP.GetOptionalArg(argIdx) useLocalTimestamp := useLocalTimestampUDP.GetOptionalArg(argIdx)
streamFieldsStr := streamFieldsUDP.GetOptionalArg(argIdx)
streamFields, err := parseFieldsList(streamFieldsStr)
if err != nil {
logger.Fatalf("cannot parse -syslog.streamFields.udp=%q for -syslog.listenAddr.udp=%q: %s", streamFieldsStr, addr, err)
}
ignoreFieldsStr := ignoreFieldsUDP.GetOptionalArg(argIdx) ignoreFieldsStr := ignoreFieldsUDP.GetOptionalArg(argIdx)
ignoreFields, err := parseIgnoreFields(ignoreFieldsStr) ignoreFields, err := parseFieldsList(ignoreFieldsStr)
if err != nil { if err != nil {
logger.Fatalf("cannot parse -syslog.ignoreFields.udp=%q for -syslog.listenAddr.udp=%q: %s", ignoreFieldsStr, addr, err) logger.Fatalf("cannot parse -syslog.ignoreFields.udp=%q for -syslog.listenAddr.udp=%q: %s", ignoreFieldsStr, addr, err)
} }
@ -187,7 +198,7 @@ func runUDPListener(addr string, argIdx int) {
doneCh := make(chan struct{}) doneCh := make(chan struct{})
go func() { go func() {
serveUDP(ln, tenantID, compressMethod, useLocalTimestamp, ignoreFields, extraFields) serveUDP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, extraFields)
close(doneCh) close(doneCh)
}() }()
@ -228,8 +239,14 @@ func runTCPListener(addr string, argIdx int) {
useLocalTimestamp := useLocalTimestampTCP.GetOptionalArg(argIdx) useLocalTimestamp := useLocalTimestampTCP.GetOptionalArg(argIdx)
streamFieldsStr := streamFieldsTCP.GetOptionalArg(argIdx)
streamFields, err := parseFieldsList(streamFieldsStr)
if err != nil {
logger.Fatalf("cannot parse -syslog.streamFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", streamFieldsStr, addr, err)
}
ignoreFieldsStr := ignoreFieldsTCP.GetOptionalArg(argIdx) ignoreFieldsStr := ignoreFieldsTCP.GetOptionalArg(argIdx)
ignoreFields, err := parseIgnoreFields(ignoreFieldsStr) ignoreFields, err := parseFieldsList(ignoreFieldsStr)
if err != nil { if err != nil {
logger.Fatalf("cannot parse -syslog.ignoreFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", ignoreFieldsStr, addr, err) logger.Fatalf("cannot parse -syslog.ignoreFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", ignoreFieldsStr, addr, err)
} }
@ -242,7 +259,7 @@ func runTCPListener(addr string, argIdx int) {
doneCh := make(chan struct{}) doneCh := make(chan struct{})
go func() { go func() {
serveTCP(ln, tenantID, compressMethod, useLocalTimestamp, ignoreFields, extraFields) serveTCP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, extraFields)
close(doneCh) close(doneCh)
}() }()
@ -264,7 +281,7 @@ func checkCompressMethod(compressMethod, addr, protocol string) {
} }
} }
func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, ignoreFields []string, extraFields []logstorage.Field) { func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, streamFields, ignoreFields []string, extraFields []logstorage.Field) {
gomaxprocs := cgroup.AvailableCPUs() gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup var wg sync.WaitGroup
localAddr := ln.LocalAddr() localAddr := ln.LocalAddr()
@ -272,7 +289,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
cp := insertutils.GetCommonParamsForSyslog(tenantID, ignoreFields, extraFields) cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields)
var bb bytesutil.ByteBuffer var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024) bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for { for {
@ -306,7 +323,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st
wg.Wait() wg.Wait()
} }
func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, ignoreFields []string, extraFields []logstorage.Field) { func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, streamFields, ignoreFields []string, extraFields []logstorage.Field) {
var cm ingestserver.ConnsMap var cm ingestserver.ConnsMap
cm.Init("syslog") cm.Init("syslog")
@ -336,7 +353,7 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri
wg.Add(1) wg.Add(1)
go func() { go func() {
cp := insertutils.GetCommonParamsForSyslog(tenantID, ignoreFields, extraFields) cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields)
if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil { if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil {
logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err) logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err)
} }
@ -568,7 +585,7 @@ var (
udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`) udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`)
) )
func parseIgnoreFields(s string) ([]string, error) { func parseFieldsList(s string) ([]string, error) {
if s == "" { if s == "" {
return nil, nil return nil, nil
} }

View file

@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter): allow specifying offset without time range. For example, `_time:offset 1d` matches all the logs until `now-1d` in the [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This is useful when building graphs for time ranges with some offset in the past. * FEATURE: [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter): allow specifying offset without time range. For example, `_time:offset 1d` matches all the logs until `now-1d` in the [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This is useful when building graphs for time ranges with some offset in the past.
* FEATURE: [`/select/logsql/tail` HTTP endpoint](): support for `offset` query arg, which can be used for delayed emission of matching logs during live tailing. Thanks to @Fusl for the initial idea and implementation in [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7428). * FEATURE: [`/select/logsql/tail` HTTP endpoint](): support for `offset` query arg, which can be used for delayed emission of matching logs during live tailing. Thanks to @Fusl for the initial idea and implementation in [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7428).
* FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): allow enabling and disabling wrapping of long lines, which do not fit screen width, with `\wrap_long_lines` command. * FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): allow enabling and disabling wrapping of long lines, which do not fit screen width, with `\wrap_long_lines` command.
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow overriding default [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7480).
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding arbitrary [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) via `[label1=value1 ... labelN=valueN]` syntax inside Syslog messages. For example, `<165>1 2024-06-03T17:42:00.000Z example.com appname 12345 ID47 [field1=value1 field2=value2] some message`. * FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding arbitrary [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) via `[label1=value1 ... labelN=valueN]` syntax inside Syslog messages. For example, `<165>1 2024-06-03T17:42:00.000Z example.com appname 12345 ID47 [field1=value1 field2=value2] some message`.
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow dropping the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields). * FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow dropping the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields).
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields). * FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields).

View file

@ -430,6 +430,22 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
Compression method for syslog messages received at the corresponding -syslog.listenAddr.udp. Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression Compression method for syslog messages received at the corresponding -syslog.listenAddr.udp. Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.extraFields.tcp array
Fields to add to logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.extraFields.udp array
Fields to add to logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.ignoreFields.tcp array
Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.ignoreFields.udp array
Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.listenAddr.tcp array -syslog.listenAddr.tcp array
Comma-separated list of TCP addresses to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ Comma-separated list of TCP addresses to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
@ -438,12 +454,20 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
Comma-separated list of UDP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ Comma-separated list of UDP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.streamFields.tcp array
Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.streamFields.udp array
Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.tenantID.tcp array -syslog.tenantID.tcp array
TenantID for logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ TenantID for logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.tenantID.udp array -syslog.tenantID.udp array
TenantID for logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ TenantID for logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-syslog.timezone string -syslog.timezone string

View file

@ -41,8 +41,8 @@ from the received Syslog lines:
- [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp. See also [log timestamps](#log-timestamps) - [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp. See also [log timestamps](#log-timestamps)
- [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) - the `MESSAGE` field from the supported syslog formats above - [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) - the `MESSAGE` field from the supported syslog formats above
- `hostname`, `app_name` and `proc_id` - [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) for unique identification - `hostname`, `app_name` and `proc_id` - for unique identification of [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
over every log stream It is possible to change the list of fields for log streams - see [these docs](#stream-fields).
- `priority`, `facility` and `severity` - these fields are extracted from `<PRI>` field - `priority`, `facility` and `severity` - these fields are extracted from `<PRI>` field
- `format` - this field is set to either `rfc3164` or `rfc5424` depending on the format of the parsed syslog line - `format` - this field is set to either `rfc3164` or `rfc5424` depending on the format of the parsed syslog line
- `msg_id` - `MSGID` field from log line in `RFC5424` format. - `msg_id` - `MSGID` field from log line in `RFC5424` format.
@ -134,6 +134,18 @@ For example, the following command starts VictoriaLogs, which writes syslog mess
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tenantID.tcp=12:34 ./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tenantID.tcp=12:34
``` ```
## Stream fields
VictoriaLogs uses `(hostname, app_name, proc_id)` fields as labels for [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) by default.
It is possible setting other set of labels via `-syslog.streamFields.tcp` and `-syslog.streamFields.udp` command-line flags
for logs insted via the corresponding `-syslog.listenAddr.tcp` and `-syslog.listenAddr.dup` addresses.
For example, the following command starts VictoriaLogs, which uses `(hostname, app_name)` fields as log stream labels
for logs received at TCP port 514:
```sh
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.streamFields.tcp='["hostname","app_name"]'
```
## Dropping fields ## Dropping fields
VictoriaLogs supports `-syslog.ignoreFields.tcp` and `-syslog.ignoreFields.udp` command-line flags for skipping VictoriaLogs supports `-syslog.ignoreFields.tcp` and `-syslog.ignoreFields.udp` command-line flags for skipping