app/vminsert: add vm_rows_per_insert summary metric

This metric should help tune batch sizes on clients writing data to VictoriaMetrics.
Aliaksandr Valialkin 2019-07-27 13:20:47 +03:00
parent 1fd4e9fb5c
commit b7089705b7
4 changed files with 26 additions and 8 deletions
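
All four handlers follow the same pattern: a package-level summary created with metrics.NewSummary and a single Update call per insert request. Below is a minimal, self-contained sketch of that pattern using the github.com/VictoriaMetrics/metrics package; the type="example" label, the handler and the listen address are illustrative assumptions, not code from this commit.

package main

import (
	"net/http"

	"github.com/VictoriaMetrics/metrics"
)

// Package-level summary, mirroring the rowsPerInsert variables added in this
// commit. The type="example" label is illustrative only.
var rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="example"}`)

func main() {
	// Simulate a few insert requests with different batch sizes.
	for _, batchSize := range []int{10, 500, 10000} {
		rowsPerInsert.Update(float64(batchSize))
	}

	// Expose the collected metrics on /metrics, similar to how vminsert exports its own.
	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		metrics.WritePrometheus(w, false)
	})
	_ = http.ListenAndServe(":8428", nil)
}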


@@ -18,7 +18,10 @@ import (
 	"github.com/valyala/fastjson/fastfloat"
 )
 
-var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="graphite"}`)
+var (
+	rowsInserted  = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="graphite"}`)
+	rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="graphite"}`)
+)
 
 // insertHandler processes remote write for graphite plaintext protocol.
 //
@@ -69,6 +72,7 @@ func (ctx *pushCtx) InsertRows(at *auth.Token) error {
 	}
 	// Assume that all the rows for a single connection belong to the same (AccountID, ProjectID).
 	rowsInserted.Get(&atCopy).Add(len(rows))
+	rowsPerInsert.Update(float64(len(rows)))
 	return ic.FlushBufs()
 }


@@ -25,7 +25,10 @@ var (
 	skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses `{measurement}` instead of `{measurement}{separator}{field_name}` for metic name if Influx line contains only a single field")
 )
 
-var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="influx"}`)
+var (
+	rowsInserted  = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="influx"}`)
+	rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="influx"}`)
+)
 
 // InsertHandler processes remote write for influx line protocol.
 //
@@ -83,7 +86,7 @@ func (ctx *pushCtx) InsertRows(at *auth.Token, db string) error {
 	rows := ctx.Rows.Rows
 	ic := &ctx.Common
 	ic.Reset()
-	rowsAdded := 0
+	rowsTotal := 0
 	for i := range rows {
 		r := &rows[i]
 		ic.Labels = ic.Labels[:0]
@@ -116,9 +119,10 @@ func (ctx *pushCtx) InsertRows(at *auth.Token, db string) error {
 				return err
 			}
 		}
-		rowsAdded += len(r.Fields)
+		rowsTotal += len(r.Fields)
 	}
-	rowsInserted.Get(at).Add(rowsAdded)
+	rowsInserted.Get(at).Add(rowsTotal)
+	rowsPerInsert.Update(float64(rowsTotal))
 	return ic.FlushBufs()
 }
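
Note that the influx handler cannot simply call rowsPerInsert.Update(float64(len(rows))) like the graphite and opentsdb handlers: every field of an Influx line becomes its own row, so the handler accumulates rowsTotal across fields and reports it once per insert. A small self-contained sketch of that accumulation, where the row type is a simplified stand-in for the real parser structure:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/metrics"
)

// row is a simplified stand-in for a parsed Influx line; each field of a line
// becomes a separate row in VictoriaMetrics.
type row struct {
	Fields []float64
}

var rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="influx"}`)

func main() {
	rows := []row{
		{Fields: []float64{1, 2, 3}}, // 3 fields -> 3 rows
		{Fields: []float64{4}},       // 1 field  -> 1 row
	}

	rowsTotal := 0
	for i := range rows {
		rowsTotal += len(rows[i].Fields)
	}

	// One summary sample per insert request, covering all rows parsed from it.
	rowsPerInsert.Update(float64(rowsTotal))
	fmt.Println("rows in this insert:", rowsTotal) // 4
}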


@@ -18,7 +18,10 @@ import (
 	"github.com/valyala/fastjson/fastfloat"
 )
 
-var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="opentsdb"}`)
+var (
+	rowsInserted  = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="opentsdb"}`)
+	rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="opentsdb"}`)
+)
 
 // insertHandler processes remote write for OpenTSDB put protocol.
 //
@@ -69,6 +72,7 @@ func (ctx *pushCtx) InsertRows(at *auth.Token) error {
 	}
 	// Assume that all the rows for a single connection belong to the same (AccountID, ProjectID).
 	rowsInserted.Get(&atCopy).Add(len(rows))
+	rowsPerInsert.Update(float64(len(rows)))
 	return ic.FlushBufs()
 }


@@ -15,7 +15,10 @@ import (
 	"github.com/VictoriaMetrics/metrics"
 )
 
-var rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="prometheus"}`)
+var (
+	rowsInserted  = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="prometheus"}`)
+	rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="prometheus"}`)
+)
 
 // InsertHandler processes remote write for prometheus.
 func InsertHandler(at *auth.Token, r *http.Request, maxSize int64) error {
@@ -34,6 +37,7 @@ func insertHandlerInternal(at *auth.Token, r *http.Request, maxSize int64) error
 	ic := &ctx.Common
 	ic.Reset()
 	timeseries := ctx.req.Timeseries
+	rowsTotal := 0
 	for i := range timeseries {
 		ts := &timeseries[i]
 		storageNodeIdx := ic.GetStorageNodeIdx(at, ts.Labels)
@@ -47,8 +51,10 @@ func insertHandlerInternal(at *auth.Token, r *http.Request, maxSize int64) error
 				return err
 			}
 		}
-		rowsInserted.Get(at).Add(len(ts.Samples))
+		rowsTotal += len(ts.Samples)
 	}
+	rowsInserted.Get(at).Add(rowsTotal)
+	rowsPerInsert.Update(float64(rowsTotal))
 	return ic.FlushBufs()
 }
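
To use the new metric for tuning, an operator can read the series the summary exports on /metrics (the metrics package exposes a summary as quantile series such as vm_rows_per_insert{type="prometheus",quantile="0.99"} plus _sum and _count counterparts). The sketch below just scrapes and prints those series; the vminsert address and port are assumptions to be adjusted for the actual deployment.

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// The address is an assumption; point it at the real vminsert /metrics endpoint.
	resp, err := http.Get("http://localhost:8480/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		// Keep only the rows-per-insert series for each ingestion protocol.
		if strings.HasPrefix(line, "vm_rows_per_insert") {
			fmt.Println(line)
		}
	}
	if err := sc.Err(); err != nil {
		panic(err)
	}
}

Consistently small values in the lower quantiles suggest clients are sending tiny batches and could reduce per-request overhead by packing more rows into each insert.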