lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-17 12:13:18 +02:00
parent b37b288dce
commit 2b6a634ec0
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
52 changed files with 1762 additions and 248 deletions

View file

@ -221,7 +221,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
if ts == 0 { if ts == 0 {
ts = time.Now().UnixNano() ts = time.Now().UnixNano()
} }
p.RenameField(msgField, "_msg") logstorage.RenameField(p.Fields, msgField, "_msg")
processLogMessage(ts, p.Fields) processLogMessage(ts, p.Fields)
logstorage.PutJSONParser(p) logstorage.PutJSONParser(p)
@ -272,9 +272,9 @@ func parseElasticsearchTimestamp(s string) (int64, error) {
} }
return t.UnixNano(), nil return t.UnixNano(), nil
} }
t, err := time.Parse(time.RFC3339, s) nsecs, ok := logstorage.TryParseTimestampRFC3339Nano(s)
if err != nil { if !ok {
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err) return 0, fmt.Errorf("cannot parse timestamp %q", s)
} }
return t.UnixNano(), nil return nsecs, nil
} }

View file

@ -5,7 +5,6 @@ import (
"compress/gzip" "compress/gzip"
"fmt" "fmt"
"reflect" "reflect"
"strings"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
@ -45,13 +44,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
var result string var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) { processLogMessage := func(timestamp int64, fields []logstorage.Field) {
timestamps = append(timestamps, timestamp) timestamps = append(timestamps, timestamp)
result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n"
a := make([]string, len(fields))
for i, f := range fields {
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
}
s := "{" + strings.Join(a, ",") + "}\n"
result += s
} }
// Read the request without compression // Read the request without compression

View file

@ -71,6 +71,23 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
return cp, nil return cp, nil
} }
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams {
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
cp := &CommonParams{
TenantID: tenantID,
TimeField: "timestamp",
MsgField: "message",
StreamFields: []string{
"hostname",
"app_name",
"proc_id",
},
}
return cp
}
// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr. // GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) { func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
return func(timestamp int64, fields []logstorage.Field) { return func(timestamp int64, fields []logstorage.Field) {

View file

@ -0,0 +1,33 @@
package insertutils
import (
"fmt"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
// ExtractTimestampISO8601FromFields extracts ISO8601 timestamp in nanoseconds from the field with the name timeField at fields.
//
// The value for the timeField is set to empty string after returning from the function,
// so it could be ignored during data ingestion.
//
// The current timestamp is returned if fields do not contain a field with timeField name or if the timeField value is empty.
func ExtractTimestampISO8601FromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
nsecs, ok := logstorage.TryParseTimestampISO8601(f.Value)
if !ok {
if f.Value == "0" || f.Value == "" {
return time.Now().UnixNano(), nil
}
return time.Now().UnixNano(), fmt.Errorf("cannot unmarshal iso8601 timestamp from %s=%q", timeField, f.Value)
}
f.Value = ""
return nsecs, nil
}
return time.Now().UnixNano(), nil
}

View file

@ -19,13 +19,13 @@ import (
) )
// RequestHandler processes jsonline insert requests // RequestHandler processes jsonline insert requests
func RequestHandler(w http.ResponseWriter, r *http.Request) bool { func RequestHandler(w http.ResponseWriter, r *http.Request) {
startTime := time.Now() startTime := time.Now()
w.Header().Add("Content-Type", "application/json") w.Header().Add("Content-Type", "application/json")
if r.Method != "POST" { if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
return true return
} }
requestsTotal.Inc() requestsTotal.Inc()
@ -33,11 +33,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
cp, err := insertutils.GetCommonParams(r) cp, err := insertutils.GetCommonParams(r)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return true return
} }
if err := vlstorage.CanWriteData(); err != nil { if err := vlstorage.CanWriteData(); err != nil {
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return true return
} }
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr) processLogMessage := cp.GetProcessLogMessageFunc(lr)
@ -46,8 +46,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
if r.Header.Get("Content-Encoding") == "gzip" { if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader) zr, err := common.GetGzipReader(reader)
if err != nil { if err != nil {
logger.Errorf("cannot read gzipped _bulk request: %s", err) logger.Errorf("cannot read gzipped jsonline request: %s", err)
return true return
} }
defer common.PutGzipReader(zr) defer common.PutGzipReader(zr)
reader = zr reader = zr
@ -68,6 +68,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage) ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage)
wcr.DecConcurrency() wcr.DecConcurrency()
if err != nil { if err != nil {
errorsTotal.Inc()
logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err) logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
break break
} }
@ -81,12 +82,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
vlstorage.MustAddRows(lr) vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr) logstorage.PutLogRows(lr)
// update jsonlineRequestDuration only for successfully parsed requests. // update requestDuration only for successfully parsed requests.
// There is no need in updating jsonlineRequestDuration for request errors, // There is no need in updating requestDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing. // since their timings are usually much smaller than the timing for successful request parsing.
jsonlineRequestDuration.UpdateDuration(startTime) requestDuration.UpdateDuration(startTime)
return true
} }
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) { func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
@ -108,53 +107,24 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
if err := p.ParseLogMessage(line); err != nil { if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
} }
ts, err := extractTimestampFromFields(timeField, p.Fields) ts, err := insertutils.ExtractTimestampISO8601FromFields(timeField, p.Fields)
if err != nil { if err != nil {
return false, fmt.Errorf("cannot parse timestamp: %w", err) return false, fmt.Errorf("cannot get timestamp: %w", err)
} }
if ts == 0 { logstorage.RenameField(p.Fields, msgField, "_msg")
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg")
processLogMessage(ts, p.Fields) processLogMessage(ts, p.Fields)
logstorage.PutJSONParser(p) logstorage.PutJSONParser(p)
return true, nil return true, nil
} }
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
timestamp, err := parseISO8601Timestamp(f.Value)
if err != nil {
return 0, err
}
f.Value = ""
return timestamp, nil
}
return 0, nil
}
func parseISO8601Timestamp(s string) (int64, error) {
if s == "0" || s == "" {
// Special case for returning the current timestamp.
// It must be automatically converted to the current timestamp by the caller.
return 0, nil
}
t, err := time.Parse(time.RFC3339, s)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
}
return t.UnixNano(), nil
}
var lineBufferPool bytesutil.ByteBufferPool var lineBufferPool bytesutil.ByteBufferPool
var ( var (
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`) rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
jsonlineRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`) requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/jsonline"}`)
requestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
) )

View file

