diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 4ac39982e..f4447dbae 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -62,6 +62,7 @@ func insertRows(db string, rows []parser.Row) error { buf := ctx.buf[:0] for i := range rows { r := &rows[i] + rowsTotal += len(r.Fields) commonLabels = commonLabels[:0] hasDBKey := false for j := range r.Tags { @@ -111,7 +112,6 @@ func insertRows(db string, rows []parser.Row) error { Samples: samples[len(samples)-1:], }) } - rowsTotal += len(r.Fields) } ctx.buf = buf ctx.ctx.WriteRequest.Timeseries = tssDst diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index bda1220d4..e93f5bb2b 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -38,6 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) + // Update rowsInserted and rowsPerInsert before actual inserting, + // since relabeling can prevent from inserting the rows. + rowsLen := len(block.Values) + rowsInserted.Add(rowsLen) + rowsPerInsert.Update(float64(rowsLen)) + tssDst := ctx.WriteRequest.Timeseries[:0] labels := ctx.Labels[:0] samples := ctx.Samples[:0] @@ -71,12 +77,9 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { Labels: labels[labelsLen:], Samples: samples[samplesLen:], }) - rowsTotal := len(values) ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples remotewrite.Push(&ctx.WriteRequest) - rowsInserted.Add(rowsTotal) - rowsPerInsert.Update(float64(rowsTotal)) return nil } diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 747092e1c..00dfcd614 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -35,6 +35,7 @@ func insertRows(timeseries []prompb.TimeSeries) error { samples := ctx.Samples[:0] for i := range timeseries { ts := ×eries[i] + rowsTotal += len(ts.Samples) labelsLen := len(labels) for i := range ts.Labels { label := &ts.Labels[i] @@ -55,7 +56,6 @@ func insertRows(timeseries []prompb.TimeSeries) error { Labels: labels[labelsLen:], Samples: samples[samplesLen:], }) - rowsTotal += len(ts.Samples) } ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index 572ce6682..5afb8a37c 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -44,6 +44,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { samples := ctx.Samples[:0] for i := range rows { r := &rows[i] + rowsTotal += len(r.Values) labelsLen := len(labels) for j := range r.Tags { tag := &r.Tags[j] @@ -69,7 +70,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { Labels: labels[labelsLen:], Samples: samples[samplesLen:], }) - rowsTotal += len(values) } ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index dc5c03722..da20b29e5 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -69,6 +69,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] + rowsTotal += len(r.Fields) ic.Labels = ic.Labels[:0] hasDBKey := false for j := range r.Tags { @@ -149,7 +150,6 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount } } } - rowsTotal += len(r.Fields) } rowsInserted.Get(&atCopy).Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal)) diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index e581d89fe..ccea5e807 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -38,6 +38,12 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal ctx := netstorage.GetInsertCtx() defer netstorage.PutInsertCtx(ctx) + // Update rowsInserted and rowsPerInsert before actual inserting, + // since relabeling can prevent from inserting the rows. + rowsLen := len(block.Values) + rowsInserted.Get(at).Add(rowsLen) + rowsPerInsert.Update(float64(rowsLen)) + ctx.Reset() // This line is required for initializing ctx internals. hasRelabeling := relabel.HasRelabeling() mn := &block.MetricName @@ -71,8 +77,5 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal return err } } - rowsTotal := len(values) - rowsInserted.Get(at).Add(rowsTotal) - rowsPerInsert.Update(float64(rowsTotal)) return ctx.FlushBufs() } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index f3d9c8e0d..8282faa4f 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -37,6 +37,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error { hasRelabeling := relabel.HasRelabeling() for i := range timeseries { ts := ×eries[i] + rowsTotal += len(ts.Samples) ctx.Labels = ctx.Labels[:0] srcLabels := ts.Labels for _, srcLabel := range srcLabels { @@ -61,7 +62,6 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error { return err } } - rowsTotal += len(samples) } rowsInserted.Get(at).Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal)) diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 269f97213..06f310850 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -45,6 +45,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] + rowsTotal += len(r.Values) ctx.Labels = ctx.Labels[:0] for j := range r.Tags { tag := &r.Tags[j] @@ -74,7 +75,6 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L return err } } - rowsTotal += len(values) } rowsInserted.Get(at).Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal))