vlinsert: support datadog logs

This commit adds the following changes:

- Added support to push datadog logs with examples of how to ingest data
using Vector and Fluentbit
- Updated VictoriaLogs examples directory structure to have single
container image for victorialogs, agent (fluentbit, vector, etc) but
multiple configurations for different protocols

Related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6632
This commit is contained in:
Andrii Chubatiuk 2024-11-05 17:52:35 +02:00 committed by GitHub
parent 2e8f420d84
commit e0930687f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 545 additions and 15 deletions

View file

@ -0,0 +1,185 @@
package datadog
import (
"bytes"
"fmt"
"io"
"net/http"
"strconv"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
)
var parserPool fastjson.ParserPool
// RequestHandler processes Datadog insert requests
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
switch path {
case "/api/v1/validate":
fmt.Fprintf(w, `{}`)
return true
case "/api/v2/logs":
return datadogLogsIngestion(w, r)
default:
return false
}
}
func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool {
w.Header().Add("Content-Type", "application/json")
startTime := time.Now()
v2LogsRequestsTotal.Inc()
reader := r.Body
var ts int64
if tsValue := r.Header.Get("dd-message-timestamp"); tsValue != "" && tsValue != "0" {
var err error
ts, err = strconv.ParseInt(tsValue, 10, 64)
if err != nil {
httpserver.Errorf(w, r, "could not parse dd-message-timestamp header value: %s", err)
return true
}
ts *= 1e6
} else {
ts = startTime.UnixNano()
}
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader)
if err != nil {
httpserver.Errorf(w, r, "cannot read gzipped logs request: %s", err)
return true
}
defer common.PutGzipReader(zr)
reader = zr
}
wcr := writeconcurrencylimiter.GetReader(reader)
data, err := io.ReadAll(wcr)
writeconcurrencylimiter.PutReader(wcr)
if err != nil {
httpserver.Errorf(w, r, "cannot read request body: %s", err)
return true
}
cp, err := insertutils.GetCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
if err := vlstorage.CanWriteData(); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
lmp := cp.NewLogMessageProcessor()
n, err := readLogsRequest(ts, data, lmp.AddRow)
lmp.MustClose()
if n > 0 {
rowsIngestedTotal.Add(n)
}
if err != nil {
logger.Warnf("cannot decode log message in /api/v2/logs request: %s, stream fields: %s", err, cp.StreamFields)
return true
}
// update v2LogsRequestDuration only for successfully parsed requests
// There is no need in updating v2LogsRequestDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
v2LogsRequestDuration.UpdateDuration(startTime)
fmt.Fprintf(w, `{}`)
return true
}
var (
v2LogsRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/datadog/api/v2/logs"}`)
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="datadog"}`)
v2LogsRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/datadog/api/v2/logs"}`)
)
// readLogsRequest parses data according to DataDog logs format
// https://docs.datadoghq.com/api/latest/logs/#send-logs
func readLogsRequest(ts int64, data []byte, processLogMessage func(int64, []logstorage.Field)) (int, error) {
p := parserPool.Get()
defer parserPool.Put(p)
v, err := p.ParseBytes(data)
if err != nil {
return 0, fmt.Errorf("cannot parse JSON request body: %w", err)
}
records, err := v.Array()
if err != nil {
return 0, fmt.Errorf("cannot extract array from parsed JSON: %w", err)
}
var fields []logstorage.Field
for m, r := range records {
o, err := r.Object()
if err != nil {
return m + 1, fmt.Errorf("could not extract log record: %w", err)
}
o.Visit(func(k []byte, v *fastjson.Value) {
if err != nil {
return
}
val, e := v.StringBytes()
if e != nil {
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
return
}
switch string(k) {
case "message":
fields = append(fields, logstorage.Field{
Name: "_msg",
Value: bytesutil.ToUnsafeString(val),
})
case "ddtags":
// https://docs.datadoghq.com/getting_started/tagging/
var pair []byte
idx := 0
for idx >= 0 {
idx = bytes.IndexByte(val, ',')
if idx < 0 {
pair = val
} else {
pair = val[:idx]
val = val[idx+1:]
}
if len(pair) > 0 {
n := bytes.IndexByte(pair, ':')
if n < 0 {
// No tag value.
fields = append(fields, logstorage.Field{
Name: bytesutil.ToUnsafeString(pair),
Value: "no_label_value",
})
}
fields = append(fields, logstorage.Field{
Name: bytesutil.ToUnsafeString(pair[:n]),
Value: bytesutil.ToUnsafeString(pair[n+1:]),
})
}
}
default:
fields = append(fields, logstorage.Field{
Name: bytesutil.ToUnsafeString(k),
Value: bytesutil.ToUnsafeString(val),
})
}
})
processLogMessage(ts, fields)
fields = fields[:0]
}
return len(records), nil
}

