From 9cfdbc582f40c7fa04c1744cc28057a41c9d3e6e Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Tue, 26 Nov 2024 13:45:17 +0200 Subject: [PATCH] refactoring: changed prompb to prompbmarshal everythere where internal series transformations are happening (#7409) ### Describe Your Changes doing similar changes for both vmagent and vminsert (like one in https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7399) ends up with almost same implementations for each of packages instead of having this shared code in one place. one of the reasons is the same Timeseries and Labels structure from different prompb and prompbmarshal packages. My proposal is to use structures from prompb package only to marshal/unmarshal sent/received data, but for internal transformations use only structures from prompbmarshal package Another example, where it already can help to simplify code is streaming aggregation pipeline for vmsingle (now it first marshals prompb.Timeseries to storage.MetricRow and then if streaming aggregation or deduplication is enabled it unmarshals all the series back but to prompbmarshal.Timeseries) ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --- app/victoria-metrics/self_scraper.go | 8 ++++---- app/vmctl/vm_native_test.go | 8 ++++---- app/vminsert/common/insert_ctx.go | 14 +++++++------- app/vminsert/common/sort_labels.go | 4 ++-- app/vminsert/influx/request_handler.go | 5 ++--- app/vminsert/relabel/relabel.go | 5 ++--- app/vmselect/graphite/tags_api.go | 8 ++++---- lib/storage/metric_name.go | 12 ++++++------ 8 files changed, 31 insertions(+), 33 deletions(-) diff --git a/app/victoria-metrics/self_scraper.go b/app/victoria-metrics/self_scraper.go index cc405884d6..f1e7a4d3a5 100644 --- a/app/victoria-metrics/self_scraper.go +++ b/app/victoria-metrics/self_scraper.go @@ -10,7 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -48,7 +48,7 @@ func selfScraper(scrapeInterval time.Duration) { var bb bytesutil.ByteBuffer var rows prometheus.Rows var mrs []storage.MetricRow - var labels []prompb.Label + var labels []prompbmarshal.Label t := time.NewTicker(scrapeInterval) f := func(currentTime time.Time, sendStaleMarkers bool) { currentTimestamp := currentTime.UnixNano() / 1e6 @@ -99,11 +99,11 @@ func selfScraper(scrapeInterval time.Duration) { } } -func addLabel(dst []prompb.Label, key, value string) []prompb.Label { +func addLabel(dst []prompbmarshal.Label, key, value string) []prompbmarshal.Label { if len(dst) < cap(dst) { dst = dst[:len(dst)+1] } else { - dst = append(dst, prompb.Label{}) + dst = append(dst, prompbmarshal.Label{}) } lb := &dst[len(dst)-1] lb.Name = key diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go index fa7bf4b686..f94ca7cb2f 100644 --- a/app/vmctl/vm_native_test.go +++ b/app/vmctl/vm_native_test.go @@ -17,7 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -214,15 +214,15 @@ func processFlags() { func fillStorage(series []vm.TimeSeries) error { var mrs []storage.MetricRow for _, series := range series { - var labels []prompb.Label + var labels []prompbmarshal.Label for _, lp := range series.LabelPairs { - labels = append(labels, prompb.Label{ + labels = append(labels, prompbmarshal.Label{ Name: lp.Name, Value: lp.Value, }) } if series.Name != "" { - labels = append(labels, prompb.Label{ + labels = append(labels, prompbmarshal.Label{ Name: "__name__", Value: series.Name, }) diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index f7a06960b5..d7f89b9c6f 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -30,7 +30,7 @@ type InsertCtx struct { func (ctx *InsertCtx) Reset(rowsLen int) { labels := ctx.Labels for i := range labels { - labels[i] = prompb.Label{} + labels[i] = prompbmarshal.Label{} } ctx.Labels = labels[:0] @@ -51,7 +51,7 @@ func cleanMetricRow(mr *storage.MetricRow) { mr.MetricNameRaw = nil } -func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte { +func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompbmarshal.Label) []byte { start := len(ctx.metricNamesBuf) ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...) ctx.metricNamesBuf = storage.MarshalMetricNameRaw(ctx.metricNamesBuf, labels) @@ -60,7 +60,7 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) } // WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer. -func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) error { +func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompbmarshal.Label, timestamp int64, value float64) error { metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels) return ctx.addRow(metricNameRaw, timestamp, value) } @@ -68,7 +68,7 @@ func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, times // WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer. // // It returns metricNameRaw for the given labels if len(metricNameRaw) == 0. -func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) ([]byte, error) { +func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompbmarshal.Label, timestamp int64, value float64) ([]byte, error) { if len(metricNameRaw) == 0 { metricNameRaw = ctx.marshalMetricNameRaw(nil, labels) } @@ -106,7 +106,7 @@ func (ctx *InsertCtx) AddLabelBytes(name, value []byte) { // Do not skip labels with empty name, since they are equal to __name__. return } - ctx.Labels = append(ctx.Labels, prompb.Label{ + ctx.Labels = append(ctx.Labels, prompbmarshal.Label{ // Do not copy name and value contents for performance reasons. // This reduces GC overhead on the number of objects and allocations. Name: bytesutil.ToUnsafeString(name), @@ -124,7 +124,7 @@ func (ctx *InsertCtx) AddLabel(name, value string) { // Do not skip labels with empty name, since they are equal to __name__. return } - ctx.Labels = append(ctx.Labels, prompb.Label{ + ctx.Labels = append(ctx.Labels, prompbmarshal.Label{ // Do not copy name and value contents for performance reasons. // This reduces GC overhead on the number of objects and allocations. Name: name, diff --git a/app/vminsert/common/sort_labels.go b/app/vminsert/common/sort_labels.go index 16fa88cc07..5ca933708f 100644 --- a/app/vminsert/common/sort_labels.go +++ b/app/vminsert/common/sort_labels.go @@ -4,7 +4,7 @@ import ( "flag" "sort" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+ @@ -19,7 +19,7 @@ func (ctx *InsertCtx) SortLabelsIfNeeded() { } } -type sortedLabels []prompb.Label +type sortedLabels []prompbmarshal.Label func (sl *sortedLabels) Len() int { return len(*sl) } func (sl *sortedLabels) Less(i, j int) bool { diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 6325018148..2ad0e4e94d 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" @@ -150,7 +149,7 @@ type pushCtx struct { Common common.InsertCtx metricNameBuf []byte metricGroupBuf []byte - originLabels []prompb.Label + originLabels []prompbmarshal.Label } func (ctx *pushCtx) reset() { @@ -160,7 +159,7 @@ func (ctx *pushCtx) reset() { originLabels := ctx.originLabels for i := range originLabels { - originLabels[i] = prompb.Label{} + originLabels[i] = prompbmarshal.Label{} } ctx.originLabels = originLabels[:0] } diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go index 17a9694452..444e05d0bd 100644 --- a/app/vminsert/relabel/relabel.go +++ b/app/vminsert/relabel/relabel.go @@ -8,7 +8,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/metrics" @@ -108,7 +107,7 @@ func (ctx *Ctx) Reset() { // ApplyRelabeling applies relabeling to the given labels and returns the result. // // The returned labels are valid until the next call to ApplyRelabeling. -func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { +func (ctx *Ctx) ApplyRelabeling(labels []prompbmarshal.Label) []prompbmarshal.Label { pcs := pcsGlobal.Load() if pcs.Len() == 0 && !*usePromCompatibleNaming { // There are no relabeling rules. @@ -159,7 +158,7 @@ func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { name = "" } value := label.Value - dst = append(dst, prompb.Label{ + dst = append(dst, prompbmarshal.Label{ Name: name, Value: value, }) diff --git a/app/vmselect/graphite/tags_api.go b/app/vmselect/graphite/tags_api.go index d558388a0d..393478d698 100644 --- a/app/vmselect/graphite/tags_api.go +++ b/app/vmselect/graphite/tags_api.go @@ -14,7 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" @@ -95,7 +95,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request _ = deadline // TODO: use the deadline as in the cluster branch paths := r.Form["path"] var row graphiteparser.Row - var labels []prompb.Label + var labels []prompbmarshal.Label var b []byte var tagsPool []graphiteparser.Tag mrs := make([]storage.MetricRow, len(paths)) @@ -122,12 +122,12 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request canonicalPaths[i] = string(b) // Convert parsed metric and tags to labels. - labels = append(labels[:0], prompb.Label{ + labels = append(labels[:0], prompbmarshal.Label{ Name: "__name__", Value: row.Metric, }) for _, tag := range row.Tags { - labels = append(labels, prompb.Label{ + labels = append(labels, prompbmarshal.Label{ Name: tag.Key, Value: tag.Value, }) diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index bbf262c3e5..1890e9cbca 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -14,7 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) @@ -506,7 +506,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) { // MarshalMetricNameRaw marshals labels to dst and returns the result. // // The result must be unmarshaled with MetricName.UnmarshalRaw -func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte { +func MarshalMetricNameRaw(dst []byte, labels []prompbmarshal.Label) []byte { // Calculate the required space for dst. dstLen := len(dst) dstSize := dstLen @@ -564,7 +564,7 @@ var ( TooLongLabelValues atomic.Uint64 ) -func trackDroppedLabels(labels, droppedLabels []prompb.Label) { +func trackDroppedLabels(labels, droppedLabels []prompbmarshal.Label) { MetricsWithDroppedLabels.Add(1) select { case <-droppedLabelsLogTicker.C: @@ -577,7 +577,7 @@ func trackDroppedLabels(labels, droppedLabels []prompb.Label) { } } -func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) { +func trackTruncatedLabels(labels []prompbmarshal.Label, truncated *prompbmarshal.Label) { TooLongLabelValues.Add(1) select { case <-truncatedLabelsLogTicker.C: @@ -595,8 +595,8 @@ var ( truncatedLabelsLogTicker = time.NewTicker(5 * time.Second) ) -func labelsToString(labels []prompb.Label) string { - labelsCopy := append([]prompb.Label{}, labels...) +func labelsToString(labels []prompbmarshal.Label) string { + labelsCopy := append([]prompbmarshal.Label{}, labels...) sort.Slice(labelsCopy, func(i, j int) bool { return string(labelsCopy[i].Name) < string(labelsCopy[j].Name) })