From e9647bb669fb1df5067a1133ffbb61dde5bcfb23 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 18 Sep 2023 23:58:32 +0200
Subject: [PATCH] app/vlinsert: follow-up for
 d570763c91a235cfcbaf1b799cbfbefa557ee7e1

- Switch from a summary to a histogram for the vl_http_request_duration_seconds metric.
  This allows calculating request duration quantiles across multiple hosts
  via histogram_quantile(0.99, sum(vl_http_request_duration_seconds_bucket) by (vmrange)).
- Take into account only successfully processed data ingestion requests
  when updating the vl_http_request_duration_seconds histogram (see the sketch
  after this list). Failed requests are ignored, since their timings may
  significantly skew the measurements.
- Clarify the description of the change at docs/VictoriaLogs/CHANGELOG.md.
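
The following minimal sketch shows the pattern this patch applies in every
handler; requestHandler, parseRequest and the /insert/example path are
illustrative placeholders rather than names taken from the patch:

package example

import (
	"net/http"
	"time"

	"github.com/VictoriaMetrics/metrics"
)

// One histogram per handler path. When written out via metrics.WritePrometheus,
// it produces vl_http_request_duration_seconds_bucket series with vmrange labels,
// which histogram_quantile() can aggregate across hosts.
var requestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/example"}`)

func requestHandler(w http.ResponseWriter, r *http.Request) bool {
	startTime := time.Now()

	if err := parseRequest(r); err != nil {
		// Failed requests are not recorded: their timings are usually much
		// smaller than those of successful requests and would skew the histogram.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return true
	}

	// Record the duration only for successfully parsed and ingested requests.
	requestDuration.UpdateDuration(startTime)
	return true
}

// parseRequest stands in for the per-protocol parsing and ingestion
// performed by the real handlers.
func parseRequest(r *http.Request) error {
	return nil
}

Recording only successful requests keeps the histogram representative of the
actual parsing and ingestion cost instead of mixing in cheap early failures.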

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934
---
 app/vlinsert/elasticsearch/elasticsearch.go | 12 ++++++++----
 app/vlinsert/jsonline/jsonline.go           | 14 +++++++++-----
 app/vlinsert/loki/loki.go                   | 14 --------------
 app/vlinsert/loki/loki_json.go              | 19 +++++++++++++++----
 app/vlinsert/loki/loki_protobuf.go          | 20 +++++++++++++++++---
 docs/VictoriaLogs/CHANGELOG.md              |  2 +-
 6 files changed, 50 insertions(+), 31 deletions(-)

diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go
index aaa8457450..a3ae68f19e 100644
--- a/app/vlinsert/elasticsearch/elasticsearch.go
+++ b/app/vlinsert/elasticsearch/elasticsearch.go
@@ -86,7 +86,6 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 		return true
 	case "/_bulk":
 		startTime := time.Now()
-		defer bulkRequestDuration.UpdateDuration(startTime)
 		bulkRequestsTotal.Inc()
 
 		cp, err := insertutils.GetCommonParams(r)
@@ -110,6 +109,12 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 		defer bufferedwriter.Put(bw)
 		WriteBulkResponse(bw, n, tookMs)
 		_ = bw.Flush()
+
+		// Update bulkRequestDuration only for successfully parsed requests.
+		// There is no need to update bulkRequestDuration for request errors,
+		// since their timings are usually much smaller than the timings of successfully parsed requests.
+		bulkRequestDuration.UpdateDuration(startTime)
+
 		return true
 	default:
 		return false
@@ -118,7 +123,8 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 
 var (
 	bulkRequestsTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
-	bulkRequestDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
+	rowsIngestedTotal   = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
+	bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
 )
 
 func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
@@ -164,8 +170,6 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 
 var lineBufferPool bytesutil.ByteBufferPool
 
-var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
-
 func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
 	processLogMessage func(timestamp int64, fields []logstorage.Field),
 ) (bool, error) {
diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go
index 965f1b15c9..bf8d4760ec 100644
--- a/app/vlinsert/jsonline/jsonline.go
+++ b/app/vlinsert/jsonline/jsonline.go
@@ -19,11 +19,9 @@ import (
 	"github.com/VictoriaMetrics/metrics"
 )
 
-var jsonlineRequestDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
-
 // RequestHandler processes jsonline insert requests
 func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
-	defer jsonlineRequestDuration.UpdateDuration(time.Now())
+	startTime := time.Now()
 	w.Header().Add("Content-Type", "application/json")
 
 	if r.Method != "POST" {
@@ -80,6 +78,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	vlstorage.MustAddRows(lr)
 	logstorage.PutLogRows(lr)
 
+	// Update jsonlineRequestDuration only for successfully parsed requests.
+	// There is no need to update jsonlineRequestDuration for request errors,
+	// since their timings are usually much smaller than the timings of successfully parsed requests.
+	jsonlineRequestDuration.UpdateDuration(startTime)
+
 	return true
 }
 
@@ -147,6 +150,7 @@ func parseISO8601Timestamp(s string) (int64, error) {
 var lineBufferPool bytesutil.ByteBufferPool
 
 var (
-	requestsTotal     = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
-	rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
+	requestsTotal           = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
+	rowsIngestedTotal       = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
+	jsonlineRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
 )
diff --git a/app/vlinsert/loki/loki.go b/app/vlinsert/loki/loki.go
index 98bb688117..e7a3c0b7d0 100644
--- a/app/vlinsert/loki/loki.go
+++ b/app/vlinsert/loki/loki.go
@@ -2,21 +2,11 @@ package loki
 
 import (
 	"net/http"
-	"time"
-
-	"github.com/VictoriaMetrics/metrics"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
 
-var (
-	lokiRequestsJSONTotal       = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
-	lokiRequestsProtobufTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
-	lokiRequestJSONDuration     = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
-	lokiRequestProtobufDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
-)
-
 // RequestHandler processes Loki insert requests
 func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 	switch path {
@@ -37,13 +27,9 @@ func handleInsert(r *http.Request, w http.ResponseWriter) bool {
 	contentType := r.Header.Get("Content-Type")
 	switch contentType {
 	case "application/json":
-		defer lokiRequestJSONDuration.UpdateDuration(time.Now())
-		lokiRequestsJSONTotal.Inc()
 		return handleJSON(r, w)
 	default:
 		// Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
-		defer lokiRequestProtobufDuration.UpdateDuration(time.Now())
-		lokiRequestsProtobufTotal.Inc()
 		return handleProtobuf(r, w)
 	}
 }
diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go
index 381aa34364..88a75df1d0 100644
--- a/app/vlinsert/loki/loki_json.go
+++ b/app/vlinsert/loki/loki_json.go
@@ -18,12 +18,11 @@ import (
 	"github.com/valyala/fastjson"
 )
 
-var (
-	rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
-	parserPool            fastjson.ParserPool
-)
+var parserPool fastjson.ParserPool
 
 func handleJSON(r *http.Request, w http.ResponseWriter) bool {
+	startTime := time.Now()
+	lokiRequestsJSONTotal.Inc()
 	reader := r.Body
 	if r.Header.Get("Content-Encoding") == "gzip" {
 		zr, err := common.GetGzipReader(reader)
@@ -58,9 +57,21 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
 		return true
 	}
 	rowsIngestedJSONTotal.Add(n)
+
+	// Update lokiRequestJSONDuration only for successfully parsed requests.
+	// There is no need to update lokiRequestJSONDuration for request errors,
+	// since their timings are usually much smaller than the timings of successfully parsed requests.
+	lokiRequestJSONDuration.UpdateDuration(startTime)
+
 	return true
 }
 
+var (
+	lokiRequestsJSONTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
+	rowsIngestedJSONTotal   = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
+	lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
+)
+
 func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
 	p := parserPool.Get()
 	defer parserPool.Put(p)
diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go
index a75bf93d00..aa4e6b592f 100644
--- a/app/vlinsert/loki/loki_protobuf.go
+++ b/app/vlinsert/loki/loki_protobuf.go
@@ -19,12 +19,13 @@ import (
 )
 
 var (
-	rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
-	bytesBufPool              bytesutil.ByteBufferPool
-	pushReqsPool              sync.Pool
+	bytesBufPool bytesutil.ByteBufferPool
+	pushReqsPool sync.Pool
 )
 
 func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
+	startTime := time.Now()
+	lokiRequestsProtobufTotal.Inc()
 	wcr := writeconcurrencylimiter.GetReader(r.Body)
 	data, err := io.ReadAll(wcr)
 	writeconcurrencylimiter.PutReader(wcr)
@@ -47,10 +48,23 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
 		httpserver.Errorf(w, r, "cannot parse loki request: %s", err)
 		return true
 	}
+
 	rowsIngestedProtobufTotal.Add(n)
+
+	// Update lokiRequestProtobufDuration only for successfully parsed requests.
+	// There is no need to update lokiRequestProtobufDuration for request errors,
+	// since their timings are usually much smaller than the timings of successfully parsed requests.
+	lokiRequestProtobufDuration.UpdateDuration(startTime)
+
 	return true
 }
 
+var (
+	lokiRequestsProtobufTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
+	rowsIngestedProtobufTotal   = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
+	lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
+)
+
 func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
 	bb := bytesBufPool.Get()
 	defer bytesBufPool.Put(bb)
diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 6a0c121ba0..f2bb6d29aa 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -10,7 +10,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
   * `vl_data_size_bytes{type="storage"}` - on-disk size for data excluding [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
   * `vl_data_size_bytes{type="indexdb"}` - on-disk size for [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
 * FEATURE: add `-insert.maxFieldsPerLine` command-line flag, which can be used for limiting the number of fields per line in logs sent to VictoriaLogs via ingestion protocols. This helps to avoid issues like [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762).
-* FEATURE: expose `vl_http_request_duration_seconds` metric at the [/metrics](monitoring). See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934) for details.
+* FEATURE: expose `vl_http_request_duration_seconds` histogram at the [/metrics](https://docs.victoriametrics.com/VictoriaLogs/#monitoring) page. Thanks to @crossoverJie for [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934).
 
 * BUGFIX: fix possible panic when no data is written to VictoriaLogs for a long time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4895). Thanks to @crossoverJie for filing and fixing the issue.
 * BUGFIX: add `/insert/loki/ready` endpoint, which is used by Promtail for healthchecks. This should remove `unsupported path requested: /insert/loki/ready` warning logs. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762#issuecomment-1690966722).