mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
wip
This commit is contained in:
parent
430bebb5f0
commit
56fbbb3ae7
36 changed files with 871 additions and 232 deletions
|
@ -221,7 +221,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
if ts == 0 {
|
if ts == 0 {
|
||||||
ts = time.Now().UnixNano()
|
ts = time.Now().UnixNano()
|
||||||
}
|
}
|
||||||
p.RenameField(msgField, "_msg")
|
logstorage.RenameField(p.Fields, msgField, "_msg")
|
||||||
processLogMessage(ts, p.Fields)
|
processLogMessage(ts, p.Fields)
|
||||||
logstorage.PutJSONParser(p)
|
logstorage.PutJSONParser(p)
|
||||||
|
|
||||||
|
@ -272,9 +272,9 @@ func parseElasticsearchTimestamp(s string) (int64, error) {
|
||||||
}
|
}
|
||||||
return t.UnixNano(), nil
|
return t.UnixNano(), nil
|
||||||
}
|
}
|
||||||
t, err := time.Parse(time.RFC3339, s)
|
nsecs, ok := logstorage.TryParseTimestampRFC3339Nano(s)
|
||||||
if err != nil {
|
if !ok {
|
||||||
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
return 0, fmt.Errorf("cannot parse timestamp %q", s)
|
||||||
}
|
}
|
||||||
return t.UnixNano(), nil
|
return nsecs, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
@ -45,13 +44,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
|
||||||
var result string
|
var result string
|
||||||
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
||||||
timestamps = append(timestamps, timestamp)
|
timestamps = append(timestamps, timestamp)
|
||||||
|
result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n"
|
||||||
a := make([]string, len(fields))
|
|
||||||
for i, f := range fields {
|
|
||||||
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
|
|
||||||
}
|
|
||||||
s := "{" + strings.Join(a, ",") + "}\n"
|
|
||||||
result += s
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the request without compression
|
// Read the request without compression
|
||||||
|
|
|
@ -71,6 +71,23 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
|
||||||
return cp, nil
|
return cp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
|
||||||
|
func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams {
|
||||||
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
|
||||||
|
cp := &CommonParams{
|
||||||
|
TenantID: tenantID,
|
||||||
|
TimeField: "timestamp",
|
||||||
|
MsgField: "message",
|
||||||
|
StreamFields: []string{
|
||||||
|
"hostname",
|
||||||
|
"app_name",
|
||||||
|
"proc_id",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return cp
|
||||||
|
}
|
||||||
|
|
||||||
// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
|
// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
|
||||||
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
|
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
|
||||||
return func(timestamp int64, fields []logstorage.Field) {
|
return func(timestamp int64, fields []logstorage.Field) {
|
||||||
|
|
33
app/vlinsert/insertutils/timestamp.go
Normal file
33
app/vlinsert/insertutils/timestamp.go
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
package insertutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ExtractTimestampISO8601FromFields extracts ISO8601 timestamp in nanoseconds from the field with the name timeField at fields.
|
||||||
|
//
|
||||||
|
// The value for the timeField is set to empty string after returning from the function,
|
||||||
|
// so it could be ignored during data ingestion.
|
||||||
|
//
|
||||||
|
// The current timestamp is returned if fields do not contain a field with timeField name or if the timeField value is empty.
|
||||||
|
func ExtractTimestampISO8601FromFields(timeField string, fields []logstorage.Field) (int64, error) {
|
||||||
|
for i := range fields {
|
||||||
|
f := &fields[i]
|
||||||
|
if f.Name != timeField {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
nsecs, ok := logstorage.TryParseTimestampISO8601(f.Value)
|
||||||
|
if !ok {
|
||||||
|
if f.Value == "0" || f.Value == "" {
|
||||||
|
return time.Now().UnixNano(), nil
|
||||||
|
}
|
||||||
|
return time.Now().UnixNano(), fmt.Errorf("cannot unmarshal iso8601 timestamp from %s=%q", timeField, f.Value)
|
||||||
|
}
|
||||||
|
f.Value = ""
|
||||||
|
return nsecs, nil
|
||||||
|
}
|
||||||
|
return time.Now().UnixNano(), nil
|
||||||
|
}
|
|
@ -19,13 +19,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// RequestHandler processes jsonline insert requests
|
// RequestHandler processes jsonline insert requests
|
||||||
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
func RequestHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
w.Header().Add("Content-Type", "application/json")
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
|
||||||
if r.Method != "POST" {
|
if r.Method != "POST" {
|
||||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
requestsTotal.Inc()
|
requestsTotal.Inc()
|
||||||
|
@ -33,11 +33,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
cp, err := insertutils.GetCommonParams(r)
|
cp, err := insertutils.GetCommonParams(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
if err := vlstorage.CanWriteData(); err != nil {
|
if err := vlstorage.CanWriteData(); err != nil {
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
||||||
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
||||||
|
@ -46,8 +46,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||||
zr, err := common.GetGzipReader(reader)
|
zr, err := common.GetGzipReader(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("cannot read gzipped _bulk request: %s", err)
|
logger.Errorf("cannot read gzipped jsonline request: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
defer common.PutGzipReader(zr)
|
defer common.PutGzipReader(zr)
|
||||||
reader = zr
|
reader = zr
|
||||||
|
@ -68,6 +68,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage)
|
ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage)
|
||||||
wcr.DecConcurrency()
|
wcr.DecConcurrency()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
errorsTotal.Inc()
|
||||||
logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
|
logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -81,12 +82,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
vlstorage.MustAddRows(lr)
|
vlstorage.MustAddRows(lr)
|
||||||
logstorage.PutLogRows(lr)
|
logstorage.PutLogRows(lr)
|
||||||
|
|
||||||
// update jsonlineRequestDuration only for successfully parsed requests.
|
// update requestDuration only for successfully parsed requests.
|
||||||
// There is no need in updating jsonlineRequestDuration for request errors,
|
// There is no need in updating requestDuration for request errors,
|
||||||
// since their timings are usually much smaller than the timing for successful request parsing.
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
||||||
jsonlineRequestDuration.UpdateDuration(startTime)
|
requestDuration.UpdateDuration(startTime)
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
|
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
|
||||||
|
@ -108,53 +107,24 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
|
||||||
if err := p.ParseLogMessage(line); err != nil {
|
if err := p.ParseLogMessage(line); err != nil {
|
||||||
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
|
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
|
||||||
}
|
}
|
||||||
ts, err := extractTimestampFromFields(timeField, p.Fields)
|
ts, err := insertutils.ExtractTimestampISO8601FromFields(timeField, p.Fields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
return false, fmt.Errorf("cannot get timestamp: %w", err)
|
||||||
}
|
}
|
||||||
if ts == 0 {
|
logstorage.RenameField(p.Fields, msgField, "_msg")
|
||||||
ts = time.Now().UnixNano()
|
|
||||||
}
|
|
||||||
p.RenameField(msgField, "_msg")
|
|
||||||
processLogMessage(ts, p.Fields)
|
processLogMessage(ts, p.Fields)
|
||||||
logstorage.PutJSONParser(p)
|
logstorage.PutJSONParser(p)
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
|
|
||||||
for i := range fields {
|
|
||||||
f := &fields[i]
|
|
||||||
if f.Name != timeField {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
timestamp, err := parseISO8601Timestamp(f.Value)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
f.Value = ""
|
|
||||||
return timestamp, nil
|
|
||||||
}
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseISO8601Timestamp(s string) (int64, error) {
|
|
||||||
if s == "0" || s == "" {
|
|
||||||
// Special case for returning the current timestamp.
|
|
||||||
// It must be automatically converted to the current timestamp by the caller.
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
t, err := time.Parse(time.RFC3339, s)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
|
||||||
}
|
|
||||||
return t.UnixNano(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var lineBufferPool bytesutil.ByteBufferPool
|
var lineBufferPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
var (
|
var (
|
||||||
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
|
|
||||||
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
|
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
|
||||||
jsonlineRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
|
|
||||||
|
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
|
||||||
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/jsonline"}`)
|
||||||
|
|
||||||
|
requestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
|
||||||
)
|
)
|
||||||
|
|
|
@ -3,14 +3,13 @@ package jsonline
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestReadBulkRequestSuccess(t *testing.T) {
|
func TestReadLine_Success(t *testing.T) {
|
||||||
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
@ -18,16 +17,9 @@ func TestReadBulkRequestSuccess(t *testing.T) {
|
||||||
var result string
|
var result string
|
||||||
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
||||||
timestamps = append(timestamps, timestamp)
|
timestamps = append(timestamps, timestamp)
|
||||||
|
result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n"
|
||||||
a := make([]string, len(fields))
|
|
||||||
for i, f := range fields {
|
|
||||||
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
|
|
||||||
}
|
|
||||||
s := "{" + strings.Join(a, ",") + "}\n"
|
|
||||||
result += s
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the request without compression
|
|
||||||
r := bytes.NewBufferString(data)
|
r := bytes.NewBufferString(data)
|
||||||
sc := bufio.NewScanner(r)
|
sc := bufio.NewScanner(r)
|
||||||
rows := 0
|
rows := 0
|
||||||
|
@ -53,7 +45,6 @@ func TestReadBulkRequestSuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify non-empty data
|
|
||||||
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
||||||
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
|
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
|
||||||
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
|
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
|
||||||
|
|
|
@ -11,7 +11,8 @@ import (
|
||||||
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
switch path {
|
switch path {
|
||||||
case "/api/v1/push":
|
case "/api/v1/push":
|
||||||
return handleInsert(r, w)
|
handleInsert(r, w)
|
||||||
|
return true
|
||||||
case "/ready":
|
case "/ready":
|
||||||
// See https://grafana.com/docs/loki/latest/api/#identify-ready-loki-instance
|
// See https://grafana.com/docs/loki/latest/api/#identify-ready-loki-instance
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
@ -23,14 +24,14 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
|
// See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
|
||||||
func handleInsert(r *http.Request, w http.ResponseWriter) bool {
|
func handleInsert(r *http.Request, w http.ResponseWriter) {
|
||||||
contentType := r.Header.Get("Content-Type")
|
contentType := r.Header.Get("Content-Type")
|
||||||
switch contentType {
|
switch contentType {
|
||||||
case "application/json":
|
case "application/json":
|
||||||
return handleJSON(r, w)
|
handleJSON(r, w)
|
||||||
default:
|
default:
|
||||||
// Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
|
// Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
|
||||||
return handleProtobuf(r, w)
|
handleProtobuf(r, w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,15 +20,15 @@ import (
|
||||||
|
|
||||||
var parserPool fastjson.ParserPool
|
var parserPool fastjson.ParserPool
|
||||||
|
|
||||||
func handleJSON(r *http.Request, w http.ResponseWriter) bool {
|
func handleJSON(r *http.Request, w http.ResponseWriter) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
lokiRequestsJSONTotal.Inc()
|
requestsJSONTotal.Inc()
|
||||||
reader := r.Body
|
reader := r.Body
|
||||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||||
zr, err := common.GetGzipReader(reader)
|
zr, err := common.GetGzipReader(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
|
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
defer common.PutGzipReader(zr)
|
defer common.PutGzipReader(zr)
|
||||||
reader = zr
|
reader = zr
|
||||||
|
@ -39,17 +39,17 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
|
||||||
writeconcurrencylimiter.PutReader(wcr)
|
writeconcurrencylimiter.PutReader(wcr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cp, err := getCommonParams(r)
|
cp, err := getCommonParams(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
if err := vlstorage.CanWriteData(); err != nil {
|
if err := vlstorage.CanWriteData(); err != nil {
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
||||||
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
||||||
|
@ -58,23 +58,21 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
|
||||||
logstorage.PutLogRows(lr)
|
logstorage.PutLogRows(lr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err)
|
httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rowsIngestedJSONTotal.Add(n)
|
rowsIngestedJSONTotal.Add(n)
|
||||||
|
|
||||||
// update lokiRequestJSONDuration only for successfully parsed requests
|
// update requestJSONDuration only for successfully parsed requests
|
||||||
// There is no need in updating lokiRequestJSONDuration for request errors,
|
// There is no need in updating requestJSONDuration for request errors,
|
||||||
// since their timings are usually much smaller than the timing for successful request parsing.
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
||||||
lokiRequestJSONDuration.UpdateDuration(startTime)
|
requestJSONDuration.UpdateDuration(startTime)
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
|
requestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
|
||||||
rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
|
rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
|
||||||
lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
|
requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
|
func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
|
||||||
|
|
|
@ -23,25 +23,25 @@ var (
|
||||||
pushReqsPool sync.Pool
|
pushReqsPool sync.Pool
|
||||||
)
|
)
|
||||||
|
|
||||||
func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
|
func handleProtobuf(r *http.Request, w http.ResponseWriter) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
lokiRequestsProtobufTotal.Inc()
|
requestsProtobufTotal.Inc()
|
||||||
wcr := writeconcurrencylimiter.GetReader(r.Body)
|
wcr := writeconcurrencylimiter.GetReader(r.Body)
|
||||||
data, err := io.ReadAll(wcr)
|
data, err := io.ReadAll(wcr)
|
||||||
writeconcurrencylimiter.PutReader(wcr)
|
writeconcurrencylimiter.PutReader(wcr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cp, err := getCommonParams(r)
|
cp, err := getCommonParams(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
if err := vlstorage.CanWriteData(); err != nil {
|
if err := vlstorage.CanWriteData(); err != nil {
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
||||||
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
||||||
|
@ -50,23 +50,21 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
|
||||||
logstorage.PutLogRows(lr)
|
logstorage.PutLogRows(lr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
|
httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
|
||||||
return true
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rowsIngestedProtobufTotal.Add(n)
|
rowsIngestedProtobufTotal.Add(n)
|
||||||
|
|
||||||
// update lokiRequestProtobufDuration only for successfully parsed requests
|
// update requestProtobufDuration only for successfully parsed requests
|
||||||
// There is no need in updating lokiRequestProtobufDuration for request errors,
|
// There is no need in updating requestProtobufDuration for request errors,
|
||||||
// since their timings are usually much smaller than the timing for successful request parsing.
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
||||||
lokiRequestProtobufDuration.UpdateDuration(startTime)
|
requestProtobufDuration.UpdateDuration(startTime)
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
|
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
|
||||||
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
|
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
|
||||||
lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
|
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
|
func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
|
||||||
|
|
|
@ -7,14 +7,17 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/syslog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Init initializes vlinsert
|
// Init initializes vlinsert
|
||||||
func Init() {
|
func Init() {
|
||||||
|
syslog.MustInit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops vlinsert
|
// Stop stops vlinsert
|
||||||
func Stop() {
|
func Stop() {
|
||||||
|
syslog.MustStop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestHandler handles insert requests for VictoriaLogs
|
// RequestHandler handles insert requests for VictoriaLogs
|
||||||
|
@ -28,7 +31,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
path = strings.ReplaceAll(path, "//", "/")
|
path = strings.ReplaceAll(path, "//", "/")
|
||||||
|
|
||||||
if path == "/jsonline" {
|
if path == "/jsonline" {
|
||||||
return jsonline.RequestHandler(w, r)
|
jsonline.RequestHandler(w, r)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
switch {
|
switch {
|
||||||
case strings.HasPrefix(path, "/elasticsearch/"):
|
case strings.HasPrefix(path, "/elasticsearch/"):
|
||||||
|
|
389
app/vlinsert/syslog/syslog.go
Normal file
389
app/vlinsert/syslog/syslog.go
Normal file
|
@ -0,0 +1,389 @@
|
||||||
|
package syslog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"crypto/tls"
|
||||||
|
"errors"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/gzip"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
syslogTenantID = flag.String("syslog.tenantID", "0:0", "TenantID for logs ingested via Syslog protocol. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+
|
||||||
|
"For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
|
||||||
|
listenAddrTCP = flag.String("syslog.listenAddr.tcp", "", "Optional TCP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
listenAddrUDP = flag.String("syslog.listenAddr.udp", "", "Optional UDP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
|
||||||
|
tlsEnable = flag.Bool("syslog.tls", false, "Whether to use TLS for receiving syslog messages at -syslog.listenAddr.tcp. -syslog.tlsCertFile and -syslog.tlsKeyFile must be set "+
|
||||||
|
"if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
tlsCertFile = flag.String("syslog.tlsCertFile", "", "Path to file with TLS certificate for -syslog.listenAddr.tcp if -syslog.tls is set. "+
|
||||||
|
"Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. "+
|
||||||
|
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
tlsKeyFile = flag.String("syslog.tlsKeyFile", "", "Path to file with TLS key for -syslog.listenAddr.tcp if -syslog.tls is set. "+
|
||||||
|
"The provided key file is automatically re-read every second, so it can be dynamically updated")
|
||||||
|
tlsCipherSuites = flagutil.NewArrayString("syslog.tlsCipherSuites", "Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. "+
|
||||||
|
"See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . "+
|
||||||
|
"See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
tlsMinVersion = flag.String("syslog.tlsMinVersion", "TLS13", "The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. "+
|
||||||
|
"Supported values: TLS10, TLS11, TLS12, TLS13. "+
|
||||||
|
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
|
||||||
|
compressMethod = flag.String("syslog.compressMethod", "", "Compression method for syslog messages received at -syslog.listenAddr.tcp and -syslog.listenAddr.udp. "+
|
||||||
|
"Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
|
||||||
|
)
|
||||||
|
|
||||||
|
// MustInit initializes syslog parser at the given -syslog.listenAddr.tcp and -syslog.listenAddr.udp ports
|
||||||
|
//
|
||||||
|
// This function must be called after flag.Parse().
|
||||||
|
//
|
||||||
|
// MustStop() must be called in order to free up resources occupied by the initialized syslog parser.
|
||||||
|
func MustInit() {
|
||||||
|
if workersStopCh != nil {
|
||||||
|
logger.Panicf("BUG: MustInit() called twice without MustStop() call")
|
||||||
|
}
|
||||||
|
workersStopCh = make(chan struct{})
|
||||||
|
|
||||||
|
tenantID, err := logstorage.GetTenantIDFromString(*syslogTenantID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("cannot parse -syslog.tenantID=%q: %s", *syslogTenantID, err)
|
||||||
|
}
|
||||||
|
globalTenantID = tenantID
|
||||||
|
|
||||||
|
switch *compressMethod {
|
||||||
|
case "", "none", "gzip", "deflate":
|
||||||
|
default:
|
||||||
|
logger.Fatalf("unexpected -syslog.compressLevel=%q; supported values: none, gzip, deflate", *compressMethod)
|
||||||
|
}
|
||||||
|
|
||||||
|
if *listenAddrTCP != "" {
|
||||||
|
workersWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
runTCPListener(*listenAddrTCP)
|
||||||
|
workersWG.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
if *listenAddrUDP != "" {
|
||||||
|
workersWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
runUDPListener(*listenAddrUDP)
|
||||||
|
workersWG.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
currentYear := time.Now().Year()
|
||||||
|
globalCurrentYear.Store(int64(currentYear))
|
||||||
|
workersWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(time.Minute)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-workersStopCh:
|
||||||
|
ticker.Stop()
|
||||||
|
workersWG.Done()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
currentYear := time.Now().Year()
|
||||||
|
globalCurrentYear.Store(int64(currentYear))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if *syslogTimezone != "" {
|
||||||
|
tz, err := time.LoadLocation(*syslogTimezone)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("cannot parse -syslog.timezone=%q: %s", *syslogTimezone, err)
|
||||||
|
}
|
||||||
|
globalTimezone = tz
|
||||||
|
} else {
|
||||||
|
globalTimezone = time.Local
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
globalTenantID logstorage.TenantID
|
||||||
|
globalCurrentYear atomic.Int64
|
||||||
|
globalTimezone *time.Location
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
workersWG sync.WaitGroup
|
||||||
|
workersStopCh chan struct{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// MustStop stops syslog parser initialized via MustInit()
|
||||||
|
func MustStop() {
|
||||||
|
close(workersStopCh)
|
||||||
|
workersWG.Wait()
|
||||||
|
workersStopCh = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runUDPListener(addr string) {
|
||||||
|
ln, err := net.ListenPacket(netutil.GetUDPNetwork(), addr)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("cannot start UDP syslog server at %q: %s", addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
serveUDP(ln)
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-workersStopCh
|
||||||
|
if err := ln.Close(); err != nil {
|
||||||
|
logger.Fatalf("syslog: cannot close UDP listener at %s: %s", addr, err)
|
||||||
|
}
|
||||||
|
<-doneCh
|
||||||
|
}
|
||||||
|
|
||||||
|
func runTCPListener(addr string) {
|
||||||
|
var tlsConfig *tls.Config
|
||||||
|
if *tlsEnable {
|
||||||
|
tc, err := netutil.GetServerTLSConfig(*tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("cannot load TLS cert from -syslog.tlsCertFile=%q, -syslog.tlsKeyFile=%q, -syslog.tlsMinVersion=%q, -syslog.tlsCipherSuites=%q: %s",
|
||||||
|
*tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites, err)
|
||||||
|
}
|
||||||
|
tlsConfig = tc
|
||||||
|
}
|
||||||
|
ln, err := netutil.NewTCPListener("syslog", addr, false, tlsConfig)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("syslog: cannot start TCP listener at %s: %s", addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
serveTCP(ln)
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-workersStopCh
|
||||||
|
if err := ln.Close(); err != nil {
|
||||||
|
logger.Fatalf("syslog: cannot close TCP listener at %s: %s", addr, err)
|
||||||
|
}
|
||||||
|
<-doneCh
|
||||||
|
}
|
||||||
|
|
||||||
|
func serveUDP(ln net.PacketConn) {
|
||||||
|
gomaxprocs := cgroup.AvailableCPUs()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
localAddr := ln.LocalAddr()
|
||||||
|
for i := 0; i < gomaxprocs; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
cp := insertutils.GetCommonParamsForSyslog(globalTenantID)
|
||||||
|
var bb bytesutil.ByteBuffer
|
||||||
|
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
|
||||||
|
for {
|
||||||
|
bb.Reset()
|
||||||
|
bb.B = bb.B[:cap(bb.B)]
|
||||||
|
n, remoteAddr, err := ln.ReadFrom(bb.B)
|
||||||
|
if err != nil {
|
||||||
|
udpErrorsTotal.Inc()
|
||||||
|
var ne net.Error
|
||||||
|
if errors.As(err, &ne) {
|
||||||
|
if ne.Temporary() {
|
||||||
|
logger.Errorf("syslog: temporary error when listening for UDP at %q: %s", localAddr, err)
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.Contains(err.Error(), "use of closed network connection") {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.Errorf("syslog: cannot read UDP data from %s at %s: %s", remoteAddr, localAddr, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
bb.B = bb.B[:n]
|
||||||
|
udpRequestsTotal.Inc()
|
||||||
|
if err := processStream(bb.NewReader(), cp); err != nil {
|
||||||
|
logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func serveTCP(ln net.Listener) {
|
||||||
|
var cm ingestserver.ConnsMap
|
||||||
|
cm.Init("syslog")
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
addr := ln.Addr()
|
||||||
|
for {
|
||||||
|
c, err := ln.Accept()
|
||||||
|
if err != nil {
|
||||||
|
var ne net.Error
|
||||||
|
if errors.As(err, &ne) {
|
||||||
|
if ne.Temporary() {
|
||||||
|
logger.Errorf("syslog: temporary error when listening for TCP addr %q: %s", addr, err)
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.Contains(err.Error(), "use of closed network connection") {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
logger.Fatalf("syslog: unrecoverable error when accepting TCP connections at %q: %s", addr, err)
|
||||||
|
}
|
||||||
|
logger.Fatalf("syslog: unexpected error when accepting TCP connections at %q: %s", addr, err)
|
||||||
|
}
|
||||||
|
if !cm.Add(c) {
|
||||||
|
_ = c.Close()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
cp := insertutils.GetCommonParamsForSyslog(globalTenantID)
|
||||||
|
if err := processStream(c, cp); err != nil {
|
||||||
|
logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cm.Delete(c)
|
||||||
|
_ = c.Close()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
cm.CloseAll(0)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// processStream parses a stream of syslog messages from r and ingests them into vlstorage.
|
||||||
|
func processStream(r io.Reader, cp *insertutils.CommonParams) error {
|
||||||
|
switch *compressMethod {
|
||||||
|
case "", "none":
|
||||||
|
case "gzip":
|
||||||
|
zr, err := common.GetGzipReader(r)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read gzipped data: %w", err)
|
||||||
|
}
|
||||||
|
r = zr
|
||||||
|
case "deflate":
|
||||||
|
zr, err := common.GetZlibReader(r)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read deflated data: %w", err)
|
||||||
|
}
|
||||||
|
r = zr
|
||||||
|
default:
|
||||||
|
logger.Panicf("BUG: compressLevel=%q; supported values: none, gzip, deflate", *compressMethod)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := processUncompressedStream(r, cp)
|
||||||
|
|
||||||
|
switch *compressMethod {
|
||||||
|
case "gzip":
|
||||||
|
zr := r.(*gzip.Reader)
|
||||||
|
common.PutGzipReader(zr)
|
||||||
|
case "deflate":
|
||||||
|
zr := r.(io.ReadCloser)
|
||||||
|
common.PutZlibReader(zr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func processUncompressedStream(r io.Reader, cp *insertutils.CommonParams) error {
|
||||||
|
if err := vlstorage.CanWriteData(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lr := logstorage.GetLogRows(cp.StreamFields, nil)
|
||||||
|
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
||||||
|
|
||||||
|
wcr := writeconcurrencylimiter.GetReader(r)
|
||||||
|
defer writeconcurrencylimiter.PutReader(wcr)
|
||||||
|
|
||||||
|
lb := lineBufferPool.Get()
|
||||||
|
defer lineBufferPool.Put(lb)
|
||||||
|
|
||||||
|
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
|
||||||
|
sc := bufio.NewScanner(wcr)
|
||||||
|
sc.Buffer(lb.B, len(lb.B))
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
for {
|
||||||
|
currentYear := int(globalCurrentYear.Load())
|
||||||
|
ok, err := readLine(sc, currentYear, processLogMessage)
|
||||||
|
wcr.DecConcurrency()
|
||||||
|
if err != nil {
|
||||||
|
errorsTotal.Inc()
|
||||||
|
return fmt.Errorf("cannot read line #%d: %s", n, err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
rowsIngestedTotal.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
vlstorage.MustAddRows(lr)
|
||||||
|
logstorage.PutLogRows(lr)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readLine(sc *bufio.Scanner, currentYear int, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
|
||||||
|
var line []byte
|
||||||
|
for len(line) == 0 {
|
||||||
|
if !sc.Scan() {
|
||||||
|
if err := sc.Err(); err != nil {
|
||||||
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
|
return false, fmt.Errorf(`line size exceeds -insert.maxLineSizeBytes=%d`, insertutils.MaxLineSizeBytes.IntN())
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
line = sc.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
p := logstorage.GetSyslogParser(currentYear, globalTimezone)
|
||||||
|
lineStr := bytesutil.ToUnsafeString(line)
|
||||||
|
p.Parse(lineStr)
|
||||||
|
ts, err := insertutils.ExtractTimestampISO8601FromFields("timestamp", p.Fields)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err)
|
||||||
|
}
|
||||||
|
logstorage.RenameField(p.Fields, "message", "_msg")
|
||||||
|
processLogMessage(ts, p.Fields)
|
||||||
|
logstorage.PutSyslogParser(p)
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var lineBufferPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
|
var (
|
||||||
|
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="syslog"}`)
|
||||||
|
|
||||||
|
errorsTotal = metrics.NewCounter(`vl_errors_total{type="syslog"}`)
|
||||||
|
|
||||||
|
udpRequestsTotal = metrics.NewCounter(`vl_udp_reqests_total{type="syslog"}`)
|
||||||
|
udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`)
|
||||||
|
)
|
60
app/vlinsert/syslog/syslog_test.go
Normal file
60
app/vlinsert/syslog/syslog_test.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package syslog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReadLine_Success(t *testing.T) {
|
||||||
|
f := func(data string, currentYear, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
var timestamps []int64
|
||||||
|
var result string
|
||||||
|
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
||||||
|
timestamps = append(timestamps, timestamp)
|
||||||
|
result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n"
|
||||||
|
}
|
||||||
|
|
||||||
|
r := bytes.NewBufferString(data)
|
||||||
|
sc := bufio.NewScanner(r)
|
||||||
|
rows := 0
|
||||||
|
for {
|
||||||
|
ok, err := readLine(sc, currentYear, processLogMessage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
rows++
|
||||||
|
}
|
||||||
|
if rows != rowsExpected {
|
||||||
|
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(timestamps, timestampsExpected) {
|
||||||
|
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
|
||||||
|
}
|
||||||
|
if result != resultExpected {
|
||||||
|
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
data := `Jun 3 12:08:33 abcd systemd: Starting Update the local ESM caches...
|
||||||
|
<165>Jun 4 12:08:33 abcd systemd[345]: abc defg
|
||||||
|
<123>1 2023-06-03T17:42:12.345Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.
|
||||||
|
`
|
||||||
|
currentYear := 2023
|
||||||
|
rowsExpected := 3
|
||||||
|
timestampsExpected := []int64{1685794113000000000, 1685880513000000000, 1685814132345000000}
|
||||||
|
resultExpected := `{"format":"rfc3164","timestamp":"","hostname":"abcd","app_name":"systemd","_msg":"Starting Update the local ESM caches..."}
|
||||||
|
{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"","hostname":"abcd","app_name":"systemd","proc_id":"345","_msg":"abc defg"}
|
||||||
|
{"priority":"123","facility":"15","severity":"3","format":"rfc5424","timestamp":"","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","_msg":"This is a test message with structured data."}
|
||||||
|
`
|
||||||
|
f(data, currentYear, rowsExpected, timestampsExpected, resultExpected)
|
||||||
|
}
|
|
@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* FEATURE: add ability to accept logs in [Syslog format](https://en.wikipedia.org/wiki/Syslog). See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/).
|
||||||
|
* FEATURE: add abitlity to specify timezone `offset` when parsing [rfc3164](https://datatracker.ietf.org/doc/html/rfc3164) syslog messages with [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe).
|
||||||
* FEATURE: add [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) for returning top N sets of the given fields with the maximum number of matching log entries.
|
* FEATURE: add [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) for returning top N sets of the given fields with the maximum number of matching log entries.
|
||||||
|
|
||||||
## [v0.19.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.19.0-victorialogs)
|
## [v0.19.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.19.0-victorialogs)
|
||||||
|
@ -286,7 +288,7 @@ Released at 2023-10-03
|
||||||
|
|
||||||
Released at 2023-07-20
|
Released at 2023-07-20
|
||||||
|
|
||||||
* FEATURE: add support for data ingestion via Promtail (aka default log shipper for Grafana Loki). See [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html) and [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api) docs.
|
* FEATURE: add support for data ingestion via Promtail (aka default log shipper for Grafana Loki). See [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) and [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api) docs.
|
||||||
|
|
||||||
## [v0.2.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.2.0-victorialogs)
|
## [v0.2.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.2.0-victorialogs)
|
||||||
|
|
||||||
|
|
|
@ -2550,6 +2550,13 @@ The following query is equivalent to the previous one:
|
||||||
_time:5m | unpack_syslog
|
_time:5m | unpack_syslog
|
||||||
```
|
```
|
||||||
|
|
||||||
|
By default timestamps in [RFC3164 format](https://datatracker.ietf.org/doc/html/rfc3164) are converted to local timezone. It is possible to change the timezone
|
||||||
|
offset via `offset` option. For example, the following query adds 5 hours and 30 minutes to unpacked `rfc3164` timestamps:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | unpack_syslog offset 5h30m
|
||||||
|
```
|
||||||
|
|
||||||
If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`:
|
If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`:
|
||||||
|
|
||||||
```logsql
|
```logsql
|
||||||
|
|
|
@ -16,11 +16,12 @@ aliases:
|
||||||
|
|
||||||
[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs from the following log collectors:
|
[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs from the following log collectors:
|
||||||
|
|
||||||
- Filebeat. See [how to setup Filebeat for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Filebeat.html).
|
- Syslog - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/).
|
||||||
- Fluentbit. See [how to setup Fluentbit for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Fluentbit.html).
|
- Filebeat - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/).
|
||||||
- Logstash. See [how to setup Logstash for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Logstash.html).
|
- Fluentbit - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/).
|
||||||
- Vector. See [how to setup Vector for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Vector.html).
|
- Logstash - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/).
|
||||||
- Promtail (aka Grafana Loki). See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html).
|
- Vector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/).
|
||||||
|
- Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/).
|
||||||
|
|
||||||
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/).
|
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/).
|
||||||
|
|
||||||
|
@ -258,8 +259,8 @@ Here is the list of log collectors and their ingestion formats supported by Vict
|
||||||
|
|
||||||
| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki |
|
| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki |
|
||||||
|------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------|-------------------------------------------------------------------------------------|
|
|------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------|-------------------------------------------------------------------------------------|
|
||||||
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/Filebeat.html) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No |
|
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No |
|
||||||
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/Fluentbit.html) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) |
|
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) |
|
||||||
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/Logstash.html) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No |
|
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No |
|
||||||
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/Vector.html) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) |
|
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) |
|
||||||
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) |
|
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) |
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
---
|
---
|
||||||
weight: 5
|
weight: 20
|
||||||
title: Vector setup
|
title: Vector setup
|
||||||
disableToc: true
|
disableToc: true
|
||||||
menu:
|
menu:
|
||||||
docs:
|
docs:
|
||||||
parent: "victorialogs-data-ingestion"
|
parent: "victorialogs-data-ingestion"
|
||||||
weight: 5
|
weight: 20
|
||||||
aliases:
|
aliases:
|
||||||
- /VictoriaLogs/data-ingestion/Vector.html
|
- /VictoriaLogs/data-ingestion/Vector.html
|
||||||
---
|
---
|
||||||
|
|
99
docs/VictoriaLogs/data-ingestion/syslog.md
Normal file
99
docs/VictoriaLogs/data-ingestion/syslog.md
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
---
|
||||||
|
weight: 10
|
||||||
|
title: Syslog setup
|
||||||
|
disableToc: true
|
||||||
|
menu:
|
||||||
|
docs:
|
||||||
|
parent: "victorialogs-data-ingestion"
|
||||||
|
weight: 10
|
||||||
|
---
|
||||||
|
|
||||||
|
# Syslog setup
|
||||||
|
|
||||||
|
[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs in [Syslog formats](https://en.wikipedia.org/wiki/Syslog) at the specified TCP and UDP addresses
|
||||||
|
via `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` command-line flags. The following syslog formats are supported:
|
||||||
|
|
||||||
|
- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `<PRI>MMM DD hh:mm:ss HOSTNAME APP-NAME[PROCID]: MESSAGE`
|
||||||
|
- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `<PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE`
|
||||||
|
|
||||||
|
For example, the following command starts VictoriaLogs, which accepts logs in Syslog format at TCP port 514 on all the network interfaces:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
./victoria-logs -syslog.listenAddr.tcp=:514
|
||||||
|
```
|
||||||
|
|
||||||
|
It may be needed to run VictoriaLogs under `root` user or to set [`CAP_NET_BIND_SERVICE`](https://superuser.com/questions/710253/allow-non-root-process-to-bind-to-port-80-and-443)
|
||||||
|
option if syslog messages must be accepted at TCP port below 1024.
|
||||||
|
|
||||||
|
The following command starts VictoriaLogs, which accepts logs in Syslog format at TCP and UDP ports 514:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.listenAddr.udp=:514
|
||||||
|
```
|
||||||
|
|
||||||
|
Multiple logs in Syslog format can be ingested via a single TCP connection or via a single UDP packet - just put every log on a separate line
|
||||||
|
and delimit them with `\n` char.
|
||||||
|
|
||||||
|
VictoriaLogs automatically extracts the following [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||||
|
from the received Syslog lines:
|
||||||
|
|
||||||
|
- [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp
|
||||||
|
- [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) - the `MESSAGE` field from the supported syslog formats above
|
||||||
|
- `hostname`, `app_name` and `proc_id` - [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) for unique identification
|
||||||
|
over every log stream
|
||||||
|
- `priority`, `factility` and `severity` - these fields are extracted from `<PRI>` field
|
||||||
|
- `format` - this field is set to either `rfc3164` or `rfc5424` depending on the format of the parsed syslog line
|
||||||
|
- `msg_id` - `MSGID` field from log line in `RFC5424` format.
|
||||||
|
|
||||||
|
By default local timezone is used when parsing timestamps in `rfc3164` lines. This can be changed to any desired timezone via `-syslog.timezone` command-line flag.
|
||||||
|
See [the list of supported timezone identifiers](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones). For example, the following command starts VictoriaLogs,
|
||||||
|
which parses syslog timestamps in `rfc3164` using `Europe/Berlin` timezone:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.timezone='Europe/Berlin'
|
||||||
|
```
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
- [Security](#security)
|
||||||
|
- [Compression](#compression)
|
||||||
|
- [Multitenancy](#multitenancy)
|
||||||
|
- [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting).
|
||||||
|
- [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/).
|
||||||
|
|
||||||
|
## Security
|
||||||
|
|
||||||
|
By default VictoriaLogs accepts plaintext data at `-syslog.listenAddr.tcp` address. Run VictoriaLogs with `-syslog.tls` command-line flag
|
||||||
|
in order to accept TLS-encrypted logs at `-syslog.listenAddr.tcp` address. The `-syslog.tlsCertFile` and `-syslog.tlsKeyFile` command-line flags
|
||||||
|
must be set to paths to TLS certificate file and TLS key file if `-syslog.tls` is set. For example, the following command
|
||||||
|
starts VictoriaLogs, which accepts TLS-encrypted syslog messages at TCP port 514:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tls -syslog.tlsCertFile=/path/to/tls/cert -syslog.tlsKeyFile=/path/to/tls/key
|
||||||
|
```
|
||||||
|
|
||||||
|
## Compression
|
||||||
|
|
||||||
|
By default VictoriaLogs accepts uncompressed log messages in Syslog format at `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` addresses.
|
||||||
|
It is possible configuring VictoriaLogs to accept compressed log messages via `-syslog.compressMethod` command-line flag. The following
|
||||||
|
compression methods are supported:
|
||||||
|
|
||||||
|
- `none` - no compression
|
||||||
|
- `gzip` - [gzip compression](https://en.wikipedia.org/wiki/Gzip)
|
||||||
|
- `deflate` - [deflate compression](https://en.wikipedia.org/wiki/Deflate)
|
||||||
|
|
||||||
|
For example, the following command starts VictoriaLogs, which accepts gzip-compressed syslog messages at TCP port 514:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.compressMethod=gzip
|
||||||
|
```
|
||||||
|
|
||||||
|
## Multitenancy
|
||||||
|
|
||||||
|
By default, the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy).
|
||||||
|
If you need storing logs in other tenant, then specify the needed tenant via `-syslog.tenantID` command-line flag.
|
||||||
|
For example, the following command starts VictoriaLogs, which writes syslog messages received at TCP port 514, to `(AccountID=12, ProjectID=34)` tenant:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tenantID=12:34
|
||||||
|
```
|
|
@ -44,6 +44,8 @@ func (cm *ConnsMap) Delete(c net.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseAll gradually closes all the cm conns with during the given shutdownDuration.
|
// CloseAll gradually closes all the cm conns with during the given shutdownDuration.
|
||||||
|
//
|
||||||
|
// If shutdownDuration <= 0, then all the connections are closed simultaneously.
|
||||||
func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
|
func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
|
||||||
cm.mu.Lock()
|
cm.mu.Lock()
|
||||||
conns := make([]net.Conn, 0, len(cm.m))
|
conns := make([]net.Conn, 0, len(cm.m))
|
||||||
|
@ -70,8 +72,8 @@ func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort vminsert conns in order to make the order of closing connections deterministic across vmstorage nodes.
|
// Sort conns in order to make the order of closing connections deterministic across clients.
|
||||||
// This should reduce resource usage spikes at vmstorage nodes during rolling restarts.
|
// This should reduce resource usage spikes at clients during rolling restarts.
|
||||||
sort.Slice(conns, func(i, j int) bool {
|
sort.Slice(conns, func(i, j int) bool {
|
||||||
return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String()
|
return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String()
|
||||||
})
|
})
|
||||||
|
|
|
@ -1166,7 +1166,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
|
||||||
return bytesutil.ToUnsafeString(buf[bufLen:])
|
return bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
}
|
}
|
||||||
|
|
||||||
if timestamp, ok := tryParseTimestampISO8601(s); ok {
|
if timestamp, ok := TryParseTimestampISO8601(s); ok {
|
||||||
bucketSizeInt := int64(bf.bucketSize)
|
bucketSizeInt := int64(bf.bucketSize)
|
||||||
if bucketSizeInt <= 0 {
|
if bucketSizeInt <= 0 {
|
||||||
bucketSizeInt = 1
|
bucketSizeInt = 1
|
||||||
|
@ -1190,7 +1190,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
|
||||||
return bytesutil.ToUnsafeString(buf[bufLen:])
|
return bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
}
|
}
|
||||||
|
|
||||||
if timestamp, ok := tryParseTimestampRFC3339Nano(s); ok {
|
if timestamp, ok := TryParseTimestampRFC3339Nano(s); ok {
|
||||||
bucketSizeInt := int64(bf.bucketSize)
|
bucketSizeInt := int64(bf.bucketSize)
|
||||||
if bucketSizeInt <= 0 {
|
if bucketSizeInt <= 0 {
|
||||||
bucketSizeInt = 1
|
bucketSizeInt = 1
|
||||||
|
|
|
@ -102,7 +102,7 @@ func (fr *filterDayRange) applyToBlockResult(br *blockResult, bm *bitmap) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fr *filterDayRange) matchTimestampString(v string) bool {
|
func (fr *filterDayRange) matchTimestampString(v string) bool {
|
||||||
timestamp, ok := tryParseTimestampRFC3339Nano(v)
|
timestamp, ok := TryParseTimestampRFC3339Nano(v)
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,7 +141,7 @@ func (fe *filterExact) applyToBlockResult(br *blockResult, bm *bitmap) {
|
||||||
return ip == ipNeeded
|
return ip == ipNeeded
|
||||||
})
|
})
|
||||||
case valueTypeTimestampISO8601:
|
case valueTypeTimestampISO8601:
|
||||||
timestampNeeded, ok := tryParseTimestampISO8601(value)
|
timestampNeeded, ok := TryParseTimestampISO8601(value)
|
||||||
if !ok {
|
if !ok {
|
||||||
bm.resetBits()
|
bm.resetBits()
|
||||||
return
|
return
|
||||||
|
@ -213,7 +213,7 @@ func (fe *filterExact) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func matchTimestampISO8601ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []string) {
|
func matchTimestampISO8601ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []string) {
|
||||||
n, ok := tryParseTimestampISO8601(value)
|
n, ok := TryParseTimestampISO8601(value)
|
||||||
if !ok || n < int64(ch.minValue) || n > int64(ch.maxValue) {
|
if !ok || n < int64(ch.minValue) || n > int64(ch.maxValue) {
|
||||||
bm.resetBits()
|
bm.resetBits()
|
||||||
return
|
return
|
||||||
|
|
|
@ -245,7 +245,7 @@ func (fi *filterIn) initTimestampISO8601Values() {
|
||||||
m := make(map[string]struct{}, len(values))
|
m := make(map[string]struct{}, len(values))
|
||||||
buf := make([]byte, 0, len(values)*8)
|
buf := make([]byte, 0, len(values)*8)
|
||||||
for _, v := range values {
|
for _, v := range values {
|
||||||
n, ok := tryParseTimestampISO8601(v)
|
n, ok := TryParseTimestampISO8601(v)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,7 @@ func (fp *filterPhrase) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func matchTimestampISO8601ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase string, tokens []string) {
|
func matchTimestampISO8601ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase string, tokens []string) {
|
||||||
_, ok := tryParseTimestampISO8601(phrase)
|
_, ok := TryParseTimestampISO8601(phrase)
|
||||||
if ok {
|
if ok {
|
||||||
// Fast path - the phrase contains complete timestamp, so we can use exact search
|
// Fast path - the phrase contains complete timestamp, so we can use exact search
|
||||||
matchTimestampISO8601ByExactValue(bs, ch, bm, phrase, tokens)
|
matchTimestampISO8601ByExactValue(bs, ch, bm, phrase, tokens)
|
||||||
|
|
|
@ -96,7 +96,7 @@ func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ft *filterTime) matchTimestampString(v string) bool {
|
func (ft *filterTime) matchTimestampString(v string) bool {
|
||||||
timestamp, ok := tryParseTimestampRFC3339Nano(v)
|
timestamp, ok := TryParseTimestampRFC3339Nano(v)
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ func (fr *filterWeekRange) applyToBlockResult(br *blockResult, bm *bitmap) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fr *filterWeekRange) matchTimestampString(v string) bool {
|
func (fr *filterWeekRange) matchTimestampString(v string) bool {
|
||||||
timestamp, ok := tryParseTimestampRFC3339Nano(v)
|
timestamp, ok := TryParseTimestampRFC3339Nano(v)
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,21 +78,6 @@ func (p *JSONParser) ParseLogMessage(msg []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RenameField renames field with the oldName to newName in p.Fields
|
|
||||||
func (p *JSONParser) RenameField(oldName, newName string) {
|
|
||||||
if oldName == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fields := p.Fields
|
|
||||||
for i := range fields {
|
|
||||||
f := &fields[i]
|
|
||||||
if f.Name == oldName {
|
|
||||||
f.Name = newName
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) {
|
func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) {
|
||||||
o := v.GetObject()
|
o := v.GetObject()
|
||||||
o.Visit(func(k []byte, v *fastjson.Value) {
|
o.Visit(func(k []byte, v *fastjson.Value) {
|
||||||
|
|
|
@ -60,20 +60,8 @@ type RowFormatter []Field
|
||||||
|
|
||||||
// String returns user-readable representation for rf
|
// String returns user-readable representation for rf
|
||||||
func (rf *RowFormatter) String() string {
|
func (rf *RowFormatter) String() string {
|
||||||
b := append([]byte{}, '{')
|
result := MarshalFieldsToJSON(nil, *rf)
|
||||||
|
return string(result)
|
||||||
fields := *rf
|
|
||||||
if len(fields) > 0 {
|
|
||||||
b = append(b, fields[0].String()...)
|
|
||||||
fields = fields[1:]
|
|
||||||
for _, field := range fields {
|
|
||||||
b = append(b, ',')
|
|
||||||
b = append(b, field.String()...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
b = append(b, '}')
|
|
||||||
return string(b)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset resets lr with all its settings.
|
// Reset resets lr with all its settings.
|
||||||
|
|
|
@ -919,7 +919,7 @@ func parseMathNumber(s string) float64 {
|
||||||
if ok {
|
if ok {
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
nsecs, ok := tryParseTimestampRFC3339Nano(s)
|
nsecs, ok := TryParseTimestampRFC3339Nano(s)
|
||||||
if ok {
|
if ok {
|
||||||
return float64(nsecs)
|
return float64(nsecs)
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,10 @@ type pipeUnpackSyslog struct {
|
||||||
// fromField is the field to unpack syslog fields from
|
// fromField is the field to unpack syslog fields from
|
||||||
fromField string
|
fromField string
|
||||||
|
|
||||||
|
// the timezone to use when parsing rfc3164 timestamps
|
||||||
|
offsetStr string
|
||||||
|
offsetTimezone *time.Location
|
||||||
|
|
||||||
// resultPrefix is prefix to add to unpacked field names
|
// resultPrefix is prefix to add to unpacked field names
|
||||||
resultPrefix string
|
resultPrefix string
|
||||||
|
|
||||||
|
@ -30,6 +34,9 @@ func (pu *pipeUnpackSyslog) String() string {
|
||||||
if !isMsgFieldName(pu.fromField) {
|
if !isMsgFieldName(pu.fromField) {
|
||||||
s += " from " + quoteTokenIfNeeded(pu.fromField)
|
s += " from " + quoteTokenIfNeeded(pu.fromField)
|
||||||
}
|
}
|
||||||
|
if pu.offsetStr != "" {
|
||||||
|
s += " offset " + pu.offsetStr
|
||||||
|
}
|
||||||
if pu.resultPrefix != "" {
|
if pu.resultPrefix != "" {
|
||||||
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
|
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
|
||||||
}
|
}
|
||||||
|
@ -64,14 +71,14 @@ func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFie
|
||||||
func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||||
unpackSyslog := func(uctx *fieldsUnpackerContext, s string) {
|
unpackSyslog := func(uctx *fieldsUnpackerContext, s string) {
|
||||||
year := currentYear.Load()
|
year := currentYear.Load()
|
||||||
p := getSyslogParser(int(year))
|
p := GetSyslogParser(int(year), pu.offsetTimezone)
|
||||||
|
|
||||||
p.parse(s)
|
p.Parse(s)
|
||||||
for _, f := range p.fields {
|
for _, f := range p.Fields {
|
||||||
uctx.addField(f.Name, f.Value)
|
uctx.addField(f.Name, f.Value)
|
||||||
}
|
}
|
||||||
|
|
||||||
putSyslogParser(p)
|
PutSyslogParser(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff)
|
return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff)
|
||||||
|
@ -119,6 +126,23 @@ func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) {
|
||||||
fromField = f
|
fromField = f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
offsetStr := ""
|
||||||
|
offsetTimezone := time.Local
|
||||||
|
if lex.isKeyword("offset") {
|
||||||
|
lex.nextToken()
|
||||||
|
s, err := getCompoundToken(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot read 'offset': %w", err)
|
||||||
|
}
|
||||||
|
offsetStr = s
|
||||||
|
nsecs, ok := tryParseDuration(offsetStr)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'offset' from %q", offsetStr)
|
||||||
|
}
|
||||||
|
secs := nsecs / nsecsPerSecond
|
||||||
|
offsetTimezone = time.FixedZone("custom", int(secs))
|
||||||
|
}
|
||||||
|
|
||||||
resultPrefix := ""
|
resultPrefix := ""
|
||||||
if lex.isKeyword("result_prefix") {
|
if lex.isKeyword("result_prefix") {
|
||||||
lex.nextToken()
|
lex.nextToken()
|
||||||
|
@ -137,6 +161,8 @@ func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) {
|
||||||
|
|
||||||
pu := &pipeUnpackSyslog{
|
pu := &pipeUnpackSyslog{
|
||||||
fromField: fromField,
|
fromField: fromField,
|
||||||
|
offsetStr: offsetStr,
|
||||||
|
offsetTimezone: offsetTimezone,
|
||||||
resultPrefix: resultPrefix,
|
resultPrefix: resultPrefix,
|
||||||
keepOriginalFields: keepOriginalFields,
|
keepOriginalFields: keepOriginalFields,
|
||||||
iff: iff,
|
iff: iff,
|
||||||
|
|
|
@ -11,16 +11,22 @@ func TestParsePipeUnpackSyslogSuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
f(`unpack_syslog`)
|
f(`unpack_syslog`)
|
||||||
|
f(`unpack_syslog offset 6h30m`)
|
||||||
|
f(`unpack_syslog offset -6h30m`)
|
||||||
f(`unpack_syslog keep_original_fields`)
|
f(`unpack_syslog keep_original_fields`)
|
||||||
|
f(`unpack_syslog offset -6h30m keep_original_fields`)
|
||||||
f(`unpack_syslog if (a:x)`)
|
f(`unpack_syslog if (a:x)`)
|
||||||
f(`unpack_syslog if (a:x) keep_original_fields`)
|
f(`unpack_syslog if (a:x) keep_original_fields`)
|
||||||
|
f(`unpack_syslog if (a:x) offset 2h keep_original_fields`)
|
||||||
f(`unpack_syslog from x`)
|
f(`unpack_syslog from x`)
|
||||||
f(`unpack_syslog from x keep_original_fields`)
|
f(`unpack_syslog from x keep_original_fields`)
|
||||||
f(`unpack_syslog if (a:x) from x`)
|
f(`unpack_syslog if (a:x) from x`)
|
||||||
f(`unpack_syslog from x result_prefix abc`)
|
f(`unpack_syslog from x result_prefix abc`)
|
||||||
|
f(`unpack_syslog from x offset 2h30m result_prefix abc`)
|
||||||
f(`unpack_syslog if (a:x) from x result_prefix abc`)
|
f(`unpack_syslog if (a:x) from x result_prefix abc`)
|
||||||
f(`unpack_syslog result_prefix abc`)
|
f(`unpack_syslog result_prefix abc`)
|
||||||
f(`unpack_syslog if (a:x) result_prefix abc`)
|
f(`unpack_syslog if (a:x) result_prefix abc`)
|
||||||
|
f(`unpack_syslog if (a:x) offset -1h result_prefix abc`)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParsePipeUnpackSyslogFailure(t *testing.T) {
|
func TestParsePipeUnpackSyslogFailure(t *testing.T) {
|
||||||
|
@ -31,6 +37,7 @@ func TestParsePipeUnpackSyslogFailure(t *testing.T) {
|
||||||
|
|
||||||
f(`unpack_syslog foo`)
|
f(`unpack_syslog foo`)
|
||||||
f(`unpack_syslog if`)
|
f(`unpack_syslog if`)
|
||||||
|
f(`unpack_syslog offset`)
|
||||||
f(`unpack_syslog if (x:y) foobar`)
|
f(`unpack_syslog if (x:y) foobar`)
|
||||||
f(`unpack_syslog from`)
|
f(`unpack_syslog from`)
|
||||||
f(`unpack_syslog from x y`)
|
f(`unpack_syslog from x y`)
|
||||||
|
@ -118,6 +125,27 @@ func TestPipeUnpackSyslog(t *testing.T) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// offset should be ignored when parsing non-rfc3164 messages
|
||||||
|
f("unpack_syslog from x offset 2h30m", [][]Field{
|
||||||
|
{
|
||||||
|
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||||
|
},
|
||||||
|
}, [][]Field{
|
||||||
|
{
|
||||||
|
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||||
|
{"priority", "165"},
|
||||||
|
{"facility", "20"},
|
||||||
|
{"severity", "5"},
|
||||||
|
{"format", "rfc5424"},
|
||||||
|
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
|
||||||
|
{"hostname", "mymachine.example.com"},
|
||||||
|
{"app_name", "appname"},
|
||||||
|
{"proc_id", "12345"},
|
||||||
|
{"msg_id", "ID47"},
|
||||||
|
{"message", "This is a test message with structured data"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
// failed if condition
|
// failed if condition
|
||||||
f("unpack_syslog if (foo:bar)", [][]Field{
|
f("unpack_syslog if (foo:bar)", [][]Field{
|
||||||
{
|
{
|
||||||
|
|
|
@ -58,7 +58,11 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Field) marshalToJSON(dst []byte) []byte {
|
func (f *Field) marshalToJSON(dst []byte) []byte {
|
||||||
dst = strconv.AppendQuote(dst, f.Name)
|
name := f.Name
|
||||||
|
if name == "" {
|
||||||
|
name = "_msg"
|
||||||
|
}
|
||||||
|
dst = strconv.AppendQuote(dst, name)
|
||||||
dst = append(dst, ':')
|
dst = append(dst, ':')
|
||||||
dst = strconv.AppendQuote(dst, f.Value)
|
dst = strconv.AppendQuote(dst, f.Value)
|
||||||
return dst
|
return dst
|
||||||
|
@ -84,6 +88,20 @@ func needLogfmtQuoting(s string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RenameField renames field with the oldName to newName in Fields
|
||||||
|
func RenameField(fields []Field, oldName, newName string) {
|
||||||
|
if oldName == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := range fields {
|
||||||
|
f := &fields[i]
|
||||||
|
if f.Name == oldName {
|
||||||
|
f.Name = newName
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result.
|
// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result.
|
||||||
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
|
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
|
||||||
dst = append(dst, '{')
|
dst = append(dst, '{')
|
||||||
|
|
|
@ -10,50 +10,78 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getSyslogParser(currentYear int) *syslogParser {
|
// GetSyslogParser returns syslog parser from the pool.
|
||||||
|
//
|
||||||
|
// currentYear must contain the current year. It is used for properly setting timestamp
|
||||||
|
// field for rfc3164 format, which doesn't contain year.
|
||||||
|
//
|
||||||
|
// the timezone is used for rfc3164 format for setting the desired timezone.
|
||||||
|
//
|
||||||
|
// Return back the parser to the pool by calling PutSyslogParser when it is no longer needed.
|
||||||
|
func GetSyslogParser(currentYear int, timezone *time.Location) *SyslogParser {
|
||||||
v := syslogParserPool.Get()
|
v := syslogParserPool.Get()
|
||||||
if v == nil {
|
if v == nil {
|
||||||
v = &syslogParser{}
|
v = &SyslogParser{}
|
||||||
}
|
}
|
||||||
p := v.(*syslogParser)
|
p := v.(*SyslogParser)
|
||||||
p.currentYear = currentYear
|
p.currentYear = currentYear
|
||||||
|
p.timezone = timezone
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func putSyslogParser(p *syslogParser) {
|
// PutSyslogParser returns back syslog parser to the pool.
|
||||||
|
//
|
||||||
|
// p cannot be used after returning to the pool.
|
||||||
|
func PutSyslogParser(p *SyslogParser) {
|
||||||
p.reset()
|
p.reset()
|
||||||
syslogParserPool.Put(p)
|
syslogParserPool.Put(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
var syslogParserPool sync.Pool
|
var syslogParserPool sync.Pool
|
||||||
|
|
||||||
type syslogParser struct {
|
// SyslogParser is parser for syslog messages.
|
||||||
currentYear int
|
//
|
||||||
|
// It understands the following syslog formats:
|
||||||
|
//
|
||||||
|
// - https://datatracker.ietf.org/doc/html/rfc5424
|
||||||
|
// - https://datatracker.ietf.org/doc/html/rfc3164
|
||||||
|
//
|
||||||
|
// It extracts the following list of syslog message fields into Fields -
|
||||||
|
// https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
|
||||||
|
type SyslogParser struct {
|
||||||
|
// Fields contains parsed fields after Parse call.
|
||||||
|
Fields []Field
|
||||||
|
|
||||||
buf []byte
|
buf []byte
|
||||||
fields []Field
|
|
||||||
|
currentYear int
|
||||||
|
timezone *time.Location
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) reset() {
|
func (p *SyslogParser) reset() {
|
||||||
p.currentYear = 0
|
p.currentYear = 0
|
||||||
|
p.timezone = nil
|
||||||
p.resetFields()
|
p.resetFields()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) resetFields() {
|
func (p *SyslogParser) resetFields() {
|
||||||
p.buf = p.buf[:0]
|
clear(p.Fields)
|
||||||
|
p.Fields = p.Fields[:0]
|
||||||
|
|
||||||
clear(p.fields)
|
p.buf = p.buf[:0]
|
||||||
p.fields = p.fields[:0]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) addField(name, value string) {
|
func (p *SyslogParser) addField(name, value string) {
|
||||||
p.fields = append(p.fields, Field{
|
p.Fields = append(p.Fields, Field{
|
||||||
Name: name,
|
Name: name,
|
||||||
Value: value,
|
Value: value,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) parse(s string) {
|
// Parse parses syslog message from s into p.Fields.
|
||||||
|
//
|
||||||
|
// p.Fields is valid until s is modified or p state is changed.
|
||||||
|
func (p *SyslogParser) Parse(s string) {
|
||||||
p.resetFields()
|
p.resetFields()
|
||||||
|
|
||||||
if len(s) == 0 {
|
if len(s) == 0 {
|
||||||
|
@ -96,7 +124,7 @@ func (p *syslogParser) parse(s string) {
|
||||||
p.parseNoHeader(s)
|
p.parseNoHeader(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) parseNoHeader(s string) {
|
func (p *SyslogParser) parseNoHeader(s string) {
|
||||||
if len(s) == 0 {
|
if len(s) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -107,7 +135,7 @@ func (p *syslogParser) parseNoHeader(s string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) parseRFC5424(s string) {
|
func (p *SyslogParser) parseRFC5424(s string) {
|
||||||
// See https://datatracker.ietf.org/doc/html/rfc5424
|
// See https://datatracker.ietf.org/doc/html/rfc5424
|
||||||
|
|
||||||
p.addField("format", "rfc5424")
|
p.addField("format", "rfc5424")
|
||||||
|
@ -172,7 +200,7 @@ func (p *syslogParser) parseRFC5424(s string) {
|
||||||
p.addField("message", s)
|
p.addField("message", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) parseRFC5424SD(s string) (string, bool) {
|
func (p *SyslogParser) parseRFC5424SD(s string) (string, bool) {
|
||||||
if strings.HasPrefix(s, "- ") {
|
if strings.HasPrefix(s, "- ") {
|
||||||
return s[2:], true
|
return s[2:], true
|
||||||
}
|
}
|
||||||
|
@ -190,7 +218,7 @@ func (p *syslogParser) parseRFC5424SD(s string) (string, bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) {
|
func (p *SyslogParser) parseRFC5424SDLine(s string) (string, bool) {
|
||||||
if len(s) == 0 || s[0] != '[' {
|
if len(s) == 0 || s[0] != '[' {
|
||||||
return s, false
|
return s, false
|
||||||
}
|
}
|
||||||
|
@ -236,7 +264,7 @@ func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) {
|
||||||
return s, true
|
return s, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syslogParser) parseRFC3164(s string) {
|
func (p *SyslogParser) parseRFC3164(s string) {
|
||||||
// See https://datatracker.ietf.org/doc/html/rfc3164
|
// See https://datatracker.ietf.org/doc/html/rfc3164
|
||||||
|
|
||||||
p.addField("format", "rfc3164")
|
p.addField("format", "rfc3164")
|
||||||
|
@ -257,10 +285,10 @@ func (p *syslogParser) parseRFC3164(s string) {
|
||||||
s = s[n:]
|
s = s[n:]
|
||||||
|
|
||||||
t = t.UTC()
|
t = t.UTC()
|
||||||
t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
|
t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), p.timezone)
|
||||||
if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() {
|
if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() {
|
||||||
// Adjust time to the previous year
|
// Adjust time to the previous year
|
||||||
t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
|
t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), p.timezone)
|
||||||
}
|
}
|
||||||
|
|
||||||
bufLen := len(p.buf)
|
bufLen := len(p.buf)
|
||||||
|
|
|
@ -2,69 +2,70 @@ package logstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSyslogParser(t *testing.T) {
|
func TestSyslogParser(t *testing.T) {
|
||||||
f := func(s, resultExpected string) {
|
f := func(s string, timezone *time.Location, resultExpected string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
const currentYear = 2024
|
const currentYear = 2024
|
||||||
p := getSyslogParser(currentYear)
|
p := GetSyslogParser(currentYear, timezone)
|
||||||
defer putSyslogParser(p)
|
defer PutSyslogParser(p)
|
||||||
|
|
||||||
p.parse(s)
|
p.Parse(s)
|
||||||
result := MarshalFieldsToJSON(nil, p.fields)
|
result := MarshalFieldsToJSON(nil, p.Fields)
|
||||||
if string(result) != resultExpected {
|
if string(result) != resultExpected {
|
||||||
t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected)
|
t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RFC 3164
|
// RFC 3164
|
||||||
f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
|
f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", time.UTC,
|
||||||
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
|
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
|
||||||
f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
|
f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
|
||||||
f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...",
|
f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", time.UTC,
|
||||||
`{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`)
|
`{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`)
|
||||||
f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...",
|
f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", time.UTC,
|
||||||
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`)
|
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`)
|
||||||
f("Jun 3 12:08:33 - - Starting Update the local ESM caches...",
|
f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", time.UTC,
|
||||||
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`)
|
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`)
|
||||||
|
|
||||||
// RFC 5424
|
// RFC 5424
|
||||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
|
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
|
||||||
f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
|
f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, time.UTC,
|
||||||
`{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
|
`{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
|
||||||
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`,
|
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`)
|
||||||
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`,
|
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`)
|
||||||
|
|
||||||
// Incomplete RFC 3164
|
// Incomplete RFC 3164
|
||||||
f("", `{}`)
|
f("", time.UTC, `{}`)
|
||||||
f("Jun 3 12:08:33", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`)
|
f("Jun 3 12:08:33", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`)
|
||||||
f("Foo 3 12:08:33", `{"format":"rfc3164","message":"Foo 3 12:08:33"}`)
|
f("Foo 3 12:08:33", time.UTC, `{"format":"rfc3164","message":"Foo 3 12:08:33"}`)
|
||||||
f("Foo 3 12:08:33bar", `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`)
|
f("Foo 3 12:08:33bar", time.UTC, `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`)
|
||||||
f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`)
|
f("Jun 3 12:08:33 abcd", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`)
|
||||||
f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`)
|
f("Jun 3 12:08:33 abcd sudo", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`)
|
||||||
f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`)
|
f("Jun 3 12:08:33 abcd sudo[123]", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`)
|
||||||
f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`)
|
f("Jun 3 12:08:33 abcd sudo foobar", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`)
|
||||||
f(`foo bar baz`, `{"format":"rfc3164","message":"foo bar baz"}`)
|
f(`foo bar baz`, time.UTC, `{"format":"rfc3164","message":"foo bar baz"}`)
|
||||||
|
|
||||||
// Incomplete RFC 5424
|
// Incomplete RFC 5424
|
||||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`,
|
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`)
|
||||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`,
|
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`)
|
||||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`,
|
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`)
|
||||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`,
|
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`)
|
||||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`,
|
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`)
|
||||||
f(`<165>1 2023-06-03T17:42:32.123456789Z`,
|
f(`<165>1 2023-06-03T17:42:32.123456789Z`, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`)
|
||||||
f(`<165>1 `,
|
f(`<165>1 `, time.UTC,
|
||||||
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`)
|
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`)
|
||||||
}
|
}
|
||||||
|
|
|
@ -262,7 +262,7 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) (
|
||||||
a := u64s.A
|
a := u64s.A
|
||||||
var minValue, maxValue int64
|
var minValue, maxValue int64
|
||||||
for i, v := range srcValues {
|
for i, v := range srcValues {
|
||||||
n, ok := tryParseTimestampISO8601(v)
|
n, ok := TryParseTimestampISO8601(v)
|
||||||
if !ok {
|
if !ok {
|
||||||
return dstBuf, dstValues, valueTypeUnknown, 0, 0
|
return dstBuf, dstValues, valueTypeUnknown, 0, 0
|
||||||
}
|
}
|
||||||
|
@ -283,10 +283,10 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) (
|
||||||
return dstBuf, dstValues, valueTypeTimestampISO8601, uint64(minValue), uint64(maxValue)
|
return dstBuf, dstValues, valueTypeTimestampISO8601, uint64(minValue), uint64(maxValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
// tryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds.
|
// TryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds.
|
||||||
//
|
//
|
||||||
// The returned timestamp can be negative if s is smaller than 1970 year.
|
// The returned timestamp can be negative if s is smaller than 1970 year.
|
||||||
func tryParseTimestampRFC3339Nano(s string) (int64, bool) {
|
func TryParseTimestampRFC3339Nano(s string) (int64, bool) {
|
||||||
// Do not parse timestamps with timezone other than Z, since they cannot be converted back
|
// Do not parse timestamps with timezone other than Z, since they cannot be converted back
|
||||||
// to the same string representation in general case.
|
// to the same string representation in general case.
|
||||||
// This may break search.
|
// This may break search.
|
||||||
|
@ -330,10 +330,10 @@ func tryParseTimestampRFC3339Nano(s string) (int64, bool) {
|
||||||
return nsecs, true
|
return nsecs, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// tryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds.
|
// TryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds.
|
||||||
//
|
//
|
||||||
// The returned timestamp can be negative if s is smaller than 1970 year.
|
// The returned timestamp can be negative if s is smaller than 1970 year.
|
||||||
func tryParseTimestampISO8601(s string) (int64, bool) {
|
func TryParseTimestampISO8601(s string) (int64, bool) {
|
||||||
// Do not parse timestamps with timezone, since they cannot be converted back
|
// Do not parse timestamps with timezone, since they cannot be converted back
|
||||||
// to the same string representation in general case.
|
// to the same string representation in general case.
|
||||||
// This may break search.
|
// This may break search.
|
||||||
|
|
|
@ -151,7 +151,7 @@ func TestTryParseIPv4_Failure(t *testing.T) {
|
||||||
func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) {
|
func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) {
|
||||||
f := func(s string) {
|
f := func(s string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
nsecs, ok := tryParseTimestampRFC3339Nano(s)
|
nsecs, ok := TryParseTimestampRFC3339Nano(s)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("cannot parse timestamp %q", s)
|
t.Fatalf("cannot parse timestamp %q", s)
|
||||||
}
|
}
|
||||||
|
@ -185,7 +185,7 @@ func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) {
|
||||||
func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) {
|
func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) {
|
||||||
f := func(s string) {
|
f := func(s string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
_, ok := tryParseTimestampRFC3339Nano(s)
|
_, ok := TryParseTimestampRFC3339Nano(s)
|
||||||
if ok {
|
if ok {
|
||||||
t.Fatalf("expecting faulure when parsing %q", s)
|
t.Fatalf("expecting faulure when parsing %q", s)
|
||||||
}
|
}
|
||||||
|
@ -240,7 +240,7 @@ func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) {
|
||||||
func TestTryParseTimestampISO8601String_Success(t *testing.T) {
|
func TestTryParseTimestampISO8601String_Success(t *testing.T) {
|
||||||
f := func(s string) {
|
f := func(s string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
nsecs, ok := tryParseTimestampISO8601(s)
|
nsecs, ok := TryParseTimestampISO8601(s)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("cannot parse timestamp %q", s)
|
t.Fatalf("cannot parse timestamp %q", s)
|
||||||
}
|
}
|
||||||
|
@ -263,7 +263,7 @@ func TestTryParseTimestampISO8601String_Success(t *testing.T) {
|
||||||
func TestTryParseTimestampISO8601_Failure(t *testing.T) {
|
func TestTryParseTimestampISO8601_Failure(t *testing.T) {
|
||||||
f := func(s string) {
|
f := func(s string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
_, ok := tryParseTimestampISO8601(s)
|
_, ok := TryParseTimestampISO8601(s)
|
||||||
if ok {
|
if ok {
|
||||||
t.Fatalf("expecting faulure when parsing %q", s)
|
t.Fatalf("expecting faulure when parsing %q", s)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ func BenchmarkTryParseTimestampRFC3339Nano(b *testing.B) {
|
||||||
nSum := int64(0)
|
nSum := int64(0)
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
for _, s := range a {
|
for _, s := range a {
|
||||||
n, ok := tryParseTimestampRFC3339Nano(s)
|
n, ok := TryParseTimestampRFC3339Nano(s)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Errorf("cannot parse timestamp %q", s))
|
panic(fmt.Errorf("cannot parse timestamp %q", s))
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,7 @@ func BenchmarkTryParseTimestampISO8601(b *testing.B) {
|
||||||
nSum := int64(0)
|
nSum := int64(0)
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
for _, s := range a {
|
for _, s := range a {
|
||||||
n, ok := tryParseTimestampISO8601(s)
|
n, ok := TryParseTimestampISO8601(s)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Errorf("cannot parse timestamp %q", s))
|
panic(fmt.Errorf("cannot parse timestamp %q", s))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue