diff --git a/app/vlinsert/syslog/syslog.go b/app/vlinsert/syslog/syslog.go index 935c6b7ee8..a2481d1b1f 100644 --- a/app/vlinsert/syslog/syslog.go +++ b/app/vlinsert/syslog/syslog.go @@ -46,24 +46,29 @@ var ( "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") tlsEnable = flagutil.NewArrayBool("syslog.tls", "Whether to enable TLS for receiving syslog messages at the corresponding -syslog.listenAddr.tcp. "+ - "The corresponding -syslog.tlsCertFile and -syslog.tlsKeyFile must be set if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + "The corresponding -syslog.tlsCertFile and -syslog.tlsKeyFile must be set if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security") tlsCertFile = flagutil.NewArrayString("syslog.tlsCertFile", "Path to file with TLS certificate for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. "+ "Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. "+ - "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security") tlsKeyFile = flagutil.NewArrayString("syslog.tlsKeyFile", "Path to file with TLS key for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. "+ "The provided key file is automatically re-read every second, so it can be dynamically updated. "+ - "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security") tlsCipherSuites = flagutil.NewArrayString("syslog.tlsCipherSuites", "Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. "+ "See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . "+ - "See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + "See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security") tlsMinVersion = flag.String("syslog.tlsMinVersion", "TLS13", "The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. "+ "Supported values: TLS10, TLS11, TLS12, TLS13. "+ - "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security") compressMethodTCP = flagutil.NewArrayString("syslog.compressMethod.tcp", "Compression method for syslog messages received at the corresponding -syslog.listenAddr.tcp. "+ - "Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + "Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression") compressMethodUDP = flagutil.NewArrayString("syslog.compressMethod.udp", "Compression method for syslog messages received at the corresponding -syslog.listenAddr.udp. "+ - "Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + "Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression") + + useLocalTimestampTCP = flagutil.NewArrayBool("syslog.useLocalTimestamp.tcp", "Whether to use local timestamp instead of the original timestamp for the ingested syslog messages "+ + "at the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#log-timestamps") + useLocalTimestampUDP = flagutil.NewArrayBool("syslog.useLocalTimestamp.udp", "Whether to use local timestamp instead of the original timestamp for the ingested syslog messages "+ + "at the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#log-timestamps") ) // MustInit initializes syslog parser at the given -syslog.listenAddr.tcp and -syslog.listenAddr.udp ports @@ -154,17 +159,21 @@ func runUDPListener(addr string, argIdx int) { compressMethod := compressMethodUDP.GetOptionalArg(argIdx) checkCompressMethod(compressMethod, addr, "udp") + useLocalTimestamp := useLocalTimestampUDP.GetOptionalArg(argIdx) + doneCh := make(chan struct{}) go func() { - serveUDP(ln, tenantID, compressMethod) + serveUDP(ln, tenantID, compressMethod, useLocalTimestamp) close(doneCh) }() + logger.Infof("started accepting syslog messages at -syslog.listenAddr.udp=%q", addr) <-workersStopCh if err := ln.Close(); err != nil { logger.Fatalf("syslog: cannot close UDP listener at %s: %s", addr, err) } <-doneCh + logger.Infof("finished accepting syslog messages at -syslog.listenAddr.udp=%q", addr) } func runTCPListener(addr string, argIdx int) { @@ -193,17 +202,21 @@ func runTCPListener(addr string, argIdx int) { compressMethod := compressMethodTCP.GetOptionalArg(argIdx) checkCompressMethod(compressMethod, addr, "tcp") + useLocalTimestamp := useLocalTimestampTCP.GetOptionalArg(argIdx) + doneCh := make(chan struct{}) go func() { - serveTCP(ln, tenantID, compressMethod) + serveTCP(ln, tenantID, compressMethod, useLocalTimestamp) close(doneCh) }() + logger.Infof("started accepting syslog messages at -syslog.listenAddr.tcp=%q", addr) <-workersStopCh if err := ln.Close(); err != nil { logger.Fatalf("syslog: cannot close TCP listener at %s: %s", addr, err) } <-doneCh + logger.Infof("finished accepting syslog messages at -syslog.listenAddr.tcp=%q", addr) } func checkCompressMethod(compressMethod, addr, protocol string) { @@ -215,7 +228,7 @@ func checkCompressMethod(compressMethod, addr, protocol string) { } } -func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string) { +func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool) { gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup localAddr := ln.LocalAddr() @@ -248,7 +261,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st } bb.B = bb.B[:n] udpRequestsTotal.Inc() - if err := processStream(bb.NewReader(), compressMethod, cp); err != nil { + if err := processStream(bb.NewReader(), compressMethod, useLocalTimestamp, cp); err != nil { logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err) } } @@ -257,7 +270,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st wg.Wait() } -func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string) { +func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod string, useLocalTimestamp bool) { var cm ingestserver.ConnsMap cm.Init("syslog") @@ -288,7 +301,7 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri wg.Add(1) go func() { cp := insertutils.GetCommonParamsForSyslog(tenantID) - if err := processStream(c, compressMethod, cp); err != nil { + if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil { logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err) } @@ -303,19 +316,19 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri } // processStream parses a stream of syslog messages from r and ingests them into vlstorage. -func processStream(r io.Reader, compressMethod string, cp *insertutils.CommonParams) error { +func processStream(r io.Reader, compressMethod string, useLocalTimestamp bool, cp *insertutils.CommonParams) error { if err := vlstorage.CanWriteData(); err != nil { return err } lmp := cp.NewLogMessageProcessor() - err := processStreamInternal(r, compressMethod, lmp) + err := processStreamInternal(r, compressMethod, useLocalTimestamp, lmp) lmp.MustClose() return err } -func processStreamInternal(r io.Reader, compressMethod string, lmp insertutils.LogMessageProcessor) error { +func processStreamInternal(r io.Reader, compressMethod string, useLocalTimestamp bool, lmp insertutils.LogMessageProcessor) error { switch compressMethod { case "", "none": case "gzip": @@ -334,7 +347,7 @@ func processStreamInternal(r io.Reader, compressMethod string, lmp insertutils.L logger.Panicf("BUG: unsupported compressMethod=%q; supported values: none, gzip, deflate", compressMethod) } - err := processUncompressedStream(r, lmp) + err := processUncompressedStream(r, useLocalTimestamp, lmp) switch compressMethod { case "gzip": @@ -348,7 +361,7 @@ func processStreamInternal(r io.Reader, compressMethod string, lmp insertutils.L return err } -func processUncompressedStream(r io.Reader, lmp insertutils.LogMessageProcessor) error { +func processUncompressedStream(r io.Reader, useLocalTimestamp bool, lmp insertutils.LogMessageProcessor) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) @@ -364,7 +377,7 @@ func processUncompressedStream(r io.Reader, lmp insertutils.LogMessageProcessor) } currentYear := int(globalCurrentYear.Load()) - err := processLine(slr.line, currentYear, globalTimezone, lmp) + err := processLine(slr.line, currentYear, globalTimezone, useLocalTimestamp, lmp) if err != nil { errorsTotal.Inc() return fmt.Errorf("cannot read line #%d: %s", n, err) @@ -486,13 +499,20 @@ func putSyslogLineReader(slr *syslogLineReader) { var syslogLineReaderPool sync.Pool -func processLine(line []byte, currentYear int, timezone *time.Location, lmp insertutils.LogMessageProcessor) error { +func processLine(line []byte, currentYear int, timezone *time.Location, useLocalTimestamp bool, lmp insertutils.LogMessageProcessor) error { p := logstorage.GetSyslogParser(currentYear, timezone) lineStr := bytesutil.ToUnsafeString(line) p.Parse(lineStr) - ts, err := insertutils.ExtractTimestampRFC3339NanoFromFields("timestamp", p.Fields) - if err != nil { - return fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err) + + var ts int64 + if useLocalTimestamp { + ts = time.Now().UnixNano() + } else { + nsecs, err := insertutils.ExtractTimestampRFC3339NanoFromFields("timestamp", p.Fields) + if err != nil { + return fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err) + } + ts = nsecs } logstorage.RenameField(p.Fields, "message", "_msg") lmp.AddRow(ts, p.Fields) diff --git a/app/vlinsert/syslog/syslog_test.go b/app/vlinsert/syslog/syslog_test.go index 7eb329b852..8c96ee6640 100644 --- a/app/vlinsert/syslog/syslog_test.go +++ b/app/vlinsert/syslog/syslog_test.go @@ -86,7 +86,7 @@ func TestProcessStreamInternal_Success(t *testing.T) { tlp := &insertutils.TestLogMessageProcessor{} r := bytes.NewBufferString(data) - if err := processStreamInternal(r, "", tlp); err != nil { + if err := processStreamInternal(r, "", false, tlp); err != nil { t.Fatalf("unexpected error: %s", err) } if err := tlp.Verify(rowsExpected, timestampsExpected, resultExpected); err != nil { @@ -116,7 +116,7 @@ func TestProcessStreamInternal_Failure(t *testing.T) { tlp := &insertutils.TestLogMessageProcessor{} r := bytes.NewBufferString(data) - if err := processStreamInternal(r, "", tlp); err == nil { + if err := processStreamInternal(r, "", false, tlp); err == nil { t.Fatalf("expecting non-nil error") } } diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 7535df9569..d21c14e448 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add `-syslog.useLocalTimestamp.tcp` and `-syslog.useLocalTimestamp.udp` command-line flags, which could be used for using the local timestamp as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) for the logs ingested via the corresponding `-syslog.listenAddr.tcp` / `-syslog.listenAddr.udp`. By default the timestap from the syslog message is used as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/). + ## [v0.26.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.26.1-victorialogs) Released at 2024-07-01 diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md index 23190f985f..fe46fa772c 100644 --- a/docs/VictoriaLogs/README.md +++ b/docs/VictoriaLogs/README.md @@ -307,11 +307,11 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line -storageDataPath string Path to directory where to store VictoriaLogs data; see https://docs.victoriametrics.com/victorialogs/#storage (default "victoria-logs-data") -syslog.compressMethod.tcp array - Compression method for syslog messages received at the corresponding -syslog.listenAddr.tcp. Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ + Compression method for syslog messages received at the corresponding -syslog.listenAddr.tcp. Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -syslog.compressMethod.udp array - Compression method for syslog messages received at the corresponding -syslog.listenAddr.udp. Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ + Compression method for syslog messages received at the corresponding -syslog.listenAddr.udp. Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -syslog.listenAddr.tcp array @@ -333,23 +333,31 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line -syslog.timezone string Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ (default "Local") -syslog.tls array - Whether to enable TLS for receiving syslog messages at the corresponding -syslog.listenAddr.tcp. The corresponding -syslog.tlsCertFile and -syslog.tlsKeyFile must be set if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ + Whether to enable TLS for receiving syslog messages at the corresponding -syslog.listenAddr.tcp. The corresponding -syslog.tlsCertFile and -syslog.tlsKeyFile must be set if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security Supports array of values separated by comma or specified via multiple flags. Empty values are set to false. -syslog.tlsCertFile array - Path to file with TLS certificate for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ + Path to file with TLS certificate for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -syslog.tlsCipherSuites array - Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ + Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -syslog.tlsKeyFile array - Path to file with TLS key for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ + Path to file with TLS key for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -syslog.tlsMinVersion string - The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. Supported values: TLS10, TLS11, TLS12, TLS13. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/ (default "TLS13") + The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. Supported values: TLS10, TLS11, TLS12, TLS13. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security (default "TLS13") + -syslog.useLocalTimestamp.tcp array + Whether to use local timestamp instead of the original timestamp for the ingested syslog messages at the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#log-timestamps + Supports array of values separated by comma or specified via multiple flags. + Empty values are set to false. + -syslog.useLocalTimestamp.udp array + Whether to use local timestamp instead of the original timestamp for the ingested syslog messages at the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#log-timestamps + Supports array of values separated by comma or specified via multiple flags. + Empty values are set to false. -tls array Whether to enable TLS for incoming HTTP requests at the given -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set. See also -mtls Supports array of values separated by comma or specified via multiple flags. diff --git a/docs/VictoriaLogs/data-ingestion/syslog.md b/docs/VictoriaLogs/data-ingestion/syslog.md index 3ef7396c48..de2468e1b5 100644 --- a/docs/VictoriaLogs/data-ingestion/syslog.md +++ b/docs/VictoriaLogs/data-ingestion/syslog.md @@ -42,7 +42,7 @@ and delimit them with `\n` char. VictoriaLogs automatically extracts the following [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) from the received Syslog lines: -- [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp +- [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp. See also [log timestamps](#log-timestamps) - [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) - the `MESSAGE` field from the supported syslog formats above - `hostname`, `app_name` and `proc_id` - [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) for unique identification over every log stream @@ -70,21 +70,42 @@ curl http://localhost:9428/select/logsql/query -d 'query=_time:5m' See also: +- [Log timestamps](#log-timestamps) - [Security](#security) - [Compression](#compression) - [Multitenancy](#multitenancy) - [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting). - [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/). +## Log timestamps + +By default VictoriaLogs uses the timestamp from the parsed Syslog message as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). +Sometimes the ingested Syslog messages may contain incorrect timestamps (for example, timestamps with incorrect timezone). In this case VictoriaLogs can be configured +for using the log ingestion timestamp as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This can be done by specifying +`-syslog.useLocalTimestamp.tcp` command-line flag for the corresponding `-syslog.listenAddr.tcp` address: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.useLocalTimestamp.tcp +``` + +In this case the original timestamp from the Syslog message is stored in `timestamp` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + +The `-syslog.useLocalTimestamp.udp` command-line flag can be used for instructing VictoriaLogs to use local timestamps for the ingested logs +via the corresponding `-syslog.listenAddr.udp` address: + +```sh +./victoria-logs -syslog.listenAddr.udp=:514 -syslog.useLocalTimestamp.udp +``` + ## Security By default VictoriaLogs accepts plaintext data at `-syslog.listenAddr.tcp` address. Run VictoriaLogs with `-syslog.tls` command-line flag in order to accept TLS-encrypted logs at `-syslog.listenAddr.tcp` address. The `-syslog.tlsCertFile` and `-syslog.tlsKeyFile` command-line flags must be set to paths to TLS certificate file and TLS key file if `-syslog.tls` is set. For example, the following command -starts VictoriaLogs, which accepts TLS-encrypted syslog messages at TCP port 514: +starts VictoriaLogs, which accepts TLS-encrypted syslog messages at TCP port 6514: ```sh -./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tls -syslog.tlsCertFile=/path/to/tls/cert -syslog.tlsKeyFile=/path/to/tls/key +./victoria-logs -syslog.listenAddr.tcp=:6514 -syslog.tls -syslog.tlsCertFile=/path/to/tls/cert -syslog.tlsKeyFile=/path/to/tls/key ``` ## Compression @@ -116,7 +137,7 @@ For example, the following command starts VictoriaLogs, which writes syslog mess ## Multiple configs -VictoriaLogs can accept syslog messages via multiple TCP and UDP ports with individual configurations for [compression](#compression), [security](#security) +VictoriaLogs can accept syslog messages via multiple TCP and UDP ports with individual configurations for [log timestamps](#log-timestamps), [compression](#compression), [security](#security) and [multitenancy](#multitenancy). Specify multiple command-line flags for this. For example, the following command starts VictoriaLogs, which accepts gzip-compressed syslog messages via TCP port 514 at localhost interface and stores them to [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) `123:0`, plus it accepts TLS-encrypted syslog messages via TCP port 6514 and stores them to [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) `567:0`: