diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go index 0077a19a0..de096ae76 100644 --- a/app/vlinsert/insertutils/common_params.go +++ b/app/vlinsert/insertutils/common_params.go @@ -112,7 +112,7 @@ func getExtraFields(r *http.Request) ([]logstorage.Field, error) { } // GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID. -func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams { +func GetCommonParamsForSyslog(tenantID logstorage.TenantID, ignoreFields []string, extraFields []logstorage.Field) *CommonParams { // See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe cp := &CommonParams{ TenantID: tenantID, @@ -125,6 +125,8 @@ func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams { "app_name", "proc_id", }, + IgnoreFields: ignoreFields, + ExtraFields: extraFields, } return cp diff --git a/app/vlinsert/syslog/syslog.go b/app/vlinsert/syslog/syslog.go index 83bf6944c..b03964556 100644 --- a/app/vlinsert/syslog/syslog.go +++ b/app/vlinsert/syslog/syslog.go @@ -3,11 +3,13 @@ package syslog import ( "bufio" "crypto/tls" + "encoding/json" "errors" "flag" "fmt" "io" "net" + "sort" "strconv" "strings" "sync" @@ -35,10 +37,20 @@ var ( syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+ "For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") - syslogTenantIDTCP = flagutil.NewArrayString("syslog.tenantID.tcp", "TenantID for logs ingested via the corresponding -syslog.listenAddr.tcp. "+ - "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") - syslogTenantIDUDP = flagutil.NewArrayString("syslog.tenantID.udp", "TenantID for logs ingested via the corresponding -syslog.listenAddr.udp. 
"+ - "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + ignoreFieldsTCP = flagutil.NewArrayString("syslog.ignoreFields.tcp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.tcp. "+ + `See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields`) + ignoreFieldsUDP = flagutil.NewArrayString("syslog.ignoreFields.udp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. "+ + `See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields`) + + extraFieldsTCP = flagutil.NewArrayString("syslog.extraFields.tcp", "Fields to add to logs ingested via the corresponding -syslog.listenAddr.tcp. "+ + `See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields`) + extraFieldsUDP = flagutil.NewArrayString("syslog.extraFields.udp", "Fields to add to logs ingested via the corresponding -syslog.listenAddr.udp. "+ + `See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields`) + + tenantIDTCP = flagutil.NewArrayString("syslog.tenantID.tcp", "TenantID for logs ingested via the corresponding -syslog.listenAddr.tcp. "+ + "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy") + tenantIDUDP = flagutil.NewArrayString("syslog.tenantID.udp", "TenantID for logs ingested via the corresponding -syslog.listenAddr.udp. "+ + "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy") listenAddrTCP = flagutil.NewArrayString("syslog.listenAddr.tcp", "Comma-separated list of TCP addresses to listen to for Syslog messages. 
"+ "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") @@ -150,7 +162,7 @@ func runUDPListener(addr string, argIdx int) { logger.Fatalf("cannot start UDP syslog server at %q: %s", addr, err) } - tenantIDStr := syslogTenantIDUDP.GetOptionalArg(argIdx) + tenantIDStr := tenantIDUDP.GetOptionalArg(argIdx) tenantID, err := logstorage.ParseTenantID(tenantIDStr) if err != nil { logger.Fatalf("cannot parse -syslog.tenantID.udp=%q for -syslog.listenAddr.udp=%q: %s", tenantIDStr, addr, err) @@ -161,9 +173,21 @@ func runUDPListener(addr string, argIdx int) { useLocalTimestamp := useLocalTimestampUDP.GetOptionalArg(argIdx) + ignoreFieldsStr := ignoreFieldsUDP.GetOptionalArg(argIdx) + ignoreFields, err := parseIgnoreFields(ignoreFieldsStr) + if err != nil { + logger.Fatalf("cannot parse -syslog.ignoreFields.udp=%q for -syslog.listenAddr.udp=%q: %s", ignoreFieldsStr, addr, err) + } + + extraFieldsStr := extraFieldsUDP.GetOptionalArg(argIdx) + extraFields, err := parseExtraFields(extraFieldsStr) + if err != nil { + logger.Fatalf("cannot parse -syslog.extraFields.udp=%q for -syslog.listenAddr.udp=%q: %s", extraFieldsStr, addr, err) + } + doneCh := make(chan struct{}) go func() { - serveUDP(ln, tenantID, compressMethod, useLocalTimestamp) + serveUDP(ln, tenantID, compressMethod, useLocalTimestamp, ignoreFields, extraFields) close(doneCh) }() @@ -193,7 +217,7 @@ func runTCPListener(addr string, argIdx int) { logger.Fatalf("syslog: cannot start TCP listener at %s: %s", addr, err) } - tenantIDStr := syslogTenantIDTCP.GetOptionalArg(argIdx) + tenantIDStr := tenantIDTCP.GetOptionalArg(argIdx) tenantID, err := logstorage.ParseTenantID(tenantIDStr) if err != nil { logger.Fatalf("cannot parse -syslog.tenantID.tcp=%q for -syslog.listenAddr.tcp=%q: %s", tenantIDStr, addr, err) @@ -204,9 +228,21 @@ func runTCPListener(addr string, argIdx int) { useLocalTimestamp := useLocalTimestampTCP.GetOptionalArg(argIdx) + ignoreFieldsStr := ignoreFieldsTCP.GetOptionalArg(argIdx) + 
ignoreFields, err := parseIgnoreFields(ignoreFieldsStr) + if err != nil { + logger.Fatalf("cannot parse -syslog.ignoreFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", ignoreFieldsStr, addr, err) + } + + extraFieldsStr := extraFieldsTCP.GetOptionalArg(argIdx) + extraFields, err := parseExtraFields(extraFieldsStr) + if err != nil { + logger.Fatalf("cannot parse -syslog.extraFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", extraFieldsStr, addr, err) + } + doneCh := make(chan struct{}) go func() { - serveTCP(ln, tenantID, compressMethod, useLocalTimestamp) + serveTCP(ln, tenantID, compressMethod, useLocalTimestamp, ignoreFields, extraFields) close(doneCh) }() @@ -228,7 +264,7 @@ func checkCompressMethod(compressMethod, addr, protocol string) { } } -func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool) { +func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, ignoreFields []string, extraFields []logstorage.Field) { gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup localAddr := ln.LocalAddr() @@ -236,7 +272,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st wg.Add(1) go func() { defer wg.Done() - cp := insertutils.GetCommonParamsForSyslog(tenantID) + cp := insertutils.GetCommonParamsForSyslog(tenantID, ignoreFields, extraFields) var bb bytesutil.ByteBuffer bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024) for { @@ -270,7 +306,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st wg.Wait() } -func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool) { +func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool, ignoreFields []string, extraFields []logstorage.Field) { var cm ingestserver.ConnsMap cm.Init("syslog") @@ -300,7 +336,7 @@ func serveTCP(ln net.Listener, tenantID 
logstorage.TenantID, compressMethod stri wg.Add(1) go func() { - cp := insertutils.GetCommonParamsForSyslog(tenantID) + cp := insertutils.GetCommonParamsForSyslog(tenantID, ignoreFields, extraFields) if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil { logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err) } @@ -531,3 +567,35 @@ var ( udpRequestsTotal = metrics.NewCounter(`vl_udp_reqests_total{type="syslog"}`) udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`) ) + +func parseIgnoreFields(s string) ([]string, error) { + if s == "" { + return nil, nil + } + + var a []string + err := json.Unmarshal([]byte(s), &a) + return a, err +} + +func parseExtraFields(s string) ([]logstorage.Field, error) { + if s == "" { + return nil, nil + } + + var m map[string]string + if err := json.Unmarshal([]byte(s), &m); err != nil { + return nil, err + } + fields := make([]logstorage.Field, 0, len(m)) + for k, v := range m { + fields = append(fields, logstorage.Field{ + Name: k, + Value: v, + }) + } + sort.Slice(fields, func(i, j int) bool { + return fields[i].Name < fields[j].Name + }) + return fields, nil +} diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 6a7f96756..0fbf125a1 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -20,6 +20,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: [`/select/logsql/tail` HTTP endpoint](): support for `offset` query arg, which can be used for delayed emission of matching logs during live tailing. Thanks to @Fusl for the initial idea and implementation in [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7428). * FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): allow enabling and disabling wrapping of long lines, which do not fit screen width, with `\wrap_long_lines` command. 
* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding arbitrary [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) via `[label1=value1 ... labelN=valueN]` syntax inside Syslog messages. For example, `<165>1 2024-06-03T17:42:00.000Z example.com appname 12345 ID47 [field1=value1 field2=value2] some message`. +* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow dropping the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields). +* FEATURE: [syslog data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/): allow adding the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields). * BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): properly take into account the `end` query arg when calculating time range for [`_time:duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter). Previously the `_time:duration` filter was treated as `_time:[now-duration, now)`, while it should be treated as `_time:[end-duration, end)`. 
diff --git a/docs/VictoriaLogs/data-ingestion/syslog.md b/docs/VictoriaLogs/data-ingestion/syslog.md index 873e98ce2..2ca594311 100644 --- a/docs/VictoriaLogs/data-ingestion/syslog.md +++ b/docs/VictoriaLogs/data-ingestion/syslog.md @@ -71,6 +71,8 @@ See also: - [Security](#security) - [Compression](#compression) - [Multitenancy](#multitenancy) +- [Dropping fields](#dropping-fields) +- [Adding extra fields](#adding-extra-fields) - [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting). - [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/). @@ -132,6 +134,28 @@ For example, the following command starts VictoriaLogs, which writes syslog mess ./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tenantID.tcp=12:34 ``` +## Dropping fields + +VictoriaLogs supports `-syslog.ignoreFields.tcp` and `-syslog.ignoreFields.udp` command-line flags for skipping +the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during ingestion +of Syslog logs into `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` addresses. +For example, the following command starts VictoriaLogs, which drops `proc_id` and `msg_id` fields from logs received at TCP port 514: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.ignoreFields.tcp='["proc_id","msg_id"]' +``` + +## Adding extra fields + +VictoriaLogs supports `-syslog.extraFields.tcp` and `-syslog.extraFields.udp` command-line flags for adding +the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during data ingestion +of Syslog logs into `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` addresses. 
+For example, the following command starts VictoriaLogs, which adds `source=foo` and `abc=def` fields to logs received at TCP port 514: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.extraFields.tcp='{"source":"foo","abc":"def"}' +``` + ## Multiple configs VictoriaLogs can accept syslog messages via multiple TCP and UDP ports with individual configurations for [log timestamps](#log-timestamps), [compression](#compression), [security](#security)