diff --git a/app/vlinsert/journald/journald.go b/app/vlinsert/journald/journald.go new file mode 100644 index 000000000..c337daa54 --- /dev/null +++ b/app/vlinsert/journald/journald.go @@ -0,0 +1,255 @@ +package journald + +import ( + "bytes" + "encoding/binary" + "flag" + "fmt" + "io" + "net/http" + "regexp" + "strconv" + "strings" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +const ( + journaldEntryMaxNameLen = 64 +) + +var ( + bodyBufferPool bytesutil.ByteBufferPool + allowedJournaldEntryNameChars = regexp.MustCompile(`^[A-Z_][A-Z0-9_]+`) +) + +var ( + journaldStreamFields = flagutil.NewArrayString("journald.streamFields", "Journal fields to be used as stream fields. "+ + "See the list of allowed fields at https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html.") + journaldIgnoreFields = flagutil.NewArrayString("journald.ignoreFields", "Journal fields to ignore. "+ + "See the list of allowed fields at https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html.") + journaldTimeField = flag.String("journald.timeField", "__REALTIME_TIMESTAMP", "Journal field to be used as time field. "+ + "See the list of allowed fields at https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html.") + journaldTenantID = flag.String("journald.tenantID", "0:0", "TenantID for logs ingested via the Journald endpoint.") + journaldIncludeEntryMetadata = flag.Bool("journald.includeEntryMetadata", false, "Include journal entry fields, which with double underscores.") +) + +func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) { + cp, err := insertutils.GetCommonParams(r) + if err != nil { + return nil, err + } + if cp.TenantID.AccountID == 0 && cp.TenantID.ProjectID == 0 { + tenantID, err := logstorage.ParseTenantID(*journaldTenantID) + if err != nil { + return nil, fmt.Errorf("cannot parse -journald.tenantID=%q for journald: %w", *journaldTenantID, err) + } + cp.TenantID = tenantID + } + if cp.TimeField != "" { + cp.TimeField = *journaldTimeField + } + if len(cp.StreamFields) == 0 { + cp.StreamFields = *journaldStreamFields + } + if len(cp.IgnoreFields) == 0 { + cp.IgnoreFields = *journaldIgnoreFields + } + cp.MsgField = "MESSAGE" + return cp, nil +} + +// RequestHandler processes Journald Export insert requests +func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { + switch path { + case "/upload": + if r.Header.Get("Content-Type") != "application/vnd.fdo.journal" { + httpserver.Errorf(w, r, "only application/vnd.fdo.journal encoding is supported for Journald") + return true + } + handleJournald(r, w) + return true + default: + return false + } +} + +// handleJournald parses Journal binary entries +func handleJournald(r *http.Request, w http.ResponseWriter) { + startTime := time.Now() + requestsJournaldTotal.Inc() + + if err := vlstorage.CanWriteData(); err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + + reader := r.Body + var err error + + wcr := writeconcurrencylimiter.GetReader(reader) + data, err := io.ReadAll(wcr) + if err != nil { + httpserver.Errorf(w, r, "cannot read request body: %s", err) + return + } + writeconcurrencylimiter.PutReader(wcr) + bb := bodyBufferPool.Get() + defer bodyBufferPool.Put(bb) + if r.Header.Get("Content-Encoding") == "zstd" { + bb.B, err = zstd.Decompress(bb.B[:0], data) + if err != nil { + httpserver.Errorf(w, r, "cannot decompress zstd-encoded request with length %d: %s", len(data), err) + return + } + data = bb.B + } + cp, err := getCommonParams(r) + if err != nil { + httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) + return + } + + lmp := cp.NewLogMessageProcessor() + n, err := parseJournaldRequest(data, lmp, cp) + lmp.MustClose() + if err != nil { + errorsTotal.Inc() + httpserver.Errorf(w, r, "cannot parse Journald protobuf request: %s", err) + return + } + + rowsIngestedJournaldTotal.Add(n) + + // update requestJournaldDuration only for successfully parsed requests + // There is no need in updating requestJournaldDuration for request errors, + // since their timings are usually much smaller than the timing for successful request parsing. + requestJournaldDuration.UpdateDuration(startTime) +} + +var ( + rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald", format="journald"}`) + + requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload",format="journald"}`) + errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload",format="journald"}`) + + requestJournaldDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload",format="journald"}`) +) + +// See https://systemd.io/JOURNAL_EXPORT_FORMATS/#journal-export-format +func parseJournaldRequest(data []byte, lmp insertutils.LogMessageProcessor, cp *insertutils.CommonParams) (rowsIngested int, err error) { + var fields []logstorage.Field + var ts int64 + var size uint64 + var name, value string + var line []byte + + currentTimestamp := time.Now().UnixNano() + + for len(data) > 0 { + idx := bytes.IndexByte(data, '\n') + switch { + case idx > 0: + // process fields + line = data[:idx] + data = data[idx+1:] + case idx == 0: + // next message or end of file + // double new line is a separator for the next message + if len(fields) > 0 { + if ts == 0 { + ts = currentTimestamp + } + lmp.AddRow(ts, fields) + rowsIngested++ + fields = fields[:0] + } + // skip newline separator + data = data[1:] + continue + case idx < 0: + return rowsIngested, fmt.Errorf("missing new line separator, unread data left=%d", len(data)) + } + + idx = bytes.IndexByte(line, '=') + // could b either e key=value\n pair + // or just key\n + // with binary data at the buffer + if idx > 0 { + name = bytesutil.ToUnsafeString(line[:idx]) + value = bytesutil.ToUnsafeString(line[idx+1:]) + } else { + name = bytesutil.ToUnsafeString(line) + if len(data) == 0 { + return rowsIngested, fmt.Errorf("unexpected zero data for binary field value of key=%s", name) + } + // size of binary data encoded as le i64 at the begging + idx, err := binary.Decode(data, binary.LittleEndian, &size) + if err != nil { + return rowsIngested, fmt.Errorf("failed to extract binary field %q value size: %w", name, err) + } + // skip binary data sise + data = data[idx:] + if size == 0 { + return rowsIngested, fmt.Errorf("unexpected zero binary data size decoded %d", size) + } + if int(size) > len(data) { + return rowsIngested, fmt.Errorf("binary data size=%d cannot exceed size of the data at buffer=%d", size, len(data)) + } + value = bytesutil.ToUnsafeString(data[:size]) + data = data[int(size):] + // binary data must has new line separator for the new line or next field + if len(data) == 0 { + return rowsIngested, fmt.Errorf("unexpected empty buffer after binary field=%s read", name) + } + lastB := data[0] + if lastB != '\n' { + return rowsIngested, fmt.Errorf("expected new line separator after binary field=%s, got=%s", name, string(lastB)) + } + data = data[1:] + } + // https://github.com/systemd/systemd/blob/main/src/libsystemd/sd-journal/journal-file.c#L1703 + if len(name) > journaldEntryMaxNameLen { + return rowsIngested, fmt.Errorf("journald entry name should not exceed %d symbols, got: %q", journaldEntryMaxNameLen, name) + } + if !allowedJournaldEntryNameChars.MatchString(name) { + return rowsIngested, fmt.Errorf("journald entry name should consist of `A-Z0-9_` characters and must start from non-digit symbol") + } + if name == cp.TimeField { + ts, err = strconv.ParseInt(value, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse Journald timestamp, %w", err) + } + ts *= 1e3 + continue + } + + if name == cp.MsgField { + name = "_msg" + } + + if *journaldIncludeEntryMetadata || !strings.HasPrefix(name, "__") { + fields = append(fields, logstorage.Field{ + Name: name, + Value: value, + }) + } + } + if len(fields) > 0 { + if ts == 0 { + ts = currentTimestamp + } + lmp.AddRow(ts, fields) + rowsIngested++ + } + return rowsIngested, nil +} diff --git a/app/vlinsert/journald/journald_test.go b/app/vlinsert/journald/journald_test.go new file mode 100644 index 000000000..b3d3db93c --- /dev/null +++ b/app/vlinsert/journald/journald_test.go @@ -0,0 +1,70 @@ +package journald + +import ( + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" +) + +func TestPushJournaldOk(t *testing.T) { + f := func(src string, timestampsExpected []int64, resultExpected string) { + t.Helper() + tlp := &insertutils.TestLogMessageProcessor{} + cp := &insertutils.CommonParams{ + TimeField: "__REALTIME_TIMESTAMP", + MsgField: "MESSAGE", + } + n, err := parseJournaldRequest([]byte(src), tlp, cp) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if err := tlp.Verify(n, timestampsExpected, resultExpected); err != nil { + t.Fatal(err) + } + } + // Single event + f("__REALTIME_TIMESTAMP=91723819283\nMESSAGE=Test message\n", + []int64{91723819283000}, + "{\"_msg\":\"Test message\"}", + ) + + // Multiple events + f("__REALTIME_TIMESTAMP=91723819283\nMESSAGE=Test message\n\n__REALTIME_TIMESTAMP=91723819284\nMESSAGE=Test message2\n", + []int64{91723819283000, 91723819284000}, + "{\"_msg\":\"Test message\"}\n{\"_msg\":\"Test message2\"}", + ) + + // Parse binary data + f("__CURSOR=s=e0afe8412a6a49d2bfcf66aa7927b588;i=1f06;b=f778b6e2f7584a77b991a2366612a7b5;m=300bdfd420;t=62526e1182354;x=930dc44b370963b7\n__REALTIME_TIMESTAMP=1729698775704404\n__MONOTONIC_TIMESTAMP=206357648416\n__SEQNUM=7942\n__SEQNUM_ID=e0afe8412a6a49d2bfcf66aa7927b588\n_BOOT_ID=f778b6e2f7584a77b991a2366612a7b5\n_UID=0\n_GID=0\n_MACHINE_ID=a4a970370c30a925df02a13c67167847\n_HOSTNAME=ecd5e4555787\n_RUNTIME_SCOPE=system\n_TRANSPORT=journal\n_CAP_EFFECTIVE=1ffffffffff\n_SYSTEMD_CGROUP=/init.scope\n_SYSTEMD_UNIT=init.scope\n_SYSTEMD_SLICE=-.slice\nCODE_FILE=\nCODE_LINE=1\nCODE_FUNC=\nSYSLOG_IDENTIFIER=python3\n_COMM=python3\n_EXE=/usr/bin/python3.12\n_CMDLINE=python3\nMESSAGE\n\x13\x00\x00\x00\x00\x00\x00\x00foo\nbar\n\n\nasda\nasda\n_PID=2763\n_SOURCE_REALTIME_TIMESTAMP=1729698775704375\n\n", + []int64{1729698775704404000}, + "{\"_BOOT_ID\":\"f778b6e2f7584a77b991a2366612a7b5\",\"_UID\":\"0\",\"_GID\":\"0\",\"_MACHINE_ID\":\"a4a970370c30a925df02a13c67167847\",\"_HOSTNAME\":\"ecd5e4555787\",\"_RUNTIME_SCOPE\":\"system\",\"_TRANSPORT\":\"journal\",\"_CAP_EFFECTIVE\":\"1ffffffffff\",\"_SYSTEMD_CGROUP\":\"/init.scope\",\"_SYSTEMD_UNIT\":\"init.scope\",\"_SYSTEMD_SLICE\":\"-.slice\",\"CODE_FILE\":\"\\u003cstdin>\",\"CODE_LINE\":\"1\",\"CODE_FUNC\":\"\\u003cmodule>\",\"SYSLOG_IDENTIFIER\":\"python3\",\"_COMM\":\"python3\",\"_EXE\":\"/usr/bin/python3.12\",\"_CMDLINE\":\"python3\",\"_msg\":\"foo\\nbar\\n\\n\\nasda\\nasda\",\"_PID\":\"2763\",\"_SOURCE_REALTIME_TIMESTAMP\":\"1729698775704375\"}", + ) +} + +func TestPushJournald_Failure(t *testing.T) { + f := func(data string) { + t.Helper() + tlp := &insertutils.TestLogMessageProcessor{} + cp := &insertutils.CommonParams{ + TimeField: "__REALTIME_TIMESTAMP", + MsgField: "MESSAGE", + } + _, err := parseJournaldRequest([]byte(data), tlp, cp) + if err == nil { + t.Fatalf("expected non nil error") + } + } + // missing new line terminator for binary encoded message + f("__CURSOR=s=e0afe8412a6a49d2bfcf66aa7927b588;i=1f06;b=f778b6e2f7584a77b991a2366612a7b5;m=300bdfd420;t=62526e1182354;x=930dc44b370963b7\n__REALTIME_TIMESTAMP=1729698775704404\nMESSAGE\n\x13\x00\x00\x00\x00\x00\x00\x00foo\nbar\n\n\nasdaasda2") + // missing new line terminator + f("__REALTIME_TIMESTAMP=91723819283\n=Test message") + // empty field name + f("__REALTIME_TIMESTAMP=91723819283\n=Test message\n") + // field name starting with number + f("__REALTIME_TIMESTAMP=91723819283\n1incorrect=Test message\n") + // field name exceeds 64 limit + f("__REALTIME_TIMESTAMP=91723819283\ntoolooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongcorrecooooooooooooong=Test message\n") + // Only allow A-Z0-9 and '_' + f("__REALTIME_TIMESTAMP=91723819283\nbadC!@$!@$as=Test message\n") +} diff --git a/app/vlinsert/main.go b/app/vlinsert/main.go index a5c01fb98..d784ed337 100644 --- a/app/vlinsert/main.go +++ b/app/vlinsert/main.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/journald" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/opentelemetry" @@ -45,6 +46,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { case strings.HasPrefix(path, "/opentelemetry/"): path = strings.TrimPrefix(path, "/opentelemetry") return opentelemetry.RequestHandler(path, w, r) + case strings.HasPrefix(path, "/journald/"): + path = strings.TrimPrefix(path, "/journald") + return journald.RequestHandler(path, w, r) default: return false } diff --git a/deployment/docker/victorialogs/compose-base.yml b/deployment/docker/victorialogs/compose-base.yml index 0d6c48dcb..05da7cb9c 100644 --- a/deployment/docker/victorialogs/compose-base.yml +++ b/deployment/docker/victorialogs/compose-base.yml @@ -6,6 +6,9 @@ services: - -storageDataPath=/vlogs - -loggerFormat=json - -syslog.listenAddr.tcp=0.0.0.0:8094 + - -journald.streamFields=_HOSTNAME,_SYSTEMD_UNIT,_PID + - -journald.ignoreFields=MESSAGE_ID,INVOCATION_ID,USER_INVOCATION_ID, + - -journald.ignoreFields=_BOOT_ID,_MACHINE_ID,_SYSTEMD_INVOCATION_ID,_STREAM_ID,_UID deploy: replicas: 0 healthcheck: diff --git a/deployment/docker/victorialogs/fluentbit/README.md b/deployment/docker/victorialogs/fluentbit/README.md index c6727c206..97339e232 100644 --- a/deployment/docker/victorialogs/fluentbit/README.md +++ b/deployment/docker/victorialogs/fluentbit/README.md @@ -5,6 +5,7 @@ The folder contains examples of [FluentBit](https://docs.fluentbit.io/manual) in * [loki](./loki) * [jsonline single node](./jsonline) * [jsonline HA setup](./jsonline-ha) +* [otlp](./otlp) To spin-up environment `cd` to any of listed above directories run the following command: ``` @@ -32,5 +33,6 @@ FluentBit configuration example can be found below: * [loki](./loki/fluent-bit.conf) * [jsonline single node](./jsonline/fluent-bit.conf) * [jsonline HA setup](./jsonline-ha/fluent-bit.conf) +* [otlp](./otlp/fluent-bit.conf) Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance. diff --git a/deployment/docker/victorialogs/fluentbit/compose-base.yml b/deployment/docker/victorialogs/fluentbit/compose-base.yml index c9580679f..bfa881ae5 100644 --- a/deployment/docker/victorialogs/fluentbit/compose-base.yml +++ b/deployment/docker/victorialogs/fluentbit/compose-base.yml @@ -1,6 +1,13 @@ include: - ../compose-base.yml services: + nginx: + image: nginx:1.27 + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf + depends_on: [fluentbit] + ports: + - "8080:80" fluentbit: image: cr.fluentbit.io/fluent/fluent-bit:3.1.7 volumes: diff --git a/deployment/docker/victorialogs/fluentbit/nginx.conf b/deployment/docker/victorialogs/fluentbit/nginx.conf new file mode 100644 index 000000000..d0ae8edce --- /dev/null +++ b/deployment/docker/victorialogs/fluentbit/nginx.conf @@ -0,0 +1,16 @@ +events { + worker_connections 2000; +} + +http { + server { + listen 0.0.0.0; + server_name _; + location /opentelemetry/api/v1/push { + proxy_pass http://victoriametrics:8428; + } + location /insert/opentelemetry/v1/logs { + proxy_pass http://victorialogs:9428; + } + } +} diff --git a/deployment/docker/victorialogs/fluentbit/otlp/compose.yml b/deployment/docker/victorialogs/fluentbit/otlp/compose.yml new file mode 100644 index 000000000..1627ba043 --- /dev/null +++ b/deployment/docker/victorialogs/fluentbit/otlp/compose.yml @@ -0,0 +1,3 @@ +include: + - ../compose.yml +name: fluentbit-loki diff --git a/deployment/docker/victorialogs/fluentbit/otlp/fluent-bit.conf b/deployment/docker/victorialogs/fluentbit/otlp/fluent-bit.conf new file mode 100644 index 000000000..b116220c8 --- /dev/null +++ b/deployment/docker/victorialogs/fluentbit/otlp/fluent-bit.conf @@ -0,0 +1,33 @@ +[INPUT] + name tail + path /var/lib/docker/containers/**/*.log + path_key path + multiline.parser docker, cri + Parser docker + Docker_Mode On + +[INPUT] + Name syslog + Listen 0.0.0.0 + Port 5140 + Parser syslog-rfc3164 + Mode tcp + +[INPUT] + name fluentbit_metrics + tag internal_metrics + scrape_interval 2 + +[SERVICE] + Flush 1 + Parsers_File parsers.conf + +[OUTPUT] + name opentelemetry + match * + host nginx + logs_uri /insert/opentelemetry/v1/logs + metrics_uri /opentelemetry/api/v1/push + port 80 + header VL-Msg-Field log + header VL-Stream-Fields severity diff --git a/deployment/docker/victorialogs/journald/Dockerfile b/deployment/docker/victorialogs/journald/Dockerfile new file mode 100644 index 000000000..95bc96fb5 --- /dev/null +++ b/deployment/docker/victorialogs/journald/Dockerfile @@ -0,0 +1,9 @@ +FROM ubuntu:24.04 + +RUN \ + apt update && \ + apt install -y \ + systemd \ + systemd-journal-remote && \ + sed -i 's/# URL=/URL=http:\/\/victorialogs:9428\/insert\/journald/g' /etc/systemd/journal-upload.conf && \ + systemctl enable systemd-journal-upload.service diff --git a/deployment/docker/victorialogs/journald/README.md b/deployment/docker/victorialogs/journald/README.md new file mode 100644 index 000000000..096aac31f --- /dev/null +++ b/deployment/docker/victorialogs/journald/README.md @@ -0,0 +1,29 @@ +# Docker compose Journald integration with VictoriaLogs + +The folder contains examples of Journald integration with VictoriaLogs using protocols: + +* [journald](./journald) + +To spin-up environment `cd` to any of listed above directories run the following command: +``` +docker compose up -d +``` + +To shut down the docker-compose environment run the following command: +``` +docker compose down +docker compose rm -f +``` + +The docker compose file contains the following components: + +* journald - Journald logs collection agent, which is configured to collect and write data to `victorialogs` +* victorialogs - VictoriaLogs log database, which accepts the data from `journald` +* victoriametrics - VictoriaMetrics metrics database, which collects metrics from `victorialogs` and `journald` + +Querying the data + +* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui` +* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line) + +Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance. diff --git a/deployment/docker/victorialogs/journald/compose-base.yml b/deployment/docker/victorialogs/journald/compose-base.yml new file mode 100644 index 000000000..0091669dc --- /dev/null +++ b/deployment/docker/victorialogs/journald/compose-base.yml @@ -0,0 +1,14 @@ +include: + - ../compose-base.yml +services: + journald: + build: . + restart: on-failure + privileged: true + user: root + entrypoint: /lib/systemd/systemd + depends_on: + victorialogs: + condition: service_healthy + victoriametrics: + condition: service_healthy diff --git a/deployment/docker/victorialogs/journald/journald/compose.yml b/deployment/docker/victorialogs/journald/journald/compose.yml new file mode 100644 index 000000000..573ac1550 --- /dev/null +++ b/deployment/docker/victorialogs/journald/journald/compose.yml @@ -0,0 +1,3 @@ +include: + - ../compose-base.yml +name: journald-remote diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 78f05254d..574d9bf7a 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: added ability to receive systemd (journald) logs over network. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4618). + * BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix various glitches with updating query responses. The issue was introduced in [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7279). ## [v0.37.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.37.0-victorialogs) diff --git a/docs/VictoriaLogs/Roadmap.md b/docs/VictoriaLogs/Roadmap.md index 5d2f28f37..c5307ceb2 100644 --- a/docs/VictoriaLogs/Roadmap.md +++ b/docs/VictoriaLogs/Roadmap.md @@ -21,7 +21,6 @@ See [these docs](https://docs.victoriametrics.com/victorialogs/) for details. The following functionality is planned in the future versions of VictoriaLogs: - Support for [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/) from popular log collectors and formats: - - [ ] [Journald](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4618) (systemd) - [ ] [Datadog protocol for logs](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6632) - [ ] Integration with Grafana. Partially done, check the [documentation](https://docs.victoriametrics.com/victorialogs/victorialogs-datasource/) and [datasource repository](https://github.com/VictoriaMetrics/victorialogs-datasource). - [ ] Ability to make instant snapshots and backups in the way [similar to VictoriaMetrics](https://docs.victoriametrics.com/#how-to-work-with-snapshots). diff --git a/docs/VictoriaLogs/data-ingestion/Journald.md b/docs/VictoriaLogs/data-ingestion/Journald.md new file mode 100644 index 000000000..16d736b83 --- /dev/null +++ b/docs/VictoriaLogs/data-ingestion/Journald.md @@ -0,0 +1,33 @@ +--- +weight: 10 +title: Journald setup +disableToc: true +menu: + docs: + parent: "victorialogs-data-ingestion" + weight: 10 +aliases: + - /VictoriaLogs/data-ingestion/Journald.html +--- +On a client site which should already have journald please install additionally [systemd-journal-upload](https://www.freedesktop.org/software/systemd/man/latest/systemd-journal-upload.service.html) and edit `/etc/systemd/journal-upload.conf` and set `URL` to VictoriaLogs endpoint: + +``` +[Upload] +URL=http://localhost:9428/insert/journald +``` + +Substitute the `localhost:9428` address inside `endpoints` section with the real TCP address of VictoriaLogs. + +Since neither HTTP query arguments nor HTTP headers are configurable on systemd-journal-upload, +[stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) and other params can be configured on VictoriaLogs using command-line flags: +- `journald.streamFields` - configures [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) for ingested data. +Here's a [list of supported Journald fields](https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html) +- `journald.ignoreFields` - configures Journald fields, that should be ignored. +- `journald.tenantID` - configures TenantID for ingested data. +- `journald.timeField` - configures time field for ingested data. + +See also: + +- [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting). +- [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/). +- [Docker-compose demo for Journald integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/journald). diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index d84cd7603..0f912c4de 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -9,6 +9,7 @@ - Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). - Telegraf - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/). - OpenTelemetry Collector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/opentelemetry/). +- Journald - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/journald/). The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/). @@ -25,6 +26,7 @@ VictoriaLogs supports the following data ingestion HTTP APIs: - JSON stream API aka [ndjson](https://jsonlines.org/). See [these docs](#json-stream-api). - Loki JSON API. See [these docs](#loki-json-api). - OpenTelemetry API. See [these docs](#opentelemetry-api). +- Journald export format. VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs. @@ -286,16 +288,17 @@ VictoriaLogs exposes various [metrics](https://docs.victoriametrics.com/victoria Here is the list of log collectors and their ingestion formats supported by VictoriaLogs: -| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog | Format: OpenTelemetry | -|----------------------------|-----------------------|---------------------|--------------|----------------|-----------------------| -| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) | No | -| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) | No | -| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No | No | -| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/opentelemetry) | -| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) | [Yes](https://github.com/paulgrav/logstash-output-opentelemetry) | -| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No | [Yes](https://vector.dev/docs/reference/configuration/sources/opentelemetry/) | -| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No | No | -| [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter) | No | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/lokiexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/syslogexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter) | -| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) | Yes | -| [Fluentd](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentd/) | [Yes](https://github.com/uken/fluent-plugin-elasticsearch) | [Yes](https://docs.fluentd.org/output/http) | [Yes](https://grafana.com/docs/loki/latest/send-data/fluentd/) | [Yes](https://github.com/fluent-plugins-nursery/fluent-plugin-remote_syslog) | No | +| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog | Format: OpenTelemetry | Format: Journald | +|----------------------------|-----------------------|---------------------|--------------|----------------|-----------------------|------------------| +| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) | No | No | +| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) | No | No | +| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No | No | No | +| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/opentelemetry) | No | +| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) | [Yes](https://github.com/paulgrav/logstash-output-opentelemetry) | No | +| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No | [Yes](https://vector.dev/docs/reference/configuration/sources/opentelemetry/) | No | +| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No | No | No | +| [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter) | No | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/lokiexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/syslogexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter) | No | +| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) | Yes | No | +| [Fluentd](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentd/) | [Yes](https://github.com/uken/fluent-plugin-elasticsearch) | [Yes](https://docs.fluentd.org/output/http) | [Yes](https://grafana.com/docs/loki/latest/send-data/fluentd/) | [Yes](https://github.com/fluent-plugins-nursery/fluent-plugin-remote_syslog) | No | No | +| [Journald](https://docs.victoriametrics.com/victorialogs/data-ingestion/journald/) | No | No | No | No | No | Yes | diff --git a/go.mod b/go.mod index e50ce1856..0f7bb846d 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/valyala/fastjson v1.6.4 github.com/valyala/fastrand v1.1.0 github.com/valyala/fasttemplate v1.2.2 - github.com/valyala/gozstd v1.21.1 + github.com/valyala/gozstd v1.21.2 github.com/valyala/histogram v1.2.0 github.com/valyala/quicktemplate v1.8.0 golang.org/x/net v0.29.0 diff --git a/go.sum b/go.sum index 636d7402d..53089cd60 100644 --- a/go.sum +++ b/go.sum @@ -502,6 +502,8 @@ github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQ github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/gozstd v1.21.1 h1:TQFZVTk5zo7iJcX3o4XYBJujPdO31LFb4fVImwK873A= github.com/valyala/gozstd v1.21.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= +github.com/valyala/gozstd v1.21.2 h1:SBZ6sYA9y+u32XSds1TwOJJatcqmA3TgfLwGtV78Fcw= +github.com/valyala/gozstd v1.21.2/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/valyala/quicktemplate v1.8.0 h1:zU0tjbIqTRgKQzFY1L42zq0qR3eh4WoQQdIdqCysW5k= diff --git a/vendor/github.com/valyala/gozstd/gozstd.go b/vendor/github.com/valyala/gozstd/gozstd.go index cc9ffa467..831b703ef 100644 --- a/vendor/github.com/valyala/gozstd/gozstd.go +++ b/vendor/github.com/valyala/gozstd/gozstd.go @@ -29,9 +29,10 @@ static size_t ZSTD_decompress_usingDDict_wrapper(uintptr_t ctx, uintptr_t dst, s return ZSTD_decompress_usingDDict((ZSTD_DCtx*)ctx, (void*)dst, dstCapacity, (const void*)src, srcSize, (const ZSTD_DDict*)ddict); } -static unsigned long long ZSTD_getFrameContentSize_wrapper(uintptr_t src, size_t srcSize) { - return ZSTD_getFrameContentSize((const void*)src, srcSize); +static unsigned long long ZSTD_findDecompressedSize_wrapper(uintptr_t src, size_t srcSize) { + return ZSTD_findDecompressedSize((const void*)src, srcSize); } + */ import "C" @@ -254,7 +255,7 @@ func decompress(dctx, dctxDict *dctxWrapper, dst, src []byte, dd *DDict) ([]byte } // Slow path - resize dst to fit decompressed data. - decompressBound := int(C.ZSTD_getFrameContentSize_wrapper( + decompressBound := int(C.ZSTD_findDecompressedSize_wrapper( C.uintptr_t(uintptr(unsafe.Pointer(&src[0]))), C.size_t(len(src)))) // Prevent from GC'ing of src during CGO call above. runtime.KeepAlive(src) diff --git a/vendor/modules.txt b/vendor/modules.txt index ec1af20a9..1807cfa2b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -580,7 +580,7 @@ github.com/valyala/fastrand # github.com/valyala/fasttemplate v1.2.2 ## explicit; go 1.12 github.com/valyala/fasttemplate -# github.com/valyala/gozstd v1.21.1 +# github.com/valyala/gozstd v1.21.2 ## explicit; go 1.12 github.com/valyala/gozstd # github.com/valyala/histogram v1.2.0