mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
8baa5177aa
This msy be useful when ingesting logs from different sources, which store the log message in different fields.
For example, `_msg_field=message,event.data,some_field` will get log message from the first non-empty field:
`message`, `event.data` and `some_field`.
(cherry picked from commit ed73f8350b
)
256 lines
8.2 KiB
Go
256 lines
8.2 KiB
Go
package journald
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"regexp"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
const (
|
|
journaldEntryMaxNameLen = 64
|
|
)
|
|
|
|
var (
|
|
bodyBufferPool bytesutil.ByteBufferPool
|
|
allowedJournaldEntryNameChars = regexp.MustCompile(`^[A-Z_][A-Z0-9_]+`)
|
|
)
|
|
|
|
var (
|
|
journaldStreamFields = flagutil.NewArrayString("journald.streamFields", "Journal fields to be used as stream fields. "+
|
|
"See the list of allowed fields at https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html.")
|
|
journaldIgnoreFields = flagutil.NewArrayString("journald.ignoreFields", "Journal fields to ignore. "+
|
|
"See the list of allowed fields at https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html.")
|
|
journaldTimeField = flag.String("journald.timeField", "__REALTIME_TIMESTAMP", "Journal field to be used as time field. "+
|
|
"See the list of allowed fields at https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html.")
|
|
journaldTenantID = flag.String("journald.tenantID", "0:0", "TenantID for logs ingested via the Journald endpoint.")
|
|
journaldIncludeEntryMetadata = flag.Bool("journald.includeEntryMetadata", false, "Include journal entry fields, which with double underscores.")
|
|
)
|
|
|
|
func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) {
|
|
cp, err := insertutils.GetCommonParams(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if cp.TenantID.AccountID == 0 && cp.TenantID.ProjectID == 0 {
|
|
tenantID, err := logstorage.ParseTenantID(*journaldTenantID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse -journald.tenantID=%q for journald: %w", *journaldTenantID, err)
|
|
}
|
|
cp.TenantID = tenantID
|
|
}
|
|
if cp.TimeField != "" {
|
|
cp.TimeField = *journaldTimeField
|
|
}
|
|
if len(cp.StreamFields) == 0 {
|
|
cp.StreamFields = *journaldStreamFields
|
|
}
|
|
if len(cp.IgnoreFields) == 0 {
|
|
cp.IgnoreFields = *journaldIgnoreFields
|
|
}
|
|
cp.MsgFields = []string{"MESSAGE"}
|
|
return cp, nil
|
|
}
|
|
|
|
// RequestHandler processes Journald Export insert requests
|
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
|
switch path {
|
|
case "/upload":
|
|
if r.Header.Get("Content-Type") != "application/vnd.fdo.journal" {
|
|
httpserver.Errorf(w, r, "only application/vnd.fdo.journal encoding is supported for Journald")
|
|
return true
|
|
}
|
|
handleJournald(r, w)
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// handleJournald parses Journal binary entries
|
|
func handleJournald(r *http.Request, w http.ResponseWriter) {
|
|
startTime := time.Now()
|
|
requestsJournaldTotal.Inc()
|
|
|
|
if err := vlstorage.CanWriteData(); err != nil {
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return
|
|
}
|
|
|
|
reader := r.Body
|
|
var err error
|
|
|
|
wcr := writeconcurrencylimiter.GetReader(reader)
|
|
data, err := io.ReadAll(wcr)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
|
return
|
|
}
|
|
writeconcurrencylimiter.PutReader(wcr)
|
|
bb := bodyBufferPool.Get()
|
|
defer bodyBufferPool.Put(bb)
|
|
if r.Header.Get("Content-Encoding") == "zstd" {
|
|
bb.B, err = zstd.Decompress(bb.B[:0], data)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot decompress zstd-encoded request with length %d: %s", len(data), err)
|
|
return
|
|
}
|
|
data = bb.B
|
|
}
|
|
cp, err := getCommonParams(r)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
|
return
|
|
}
|
|
|
|
lmp := cp.NewLogMessageProcessor()
|
|
n, err := parseJournaldRequest(data, lmp, cp)
|
|
lmp.MustClose()
|
|
if err != nil {
|
|
errorsTotal.Inc()
|
|
httpserver.Errorf(w, r, "cannot parse Journald protobuf request: %s", err)
|
|
return
|
|
}
|
|
|
|
rowsIngestedJournaldTotal.Add(n)
|
|
|
|
// update requestJournaldDuration only for successfully parsed requests
|
|
// There is no need in updating requestJournaldDuration for request errors,
|
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
|
requestJournaldDuration.UpdateDuration(startTime)
|
|
}
|
|
|
|
var (
|
|
rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald", format="journald"}`)
|
|
|
|
requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload",format="journald"}`)
|
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload",format="journald"}`)
|
|
|
|
requestJournaldDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload",format="journald"}`)
|
|
)
|
|
|
|
// See https://systemd.io/JOURNAL_EXPORT_FORMATS/#journal-export-format
|
|
func parseJournaldRequest(data []byte, lmp insertutils.LogMessageProcessor, cp *insertutils.CommonParams) (rowsIngested int, err error) {
|
|
var fields []logstorage.Field
|
|
var ts int64
|
|
var size uint64
|
|
var name, value string
|
|
var line []byte
|
|
|
|
currentTimestamp := time.Now().UnixNano()
|
|
|
|
for len(data) > 0 {
|
|
idx := bytes.IndexByte(data, '\n')
|
|
switch {
|
|
case idx > 0:
|
|
// process fields
|
|
line = data[:idx]
|
|
data = data[idx+1:]
|
|
case idx == 0:
|
|
// next message or end of file
|
|
// double new line is a separator for the next message
|
|
if len(fields) > 0 {
|
|
if ts == 0 {
|
|
ts = currentTimestamp
|
|
}
|
|
lmp.AddRow(ts, fields)
|
|
rowsIngested++
|
|
fields = fields[:0]
|
|
}
|
|
// skip newline separator
|
|
data = data[1:]
|
|
continue
|
|
case idx < 0:
|
|
return rowsIngested, fmt.Errorf("missing new line separator, unread data left=%d", len(data))
|
|
}
|
|
|
|
idx = bytes.IndexByte(line, '=')
|
|
// could b either e key=value\n pair
|
|
// or just key\n
|
|
// with binary data at the buffer
|
|
if idx > 0 {
|
|
name = bytesutil.ToUnsafeString(line[:idx])
|
|
value = bytesutil.ToUnsafeString(line[idx+1:])
|
|
} else {
|
|
name = bytesutil.ToUnsafeString(line)
|
|
if len(data) == 0 {
|
|
return rowsIngested, fmt.Errorf("unexpected zero data for binary field value of key=%s", name)
|
|
}
|
|
// size of binary data encoded as le i64 at the begging
|
|
idx, err := binary.Decode(data, binary.LittleEndian, &size)
|
|
if err != nil {
|
|
return rowsIngested, fmt.Errorf("failed to extract binary field %q value size: %w", name, err)
|
|
}
|
|
// skip binary data sise
|
|
data = data[idx:]
|
|
if size == 0 {
|
|
return rowsIngested, fmt.Errorf("unexpected zero binary data size decoded %d", size)
|
|
}
|
|
if int(size) > len(data) {
|
|
return rowsIngested, fmt.Errorf("binary data size=%d cannot exceed size of the data at buffer=%d", size, len(data))
|
|
}
|
|
value = bytesutil.ToUnsafeString(data[:size])
|
|
data = data[int(size):]
|
|
// binary data must has new line separator for the new line or next field
|
|
if len(data) == 0 {
|
|
return rowsIngested, fmt.Errorf("unexpected empty buffer after binary field=%s read", name)
|
|
}
|
|
lastB := data[0]
|
|
if lastB != '\n' {
|
|
return rowsIngested, fmt.Errorf("expected new line separator after binary field=%s, got=%s", name, string(lastB))
|
|
}
|
|
data = data[1:]
|
|
}
|
|
// https://github.com/systemd/systemd/blob/main/src/libsystemd/sd-journal/journal-file.c#L1703
|
|
if len(name) > journaldEntryMaxNameLen {
|
|
return rowsIngested, fmt.Errorf("journald entry name should not exceed %d symbols, got: %q", journaldEntryMaxNameLen, name)
|
|
}
|
|
if !allowedJournaldEntryNameChars.MatchString(name) {
|
|
return rowsIngested, fmt.Errorf("journald entry name should consist of `A-Z0-9_` characters and must start from non-digit symbol")
|
|
}
|
|
if name == cp.TimeField {
|
|
ts, err = strconv.ParseInt(value, 10, 64)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to parse Journald timestamp, %w", err)
|
|
}
|
|
ts *= 1e3
|
|
continue
|
|
}
|
|
|
|
if slices.Contains(cp.MsgFields, name) {
|
|
name = "_msg"
|
|
}
|
|
|
|
if *journaldIncludeEntryMetadata || !strings.HasPrefix(name, "__") {
|
|
fields = append(fields, logstorage.Field{
|
|
Name: name,
|
|
Value: value,
|
|
})
|
|
}
|
|
}
|
|
if len(fields) > 0 {
|
|
if ts == 0 {
|
|
ts = currentTimestamp
|
|
}
|
|
lmp.AddRow(ts, fields)
|
|
rowsIngested++
|
|
}
|
|
return rowsIngested, nil
|
|
}
|