doing similar changes for both vmagent and vminsert ends up with almost same implementations. one of the reasons is the same Timeseries structure from different prompb and prompbmarshal packages. My proposal is to use structures from prompb package only to marshal/unmarshal sent/received data, but for internal transformations use only structures from prompbmarshal package

This commit is contained in:
Andrii Chubatiuk 2024-10-31 21:56:46 +02:00
parent 5cc2e49297
commit 2d7b3b1a5c
No known key found for this signature in database
GPG key ID: 96D776CC99880667
8 changed files with 31 additions and 33 deletions

View file

@ -10,7 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) )
@ -48,7 +48,7 @@ func selfScraper(scrapeInterval time.Duration) {
var bb bytesutil.ByteBuffer var bb bytesutil.ByteBuffer
var rows prometheus.Rows var rows prometheus.Rows
var mrs []storage.MetricRow var mrs []storage.MetricRow
var labels []prompb.Label var labels []prompbmarshal.Label
t := time.NewTicker(scrapeInterval) t := time.NewTicker(scrapeInterval)
f := func(currentTime time.Time, sendStaleMarkers bool) { f := func(currentTime time.Time, sendStaleMarkers bool) {
currentTimestamp := currentTime.UnixNano() / 1e6 currentTimestamp := currentTime.UnixNano() / 1e6
@ -99,11 +99,11 @@ func selfScraper(scrapeInterval time.Duration) {
} }
} }
func addLabel(dst []prompb.Label, key, value string) []prompb.Label { func addLabel(dst []prompbmarshal.Label, key, value string) []prompbmarshal.Label {
if len(dst) < cap(dst) { if len(dst) < cap(dst) {
dst = dst[:len(dst)+1] dst = dst[:len(dst)+1]
} else { } else {
dst = append(dst, prompb.Label{}) dst = append(dst, prompbmarshal.Label{})
} }
lb := &dst[len(dst)-1] lb := &dst[len(dst)-1]
lb.Name = key lb.Name = key

View file

@ -17,7 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) )
@ -214,15 +214,15 @@ func processFlags() {
func fillStorage(series []vm.TimeSeries) error { func fillStorage(series []vm.TimeSeries) error {
var mrs []storage.MetricRow var mrs []storage.MetricRow
for _, series := range series { for _, series := range series {
var labels []prompb.Label var labels []prompbmarshal.Label
for _, lp := range series.LabelPairs { for _, lp := range series.LabelPairs {
labels = append(labels, prompb.Label{ labels = append(labels, prompbmarshal.Label{
Name: lp.Name, Name: lp.Name,
Value: lp.Value, Value: lp.Value,
}) })
} }
if series.Name != "" { if series.Name != "" {
labels = append(labels, prompb.Label{ labels = append(labels, prompbmarshal.Label{
Name: "__name__", Name: "__name__",
Value: series.Name, Value: series.Name,
}) })

View file

@ -8,7 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) )
@ -30,7 +30,7 @@ type InsertCtx struct {
func (ctx *InsertCtx) Reset(rowsLen int) { func (ctx *InsertCtx) Reset(rowsLen int) {
labels := ctx.Labels labels := ctx.Labels
for i := range labels { for i := range labels {
labels[i] = prompb.Label{} labels[i] = prompbmarshal.Label{}
} }
ctx.Labels = labels[:0] ctx.Labels = labels[:0]
@ -51,7 +51,7 @@ func cleanMetricRow(mr *storage.MetricRow) {
mr.MetricNameRaw = nil mr.MetricNameRaw = nil
} }
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte { func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompbmarshal.Label) []byte {
start := len(ctx.metricNamesBuf) start := len(ctx.metricNamesBuf)
ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...) ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...)
ctx.metricNamesBuf = storage.MarshalMetricNameRaw(ctx.metricNamesBuf, labels) ctx.metricNamesBuf = storage.MarshalMetricNameRaw(ctx.metricNamesBuf, labels)
@ -60,7 +60,7 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label)
} }
// WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer. // WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer.
func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) error { func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompbmarshal.Label, timestamp int64, value float64) error {
metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels) metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels)
return ctx.addRow(metricNameRaw, timestamp, value) return ctx.addRow(metricNameRaw, timestamp, value)
} }
@ -68,7 +68,7 @@ func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, times
// WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer. // WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer.
// //
// It returns metricNameRaw for the given labels if len(metricNameRaw) == 0. // It returns metricNameRaw for the given labels if len(metricNameRaw) == 0.
func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) ([]byte, error) { func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompbmarshal.Label, timestamp int64, value float64) ([]byte, error) {
if len(metricNameRaw) == 0 { if len(metricNameRaw) == 0 {
metricNameRaw = ctx.marshalMetricNameRaw(nil, labels) metricNameRaw = ctx.marshalMetricNameRaw(nil, labels)
} }
@ -106,7 +106,7 @@ func (ctx *InsertCtx) AddLabelBytes(name, value []byte) {
// Do not skip labels with empty name, since they are equal to __name__. // Do not skip labels with empty name, since they are equal to __name__.
return return
} }
ctx.Labels = append(ctx.Labels, prompb.Label{ ctx.Labels = append(ctx.Labels, prompbmarshal.Label{
// Do not copy name and value contents for performance reasons. // Do not copy name and value contents for performance reasons.
// This reduces GC overhead on the number of objects and allocations. // This reduces GC overhead on the number of objects and allocations.
Name: bytesutil.ToUnsafeString(name), Name: bytesutil.ToUnsafeString(name),
@ -124,7 +124,7 @@ func (ctx *InsertCtx) AddLabel(name, value string) {
// Do not skip labels with empty name, since they are equal to __name__. // Do not skip labels with empty name, since they are equal to __name__.
return return
} }
ctx.Labels = append(ctx.Labels, prompb.Label{ ctx.Labels = append(ctx.Labels, prompbmarshal.Label{
// Do not copy name and value contents for performance reasons. // Do not copy name and value contents for performance reasons.
// This reduces GC overhead on the number of objects and allocations. // This reduces GC overhead on the number of objects and allocations.
Name: name, Name: name,

View file

@ -4,7 +4,7 @@ import (
"flag" "flag"
"sort" "sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+ var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+
@ -19,7 +19,7 @@ func (ctx *InsertCtx) SortLabelsIfNeeded() {
} }
} }
type sortedLabels []prompb.Label type sortedLabels []prompbmarshal.Label
func (sl *sortedLabels) Len() int { return len(*sl) } func (sl *sortedLabels) Len() int { return len(*sl) }
func (sl *sortedLabels) Less(i, j int) bool { func (sl *sortedLabels) Less(i, j int) bool {

View file

@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
@ -150,7 +149,7 @@ type pushCtx struct {
Common common.InsertCtx Common common.InsertCtx
metricNameBuf []byte metricNameBuf []byte
metricGroupBuf []byte metricGroupBuf []byte
originLabels []prompb.Label originLabels []prompbmarshal.Label
} }
func (ctx *pushCtx) reset() { func (ctx *pushCtx) reset() {
@ -160,7 +159,7 @@ func (ctx *pushCtx) reset() {
originLabels := ctx.originLabels originLabels := ctx.originLabels
for i := range originLabels { for i := range originLabels {
originLabels[i] = prompb.Label{} originLabels[i] = prompbmarshal.Label{}
} }
ctx.originLabels = originLabels[:0] ctx.originLabels = originLabels[:0]
} }

View file

@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -108,7 +107,7 @@ func (ctx *Ctx) Reset() {
// ApplyRelabeling applies relabeling to the given labels and returns the result. // ApplyRelabeling applies relabeling to the given labels and returns the result.
// //
// The returned labels are valid until the next call to ApplyRelabeling. // The returned labels are valid until the next call to ApplyRelabeling.
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { func (ctx *Ctx) ApplyRelabeling(labels []prompbmarshal.Label) []prompbmarshal.Label {
pcs := pcsGlobal.Load() pcs := pcsGlobal.Load()
if pcs.Len() == 0 && !*usePromCompatibleNaming { if pcs.Len() == 0 && !*usePromCompatibleNaming {
// There are no relabeling rules. // There are no relabeling rules.
@ -159,7 +158,7 @@ func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
name = "" name = ""
} }
value := label.Value value := label.Value
dst = append(dst, prompb.Label{ dst = append(dst, prompbmarshal.Label{
Name: name, Name: name,
Value: value, Value: value,
}) })

View file

@ -14,7 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -95,7 +95,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
_ = deadline // TODO: use the deadline as in the cluster branch _ = deadline // TODO: use the deadline as in the cluster branch
paths := r.Form["path"] paths := r.Form["path"]
var row graphiteparser.Row var row graphiteparser.Row
var labels []prompb.Label var labels []prompbmarshal.Label
var b []byte var b []byte
var tagsPool []graphiteparser.Tag var tagsPool []graphiteparser.Tag
mrs := make([]storage.MetricRow, len(paths)) mrs := make([]storage.MetricRow, len(paths))
@ -122,12 +122,12 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
canonicalPaths[i] = string(b) canonicalPaths[i] = string(b)
// Convert parsed metric and tags to labels. // Convert parsed metric and tags to labels.
labels = append(labels[:0], prompb.Label{ labels = append(labels[:0], prompbmarshal.Label{
Name: "__name__", Name: "__name__",
Value: row.Metric, Value: row.Metric,
}) })
for _, tag := range row.Tags { for _, tag := range row.Tags {
labels = append(labels, prompb.Label{ labels = append(labels, prompbmarshal.Label{
Name: tag.Key, Name: tag.Key,
Value: tag.Value, Value: tag.Value,
}) })

View file

@ -14,7 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
) )
@ -506,7 +506,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) {
// MarshalMetricNameRaw marshals labels to dst and returns the result. // MarshalMetricNameRaw marshals labels to dst and returns the result.
// //
// The result must be unmarshaled with MetricName.UnmarshalRaw // The result must be unmarshaled with MetricName.UnmarshalRaw
func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte { func MarshalMetricNameRaw(dst []byte, labels []prompbmarshal.Label) []byte {
// Calculate the required space for dst. // Calculate the required space for dst.
dstLen := len(dst) dstLen := len(dst)
dstSize := dstLen dstSize := dstLen
@ -564,7 +564,7 @@ var (
TooLongLabelValues atomic.Uint64 TooLongLabelValues atomic.Uint64
) )
func trackDroppedLabels(labels, droppedLabels []prompb.Label) { func trackDroppedLabels(labels, droppedLabels []prompbmarshal.Label) {
MetricsWithDroppedLabels.Add(1) MetricsWithDroppedLabels.Add(1)
select { select {
case <-droppedLabelsLogTicker.C: case <-droppedLabelsLogTicker.C:
@ -577,7 +577,7 @@ func trackDroppedLabels(labels, droppedLabels []prompb.Label) {
} }
} }
func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) { func trackTruncatedLabels(labels []prompbmarshal.Label, truncated *prompbmarshal.Label) {
TooLongLabelValues.Add(1) TooLongLabelValues.Add(1)
select { select {
case <-truncatedLabelsLogTicker.C: case <-truncatedLabelsLogTicker.C:
@ -595,8 +595,8 @@ var (
truncatedLabelsLogTicker = time.NewTicker(5 * time.Second) truncatedLabelsLogTicker = time.NewTicker(5 * time.Second)
) )
func labelsToString(labels []prompb.Label) string { func labelsToString(labels []prompbmarshal.Label) string {
labelsCopy := append([]prompb.Label{}, labels...) labelsCopy := append([]prompbmarshal.Label{}, labels...)
sort.Slice(labelsCopy, func(i, j int) bool { sort.Slice(labelsCopy, func(i, j int) bool {
return string(labelsCopy[i].Name) < string(labelsCopy[j].Name) return string(labelsCopy[i].Name) < string(labelsCopy[j].Name)
}) })