app/vlinsert: support getting _msg_field, _time_field, _stream_fields and _ignore_fields from headers

*  Many collectors don't support forwarding url query params to the remote system. It makes impossible to define stream fields for it. Workaround with proxy between VictoriaLogs and log shipper is too complicated solution.

* This commit adds the following changes:
 * Adds fallback to to headers params, if query param is empty for:
     _msg_field -> VL-Msg-Field
    _stream_fields -> VL-Stream-Fields
    _ignore_fields -> VL-Ignore-Fields
    _time_field -> VL-Time-Field
 * removes deprecations from victorialogs compose files, added more
output format examples for logstash, telegraf, fluent-bit

 related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5310
This commit is contained in:
Andrii Chubatiuk 2024-09-03 18:43:26 +03:00 committed by GitHub
parent 4dcb6a3719
commit 1731c0eabf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 731 additions and 265 deletions

View file

@ -38,20 +38,34 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
return nil, err
}
// Extract time field name from _time_field query arg
// Extract time field name from _time_field query arg or header
var timeField = "_time"
if tf := r.FormValue("_time_field"); tf != "" {
timeField = tf
} else if tf = r.Header.Get("VL-Time-Field"); tf != "" {
timeField = tf
}
// Extract message field name from _msg_field query arg
// Extract message field name from _msg_field query arg or header
var msgField = ""
if msgf := r.FormValue("_msg_field"); msgf != "" {
msgField = msgf
} else if msgf = r.Header.Get("VL-Msg-Field"); msgf != "" {
msgField = msgf
}
streamFields := httputils.GetArray(r, "_stream_fields")
if len(streamFields) == 0 {
if sf := r.Header.Values("VL-Stream-Fields"); len(sf) > 0 {
streamFields = sf
}
}
ignoreFields := httputils.GetArray(r, "ignore_fields")
if len(ignoreFields) == 0 {
if f := r.Header.Values("VL-Ignore-Fields"); len(f) > 0 {
ignoreFields = f
}
}
debug := httputils.GetBool(r, "debug")
debugRequestURI := ""
@ -71,6 +85,7 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
DebugRequestURI: debugRequestURI,
DebugRemoteAddr: debugRemoteAddr,
}
return cp, nil
}

View file

@ -1,5 +1,3 @@
version: "3"
services:
filebeat-vlogs:
image: docker.elastic.co/beats/filebeat:8.8.1

View file

@ -1,5 +1,3 @@
version: '3'
services:
filebeat-victorialogs:
image: docker.elastic.co/beats/filebeat:8.8.1

View file

@ -1,30 +0,0 @@
[INPUT]
name tail
path /var/lib/docker/containers/**/*.log
path_key path
multiline.parser docker, cri
Parser docker
Docker_Mode On
[INPUT]
Name syslog
Listen 0.0.0.0
Port 5140
Parser syslog-rfc3164
Mode tcp
[SERVICE]
Flush 1
Parsers_File parsers.conf
[Output]
Name http
Match *
host victorialogs
port 9428
compress gzip
uri /insert/jsonline?_stream_fields=stream,path&_msg_field=log&_time_field=date
format json_lines
json_date_format iso8601
header AccountID 0
header ProjectID 0

View file

@ -1,8 +1,6 @@
version: "3"
services:
fluentbit:
image: cr.fluentbit.io/fluent/fluent-bit:3.0.2
image: cr.fluentbit.io/fluent/fluent-bit:3.0.7
volumes:
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf

View file

@ -0,0 +1,54 @@
[INPUT]
name tail
path /var/lib/docker/containers/**/*.log
path_key path
multiline.parser docker, cri
Parser docker
Docker_Mode On
[INPUT]
Name syslog
Listen 0.0.0.0
Port 5140
Parser syslog-rfc3164
Mode tcp
[SERVICE]
Flush 1
Parsers_File parsers.conf
[OUTPUT]
Name http
Match *
host victorialogs
port 9428
compress gzip
uri /insert/jsonline?_stream_fields=stream,path&_msg_field=log&_time_field=date
format json_lines
json_date_format iso8601
header AccountID 0
header ProjectID 0
[OUTPUT]
Name es
Match *
host victorialogs
port 9428
compress gzip
path /insert/elasticsearch
header AccountID 0
header ProjectID 0
header VL-Stream-Fields path
header VL-Msg-Field log
header VL-Time-Field @timestamp
[OUTPUT]
name loki
match *
host victorialogs
uri /insert/loki/api/v1/push
port 9428
label_keys $path,$log,$time
header VL-Msg-Field log
header VL-Time-Field time
header VL-Stream-Fields path

