VictoriaMetrics/app/vlinsert/syslog/syslog.go

482 lines
14 KiB
Go
Raw Normal View History

2024-06-17 10:13:18 +00:00
package syslog
import (
"bufio"
"crypto/tls"
"errors"
"flag"
"fmt"
"io"
"net"
"strconv"
2024-06-17 10:13:18 +00:00
"strings"
"sync"
"sync/atomic"
"time"
"github.com/klauspost/compress/gzip"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
2024-06-17 10:13:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
syslogTenantID = flag.String("syslog.tenantID", "0:0", "TenantID for logs ingested via Syslog protocol. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+
"For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
listenAddrTCP = flag.String("syslog.listenAddr.tcp", "", "Optional TCP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
listenAddrUDP = flag.String("syslog.listenAddr.udp", "", "Optional UDP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsEnable = flag.Bool("syslog.tls", false, "Whether to use TLS for receiving syslog messages at -syslog.listenAddr.tcp. -syslog.tlsCertFile and -syslog.tlsKeyFile must be set "+
"if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsCertFile = flag.String("syslog.tlsCertFile", "", "Path to file with TLS certificate for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. "+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsKeyFile = flag.String("syslog.tlsKeyFile", "", "Path to file with TLS key for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"The provided key file is automatically re-read every second, so it can be dynamically updated")
tlsCipherSuites = flagutil.NewArrayString("syslog.tlsCipherSuites", "Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . "+
"See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsMinVersion = flag.String("syslog.tlsMinVersion", "TLS13", "The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"Supported values: TLS10, TLS11, TLS12, TLS13. "+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
compressMethod = flag.String("syslog.compressMethod", "", "Compression method for syslog messages received at -syslog.listenAddr.tcp and -syslog.listenAddr.udp. "+
"Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
)
// MustInit initializes syslog parser at the given -syslog.listenAddr.tcp and -syslog.listenAddr.udp ports
//
// This function must be called after flag.Parse().
//
// MustStop() must be called in order to free up resources occupied by the initialized syslog parser.
func MustInit() {
if workersStopCh != nil {
logger.Panicf("BUG: MustInit() called twice without MustStop() call")
}
workersStopCh = make(chan struct{})
tenantID, err := logstorage.GetTenantIDFromString(*syslogTenantID)
if err != nil {
logger.Fatalf("cannot parse -syslog.tenantID=%q: %s", *syslogTenantID, err)
}
globalTenantID = tenantID
switch *compressMethod {
case "", "none", "gzip", "deflate":
default:
logger.Fatalf("unexpected -syslog.compressLevel=%q; supported values: none, gzip, deflate", *compressMethod)
}
if *listenAddrTCP != "" {
workersWG.Add(1)
go func() {
runTCPListener(*listenAddrTCP)
workersWG.Done()
}()
}
if *listenAddrUDP != "" {
workersWG.Add(1)
go func() {
runUDPListener(*listenAddrUDP)
workersWG.Done()
}()
}
currentYear := time.Now().Year()
globalCurrentYear.Store(int64(currentYear))
workersWG.Add(1)
go func() {
ticker := time.NewTicker(time.Minute)
for {
select {
case <-workersStopCh:
ticker.Stop()
workersWG.Done()
return
case <-ticker.C:
currentYear := time.Now().Year()
globalCurrentYear.Store(int64(currentYear))
}
}
}()
if *syslogTimezone != "" {
tz, err := time.LoadLocation(*syslogTimezone)
if err != nil {
logger.Fatalf("cannot parse -syslog.timezone=%q: %s", *syslogTimezone, err)
}
globalTimezone = tz
} else {
globalTimezone = time.Local
}
}
var (
globalTenantID logstorage.TenantID
globalCurrentYear atomic.Int64
globalTimezone *time.Location
)
var (
workersWG sync.WaitGroup
workersStopCh chan struct{}
)
// MustStop stops syslog parser initialized via MustInit()
func MustStop() {
close(workersStopCh)
workersWG.Wait()
workersStopCh = nil
}
func runUDPListener(addr string) {
ln, err := net.ListenPacket(netutil.GetUDPNetwork(), addr)
if err != nil {
logger.Fatalf("cannot start UDP syslog server at %q: %s", addr, err)
}
doneCh := make(chan struct{})
go func() {
serveUDP(ln)
close(doneCh)
}()
<-workersStopCh
if err := ln.Close(); err != nil {
logger.Fatalf("syslog: cannot close UDP listener at %s: %s", addr, err)
}
<-doneCh
}
func runTCPListener(addr string) {
var tlsConfig *tls.Config
if *tlsEnable {
tc, err := netutil.GetServerTLSConfig(*tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites)
if err != nil {
logger.Fatalf("cannot load TLS cert from -syslog.tlsCertFile=%q, -syslog.tlsKeyFile=%q, -syslog.tlsMinVersion=%q, -syslog.tlsCipherSuites=%q: %s",
*tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites, err)
}
tlsConfig = tc
}
ln, err := netutil.NewTCPListener("syslog", addr, false, tlsConfig)
if err != nil {
logger.Fatalf("syslog: cannot start TCP listener at %s: %s", addr, err)
}
doneCh := make(chan struct{})
go func() {
serveTCP(ln)
close(doneCh)
}()
<-workersStopCh
if err := ln.Close(); err != nil {
logger.Fatalf("syslog: cannot close TCP listener at %s: %s", addr, err)
}
<-doneCh
}
func serveUDP(ln net.PacketConn) {
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
localAddr := ln.LocalAddr()
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cp := insertutils.GetCommonParamsForSyslog(globalTenantID)
var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for {
bb.Reset()
bb.B = bb.B[:cap(bb.B)]
n, remoteAddr, err := ln.ReadFrom(bb.B)
if err != nil {
udpErrorsTotal.Inc()
var ne net.Error
if errors.As(err, &ne) {
if ne.Temporary() {
logger.Errorf("syslog: temporary error when listening for UDP at %q: %s", localAddr, err)
time.Sleep(time.Second)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
}
logger.Errorf("syslog: cannot read UDP data from %s at %s: %s", remoteAddr, localAddr, err)
continue
}
bb.B = bb.B[:n]
udpRequestsTotal.Inc()
if err := processStream(bb.NewReader(), cp); err != nil {
logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err)
}
}
}()
}
wg.Wait()
}
func serveTCP(ln net.Listener) {
var cm ingestserver.ConnsMap
cm.Init("syslog")
var wg sync.WaitGroup
addr := ln.Addr()
for {
c, err := ln.Accept()
if err != nil {
var ne net.Error
if errors.As(err, &ne) {
if ne.Temporary() {
logger.Errorf("syslog: temporary error when listening for TCP addr %q: %s", addr, err)
time.Sleep(time.Second)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
logger.Fatalf("syslog: unrecoverable error when accepting TCP connections at %q: %s", addr, err)
}
logger.Fatalf("syslog: unexpected error when accepting TCP connections at %q: %s", addr, err)
}
if !cm.Add(c) {
_ = c.Close()
break
}
wg.Add(1)
go func() {
cp := insertutils.GetCommonParamsForSyslog(globalTenantID)
if err := processStream(c, cp); err != nil {
logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err)
}
cm.Delete(c)
_ = c.Close()
wg.Done()
}()
}
cm.CloseAll(0)
wg.Wait()
}
// processStream parses a stream of syslog messages from r and ingests them into vlstorage.
func processStream(r io.Reader, cp *insertutils.CommonParams) error {
if err := vlstorage.CanWriteData(); err != nil {
return err
}
lmp := cp.NewLogMessageProcessor()
err := processStreamInternal(r, lmp)
lmp.MustClose()
return err
}
func processStreamInternal(r io.Reader, lmp insertutils.LogMessageProcessor) error {
2024-06-17 10:13:18 +00:00
switch *compressMethod {
case "", "none":
case "gzip":
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped data: %w", err)
}
r = zr
case "deflate":
zr, err := common.GetZlibReader(r)
if err != nil {
return fmt.Errorf("cannot read deflated data: %w", err)
}
r = zr
default:
logger.Panicf("BUG: compressLevel=%q; supported values: none, gzip, deflate", *compressMethod)
}
err := processUncompressedStream(r, lmp)
2024-06-17 10:13:18 +00:00
switch *compressMethod {
case "gzip":
zr := r.(*gzip.Reader)
common.PutGzipReader(zr)
case "deflate":
zr := r.(io.ReadCloser)
common.PutZlibReader(zr)
}
return err
}
func processUncompressedStream(r io.Reader, lmp insertutils.LogMessageProcessor) error {
2024-06-17 10:13:18 +00:00
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
slr := getSyslogLineReader(wcr)
defer putSyslogLineReader(slr)
2024-06-17 10:13:18 +00:00
n := 0
for {
ok := slr.nextLine()
2024-06-17 10:13:18 +00:00
wcr.DecConcurrency()
if !ok {
break
}
currentYear := int(globalCurrentYear.Load())
err := processLine(slr.line, currentYear, globalTimezone, lmp)
2024-06-17 10:13:18 +00:00
if err != nil {
errorsTotal.Inc()
return fmt.Errorf("cannot read line #%d: %s", n, err)
}
n++
rowsIngestedTotal.Inc()
}
return slr.Error()
}
2024-06-17 10:13:18 +00:00
type syslogLineReader struct {
line []byte
2024-06-17 10:13:18 +00:00
br *bufio.Reader
err error
2024-06-17 10:13:18 +00:00
}
func (slr *syslogLineReader) reset(r io.Reader) {
slr.line = slr.line[:0]
slr.br.Reset(r)
slr.err = nil
}
// Error returns the last error occurred in slr.
func (slr *syslogLineReader) Error() error {
if slr.err == nil || slr.err == io.EOF {
return nil
}
return slr.err
}
// nextLine reads the next syslog line from slr and stores it at slr.line.
//
// false is returned if the next line cannot be read. Error() must be called in this case
// in order to verify whether there is an error or just slr stream has been finished.
func (slr *syslogLineReader) nextLine() bool {
if slr.err != nil {
return false
}
prefix, err := slr.br.ReadSlice(' ')
if err != nil {
if err != io.EOF {
slr.err = fmt.Errorf("cannot read message frame prefix: %w", err)
return false
}
if len(prefix) == 0 {
slr.err = err
return false
2024-06-17 10:13:18 +00:00
}
}
// skip empty lines
for len(prefix) > 0 && prefix[0] == '\n' {
prefix = prefix[1:]
2024-06-17 10:13:18 +00:00
}
if prefix[0] >= '0' && prefix[0] <= '9' {
// This is octet-counting method. See https://www.ietf.org/archive/id/draft-gerhards-syslog-plain-tcp-07.html#msgxfer
msgLenStr := bytesutil.ToUnsafeString(prefix[:len(prefix)-1])
msgLen, err := strconv.ParseUint(msgLenStr, 10, 64)
if err != nil {
slr.err = fmt.Errorf("cannot parse message length from %q: %w", msgLenStr, err)
return false
}
if maxMsgLen := insertutils.MaxLineSizeBytes.IntN(); msgLen > uint64(maxMsgLen) {
slr.err = fmt.Errorf("cannot read message longer than %d bytes; msgLen=%d", maxMsgLen, msgLen)
return false
}
slr.line = slicesutil.SetLength(slr.line, int(msgLen))
if _, err := io.ReadFull(slr.br, slr.line); err != nil {
slr.err = fmt.Errorf("cannot read message with size %d bytes: %w", msgLen, err)
return false
}
return true
}
// This is octet-stuffing method. See https://www.ietf.org/archive/id/draft-gerhards-syslog-plain-tcp-07.html#octet-stuffing-legacy
slr.line = append(slr.line[:0], prefix...)
for {
line, err := slr.br.ReadSlice('\n')
if err == nil {
slr.line = append(slr.line, line[:len(line)-1]...)
return true
}
if err == io.EOF {
slr.line = append(slr.line, line...)
return true
}
if err == bufio.ErrBufferFull {
slr.line = append(slr.line, line...)
continue
}
slr.err = fmt.Errorf("cannot read message in octet-stuffing method: %w", err)
return false
}
}
func getSyslogLineReader(r io.Reader) *syslogLineReader {
v := syslogLineReaderPool.Get()
if v == nil {
br := bufio.NewReaderSize(r, 64*1024)
return &syslogLineReader{
br: br,
}
}
slr := v.(*syslogLineReader)
slr.reset(r)
return slr
}
func putSyslogLineReader(slr *syslogLineReader) {
syslogLineReaderPool.Put(slr)
}
var syslogLineReaderPool sync.Pool
func processLine(line []byte, currentYear int, timezone *time.Location, lmp insertutils.LogMessageProcessor) error {
2024-06-17 10:13:18 +00:00
p := logstorage.GetSyslogParser(currentYear, timezone)
lineStr := bytesutil.ToUnsafeString(line)
p.Parse(lineStr)
ts, err := insertutils.ExtractTimestampISO8601FromFields("timestamp", p.Fields)
if err != nil {
return fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err)
2024-06-17 10:13:18 +00:00
}
logstorage.RenameField(p.Fields, "message", "_msg")
lmp.AddRow(ts, p.Fields)
2024-06-17 10:13:18 +00:00
logstorage.PutSyslogParser(p)
return nil
2024-06-17 10:13:18 +00:00
}
var (
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="syslog"}`)
errorsTotal = metrics.NewCounter(`vl_errors_total{type="syslog"}`)
udpRequestsTotal = metrics.NewCounter(`vl_udp_reqests_total{type="syslog"}`)
udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`)
)