app/vminsert: fix relabeling for metrics ingested via Influx line protocol

Previously the enabled relabeling with `-relabelConfig` command-line flag could result in missing labels
if a single Influx line protocol message contains multiple field values.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/638
This commit is contained in:
Aliaksandr Valialkin 2020-07-23 12:50:41 +03:00
parent 61c611f5ad
commit 2f612e0c67
5 changed files with 73 additions and 77 deletions

View file

@ -63,18 +63,17 @@ func insertRows(db string, rows []parser.Row) error {
for i := range rows { for i := range rows {
r := &rows[i] r := &rows[i]
commonLabels = commonLabels[:0] commonLabels = commonLabels[:0]
hasDBLabel := false
for j := range r.Tags { for j := range r.Tags {
tag := &r.Tags[j] tag := &r.Tags[j]
if tag.Key == "db" { if tag.Key == "db" {
hasDBLabel = true db = ""
} }
commonLabels = append(commonLabels, prompbmarshal.Label{ commonLabels = append(commonLabels, prompbmarshal.Label{
Name: tag.Key, Name: tag.Key,
Value: tag.Value, Value: tag.Value,
}) })
} }
if len(db) > 0 && !hasDBLabel { if len(db) > 0 {
commonLabels = append(commonLabels, prompbmarshal.Label{ commonLabels = append(commonLabels, prompbmarshal.Label{
Name: "db", Name: "db",
Value: db, Value: db,

View file

@ -88,40 +88,36 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6
// //
// name and value must exist until ctx.Labels is used. // name and value must exist until ctx.Labels is used.
func (ctx *InsertCtx) AddLabelBytes(name, value []byte) { func (ctx *InsertCtx) AddLabelBytes(name, value []byte) {
labels := ctx.Labels if len(value) == 0 {
if cap(labels) > len(labels) { // Skip labels without values, since they have no sense.
labels = labels[:len(labels)+1] // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
} else { // Do not skip labels with empty name, since they are equal to __name__.
labels = append(labels, prompb.Label{}) return
} }
label := &labels[len(labels)-1] ctx.Labels = append(ctx.Labels, prompb.Label{
// Do not copy name and value contents for performance reasons.
// Do not copy name and value contents for performance reasons. // This reduces GC overhead on the number of objects and allocations.
// This reduces GC overhead on the number of objects and allocations. Name: name,
label.Name = name Value: value,
label.Value = value })
ctx.Labels = labels
} }
// AddLabel adds (name, value) label to ctx.Labels. // AddLabel adds (name, value) label to ctx.Labels.
// //
// name and value must exist until ctx.Labels is used. // name and value must exist until ctx.Labels is used.
func (ctx *InsertCtx) AddLabel(name, value string) { func (ctx *InsertCtx) AddLabel(name, value string) {
labels := ctx.Labels if len(value) == 0 {
if cap(labels) > len(labels) { // Skip labels without values, since they have no sense.
labels = labels[:len(labels)+1] // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
} else { // Do not skip labels with empty name, since they are equal to __name__.
labels = append(labels, prompb.Label{}) return
} }
label := &labels[len(labels)-1] ctx.Labels = append(ctx.Labels, prompb.Label{
// Do not copy name and value contents for performance reasons.
// Do not copy name and value contents for performance reasons. // This reduces GC overhead on the number of objects and allocations.
// This reduces GC overhead on the number of objects and allocations. Name: bytesutil.ToUnsafeBytes(name),
label.Name = bytesutil.ToUnsafeBytes(name) Value: bytesutil.ToUnsafeBytes(value),
label.Value = bytesutil.ToUnsafeBytes(value) })
ctx.Labels = labels
} }
// ApplyRelabeling applies relabeling to ic.Labels. // ApplyRelabeling applies relabeling to ic.Labels.

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
@ -65,17 +66,14 @@ func insertRows(db string, rows []parser.Row) error {
for i := range rows { for i := range rows {
r := &rows[i] r := &rows[i]
ic.Labels = ic.Labels[:0] ic.Labels = ic.Labels[:0]
hasDBLabel := false
for j := range r.Tags { for j := range r.Tags {
tag := &r.Tags[j] tag := &r.Tags[j]
if tag.Key == "db" { if tag.Key == "db" {
hasDBLabel = true db = ""
} }
ic.AddLabel(tag.Key, tag.Value) ic.AddLabel(tag.Key, tag.Value)
} }
if len(db) > 0 && !hasDBLabel { ic.AddLabel("db", db)
ic.AddLabel("db", db)
}
ctx.metricGroupBuf = ctx.metricGroupBuf[:0] ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
if !*skipMeasurement { if !*skipMeasurement {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...) ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...)
@ -85,29 +83,40 @@ func insertRows(db string, rows []parser.Row) error {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...) ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...)
} }
metricGroupPrefixLen := len(ctx.metricGroupBuf) metricGroupPrefixLen := len(ctx.metricGroupBuf)
ctx.metricNameBuf = ctx.metricNameBuf[:0] if hasRelabeling {
if !hasRelabeling { ctx.originLabels = append(ctx.originLabels[:0], ic.Labels...)
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf, ic.Labels) for j := range r.Fields {
} f := &r.Fields[j]
labelsLen := len(ic.Labels) if !skipFieldKey {
for j := range r.Fields { ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
f := &r.Fields[j] }
if !skipFieldKey { metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...) ic.Labels = append(ic.Labels[:0], ctx.originLabels...)
ic.AddLabel("", metricGroup)
ic.ApplyRelabeling()
if len(ic.Labels) == 0 {
// Skip metric without labels.
continue
}
ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value)
} }
metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf) } else {
ic.Labels = ic.Labels[:labelsLen] ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
ic.AddLabel("", metricGroup) labelsLen := len(ic.Labels)
ic.ApplyRelabeling() // this must be called even if !hasRelabeling in order to remove labels with empty values for j := range r.Fields {
if len(ic.Labels) == 0 { f := &r.Fields[j]
// Skip metric without labels. if !skipFieldKey {
continue ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
}
metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
ic.Labels = ic.Labels[:labelsLen]
ic.AddLabel("", metricGroup)
if len(ic.Labels) == 0 {
// Skip metric without labels.
continue
}
ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[len(ic.Labels)-1:], r.Timestamp, f.Value)
} }
labels := ic.Labels
if !hasRelabeling {
labels = labels[len(labels)-1:]
}
ic.WriteDataPoint(ctx.metricNameBuf, labels, r.Timestamp, f.Value)
} }
rowsTotal += len(r.Fields) rowsTotal += len(r.Fields)
} }
@ -120,12 +129,21 @@ type pushCtx struct {
Common common.InsertCtx Common common.InsertCtx
metricNameBuf []byte metricNameBuf []byte
metricGroupBuf []byte metricGroupBuf []byte
originLabels []prompb.Label
} }
func (ctx *pushCtx) reset() { func (ctx *pushCtx) reset() {
ctx.Common.Reset(0) ctx.Common.Reset(0)
ctx.metricNameBuf = ctx.metricNameBuf[:0] ctx.metricNameBuf = ctx.metricNameBuf[:0]
ctx.metricGroupBuf = ctx.metricGroupBuf[:0] ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
originLabels := ctx.originLabels
for i := range originLabels {
label := &originLabels[i]
label.Name = nil
label.Value = nil
}
ctx.originLabels = ctx.originLabels[:0]
} }
func getPushCtx() *pushCtx { func getPushCtx() *pushCtx {

View file

@ -34,8 +34,11 @@ func insertRows(timeseries []prompb.TimeSeries) error {
rowsTotal := 0 rowsTotal := 0
for i := range timeseries { for i := range timeseries {
ts := &timeseries[i] ts := &timeseries[i]
// Make a shallow copy of ts.Labels before calling ctx.ApplyRelabeling, since ctx.ApplyRelabeling may modify labels. ctx.Labels = ctx.Labels[:0]
ctx.Labels = append(ctx.Labels[:0], ts.Labels...) srcLabels := ts.Labels
for _, srcLabel := range srcLabels {
ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value)
}
ctx.ApplyRelabeling() ctx.ApplyRelabeling()
if len(ctx.Labels) == 0 { if len(ctx.Labels) == 0 {
// Skip metric without labels. // Skip metric without labels.

View file

@ -81,26 +81,6 @@ func (ctx *Ctx) Reset() {
// //
// The returned labels are valid until the next call to ApplyRelabeling. // The returned labels are valid until the next call to ApplyRelabeling.
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
// Remove labels with empty values, since such labels have no sense.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600 .
hasEmptyValues := false
for _, label := range labels {
if len(label.Value) == 0 {
hasEmptyValues = true
break
}
}
if hasEmptyValues {
dst := labels[:0]
for _, label := range labels {
if len(label.Value) == 0 {
continue
}
dst = append(dst, label)
}
labels = dst
}
prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig) prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig)
if len(*prcs) == 0 { if len(*prcs) == 0 {
// There are no relabeling rules. // There are no relabeling rules.