From 50bfa689c93b21d771f7d478d921cf0ea733d411 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 30 Nov 2024 17:19:13 +0100 Subject: [PATCH] app/vlinsert: expose vl_bytes_ingested_total metric This metric tracks an approximate amount of bytes processed when parsing the ingested logs. The metric is exposed individually per every supported data ingestion protocol. The protocol name is exposed via "type" label in order to be consistent with vl_rows_ingested_total metric. Thanks to @tenmozes for the initial idea and implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682 While at it, remove the unneeded "format" label from vl_rows_ingested_total metric. The "type" label must be enough for encoding the data ingestion format. --- app/vlinsert/datadog/datadog.go | 2 +- app/vlinsert/elasticsearch/elasticsearch.go | 2 +- app/vlinsert/insertutils/common_params.go | 20 +++++++++++++++++++- app/vlinsert/journald/journald.go | 10 +++++----- app/vlinsert/jsonline/jsonline.go | 2 +- app/vlinsert/loki/loki_json.go | 4 ++-- app/vlinsert/loki/loki_protobuf.go | 4 ++-- app/vlinsert/opentelemetry/opentelemetry.go | 4 ++-- app/vlinsert/syslog/syslog.go | 8 ++++---- docs/VictoriaLogs/CHANGELOG.md | 3 ++- 10 files changed, 39 insertions(+), 20 deletions(-) diff --git a/app/vlinsert/datadog/datadog.go b/app/vlinsert/datadog/datadog.go index 8bd7f96d89..e53d3a7e96 100644 --- a/app/vlinsert/datadog/datadog.go +++ b/app/vlinsert/datadog/datadog.go @@ -84,7 +84,7 @@ func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool { return true } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("datadog") n, err := readLogsRequest(ts, data, lmp.AddRow) lmp.MustClose() if n > 0 { diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index b0e941cd28..4472572ef1 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -101,7 +101,7 
@@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { httpserver.Errorf(w, r, "%s", err) return true } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("elasticsearch_bulk") isGzip := r.Header.Get("Content-Encoding") == "gzip" n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgFields, lmp) lmp.MustClose() diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go index c357d3f874..4a85bc49fe 100644 --- a/app/vlinsert/insertutils/common_params.go +++ b/app/vlinsert/insertutils/common_params.go @@ -154,6 +154,8 @@ type logMessageProcessor struct { cp *CommonParams lr *logstorage.LogRows + + processedBytesTotal *metrics.Counter } func (lmp *logMessageProcessor) initPeriodicFlush() { @@ -187,6 +189,9 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel lmp.mu.Lock() defer lmp.mu.Unlock() + n := getApproxJSONRowLen(fields) + lmp.processedBytesTotal.Add(n) + if len(fields) > *MaxFieldsPerLine { rf := logstorage.RowFormatter(fields) logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf) @@ -207,6 +212,16 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel } } +// getApproxJSONRowLen returns an approximate length of the log entry with the given fields if represented as JSON. +func getApproxJSONRowLen(fields []logstorage.Field) int { + n := len("{}\n") + n += len(`"_time":""`) + len(time.RFC3339Nano) + for _, f := range fields { + n += len(`,"":""`) + len(f.Name) + len(f.Value) + } + return n +} + // flushLocked must be called under locked lmp.mu. func (lmp *logMessageProcessor) flushLocked() { lmp.lastFlushTime = time.Now() @@ -227,12 +242,15 @@ func (lmp *logMessageProcessor) MustClose() { // NewLogMessageProcessor returns new LogMessageProcessor for the given cp. 
// // MustClose() must be called on the returned LogMessageProcessor when it is no longer needed. -func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor { +func (cp *CommonParams) NewLogMessageProcessor(protocolName string) LogMessageProcessor { lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.ExtraFields, *defaultMsgValue) + processedBytesTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName)) lmp := &logMessageProcessor{ cp: cp, lr: lr, + processedBytesTotal: processedBytesTotal, + stopCh: make(chan struct{}), } lmp.initPeriodicFlush() diff --git a/app/vlinsert/journald/journald.go b/app/vlinsert/journald/journald.go index 1cf76c3544..68509a5713 100644 --- a/app/vlinsert/journald/journald.go +++ b/app/vlinsert/journald/journald.go @@ -120,7 +120,7 @@ func handleJournald(r *http.Request, w http.ResponseWriter) { return } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("journald") n, err := parseJournaldRequest(data, lmp, cp) lmp.MustClose() if err != nil { @@ -138,12 +138,12 @@ func handleJournald(r *http.Request, w http.ResponseWriter) { } var ( - rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald", format="journald"}`) + rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald"}`) - requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload",format="journald"}`) - errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload",format="journald"}`) + requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload"}`) + errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload"}`) - requestJournaldDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload",format="journald"}`) + requestJournaldDuration = 
metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload"}`) ) // See https://systemd.io/JOURNAL_EXPORT_FORMATS/#journal-export-format diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 1db2cf7ca8..732d2a2dc0 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -52,7 +52,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) { reader = zr } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("jsonline") err = processStreamInternal(reader, cp.TimeField, cp.MsgFields, lmp) lmp.MustClose() diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go index 768691c112..8b8f184150 100644 --- a/app/vlinsert/loki/loki_json.go +++ b/app/vlinsert/loki/loki_json.go @@ -53,7 +53,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) { httpserver.Errorf(w, r, "%s", err) return } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("loki_json") n, err := parseJSONRequest(data, lmp) lmp.MustClose() if err != nil { @@ -71,7 +71,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) { var ( requestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) - rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) + rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki_json"}`) requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`) ) diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go index 2c1ac6b396..014b1bc533 100644 --- a/app/vlinsert/loki/loki_protobuf.go +++ b/app/vlinsert/loki/loki_protobuf.go @@ -44,7 +44,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { httpserver.Errorf(w, r, "%s", err) return } - lmp := cp.NewLogMessageProcessor() + lmp := 
cp.NewLogMessageProcessor("loki_protobuf") n, err := parseProtobufRequest(data, lmp) lmp.MustClose() if err != nil { @@ -62,7 +62,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { var ( requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) - rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki_protobuf"}`) requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`) ) diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go index b300500ca4..5a5882c60c 100644 --- a/app/vlinsert/opentelemetry/opentelemetry.go +++ b/app/vlinsert/opentelemetry/opentelemetry.go @@ -66,7 +66,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { return } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("opentelemetry_protobuf") n, err := pushProtobufRequest(data, lmp) lmp.MustClose() if err != nil { @@ -83,7 +83,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { } var ( - rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry",format="protobuf"}`) + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry_protobuf"}`) requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) diff --git a/app/vlinsert/syslog/syslog.go b/app/vlinsert/syslog/syslog.go index 7211204abe..4ad43fe70e 100644 --- a/app/vlinsert/syslog/syslog.go +++ b/app/vlinsert/syslog/syslog.go @@ -314,7 +314,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st } bb.B 
= bb.B[:n] udpRequestsTotal.Inc() - if err := processStream(bb.NewReader(), compressMethod, useLocalTimestamp, cp); err != nil { + if err := processStream("udp", bb.NewReader(), compressMethod, useLocalTimestamp, cp); err != nil { logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err) } } @@ -354,7 +354,7 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri wg.Add(1) go func() { cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields) - if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil { + if err := processStream("tcp", c, compressMethod, useLocalTimestamp, cp); err != nil { logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err) } @@ -369,12 +369,12 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri } // processStream parses a stream of syslog messages from r and ingests them into vlstorage. -func processStream(r io.Reader, compressMethod string, useLocalTimestamp bool, cp *insertutils.CommonParams) error { +func processStream(protocol string, r io.Reader, compressMethod string, useLocalTimestamp bool, cp *insertutils.CommonParams) error { if err := vlstorage.CanWriteData(); err != nil { return err } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("syslog_" + protocol) err := processStreamInternal(r, compressMethod, useLocalTimestamp, lmp) lmp.MustClose() diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d80a1dbe07..be1ca475ad 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -10,6 +10,7 @@ menu: aliases: - /VictoriaLogs/CHANGELOG.html --- + The following `tip` changes can be tested by building VictoriaLogs from the latest commit of [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/) repository according to [these 
docs](https://docs.victoriametrics.com/victorialogs/quickstart/#building-from-source-code) @@ -18,7 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add frontend-only pagination for table view. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): improve memory consumption during data processing. This enhancement reduces the overall memory footprint, leading to better performance and stability. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): reduce memory usage across all tabs for improved performance and stability. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7185). -* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for template alias in predefined panels. This allows creating more readable metric names in the legend using constructions like `{{label_name}}`, where `label_name` is the name of the label. [See this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/116101da78a4dee8bd7c4ba0e66458fd05a10469#diff-95141489b32468cf852d2705d96eaa48c50a8b1cdd0424a29e7ca289912a6dcbR140-R151) +* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): expose `vl_bytes_ingested_total` [counter](https://docs.victoriametrics.com/keyconcepts/#counter) at `/metrics` page. This counter tracks an estimated number of bytes processed when parsing the ingested logs. This counter is exposed individually per every [supported data ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/) - the protocol name is exposed in the `type` label. For example, `vl_bytes_ingested_total{type="jsonline"}` tracks an estimated number of bytes processed when reading the ingested logs via [json line protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api). 
Thanks to @tenmozes for the idea and [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix for `showLegend` and `alias` flags in predefined panels. [See this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7565)