diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index d5e9541c0..b2e3ab698 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -18,7 +18,10 @@ import ( "github.com/valyala/fastjson/fastfloat" ) -var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="graphite"}`) +var ( + rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="graphite"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="graphite"}`) +) // insertHandler processes remote write for graphite plaintext protocol. // @@ -69,6 +72,7 @@ func (ctx *pushCtx) InsertRows(at *auth.Token) error { } // Assume that all the rows for a single connection belong to the same (AccountID, ProjectID). rowsInserted.Get(&atCopy).Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) return ic.FlushBufs() } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 476b99060..8a87abc40 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -25,7 +25,10 @@ var ( skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses `{measurement}` instead of `{measurement}{separator}{field_name}` for metic name if Influx line contains only a single field") ) -var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="influx"}`) +var ( + rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="influx"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="influx"}`) +) // InsertHandler processes remote write for influx line protocol. // @@ -83,7 +86,7 @@ func (ctx *pushCtx) InsertRows(at *auth.Token, db string) error { rows := ctx.Rows.Rows ic := &ctx.Common ic.Reset() - rowsAdded := 0 + rowsTotal := 0 for i := range rows { r := &rows[i] ic.Labels = ic.Labels[:0] @@ -116,9 +119,10 @@ func (ctx *pushCtx) InsertRows(at *auth.Token, db string) error { return err } } - rowsAdded += len(r.Fields) + rowsTotal += len(r.Fields) } - rowsInserted.Get(at).Add(rowsAdded) + rowsInserted.Get(at).Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() } diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 3b996c066..d88e69a29 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -18,7 +18,10 @@ import ( "github.com/valyala/fastjson/fastfloat" ) -var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="opentsdb"}`) +var ( + rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="opentsdb"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="opentsdb"}`) +) // insertHandler processes remote write for OpenTSDB put protocol. // @@ -69,6 +72,7 @@ func (ctx *pushCtx) InsertRows(at *auth.Token) error { } // Assume that all the rows for a single connection belong to the same (AccountID, ProjectID). rowsInserted.Get(&atCopy).Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) return ic.FlushBufs() } diff --git a/app/vminsert/prometheus/request_handler.go b/app/vminsert/prometheus/request_handler.go index e5796e783..1871f96eb 100644 --- a/app/vminsert/prometheus/request_handler.go +++ b/app/vminsert/prometheus/request_handler.go @@ -15,7 +15,10 @@ import ( "github.com/VictoriaMetrics/metrics" ) -var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="prometheus"}`) +var ( + rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="prometheus"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="prometheus"}`) +) // InsertHandler processes remote write for prometheus. func InsertHandler(at *auth.Token, r *http.Request, maxSize int64) error { @@ -34,6 +37,7 @@ func insertHandlerInternal(at *auth.Token, r *http.Request, maxSize int64) error ic := &ctx.Common ic.Reset() timeseries := ctx.req.Timeseries + rowsTotal := 0 for i := range timeseries { ts := ×eries[i] storageNodeIdx := ic.GetStorageNodeIdx(at, ts.Labels) @@ -47,8 +51,10 @@ func insertHandlerInternal(at *auth.Token, r *http.Request, maxSize int64) error return err } } - rowsInserted.Get(at).Add(len(ts.Samples)) + rowsTotal += len(ts.Samples) } + rowsInserted.Get(at).Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() }