diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 4ac39982e5..f4447dbae8 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -62,6 +62,7 @@ func insertRows(db string, rows []parser.Row) error { buf := ctx.buf[:0] for i := range rows { r := &rows[i] + rowsTotal += len(r.Fields) commonLabels = commonLabels[:0] hasDBKey := false for j := range r.Tags { @@ -111,7 +112,6 @@ func insertRows(db string, rows []parser.Row) error { Samples: samples[len(samples)-1:], }) } - rowsTotal += len(r.Fields) } ctx.buf = buf ctx.ctx.WriteRequest.Timeseries = tssDst diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index bda1220d43..e93f5bb2b8 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -38,6 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) + // Update rowsInserted and rowsPerInsert before actual inserting, + // since relabeling can prevent from inserting the rows. + rowsLen := len(block.Values) + rowsInserted.Add(rowsLen) + rowsPerInsert.Update(float64(rowsLen)) + tssDst := ctx.WriteRequest.Timeseries[:0] labels := ctx.Labels[:0] samples := ctx.Samples[:0] @@ -71,12 +77,9 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { Labels: labels[labelsLen:], Samples: samples[samplesLen:], }) - rowsTotal := len(values) ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples remotewrite.Push(&ctx.WriteRequest) - rowsInserted.Add(rowsTotal) - rowsPerInsert.Update(float64(rowsTotal)) return nil } diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 747092e1c8..00dfcd614e 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -35,6 +35,7 @@ func insertRows(timeseries []prompb.TimeSeries) error { samples := ctx.Samples[:0] for i := range timeseries { ts := ×eries[i] + rowsTotal += len(ts.Samples) labelsLen := len(labels) for i := range ts.Labels { label := &ts.Labels[i] @@ -55,7 +56,6 @@ func insertRows(timeseries []prompb.TimeSeries) error { Labels: labels[labelsLen:], Samples: samples[samplesLen:], }) - rowsTotal += len(ts.Samples) } ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index 572ce66825..5afb8a37c6 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -44,6 +44,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { samples := ctx.Samples[:0] for i := range rows { r := &rows[i] + rowsTotal += len(r.Values) labelsLen := len(labels) for j := range r.Tags { tag := &r.Tags[j] @@ -69,7 +70,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { Labels: labels[labelsLen:], Samples: samples[samplesLen:], }) - rowsTotal += len(values) } ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 4362c61053..93a826cd0e 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -65,6 +65,7 @@ func insertRows(db string, rows []parser.Row) error { hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] + rowsTotal += len(r.Fields) ic.Labels = ic.Labels[:0] hasDBKey := false for j := range r.Tags { @@ -125,7 +126,6 @@ func insertRows(db string, rows []parser.Row) error { } } } - rowsTotal += len(r.Fields) } rowsInserted.Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal)) diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index c991d3891f..9b0fc6477f 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -38,7 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) + // Update rowsInserted and rowsPerInsert before actual inserting, + // since relabeling can prevent from inserting the rows. rowsLen := len(block.Values) + rowsInserted.Add(rowsLen) + rowsPerInsert.Update(float64(rowsLen)) + ic := &ctx.Common ic.Reset(rowsLen) hasRelabeling := relabel.HasRelabeling() @@ -72,9 +77,6 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { return err } } - rowsTotal := len(values) - rowsInserted.Add(rowsTotal) - rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() } diff --git a/app/vminsert/prompush/push.go b/app/vminsert/prompush/push.go index 6291114f47..1c6ebe0d78 100644 --- a/app/vminsert/prompush/push.go +++ b/app/vminsert/prompush/push.go @@ -51,6 +51,7 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) { rowsTotal := 0 for i := range tss { ts := &tss[i] + rowsTotal += len(ts.Samples) ctx.Labels = ctx.Labels[:0] for j := range ts.Labels { label := &ts.Labels[j] @@ -71,7 +72,6 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) { return } } - rowsTotal += len(ts.Samples) } rowsInserted.Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal)) diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 74568920ce..f4ff538149 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -36,6 +36,7 @@ func insertRows(timeseries []prompb.TimeSeries) error { hasRelabeling := relabel.HasRelabeling() for i := range timeseries { ts := ×eries[i] + rowsTotal += len(ts.Samples) ctx.Labels = ctx.Labels[:0] srcLabels := ts.Labels for _, srcLabel := range srcLabels { @@ -58,7 +59,6 @@ func insertRows(timeseries []prompb.TimeSeries) error { return err } } - rowsTotal += len(samples) } rowsInserted.Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal)) diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 3094409ecf..6fbc7ab833 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -50,6 +50,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] + rowsTotal += len(r.Values) ic.Labels = ic.Labels[:0] for j := range r.Tags { tag := &r.Tags[j] @@ -78,7 +79,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { return err } } - rowsTotal += len(values) } rowsInserted.Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal))