mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vlinsert/syslog: allow changing the default set of log fields to use as stream fields during syslog data ingestion
Thanks to @AndrewChubatiuk for the initial implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7488 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7480 See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields
This commit is contained in:
parent
bab5348b8b
commit
ab2ce3728b
5 changed files with 76 additions and 19 deletions
|
@ -112,19 +112,22 @@ func getExtraFields(r *http.Request) ([]logstorage.Field, error) {
|
|||
}
|
||||
|
||||
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
|
||||
func GetCommonParamsForSyslog(tenantID logstorage.TenantID, ignoreFields []string, extraFields []logstorage.Field) *CommonParams {
|
||||
func GetCommonParamsForSyslog(tenantID logstorage.TenantID, streamFields, ignoreFields []string, extraFields []logstorage.Field) *CommonParams {
|
||||
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
|
||||
if streamFields == nil {
|
||||
streamFields = []string{
|
||||
"hostname",
|
||||
"app_name",
|
||||
"proc_id",
|
||||
}
|
||||
}
|
||||
cp := &CommonParams{
|
||||
TenantID: tenantID,
|
||||
TimeField: "timestamp",
|
||||
MsgFields: []string{
|
||||
"message",
|
||||
},
|
||||
StreamFields: []string{
|
||||
"hostname",
|
||||
"app_name",
|
||||
"proc_id",
|
||||
},
|
||||
StreamFields: streamFields,
|
||||
IgnoreFields: ignoreFields,
|
||||
ExtraFields: extraFields,
|
||||
}
|
||||
|
|
|
@ -37,6 +37,11 @@ var (
|
|||
syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+
|
||||
"For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||
|
||||
streamFieldsTCP = flagutil.NewArrayString("syslog.streamFields.tcp", "Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.tcp. "+
|
||||
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields`)
|
||||
streamFieldsUDP = flagutil.NewArrayString("syslog.streamFields.udp", "Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.udp. "+
|
||||
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields`)
|
||||
|
||||
ignoreFieldsTCP = flagutil.NewArrayString("syslog.ignoreFields.tcp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.tcp. "+
|
||||
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields`)
|
||||
ignoreFieldsUDP = flagutil.NewArrayString("syslog.ignoreFields.udp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. "+
|
||||
|
@ -173,8 +178,14 @@ func runUDPListener(addr string, argIdx int) {
|
|||
|
||||
useLocalTimestamp := useLocalTimestampUDP.GetOptionalArg(argIdx)
|
||||
|
||||
streamFieldsStr := streamFieldsUDP.GetOptionalArg(argIdx)
|
||||
streamFields, err := parseFieldsList(streamFieldsStr)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse -syslog.streamFields.udp=%q for -syslog.listenAddr.udp=%q: %s", streamFieldsStr, addr, err)
|
||||
}
|
||||
|
||||
ignoreFieldsStr := ignoreFieldsUDP.GetOptionalArg(argIdx)
|
||||
ignoreFields, err := parseIgnoreFields(ignoreFieldsStr)
|
||||
ignoreFields, err := parseFieldsList(ignoreFieldsStr)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse -syslog.ignoreFields.udp=%q for -syslog.listenAddr.udp=%q: %s", ignoreFieldsStr, addr, err)
|
||||
}
|
||||
|
@ -187,7 +198,7 @@ func runUDPListener(addr string, argIdx int) {
|
|||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
serveUDP(ln, tenantID, compressMethod, useLocalTimestamp, ignoreFields, extraFields)
|
||||
serveUDP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, extraFields)
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
|
@ -228,8 +239,14 @@ func runTCPListener(addr string, argIdx int) {
|
|||
|
||||
useLocalTimestamp := useLocalTimestampTCP.GetOptionalArg(argIdx)
|
||||
|
||||
streamFieldsStr := streamFieldsTCP.GetOptionalArg(argIdx)
|
||||
streamFields, err := parseFieldsList(streamFieldsStr)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse -syslog.streamFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", streamFieldsStr, addr, err)
|
||||
}
|
||||
|
||||
ignoreFieldsStr := ignoreFieldsTCP.GetOptionalArg(argIdx)
|
||||
ignoreFields, err := parseIgnoreFields(ignoreFieldsStr)
|
||||
ignoreFields, err := parseFieldsList(ignoreFieldsStr)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse -syslog.ignoreFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", ignoreFieldsStr, addr, err)
|
||||
}
|
||||
|
@ -242,7 +259,7 @@ func runTCPListener(addr string, argIdx int) {
|
|||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
serveTCP(ln, tenantID, compressMethod, useLocalTimestamp, ignoreFields, extraFields)
|
||||
serveTCP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, extraFields)
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
|
@ -264,7 +281,7 @@ func checkCompressMethod(compressMethod, addr, protocol string) {
|
|||
}
|
||||
}
|
||||
|
||||
func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, ignoreFields []string, extraFields []logstorage.Field) {
|
||||
func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, streamFields, ignoreFields []string, extraFields []logstorage.Field) {
|
||||
gomaxprocs := cgroup.AvailableCPUs()
|
||||
var wg sync.WaitGroup
|
||||
localAddr := ln.LocalAddr()
|
||||
|
@ -272,7 +289,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st
|
|||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
cp := insertutils.GetCommonParamsForSyslog(tenantID, ignoreFields, extraFields)
|
||||
cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields)
|
||||
var bb bytesutil.ByteBuffer
|
||||
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
|
||||
for {
|
||||
|
@ -306,7 +323,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, ignoreFields []string, extraFields []logstorage.Field) {
|
||||
func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, streamFields, ignoreFields []string, extraFields []logstorage.Field) {
|
||||
var cm ingestserver.ConnsMap
|
||||
cm.Init("syslog")
|
||||
|
||||
|
@ -336,7 +353,7 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri
|
|||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
cp := insertutils.GetCommonParamsForSyslog(tenantID, ignoreFields, extraFields)
|
||||
cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields)
|
||||
if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil {
|
||||
logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err)
|
||||
}
|
||||
|
@ -568,7 +585,7 @@ var (
|
|||
udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`)
|
||||
)
|
||||
|
||||
func parseIgnoreFields(s string) ([]string, error) {
|
||||
func parseFieldsList(s string) ([]string, error) {
|
||||
if s == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
* FEATURE: [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter): allow specifying offset without time range. For example, `_time:offset 1d` matches all the logs until `now-1d` in the [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This is useful when building graphs for time ranges with some offset in the past.
|
||||
* FEATURE: [`/select/logsql/tail` HTTP endpoint](): support for `offset` query arg, which can be used for delayed emission of matching logs during live tailing. Thanks to @Fusl for the initial idea and implementation in [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7428).
|
||||
* FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): allow enabling and disabling wrapping of long lines, which do not fit screen width, with `\wrap_long_lines` command.
|
||||
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow overriding default [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7480).
|
||||
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding arbitrary [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) via `[label1=value1 ... labelN=valueN]` syntax inside Syslog messages. For example, `<165>1 2024-06-03T17:42:00.000Z example.com appname 12345 ID47 [field1=value1 field2=value2] some message`.
|
||||
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow dropping the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields).
|
||||
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields).
|
||||
|
|
|
@ -430,6 +430,22 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
|
|||
Compression method for syslog messages received at the corresponding -syslog.listenAddr.udp. Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.extraFields.tcp array
|
||||
Fields to add to logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.extraFields.udp array
|
||||
Fields to add to logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.ignoreFields.tcp array
|
||||
Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.ignoreFields.udp array
|
||||
Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.listenAddr.tcp array
|
||||
Comma-separated list of TCP addresses to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
|
@ -438,12 +454,20 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
|
|||
Comma-separated list of UDP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.streamFields.tcp array
|
||||
Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.streamFields.udp array
|
||||
Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.tenantID.tcp array
|
||||
TenantID for logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/
|
||||
TenantID for logs ingested via the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.tenantID.udp array
|
||||
TenantID for logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/
|
||||
TenantID for logs ingested via the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-syslog.timezone string
|
||||
|
|
|
@ -41,8 +41,8 @@ from the received Syslog lines:
|
|||
|
||||
- [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp. See also [log timestamps](#log-timestamps)
|
||||
- [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) - the `MESSAGE` field from the supported syslog formats above
|
||||
- `hostname`, `app_name` and `proc_id` - [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) for unique identification
|
||||
over every log stream
|
||||
- `hostname`, `app_name` and `proc_id` - for unique identification of [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
|
||||
It is possible to change the list of fields for log streams - see [these docs](#stream-fields).
|
||||
- `priority`, `facility` and `severity` - these fields are extracted from `<PRI>` field
|
||||
- `format` - this field is set to either `rfc3164` or `rfc5424` depending on the format of the parsed syslog line
|
||||
- `msg_id` - `MSGID` field from log line in `RFC5424` format.
|
||||
|
@ -134,6 +134,18 @@ For example, the following command starts VictoriaLogs, which writes syslog mess
|
|||
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tenantID.tcp=12:34
|
||||
```
|
||||
|
||||
## Stream fields
|
||||
|
||||
VictoriaLogs uses `(hostname, app_name, proc_id)` fields as labels for [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) by default.
|
||||
It is possible setting other set of labels via `-syslog.streamFields.tcp` and `-syslog.streamFields.udp` command-line flags
|
||||
for logs insted via the corresponding `-syslog.listenAddr.tcp` and `-syslog.listenAddr.dup` addresses.
|
||||
For example, the following command starts VictoriaLogs, which uses `(hostname, app_name)` fields as log stream labels
|
||||
for logs received at TCP port 514:
|
||||
|
||||
```sh
|
||||
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.streamFields.tcp='["hostname","app_name"]'
|
||||
```
|
||||
|
||||
## Dropping fields
|
||||
|
||||
VictoriaLogs supports `-syslog.ignoreFields.tcp` and `-syslog.ignoreFields.udp` command-line flags for skipping
|
||||
|
|
Loading…
Reference in a new issue