app/vlinsert: expose vl_bytes_ingested_total metric

This metric tracks an approximate amounts of bytes processed when parsing the ingested logs.
The metric is exposed individually per every supported data ingestion protocol. The protocol name
is exposed via "type" label in order to be consistent with vl_rows_ingested_total metric.

Thanks to @tenmozes for the initial idea and implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682

While at it, remove the unneeded "format" label from vl_rows_ingested_total metric.
The "type" label must be enough for encoding the data ingestion format.
This commit is contained in:
Aliaksandr Valialkin 2024-11-30 17:19:13 +01:00
parent 70e368741d
commit b2555491bb
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
10 changed files with 39 additions and 20 deletions

View file

@ -84,7 +84,7 @@ func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool {
return true return true
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("datadog")
n, err := readLogsRequest(ts, data, lmp.AddRow) n, err := readLogsRequest(ts, data, lmp.AddRow)
lmp.MustClose() lmp.MustClose()
if n > 0 { if n > 0 {

View file

@ -101,7 +101,7 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return true return true
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("elasticsearch_bulk")
isGzip := r.Header.Get("Content-Encoding") == "gzip" isGzip := r.Header.Get("Content-Encoding") == "gzip"
n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgFields, lmp) n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgFields, lmp)
lmp.MustClose() lmp.MustClose()

View file

@ -154,6 +154,8 @@ type logMessageProcessor struct {
cp *CommonParams cp *CommonParams
lr *logstorage.LogRows lr *logstorage.LogRows
processedBytesTotal *metrics.Counter
} }
func (lmp *logMessageProcessor) initPeriodicFlush() { func (lmp *logMessageProcessor) initPeriodicFlush() {
@ -187,6 +189,9 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel
lmp.mu.Lock() lmp.mu.Lock()
defer lmp.mu.Unlock() defer lmp.mu.Unlock()
n := getApproxJSONRowLen(fields)
lmp.processedBytesTotal.Add(n)
if len(fields) > *MaxFieldsPerLine { if len(fields) > *MaxFieldsPerLine {
rf := logstorage.RowFormatter(fields) rf := logstorage.RowFormatter(fields)
logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf) logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
@ -207,6 +212,16 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel
} }
} }
// getApproxJSONRowLen returns an approximate length of the log entry with the given fields if represented as JSON.
func getApproxJSONRowLen(fields []logstorage.Field) int {
n := len("{}\n")
n += len(`"_time":""`) + len(time.RFC3339Nano)
for _, f := range fields {
n += len(`,"":""`) + len(f.Name) + len(f.Value)
}
return n
}
// flushLocked must be called under locked lmp.mu. // flushLocked must be called under locked lmp.mu.
func (lmp *logMessageProcessor) flushLocked() { func (lmp *logMessageProcessor) flushLocked() {
lmp.lastFlushTime = time.Now() lmp.lastFlushTime = time.Now()
@ -227,12 +242,15 @@ func (lmp *logMessageProcessor) MustClose() {
// NewLogMessageProcessor returns new LogMessageProcessor for the given cp. // NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
// //
// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed. // MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.
func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor { func (cp *CommonParams) NewLogMessageProcessor(protocolName string) LogMessageProcessor {
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.ExtraFields, *defaultMsgValue) lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.ExtraFields, *defaultMsgValue)
processedBytesTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName))
lmp := &logMessageProcessor{ lmp := &logMessageProcessor{
cp: cp, cp: cp,
lr: lr, lr: lr,
processedBytesTotal: processedBytesTotal,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
lmp.initPeriodicFlush() lmp.initPeriodicFlush()

View file

@ -120,7 +120,7 @@ func handleJournald(r *http.Request, w http.ResponseWriter) {
return return
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("journald")
n, err := parseJournaldRequest(data, lmp, cp) n, err := parseJournaldRequest(data, lmp, cp)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
@ -138,12 +138,12 @@ func handleJournald(r *http.Request, w http.ResponseWriter) {
} }
var ( var (
rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald", format="journald"}`) rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald"}`)
requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload",format="journald"}`) requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload"}`)
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload",format="journald"}`) errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload"}`)
requestJournaldDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload",format="journald"}`) requestJournaldDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload"}`)
) )
// See https://systemd.io/JOURNAL_EXPORT_FORMATS/#journal-export-format // See https://systemd.io/JOURNAL_EXPORT_FORMATS/#journal-export-format

View file