@ -3,14 +3,13 @@ package jsonline
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"reflect" "reflect"
"strings"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
) )
func TestReadBulkRequestSuccess(t *testing.T) { func TestReadLine_Success(t *testing.T) {
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper() t.Helper()
@ -18,16 +17,9 @@ func TestReadBulkRequestSuccess(t *testing.T) {
var result string var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) { processLogMessage := func(timestamp int64, fields []logstorage.Field) {
timestamps = append(timestamps, timestamp) timestamps = append(timestamps, timestamp)
result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n"
a := make([]string, len(fields))
for i, f := range fields {
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
}
s := "{" + strings.Join(a, ",") + "}\n"
result += s
} }
// Read the request without compression
r := bytes.NewBufferString(data) r := bytes.NewBufferString(data)
sc := bufio.NewScanner(r) sc := bufio.NewScanner(r)
rows := 0 rows := 0
@ -53,7 +45,6 @@ func TestReadBulkRequestSuccess(t *testing.T) {
} }
} }
// Verify non-empty data
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"} data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"} {"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"} {"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}

View file

@ -11,7 +11,8 @@ import (
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
switch path { switch path {
case "/api/v1/push": case "/api/v1/push":
return handleInsert(r, w) handleInsert(r, w)
return true
case "/ready": case "/ready":
// See https://grafana.com/docs/loki/latest/api/#identify-ready-loki-instance // See https://grafana.com/docs/loki/latest/api/#identify-ready-loki-instance
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@ -23,14 +24,14 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
} }
// See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki // See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
func handleInsert(r *http.Request, w http.ResponseWriter) bool { func handleInsert(r *http.Request, w http.ResponseWriter) {
contentType := r.Header.Get("Content-Type") contentType := r.Header.Get("Content-Type")
switch contentType { switch contentType {
case "application/json": case "application/json":
return handleJSON(r, w) handleJSON(r, w)
default: default:
// Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki // Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
return handleProtobuf(r, w) handleProtobuf(r, w)
} }
} }

View file

@ -20,15 +20,15 @@ import (
var parserPool fastjson.ParserPool var parserPool fastjson.ParserPool
func handleJSON(r *http.Request, w http.ResponseWriter) bool { func handleJSON(r *http.Request, w http.ResponseWriter) {
startTime := time.Now() startTime := time.Now()
lokiRequestsJSONTotal.Inc() requestsJSONTotal.Inc()
reader := r.Body reader := r.Body
if r.Header.Get("Content-Encoding") == "gzip" { if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader) zr, err := common.GetGzipReader(reader)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err) httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
return true return
} }
defer common.PutGzipReader(zr) defer common.PutGzipReader(zr)
reader = zr reader = zr
@ -39,17 +39,17 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
writeconcurrencylimiter.PutReader(wcr) writeconcurrencylimiter.PutReader(wcr)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot read request body: %s", err) httpserver.Errorf(w, r, "cannot read request body: %s", err)
return true return
} }
cp, err := getCommonParams(r) cp, err := getCommonParams(r)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
return true return
} }
if err := vlstorage.CanWriteData(); err != nil { if err := vlstorage.CanWriteData(); err != nil {
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return true return
} }
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr) processLogMessage := cp.GetProcessLogMessageFunc(lr)
@ -58,23 +58,21 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
logstorage.PutLogRows(lr) logstorage.PutLogRows(lr)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err) httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err)
return true return
} }
rowsIngestedJSONTotal.Add(n) rowsIngestedJSONTotal.Add(n)
// update lokiRequestJSONDuration only for successfully parsed requests // update requestJSONDuration only for successfully parsed requests
// There is no need in updating lokiRequestJSONDuration for request errors, // There is no need in updating requestJSONDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing. // since their timings are usually much smaller than the timing for successful request parsing.
lokiRequestJSONDuration.UpdateDuration(startTime) requestJSONDuration.UpdateDuration(startTime)
return true
} }
var ( var (
lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) requestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`) requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
) )
func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {

View file

@ -23,25 +23,25 @@ var (
pushReqsPool sync.Pool pushReqsPool sync.Pool
) )
func handleProtobuf(r *http.Request, w http.ResponseWriter) bool { func handleProtobuf(r *http.Request, w http.ResponseWriter) {
startTime := time.Now() startTime := time.Now()
lokiRequestsProtobufTotal.Inc() requestsProtobufTotal.Inc()
wcr := writeconcurrencylimiter.GetReader(r.Body) wcr := writeconcurrencylimiter.GetReader(r.Body)
data, err := io.ReadAll(wcr) data, err := io.ReadAll(wcr)
writeconcurrencylimiter.PutReader(wcr) writeconcurrencylimiter.PutReader(wcr)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot read request body: %s", err) httpserver.Errorf(w, r, "cannot read request body: %s", err)
return true return
} }
cp, err := getCommonParams(r) cp, err := getCommonParams(r)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
return true return
} }
if err := vlstorage.CanWriteData(); err != nil { if err := vlstorage.CanWriteData(); err != nil {
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return true return
} }
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr) processLogMessage := cp.GetProcessLogMessageFunc(lr)
@ -50,23 +50,21 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
logstorage.PutLogRows(lr) logstorage.PutLogRows(lr)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err) httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
return true return
} }
rowsIngestedProtobufTotal.Add(n) rowsIngestedProtobufTotal.Add(n)
// update lokiRequestProtobufDuration only for successfully parsed requests // update requestProtobufDuration only for successfully parsed requests
// There is no need in updating lokiRequestProtobufDuration for request errors, // There is no need in updating requestProtobufDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing. // since their timings are usually much smaller than the timing for successful request parsing.
lokiRequestProtobufDuration.UpdateDuration(startTime) requestProtobufDuration.UpdateDuration(startTime)
return true
} }
var ( var (
lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`) requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
) )
func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {

View file

@ -7,14 +7,17 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/syslog"
) )
// Init initializes vlinsert // Init initializes vlinsert
func Init() { func Init() {
syslog.MustInit()
} }
// Stop stops vlinsert // Stop stops vlinsert
func Stop() { func Stop() {
syslog.MustStop()
} }
// RequestHandler handles insert requests for VictoriaLogs // RequestHandler handles insert requests for VictoriaLogs
@ -28,7 +31,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
path = strings.ReplaceAll(path, "//", "/") path = strings.ReplaceAll(path, "//", "/")
if path == "/jsonline" { if path == "/jsonline" {
return jsonline.RequestHandler(w, r) jsonline.RequestHandler(w, r)
return true
} }
switch { switch {
case strings.HasPrefix(path, "/elasticsearch/"): case strings.HasPrefix(path, "/elasticsearch/"):

View file

@ -0,0 +1,389 @@
package syslog
import (
"bufio"
"crypto/tls"
"errors"
"flag"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/klauspost/compress/gzip"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
syslogTenantID = flag.String("syslog.tenantID", "0:0", "TenantID for logs ingested via Syslog protocol. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
syslogTimezone = flag.String("syslog.timezone", "Local", "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. "+
"For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
listenAddrTCP = flag.String("syslog.listenAddr.tcp", "", "Optional TCP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
listenAddrUDP = flag.String("syslog.listenAddr.udp", "", "Optional UDP address to listen to for Syslog messages. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsEnable = flag.Bool("syslog.tls", false, "Whether to use TLS for receiving syslog messages at -syslog.listenAddr.tcp. -syslog.tlsCertFile and -syslog.tlsKeyFile must be set "+
"if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsCertFile = flag.String("syslog.tlsCertFile", "", "Path to file with TLS certificate for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. "+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsKeyFile = flag.String("syslog.tlsKeyFile", "", "Path to file with TLS key for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"The provided key file is automatically re-read every second, so it can be dynamically updated")
tlsCipherSuites = flagutil.NewArrayString("syslog.tlsCipherSuites", "Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . "+
"See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
tlsMinVersion = flag.String("syslog.tlsMinVersion", "TLS13", "The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. "+
"Supported values: TLS10, TLS11, TLS12, TLS13. "+
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
compressMethod = flag.String("syslog.compressMethod", "", "Compression method for syslog messages received at -syslog.listenAddr.tcp and -syslog.listenAddr.udp. "+
"Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/")
)
// MustInit initializes syslog parser at the given -syslog.listenAddr.tcp and -syslog.listenAddr.udp ports
//
// This function must be called after flag.Parse().
//
// MustStop() must be called in order to free up resources occupied by the initialized syslog parser.
func MustInit() {
if workersStopCh != nil {
logger.Panicf("BUG: MustInit() called twice without MustStop() call")
}
workersStopCh = make(chan struct{})
tenantID, err := logstorage.GetTenantIDFromString(*syslogTenantID)
if err != nil {
logger.Fatalf("cannot parse -syslog.tenantID=%q: %s", *syslogTenantID, err)
}
globalTenantID = tenantID
switch *compressMethod {
case "", "none", "gzip", "deflate":
default:
logger.Fatalf("unexpected -syslog.compressLevel=%q; supported values: none, gzip, deflate", *compressMethod)
}
if *listenAddrTCP != "" {
workersWG.Add(1)
go func() {
runTCPListener(*listenAddrTCP)
workersWG.Done()
}()
}
if *listenAddrUDP != "" {
workersWG.Add(1)
go func() {
runUDPListener(*listenAddrUDP)
workersWG.Done()
}()
}
currentYear := time.Now().Year()
globalCurrentYear.Store(int64(currentYear))
workersWG.Add(1)
go func() {
ticker := time.NewTicker(time.Minute)
for {
select {
case <-workersStopCh:
ticker.Stop()
workersWG.Done()
return
case <-ticker.C:
currentYear := time.Now().Year()
globalCurrentYear.Store(int64(currentYear))
}
}
}()
if *syslogTimezone != "" {
tz, err := time.LoadLocation(*syslogTimezone)
if err != nil {
logger.Fatalf("cannot parse -syslog.timezone=%q: %s", *syslogTimezone, err)
}
globalTimezone = tz
} else {
globalTimezone = time.Local
}
}
var (
globalTenantID logstorage.TenantID
globalCurrentYear atomic.Int64
globalTimezone *time.Location
)
var (
workersWG sync.WaitGroup
workersStopCh chan struct{}
)
// MustStop stops syslog parser initialized via MustInit()
func MustStop() {
close(workersStopCh)
workersWG.Wait()
workersStopCh = nil
}
func runUDPListener(addr string) {
ln, err := net.ListenPacket(netutil.GetUDPNetwork(), addr)
if err != nil {
logger.Fatalf("cannot start UDP syslog server at %q: %s", addr, err)
}
doneCh := make(chan struct{})
go func() {
serveUDP(ln)
close(doneCh)
}()
<-workersStopCh
if err := ln.Close(); err != nil {
logger.Fatalf("syslog: cannot close UDP listener at %s: %s", addr, err)
}
<-doneCh
}
func runTCPListener(addr string) {
var tlsConfig *tls.Config
if *tlsEnable {
tc, err := netutil.GetServerTLSConfig(*tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites)
if err != nil {
logger.Fatalf("cannot load TLS cert from -syslog.tlsCertFile=%q, -syslog.tlsKeyFile=%q, -syslog.tlsMinVersion=%q, -syslog.tlsCipherSuites=%q: %s",
*tlsCertFile, *tlsKeyFile, *tlsMinVersion, *tlsCipherSuites, err)
}
tlsConfig = tc
}
ln, err := netutil.NewTCPListener("syslog", addr, false, tlsConfig)
if err != nil {
logger.Fatalf("syslog: cannot start TCP listener at %s: %s", addr, err)
}
doneCh := make(chan struct{})
go func() {
serveTCP(ln)
close(doneCh)
}()
<-workersStopCh
if err := ln.Close(); err != nil {
logger.Fatalf("syslog: cannot close TCP listener at %s: %s", addr, err)
}
<-doneCh
}
func serveUDP(ln net.PacketConn) {
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
localAddr := ln.LocalAddr()
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cp := insertutils.GetCommonParamsForSyslog(globalTenantID)
var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for {
bb.Reset()
bb.B = bb.B[:cap(bb.B)]
n, remoteAddr, err := ln.ReadFrom(bb.B)
if err != nil {
udpErrorsTotal.Inc()
var ne net.Error
if errors.As(err, &ne) {
if ne.Temporary() {
logger.Errorf("syslog: temporary error when listening for UDP at %q: %s", localAddr, err)
time.Sleep(time.Second)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
}
logger.Errorf("syslog: cannot read UDP data from %s at %s: %s", remoteAddr, localAddr, err)
continue
}
bb.B = bb.B[:n]
udpRequestsTotal.Inc()
if err := processStream(bb.NewReader(), cp); err != nil {
logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err)
}
}
}()
}
wg.Wait()
}
func serveTCP(ln net.Listener) {
var cm ingestserver.ConnsMap
cm.Init("syslog")
var wg sync.WaitGroup
addr := ln.Addr()
for {
c, err := ln.Accept()
if err != nil {
var ne net.Error
if errors.As(err, &ne) {
if ne.Temporary() {
logger.Errorf("syslog: temporary error when listening for TCP addr %q: %s", addr, err)
time.Sleep(time.Second)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
logger.Fatalf("syslog: unrecoverable error when accepting TCP connections at %q: %s", addr, err)
}
logger.Fatalf("syslog: unexpected error when accepting TCP connections at %q: %s", addr, err)
}
if !cm.Add(c) {
_ = c.Close()
break
}
wg.Add(1)
go func() {
cp := insertutils.GetCommonParamsForSyslog(globalTenantID)
if err := processStream(c, cp); err != nil {
logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err)
}
cm.Delete(c)
_ = c.Close()
wg.Done()
}()
}
cm.CloseAll(0)
wg.Wait()
}
// processStream parses a stream of syslog messages from r and ingests them into vlstorage.
func processStream(r io.Reader, cp *insertutils.CommonParams) error {
switch *compressMethod {
case "", "none":
case "gzip":
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped data: %w", err)
}
r = zr
case "deflate":
zr, err := common.GetZlibReader(r)
if err != nil {
return fmt.Errorf("cannot read deflated data: %w", err)
}
r = zr
default:
logger.Panicf("BUG: compressLevel=%q; supported values: none, gzip, deflate", *compressMethod)
}
err := processUncompressedStream(r, cp)
switch *compressMethod {
case "gzip":
zr := r.(*gzip.Reader)
common.PutGzipReader(zr)
case "deflate":
zr := r.(io.ReadCloser)
common.PutZlibReader(zr)
}
return err
}
func processUncompressedStream(r io.Reader, cp *insertutils.CommonParams) error {
if err := vlstorage.CanWriteData(); err != nil {
return err
}
lr := logstorage.GetLogRows(cp.StreamFields, nil)
processLogMessage := cp.GetProcessLogMessageFunc(lr)
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
lb := lineBufferPool.Get()
defer lineBufferPool.Put(lb)
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
sc := bufio.NewScanner(wcr)
sc.Buffer(lb.B, len(lb.B))
n := 0
for {
currentYear := int(globalCurrentYear.Load())
ok, err := readLine(sc, currentYear, globalTimezone, processLogMessage)
wcr.DecConcurrency()
if err != nil {
errorsTotal.Inc()
return fmt.Errorf("cannot read line #%d: %s", n, err)
}
if !ok {
break
}
n++
rowsIngestedTotal.Inc()
}
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
return nil
}
func readLine(sc *bufio.Scanner, currentYear int, timezone *time.Location, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
var line []byte
for len(line) == 0 {
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf(`line size exceeds -insert.maxLineSizeBytes=%d`, insertutils.MaxLineSizeBytes.IntN())
}
return false, err
}
return false, nil
}
line = sc.Bytes()
}
p := logstorage.GetSyslogParser(currentYear, timezone)
lineStr := bytesutil.ToUnsafeString(line)
p.Parse(lineStr)
ts, err := insertutils.ExtractTimestampISO8601FromFields("timestamp", p.Fields)
if err != nil {
return false, fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err)
}
logstorage.RenameField(p.Fields, "message", "_msg")
processLogMessage(ts, p.Fields)
logstorage.PutSyslogParser(p)
return true, nil
}
var lineBufferPool bytesutil.ByteBufferPool
var (
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="syslog"}`)
errorsTotal = metrics.NewCounter(`vl_errors_total{type="syslog"}`)
udpRequestsTotal = metrics.NewCounter(`vl_udp_reqests_total{type="syslog"}`)
udpErrorsTotal = metrics.NewCounter(`vl_udp_errors_total{type="syslog"}`)
)

View file

@ -0,0 +1,65 @@
package syslog
import (
"bufio"
"bytes"
"reflect"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func TestReadLine_Success(t *testing.T) {
f := func(data string, currentYear, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper()
MustInit()
defer MustStop()
var timestamps []int64
var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
timestamps = append(timestamps, timestamp)
result += string(logstorage.MarshalFieldsToJSON(nil, fields)) + "\n"
}
r := bytes.NewBufferString(data)
sc := bufio.NewScanner(r)
rows := 0
timezone := time.UTC
for {
ok, err := readLine(sc, currentYear, timezone, processLogMessage)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !ok {
break
}
rows++
}
if rows != rowsExpected {
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
}
if !reflect.DeepEqual(timestamps, timestampsExpected) {
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
}
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
data := `Jun 3 12:08:33 abcd systemd: Starting Update the local ESM caches...
<165>Jun 4 12:08:33 abcd systemd[345]: abc defg
<123>1 2023-06-03T17:42:12.345Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.
`
currentYear := 2023
rowsExpected := 3
timestampsExpected := []int64{1685794113000000000, 1685880513000000000, 1685814132345000000}
resultExpected := `{"format":"rfc3164","timestamp":"","hostname":"abcd","app_name":"systemd","_msg":"Starting Update the local ESM caches..."}
{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"","hostname":"abcd","app_name":"systemd","proc_id":"345","_msg":"abc defg"}
{"priority":"123","facility":"15","severity":"3","format":"rfc5424","timestamp":"","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","_msg":"This is a test message with structured data."}
`
f(data, currentYear, rowsExpected, timestampsExpected, resultExpected)
}

View file

@ -42,7 +42,7 @@ services:
# storing logs and serving read queries. # storing logs and serving read queries.
victorialogs: victorialogs:
container_name: victorialogs container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
command: command:
- "--storageDataPath=/vlogs" - "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428" - "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066 - -beat.uri=http://filebeat-victorialogs:5066
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-docker-vl:/vlogs - victorialogs-filebeat-docker-vl:/vlogs
ports: ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-syslog-vl:/vlogs - victorialogs-filebeat-syslog-vl:/vlogs
ports: ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
volumes: volumes:
- victorialogs-fluentbit-vl:/vlogs - victorialogs-fluentbit-vl:/vlogs
ports: ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
volumes: volumes:
- victorialogs-logstash-vl:/vlogs - victorialogs-logstash-vl:/vlogs
ports: ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140" - "5140:5140"
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
volumes: volumes:
- victorialogs-promtail-docker:/vlogs - victorialogs-promtail-docker:/vlogs
ports: ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy condition: service_healthy
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
volumes: volumes:
- victorialogs-vector-docker-vl:/vlogs - victorialogs-vector-docker-vl:/vlogs
ports: ports:

View file

@ -3,7 +3,7 @@ version: '3'
services: services:
# Run `make package-victoria-logs` to build victoria-logs image # Run `make package-victoria-logs` to build victoria-logs image
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
volumes: volumes:
- vlogs:/vlogs - vlogs:/vlogs
ports: ports:

View file

@ -19,6 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
## [v0.20.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.20.0-victorialogs)
Released at 2024-06-17
* FEATURE: add ability to accept logs in [Syslog format](https://en.wikipedia.org/wiki/Syslog). See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/).
* FEATURE: add abitlity to specify timezone offset when parsing [rfc3164](https://datatracker.ietf.org/doc/html/rfc3164) syslog messages with [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe).
* FEATURE: add [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) for returning top N sets of the given fields with the maximum number of matching log entries.
## [v0.19.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.19.0-victorialogs) ## [v0.19.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.19.0-victorialogs)
Released at 2024-06-11 Released at 2024-06-11
@ -284,7 +292,7 @@ Released at 2023-10-03
Released at 2023-07-20 Released at 2023-07-20
* FEATURE: add support for data ingestion via Promtail (aka default log shipper for Grafana Loki). See [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html) and [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api) docs. * FEATURE: add support for data ingestion via Promtail (aka default log shipper for Grafana Loki). See [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) and [these](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api) docs.
## [v0.2.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.2.0-victorialogs) ## [v0.2.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.2.0-victorialogs)

View file

@ -1265,6 +1265,7 @@ LogsQL supports the following pipes:
- [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
- [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`stats`](#stats-pipe) calculates various stats over the selected logs. - [`stats`](#stats-pipe) calculates various stats over the selected logs.
- [`top`](#top-pipe) returns top `N` field sets with the maximum number of matching logs.
- [`uniq`](#uniq-pipe) returns unique log entires. - [`uniq`](#uniq-pipe) returns unique log entires.
- [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -1573,6 +1574,7 @@ If the limit is reached, then the set of returned values is random. Also the num
See also: See also:
- [`field_names` pipe](#field_names-pipe) - [`field_names` pipe](#field_names-pipe)
- [`top` pipe](#top-pipe)
- [`uniq` pipe](#uniq-pipe) - [`uniq` pipe](#uniq-pipe)
### fields pipe ### fields pipe
@ -2139,6 +2141,8 @@ See also:
- [stats pipe functions](#stats-pipe-functions) - [stats pipe functions](#stats-pipe-functions)
- [`math` pipe](#math-pipe) - [`math` pipe](#math-pipe)
- [`sort` pipe](#sort-pipe) - [`sort` pipe](#sort-pipe)
- [`uniq` pipe](#uniq-pipe)
- [`top` pipe](#top-pipe)
#### Stats by fields #### Stats by fields
@ -2256,9 +2260,41 @@ _time:5m | stats
count() total count() total
``` ```
### top pipe
`| top N by (field1, ..., fieldN)` [pipe](#pipes) returns top `N` sets for `(field1, ..., fieldN)` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the maximum number of matching log entries.
For example, the following query returns top 7 [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
with the maximum number of log entries over the last 5 minutes:
```logsql
_time:5m | top 7 by (_stream)
```
The `N` is optional. If it is skipped, then top 10 entries are returned. For example, the following query returns top 10 values
for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) seen in logs for the last 5 minutes:
```logsql
_time:5m | top by (ip)
```
The `by (...)` part in the `top` [pipe](#pipes) is optional. If it is skipped, then all the log fields are taken into account
when determining top field sets. This is useful when the field sets are already limited by other pipes such as [`fields` pipe](#fields-pipe).
For example, the following query is equivalent to the previous one:
```logsql
_time:5m | fields ip | top
```
See also:
- [`uniq` pipe](#uniq-pipe)
- [`stats` pipe](#stats-pipe)
### uniq pipe ### uniq pipe
`| uniq ...` pipe returns unique results over the selected logs. For example, the following LogsQL query `| uniq ...` [pipe](#pipes) returns unique results over the selected logs. For example, the following LogsQL query
returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over logs for the last 5 minutes: over logs for the last 5 minutes:
@ -2300,6 +2336,8 @@ _time:5m | uniq (host, path) limit 100
See also: See also:
- [`uniq_values` stats function](#uniq_values-stats) - [`uniq_values` stats function](#uniq_values-stats)
- [`top` pipe](#top-pipe)
- [`stats` pipe](#stats-pipe)
### unpack_json pipe ### unpack_json pipe
@ -2512,6 +2550,13 @@ The following query is equivalent to the previous one:
_time:5m | unpack_syslog _time:5m | unpack_syslog
``` ```
By default timestamps in [RFC3164 format](https://datatracker.ietf.org/doc/html/rfc3164) are converted to local timezone. It is possible to change the timezone
offset via `offset` option. For example, the following query adds 5 hours and 30 minutes to unpacked `rfc3164` timestamps:
```logsql
_time:5m | unpack_syslog offset 5h30m
```
If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`: If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`:
```logsql ```logsql

View file

@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh ```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.19.0-victorialogs/victoria-logs-linux-amd64-v0.19.0-victorialogs.tar.gz curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.20.0-victorialogs/victoria-logs-linux-amd64-v0.20.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.19.0-victorialogs.tar.gz tar xzf victoria-logs-linux-amd64-v0.20.0-victorialogs.tar.gz
./victoria-logs-prod ./victoria-logs-prod
``` ```
@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh ```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \ docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.19.0-victorialogs docker.io/victoriametrics/victoria-logs:v0.20.0-victorialogs
``` ```
See also: See also:

View file

@ -16,11 +16,12 @@ aliases:
[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs from the following log collectors: [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs from the following log collectors:
- Filebeat. See [how to setup Filebeat for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Filebeat.html). - Syslog - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/).
- Fluentbit. See [how to setup Fluentbit for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Fluentbit.html). - Filebeat - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/).
- Logstash. See [how to setup Logstash for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Logstash.html). - Fluentbit - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/).
- Vector. See [how to setup Vector for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Vector.html). - Logstash - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/).
- Promtail (aka Grafana Loki). See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html). - Vector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/).
- Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/).
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/). The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/).
@ -258,8 +259,8 @@ Here is the list of log collectors and their ingestion formats supported by Vict
| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | | How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki |
|------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------|-------------------------------------------------------------------------------------| |------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------|-------------------------------------------------------------------------------------|
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/Filebeat.html) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | | [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No |
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/Fluentbit.html) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | | [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) |
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/Logstash.html) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | | [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No |
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/Vector.html) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | | [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) |
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/Promtail.html) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | | [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) |

View file

@ -1,11 +1,11 @@
--- ---
weight: 5 weight: 20
title: Vector setup title: Vector setup
disableToc: true disableToc: true
menu: menu:
docs: docs:
parent: "victorialogs-data-ingestion" parent: "victorialogs-data-ingestion"
weight: 5 weight: 20
aliases: aliases:
- /VictoriaLogs/data-ingestion/Vector.html - /VictoriaLogs/data-ingestion/Vector.html
- /victorialogs/data-ingestion/Vector.html - /victorialogs/data-ingestion/Vector.html

View file

@ -0,0 +1,99 @@
---
weight: 10
title: Syslog setup
disableToc: true
menu:
docs:
parent: "victorialogs-data-ingestion"
weight: 10
---
# Syslog setup
[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) can accept logs in [Syslog formats](https://en.wikipedia.org/wiki/Syslog) at the specified TCP and UDP addresses
via `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` command-line flags. The following syslog formats are supported:
- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `<PRI>MMM DD hh:mm:ss HOSTNAME APP-NAME[PROCID]: MESSAGE`
- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `<PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE`
For example, the following command starts VictoriaLogs, which accepts logs in Syslog format at TCP port 514 on all the network interfaces:
```sh
./victoria-logs -syslog.listenAddr.tcp=:514
```
It may be needed to run VictoriaLogs under `root` user or to set [`CAP_NET_BIND_SERVICE`](https://superuser.com/questions/710253/allow-non-root-process-to-bind-to-port-80-and-443)
option if syslog messages must be accepted at TCP port below 1024.
The following command starts VictoriaLogs, which accepts logs in Syslog format at TCP and UDP ports 514:
```sh
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.listenAddr.udp=:514
```
Multiple logs in Syslog format can be ingested via a single TCP connection or via a single UDP packet - just put every log on a separate line
and delimit them with `\n` char.
VictoriaLogs automatically extracts the following [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
from the received Syslog lines:
- [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) - log timestamp
- [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) - the `MESSAGE` field from the supported syslog formats above
- `hostname`, `app_name` and `proc_id` - [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) for unique identification
over every log stream
- `priority`, `factility` and `severity` - these fields are extracted from `<PRI>` field
- `format` - this field is set to either `rfc3164` or `rfc5424` depending on the format of the parsed syslog line
- `msg_id` - `MSGID` field from log line in `RFC5424` format.
By default local timezone is used when parsing timestamps in `rfc3164` lines. This can be changed to any desired timezone via `-syslog.timezone` command-line flag.
See [the list of supported timezone identifiers](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones). For example, the following command starts VictoriaLogs,
which parses syslog timestamps in `rfc3164` using `Europe/Berlin` timezone:
```sh
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.timezone='Europe/Berlin'
```
See also:
- [Security](#security)
- [Compression](#compression)
- [Multitenancy](#multitenancy)
- [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting).
- [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/).
## Security
By default VictoriaLogs accepts plaintext data at `-syslog.listenAddr.tcp` address. Run VictoriaLogs with `-syslog.tls` command-line flag
in order to accept TLS-encrypted logs at `-syslog.listenAddr.tcp` address. The `-syslog.tlsCertFile` and `-syslog.tlsKeyFile` command-line flags
must be set to paths to TLS certificate file and TLS key file if `-syslog.tls` is set. For example, the following command
starts VictoriaLogs, which accepts TLS-encrypted syslog messages at TCP port 514:
```sh
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tls -syslog.tlsCertFile=/path/to/tls/cert -syslog.tlsKeyFile=/path/to/tls/key
```
## Compression
By default VictoriaLogs accepts uncompressed log messages in Syslog format at `-syslog.listenAddr.tcp` and `-syslog.listenAddr.udp` addresses.
It is possible configuring VictoriaLogs to accept compressed log messages via `-syslog.compressMethod` command-line flag. The following
compression methods are supported:
- `none` - no compression
- `gzip` - [gzip compression](https://en.wikipedia.org/wiki/Gzip)
- `deflate` - [deflate compression](https://en.wikipedia.org/wiki/Deflate)
For example, the following command starts VictoriaLogs, which accepts gzip-compressed syslog messages at TCP port 514:
```sh
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.compressMethod=gzip
```
## Multitenancy
By default, the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy).
If you need storing logs in other tenant, then specify the needed tenant via `-syslog.tenantID` command-line flag.
For example, the following command starts VictoriaLogs, which writes syslog messages received at TCP port 514, to `(AccountID=12, ProjectID=34)` tenant:
```sh
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.tenantID=12:34
```

View file

@ -286,6 +286,12 @@ This query uses the following [LogsQL](https://docs.victoriametrics.com/victoria
- [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for sorting the stats by `logs` field in descending order. - [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for sorting the stats by `logs` field in descending order.
- [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) for limiting the number of returned results to 10. - [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) for limiting the number of returned results to 10.
This query can be simplified into the following one, which uses [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe):
```logsql
_time:5m | top 10 by (_stream)
```
See also: See also:
- [How to filter out data after stats calculation?](#how-to-filter-out-data-after-stats-calculation) - [How to filter out data after stats calculation?](#how-to-filter-out-data-after-stats-calculation)

View file

@ -44,6 +44,8 @@ func (cm *ConnsMap) Delete(c net.Conn) {
} }
// CloseAll gradually closes all the cm conns with during the given shutdownDuration. // CloseAll gradually closes all the cm conns with during the given shutdownDuration.
//
// If shutdownDuration <= 0, then all the connections are closed simultaneously.
func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) { func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
cm.mu.Lock() cm.mu.Lock()
conns := make([]net.Conn, 0, len(cm.m)) conns := make([]net.Conn, 0, len(cm.m))
@ -70,8 +72,8 @@ func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
return return
} }
// Sort vminsert conns in order to make the order of closing connections deterministic across vmstorage nodes. // Sort conns in order to make the order of closing connections deterministic across clients.
// This should reduce resource usage spikes at vmstorage nodes during rolling restarts. // This should reduce resource usage spikes at clients during rolling restarts.
sort.Slice(conns, func(i, j int) bool { sort.Slice(conns, func(i, j int) bool {
return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String() return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String()
}) })

View file

@ -1166,7 +1166,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
return bytesutil.ToUnsafeString(buf[bufLen:]) return bytesutil.ToUnsafeString(buf[bufLen:])
} }
if timestamp, ok := tryParseTimestampISO8601(s); ok { if timestamp, ok := TryParseTimestampISO8601(s); ok {
bucketSizeInt := int64(bf.bucketSize) bucketSizeInt := int64(bf.bucketSize)
if bucketSizeInt <= 0 { if bucketSizeInt <= 0 {
bucketSizeInt = 1 bucketSizeInt = 1
@ -1190,7 +1190,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
return bytesutil.ToUnsafeString(buf[bufLen:]) return bytesutil.ToUnsafeString(buf[bufLen:])
} }
if timestamp, ok := tryParseTimestampRFC3339Nano(s); ok { if timestamp, ok := TryParseTimestampRFC3339Nano(s); ok {
bucketSizeInt := int64(bf.bucketSize) bucketSizeInt := int64(bf.bucketSize)
if bucketSizeInt <= 0 { if bucketSizeInt <= 0 {
bucketSizeInt = 1 bucketSizeInt = 1

View file

@ -102,7 +102,7 @@ func (fr *filterDayRange) applyToBlockResult(br *blockResult, bm *bitmap) {
} }
func (fr *filterDayRange) matchTimestampString(v string) bool { func (fr *filterDayRange) matchTimestampString(v string) bool {
timestamp, ok := tryParseTimestampRFC3339Nano(v) timestamp, ok := TryParseTimestampRFC3339Nano(v)
if !ok { if !ok {
return false return false
} }

View file

@ -141,7 +141,7 @@ func (fe *filterExact) applyToBlockResult(br *blockResult, bm *bitmap) {
return ip == ipNeeded return ip == ipNeeded
}) })
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
timestampNeeded, ok := tryParseTimestampISO8601(value) timestampNeeded, ok := TryParseTimestampISO8601(value)
if !ok { if !ok {
bm.resetBits() bm.resetBits()
return return
@ -213,7 +213,7 @@ func (fe *filterExact) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
} }
func matchTimestampISO8601ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []string) { func matchTimestampISO8601ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []string) {
n, ok := tryParseTimestampISO8601(value) n, ok := TryParseTimestampISO8601(value)
if !ok || n < int64(ch.minValue) || n > int64(ch.maxValue) { if !ok || n < int64(ch.minValue) || n > int64(ch.maxValue) {
bm.resetBits() bm.resetBits()
return return

View file

@ -245,7 +245,7 @@ func (fi *filterIn) initTimestampISO8601Values() {
m := make(map[string]struct{}, len(values)) m := make(map[string]struct{}, len(values))
buf := make([]byte, 0, len(values)*8) buf := make([]byte, 0, len(values)*8)
for _, v := range values { for _, v := range values {
n, ok := tryParseTimestampISO8601(v) n, ok := TryParseTimestampISO8601(v)
if !ok { if !ok {
continue continue
} }

View file

@ -100,7 +100,7 @@ func (fp *filterPhrase) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
} }
func matchTimestampISO8601ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase string, tokens []string) { func matchTimestampISO8601ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase string, tokens []string) {
_, ok := tryParseTimestampISO8601(phrase) _, ok := TryParseTimestampISO8601(phrase)
if ok { if ok {
// Fast path - the phrase contains complete timestamp, so we can use exact search // Fast path - the phrase contains complete timestamp, so we can use exact search
matchTimestampISO8601ByExactValue(bs, ch, bm, phrase, tokens) matchTimestampISO8601ByExactValue(bs, ch, bm, phrase, tokens)

View file

@ -96,7 +96,7 @@ func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) {
} }
func (ft *filterTime) matchTimestampString(v string) bool { func (ft *filterTime) matchTimestampString(v string) bool {
timestamp, ok := tryParseTimestampRFC3339Nano(v) timestamp, ok := TryParseTimestampRFC3339Nano(v)
if !ok { if !ok {
return false return false
} }

View file

@ -1,9 +1,9 @@
package logstorage package logstorage
import ( import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// filterWeekRange filters by week range. // filterWeekRange filters by week range.
@ -104,7 +104,7 @@ func (fr *filterWeekRange) applyToBlockResult(br *blockResult, bm *bitmap) {
} }
func (fr *filterWeekRange) matchTimestampString(v string) bool { func (fr *filterWeekRange) matchTimestampString(v string) bool {
timestamp, ok := tryParseTimestampRFC3339Nano(v) timestamp, ok := TryParseTimestampRFC3339Nano(v)
if !ok { if !ok {
return false return false
} }

View file

@ -78,21 +78,6 @@ func (p *JSONParser) ParseLogMessage(msg []byte) error {
return nil return nil
} }
// RenameField renames field with the oldName to newName in p.Fields
func (p *JSONParser) RenameField(oldName, newName string) {
if oldName == "" {
return
}
fields := p.Fields
for i := range fields {
f := &fields[i]
if f.Name == oldName {
f.Name = newName
return
}
}
}
func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) { func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) {
o := v.GetObject() o := v.GetObject()
o.Visit(func(k []byte, v *fastjson.Value) { o.Visit(func(k []byte, v *fastjson.Value) {

View file

@ -60,20 +60,8 @@ type RowFormatter []Field
// String returns user-readable representation for rf // String returns user-readable representation for rf
func (rf *RowFormatter) String() string { func (rf *RowFormatter) String() string {
b := append([]byte{}, '{') result := MarshalFieldsToJSON(nil, *rf)
return string(result)
fields := *rf
if len(fields) > 0 {
b = append(b, fields[0].String()...)
fields = fields[1:]
for _, field := range fields {
b = append(b, ',')
b = append(b, field.String()...)
}
}
b = append(b, '}')
return string(b)
} }
// Reset resets lr with all its settings. // Reset resets lr with all its settings.

View file

@ -26,5 +26,5 @@ func TestLogfmtParser(t *testing.T) {
f(`foo bar`, `{"foo":"","bar":""}`) f(`foo bar`, `{"foo":"","bar":""}`)
f(`foo bar=baz`, `{"foo":"","bar":"baz"}`) f(`foo bar=baz`, `{"foo":"","bar":"baz"}`)
f(`foo=bar baz="x y" a=b`, `{"foo":"bar","baz":"x y","a":"b"}`) f(`foo=bar baz="x y" a=b`, `{"foo":"bar","baz":"x y","a":"b"}`)
f(` foo=bar baz=x =z qwe`, `{"foo":"bar","baz":"x","":"z","qwe":""}`) f(` foo=bar baz=x =z qwe`, `{"foo":"bar","baz":"x","_msg":"z","qwe":""}`)
} }

View file

@ -1316,7 +1316,7 @@ func parseNumber(lex *lexer) (float64, string, error) {
return f, s, nil return f, s, nil
} }
return 0, "", fmt.Errorf("cannot parse %q as float64", s) return 0, s, fmt.Errorf("cannot parse %q as float64", s)
} }
func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) { func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) {

View file

@ -214,6 +214,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
} }
return ps, nil return ps, nil
case lex.isKeyword("top"):
pt, err := parsePipeTop(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'top' pipe: %w", err)
}
return pt, nil
case lex.isKeyword("uniq"): case lex.isKeyword("uniq"):
pu, err := parsePipeUniq(lex) pu, err := parsePipeUniq(lex)
if err != nil { if err != nil {
@ -287,6 +293,7 @@ var pipeNames = func() map[string]struct{} {
"replace_regexp", "replace_regexp",
"sort", "sort",
"stats", "stats",
"top",
"uniq", "uniq",
"unpack_json", "unpack_json",
"unpack_logfmt", "unpack_logfmt",

View file

@ -919,7 +919,7 @@ func parseMathNumber(s string) float64 {
if ok { if ok {
return f return f
} }
nsecs, ok := tryParseTimestampRFC3339Nano(s) nsecs, ok := TryParseTimestampRFC3339Nano(s)
if ok { if ok {
return float64(nsecs) return float64(nsecs)
} }

500
lib/logstorage/pipe_top.go Normal file
View file

@ -0,0 +1,500 @@
package logstorage
import (
"fmt"
"slices"
"sort"
"strings"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeTopDefaultLimit is the default number of entries pipeTop returns.
const pipeTopDefaultLimit = 10
// pipeTop processes '| top ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe
type pipeTop struct {
// fields contains field names for returning top values for.
byFields []string
// limit is the number of top (byFields) sets to return.
limit uint64
// limitStr is string representation of the limit.
limitStr string
// if hitsFieldName isn't empty, then the number of hits per each unique value is returned in this field.
hitsFieldName string
}
func (pt *pipeTop) String() string {
s := "top"
if pt.limit != pipeTopDefaultLimit {
s += " " + pt.limitStr
}
if len(pt.byFields) > 0 {
s += " by (" + fieldNamesString(pt.byFields) + ")"
}
return s
}
func (pt *pipeTop) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.reset()
unneededFields.reset()
if len(pt.byFields) == 0 {
neededFields.add("*")
} else {
neededFields.addFields(pt.byFields)
}
}
func (pt *pipeTop) optimize() {
// nothing to do
}
func (pt *pipeTop) hasFilterInWithQuery() bool {
return false
}
func (pt *pipeTop) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pt, nil
}
func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
shards := make([]pipeTopProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeTopProcessorShard{
pipeTopProcessorShardNopad: pipeTopProcessorShardNopad{
pt: pt,
stateSizeBudget: stateSizeBudgetChunk,
},
}
maxStateSize -= stateSizeBudgetChunk
}
ptp := &pipeTopProcessor{
pt: pt,
stopCh: stopCh,
cancel: cancel,
ppNext: ppNext,
shards: shards,
maxStateSize: maxStateSize,
}
ptp.stateSizeBudget.Store(maxStateSize)
return ptp
}
type pipeTopProcessor struct {
pt *pipeTop
stopCh <-chan struct{}
cancel func()
ppNext pipeProcessor
shards []pipeTopProcessorShard
maxStateSize int64
stateSizeBudget atomic.Int64
}
type pipeTopProcessorShard struct {
pipeTopProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeTopProcessorShardNopad{})%128]byte
}
type pipeTopProcessorShardNopad struct {
// pt points to the parent pipeTop.
pt *pipeTop
// m holds per-row hits.
m map[string]*uint64
// keyBuf is a temporary buffer for building keys for m.
keyBuf []byte
// columnValues is a temporary buffer for the processed column values.
columnValues [][]string
// stateSizeBudget is the remaining budget for the whole state size for the shard.
// The per-shard budget is provided in chunks from the parent pipeTopProcessor.
stateSizeBudget int
}
// writeBlock writes br to shard.
func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
byFields := shard.pt.byFields
if len(byFields) == 0 {
// Take into account all the columns in br.
keyBuf := shard.keyBuf
cs := br.getColumns()
for i := range br.timestamps {
keyBuf = keyBuf[:0]
for _, c := range cs {
v := c.getValueAtRow(br, i)
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
}
shard.keyBuf = keyBuf
return
}
if len(byFields) == 1 {
// Fast path for a single field.
c := br.getColumnByName(byFields[0])
if c.isConst {
v := c.valuesEncoded[0]
shard.updateState(v, uint64(len(br.timestamps)))
return
}
if c.valueType == valueTypeDict {
a := encoding.GetUint64s(len(c.dictValues))
hits := a.A
valuesEncoded := c.getValuesEncoded(br)
for _, v := range valuesEncoded {
idx := unmarshalUint8(v)
hits[idx]++
}
for i, v := range c.dictValues {
shard.updateState(v, hits[i])
}
encoding.PutUint64s(a)
return
}
values := c.getValues(br)
for _, v := range values {
shard.updateState(v, 1)
}
return
}
// Take into account only the selected columns.
columnValues := shard.columnValues[:0]
for _, f := range byFields {
c := br.getColumnByName(f)
values := c.getValues(br)
columnValues = append(columnValues, values)
}
shard.columnValues = columnValues
keyBuf := shard.keyBuf
for i := range br.timestamps {
keyBuf = keyBuf[:0]
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
}
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
}
shard.keyBuf = keyBuf
}
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
m := shard.getM()
pHits, ok := m[v]
if !ok {
vCopy := strings.Clone(v)
hits := uint64(0)
pHits = &hits
m[vCopy] = pHits
shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits))
}
*pHits += hits
}
func (shard *pipeTopProcessorShard) getM() map[string]*uint64 {
if shard.m == nil {
shard.m = make(map[string]*uint64)
}
return shard.m
}
func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &ptp.shards[workerID]
for shard.stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk)
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining+stateSizeBudgetChunk >= 0 {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
ptp.cancel()
}
return
}
shard.stateSizeBudget += stateSizeBudgetChunk
}
shard.writeBlock(br)
}
func (ptp *pipeTopProcessor) flush() error {
if n := ptp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
// merge state across shards
shards := ptp.shards
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
if needStop(ptp.stopCh) {
return nil
}
for k, pHitsSrc := range shards[i].getM() {
pHits, ok := m[k]
if !ok {
m[k] = pHitsSrc
} else {
*pHits += *pHitsSrc
}
}
}
// select top entries with the biggest number of hits
entries := make([]pipeTopEntry, 0, len(m))
for k, pHits := range m {
entries = append(entries, pipeTopEntry{
k: k,
hits: *pHits,
})
}
sort.Slice(entries, func(i, j int) bool {
a, b := &entries[i], &entries[j]
if a.hits == b.hits {
return a.k < b.k
}
return a.hits > b.hits
})
if uint64(len(entries)) > ptp.pt.limit {
entries = entries[:ptp.pt.limit]
}
// write result
wctx := &pipeTopWriteContext{
ptp: ptp,
}
byFields := ptp.pt.byFields
var rowFields []Field
addHitsField := func(dst []Field, hits uint64) []Field {
hitsStr := string(marshalUint64String(nil, hits))
dst = append(dst, Field{
Name: ptp.pt.hitsFieldName,
Value: hitsStr,
})
return dst
}
if len(byFields) == 0 {
for _, e := range entries {
if needStop(ptp.stopCh) {
return nil
}
rowFields = rowFields[:0]
keyBuf := bytesutil.ToUnsafeBytes(e.k)
for len(keyBuf) > 0 {
name, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal field name")
}
keyBuf = keyBuf[nSize:]
value, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal field value")
}
keyBuf = keyBuf[nSize:]
rowFields = append(rowFields, Field{
Name: bytesutil.ToUnsafeString(name),
Value: bytesutil.ToUnsafeString(value),
})
}
rowFields = addHitsField(rowFields, e.hits)
wctx.writeRow(rowFields)
}
} else if len(byFields) == 1 {
fieldName := byFields[0]
for _, e := range entries {
if needStop(ptp.stopCh) {
return nil
}
rowFields = append(rowFields[:0], Field{
Name: fieldName,
Value: e.k,
})
rowFields = addHitsField(rowFields, e.hits)
wctx.writeRow(rowFields)
}
} else {
for _, e := range entries {
if needStop(ptp.stopCh) {
return nil
}
rowFields = rowFields[:0]
keyBuf := bytesutil.ToUnsafeBytes(e.k)
fieldIdx := 0
for len(keyBuf) > 0 {
value, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal field value")
}
keyBuf = keyBuf[nSize:]
rowFields = append(rowFields, Field{
Name: byFields[fieldIdx],
Value: bytesutil.ToUnsafeString(value),
})
fieldIdx++
}
rowFields = addHitsField(rowFields, e.hits)
wctx.writeRow(rowFields)
}
}
wctx.flush()
return nil
}
type pipeTopEntry struct {
k string
hits uint64
}
type pipeTopWriteContext struct {
ptp *pipeTopProcessor
rcs []resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func (wctx *pipeTopWriteContext) writeRow(rowFields []Field) {
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(rowFields)
if areEqualColumns {
for i, f := range rowFields {
if rcs[i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to ppNext and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, f := range rowFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
for i, f := range rowFields {
v := f.Value
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeTopWriteContext) flush() {
rcs := wctx.rcs
br := &wctx.br
wctx.valuesLen = 0
// Flush rcs to ppNext
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.ptp.ppNext.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}
func parsePipeTop(lex *lexer) (*pipeTop, error) {
if !lex.isKeyword("top") {
return nil, fmt.Errorf("expecting 'top'; got %q", lex.token)
}
lex.nextToken()
limit := uint64(pipeTopDefaultLimit)
limitStr := ""
if isNumberPrefix(lex.token) {
limitF, s, err := parseNumber(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse N in 'top': %w", err)
}
if limitF < 1 {
return nil, fmt.Errorf("N in 'top %s' must be integer bigger than 0", s)
}
limit = uint64(limitF)
limitStr = s
}
var byFields []string
if lex.isKeyword("by", "(") {
if lex.isKeyword("by") {
lex.nextToken()
}
bfs, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by' clause in 'top': %w", err)
}
if slices.Contains(bfs, "*") {
bfs = nil
}
byFields = bfs
}
hitsFieldName := "hits"
for slices.Contains(byFields, hitsFieldName) {
hitsFieldName += "s"
}
pt := &pipeTop{
byFields: byFields,
limit: limit,
limitStr: limitStr,
hitsFieldName: hitsFieldName,
}
return pt, nil
}

View file

@ -0,0 +1,313 @@
package logstorage
import (
"testing"
)
func TestParsePipeTopSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`top`)
f(`top 5`)
f(`top by (x)`)
f(`top 5 by (x)`)
f(`top by (x, y)`)
f(`top 5 by (x, y)`)
}
func TestParsePipeTopFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`top 5 foo`)
f(`top 5 by`)
f(`top 5 by (`)
f(`top 5foo`)
f(`top foo`)
f(`top by`)
}
func TestPipeTop(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("top", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
{"hits", "1"},
},
})
f("top 1", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
})
f("top by (a)", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"hits", "3"},
},
})
f("top by (b)", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"b", "3"},
{"hits", "2"},
},
{
{"b", "54"},
{"hits", "1"},
},
})
f("top by (hits)", [][]Field{
{
{"a", `2`},
{"hits", `3`},
},
{
{"a", "2"},
{"hits", "3"},
},
{
{"a", `2`},
{"hits", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"hits", "3"},
{"hitss", "2"},
},
{
{"hits", "54"},
{"hitss", "1"},
},
})
f("top by (c)", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"c", ""},
{"hits", "2"},
},
{
{"c", "d"},
{"hits", "1"},
},
})
f("top by (d)", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"d", ""},
{"hits", "3"},
},
})
f("top by (a, b)", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
{
{"a", "2"},
{"b", "54"},
{"hits", "1"},
},
})
f("top 10 by (a, b)", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
{
{"a", "2"},
{"b", "54"},
{"hits", "1"},
},
})
f("top 1 by (a, b)", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
})
}
func TestPipeTopUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("top", "*", "", "*", "")
f("top by()", "*", "", "*", "")
f("top by(*)", "*", "", "*", "")
f("top by(f1,f2)", "*", "", "f1,f2", "")
f("top by(f1,f2)", "*", "", "f1,f2", "")
// all the needed fields, unneeded fields do not intersect with src
f("top by(s1, s2)", "*", "f1,f2", "s1,s2", "")
f("top", "*", "f1,f2", "*", "")
// all the needed fields, unneeded fields intersect with src
f("top by(s1, s2)", "*", "s1,f1,f2", "s1,s2", "")
f("top by(*)", "*", "s1,f1,f2", "*", "")
f("top by(s1, s2)", "*", "s1,s2,f1", "s1,s2", "")
// needed fields do not intersect with src
f("top by (s1, s2)", "f1,f2", "", "s1,s2", "")
// needed fields intersect with src
f("top by (s1, s2)", "s1,f1,f2", "", "s1,s2", "")
f("top by (*)", "s1,f1,f2", "", "*", "")
}

View file

@ -13,6 +13,10 @@ type pipeUnpackSyslog struct {
// fromField is the field to unpack syslog fields from // fromField is the field to unpack syslog fields from
fromField string fromField string
// the timezone to use when parsing rfc3164 timestamps
offsetStr string
offsetTimezone *time.Location
// resultPrefix is prefix to add to unpacked field names // resultPrefix is prefix to add to unpacked field names
resultPrefix string resultPrefix string
@ -30,6 +34,9 @@ func (pu *pipeUnpackSyslog) String() string {
if !isMsgFieldName(pu.fromField) { if !isMsgFieldName(pu.fromField) {
s += " from " + quoteTokenIfNeeded(pu.fromField) s += " from " + quoteTokenIfNeeded(pu.fromField)
} }
if pu.offsetStr != "" {
s += " offset " + pu.offsetStr
}
if pu.resultPrefix != "" { if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
} }
@ -64,14 +71,14 @@ func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFie
func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
unpackSyslog := func(uctx *fieldsUnpackerContext, s string) { unpackSyslog := func(uctx *fieldsUnpackerContext, s string) {
year := currentYear.Load() year := currentYear.Load()
p := getSyslogParser(int(year)) p := GetSyslogParser(int(year), pu.offsetTimezone)
p.parse(s) p.Parse(s)
for _, f := range p.fields { for _, f := range p.Fields {
uctx.addField(f.Name, f.Value) uctx.addField(f.Name, f.Value)
} }
putSyslogParser(p) PutSyslogParser(p)
} }
return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff) return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff)
@ -119,6 +126,23 @@ func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) {
fromField = f fromField = f
} }
offsetStr := ""
offsetTimezone := time.Local
if lex.isKeyword("offset") {
lex.nextToken()
s, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot read 'offset': %w", err)
}
offsetStr = s
nsecs, ok := tryParseDuration(offsetStr)
if !ok {
return nil, fmt.Errorf("cannot parse 'offset' from %q", offsetStr)
}
secs := nsecs / nsecsPerSecond
offsetTimezone = time.FixedZone("custom", int(secs))
}
resultPrefix := "" resultPrefix := ""
if lex.isKeyword("result_prefix") { if lex.isKeyword("result_prefix") {
lex.nextToken() lex.nextToken()
@ -137,6 +161,8 @@ func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) {
pu := &pipeUnpackSyslog{ pu := &pipeUnpackSyslog{
fromField: fromField, fromField: fromField,
offsetStr: offsetStr,
offsetTimezone: offsetTimezone,
resultPrefix: resultPrefix, resultPrefix: resultPrefix,
keepOriginalFields: keepOriginalFields, keepOriginalFields: keepOriginalFields,
iff: iff, iff: iff,

View file

@ -11,16 +11,22 @@ func TestParsePipeUnpackSyslogSuccess(t *testing.T) {
} }
f(`unpack_syslog`) f(`unpack_syslog`)
f(`unpack_syslog offset 6h30m`)
f(`unpack_syslog offset -6h30m`)
f(`unpack_syslog keep_original_fields`) f(`unpack_syslog keep_original_fields`)
f(`unpack_syslog offset -6h30m keep_original_fields`)
f(`unpack_syslog if (a:x)`) f(`unpack_syslog if (a:x)`)
f(`unpack_syslog if (a:x) keep_original_fields`) f(`unpack_syslog if (a:x) keep_original_fields`)
f(`unpack_syslog if (a:x) offset 2h keep_original_fields`)
f(`unpack_syslog from x`) f(`unpack_syslog from x`)
f(`unpack_syslog from x keep_original_fields`) f(`unpack_syslog from x keep_original_fields`)
f(`unpack_syslog if (a:x) from x`) f(`unpack_syslog if (a:x) from x`)
f(`unpack_syslog from x result_prefix abc`) f(`unpack_syslog from x result_prefix abc`)
f(`unpack_syslog from x offset 2h30m result_prefix abc`)
f(`unpack_syslog if (a:x) from x result_prefix abc`) f(`unpack_syslog if (a:x) from x result_prefix abc`)
f(`unpack_syslog result_prefix abc`) f(`unpack_syslog result_prefix abc`)
f(`unpack_syslog if (a:x) result_prefix abc`) f(`unpack_syslog if (a:x) result_prefix abc`)
f(`unpack_syslog if (a:x) offset -1h result_prefix abc`)
} }
func TestParsePipeUnpackSyslogFailure(t *testing.T) { func TestParsePipeUnpackSyslogFailure(t *testing.T) {
@ -31,6 +37,7 @@ func TestParsePipeUnpackSyslogFailure(t *testing.T) {
f(`unpack_syslog foo`) f(`unpack_syslog foo`)
f(`unpack_syslog if`) f(`unpack_syslog if`)
f(`unpack_syslog offset`)
f(`unpack_syslog if (x:y) foobar`) f(`unpack_syslog if (x:y) foobar`)
f(`unpack_syslog from`) f(`unpack_syslog from`)
f(`unpack_syslog from x y`) f(`unpack_syslog from x y`)
@ -118,6 +125,27 @@ func TestPipeUnpackSyslog(t *testing.T) {
}, },
}) })
// offset should be ignored when parsing non-rfc3164 messages
f("unpack_syslog from x offset 2h30m", [][]Field{
{
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
},
}, [][]Field{
{
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
{"priority", "165"},
{"facility", "20"},
{"severity", "5"},
{"format", "rfc5424"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"},
{"app_name", "appname"},
{"proc_id", "12345"},
{"msg_id", "ID47"},
{"message", "This is a test message with structured data"},
},
})
// failed if condition // failed if condition
f("unpack_syslog if (foo:bar)", [][]Field{ f("unpack_syslog if (foo:bar)", [][]Field{
{ {

View file

@ -58,7 +58,11 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
} }
func (f *Field) marshalToJSON(dst []byte) []byte { func (f *Field) marshalToJSON(dst []byte) []byte {
dst = strconv.AppendQuote(dst, f.Name) name := f.Name
if name == "" {
name = "_msg"
}
dst = strconv.AppendQuote(dst, name)
dst = append(dst, ':') dst = append(dst, ':')
dst = strconv.AppendQuote(dst, f.Value) dst = strconv.AppendQuote(dst, f.Value)
return dst return dst
@ -84,6 +88,20 @@ func needLogfmtQuoting(s string) bool {
return false return false
} }
// RenameField renames field with the oldName to newName in Fields
func RenameField(fields []Field, oldName, newName string) {
if oldName == "" {
return
}
for i := range fields {
f := &fields[i]
if f.Name == oldName {
f.Name = newName
return
}
}
}
// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result. // MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result.
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
dst = append(dst, '{') dst = append(dst, '{')

View file

@ -10,50 +10,78 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
func getSyslogParser(currentYear int) *syslogParser { // GetSyslogParser returns syslog parser from the pool.
//
// currentYear must contain the current year. It is used for properly setting timestamp
// field for rfc3164 format, which doesn't contain year.
//
// the timezone is used for rfc3164 format for setting the desired timezone.
//
// Return back the parser to the pool by calling PutSyslogParser when it is no longer needed.
func GetSyslogParser(currentYear int, timezone *time.Location) *SyslogParser {
v := syslogParserPool.Get() v := syslogParserPool.Get()
if v == nil { if v == nil {
v = &syslogParser{} v = &SyslogParser{}
} }
p := v.(*syslogParser) p := v.(*SyslogParser)
p.currentYear = currentYear p.currentYear = currentYear
p.timezone = timezone
return p return p
} }
func putSyslogParser(p *syslogParser) { // PutSyslogParser returns back syslog parser to the pool.
//
// p cannot be used after returning to the pool.
func PutSyslogParser(p *SyslogParser) {
p.reset() p.reset()
syslogParserPool.Put(p) syslogParserPool.Put(p)
} }
var syslogParserPool sync.Pool var syslogParserPool sync.Pool
type syslogParser struct { // SyslogParser is parser for syslog messages.
currentYear int //
// It understands the following syslog formats:
//
// - https://datatracker.ietf.org/doc/html/rfc5424
// - https://datatracker.ietf.org/doc/html/rfc3164
//
// It extracts the following list of syslog message fields into Fields -
// https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
type SyslogParser struct {
// Fields contains parsed fields after Parse call.
Fields []Field
buf []byte buf []byte
fields []Field
currentYear int
timezone *time.Location
} }
func (p *syslogParser) reset() { func (p *SyslogParser) reset() {
p.currentYear = 0 p.currentYear = 0
p.timezone = nil
p.resetFields() p.resetFields()
} }
func (p *syslogParser) resetFields() { func (p *SyslogParser) resetFields() {
p.buf = p.buf[:0] clear(p.Fields)
p.Fields = p.Fields[:0]
clear(p.fields) p.buf = p.buf[:0]
p.fields = p.fields[:0]
} }
func (p *syslogParser) addField(name, value string) { func (p *SyslogParser) addField(name, value string) {
p.fields = append(p.fields, Field{ p.Fields = append(p.Fields, Field{
Name: name, Name: name,
Value: value, Value: value,
}) })
} }
func (p *syslogParser) parse(s string) { // Parse parses syslog message from s into p.Fields.
//
// p.Fields is valid until s is modified or p state is changed.
func (p *SyslogParser) Parse(s string) {
p.resetFields() p.resetFields()
if len(s) == 0 { if len(s) == 0 {
@ -96,7 +124,7 @@ func (p *syslogParser) parse(s string) {
p.parseNoHeader(s) p.parseNoHeader(s)
} }
func (p *syslogParser) parseNoHeader(s string) { func (p *SyslogParser) parseNoHeader(s string) {
if len(s) == 0 { if len(s) == 0 {
return return
} }
@ -107,7 +135,7 @@ func (p *syslogParser) parseNoHeader(s string) {
} }
} }
func (p *syslogParser) parseRFC5424(s string) { func (p *SyslogParser) parseRFC5424(s string) {
// See https://datatracker.ietf.org/doc/html/rfc5424 // See https://datatracker.ietf.org/doc/html/rfc5424
p.addField("format", "rfc5424") p.addField("format", "rfc5424")
@ -172,7 +200,7 @@ func (p *syslogParser) parseRFC5424(s string) {
p.addField("message", s) p.addField("message", s)
} }
func (p *syslogParser) parseRFC5424SD(s string) (string, bool) { func (p *SyslogParser) parseRFC5424SD(s string) (string, bool) {
if strings.HasPrefix(s, "- ") { if strings.HasPrefix(s, "- ") {
return s[2:], true return s[2:], true
} }
@ -190,7 +218,7 @@ func (p *syslogParser) parseRFC5424SD(s string) (string, bool) {
} }
} }
func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) { func (p *SyslogParser) parseRFC5424SDLine(s string) (string, bool) {
if len(s) == 0 || s[0] != '[' { if len(s) == 0 || s[0] != '[' {
return s, false return s, false
} }
@ -236,7 +264,7 @@ func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) {
return s, true return s, true
} }
func (p *syslogParser) parseRFC3164(s string) { func (p *SyslogParser) parseRFC3164(s string) {
// See https://datatracker.ietf.org/doc/html/rfc3164 // See https://datatracker.ietf.org/doc/html/rfc3164
p.addField("format", "rfc3164") p.addField("format", "rfc3164")
@ -257,10 +285,10 @@ func (p *syslogParser) parseRFC3164(s string) {
s = s[n:] s = s[n:]
t = t.UTC() t = t.UTC()
t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), p.timezone)
if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() { if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() {
// Adjust time to the previous year // Adjust time to the previous year
t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), p.timezone)
} }
bufLen := len(p.buf) bufLen := len(p.buf)

View file

@ -2,69 +2,70 @@ package logstorage
import ( import (
"testing" "testing"
"time"
) )
func TestSyslogParser(t *testing.T) { func TestSyslogParser(t *testing.T) {
f := func(s, resultExpected string) { f := func(s string, timezone *time.Location, resultExpected string) {
t.Helper() t.Helper()
const currentYear = 2024 const currentYear = 2024
p := getSyslogParser(currentYear) p := GetSyslogParser(currentYear, timezone)
defer putSyslogParser(p) defer PutSyslogParser(p)
p.parse(s) p.Parse(s)
result := MarshalFieldsToJSON(nil, p.fields) result := MarshalFieldsToJSON(nil, p.Fields)
if string(result) != resultExpected { if string(result) != resultExpected {
t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected) t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected)
} }
} }
// RFC 3164 // RFC 3164
f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", time.UTC,
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", time.UTC,
`{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`)
f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", time.UTC,
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`)
f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", time.UTC,
`{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`)
// RFC 5424 // RFC 5424
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, time.UTC,
`{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) `{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`)
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`)
// Incomplete RFC 3164 // Incomplete RFC 3164
f("", `{}`) f("", time.UTC, `{}`)
f("Jun 3 12:08:33", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`) f("Jun 3 12:08:33", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`)
f("Foo 3 12:08:33", `{"format":"rfc3164","message":"Foo 3 12:08:33"}`) f("Foo 3 12:08:33", time.UTC, `{"format":"rfc3164","message":"Foo 3 12:08:33"}`)
f("Foo 3 12:08:33bar", `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`) f("Foo 3 12:08:33bar", time.UTC, `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`)
f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) f("Jun 3 12:08:33 abcd", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`)
f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) f("Jun 3 12:08:33 abcd sudo", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`)
f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) f("Jun 3 12:08:33 abcd sudo[123]", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`)
f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) f("Jun 3 12:08:33 abcd sudo foobar", time.UTC, `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`)
f(`foo bar baz`, `{"format":"rfc3164","message":"foo bar baz"}`) f(`foo bar baz`, time.UTC, `{"format":"rfc3164","message":"foo bar baz"}`)
// Incomplete RFC 5424 // Incomplete RFC 5424
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z`, f(`<165>1 2023-06-03T17:42:32.123456789Z`, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`)
f(`<165>1 `, f(`<165>1 `, time.UTC,
`{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`)
} }

View file

@ -262,7 +262,7 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) (
a := u64s.A a := u64s.A
var minValue, maxValue int64 var minValue, maxValue int64
for i, v := range srcValues { for i, v := range srcValues {
n, ok := tryParseTimestampISO8601(v) n, ok := TryParseTimestampISO8601(v)
if !ok { if !ok {
return dstBuf, dstValues, valueTypeUnknown, 0, 0 return dstBuf, dstValues, valueTypeUnknown, 0, 0
} }
@ -283,10 +283,10 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) (
return dstBuf, dstValues, valueTypeTimestampISO8601, uint64(minValue), uint64(maxValue) return dstBuf, dstValues, valueTypeTimestampISO8601, uint64(minValue), uint64(maxValue)
} }
// tryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds. // TryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds.
// //
// The returned timestamp can be negative if s is smaller than 1970 year. // The returned timestamp can be negative if s is smaller than 1970 year.
func tryParseTimestampRFC3339Nano(s string) (int64, bool) { func TryParseTimestampRFC3339Nano(s string) (int64, bool) {
// Do not parse timestamps with timezone other than Z, since they cannot be converted back // Do not parse timestamps with timezone other than Z, since they cannot be converted back
// to the same string representation in general case. // to the same string representation in general case.
// This may break search. // This may break search.
@ -330,10 +330,10 @@ func tryParseTimestampRFC3339Nano(s string) (int64, bool) {
return nsecs, true return nsecs, true
} }
// tryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds. // TryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds.
// //
// The returned timestamp can be negative if s is smaller than 1970 year. // The returned timestamp can be negative if s is smaller than 1970 year.
func tryParseTimestampISO8601(s string) (int64, bool) { func TryParseTimestampISO8601(s string) (int64, bool) {
// Do not parse timestamps with timezone, since they cannot be converted back // Do not parse timestamps with timezone, since they cannot be converted back
// to the same string representation in general case. // to the same string representation in general case.
// This may break search. // This may break search.

View file

@ -151,7 +151,7 @@ func TestTryParseIPv4_Failure(t *testing.T) {
func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) { func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()
nsecs, ok := tryParseTimestampRFC3339Nano(s) nsecs, ok := TryParseTimestampRFC3339Nano(s)
if !ok { if !ok {
t.Fatalf("cannot parse timestamp %q", s) t.Fatalf("cannot parse timestamp %q", s)
} }
@ -185,7 +185,7 @@ func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) {
func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) { func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()
_, ok := tryParseTimestampRFC3339Nano(s) _, ok := TryParseTimestampRFC3339Nano(s)
if ok { if ok {
t.Fatalf("expecting faulure when parsing %q", s) t.Fatalf("expecting faulure when parsing %q", s)
} }
@ -240,7 +240,7 @@ func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) {
func TestTryParseTimestampISO8601String_Success(t *testing.T) { func TestTryParseTimestampISO8601String_Success(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()
nsecs, ok := tryParseTimestampISO8601(s) nsecs, ok := TryParseTimestampISO8601(s)
if !ok { if !ok {
t.Fatalf("cannot parse timestamp %q", s) t.Fatalf("cannot parse timestamp %q", s)
} }
@ -263,7 +263,7 @@ func TestTryParseTimestampISO8601String_Success(t *testing.T) {
func TestTryParseTimestampISO8601_Failure(t *testing.T) { func TestTryParseTimestampISO8601_Failure(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()
_, ok := tryParseTimestampISO8601(s) _, ok := TryParseTimestampISO8601(s)
if ok { if ok {
t.Fatalf("expecting faulure when parsing %q", s) t.Fatalf("expecting faulure when parsing %q", s)
} }

View file

@ -21,7 +21,7 @@ func BenchmarkTryParseTimestampRFC3339Nano(b *testing.B) {
nSum := int64(0) nSum := int64(0)
for pb.Next() { for pb.Next() {
for _, s := range a { for _, s := range a {
n, ok := tryParseTimestampRFC3339Nano(s) n, ok := TryParseTimestampRFC3339Nano(s)
if !ok { if !ok {
panic(fmt.Errorf("cannot parse timestamp %q", s)) panic(fmt.Errorf("cannot parse timestamp %q", s))
} }
@ -47,7 +47,7 @@ func BenchmarkTryParseTimestampISO8601(b *testing.B) {
nSum := int64(0) nSum := int64(0)
for pb.Next() { for pb.Next() {
for _, s := range a { for _, s := range a {
n, ok := tryParseTimestampISO8601(s) n, ok := TryParseTimestampISO8601(s)
if !ok { if !ok {
panic(fmt.Errorf("cannot parse timestamp %q", s)) panic(fmt.Errorf("cannot parse timestamp %q", s))
} }