From 2d7b3b1a5c4d2a69dd0cdf4689211a3dcd7a4ca8 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Thu, 31 Oct 2024 21:56:46 +0200 Subject: [PATCH] doing similar changes for both vmagent and vminsert ends up with almost same implementations. one of the reasons is the same Timeseries structure from different prompb and prompbmarshal packages. My proposal is to use structures from prompb package only to marshal/unmarshal sent/received data, but for internal transformations use only structures from prompbmarshal package --- app/victoria-metrics/self_scraper.go | 8 ++++---- app/vmctl/vm_native_test.go | 8 ++++---- app/vminsert/common/insert_ctx.go | 14 +++++++------- app/vminsert/common/sort_labels.go | 4 ++-- app/vminsert/influx/request_handler.go | 5 ++--- app/vminsert/relabel/relabel.go | 5 ++--- app/vmselect/graphite/tags_api.go | 8 ++++---- lib/storage/metric_name.go | 12 ++++++------ 8 files changed, 31 insertions(+), 33 deletions(-) diff --git a/app/victoria-metrics/self_scraper.go b/app/victoria-metrics/self_scraper.go index cc405884d..f1e7a4d3a 100644 --- a/app/victoria-metrics/self_scraper.go +++ b/app/victoria-metrics/self_scraper.go @@ -10,7 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -48,7 +48,7 @@ func selfScraper(scrapeInterval time.Duration) { var bb bytesutil.ByteBuffer var rows prometheus.Rows var mrs []storage.MetricRow - var labels []prompb.Label + var labels []prompbmarshal.Label t := time.NewTicker(scrapeInterval) f := func(currentTime time.Time, sendStaleMarkers bool) { currentTimestamp := currentTime.UnixNano() / 1e6 @@ -99,11 +99,11 @@ func selfScraper(scrapeInterval time.Duration) { } } -func addLabel(dst []prompb.Label, key, value string) []prompb.Label { +func addLabel(dst []prompbmarshal.Label, key, value string) []prompbmarshal.Label { if len(dst) < cap(dst) { dst = dst[:len(dst)+1] } else { - dst = append(dst, prompb.Label{}) + dst = append(dst, prompbmarshal.Label{}) } lb := &dst[len(dst)-1] lb.Name = key diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go index fa7bf4b68..f94ca7cb2 100644 --- a/app/vmctl/vm_native_test.go +++ b/app/vmctl/vm_native_test.go @@ -17,7 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -214,15 +214,15 @@ func processFlags() { func fillStorage(series []vm.TimeSeries) error { var mrs []storage.MetricRow for _, series := range series { - var labels []prompb.Label + var labels []prompbmarshal.Label for _, lp := range series.LabelPairs { - labels = append(labels, prompb.Label{ + labels = append(labels, prompbmarshal.Label{ Name: lp.Name, Value: lp.Value, }) } if series.Name != "" { - labels = append(labels, prompb.Label{ + labels = append(labels, prompbmarshal.Label{ Name: "__name__", Value: series.Name, }) diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index f7a06960b..d7f89b9c6 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -30,7 +30,7 @@ type InsertCtx struct { func (ctx *InsertCtx) Reset(rowsLen int) { labels := ctx.Labels for i := range labels { - labels[i] = prompb.Label{} + labels[i] = prompbmarshal.Label{} } ctx.Labels = labels[:0] @@ -51,7 +51,7 @@ func cleanMetricRow(mr *storage.MetricRow) { mr.MetricNameRaw = nil } -func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte { +func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompbmarshal.Label) []byte { start := len(ctx.metricNamesBuf) ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...) ctx.metricNamesBuf = storage.MarshalMetricNameRaw(ctx.metricNamesBuf, labels) @@ -60,7 +60,7 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) } // WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer. -func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) error { +func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompbmarshal.Label, timestamp int64, value float64) error { metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels) return ctx.addRow(metricNameRaw, timestamp, value) } @@ -68,7 +68,7 @@ func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, times // WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer. // // It returns metricNameRaw for the given labels if len(metricNameRaw) == 0. -func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) ([]byte, error) { +func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompbmarshal.Label, timestamp int64, value float64) ([]byte, error) { if len(metricNameRaw) == 0 { metricNameRaw = ctx.marshalMetricNameRaw(nil, labels) } @@ -106,7 +106,7 @@ func (ctx *InsertCtx) AddLabelBytes(name, value []byte) { // Do not skip labels with empty name, since they are equal to __name__. return } - ctx.Labels = append(ctx.Labels, prompb.Label{ + ctx.Labels = append(ctx.Labels, prompbmarshal.Label{ // Do not copy name and value contents for performance reasons. // This reduces GC overhead on the number of objects and allocations. Name: bytesutil.ToUnsafeString(name), @@ -124,7 +124,7 @@ func (ctx *InsertCtx) AddLabel(name, value string) { // Do not skip labels with empty name, since they are equal to __name__. return } - ctx.Labels = append(ctx.Labels, prompb.Label{ + ctx.Labels = append(ctx.Labels, prompbmarshal.Label{ // Do not copy name and value contents for performance reasons. // This reduces GC overhead on the number of objects and allocations. Name: name, diff --git a/app/vminsert/common/sort_labels.go b/app/vminsert/common/sort_labels.go index 16fa88cc0..5ca933708 100644 --- a/app/vminsert/common/sort_labels.go +++ b/app/vminsert/common/sort_labels.go @@ -4,7 +4,7 @@ import ( "flag" "sort" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+ @@ -19,7 +19,7 @@ func (ctx *InsertCtx) SortLabelsIfNeeded() { } } -type sortedLabels []prompb.Label +type sortedLabels []prompbmarshal.Label func (sl *sortedLabels) Len() int { return len(*sl) } func (sl *sortedLabels) Less(i, j int) bool { diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 632501814..2ad0e4e94 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" @@ -150,7 +149,7 @@ type pushCtx struct { Common common.InsertCtx metricNameBuf []byte metricGroupBuf []byte - originLabels []prompb.Label + originLabels []prompbmarshal.Label } func (ctx *pushCtx) reset() { @@ -160,7 +159,7 @@ func (ctx *pushCtx) reset() { originLabels := ctx.originLabels for i := range originLabels { - originLabels[i] = prompb.Label{} + originLabels[i] = prompbmarshal.Label{} } ctx.originLabels = originLabels[:0] } diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go index 17a969445..444e05d0b 100644 --- a/app/vminsert/relabel/relabel.go +++ b/app/vminsert/relabel/relabel.go @@ -8,7 +8,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/metrics" @@ -108,7 +107,7 @@ func (ctx *Ctx) Reset() { // ApplyRelabeling applies relabeling to the given labels and returns the result. // // The returned labels are valid until the next call to ApplyRelabeling. -func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { +func (ctx *Ctx) ApplyRelabeling(labels []prompbmarshal.Label) []prompbmarshal.Label { pcs := pcsGlobal.Load() if pcs.Len() == 0 && !*usePromCompatibleNaming { // There are no relabeling rules. @@ -159,7 +158,7 @@ func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { name = "" } value := label.Value - dst = append(dst, prompb.Label{ + dst = append(dst, prompbmarshal.Label{ Name: name, Value: value, }) diff --git a/app/vmselect/graphite/tags_api.go b/app/vmselect/graphite/tags_api.go index d558388a0..393478d69 100644 --- a/app/vmselect/graphite/tags_api.go +++ b/app/vmselect/graphite/tags_api.go @@ -14,7 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" @@ -95,7 +95,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request _ = deadline // TODO: use the deadline as in the cluster branch paths := r.Form["path"] var row graphiteparser.Row - var labels []prompb.Label + var labels []prompbmarshal.Label var b []byte var tagsPool []graphiteparser.Tag mrs := make([]storage.MetricRow, len(paths)) @@ -122,12 +122,12 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request canonicalPaths[i] = string(b) // Convert parsed metric and tags to labels. - labels = append(labels[:0], prompb.Label{ + labels = append(labels[:0], prompbmarshal.Label{ Name: "__name__", Value: row.Metric, }) for _, tag := range row.Tags { - labels = append(labels, prompb.Label{ + labels = append(labels, prompbmarshal.Label{ Name: tag.Key, Value: tag.Value, }) diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index bbf262c3e..1890e9cbc 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -14,7 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) @@ -506,7 +506,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) { // MarshalMetricNameRaw marshals labels to dst and returns the result. // // The result must be unmarshaled with MetricName.UnmarshalRaw -func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte { +func MarshalMetricNameRaw(dst []byte, labels []prompbmarshal.Label) []byte { // Calculate the required space for dst. dstLen := len(dst) dstSize := dstLen @@ -564,7 +564,7 @@ var ( TooLongLabelValues atomic.Uint64 ) -func trackDroppedLabels(labels, droppedLabels []prompb.Label) { +func trackDroppedLabels(labels, droppedLabels []prompbmarshal.Label) { MetricsWithDroppedLabels.Add(1) select { case <-droppedLabelsLogTicker.C: @@ -577,7 +577,7 @@ func trackDroppedLabels(labels, droppedLabels []prompb.Label) { } } -func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) { +func trackTruncatedLabels(labels []prompbmarshal.Label, truncated *prompbmarshal.Label) { TooLongLabelValues.Add(1) select { case <-truncatedLabelsLogTicker.C: @@ -595,8 +595,8 @@ var ( truncatedLabelsLogTicker = time.NewTicker(5 * time.Second) ) -func labelsToString(labels []prompb.Label) string { - labelsCopy := append([]prompb.Label{}, labels...) +func labelsToString(labels []prompbmarshal.Label) string { + labelsCopy := append([]prompbmarshal.Label{}, labels...) sort.Slice(labelsCopy, func(i, j int) bool { return string(labelsCopy[i].Name) < string(labelsCopy[j].Name) })