diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index 3d103791c..9c6a2e1f0 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -221,7 +221,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, if ts == 0 { ts = time.Now().UnixNano() } - p.RenameField(msgField, "_msg") + logstorage.RenameField(p.Fields, msgField, "_msg") processLogMessage(ts, p.Fields) logstorage.PutJSONParser(p) @@ -272,9 +272,9 @@ func parseElasticsearchTimestamp(s string) (int64, error) { } return t.UnixNano(), nil } - t, err := time.Parse(time.RFC3339, s) - if err != nil { - return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err) + nsecs, ok := logstorage.TryParseTimestampRFC3339Nano(s) + if !ok { + return 0, fmt.Errorf("cannot parse timestamp %q", s) } - return t.UnixNano(), nil + return nsecs, nil } diff --git a/app/vlinsert/elasticsearch/elasticsearch_test.go b/app/vlinsert/elasticsearch/elasticsearch_test.go index e5169f246..8a15dc0e2 100644 --- a/app/vlinsert/elasticsearch/elasticsearch_test.go +++ b/app/vlinsert/elasticsearch/elasticsearch_test.go @@ -5,7 +5,6 @@ import ( "compress/gzip" "fmt" "reflect" - "strings" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" @@ -45,13 +44,7 @@ func TestReadBulkRequestSuccess(t *testing.T) { var result string processLogMessage := func(timestamp int64, fields []logstorage.Field) { timestamps = append(timestamps, timestamp) - - a := make([]string, len(fields)) - for i, f := range fields { - a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value) - } - s := "{" + strings.Join(a, ",") + "}\n" - result += s + result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n" } // Read the request without compression diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go index 389bfa0c9..6f808e00c 100644 --- a/app/vlinsert/insertutils/common_params.go +++ 
b/app/vlinsert/insertutils/common_params.go @@ -71,6 +71,23 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) { return cp, nil } +// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID. +func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams { + // See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe + cp := &CommonParams{ + TenantID: tenantID, + TimeField: "timestamp", + MsgField: "message", + StreamFields: []string{ + "hostname", + "app_name", + "proc_id", + }, + } + + return cp +} + // GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr. func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) { return func(timestamp int64, fields []logstorage.Field) { diff --git a/app/vlinsert/insertutils/timestamp.go b/app/vlinsert/insertutils/timestamp.go new file mode 100644 index 000000000..b75e5f559 --- /dev/null +++ b/app/vlinsert/insertutils/timestamp.go @@ -0,0 +1,33 @@ +package insertutils + +import ( + "fmt" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" +) + +// ExtractTimestampISO8601FromFields extracts ISO8601 timestamp in nanoseconds from the field with the name timeField at fields. +// +// The value for the timeField is set to empty string after returning from the function, +// so it could be ignored during data ingestion. +// +// The current timestamp is returned if fields do not contain a field with timeField name or if the timeField value is empty. 
+func ExtractTimestampISO8601FromFields(timeField string, fields []logstorage.Field) (int64, error) { + for i := range fields { + f := &fields[i] + if f.Name != timeField { + continue + } + nsecs, ok := logstorage.TryParseTimestampISO8601(f.Value) + if !ok { + if f.Value == "0" || f.Value == "" { + return time.Now().UnixNano(), nil + } + return time.Now().UnixNano(), fmt.Errorf("cannot unmarshal iso8601 timestamp from %s=%q", timeField, f.Value) + } + f.Value = "" + return nsecs, nil + } + return time.Now().UnixNano(), nil +} diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 33d832789..9eeb99a5c 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -19,13 +19,13 @@ import ( ) // RequestHandler processes jsonline insert requests -func RequestHandler(w http.ResponseWriter, r *http.Request) bool { +func RequestHandler(w http.ResponseWriter, r *http.Request) { startTime := time.Now() w.Header().Add("Content-Type", "application/json") if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) - return true + return } requestsTotal.Inc() @@ -33,11 +33,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { cp, err := insertutils.GetCommonParams(r) if err != nil { httpserver.Errorf(w, r, "%s", err) - return true + return } if err := vlstorage.CanWriteData(); err != nil { httpserver.Errorf(w, r, "%s", err) - return true + return } lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) processLogMessage := cp.GetProcessLogMessageFunc(lr) @@ -46,8 +46,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { if r.Header.Get("Content-Encoding") == "gzip" { zr, err := common.GetGzipReader(reader) if err != nil { - logger.Errorf("cannot read gzipped _bulk request: %s", err) - return true + logger.Errorf("cannot read gzipped jsonline request: %s", err) + return } defer common.PutGzipReader(zr) reader = zr @@ -68,6 +68,7 @@ func RequestHandler(w 
http.ResponseWriter, r *http.Request) bool { ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage) wcr.DecConcurrency() if err != nil { + errorsTotal.Inc() logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err) break } @@ -81,12 +82,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { vlstorage.MustAddRows(lr) logstorage.PutLogRows(lr) - // update jsonlineRequestDuration only for successfully parsed requests. - // There is no need in updating jsonlineRequestDuration for request errors, + // update requestDuration only for successfully parsed requests. + // There is no need in updating requestDuration for request errors, // since their timings are usually much smaller than the timing for successful request parsing. - jsonlineRequestDuration.UpdateDuration(startTime) - - return true + requestDuration.UpdateDuration(startTime) } func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) { @@ -108,53 +107,24 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f if err := p.ParseLogMessage(line); err != nil { return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } - ts, err := extractTimestampFromFields(timeField, p.Fields) + ts, err := insertutils.ExtractTimestampISO8601FromFields(timeField, p.Fields) if err != nil { - return false, fmt.Errorf("cannot parse timestamp: %w", err) + return false, fmt.Errorf("cannot get timestamp: %w", err) } - if ts == 0 { - ts = time.Now().UnixNano() - } - p.RenameField(msgField, "_msg") + logstorage.RenameField(p.Fields, msgField, "_msg") processLogMessage(ts, p.Fields) logstorage.PutJSONParser(p) return true, nil } -func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) { - for i := range fields { - f := &fields[i] - if f.Name != timeField { - continue - } - timestamp, err := parseISO8601Timestamp(f.Value) - if err != 
nil { - return 0, err - } - f.Value = "" - return timestamp, nil - } - return 0, nil -} - -func parseISO8601Timestamp(s string) (int64, error) { - if s == "0" || s == "" { - // Special case for returning the current timestamp. - // It must be automatically converted to the current timestamp by the caller. - return 0, nil - } - t, err := time.Parse(time.RFC3339, s) - if err != nil { - return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err) - } - return t.UnixNano(), nil -} - var lineBufferPool bytesutil.ByteBufferPool var ( - requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`) - rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`) - jsonlineRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`) + rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`) + + requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`) + errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/jsonline"}`) + + requestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`) ) diff --git a/app/vlinsert/jsonline/jsonline_test.go b/app/vlinsert/jsonline/jsonline_test.go index 86a917491..dc8d554cf 100644 --- a/app/vlinsert/jsonline/jsonline_test.go +++ b/app/vlinsert/jsonline/jsonline_test.go @@ -3,14 +3,13 @@ package jsonline import ( "bufio" "bytes" - "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "reflect" - "strings" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) -func TestReadBulkRequestSuccess(t *testing.T) { +func TestReadLine_Success(t *testing.T) { f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { t.Helper() @@ -18,16 +17,9 @@ func TestReadBulkRequestSuccess(t *testing.T) { var result string processLogMessage := func(timestamp int64, fields []logstorage.Field) 
{ timestamps = append(timestamps, timestamp) - - a := make([]string, len(fields)) - for i, f := range fields { - a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value) - } - s := "{" + strings.Join(a, ",") + "}\n" - result += s + result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n" } - // Read the request without compression r := bytes.NewBufferString(data) sc := bufio.NewScanner(r) rows := 0 @@ -53,7 +45,6 @@ func TestReadBulkRequestSuccess(t *testing.T) { } } - // Verify non-empty data data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"} {"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"} {"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"} diff --git a/app/vlinsert/loki/loki.go b/app/vlinsert/loki/loki.go index e7a3c0b7d..95c5fd5e6 100644 --- a/app/vlinsert/loki/loki.go +++ b/app/vlinsert/loki/loki.go @@ -11,7 +11,8 @@ import ( func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { switch path { case "/api/v1/push": - return handleInsert(r, w) + handleInsert(r, w) + return true case "/ready": // See https://grafana.com/docs/loki/latest/api/#identify-ready-loki-instance w.WriteHeader(http.StatusOK) @@ -23,14 +24,14 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { } // See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki -func handleInsert(r *http.Request, w http.ResponseWriter) bool { +func handleInsert(r *http.Request, w http.ResponseWriter) { contentType := r.Header.Get("Content-Type") switch contentType { case "application/json": - return handleJSON(r, w) + handleJSON(r, w) default: // Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - return handleProtobuf(r, w) + handleProtobuf(r, w) } } diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go index 263b1781f..96fd1a9d7 100644 --- 
a/app/vlinsert/loki/loki_json.go +++ b/app/vlinsert/loki/loki_json.go @@ -20,15 +20,15 @@ import ( var parserPool fastjson.ParserPool -func handleJSON(r *http.Request, w http.ResponseWriter) bool { +func handleJSON(r *http.Request, w http.ResponseWriter) { startTime := time.Now() - lokiRequestsJSONTotal.Inc() + requestsJSONTotal.Inc() reader := r.Body if r.Header.Get("Content-Encoding") == "gzip" { zr, err := common.GetGzipReader(reader) if err != nil { httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err) - return true + return } defer common.PutGzipReader(zr) reader = zr @@ -39,17 +39,17 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool { writeconcurrencylimiter.PutReader(wcr) if err != nil { httpserver.Errorf(w, r, "cannot read request body: %s", err) - return true + return } cp, err := getCommonParams(r) if err != nil { httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) - return true + return } if err := vlstorage.CanWriteData(); err != nil { httpserver.Errorf(w, r, "%s", err) - return true + return } lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) processLogMessage := cp.GetProcessLogMessageFunc(lr) @@ -58,23 +58,21 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool { logstorage.PutLogRows(lr) if err != nil { httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err) - return true + return } rowsIngestedJSONTotal.Add(n) - // update lokiRequestJSONDuration only for successfully parsed requests - // There is no need in updating lokiRequestJSONDuration for request errors, + // update requestJSONDuration only for successfully parsed requests + // There is no need in updating requestJSONDuration for request errors, // since their timings are usually much smaller than the timing for successful request parsing. 
- lokiRequestJSONDuration.UpdateDuration(startTime) - - return true + requestJSONDuration.UpdateDuration(startTime) } var ( - lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) - rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) - lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`) + requestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) + rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) + requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`) ) func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go index fe171e6d4..4860e6e12 100644 --- a/app/vlinsert/loki/loki_protobuf.go +++ b/app/vlinsert/loki/loki_protobuf.go @@ -23,25 +23,25 @@ var ( pushReqsPool sync.Pool ) -func handleProtobuf(r *http.Request, w http.ResponseWriter) bool { +func handleProtobuf(r *http.Request, w http.ResponseWriter) { startTime := time.Now() - lokiRequestsProtobufTotal.Inc() + requestsProtobufTotal.Inc() wcr := writeconcurrencylimiter.GetReader(r.Body) data, err := io.ReadAll(wcr) writeconcurrencylimiter.PutReader(wcr) if err != nil { httpserver.Errorf(w, r, "cannot read request body: %s", err) - return true + return } cp, err := getCommonParams(r) if err != nil { httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) - return true + return } if err := vlstorage.CanWriteData(); err != nil { httpserver.Errorf(w, r, "%s", err) - return true + return } lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) processLogMessage := cp.GetProcessLogMessageFunc(lr) @@ 
-50,23 +50,21 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool { logstorage.PutLogRows(lr) if err != nil { httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err) - return true + return } rowsIngestedProtobufTotal.Add(n) - // update lokiRequestProtobufDuration only for successfully parsed requests - // There is no need in updating lokiRequestProtobufDuration for request errors, + // update requestProtobufDuration only for successfully parsed requests + // There is no need in updating requestProtobufDuration for request errors, // since their timings are usually much smaller than the timing for successful request parsing. - lokiRequestProtobufDuration.UpdateDuration(startTime) - - return true + requestProtobufDuration.UpdateDuration(startTime) } var ( - lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) - rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) - lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`) + requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) + requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`) ) func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { diff --git a/app/vlinsert/main.go b/app/vlinsert/main.go index a03c0715c..ac9a727d8 100644 --- a/app/vlinsert/main.go +++ b/app/vlinsert/main.go @@ -7,14 +7,17 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline" 
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/syslog" ) // Init initializes vlinsert func Init() { + syslog.MustInit() } // Stop stops vlinsert func Stop() { + syslog.MustStop() } // RequestHandler handles insert requests for VictoriaLogs @@ -28,7 +31,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { path = strings.ReplaceAll(path, "//", "/") if path == "/jsonline" { - return jsonline.RequestHandler(w, r) + jsonline.RequestHandler(w, r) + return true } switch { case strings.HasPrefix(path, "/elasticsearch/"): diff --git a/app/vlinsert/syslog/syslog.go b/app/vlinsert/syslog/syslog.go new file mode 100644 index 000000000..f5279d294 --- /dev/null +++ b/app/vlinsert/syslog/syslog.go @@ -0,0 +1,389 @@ +package syslog + +import ( + "bufio" + "crypto/tls" + "errors" + "flag" + "fmt" + "io" + "net" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/klauspost/compress/gzip" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + syslogTenantID = flag.String("syslog.tenantID", "0:0", "TenantID for logs ingested via Syslog protocol. 
See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+ + "For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + + listenAddrTCP = flag.String("syslog.listenAddr.tcp", "", "Optional TCP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + listenAddrUDP = flag.String("syslog.listenAddr.udp", "", "Optional UDP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + + tlsEnable = flag.Bool("syslog.tls", false, "Whether to use TLS for receiving syslog messages at -syslog.listenAddr.tcp. -syslog.tlsCertFile and -syslog.tlsKeyFile must be set "+ + "if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + tlsCertFile = flag.String("syslog.tlsCertFile", "", "Path to file with TLS certificate for -syslog.listenAddr.tcp if -syslog.tls is set. "+ + "Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. "+ + "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + tlsKeyFile = flag.String("syslog.tlsKeyFile", "", "Path to file with TLS key for -syslog.listenAddr.tcp if -syslog.tls is set. "+ + "The provided key file is automatically re-read every second, so it can be dynamically updated") + tlsCipherSuites = flagutil.NewArrayString("syslog.tlsCipherSuites", "Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. "+ + "See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . 
"+ + "See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + tlsMinVersion = flag.String("syslog.tlsMinVersion", "TLS13", "The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. "+ + "Supported values: TLS10, TLS11, TLS12, TLS13. "+ + "See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") + + compressMethod = flag.String("syslog.compressMethod", "", "Compression method for syslog messages received at -syslog.listenAddr.tcp and -syslog.listenAddr.udp. "+ + "Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/") +) + +// MustInit initializes syslog parser at the given -syslog.listenAddr.tcp and -syslog.listenAddr.udp ports +// +// This function must be called after flag.Parse(). +// +// MustStop() must be called in order to free up resources occupied by the initialized syslog parser. +func MustInit() { + if workersStopCh != nil { + logger.Panicf("BUG: MustInit() called twice without MustStop() call") + } + workersStopCh = make(chan struct{}) + + tenantID, err := logstorage.GetTenantIDFromString(*syslogTenantID) + if err != nil { + logger.Fatalf("cannot parse -syslog.tenantID=%q: %s", *syslogTenantID, err) + } + globalTenantID = tenantID + + switch *compressMethod { + case "", "none", "gzip", "deflate": + default: + logger.Fatalf("unexpected -syslog.compressLevel=%q; supported values: none, gzip, deflate", *compressMethod) + } + + if *listenAddrTCP != "" { + workersWG.Add(1) + go func() { + runTCPListener(*listenAddrTCP) + workersWG.Done() + }() + } + + if *listenAddrUDP != "" { + workersWG.Add(1) + go func() { + runUDPListener(*listenAddrUDP) + workersWG.Done() + }() + } + + currentYear := time.Now().Year() + globalCurrentYear.Store(int64(currentYear)) + workersWG.Add(1) + go func() { + ticker := time.NewTicker(time.Minute) + for { + select { + case <-workersStopCh: + ticker.Stop() + workersWG.Done() + return + case <-ticker.C: + 
currentYear := time.Now().Year() + globalCurrentYear.Store(int64(currentYear)) + } + } + }() + + if *syslogTimezone != "" { + tz, err := time.LoadLocation(*syslogTimezone) + if err != nil { + logger.Fatalf("cannot parse -syslog.timezone=%q: %s", *syslogTimezone, err) + } + globalTimezone = tz + } else { + globalTimezone = time.Local + } +} + +var ( + globalTenantID logstorage.TenantID + globalCurrentYear atomic.Int64 + globalTimezone *time.Location +) + +var ( + workersWG sync.WaitGroup + workersStopCh chan struct{} +) + +// MustStop stops syslog parser initialized via MustInit() +func MustStop() { + close(workersStopCh) + workersWG.Wait() + workersStopCh = nil +} + +func runUDPListener(addr string) { + ln, err := net.ListenPacket(netutil.GetUDPNetwork(), addr) + if err != nil { + logger.Fatalf("cannot start UDP syslog server at %q: %s", addr, err) + } + + doneCh := make(chan struct{}) + go func() { + serveUDP(ln) + close(doneCh) + }() + + <-workersStopCh + if err := ln.Close(); err != nil { + logger.Fatalf("syslog: cannot close UDP listener at %s: %s", addr, err) + } + <-doneCh +} + +func runTCPListener(addr string) { + var tlsConfig *tls.Config + if *tlsEnable { + tc, err := netutil.GetServerTLSConfig(*tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites) + if err != nil { + logger.Fatalf("cannot load TLS cert from -syslog.tlsCertFile=%q, -syslog.tlsKeyFile=%q, -syslog.tlsMinVersion=%q, -syslog.tlsCipherSuites=%q: %s", + *tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites, err) + } + tlsConfig = tc + } + ln, err := netutil.NewTCPListener("syslog", addr, false, tlsConfig) + if err != nil { + logger.Fatalf("syslog: cannot start TCP listener at %s: %s", addr, err) + } + + doneCh := make(chan struct{}) + go func() { + serveTCP(ln) + close(doneCh) + }() + + <-workersStopCh + if err := ln.Close(); err != nil { + logger.Fatalf("syslog: cannot close TCP listener at %s: %s", addr, err) + } + <-doneCh +} + +func serveUDP(ln net.PacketConn) { + gomaxprocs := 
cgroup.AvailableCPUs() + var wg sync.WaitGroup + localAddr := ln.LocalAddr() + for i := 0; i < gomaxprocs; i++ { + wg.Add(1) + go func() { + defer wg.Done() + cp := insertutils.GetCommonParamsForSyslog(globalTenantID) + var bb bytesutil.ByteBuffer + bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024) + for { + bb.Reset() + bb.B = bb.B[:cap(bb.B)] + n, remoteAddr, err := ln.ReadFrom(bb.B) + if err != nil { + udpErrorsTotal.Inc() + var ne net.Error + if errors.As(err, &ne) { + if ne.Temporary() { + logger.Errorf("syslog: temporary error when listening for UDP at %q: %s", localAddr, err) + time.Sleep(time.Second) + continue + } + if strings.Contains(err.Error(), "use of closed network connection") { + break + } + } + logger.Errorf("syslog: cannot read UDP data from %s at %s: %s", remoteAddr, localAddr, err) + continue + } + bb.B = bb.B[:n] + udpRequestsTotal.Inc() + if err := processStream(bb.NewReader(), cp); err != nil { + logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err) + } + } + }() + } + wg.Wait() +} + +func serveTCP(ln net.Listener) { + var cm ingestserver.ConnsMap + cm.Init("syslog") + + var wg sync.WaitGroup + addr := ln.Addr() + for { + c, err := ln.Accept() + if err != nil { + var ne net.Error + if errors.As(err, &ne) { + if ne.Temporary() { + logger.Errorf("syslog: temporary error when listening for TCP addr %q: %s", addr, err) + time.Sleep(time.Second) + continue + } + if strings.Contains(err.Error(), "use of closed network connection") { + break + } + logger.Fatalf("syslog: unrecoverable error when accepting TCP connections at %q: %s", addr, err) + } + logger.Fatalf("syslog: unexpected error when accepting TCP connections at %q: %s", addr, err) + } + if !cm.Add(c) { + _ = c.Close() + break + } + + wg.Add(1) + go func() { + cp := insertutils.GetCommonParamsForSyslog(globalTenantID) + if err := processStream(c, cp); err != nil { + logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err) + } + 
+ cm.Delete(c) + _ = c.Close() + wg.Done() + }() + } + + cm.CloseAll(0) + wg.Wait() +} + +// processStream parses a stream of syslog messages from r and ingests them into vlstorage. +func processStream(r io.Reader, cp *insertutils.CommonParams) error { + switch *compressMethod { + case "", "none": + case "gzip": + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped data: %w", err) + } + r = zr + case "deflate": + zr, err := common.GetZlibReader(r) + if err != nil { + return fmt.Errorf("cannot read deflated data: %w", err) + } + r = zr + default: + logger.Panicf("BUG: compressLevel=%q; supported values: none, gzip, deflate", *compressMethod) + } + + err := processUncompressedStream(r, cp) + + switch *compressMethod { + case "gzip": + zr := r.(*gzip.Reader) + common.PutGzipReader(zr) + case "deflate": + zr := r.(io.ReadCloser) + common.PutZlibReader(zr) + } + + return err +} + +func processUncompressedStream(r io.Reader, cp *insertutils.CommonParams) error { + if err := vlstorage.CanWriteData(); err != nil { + return err + } + lr := logstorage.GetLogRows(cp.StreamFields, nil) + processLogMessage := cp.GetProcessLogMessageFunc(lr) + + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + + lb := lineBufferPool.Get() + defer lineBufferPool.Put(lb) + + lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN()) + sc := bufio.NewScanner(wcr) + sc.Buffer(lb.B, len(lb.B)) + + n := 0 + for { + currentYear := int(globalCurrentYear.Load()) + ok, err := readLine(sc, currentYear, globalTimezone, processLogMessage) + wcr.DecConcurrency() + if err != nil { + errorsTotal.Inc() + return fmt.Errorf("cannot read line #%d: %s", n, err) + } + if !ok { + break + } + n++ + rowsIngestedTotal.Inc() + } + + vlstorage.MustAddRows(lr) + logstorage.PutLogRows(lr) + + return nil +} + +func readLine(sc *bufio.Scanner, currentYear int, timezone *time.Location, processLogMessage 
func(timestamp int64, fields []logstorage.Field)) (bool, error) { + var line []byte + for len(line) == 0 { + if !sc.Scan() { + if err := sc.Err(); err != nil { + if errors.Is(err, bufio.ErrTooLong) { + return false, fmt.Errorf(`line size exceeds -insert.maxLineSizeBytes=%d`, insertutils.MaxLineSizeBytes.IntN()) + } + return false, err + } + return false, nil + } + line = sc.Bytes() + } + + p := logstorage.GetSyslogParser(currentYear, timezone) + lineStr := bytesutil.ToUnsafeString(line) + p.Parse(lineStr) + ts, err := insertutils.ExtractTimestampISO8601FromFields("timestamp", p.Fields) + if err != nil { + return false, fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err) + } + logstorage.RenameField(p.Fields, "message", "_msg") + processLogMessage(ts, p.Fields) + logstorage.PutSyslogParser(p) + + return true, nil +} + +var lineBufferPool bytesutil.ByteBufferPool + +var ( + rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="syslog"}`) + + errorsTotal = metrics.NewCounter(`vl_errors_total{type="syslog"}`) + + udpRequestsTotal = metrics.NewCounter(`vl_udp_reqests_total{type="syslog"}`) + udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`) +) diff --git a/app/vlinsert/syslog/syslog_test.go b/app/vlinsert/syslog/syslog_test.go new file mode 100644 index 000000000..d07deab2f --- /dev/null +++ b/app/vlinsert/syslog/syslog_test.go @@ -0,0 +1,65 @@ +package syslog + +import ( + "bufio" + "bytes" + "reflect" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" +) + +func TestReadLine_Success(t *testing.T) { + f := func(data string, currentYear, rowsExpected int, timestampsExpected []int64, resultExpected string) { + t.Helper() + + MustInit() + defer MustStop() + + var timestamps []int64 + var result string + processLogMessage := func(timestamp int64, fields []logstorage.Field) { + timestamps = append(timestamps, timestamp) + result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + 
"\n" + } + + r := bytes.NewBufferString(data) + sc := bufio.NewScanner(r) + rows := 0 + timezone := time.UTC + for { + ok, err := readLine(sc, currentYear, timezone, processLogMessage) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !ok { + break + } + rows++ + } + if rows != rowsExpected { + t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected) + } + + if !reflect.DeepEqual(timestamps, timestampsExpected) { + t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected) + } + if result != resultExpected { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected) + } + } + + data := `Jun 3 12:08:33 abcd systemd: Starting Update the local ESM caches... +<165>Jun 4 12:08:33 abcd systemd[345]: abc defg +<123>1 2023-06-03T17:42:12.345Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data. +` + currentYear := 2023 + rowsExpected := 3 + timestampsExpected := []int64{1685794113000000000, 1685880513000000000, 1685814132345000000} + resultExpected := `{"format":"rfc3164","timestamp":"","hostname":"abcd","app_name":"systemd","_msg":"Starting Update the local ESM caches..."} +{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"","hostname":"abcd","app_name":"systemd","proc_id":"345","_msg":"abc defg"} +{"priority":"123","facility":"15","severity":"3","format":"rfc5424","timestamp":"","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","_msg":"This is a test message with structured data."} +` + f(data, currentYear, rowsExpected, timestampsExpected, resultExpected) +} diff --git a/deployment/docker/docker-compose-victorialogs.yml b/deployment/docker/docker-compose-victorialogs.yml index fd772adc6..2bfaf235a 100644 --- 
a/deployment/docker/docker-compose-victorialogs.yml +++ b/deployment/docker/docker-compose-victorialogs.yml @@ -42,7 +42,7 @@ services: # storing logs and serving read queries. victorialogs: container_name: victorialogs - image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs command: - "--storageDataPath=/vlogs" - "--httpListenAddr=:9428" diff --git a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml index 145998764..a51df33c7 100644 --- a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: - -beat.uri=http://filebeat-victorialogs:5066 victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs volumes: - victorialogs-filebeat-docker-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml index acddfe7b1..c36103568 100644 --- a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml @@ -13,7 +13,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs volumes: - victorialogs-filebeat-syslog-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml index fdd141860..4675c78ca 100644 --- a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml @@ -11,7 +11,7 @@ services: - "5140:5140" victorialogs: - 
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs volumes: - victorialogs-fluentbit-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/logstash/docker-compose.yml b/deployment/docker/victorialogs/logstash/docker-compose.yml index c61d70583..569b95960 100644 --- a/deployment/docker/victorialogs/logstash/docker-compose.yml +++ b/deployment/docker/victorialogs/logstash/docker-compose.yml @@ -14,7 +14,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs volumes: - victorialogs-logstash-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/promtail/docker-compose.yml b/deployment/docker/victorialogs/promtail/docker-compose.yml index d76305833..48bc23daa 100644 --- a/deployment/docker/victorialogs/promtail/docker-compose.yml +++ b/deployment/docker/victorialogs/promtail/docker-compose.yml @@ -12,7 +12,7 @@ services: - "5140:5140" vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs volumes: - victorialogs-promtail-docker:/vlogs ports: diff --git a/deployment/docker/victorialogs/vector-docker/docker-compose.yml b/deployment/docker/victorialogs/vector-docker/docker-compose.yml index f50ad40ee..82822545c 100644 --- a/deployment/docker/victorialogs/vector-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/vector-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: condition: service_healthy victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs volumes: - victorialogs-vector-docker-vl:/vlogs ports: diff --git a/deployment/logs-benchmark/docker-compose.yml b/deployment/logs-benchmark/docker-compose.yml index e58f5a7ef..321c1a42a 100644 --- 
a/deployment/logs-benchmark/docker-compose.yml +++ b/deployment/logs-benchmark/docker-compose.yml @@ -3,7 +3,7 @@ version: '3' services: # Run `make package-victoria-logs` to build victoria-logs image vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs volumes: - vlogs:/vlogs ports: diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index c56e8801a..9a9d39a93 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +## [v0.20.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.20.0-victorialogs) + +Released at 2024-06-17 + +* FEATURE: add ability to accept logs in [Syslog format](https://en.wikipedia.org/wiki/Syslog). See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/). +* FEATURE: add ability to specify timezone offset when parsing [rfc3164](https://datatracker.ietf.org/doc/html/rfc3164) syslog messages with [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe). +* FEATURE: add [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) for returning top N sets of the given fields with the maximum number of matching log entries. + ## [v0.19.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.19.0-victorialogs) Released at 2024-06-11 @@ -284,7 +292,7 @@ Released at 2023-10-03 Released at 2023-07-20 -* FEATURE: add support for data ingestion via Promtail (aka default log shipper for Grafana Loki). See [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html) and [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api) docs. +* FEATURE: add support for data ingestion via Promtail (aka default log shipper for Grafana Loki).
See [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) and [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api) docs. ## [v0.2.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.2.0-victorialogs) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 7ba1cc861..02ea8ca15 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1265,6 +1265,7 @@ LogsQL supports the following pipes: - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`stats`](#stats-pipe) calculates various stats over the selected logs. +- [`top`](#top-pipe) returns top `N` field sets with the maximum number of matching logs. - [`uniq`](#uniq-pipe) returns unique log entires. - [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1573,6 +1574,7 @@ If the limit is reached, then the set of returned values is random. 
Also the num See also: - [`field_names` pipe](#field_names-pipe) +- [`top` pipe](#top-pipe) - [`uniq` pipe](#uniq-pipe) ### fields pipe @@ -2139,6 +2141,8 @@ See also: - [stats pipe functions](#stats-pipe-functions) - [`math` pipe](#math-pipe) - [`sort` pipe](#sort-pipe) +- [`uniq` pipe](#uniq-pipe) +- [`top` pipe](#top-pipe) #### Stats by fields @@ -2256,9 +2260,41 @@ _time:5m | stats count() total ``` +### top pipe + +`| top N by (field1, ..., fieldN)` [pipe](#pipes) returns top `N` sets for `(field1, ..., fieldN)` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +with the maximum number of matching log entries. + +For example, the following query returns top 7 [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) +with the maximum number of log entries over the last 5 minutes: + +```logsql +_time:5m | top 7 by (_stream) +``` + +The `N` is optional. If it is skipped, then top 10 entries are returned. For example, the following query returns top 10 values +for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) seen in logs for the last 5 minutes: + +```logsql +_time:5m | top by (ip) +``` + +The `by (...)` part in the `top` [pipe](#pipes) is optional. If it is skipped, then all the log fields are taken into account +when determining top field sets. This is useful when the field sets are already limited by other pipes such as [`fields` pipe](#fields-pipe). +For example, the following query is equivalent to the previous one: + +```logsql +_time:5m | fields ip | top +``` + +See also: + +- [`uniq` pipe](#uniq-pipe) +- [`stats` pipe](#stats-pipe) + ### uniq pipe -`| uniq ...` pipe returns unique results over the selected logs. For example, the following LogsQL query +`| uniq ...` [pipe](#pipes) returns unique results over the selected logs. 
For example, the following LogsQL query returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) over logs for the last 5 minutes: @@ -2300,6 +2336,8 @@ _time:5m | uniq (host, path) limit 100 See also: - [`uniq_values` stats function](#uniq_values-stats) +- [`top` pipe](#top-pipe) +- [`stats` pipe](#stats-pipe) ### unpack_json pipe @@ -2512,6 +2550,13 @@ The following query is equivalent to the previous one: _time:5m | unpack_syslog ``` +By default timestamps in [RFC3164 format](https://datatracker.ietf.org/doc/html/rfc3164) are converted to local timezone. It is possible to change the timezone +offset via `offset` option. For example, the following query adds 5 hours and 30 minutes to unpacked `rfc3164` timestamps: + +```logsql +_time:5m | unpack_syslog offset 5h30m +``` + If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`: ```logsql diff --git a/docs/VictoriaLogs/QuickStart.md b/docs/VictoriaLogs/QuickStart.md index d0606c9c0..7aa83fc7c 100644 --- a/docs/VictoriaLogs/QuickStart.md +++ b/docs/VictoriaLogs/QuickStart.md @@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: ```sh -curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.19.0-victorialogs/victoria-logs-linux-amd64-v0.19.0-victorialogs.tar.gz -tar xzf victoria-logs-linux-amd64-v0.19.0-victorialogs.tar.gz +curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.20.0-victorialogs/victoria-logs-linux-amd64-v0.20.0-victorialogs.tar.gz +tar xzf victoria-logs-linux-amd64-v0.20.0-victorialogs.tar.gz ./victoria-logs-prod ``` @@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container: ```sh docker run --rm -it -p 9428:9428 -v 
./victoria-logs-data:/victoria-logs-data \ - docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs + docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs ``` See also: diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index 2abc0ce5b..3aba5d949 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -16,11 +16,12 @@ aliases: [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs from the following log collectors: -- Filebeat. See [how to setup Filebeat for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Filebeat.html). -- Fluentbit. See [how to setup Fluentbit for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Fluentbit.html). -- Logstash. See [how to setup Logstash for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Logstash.html). -- Vector. See [how to setup Vector for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Vector.html). -- Promtail (aka Grafana Loki). See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html). +- Syslog - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/). +- Filebeat - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/). +- Fluentbit - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/). +- Logstash - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/). +- Vector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/). +- Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). 
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/). @@ -258,8 +259,8 @@ Here is the list of log collectors and their ingestion formats supported by Vict | How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | |------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------|-------------------------------------------------------------------------------------| -| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/Filebeat.html) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | -| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/Fluentbit.html) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | -| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/Logstash.html) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | -| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/Vector.html) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | -| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | +| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | +| 
[Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | +| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | +| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | +| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | diff --git a/docs/VictoriaLogs/data-ingestion/Vector.md b/docs/VictoriaLogs/data-ingestion/Vector.md index 7fc0bee19..1e7bcb1a7 100644 --- a/docs/VictoriaLogs/data-ingestion/Vector.md +++ b/docs/VictoriaLogs/data-ingestion/Vector.md @@ -1,11 +1,11 @@ --- -weight: 5 +weight: 20 title: Vector setup disableToc: true menu: docs: parent: "victorialogs-data-ingestion" - weight: 5 + weight: 20 aliases: - /VictoriaLogs/data-ingestion/Vector.html - /victorialogs/data-ingestion/Vector.html diff --git a/docs/VictoriaLogs/data-ingestion/syslog.md b/docs/VictoriaLogs/data-ingestion/syslog.md new file mode 100644 index 000000000..c6137bb7a --- /dev/null +++ b/docs/VictoriaLogs/data-ingestion/syslog.md @@ -0,0 +1,99 @@ +--- +weight: 10 +title: Syslog setup +disableToc: true +menu: + docs: + parent: "victorialogs-data-ingestion" + weight: 10 +--- + +# Syslog setup + +[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs in [Syslog formats](https://en.wikipedia.org/wiki/Syslog) at the specified TCP and UDP addresses +via `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` 
command-line flags. The following syslog formats are supported: + +- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `MMM DD hh:mm:ss HOSTNAME APP-NAME[PROCID]: MESSAGE` +- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE` + +For example, the following command starts VictoriaLogs, which accepts logs in Syslog format at TCP port 514 on all the network interfaces: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 +``` + +It may be needed to run VictoriaLogs under `root` user or to set [`CAP_NET_BIND_SERVICE`](https://superuser.com/questions/710253/allow-non-root-process-to-bind-to-port-80-and-443) +option if syslog messages must be accepted at TCP port below 1024. + +The following command starts VictoriaLogs, which accepts logs in Syslog format at TCP and UDP ports 514: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.listenAddr.udp=:514 +``` + +Multiple logs in Syslog format can be ingested via a single TCP connection or via a single UDP packet - just put every log on a separate line +and delimit them with `\n` char. 
+ +VictoriaLogs automatically extracts the following [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +from the received Syslog lines: + +- [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp +- [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) - the `MESSAGE` field from the supported syslog formats above +- `hostname`, `app_name` and `proc_id` - [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) for unique identification + over every log stream +- `priority`, `facility` and `severity` - these fields are extracted from `<PRI>` field +- `format` - this field is set to either `rfc3164` or `rfc5424` depending on the format of the parsed syslog line +- `msg_id` - `MSGID` field from log line in `RFC5424` format. + +By default local timezone is used when parsing timestamps in `rfc3164` lines. This can be changed to any desired timezone via `-syslog.timezone` command-line flag. +See [the list of supported timezone identifiers](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones). For example, the following command starts VictoriaLogs, +which parses syslog timestamps in `rfc3164` using `Europe/Berlin` timezone: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.timezone='Europe/Berlin' +``` + +See also: + +- [Security](#security) +- [Compression](#compression) +- [Multitenancy](#multitenancy) +- [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting). +- [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/). + +## Security + +By default VictoriaLogs accepts plaintext data at `-syslog.listenAddr.tcp` address. Run VictoriaLogs with `-syslog.tls` command-line flag +in order to accept TLS-encrypted logs at `-syslog.listenAddr.tcp` address.
The `-syslog.tlsCertFile` and `-syslog.tlsKeyFile` command-line flags +must be set to paths to TLS certificate file and TLS key file if `-syslog.tls` is set. For example, the following command +starts VictoriaLogs, which accepts TLS-encrypted syslog messages at TCP port 514: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tls -syslog.tlsCertFile=/path/to/tls/cert -syslog.tlsKeyFile=/path/to/tls/key +``` + +## Compression + +By default VictoriaLogs accepts uncompressed log messages in Syslog format at `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` addresses. +It is possible to configure VictoriaLogs to accept compressed log messages via `-syslog.compressMethod` command-line flag. The following +compression methods are supported: + +- `none` - no compression +- `gzip` - [gzip compression](https://en.wikipedia.org/wiki/Gzip) +- `deflate` - [deflate compression](https://en.wikipedia.org/wiki/Deflate) + +For example, the following command starts VictoriaLogs, which accepts gzip-compressed syslog messages at TCP port 514: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.compressMethod=gzip +``` + +## Multitenancy + +By default, the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). +If you need to store logs in another tenant, then specify the needed tenant via `-syslog.tenantID` command-line flag.
+For example, the following command starts VictoriaLogs, which writes syslog messages received at TCP port 514, to `(AccountID=12, ProjectID=34)` tenant: + +```sh +./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tenantID=12:34 +``` diff --git a/docs/VictoriaLogs/logsql-examples.md b/docs/VictoriaLogs/logsql-examples.md index b4706de3e..c676d8b9e 100644 --- a/docs/VictoriaLogs/logsql-examples.md +++ b/docs/VictoriaLogs/logsql-examples.md @@ -286,6 +286,12 @@ This query uses the following [LogsQL](https://docs.victoriametrics.com/victoria - [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for sorting the stats by `logs` field in descending order. - [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) for limiting the number of returned results to 10. +This query can be simplified into the following one, which uses [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe): + +```logsql +_time:5m | top 10 by (_stream) +``` + See also: - [How to filter out data after stats calculation?](#how-to-filter-out-data-after-stats-calculation) diff --git a/lib/ingestserver/conns_map.go b/lib/ingestserver/conns_map.go index 19fafdde9..6dc921a0f 100644 --- a/lib/ingestserver/conns_map.go +++ b/lib/ingestserver/conns_map.go @@ -44,6 +44,8 @@ func (cm *ConnsMap) Delete(c net.Conn) { } // CloseAll gradually closes all the cm conns with during the given shutdownDuration. +// +// If shutdownDuration <= 0, then all the connections are closed simultaneously. func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) { cm.mu.Lock() conns := make([]net.Conn, 0, len(cm.m)) @@ -70,8 +72,8 @@ func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) { return } - // Sort vminsert conns in order to make the order of closing connections deterministic across vmstorage nodes. - // This should reduce resource usage spikes at vmstorage nodes during rolling restarts. 
+ // Sort conns in order to make the order of closing connections deterministic across clients. + // This should reduce resource usage spikes at clients during rolling restarts. sort.Slice(conns, func(i, j int) bool { return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String() }) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index e72184c4a..d76e597cd 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1166,7 +1166,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { return bytesutil.ToUnsafeString(buf[bufLen:]) } - if timestamp, ok := tryParseTimestampISO8601(s); ok { + if timestamp, ok := TryParseTimestampISO8601(s); ok { bucketSizeInt := int64(bf.bucketSize) if bucketSizeInt <= 0 { bucketSizeInt = 1 @@ -1190,7 +1190,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { return bytesutil.ToUnsafeString(buf[bufLen:]) } - if timestamp, ok := tryParseTimestampRFC3339Nano(s); ok { + if timestamp, ok := TryParseTimestampRFC3339Nano(s); ok { bucketSizeInt := int64(bf.bucketSize) if bucketSizeInt <= 0 { bucketSizeInt = 1 diff --git a/lib/logstorage/filter_day_range.go b/lib/logstorage/filter_day_range.go index 14136b69d..fc23e169b 100644 --- a/lib/logstorage/filter_day_range.go +++ b/lib/logstorage/filter_day_range.go @@ -102,7 +102,7 @@ func (fr *filterDayRange) applyToBlockResult(br *blockResult, bm *bitmap) { } func (fr *filterDayRange) matchTimestampString(v string) bool { - timestamp, ok := tryParseTimestampRFC3339Nano(v) + timestamp, ok := TryParseTimestampRFC3339Nano(v) if !ok { return false } diff --git a/lib/logstorage/filter_exact.go b/lib/logstorage/filter_exact.go index 98e9ea1ee..7434e7339 100644 --- a/lib/logstorage/filter_exact.go +++ b/lib/logstorage/filter_exact.go @@ -141,7 +141,7 @@ func (fe *filterExact) applyToBlockResult(br *blockResult, bm *bitmap) { return ip == ipNeeded }) case valueTypeTimestampISO8601: - 
timestampNeeded, ok := tryParseTimestampISO8601(value) + timestampNeeded, ok := TryParseTimestampISO8601(value) if !ok { bm.resetBits() return @@ -213,7 +213,7 @@ func (fe *filterExact) applyToBlockSearch(bs *blockSearch, bm *bitmap) { } func matchTimestampISO8601ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []string) { - n, ok := tryParseTimestampISO8601(value) + n, ok := TryParseTimestampISO8601(value) if !ok || n < int64(ch.minValue) || n > int64(ch.maxValue) { bm.resetBits() return diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go index eb39f41f6..c3d4d3622 100644 --- a/lib/logstorage/filter_in.go +++ b/lib/logstorage/filter_in.go @@ -245,7 +245,7 @@ func (fi *filterIn) initTimestampISO8601Values() { m := make(map[string]struct{}, len(values)) buf := make([]byte, 0, len(values)*8) for _, v := range values { - n, ok := tryParseTimestampISO8601(v) + n, ok := TryParseTimestampISO8601(v) if !ok { continue } diff --git a/lib/logstorage/filter_phrase.go b/lib/logstorage/filter_phrase.go index 3b4c04174..2598209be 100644 --- a/lib/logstorage/filter_phrase.go +++ b/lib/logstorage/filter_phrase.go @@ -100,7 +100,7 @@ func (fp *filterPhrase) applyToBlockSearch(bs *blockSearch, bm *bitmap) { } func matchTimestampISO8601ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase string, tokens []string) { - _, ok := tryParseTimestampISO8601(phrase) + _, ok := TryParseTimestampISO8601(phrase) if ok { // Fast path - the phrase contains complete timestamp, so we can use exact search matchTimestampISO8601ByExactValue(bs, ch, bm, phrase, tokens) diff --git a/lib/logstorage/filter_time.go b/lib/logstorage/filter_time.go index d44c542e0..1009657f6 100644 --- a/lib/logstorage/filter_time.go +++ b/lib/logstorage/filter_time.go @@ -96,7 +96,7 @@ func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) { } func (ft *filterTime) matchTimestampString(v string) bool { - timestamp, ok := 
tryParseTimestampRFC3339Nano(v) + timestamp, ok := TryParseTimestampRFC3339Nano(v) if !ok { return false } diff --git a/lib/logstorage/filter_week_range.go b/lib/logstorage/filter_week_range.go index aba5e3452..e7854666d 100644 --- a/lib/logstorage/filter_week_range.go +++ b/lib/logstorage/filter_week_range.go @@ -1,9 +1,9 @@ package logstorage import ( - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // filterWeekRange filters by week range. @@ -104,7 +104,7 @@ func (fr *filterWeekRange) applyToBlockResult(br *blockResult, bm *bitmap) { } func (fr *filterWeekRange) matchTimestampString(v string) bool { - timestamp, ok := tryParseTimestampRFC3339Nano(v) + timestamp, ok := TryParseTimestampRFC3339Nano(v) if !ok { return false } diff --git a/lib/logstorage/json_parser.go b/lib/logstorage/json_parser.go index bd6823ff2..5fe881a54 100644 --- a/lib/logstorage/json_parser.go +++ b/lib/logstorage/json_parser.go @@ -78,21 +78,6 @@ func (p *JSONParser) ParseLogMessage(msg []byte) error { return nil } -// RenameField renames field with the oldName to newName in p.Fields -func (p *JSONParser) RenameField(oldName, newName string) { - if oldName == "" { - return - } - fields := p.Fields - for i := range fields { - f := &fields[i] - if f.Name == oldName { - f.Name = newName - return - } - } -} - func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) { o := v.GetObject() o.Visit(func(k []byte, v *fastjson.Value) { diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go index b1c9c3e56..bf5553b84 100644 --- a/lib/logstorage/log_rows.go +++ b/lib/logstorage/log_rows.go @@ -60,20 +60,8 @@ type RowFormatter []Field // String returns user-readable representation for rf func (rf *RowFormatter) String() string { - b := append([]byte{}, '{') - - fields := *rf - if len(fields) > 0 { - b = append(b, fields[0].String()...) 
- fields = fields[1:] - for _, field := range fields { - b = append(b, ',') - b = append(b, field.String()...) - } - } - - b = append(b, '}') - return string(b) + result := MarshalFieldsToJSON(nil, *rf) + return string(result) } // Reset resets lr with all its settings. diff --git a/lib/logstorage/logfmt_parser_test.go b/lib/logstorage/logfmt_parser_test.go index 66a1e6552..ee0309f8f 100644 --- a/lib/logstorage/logfmt_parser_test.go +++ b/lib/logstorage/logfmt_parser_test.go @@ -26,5 +26,5 @@ func TestLogfmtParser(t *testing.T) { f(`foo bar`, `{"foo":"","bar":""}`) f(`foo bar=baz`, `{"foo":"","bar":"baz"}`) f(`foo=bar baz="x y" a=b`, `{"foo":"bar","baz":"x y","a":"b"}`) - f(` foo=bar baz=x =z qwe`, `{"foo":"bar","baz":"x","":"z","qwe":""}`) + f(` foo=bar baz=x =z qwe`, `{"foo":"bar","baz":"x","_msg":"z","qwe":""}`) } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 9aaaa3b26..a4e4d119c 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -1316,7 +1316,7 @@ func parseNumber(lex *lexer) (float64, string, error) { return f, s, nil } - return 0, "", fmt.Errorf("cannot parse %q as float64", s) + return 0, s, fmt.Errorf("cannot parse %q as float64", s) } func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) { diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index d9890f043..c330fc331 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -214,6 +214,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) } return ps, nil + case lex.isKeyword("top"): + pt, err := parsePipeTop(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'top' pipe: %w", err) + } + return pt, nil case lex.isKeyword("uniq"): pu, err := parsePipeUniq(lex) if err != nil { @@ -287,6 +293,7 @@ var pipeNames = func() map[string]struct{} { "replace_regexp", "sort", "stats", + "top", "uniq", "unpack_json", "unpack_logfmt", 
diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go index 350c578ea..caf6487a4 100644 --- a/lib/logstorage/pipe_math.go +++ b/lib/logstorage/pipe_math.go @@ -919,7 +919,7 @@ func parseMathNumber(s string) float64 { if ok { return f } - nsecs, ok := tryParseTimestampRFC3339Nano(s) + nsecs, ok := TryParseTimestampRFC3339Nano(s) if ok { return float64(nsecs) } diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_sort_topk.go similarity index 100% rename from lib/logstorage/pipe_topk.go rename to lib/logstorage/pipe_sort_topk.go diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go new file mode 100644 index 000000000..e633d87c8 --- /dev/null +++ b/lib/logstorage/pipe_top.go @@ -0,0 +1,500 @@ +package logstorage + +import ( + "fmt" + "slices" + "sort" + "strings" + "sync/atomic" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" +) + +// pipeTopDefaultLimit is the default number of entries pipeTop returns. +const pipeTopDefaultLimit = 10 + +// pipeTop processes '| top ...' queries. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe +type pipeTop struct { + // byFields contains field names for returning top values for. + byFields []string + + // limit is the number of top (byFields) sets to return. + limit uint64 + + // limitStr is string representation of the limit. + limitStr string + + // if hitsFieldName isn't empty, then the number of hits per each unique value is returned in this field.
+ hitsFieldName string +} + +func (pt *pipeTop) String() string { + s := "top" + if pt.limit != pipeTopDefaultLimit { + s += " " + pt.limitStr + } + if len(pt.byFields) > 0 { + s += " by (" + fieldNamesString(pt.byFields) + ")" + } + return s +} + +func (pt *pipeTop) updateNeededFields(neededFields, unneededFields fieldsSet) { + neededFields.reset() + unneededFields.reset() + + if len(pt.byFields) == 0 { + neededFields.add("*") + } else { + neededFields.addFields(pt.byFields) + } +} + +func (pt *pipeTop) optimize() { + // nothing to do +} + +func (pt *pipeTop) hasFilterInWithQuery() bool { + return false +} + +func (pt *pipeTop) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pt, nil +} + +func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { + maxStateSize := int64(float64(memory.Allowed()) * 0.2) + + shards := make([]pipeTopProcessorShard, workersCount) + for i := range shards { + shards[i] = pipeTopProcessorShard{ + pipeTopProcessorShardNopad: pipeTopProcessorShardNopad{ + pt: pt, + stateSizeBudget: stateSizeBudgetChunk, + }, + } + maxStateSize -= stateSizeBudgetChunk + } + + ptp := &pipeTopProcessor{ + pt: pt, + stopCh: stopCh, + cancel: cancel, + ppNext: ppNext, + + shards: shards, + + maxStateSize: maxStateSize, + } + ptp.stateSizeBudget.Store(maxStateSize) + + return ptp +} + +type pipeTopProcessor struct { + pt *pipeTop + stopCh <-chan struct{} + cancel func() + ppNext pipeProcessor + + shards []pipeTopProcessorShard + + maxStateSize int64 + stateSizeBudget atomic.Int64 +} + +type pipeTopProcessorShard struct { + pipeTopProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeTopProcessorShardNopad{})%128]byte +} + +type pipeTopProcessorShardNopad struct { + // pt points to the parent pipeTop. + pt *pipeTop + + // m holds per-row hits. 
+ m map[string]*uint64 + + // keyBuf is a temporary buffer for building keys for m. + keyBuf []byte + + // columnValues is a temporary buffer for the processed column values. + columnValues [][]string + + // stateSizeBudget is the remaining budget for the whole state size for the shard. + // The per-shard budget is provided in chunks from the parent pipeTopProcessor. + stateSizeBudget int +} + +// writeBlock writes br to shard. +func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { + byFields := shard.pt.byFields + if len(byFields) == 0 { + // Take into account all the columns in br. + keyBuf := shard.keyBuf + cs := br.getColumns() + for i := range br.timestamps { + keyBuf = keyBuf[:0] + for _, c := range cs { + v := c.getValueAtRow(br, i) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1) + } + shard.keyBuf = keyBuf + return + } + if len(byFields) == 1 { + // Fast path for a single field. + c := br.getColumnByName(byFields[0]) + if c.isConst { + v := c.valuesEncoded[0] + shard.updateState(v, uint64(len(br.timestamps))) + return + } + if c.valueType == valueTypeDict { + a := encoding.GetUint64s(len(c.dictValues)) + hits := a.A + valuesEncoded := c.getValuesEncoded(br) + for _, v := range valuesEncoded { + idx := unmarshalUint8(v) + hits[idx]++ + } + for i, v := range c.dictValues { + shard.updateState(v, hits[i]) + } + encoding.PutUint64s(a) + return + } + + values := c.getValues(br) + for _, v := range values { + shard.updateState(v, 1) + } + return + } + + // Take into account only the selected columns. 
+ columnValues := shard.columnValues[:0] + for _, f := range byFields { + c := br.getColumnByName(f) + values := c.getValues(br) + columnValues = append(columnValues, values) + } + shard.columnValues = columnValues + + keyBuf := shard.keyBuf + for i := range br.timestamps { + keyBuf = keyBuf[:0] + for _, values := range columnValues { + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) + } + shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1) + } + shard.keyBuf = keyBuf +} + +func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) { + m := shard.getM() + pHits, ok := m[v] + if !ok { + vCopy := strings.Clone(v) + hits := uint64(0) + pHits = &hits + m[vCopy] = pHits + shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits)) + } + *pHits += hits +} + +func (shard *pipeTopProcessorShard) getM() map[string]*uint64 { + if shard.m == nil { + shard.m = make(map[string]*uint64) + } + return shard.m +} + +func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &ptp.shards[workerID] + + for shard.stateSizeBudget < 0 { + // steal some budget for the state size from the global budget. + remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk) + if remaining < 0 { + // The state size is too big. Stop processing data in order to avoid OOM crash. + if remaining+stateSizeBudgetChunk >= 0 { + // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. 
+ ptp.cancel() + } + return + } + shard.stateSizeBudget += stateSizeBudgetChunk + } + + shard.writeBlock(br) +} + +func (ptp *pipeTopProcessor) flush() error { + if n := ptp.stateSizeBudget.Load(); n <= 0 { + return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20)) + } + + // merge state across shards + shards := ptp.shards + m := shards[0].getM() + shards = shards[1:] + for i := range shards { + if needStop(ptp.stopCh) { + return nil + } + + for k, pHitsSrc := range shards[i].getM() { + pHits, ok := m[k] + if !ok { + m[k] = pHitsSrc + } else { + *pHits += *pHitsSrc + } + } + } + + // select top entries with the biggest number of hits + entries := make([]pipeTopEntry, 0, len(m)) + for k, pHits := range m { + entries = append(entries, pipeTopEntry{ + k: k, + hits: *pHits, + }) + } + sort.Slice(entries, func(i, j int) bool { + a, b := &entries[i], &entries[j] + if a.hits == b.hits { + return a.k < b.k + } + return a.hits > b.hits + }) + if uint64(len(entries)) > ptp.pt.limit { + entries = entries[:ptp.pt.limit] + } + + // write result + wctx := &pipeTopWriteContext{ + ptp: ptp, + } + byFields := ptp.pt.byFields + var rowFields []Field + + addHitsField := func(dst []Field, hits uint64) []Field { + hitsStr := string(marshalUint64String(nil, hits)) + dst = append(dst, Field{ + Name: ptp.pt.hitsFieldName, + Value: hitsStr, + }) + return dst + } + + if len(byFields) == 0 { + for _, e := range entries { + if needStop(ptp.stopCh) { + return nil + } + + rowFields = rowFields[:0] + keyBuf := bytesutil.ToUnsafeBytes(e.k) + for len(keyBuf) > 0 { + name, nSize := encoding.UnmarshalBytes(keyBuf) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal field name") + } + keyBuf = keyBuf[nSize:] + + value, nSize := encoding.UnmarshalBytes(keyBuf) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal field value") + } + keyBuf = keyBuf[nSize:] + + rowFields = append(rowFields, Field{ + Name: 
bytesutil.ToUnsafeString(name), + Value: bytesutil.ToUnsafeString(value), + }) + } + rowFields = addHitsField(rowFields, e.hits) + wctx.writeRow(rowFields) + } + } else if len(byFields) == 1 { + fieldName := byFields[0] + for _, e := range entries { + if needStop(ptp.stopCh) { + return nil + } + + rowFields = append(rowFields[:0], Field{ + Name: fieldName, + Value: e.k, + }) + rowFields = addHitsField(rowFields, e.hits) + wctx.writeRow(rowFields) + } + } else { + for _, e := range entries { + if needStop(ptp.stopCh) { + return nil + } + + rowFields = rowFields[:0] + keyBuf := bytesutil.ToUnsafeBytes(e.k) + fieldIdx := 0 + for len(keyBuf) > 0 { + value, nSize := encoding.UnmarshalBytes(keyBuf) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal field value") + } + keyBuf = keyBuf[nSize:] + + rowFields = append(rowFields, Field{ + Name: byFields[fieldIdx], + Value: bytesutil.ToUnsafeString(value), + }) + fieldIdx++ + } + rowFields = addHitsField(rowFields, e.hits) + wctx.writeRow(rowFields) + } + } + + wctx.flush() + + return nil +} + +type pipeTopEntry struct { + k string + hits uint64 +} + +type pipeTopWriteContext struct { + ptp *pipeTopProcessor + rcs []resultColumn + br blockResult + + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length of values in the current block + valuesLen int +} + +func (wctx *pipeTopWriteContext) writeRow(rowFields []Field) { + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(rowFields) + if areEqualColumns { + for i, f := range rowFields { + if rcs[i].name != f.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to ppNext and construct a block with new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, f := range rowFields { + rcs = appendResultColumnWithName(rcs, f.Name) + } + wctx.rcs = rcs + } + + for i, f := range rowFields { + v := f.Value + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + + 
wctx.rowsCount++ + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } +} + +func (wctx *pipeTopWriteContext) flush() { + rcs := wctx.rcs + br := &wctx.br + + wctx.valuesLen = 0 + + // Flush rcs to ppNext + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 + wctx.ptp.ppNext.writeBlock(0, br) + br.reset() + for i := range rcs { + rcs[i].resetValues() + } +} + +func parsePipeTop(lex *lexer) (*pipeTop, error) { + if !lex.isKeyword("top") { + return nil, fmt.Errorf("expecting 'top'; got %q", lex.token) + } + lex.nextToken() + + limit := uint64(pipeTopDefaultLimit) + limitStr := "" + if isNumberPrefix(lex.token) { + limitF, s, err := parseNumber(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse N in 'top': %w", err) + } + if limitF < 1 { + return nil, fmt.Errorf("N in 'top %s' must be integer bigger than 0", s) + } + limit = uint64(limitF) + limitStr = s + } + + var byFields []string + if lex.isKeyword("by", "(") { + if lex.isKeyword("by") { + lex.nextToken() + } + bfs, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'by' clause in 'top': %w", err) + } + if slices.Contains(bfs, "*") { + bfs = nil + } + byFields = bfs + } + + hitsFieldName := "hits" + for slices.Contains(byFields, hitsFieldName) { + hitsFieldName += "s" + } + + pt := &pipeTop{ + byFields: byFields, + limit: limit, + limitStr: limitStr, + hitsFieldName: hitsFieldName, + } + + return pt, nil +} diff --git a/lib/logstorage/pipe_top_test.go b/lib/logstorage/pipe_top_test.go new file mode 100644 index 000000000..edcd4db46 --- /dev/null +++ b/lib/logstorage/pipe_top_test.go @@ -0,0 +1,313 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeTopSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`top`) + f(`top 5`) + f(`top by (x)`) + f(`top 5 by (x)`) + f(`top by (x, y)`) + f(`top 5 by (x, y)`) +} + +func TestParsePipeTopFailure(t *testing.T) { + f := func(pipeStr 
string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`top 5 foo`) + f(`top 5 by`) + f(`top 5 by (`) + f(`top 5foo`) + f(`top foo`) + f(`top by`) +} + +func TestPipeTop(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("top", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + {"hits", "1"}, + }, + }) + + f("top 1", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + }, + }) + + f("top by (a)", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"hits", "3"}, + }, + }) + + f("top by (b)", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"hits", "2"}, + }, + { + {"b", "54"}, + {"hits", "1"}, + }, + }) + + f("top by (hits)", [][]Field{ + { + {"a", `2`}, + {"hits", `3`}, + }, + { + {"a", "2"}, + {"hits", "3"}, + }, + { + {"a", `2`}, + {"hits", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"hits", "3"}, + {"hitss", "2"}, + }, + { + {"hits", "54"}, + {"hitss", "1"}, + }, + }) + + f("top by (c)", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"c", ""}, + {"hits", "2"}, + }, + { + {"c", "d"}, + {"hits", "1"}, + }, + }) + + f("top by (d)", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", 
`54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"d", ""}, + {"hits", "3"}, + }, + }) + + f("top by (a, b)", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + }, + { + {"a", "2"}, + {"b", "54"}, + {"hits", "1"}, + }, + }) + + f("top 10 by (a, b)", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + }, + { + {"a", "2"}, + {"b", "54"}, + {"hits", "1"}, + }, + }) + + f("top 1 by (a, b)", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + }, + }) +} + +func TestPipeTopUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("top", "*", "", "*", "") + f("top by()", "*", "", "*", "") + f("top by(*)", "*", "", "*", "") + f("top by(f1,f2)", "*", "", "f1,f2", "") + f("top by(f1,f2)", "*", "", "f1,f2", "") + + // all the needed fields, unneeded fields do not intersect with src + f("top by(s1, s2)", "*", "f1,f2", "s1,s2", "") + f("top", "*", "f1,f2", "*", "") + + // all the needed fields, unneeded fields intersect with src + f("top by(s1, s2)", "*", "s1,f1,f2", "s1,s2", "") + f("top by(*)", "*", "s1,f1,f2", "*", "") + f("top by(s1, s2)", "*", "s1,s2,f1", "s1,s2", "") + + // needed fields do not intersect with src + f("top by (s1, s2)", "f1,f2", "", "s1,s2", "") + + // needed fields intersect with src + f("top by (s1, s2)", "s1,f1,f2", "", "s1,s2", "") + f("top by (*)", "s1,f1,f2", "", "*", "") 
+} diff --git a/lib/logstorage/pipe_unpack_syslog.go b/lib/logstorage/pipe_unpack_syslog.go index f693739c3..afcd337f2 100644 --- a/lib/logstorage/pipe_unpack_syslog.go +++ b/lib/logstorage/pipe_unpack_syslog.go @@ -13,6 +13,10 @@ type pipeUnpackSyslog struct { // fromField is the field to unpack syslog fields from fromField string + // the timezone to use when parsing rfc3164 timestamps + offsetStr string + offsetTimezone *time.Location + // resultPrefix is prefix to add to unpacked field names resultPrefix string @@ -30,6 +34,9 @@ func (pu *pipeUnpackSyslog) String() string { if !isMsgFieldName(pu.fromField) { s += " from " + quoteTokenIfNeeded(pu.fromField) } + if pu.offsetStr != "" { + s += " offset " + pu.offsetStr + } if pu.resultPrefix != "" { s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) } @@ -64,14 +71,14 @@ func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFie func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { unpackSyslog := func(uctx *fieldsUnpackerContext, s string) { year := currentYear.Load() - p := getSyslogParser(int(year)) + p := GetSyslogParser(int(year), pu.offsetTimezone) - p.parse(s) - for _, f := range p.fields { + p.Parse(s) + for _, f := range p.Fields { uctx.addField(f.Name, f.Value) } - putSyslogParser(p) + PutSyslogParser(p) } return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff) @@ -119,6 +126,23 @@ func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) { fromField = f } + offsetStr := "" + offsetTimezone := time.Local + if lex.isKeyword("offset") { + lex.nextToken() + s, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot read 'offset': %w", err) + } + offsetStr = s + nsecs, ok := tryParseDuration(offsetStr) + if !ok { + return nil, fmt.Errorf("cannot parse 'offset' from %q", offsetStr) + } + secs 
:= nsecs / nsecsPerSecond + offsetTimezone = time.FixedZone("custom", int(secs)) + } + resultPrefix := "" if lex.isKeyword("result_prefix") { lex.nextToken() @@ -137,6 +161,8 @@ func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) { pu := &pipeUnpackSyslog{ fromField: fromField, + offsetStr: offsetStr, + offsetTimezone: offsetTimezone, resultPrefix: resultPrefix, keepOriginalFields: keepOriginalFields, iff: iff, diff --git a/lib/logstorage/pipe_unpack_syslog_test.go b/lib/logstorage/pipe_unpack_syslog_test.go index 8a670dbf7..115903cda 100644 --- a/lib/logstorage/pipe_unpack_syslog_test.go +++ b/lib/logstorage/pipe_unpack_syslog_test.go @@ -11,16 +11,22 @@ func TestParsePipeUnpackSyslogSuccess(t *testing.T) { } f(`unpack_syslog`) + f(`unpack_syslog offset 6h30m`) + f(`unpack_syslog offset -6h30m`) f(`unpack_syslog keep_original_fields`) + f(`unpack_syslog offset -6h30m keep_original_fields`) f(`unpack_syslog if (a:x)`) f(`unpack_syslog if (a:x) keep_original_fields`) + f(`unpack_syslog if (a:x) offset 2h keep_original_fields`) f(`unpack_syslog from x`) f(`unpack_syslog from x keep_original_fields`) f(`unpack_syslog if (a:x) from x`) f(`unpack_syslog from x result_prefix abc`) + f(`unpack_syslog from x offset 2h30m result_prefix abc`) f(`unpack_syslog if (a:x) from x result_prefix abc`) f(`unpack_syslog result_prefix abc`) f(`unpack_syslog if (a:x) result_prefix abc`) + f(`unpack_syslog if (a:x) offset -1h result_prefix abc`) } func TestParsePipeUnpackSyslogFailure(t *testing.T) { @@ -31,6 +37,7 @@ func TestParsePipeUnpackSyslogFailure(t *testing.T) { f(`unpack_syslog foo`) f(`unpack_syslog if`) + f(`unpack_syslog offset`) f(`unpack_syslog if (x:y) foobar`) f(`unpack_syslog from`) f(`unpack_syslog from x y`) @@ -118,6 +125,27 @@ func TestPipeUnpackSyslog(t *testing.T) { }, }) + // offset should be ignored when parsing non-rfc3164 messages + f("unpack_syslog from x offset 2h30m", [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z 
mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }, [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"format", "rfc5424"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "appname"}, + {"proc_id", "12345"}, + {"msg_id", "ID47"}, + {"message", "This is a test message with structured data"}, + }, + }) + // failed if condition f("unpack_syslog if (foo:bar)", [][]Field{ { diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index 21906df49..c165641a3 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -58,7 +58,11 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) { } func (f *Field) marshalToJSON(dst []byte) []byte { - dst = strconv.AppendQuote(dst, f.Name) + name := f.Name + if name == "" { + name = "_msg" + } + dst = strconv.AppendQuote(dst, name) dst = append(dst, ':') dst = strconv.AppendQuote(dst, f.Value) return dst @@ -84,6 +88,20 @@ func needLogfmtQuoting(s string) bool { return false } +// RenameField renames field with the oldName to newName in Fields +func RenameField(fields []Field, oldName, newName string) { + if oldName == "" { + return + } + for i := range fields { + f := &fields[i] + if f.Name == oldName { + f.Name = newName + return + } + } +} + // MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result. 
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { dst = append(dst, '{') diff --git a/lib/logstorage/syslog_parser.go b/lib/logstorage/syslog_parser.go index 51e5f5ccd..b86ccdb68 100644 --- a/lib/logstorage/syslog_parser.go +++ b/lib/logstorage/syslog_parser.go @@ -10,50 +10,78 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) -func getSyslogParser(currentYear int) *syslogParser { +// GetSyslogParser returns syslog parser from the pool. +// +// currentYear must contain the current year. It is used for properly setting timestamp +// field for rfc3164 format, which doesn't contain year. +// +// the timezone is used for rfc3164 format for setting the desired timezone. +// +// Return back the parser to the pool by calling PutSyslogParser when it is no longer needed. +func GetSyslogParser(currentYear int, timezone *time.Location) *SyslogParser { v := syslogParserPool.Get() if v == nil { - v = &syslogParser{} + v = &SyslogParser{} } - p := v.(*syslogParser) + p := v.(*SyslogParser) p.currentYear = currentYear + p.timezone = timezone return p } -func putSyslogParser(p *syslogParser) { +// PutSyslogParser returns back syslog parser to the pool. +// +// p cannot be used after returning to the pool. +func PutSyslogParser(p *SyslogParser) { p.reset() syslogParserPool.Put(p) } var syslogParserPool sync.Pool -type syslogParser struct { - currentYear int +// SyslogParser is parser for syslog messages. +// +// It understands the following syslog formats: +// +// - https://datatracker.ietf.org/doc/html/rfc5424 +// - https://datatracker.ietf.org/doc/html/rfc3164 +// +// It extracts the following list of syslog message fields into Fields - +// https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe +type SyslogParser struct { + // Fields contains parsed fields after Parse call. 
+ Fields []Field - buf []byte - fields []Field + buf []byte + + currentYear int + timezone *time.Location } -func (p *syslogParser) reset() { +func (p *SyslogParser) reset() { p.currentYear = 0 + p.timezone = nil p.resetFields() } -func (p *syslogParser) resetFields() { - p.buf = p.buf[:0] +func (p *SyslogParser) resetFields() { + clear(p.Fields) + p.Fields = p.Fields[:0] - clear(p.fields) - p.fields = p.fields[:0] + p.buf = p.buf[:0] } -func (p *syslogParser) addField(name, value string) { - p.fields = append(p.fields, Field{ +func (p *SyslogParser) addField(name, value string) { + p.Fields = append(p.Fields, Field{ Name: name, Value: value, }) } -func (p *syslogParser) parse(s string) { +// Parse parses syslog message from s into p.Fields. +// +// p.Fields is valid until s is modified or p state is changed. +func (p *SyslogParser) Parse(s string) { p.resetFields() if len(s) == 0 { @@ -96,7 +124,7 @@ func (p *syslogParser) parse(s string) { p.parseNoHeader(s) } -func (p *syslogParser) parseNoHeader(s string) { +func (p *SyslogParser) parseNoHeader(s string) { if len(s) == 0 { return } @@ -107,7 +135,7 @@ func (p *syslogParser) parseNoHeader(s string) { } } -func (p *syslogParser) parseRFC5424(s string) { +func (p *SyslogParser) parseRFC5424(s string) { // See https://datatracker.ietf.org/doc/html/rfc5424 p.addField("format", "rfc5424") @@ -172,7 +200,7 @@ func (p *syslogParser) parseRFC5424(s string) { p.addField("message", s) } -func (p *syslogParser) parseRFC5424SD(s string) (string, bool) { +func (p *SyslogParser) parseRFC5424SD(s string) (string, bool) { if strings.HasPrefix(s, "- ") { return s[2:], true } @@ -190,7 +218,7 @@ func (p *syslogParser) parseRFC5424SD(s string) (string, bool) { } } -func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) { +func (p *SyslogParser) parseRFC5424SDLine(s string) (string, bool) { if len(s) == 0 || s[0] != '[' { return s, false } @@ -236,7 +264,7 @@ func (p *syslogParser) parseRFC5424SDLine(s string) (string, 
bool) { return s, true } -func (p *syslogParser) parseRFC3164(s string) { +func (p *SyslogParser) parseRFC3164(s string) { // See https://datatracker.ietf.org/doc/html/rfc3164 p.addField("format", "rfc3164") @@ -257,10 +285,10 @@ func (p *syslogParser) parseRFC3164(s string) { s = s[n:] t = t.UTC() - t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), p.timezone) if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() { // Adjust time to the previous year - t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), p.timezone) } bufLen := len(p.buf) diff --git a/lib/logstorage/syslog_parser_test.go b/lib/logstorage/syslog_parser_test.go index 7935f472f..7f5f3d7e1 100644 --- a/lib/logstorage/syslog_parser_test.go +++ b/lib/logstorage/syslog_parser_test.go @@ -2,69 +2,70 @@ package logstorage import ( "testing" + "time" ) func TestSyslogParser(t *testing.T) { - f := func(s, resultExpected string) { + f := func(s string, timezone *time.Location, resultExpected string) { t.Helper() const currentYear = 2024 - p := getSyslogParser(currentYear) - defer putSyslogParser(p) + p := GetSyslogParser(currentYear, timezone) + defer PutSyslogParser(p) - p.parse(s) - result := MarshalFieldsToJSON(nil, p.fields) + p.Parse(s) + result := MarshalFieldsToJSON(nil, p.Fields) if string(result) != resultExpected { t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected) } } // RFC 3164 - f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", + f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", time.UTC, 
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) - f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", + f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) - f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", + f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", time.UTC, `{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) - f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", + f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) - f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", + f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) // RFC 5424 - f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with 
structured data."}`) - f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, + f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, time.UTC, `{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) - f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, + f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) - f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, + f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) // Incomplete RFC 3164 - f("", `{}`) - f("Jun 3 12:08:33", 
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`) - f("Foo 3 12:08:33", `{"format":"rfc3164","message":"Foo 3 12:08:33"}`) - f("Foo 3 12:08:33bar", `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`) - f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) - f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) - f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) - f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) - f(`foo bar baz`, `{"format":"rfc3164","message":"foo bar baz"}`) + f("", time.UTC, `{}`) + f("Jun 3 12:08:33", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`) + f("Foo 3 12:08:33", time.UTC, `{"format":"rfc3164","message":"Foo 3 12:08:33"}`) + f("Foo 3 12:08:33bar", time.UTC, `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`) + f("Jun 3 12:08:33 abcd", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) + f("Jun 3 12:08:33 abcd sudo", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) + f("Jun 3 12:08:33 abcd sudo[123]", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) + f("Jun 3 12:08:33 abcd sudo foobar", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) + f(`foo bar baz`, time.UTC, `{"format":"rfc3164","message":"foo bar baz"}`) // Incomplete RFC 5424 - f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 
[foo@123]`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) - f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) - f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) - f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) - f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) - f(`<165>1 2023-06-03T17:42:32.123456789Z`, + f(`<165>1 2023-06-03T17:42:32.123456789Z`, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`) - f(`<165>1 `, + f(`<165>1 `, time.UTC, `{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`) } diff --git a/lib/logstorage/values_encoder.go 
b/lib/logstorage/values_encoder.go index f1a909279..7e9bf0cd0 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -262,7 +262,7 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) ( a := u64s.A var minValue, maxValue int64 for i, v := range srcValues { - n, ok := tryParseTimestampISO8601(v) + n, ok := TryParseTimestampISO8601(v) if !ok { return dstBuf, dstValues, valueTypeUnknown, 0, 0 } @@ -283,10 +283,10 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) ( return dstBuf, dstValues, valueTypeTimestampISO8601, uint64(minValue), uint64(maxValue) } -// tryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds. +// TryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds. // // The returned timestamp can be negative if s is smaller than 1970 year. -func tryParseTimestampRFC3339Nano(s string) (int64, bool) { +func TryParseTimestampRFC3339Nano(s string) (int64, bool) { // Do not parse timestamps with timezone other than Z, since they cannot be converted back // to the same string representation in general case. // This may break search. @@ -330,10 +330,10 @@ func tryParseTimestampRFC3339Nano(s string) (int64, bool) { return nsecs, true } -// tryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds. +// TryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds. // // The returned timestamp can be negative if s is smaller than 1970 year. -func tryParseTimestampISO8601(s string) (int64, bool) { +func TryParseTimestampISO8601(s string) (int64, bool) { // Do not parse timestamps with timezone, since they cannot be converted back // to the same string representation in general case. // This may break search. 
diff --git a/lib/logstorage/values_encoder_test.go b/lib/logstorage/values_encoder_test.go index 369f12b06..6564f4a4f 100644 --- a/lib/logstorage/values_encoder_test.go +++ b/lib/logstorage/values_encoder_test.go @@ -151,7 +151,7 @@ func TestTryParseIPv4_Failure(t *testing.T) { func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) { f := func(s string) { t.Helper() - nsecs, ok := tryParseTimestampRFC3339Nano(s) + nsecs, ok := TryParseTimestampRFC3339Nano(s) if !ok { t.Fatalf("cannot parse timestamp %q", s) } @@ -185,7 +185,7 @@ func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) { func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) { f := func(s string) { t.Helper() - _, ok := tryParseTimestampRFC3339Nano(s) + _, ok := TryParseTimestampRFC3339Nano(s) if ok { t.Fatalf("expecting faulure when parsing %q", s) } @@ -240,7 +240,7 @@ func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) { func TestTryParseTimestampISO8601String_Success(t *testing.T) { f := func(s string) { t.Helper() - nsecs, ok := tryParseTimestampISO8601(s) + nsecs, ok := TryParseTimestampISO8601(s) if !ok { t.Fatalf("cannot parse timestamp %q", s) } @@ -263,7 +263,7 @@ func TestTryParseTimestampISO8601String_Success(t *testing.T) { func TestTryParseTimestampISO8601_Failure(t *testing.T) { f := func(s string) { t.Helper() - _, ok := tryParseTimestampISO8601(s) + _, ok := TryParseTimestampISO8601(s) if ok { t.Fatalf("expecting faulure when parsing %q", s) } diff --git a/lib/logstorage/values_encoder_timing_test.go b/lib/logstorage/values_encoder_timing_test.go index d80e6f411..ef564fee4 100644 --- a/lib/logstorage/values_encoder_timing_test.go +++ b/lib/logstorage/values_encoder_timing_test.go @@ -21,7 +21,7 @@ func BenchmarkTryParseTimestampRFC3339Nano(b *testing.B) { nSum := int64(0) for pb.Next() { for _, s := range a { - n, ok := tryParseTimestampRFC3339Nano(s) + n, ok := TryParseTimestampRFC3339Nano(s) if !ok { panic(fmt.Errorf("cannot parse timestamp %q", 
s)) } @@ -47,7 +47,7 @@ func BenchmarkTryParseTimestampISO8601(b *testing.B) { nSum := int64(0) for pb.Next() { for _, s := range a { - n, ok := tryParseTimestampISO8601(s) + n, ok := TryParseTimestampISO8601(s) if !ok { panic(fmt.Errorf("cannot parse timestamp %q", s)) }