From 1eade9b35848d970d9ce3479808c916e86dd48a5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 27 Jul 2019 13:20:47 +0300 Subject: [PATCH] app/vminsert: add `vm_rows_per_insert` summary metric This metric should help tuning batch sizes on clients writing data to VictoriaMetrics --- app/vminsert/graphite/request_handler.go | 6 +++++- app/vminsert/influx/request_handler.go | 10 ++++++++-- app/vminsert/opentsdb/request_handler.go | 6 +++++- app/vminsert/prometheus/request_handler.go | 10 ++++++++-- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 114838939..a3d8f435d 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -14,7 +14,10 @@ import ( "github.com/VictoriaMetrics/metrics" ) -var rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="graphite"}`) +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="graphite"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="graphite"}`) +) // insertHandler processes remote write for graphite plaintext protocol. // @@ -51,6 +54,7 @@ func (ctx *pushCtx) InsertRows() error { ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, r.Value) } rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) return ic.FlushBufs() } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 7911e2124..b336216f5 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -22,7 +22,10 @@ var ( skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses `{measurement}` instead of `{measurement}{separator}{field_name}` for metic name if Influx line contains only a single field") ) -var rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="influx"}`) +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="influx"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="influx"}`) +) // InsertHandler processes remote write for influx line protocol. // @@ -84,6 +87,7 @@ func (ctx *pushCtx) InsertRows(db string) error { } ic := &ctx.Common ic.Reset(rowsLen) + rowsTotal := 0 for i := range rows { r := &rows[i] ic.Labels = ic.Labels[:0] @@ -109,8 +113,10 @@ func (ctx *pushCtx) InsertRows(db string) error { ic.AddLabel("", metricGroup) ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[:1], r.Timestamp, f.Value) } - rowsInserted.Add(len(r.Fields)) + rowsTotal += len(r.Fields) } + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() } diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 189e30725..bc96c0a82 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -14,7 +14,10 @@ import ( "github.com/VictoriaMetrics/metrics" ) -var rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentsdb"}`) +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentsdb"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="opentsdb"}`) +) // insertHandler processes remote write for OpenTSDB put protocol. // @@ -51,6 +54,7 @@ func (ctx *pushCtx) InsertRows() error { ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, r.Value) } rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) return ic.FlushBufs() } diff --git a/app/vminsert/prometheus/request_handler.go b/app/vminsert/prometheus/request_handler.go index ab544afac..46a03ddfb 100644 --- a/app/vminsert/prometheus/request_handler.go +++ b/app/vminsert/prometheus/request_handler.go @@ -12,7 +12,10 @@ import ( "github.com/VictoriaMetrics/metrics" ) -var rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`) +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="prometheus"}`) +) // InsertHandler processes remote write for prometheus. func InsertHandler(r *http.Request, maxSize int64) error { @@ -34,6 +37,7 @@ func insertHandlerInternal(r *http.Request, maxSize int64) error { } ic := &ctx.Common ic.Reset(rowsLen) + rowsTotal := 0 for i := range timeseries { ts := ×eries[i] var metricNameRaw []byte @@ -41,8 +45,10 @@ func insertHandlerInternal(r *http.Request, maxSize int64) error { r := &ts.Samples[i] metricNameRaw = ic.WriteDataPointExt(metricNameRaw, ts.Labels, r.Timestamp, r.Value) } - rowsInserted.Add(len(ts.Samples)) + rowsTotal += len(ts.Samples) } + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() }