View file

@ -1,3 +1,5 @@
FROM docker.elastic.co/logstash/logstash:8.8.1
RUN bin/logstash-plugin install logstash-output-opensearch
RUN bin/logstash-plugin install \
logstash-output-opensearch \
logstash-output-loki

View file

@ -17,4 +17,12 @@ output {
"_time_field" => "@timestamp"
}
}
http {
url => "http://victorialogs:9428/insert/jsonline?_stream_fields=host.ip,process.name&_msg_field=message&_time_field=@timestamp"
format => "json"
http_method => "post"
}
loki {
url => "http://victorialogs:9428/insert/loki/api/v1/push?_stream_fields=host.ip,process.name&_msg_field=message&_time_field=@timestamp"
}
}

View file

@ -1,5 +1,3 @@
version: "3"
services:
promtail:
image: grafana/promtail:2.8.2

View file

@ -0,0 +1,25 @@
# Docker compose Telegraf integration with VictoriaLogs for docker
The folder contains the example of integration of [telegraf](https://www.influxdata.com/time-series-platform/telegraf/) with VictoriaLogs
To spin-up environment run the following command:
```
docker compose up -d
```
To shut down the docker-compose environment run the following command:
```
docker compose down
docker compose rm -f
```
The docker compose file contains the following components:
* telegraf - telegraf is configured to collect logs from the `docker`, you can find configuration in the `telegraf.conf`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics.
* VictoriaLogs - the log database, it accepts the data from `telegraf` by elastic protocol
* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics`
Querying the data
* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui`
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)

View file

@ -0,0 +1,55 @@
services:
telegraf:
image: bitnami/telegraf:1.31.0
restart: on-failure
volumes:
- type: bind
source: /var/run/docker.sock
target: /var/run/docker.sock
- type: bind
source: /var/lib/docker
target: /var/lib/docker
- ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
command: --config /etc/telegraf/telegraf.conf
depends_on:
victorialogs:
condition: service_healthy
victoriametrics:
condition: service_healthy
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
volumes:
- victorialogs-vector-docker-vl:/vlogs
ports:
- '9428:9428'
command:
- -storageDataPath=/vlogs
- -loggerFormat=json
- -syslog.listenAddr.tcp=0.0.0.0:8094
healthcheck:
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"]
interval: 1s
timeout: 1s
retries: 10
victoriametrics:
image: victoriametrics/victoria-metrics:latest
ports:
- '8428:8428'
command:
- -storageDataPath=/vmsingle
- -promscrape.config=/promscrape.yml
- -loggerFormat=json
volumes:
- victorialogs-vector-docker-vm:/vmsingle
- ./scrape.yml:/promscrape.yml
healthcheck:
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"]
interval: 1s
timeout: 1s
retries: 10
volumes:
victorialogs-vector-docker-vl:
victorialogs-vector-docker-vm:

View file

@ -0,0 +1,70 @@
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 100000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = ""
debug = false
quiet = false
logtarget = "file"
logfile = "/dev/null"
hostname = "pop-os"
omit_hostname = false
[[inputs.cpu]]
[[outputs.http]]
url = "http://victorialogs:9428/insert/jsonline?_msg_field=fields.msg&_stream_fields=tags.log_source,tags.metric_type"
data_format = "json"
namepass = ["docker_log"]
use_batch_format = false
[[outputs.loki]]
domain = "http://victorialogs:9428"
endpoint = "/insert/loki/api/v1/push?_msg_field=docker_log.msg&_time_field=@timestamp&_stream_fields=log_source,metric_type"
namepass = ["docker_log"]
gzip_request = true
sanitize_label_names = true
[[outputs.syslog]]
address = "tcp://victorialogs:8094"
namepass = ["docker_log"]
[[outputs.elasticsearch]]
urls = ["http://victorialogs:9428/insert/elasticsearch"]
timeout = "1m"
flush_interval = "30s"
enable_sniffer = false
health_check_interval = "0s"
index_name = "device_log-%Y.%m.%d"
manage_template = false
template_name = "telegraf"
overwrite_template = false
namepass = ["docker_log"]
[outputs.elasticsearch.headers]
VL-Msg-Field = "docker_log.msg"
VL-Time-Field = "@timestamp"
VL-Stream-Fields = "tag.log_source,tag.metric_type"
[[outputs.http]]
url = "http://victoriametrics:8428/api/v1/write"
data_format = "prometheusremotewrite"
namepass = ["cpu"]
[outputs.http.headers]
Content-Type = "application/x-protobuf"
Content-Encoding = "snappy"
X-Prometheus-Remote-Write-Version = "0.1.0"
[[inputs.docker_log]]
[inputs.docker_log.tags]
metric_type = "logs"
log_source = "telegraf"
[[processors.rename]]
namepass = ["docker_log"]
[[processors.rename.replace]]
field = "message"
dest = "msg"