View file

@ -0,0 +1,117 @@
package datadog
import (
"fmt"
"strings"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func TestReadLogsRequestFailure(t *testing.T) {
f := func(data string) {
t.Helper()
ts := time.Now().UnixNano()
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
}
rows, err := readLogsRequest(ts, []byte(data), processLogMessage)
if err == nil {
t.Fatalf("expecting non-empty error")
}
if rows != 0 {
t.Fatalf("unexpected non-zero rows=%d", rows)
}
}
f("foobar")
f(`{}`)
f(`["create":{}]`)
f(`{"create":{}}
foobar`)
}
func TestReadLogsRequestSuccess(t *testing.T) {
f := func(data string, rowsExpected int, resultExpected string) {
t.Helper()
ts := time.Now().UnixNano()
var result string
processLogMessage := func(_ int64, fields []logstorage.Field) {
a := make([]string, len(fields))
for i, f := range fields {
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
}
if len(result) > 0 {
result = result + "\n"
}
s := "{" + strings.Join(a, ",") + "}"
result += s
}
// Read the request without compression
rows, err := readLogsRequest(ts, []byte(data), processLogMessage)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if rows != rowsExpected {
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
}
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
// Verify non-empty data
data := `[
{
"ddsource":"nginx",
"ddtags":"tag1:value1,tag2:value2",
"hostname":"127.0.0.1",
"message":"bar",
"service":"test"
}, {
"ddsource":"nginx",
"ddtags":"tag1:value1,tag2:value2",
"hostname":"127.0.0.1",
"message":"foobar",
"service":"test"
}, {
"ddsource":"nginx",
"ddtags":"tag1:value1,tag2:value2",
"hostname":"127.0.0.1",
"message":"baz",
"service":"test"
}, {
"ddsource":"nginx",
"ddtags":"tag1:value1,tag2:value2",
"hostname":"127.0.0.1",
"message":"xyz",
"service":"test"
}, {
"ddsource": "nginx",
"ddtags":"tag1:value1,tag2:value2,",
"hostname":"127.0.0.1",
"message":"xyz",
"service":"test"
}, {
"ddsource":"nginx",
"ddtags":",tag1:value1,tag2:value2",
"hostname":"127.0.0.1",
"message":"xyz",
"service":"test"
}
]`
rowsExpected := 6
resultExpected := `{"ddsource":"nginx","tag1":"value1","tag2":"value2","hostname":"127.0.0.1","_msg":"bar","service":"test"}
{"ddsource":"nginx","tag1":"value1","tag2":"value2","hostname":"127.0.0.1","_msg":"foobar","service":"test"}
{"ddsource":"nginx","tag1":"value1","tag2":"value2","hostname":"127.0.0.1","_msg":"baz","service":"test"}
{"ddsource":"nginx","tag1":"value1","tag2":"value2","hostname":"127.0.0.1","_msg":"xyz","service":"test"}
{"ddsource":"nginx","tag1":"value1","tag2":"value2","hostname":"127.0.0.1","_msg":"xyz","service":"test"}
{"ddsource":"nginx","tag1":"value1","tag2":"value2","hostname":"127.0.0.1","_msg":"xyz","service":"test"}`
f(data, rowsExpected, resultExpected)
}

View file

@ -4,6 +4,7 @@ import (
"net/http"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/journald"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
@ -25,6 +26,7 @@ func Stop() {
// RequestHandler handles insert requests for VictoriaLogs
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
path := r.URL.Path
if !strings.HasPrefix(path, "/insert/") {
// Skip requests, which do not start with /insert/, since these aren't our requests.
return false
@ -49,6 +51,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
case strings.HasPrefix(path, "/journald/"):
path = strings.TrimPrefix(path, "/journald")
return journald.RequestHandler(path, w, r)
case strings.HasPrefix(path, "/datadog/"):
path = strings.TrimPrefix(path, "/datadog")
return datadog.RequestHandler(path, w, r)
default:
return false
}

View file

@ -17,6 +17,13 @@ services:
timeout: 1s
retries: 10
dd-logs:
image: docker.io/victoriametrics/vmauth:v1.105.0
restart: on-failure
volumes:
- ./:/etc/vmauth
command: -auth.config=/etc/vmauth/vmauth.yaml
victorialogs:
extends: .victorialogs
ports:

View file

@ -0,0 +1 @@
**/logs

View file

@ -0,0 +1,29 @@
# Docker compose DataDog Agent integration with VictoriaLogs
The folder contains examples of [DataDog agent](https://docs.datadoghq.com/agent) integration with VictoriaLogs using protocols:
* [datadog](./datadog)
To spin-up environment `cd` to any of listed above directories run the following command:
```
docker compose up -d
```
To shut down the docker-compose environment run the following command:
```
docker compose down
docker compose rm -f
```
The docker compose file contains the following components:
* datadog - Datadog logs collection agent, which is configured to collect and write data to `victorialogs`
* victorialogs - VictoriaLogs log database, which accepts the data from `datadog`
* victoriametrics - VictoriaMetrics metrics database, which collects metrics from `victorialogs` and `datadog`
Querying the data
* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui`
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)
Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance.

View file

@ -0,0 +1,26 @@
include:
- ../compose-base.yml
services:
agent:
image: docker.io/datadog/agent:7.57.2
restart: on-failure
volumes:
- /var/lib/docker/containers:/var/lib/docker/containers
- /var/run/docker.sock:/var/run/docker.sock:ro
- /proc/:/host/proc/:ro
- /sys/fs/cgroup/:/host/sys/fs/cgroup:ro
environment:
DD_API_KEY: test
DD_URL: http://victoriametrics:8428/datadog
DD_LOGS_CONFIG_LOGS_DD_URL: http://dd-logs:8427
DD_LOGS_CONFIG_CONTAINER_COLLECT_ALL: true
DD_LOGS_ENABLED: true
DD_LOGS_CONFIG_USE_HTTP: true
DD_PROCESS_CONFIG_PROCESS_COLLECTION_ENABLED: false
DD_PROCESS_CONFIG_CONTAINER_COLLECTION_ENABLED: false
DD_PROCESS_CONFIG_PROCESS_DISCOVERY_ENABLED: false
depends_on:
victorialogs:
condition: service_healthy
victoriametrics:
condition: service_healthy

View file

@ -0,0 +1,3 @@
include:
- ../compose-base.yml
name: agent-datadog

View file

@ -2,6 +2,7 @@
The folder contains examples of [FluentBit](https://docs.fluentbit.io/manual) integration with VictoriaLogs using protocols:
* [datadog](./datadog)
* [loki](./loki)
* [jsonline single node](./jsonline)
* [jsonline HA setup](./jsonline-ha)
@ -30,6 +31,7 @@ Querying the data
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)
FluentBit configuration example can be found below:
* [datadog](./datadog/fluent-bit.conf)
* [loki](./loki/fluent-bit.conf)
* [jsonline single node](./jsonline/fluent-bit.conf)
* [jsonline HA setup](./jsonline-ha/fluent-bit.conf)

View file

@ -0,0 +1,3 @@
include:
- ../compose-base.yml
name: fluentbit-datadog

View file

@ -0,0 +1,31 @@
[INPUT]
name tail
path /var/lib/docker/containers/**/*.log
path_key path
multiline.parser docker, cri
Parser docker
Docker_Mode On
[INPUT]
Name syslog
Listen 0.0.0.0
Port 5140
Parser syslog-rfc3164
Mode tcp
[SERVICE]
Flush 1
Parsers_File parsers.conf
[OUTPUT]
Name datadog
Match *
Host dd-logs
Port 8427
TLS off
compress gzip
apikey test
dd_service test
dd_source data
dd_message_key log
dd_tags env:dev

View file

@ -4,6 +4,7 @@ The folder contains examples of [Fluentd](https://www.fluentd.org/) integration
* [loki](./loki)
* [jsonline](./jsonline)
* [datadog](./datadog)
* [elasticsearch](./elasticsearch)
All required plugins, that should be installed in order to support protocols listed above can be found in a [Dockerfile](./Dockerfile)

View file

@ -0,0 +1,3 @@
include:
- ../compose-base.yml
name: fluentd-datadog

View file

@ -0,0 +1,27 @@
<source>
@type tail
format none
tag docker.testlog
path /var/lib/docker/containers/**/*.log
</source>
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<match **>
@type datadog
api_key test
# Optional
port 8427
use_ssl false
host dd-logs
include_tag_key true
tag_key 'tag'
# Optional parameters
dd_source 'fluentd'
dd_tags 'key1:value1,key2:value2'
dd_sourcecategory 'test'
</match>

View file

@ -6,6 +6,7 @@ The folder contains examples of [Vector](https://vector.dev/docs/) integration w
* [loki](./loki)
* [jsonline single node](./jsonline)
* [jsonline HA setup](./jsonline-ha)
* [datadog](./datadog)
To spin-up environment `cd` to any of listed above directories run the following command:
```
@ -34,5 +35,6 @@ Vector configuration example can be found below:
* [loki](./loki/vector.yaml)
* [jsonline single node](./jsonline/vector.yaml)
* [jsonline HA setup](./jsonline-ha/vector.yaml)
* [datadog](./datadog/vector.yaml)
Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance.

View file

@ -0,0 +1,3 @@
include:
- ../compose-base.yml
name: vector-datadog

View file

@ -0,0 +1,28 @@
api:
enabled: true
address: 0.0.0.0:8686
sources:
vector_metrics:
type: internal_metrics
demo:
type: demo_logs
format: json
sinks:
datadog:
type: datadog_logs
inputs: [demo]
default_api_key: test
endpoint: http://dd-logs:8427
compression: gzip
request:
headers:
dd-protocol: test # required by VictoriaLogs
AccountID: "0"
ProjectID: "0"
VL-Stream-Fields: "service,host"
victoriametrics:
type: prometheus_remote_write
endpoint: http://victoriametrics:8428/api/v1/write
inputs: [vector_metrics]
healthcheck:
enabled: false

View file

@ -0,0 +1,6 @@
unauthorized_user:
url_map:
- src_paths:
- "/api/v2/logs"
- "/api/v1/validate"
url_prefix: "http://victorialogs:9428/insert/datadog/"

View file

@ -20,8 +20,6 @@ See [these docs](https://docs.victoriametrics.com/victorialogs/) for details.
The following functionality is planned in the future versions of VictoriaLogs:
- Support for [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/) from popular log collectors and formats:
- [ ] [Datadog protocol for logs](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6632)
- [ ] Integration with Grafana. Partially done, check the [documentation](https://docs.victoriametrics.com/victorialogs/victorialogs-datasource/) and [datasource repository](https://github.com/VictoriaMetrics/victorialogs-datasource).
- [ ] Ability to make instant snapshots and backups in the way [similar to VictoriaMetrics](https://docs.victoriametrics.com/#how-to-work-with-snapshots).
- [ ] Cluster version of VictoriaLogs.

View file

@ -0,0 +1,52 @@
---
weight: 5
title: DataDog Agent setup
disableToc: true
menu:
docs:
parent: "victorialogs-data-ingestion"
weight: 5
url: /victorialogs/data-ingestion/datadog-agent/
aliases:
- /VictoriaLogs/data-ingestion/DataDogAgent.html
---
Datadog Agent doesn't support custom path prefix, so for this reason it's required to use [VMAuth](https://docs.victoriametrics.com/vmauth/) or any other
reverse proxy to append `/insert/datadog` path prefix to all Datadog API logs requests.
In case of [VMAuth](https://docs.victoriametrics.com/vmauth/) your config should look like:
```yaml
unauthorized_user:
url_map:
- src_paths:
- "/api/v2/logs"
url_prefix: "`<victoria-logs-base-url>`/insert/datadog/"
```
To start ingesting logs from DataDog agent please specify a custom URL instead of default one for sending collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/):
```yaml
logs_enabled: true
logs_config:
logs_dd_url: `<vmauth-base-url>`
use_http: true
```
While using [Serverless DataDog plugin](https://github.com/DataDog/serverless-plugin-datadog) please set VictoriaLogs endpoint using `LOGS_DD_URL` environment variable:
```yaml
custom:
datadog:
apiKey: fakekey # Set any key, otherwise plugin fails
provider:
environment:
LOGS_DD_URL: `<vmauth-base-url>`/ # VictoriaLogs endpoint for DataDog
```
Substitute the `<vmauth-base-url>` address with the real address of VMAuth proxy.
See also:
- [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting).
- [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/).
- [Docker-compose demo for Datadog integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/datadog).

View file

@ -309,17 +309,18 @@ VictoriaLogs exposes various [metrics](https://docs.victoriametrics.com/victoria
Here is the list of log collectors and their ingestion formats supported by VictoriaLogs:
| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog | Format: OpenTelemetry | Format: Journald |
|----------------------------|-----------------------|---------------------|--------------|----------------|-----------------------|------------------|
| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) | No | No |
| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) | No | No |
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No | No | No |
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/opentelemetry) | No |
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) | [Yes](https://github.com/paulgrav/logstash-output-opentelemetry) | No |
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No | [Yes](https://vector.dev/docs/reference/configuration/sources/opentelemetry/) | No |
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No | No | No |
| [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter) | No | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/lokiexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/syslogexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter) | No |
| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) | Yes | No |
| [Fluentd](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentd/) | [Yes](https://github.com/uken/fluent-plugin-elasticsearch) | [Yes](https://docs.fluentd.org/output/http) | [Yes](https://grafana.com/docs/loki/latest/send-data/fluentd/) | [Yes](https://github.com/fluent-plugins-nursery/fluent-plugin-remote_syslog) | No | No |
| [Journald](https://docs.victoriametrics.com/victorialogs/data-ingestion/journald/) | No | No | No | No | No | Yes |
| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog | Format: OpenTelemetry | Format: Journald | Format: DataDog |
|----------------------------|-----------------------|---------------------|--------------|----------------|-----------------------|------------------|-----------------|
| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) | No | No | No |
| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) | No | No | No |
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No | No | No | No |
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/opentelemetry) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/datadog) |
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) | [Yes](https://github.com/paulgrav/logstash-output-opentelemetry) | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-datadog.html) |
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No | [Yes](https://vector.dev/docs/reference/configuration/sources/opentelemetry/) | No | [Yes](https://vector.dev/docs/reference/configuration/sinks/datadog_logs/) |
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No | No | No | No |
| [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter) | No | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/lokiexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/syslogexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter) | No | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/datadogexporter) |
| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) | Yes | No | No |
| [Fluentd](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentd/) | [Yes](https://github.com/uken/fluent-plugin-elasticsearch) | [Yes](https://docs.fluentd.org/output/http) | [Yes](https://grafana.com/docs/loki/latest/send-data/fluentd/) | [Yes](https://github.com/fluent-plugins-nursery/fluent-plugin-remote_syslog) | No | No | No |
| [Journald](https://docs.victoriametrics.com/victorialogs/data-ingestion/journald/) | No | No | No | No | No | Yes | No |
| [DataDog Agent](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/DataDogAgent.html) | No | No | No | No | No | No | Yes |