app/{vminsert,vmagent}: take into account all the inserted rows before relabeling in vm_rows_inserted_total and vmagent_rows_inserted_total metrics

This commit is contained in:
Aliaksandr Valialkin 2020-10-09 13:29:27 +03:00
parent d2e917d1cb
commit 9b7ce5d004
8 changed files with 18 additions and 12 deletions

View file

@ -62,6 +62,7 @@ func insertRows(db string, rows []parser.Row) error {
buf := ctx.buf[:0]
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Fields)
commonLabels = commonLabels[:0]
hasDBKey := false
for j := range r.Tags {
@ -111,7 +112,6 @@ func insertRows(db string, rows []parser.Row) error {
Samples: samples[len(samples)-1:],
})
}
rowsTotal += len(r.Fields)
}
ctx.buf = buf
ctx.ctx.WriteRequest.Timeseries = tssDst

View file

@ -38,6 +38,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
// Update rowsInserted and rowsPerInsert before actual inserting,
// since relabeling can prevent from inserting the rows.
rowsLen := len(block.Values)
rowsInserted.Add(rowsLen)
rowsPerInsert.Update(float64(rowsLen))
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
@ -71,12 +77,9 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal := len(values)
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View file

@ -35,6 +35,7 @@ func insertRows(timeseries []prompb.TimeSeries) error {
samples := ctx.Samples[:0]
for i := range timeseries {
ts := &timeseries[i]
rowsTotal += len(ts.Samples)
labelsLen := len(labels)
for i := range ts.Labels {
label := &ts.Labels[i]
@ -55,7 +56,6 @@ func insertRows(timeseries []prompb.TimeSeries) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal += len(ts.Samples)
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels

View file

@ -44,6 +44,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
samples := ctx.Samples[:0]
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
labelsLen := len(labels)
for j := range r.Tags {
tag := &r.Tags[j]
@ -69,7 +70,6 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal += len(values)
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels

View file

@ -69,6 +69,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Fields)
ic.Labels = ic.Labels[:0]
hasDBKey := false
for j := range r.Tags {
@ -149,7 +150,6 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount
}
}
}
rowsTotal += len(r.Fields)
}
rowsInserted.Get(&atCopy).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -38,6 +38,12 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
// Update rowsInserted and rowsPerInsert before actual inserting,
// since relabeling can prevent from inserting the rows.
rowsLen := len(block.Values)
rowsInserted.Get(at).Add(rowsLen)
rowsPerInsert.Update(float64(rowsLen))
ctx.Reset() // This line is required for initializing ctx internals.
hasRelabeling := relabel.HasRelabeling()
mn := &block.MetricName
@ -71,8 +77,5 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal
return err
}
}
rowsTotal := len(values)
rowsInserted.Get(at).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}

View file

@ -37,6 +37,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error {
hasRelabeling := relabel.HasRelabeling()
for i := range timeseries {
ts := &timeseries[i]
rowsTotal += len(ts.Samples)
ctx.Labels = ctx.Labels[:0]
srcLabels := ts.Labels
for _, srcLabel := range srcLabels {
@ -61,7 +62,6 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error {
return err
}
}
rowsTotal += len(samples)
}
rowsInserted.Get(at).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))

View file

@ -45,6 +45,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
ctx.Labels = ctx.Labels[:0]
for j := range r.Tags {
tag := &r.Tags[j]
@ -74,7 +75,6 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
return err
}
}
rowsTotal += len(values)
}
rowsInserted.Get(at).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))