View file

@ -1,75 +0,0 @@
[api]
enabled = true
address = "0.0.0.0:8686"
# ---------------------------------------------
# Docker logs -> VictoriaLogs
# ---------------------------------------------
[sources.docker]
type = "docker_logs"
[transforms.msg_parser]
type = "remap"
inputs = ["docker"]
source = '''
.log = parse_json!(.message)
del(.message)
'''
[sinks.vlogs]
type = "http"
inputs = [ "msg_parser" ]
uri = "http://victorialogs:9428/insert/jsonline?_stream_fields=source_type,host,container_name&_msg_field=log.msg&_time_field=timestamp"
encoding.codec = "json"
framing.method = "newline_delimited"
compression = "gzip"
healthcheck.enabled = false
[sinks.vlogs.request.headers]
AccountID = "0"
ProjectID = "0"
# ---------------------------------------------
# Generted demo logs -> VictoriaLogs
# ---------------------------------------------
[sources.demo]
type = "demo_logs"
format = "apache_common"
interval = 10
[sinks.vlogs_demo]
type = "elasticsearch"
inputs = [ "demo" ]
endpoints = [ "http://victorialogs:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
compression = "gzip"
healthcheck.enabled = false
[sinks.vlogs_demo.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "source_type"
[sinks.vlogs_demo.request.headers]
AccountID = "0"
ProjectID = "0"
# ---------------------------------------------
# Vector Metrics -> VictoriaMetrics
# ---------------------------------------------
[sources.vector_metrics]
type = "internal_metrics"
[sinks.victoriametrics]
type = "prometheus_remote_write"
endpoint = "http://victoriametrics:8428/api/v1/write"
inputs = ["vector_metrics"]
healthcheck.enabled = false

View file

@ -15,7 +15,7 @@ docker compose rm -f
The docker compose file contains the following components:
* vector - vector is configured to collect logs from the `docker`, you can find configuration in the `vector.toml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics.
* vector - vector is configured to collect logs from the `docker`, you can find configuration in the `vector.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics.
* VictoriaLogs - the log database, it accepts the data from `vector` by elastic protocol
* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics`
@ -25,37 +25,39 @@ Querying the data
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)
the example of vector configuration(`vector.toml`)
the example of vector configuration(`vector.yaml`)
```
[sources.docker]
type = "docker_logs"
[transforms.msg_parser]
type = "remap"
inputs = ["docker"]
source = '''
sources:
docker:
type: docker_logs
transforms:
msg_parser:
type: remap
inputs:
- docker
source: |
.log = parse_json!(.message)
del(.message)
'''
[sinks.vlogs]
type = "elasticsearch"
inputs = [ "msg_parser" ]
endpoints = [ "http://victorialogs:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
compression = "gzip"
healthcheck.enabled = false
[sinks.vlogs.query]
_msg_field = "log.msg"
_time_field = "timestamp"
_stream_fields = "source_type,host,container_name"
[sinks.vlogs.request.headers]
AccountID = "0"
ProjectID = "0"
sinks:
vlogs_es:
type: elasticsearch
inputs:
- msg_parser
endpoints:
- http://victorialogs:9428/insert/elasticsearch/
mode: bulk
api_version: v8
compression: gzip
healthcheck.enabled: false
query:
_msg_field: log.msg
_time_field: timestamp
_stream_fields: source_type,host,container_name
request:
headers:
AccountID: "0"
ProjectID: "0"
```
Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance.

View file

@ -1,8 +1,6 @@
version: '3'
services:
vector:
image: docker.io/timberio/vector:0.30.0-distroless-static
image: docker.io/timberio/vector:0.38.0-distroless-static
restart: on-failure
volumes:
- type: bind
@ -11,7 +9,7 @@ services:
- type: bind
source: /var/lib/docker
target: /var/lib/docker
- ./vector.toml:/etc/vector/vector.toml:ro
- ./vector.yaml:/etc/vector/vector.yaml:ro
user: root
ports:
- '8686:8686'
@ -31,7 +29,7 @@ services:
- -storageDataPath=/vlogs
- -loggerFormat=json
healthcheck:
test: ["CMD", "wget", "-qO-", "http://localhost:9428/health"]
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"]
interval: 1s
timeout: 1s
retries: 10
@ -48,7 +46,7 @@ services:
- victorialogs-vector-docker-vm:/vmsingle
- ./scrape.yml:/promscrape.yml
healthcheck:
test: ["CMD", "wget", "-qO-", "http://localhost:8428/health"]
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"]
interval: 1s
timeout: 1s
retries: 10

View file

@ -0,0 +1,11 @@
scrape_configs:
- job_name: "victoriametrics"
scrape_interval: 30s
static_configs:
- targets:
- victoriametrics:8428
- job_name: "victorialogs"
scrape_interval: 30s
static_configs:
- targets:
- victorialogs:9428

View file

@ -0,0 +1,73 @@
api:
enabled: true
address: 0.0.0.0:8686
sources:
docker:
type: docker_logs
demo:
type: demo_logs
format: json
metrics:
type: internal_metrics
transforms:
msg_parser:
type: remap
inputs:
- docker
source: |
.log = parse_json!(.message)
del(.message)
sinks:
vlogs_http:
type: http
inputs:
- msg_parser
uri: http://victorialogs:9428/insert/jsonline?_stream_fields=source_type,host,container_name&_msg_field=log.msg&_time_field=timestamp
encoding:
codec: json
framing:
method: newline_delimited
compression: gzip
healthcheck:
enabled: false
request:
headers:
AccountID: '0'
ProjectID: '0'
vlogs_loki:
type: loki
inputs:
- demo
endpoint: http://victorialogs:9428/insert/loki/
compression: gzip
path: /api/v1/push?_msg_field=message.message&_time_field=timestamp&_stream_fields=source
encoding:
codec: json
labels:
source: vector
victoriametrics:
type: prometheus_remote_write
endpoint: http://victoriametrics:8428/api/v1/write
inputs:
- metrics
healthcheck:
enabled: false
vlogs_es:
type: elasticsearch
inputs:
- demo
endpoints:
- http://victorialogs:9428/insert/elasticsearch/
mode: bulk
api_version: v8
compression: gzip
healthcheck:
enabled: false
query:
_msg_field: message
_time_field: timestamp
_stream_fields: source_type
request:
headers:
AccountID: '0'
ProjectID: '0'

View file

@ -14,6 +14,51 @@ aliases:
# Fluentbit setup
VictoriaLogs supports given below Fluentbit outputs:
- [Elasticsearch](#elasticsearch)
- [Loki](#loki)
- [HTTP JSON](#http)
## Elasticsearch
Specify [elasticsearch output](https://docs.fluentbit.io/manual/pipeline/outputs/elasticsearch) section in the `fluentbit.conf`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```conf
[Output]
Name es
Match *
host victorialogs
port 9428
compress gzip
path /insert/elasticsearch
header AccountID 0
header ProjectID 0
header VL-Stream-Fields path
header VL-Msg-Field log
header VL-Time-Field @timestamp
```
## Loki
Specify [loki output](https://docs.fluentbit.io/manual/pipeline/outputs/loki) section in the `fluentbit.conf`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```conf
[OUTPUT]
name loki
match *
host victorialogs
uri /insert/loki/api/v1/push
port 9428
label_keys $path,$log,$time
header VL-Msg-Field log
header VL-Time-Field time
header VL-Stream-Fields path
```
## HTTP
Specify [http output](https://docs.fluentbit.io/manual/pipeline/outputs/http) section in the `fluentbit.conf`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):

View file

@ -11,6 +11,13 @@ aliases:
- /victorialogs/data-ingestion/logstash.html
- /victorialogs/data-ingestion/Logstash.html
---
VictoriaLogs supports given below Logstash outputs:
- [Elasticsearch](#elasticsearch)
- [Loki](#loki)
- [HTTP JSON](#http)
## Elasticsearch
Specify [`output.elasticsearch`](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) section in the `logstash.conf` file
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
@ -106,6 +113,32 @@ output {
}
```
## Loki
Specify [`output.loki`](https://grafana.com/docs/loki/latest/send-data/logstash/) section in the `logstash.conf` file
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```conf
output {
loki {
url => "http://victorialogs:9428/insert/loki/api/v1/push?_stream_fields=host.ip,process.name&_msg_field=message&_time_field=@timestamp"
}
}
```
## HTTP
Specify [`output.http`](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html) section in the `logstash.conf` file
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```conf
output {
url => "http://victorialogs:9428/insert/jsonline?_stream_fields=host.ip,process.name&_msg_field=message&_time_field=@timestamp"
format => "json"
http_method => "post"
}
```
See also:
- [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting).

View file

@ -6,6 +6,7 @@
- Logstash - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/).
- Vector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/).
- Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/).
- Telegraf - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/).
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/).
@ -198,6 +199,24 @@ See also [HTTP headers](#http-headers).
VictoriaLogs accepts optional `AccountID` and `ProjectID` headers at [data ingestion HTTP APIs](#http-apis).
These headers may contain the needed tenant to ingest data to. See [multitenancy docs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for details.
- `VL-Msg-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) generated by the log shipper.
This is usually the `message` field for Filebeat and Logstash.
If the `VL-Msg-Field` header isn't set, then VictoriaLogs reads the log message from the `_msg` field.
- `VL-Time-Field` - it must contain the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the [log timestamp](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) generated by the log shipper.
This is usually the `@timestamp` field for Filebeat and Logstash.
If the `VL-Time-Field` header isn't set, then VictoriaLogs reads the timestamp from the `_time` field.
If this field doesn't exist, then the current timestamp is used.
- `VL-Stream-Fields` - it should contain comma-separated list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which uniquely identify every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) collected the log shipper.
If the `VL-Stream-Fields` header isn't set, then all the ingested logs are written to default log stream - `{}`.
- `VL-Ignore-Fields` - this parameter may contain the list of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names,
which must be ignored during data ingestion.
## Troubleshooting
The following command can be used for verifying whether the data is successfully ingested into VictoriaLogs:
@ -246,7 +265,8 @@ Here is the list of log collectors and their ingestion formats supported by Vict
| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) |
| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) |
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No |
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) |
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) |
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/elasticsearch) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) |
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html) | [Yes](https://grafana.com/docs/loki/latest/send-data/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) |
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No |
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No |
| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) |

View file

@ -0,0 +1,123 @@
---
weight: 5
title: Telegraf setup
disableToc: true
menu:
docs:
parent: "victorialogs-data-ingestion"
weight: 5
aliases:
- /VictoriaLogs/data-ingestion/Telegraf.html
---
# Telegraf setup
VictoriaLogs supports given below Telegraf outputs:
- [Elasticsearch](#elasticsearch)
- [Loki](#loki)
- [HTTP JSON](#http)
## Elasticsearch
Specify [Elasticsearch output](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) in the `telegraf.toml`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```toml
[[outputs.elasticsearch]]
urls = ["http://localhost:9428/insert/elasticsearch"]
timeout = "1m"
flush_interval = "30s"
enable_sniffer = false
health_check_interval = "0s"
index_name = "device_log-%Y.%m.%d"
manage_template = false
template_name = "telegraf"
overwrite_template = false
namepass = ["tail"]
[outputs.elasticsearch.headers]
"VL-Msg-Field" = "tail.value"
"VL-Time-Field" = "@timestamp"
"VL-Stream-Fields" = "tag.log_source,tag.metric_type"
[[inputs.tail]]
files = ["/tmp/telegraf.log"]
from_beginning = false
interval = "10s"
pipe = false
watch_method = "inotify"
data_format = "value"
data_type = "string"
character_encoding = "utf-8"
[inputs.tail.tags]
metric_type = "logs"
log_source = "telegraf"
```
## Loki
Specify [Loki output](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) in the `telegraf.toml`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```toml
[[outputs.loki]]
domain = "http://localhost:9428"
endpoint = "/insert/loki/api/v1/push&_msg_field=tail.value&_time_field=@timefield&_stream_fields=log_source,metric_type"
namepass = ["tail"]
gzip_request = true
sanitize_label_names = true
[[inputs.tail]]
files = ["/tmp/telegraf.log"]
from_beginning = false
interval = "10s"
pipe = false
watch_method = "inotify"
data_format = "value"
data_type = "string"
character_encoding = "utf-8"
[inputs.tail.tags]
metric_type = "logs"
log_source = "telegraf"
```
## HTTP
Specify [HTTP output](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) in the `telegraf.toml with batch mode disabled`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```toml
[[inputs.tail]]
files = ["/tmp/telegraf.log"]
from_beginning = false
interval = "10s"
pipe = false
watch_method = "inotify"
data_format = "value"
data_type = "string"
character_encoding = "utf-8"
[inputs.tail.tags]
metric_type = "logs"
log_source = "telegraf"
[[outputs.http]]
url = "http://localhost:9428/insert/jsonline?_msg_field=fields.message&_time_field=timestamp,_stream_fields=tags.log_source,tags.metric_type"
data_format = "json"
namepass = ["docker_log"]
use_batch_format = false
```
Substitute the `localhost:9428` address inside `endpoints` section with the real TCP address of VictoriaLogs.
See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-headers) for details on headers specified
in the `[[output.elasticsearch]]` section.
It is recommended verifying whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
and uses the correct [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
See also:
- [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting).
- [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/).
- [Elasticsearch output docs for Telegraf](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch).
- [Docker-compose demo for Telegraf integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/telegraf-docker).

View file

@ -11,24 +11,52 @@ aliases:
- /victorialogs/data-ingestion/Vector.html
- /victorialogs/data-ingestion/vector.html
---
## Elasticsearch sink
VictoriaLogs supports given below Vector sinks:
- [Elasticsearch](#elasticsearch)
- [Loki](#loki)
- [HTTP JSON](#http)
Specify [Elasticsearch sink type](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) in the `vector.toml`
## Elasticsearch
Specify [Elasticsearch sink type](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) in the `vector.yaml`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```toml
[sinks.vlogs]
inputs = [ "your_input" ]
type = "elasticsearch"
endpoints = [ "http://localhost:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
healthcheck.enabled = false
```yaml
sinks:
vlogs:
inputs:
- your_input
type: elasticsearch
endpoints:
- http://localhost:9428/insert/elasticsearch/
mode: bulk
api_version: v8
healthcheck:
enabled: false
query:
_msg_field: message
_time_field: timestamp
_stream_fields: host,container_name
```
[sinks.vlogs.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "host,container_name"
## Loki
Specify [Loki sink type](https://vector.dev/docs/reference/configuration/sinks/loki/) in the `vector.yaml`
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/):
```yaml
sinks:
vlogs:
type: "loki"
endpoint = "http://localhost:9428/insert/loki/"
inputs:
- your_input
compression: gzip
path: /api/v1/push?_msg_field=message.message&_time_field=timestamp&_stream_fields=source
encoding:
codec: json
labels:
source: vector
```
Substitute the `localhost:9428` address inside `endpoints` section with the real TCP address of VictoriaLogs.
@ -36,129 +64,148 @@ Substitute the `localhost:9428` address inside `endpoints` section with the real
Replace `your_input` with the name of the `inputs` section, which collects logs. See [these docs](https://vector.dev/docs/reference/configuration/sources/) for details.
See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters) for details on parameters specified
in the `[sinks.vlogs.query]` section.
in the `sinks.vlogs.query` section.
It is recommended verifying whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
and uses the correct [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
This can be done by specifying `debug` [parameter](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters)
in the `[sinks.vlogs.query]` section and inspecting VictoriaLogs logs then:
in the `sinks.vlogs.query` section and inspecting VictoriaLogs logs then:
```toml
[sinks.vlogs]
inputs = [ "your_input" ]
type = "elasticsearch"
endpoints = [ "http://localhost:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
healthcheck.enabled = false
[sinks.vlogs.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "host,container_name"
debug = "1"
```yaml
sinks:
vlogs:
inputs:
- your_input
type: elasticsearch
endpoints:
- http://localhost:9428/insert/elasticsearch/
mode: bulk
api_version: v8
healthcheck:
enabled: false
query:
_msg_field: message
_time_field: timestamp
_stream_fields: host,container_name
debug: "1"
```
If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be skipped
during data ingestion, then they can be put into `ignore_fields` [parameter](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs:
```toml
[sinks.vlogs]
inputs = [ "your_input" ]
type = "elasticsearch"
endpoints = [ "http://localhost:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
healthcheck.enabled = false
[sinks.vlogs.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "host,container_name"
ignore_fields = "log.offset,event.original"
```yaml
sinks:
vlogs:
inputs:
- your_input
type: elasticsearch
endpoints:
- http://localhost:9428/insert/elasticsearch/
mode: bulk
api_version: v8
healthcheck:
enabled: false
query:
_msg_field: message
_time_field: timestamp
_stream_fields: host,container_name
_ignore_fields: log.offset,event.original
```
When Vector ingests logs into VictoriaLogs at a high rate, then it may be needed to tune `batch.max_events` option.
For example, the following config is optimized for higher than usual ingestion rate:
```toml
[sinks.vlogs]
inputs = [ "your_input" ]
type = "elasticsearch"
endpoints = [ "http://localhost:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
healthcheck.enabled = false
[sinks.vlogs.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "host,container_name"
[sinks.vlogs.batch]
max_events = 1000
```yaml
sinks:
vlogs:
inputs:
- your_input
type: elasticsearch
endpoints:
- http://localhost:9428/insert/elasticsearch/
mode: bulk
api_version: v8
healthcheck:
enabled: false
query:
_msg_field: message
_time_field: timestamp
_stream_fields: host,container_name
batch]
max_events: 1000
```
If the Vector sends logs to VictoriaLogs in another datacenter, then it may be useful enabling data compression via `compression = "gzip"` option.
This usually allows saving network bandwidth and costs by up to 5 times:
```toml
[sinks.vlogs]
inputs = [ "your_input" ]
type = "elasticsearch"
endpoints = [ "http://localhost:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
healthcheck.enabled = false
compression = "gzip"
[sinks.vlogs.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "host,container_name"
```yaml
sinks:
vlogs:
inputs:
- your_input
type: elasticsearch
endpoints:
- http://localhost:9428/insert/elasticsearch/
mode: bulk
api_version: v8
healthcheck:
enabled: false
compression: gzip
query:
_msg_field: message
_time_field: timestamp
_stream_fields: host,container_name
```
By default, the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/keyconcepts/#multitenancy).
If you need storing logs in other tenant, then specify the needed tenant via `[sinks.vlogs.request.headers]` section.
For example, the following `vector.toml` config instructs Vector to store the data to `(AccountID=12, ProjectID=34)` tenant:
If you need storing logs in other tenant, then specify the needed tenant via `sinks.vlogs.request.headers` section.
For example, the following `vector.yaml` config instructs Vector to store the data to `(AccountID=12, ProjectID=34)` tenant:
```toml
[sinks.vlogs]
inputs = [ "your_input" ]
type = "elasticsearch"
endpoints = [ "http://localhost:9428/insert/elasticsearch/" ]
mode = "bulk"
api_version = "v8"
healthcheck.enabled = false
[sinks.vlogs.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "host,container_name"
[sinks.vlogs.request.headers]
AccountID = "12"
ProjectID = "34"
```yaml
sinks:
vlogs:
inputs:
- your_input
type: elasticsearch
endpoints:
- http://localhost:9428/insert/elasticsearch/
mode: bulk
api_version: v8
healthcheck:
enabled: false
query:
_msg_field: message
_time_field: timestamp
_stream_fields: host,container_name
request:
headers:
AccountID: "12"
ProjectID: "34"
```
## HTTP sink
## HTTP
Vector can be configured with [HTTP](https://vector.dev/docs/reference/configuration/sinks/http/) sink type
for sending data to [JSON stream API](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api):
```toml
[sinks.vlogs]
inputs = [ "your_input" ]
type = "http"
uri = "http://localhost:9428/insert/jsonline?_stream_fields=host,container_name&_msg_field=message&_time_field=timestamp"
encoding.codec = "json"
framing.method = "newline_delimited"
healthcheck.enabled = false
[sinks.vlogs.request.headers]
AccountID = "12"
ProjectID = "34"
```yaml
sinks:
vlogs:
inputs:
- your_input
type: http
uri: http://localhost:9428/insert/jsonline?_stream_fields=host,container_name&_msg_field=message&_time_field=timestamp
encoding:
codec: json
framing:
method: newline_delimited
healthcheck:
enabled: false
request:
headers:
AccountID: "12"
ProjectID: "34"
```
See also: