app/{vminsert,vmagent}: take into account all the inserted rows before relabeling in vm_rows_inserted_total and vmagent_rows_inserted_total metrics

This commit is contained in:
Aliaksandr Valialkin 2020-10-09 13:29:27 +03:00
parent f4e8687c88
commit 84227ea2fc
9 changed files with 18 additions and 13 deletions

View file

@ -62,6 +62,7 @@ func insertRows(db string, rows []parser.Row) error {
buf := ctx.buf[:0]
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Fields)
commonLabels = commonLabels[:0]
hasDBKey := false
for j := range r.Tags {
@ -111,7 +112,6 @@ func insertRows(db string, rows []parser.Row) error {
Samples: samples[len(samples)-1:],
})
}
rowsTotal += len(r.Fields)
}
ctx.buf = buf
ctx.ctx.WriteRequest.Timeseries = tssDst

View file

@ -38,6 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
// Update rowsInserted and rowsPerInsert before actual inserting,
// since relabeling can prevent from inserting the rows.
rowsLen := len(block.Values)
rowsInserted.Add(rowsLen)
rowsPerInsert.Update(float64(rowsLen))
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
@ -71,12 +77,9 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal := len(values)
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View file

@ -35,6 +35,7 @@ func insertRows(timeseries []prompb.TimeSeries) error {
samples := ctx.Samples[:0]
for i := range timeseries {
ts := &timeseries[i]
rowsTotal += len(ts.Samples)
labelsLen := len(labels)
for i := range ts.Labels {
label := &ts.Labels[i]
@ -55,7 +56,6 @@ func insertRows(timeseries []prompb.TimeSeries) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal += len(ts.Samples)
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels

View file

@ -44,6 +44,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
samples := ctx.Samples[:0]
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
labelsLen := len(labels)
for j := range r.Tags {
tag := &r.Tags[j]
@ -69,7 +70,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal += len(values)
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels

View file

@ -65,6 +65,7 @@ func insertRows(db string, rows []parser.Row) error {
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Fields)
ic.Labels = ic.Labels[:0]
hasDBKey := false
for j := range r.Tags {
@ -125,7 +126,6 @@ func insertRows(db string, rows []parser.Row) error {
}
}
}
rowsTotal += len(r.Fields)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -38,7 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
// Update rowsInserted and rowsPerInsert before actual inserting,
// since relabeling can prevent from inserting the rows.
rowsLen := len(block.Values)
rowsInserted.Add(rowsLen)
rowsPerInsert.Update(float64(rowsLen))
ic := &ctx.Common
ic.Reset(rowsLen)
hasRelabeling := relabel.HasRelabeling()
@ -72,9 +77,6 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
return err
}
}
rowsTotal := len(values)
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ic.FlushBufs()
}

View file

@ -51,6 +51,7 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) {
rowsTotal := 0
for i := range tss {
ts := &tss[i]
rowsTotal += len(ts.Samples)
ctx.Labels = ctx.Labels[:0]
for j := range ts.Labels {
label := &ts.Labels[j]
@ -71,7 +72,6 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) {
return
}
}
rowsTotal += len(ts.Samples)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -36,6 +36,7 @@ func insertRows(timeseries []prompb.TimeSeries) error {
hasRelabeling := relabel.HasRelabeling()
for i := range timeseries {
ts := &timeseries[i]
rowsTotal += len(ts.Samples)
ctx.Labels = ctx.Labels[:0]
srcLabels := ts.Labels
for _, srcLabel := range srcLabels {
@ -58,7 +59,6 @@ func insertRows(timeseries []prompb.TimeSeries) error {
return err
}
}
rowsTotal += len(samples)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -50,6 +50,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
ic.Labels = ic.Labels[:0]
for j := range r.Tags {
tag := &r.Tags[j]
@ -78,7 +79,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
return err
}
}
rowsTotal += len(values)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))