@ -52,7 +52,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) {
reader = zr reader = zr
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("jsonline")
err = processStreamInternal(reader, cp.TimeField, cp.MsgFields, lmp) err = processStreamInternal(reader, cp.TimeField, cp.MsgFields, lmp)
lmp.MustClose() lmp.MustClose()

View file

@ -53,7 +53,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) {
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return return
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("loki_json")
n, err := parseJSONRequest(data, lmp) n, err := parseJSONRequest(data, lmp)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
@ -71,7 +71,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) {
var ( var (
requestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) requestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki_json"}`)
requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`) requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
) )

View file

@ -44,7 +44,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return return
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("loki_protobuf")
n, err := parseProtobufRequest(data, lmp) n, err := parseProtobufRequest(data, lmp)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
@ -62,7 +62,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
var ( var (
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki_protobuf"}`)
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`) requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
) )

View file

@ -66,7 +66,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
return return
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("opentelelemtry_protobuf")
n, err := pushProtobufRequest(data, lmp) n, err := pushProtobufRequest(data, lmp)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
@ -83,7 +83,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
} }
var ( var (
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry",format="protobuf"}`) rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry_protobuf"}`)
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)

View file

@ -314,7 +314,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st
} }
bb.B = bb.B[:n] bb.B = bb.B[:n]
udpRequestsTotal.Inc() udpRequestsTotal.Inc()
if err := processStream(bb.NewReader(), compressMethod, useLocalTimestamp, cp); err != nil { if err := processStream("udp", bb.NewReader(), compressMethod, useLocalTimestamp, cp); err != nil {
logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err) logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err)
} }
} }
@ -354,7 +354,7 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri
wg.Add(1) wg.Add(1)
go func() { go func() {
cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields) cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields)
if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil { if err := processStream("tcp", c, compressMethod, useLocalTimestamp, cp); err != nil {
logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err) logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err)
} }
@ -369,12 +369,12 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri
} }
// processStream parses a stream of syslog messages from r and ingests them into vlstorage. // processStream parses a stream of syslog messages from r and ingests them into vlstorage.
func processStream(r io.Reader, compressMethod string, useLocalTimestamp bool, cp *insertutils.CommonParams) error { func processStream(protocol string, r io.Reader, compressMethod string, useLocalTimestamp bool, cp *insertutils.CommonParams) error {
if err := vlstorage.CanWriteData(); err != nil { if err := vlstorage.CanWriteData(); err != nil {
return err return err
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor("syslog_" + protocol)
err := processStreamInternal(r, compressMethod, useLocalTimestamp, lmp) err := processStreamInternal(r, compressMethod, useLocalTimestamp, lmp)
lmp.MustClose() lmp.MustClose()

View file

@ -10,6 +10,7 @@ menu:
aliases: aliases:
- /VictoriaLogs/CHANGELOG.html - /VictoriaLogs/CHANGELOG.html
--- ---
The following `tip` changes can be tested by building VictoriaLogs from the latest commit of [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/) repository The following `tip` changes can be tested by building VictoriaLogs from the latest commit of [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/) repository
according to [these docs](https://docs.victoriametrics.com/victorialogs/quickstart/#building-from-source-code) according to [these docs](https://docs.victoriametrics.com/victorialogs/quickstart/#building-from-source-code)
@ -18,7 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add frontend-only pagination for table view. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add frontend-only pagination for table view.
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): improve memory consumption during data processing. This enhancement reduces the overall memory footprint, leading to better performance and stability. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): improve memory consumption during data processing. This enhancement reduces the overall memory footprint, leading to better performance and stability.
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): reduce memory usage across all tabs for improved performance and stability. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7185). * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): reduce memory usage across all tabs for improved performance and stability. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7185).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for template alias in predefined panels. This allows creating more readable metric names in the legend using constructions like `{{label_name}}`, where `label_name` is the name of the label. [See this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/116101da78a4dee8bd7c4ba0e66458fd05a10469#diff-95141489b32468cf852d2705d96eaa48c50a8b1cdd0424a29e7ca289912a6dcbR140-R151) * FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): expose `vl_bytes_ingested_total` [counter](https://docs.victoriametrics.com/keyconcepts/#counter) at `/metrics` page. This counter tracks an estimated number of bytes processed when parsing the ingested logs. This counter is exposed individually per every [supported data ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/) - the protocol name is exposed in the `type` label. For example, `vl_bytes_ingested_total{type="jsonline"}` tracks an estimated number of bytes processed when reading the ingested logs via [json line protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api). Thanks to @tenmozes for the idea and [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix for `showLegend` and `alias` flags in predefined panels. [See this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7565) * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix for `showLegend` and `alias` flags in predefined panels. [See this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7565)