From 2f612e0c67d45e0ba539ed5ed0b63b2dbeea9797 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 23 Jul 2020 12:50:41 +0300 Subject: [PATCH] app/vminsert: fix relabeling for metrics ingested via Influx line protocol Previously the enabled relabeling with `-relabelConfig` command-line flag could result in missing labels if a single Influx line protocol message contains multiple field values. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/638 --- app/vmagent/influx/request_handler.go | 5 +- app/vminsert/common/insert_ctx.go | 48 ++++++------- app/vminsert/influx/request_handler.go | 70 ++++++++++++------- .../promremotewrite/request_handler.go | 7 +- app/vminsert/relabel/relabel.go | 20 ------ 5 files changed, 73 insertions(+), 77 deletions(-) diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index a17ef60f8..489e0874e 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -63,18 +63,17 @@ func insertRows(db string, rows []parser.Row) error { for i := range rows { r := &rows[i] commonLabels = commonLabels[:0] - hasDBLabel := false for j := range r.Tags { tag := &r.Tags[j] if tag.Key == "db" { - hasDBLabel = true + db = "" } commonLabels = append(commonLabels, prompbmarshal.Label{ Name: tag.Key, Value: tag.Value, }) } - if len(db) > 0 && !hasDBLabel { + if len(db) > 0 { commonLabels = append(commonLabels, prompbmarshal.Label{ Name: "db", Value: db, diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 3eda96d34..d5aef3218 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -88,40 +88,36 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6 // // name and value must exist until ctx.Labels is used. func (ctx *InsertCtx) AddLabelBytes(name, value []byte) { - labels := ctx.Labels - if cap(labels) > len(labels) { - labels = labels[:len(labels)+1] - } else { - labels = append(labels, prompb.Label{}) + if len(value) == 0 { + // Skip labels without values, since they have no sense. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600 + // Do not skip labels with empty name, since they are equal to __name__. + return } - label := &labels[len(labels)-1] - - // Do not copy name and value contents for performance reasons. - // This reduces GC overhead on the number of objects and allocations. - label.Name = name - label.Value = value - - ctx.Labels = labels + ctx.Labels = append(ctx.Labels, prompb.Label{ + // Do not copy name and value contents for performance reasons. + // This reduces GC overhead on the number of objects and allocations. + Name: name, + Value: value, + }) } // AddLabel adds (name, value) label to ctx.Labels. // // name and value must exist until ctx.Labels is used. func (ctx *InsertCtx) AddLabel(name, value string) { - labels := ctx.Labels - if cap(labels) > len(labels) { - labels = labels[:len(labels)+1] - } else { - labels = append(labels, prompb.Label{}) + if len(value) == 0 { + // Skip labels without values, since they have no sense. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600 + // Do not skip labels with empty name, since they are equal to __name__. + return } - label := &labels[len(labels)-1] - - // Do not copy name and value contents for performance reasons. - // This reduces GC overhead on the number of objects and allocations. - label.Name = bytesutil.ToUnsafeBytes(name) - label.Value = bytesutil.ToUnsafeBytes(value) - - ctx.Labels = labels + ctx.Labels = append(ctx.Labels, prompb.Label{ + // Do not copy name and value contents for performance reasons. + // This reduces GC overhead on the number of objects and allocations. + Name: bytesutil.ToUnsafeBytes(name), + Value: bytesutil.ToUnsafeBytes(value), + }) } // ApplyRelabeling applies relabeling to ic.Labels. diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 35fc2d3d8..b6128c7d7 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -65,17 +66,14 @@ func insertRows(db string, rows []parser.Row) error { for i := range rows { r := &rows[i] ic.Labels = ic.Labels[:0] - hasDBLabel := false for j := range r.Tags { tag := &r.Tags[j] if tag.Key == "db" { - hasDBLabel = true + db = "" } ic.AddLabel(tag.Key, tag.Value) } - if len(db) > 0 && !hasDBLabel { - ic.AddLabel("db", db) - } + ic.AddLabel("db", db) ctx.metricGroupBuf = ctx.metricGroupBuf[:0] if !*skipMeasurement { ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...) @@ -85,29 +83,40 @@ func insertRows(db string, rows []parser.Row) error { ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...) } metricGroupPrefixLen := len(ctx.metricGroupBuf) - ctx.metricNameBuf = ctx.metricNameBuf[:0] - if !hasRelabeling { - ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf, ic.Labels) - } - labelsLen := len(ic.Labels) - for j := range r.Fields { - f := &r.Fields[j] - if !skipFieldKey { - ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...) + if hasRelabeling { + ctx.originLabels = append(ctx.originLabels[:0], ic.Labels...) + for j := range r.Fields { + f := &r.Fields[j] + if !skipFieldKey { + ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...) + } + metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf) + ic.Labels = append(ic.Labels[:0], ctx.originLabels...) + ic.AddLabel("", metricGroup) + ic.ApplyRelabeling() + if len(ic.Labels) == 0 { + // Skip metric without labels. + continue + } + ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value) } - metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf) - ic.Labels = ic.Labels[:labelsLen] - ic.AddLabel("", metricGroup) - ic.ApplyRelabeling() // this must be called even if !hasRelabeling in order to remove labels with empty values - if len(ic.Labels) == 0 { - // Skip metric without labels. - continue + } else { + ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) + labelsLen := len(ic.Labels) + for j := range r.Fields { + f := &r.Fields[j] + if !skipFieldKey { + ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...) + } + metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf) + ic.Labels = ic.Labels[:labelsLen] + ic.AddLabel("", metricGroup) + if len(ic.Labels) == 0 { + // Skip metric without labels. + continue + } + ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[len(ic.Labels)-1:], r.Timestamp, f.Value) } - labels := ic.Labels - if !hasRelabeling { - labels = labels[len(labels)-1:] - } - ic.WriteDataPoint(ctx.metricNameBuf, labels, r.Timestamp, f.Value) } rowsTotal += len(r.Fields) } @@ -120,12 +129,21 @@ type pushCtx struct { Common common.InsertCtx metricNameBuf []byte metricGroupBuf []byte + originLabels []prompb.Label } func (ctx *pushCtx) reset() { ctx.Common.Reset(0) ctx.metricNameBuf = ctx.metricNameBuf[:0] ctx.metricGroupBuf = ctx.metricGroupBuf[:0] + + originLabels := ctx.originLabels + for i := range originLabels { + label := &originLabels[i] + label.Name = nil + label.Value = nil + } + ctx.originLabels = ctx.originLabels[:0] } func getPushCtx() *pushCtx { diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 4f7b2e80e..c45e2a5e9 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -34,8 +34,11 @@ func insertRows(timeseries []prompb.TimeSeries) error { rowsTotal := 0 for i := range timeseries { ts := ×eries[i] - // Make a shallow copy of ts.Labels before calling ctx.ApplyRelabeling, since ctx.ApplyRelabeling may modify labels. - ctx.Labels = append(ctx.Labels[:0], ts.Labels...) + ctx.Labels = ctx.Labels[:0] + srcLabels := ts.Labels + for _, srcLabel := range srcLabels { + ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value) + } ctx.ApplyRelabeling() if len(ctx.Labels) == 0 { // Skip metric without labels. diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go index 62ce3afe5..1c8facec2 100644 --- a/app/vminsert/relabel/relabel.go +++ b/app/vminsert/relabel/relabel.go @@ -81,26 +81,6 @@ func (ctx *Ctx) Reset() { // // The returned labels are valid until the next call to ApplyRelabeling. func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { - // Remove labels with empty values, since such labels have no sense. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600 . - hasEmptyValues := false - for _, label := range labels { - if len(label.Value) == 0 { - hasEmptyValues = true - break - } - } - if hasEmptyValues { - dst := labels[:0] - for _, label := range labels { - if len(label.Value) == 0 { - continue - } - dst = append(dst, label) - } - labels = dst - } - prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig) if len(*prcs) == 0 { // There are no relabeling rules.