mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
refactoring: changed prompb to prompbmarshal everythere where internal series transformations are happening (#7409)
### Describe Your Changes doing similar changes for both vmagent and vminsert (like one in https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7399) ends up with almost same implementations for each of packages instead of having this shared code in one place. one of the reasons is the same Timeseries and Labels structure from different prompb and prompbmarshal packages. My proposal is to use structures from prompb package only to marshal/unmarshal sent/received data, but for internal transformations use only structures from prompbmarshal package Another example, where it already can help to simplify code is streaming aggregation pipeline for vmsingle (now it first marshals prompb.Timeseries to storage.MetricRow and then if streaming aggregation or deduplication is enabled it unmarshals all the series back but to prompbmarshal.Timeseries) ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).
This commit is contained in:
parent
8ab1261750
commit
9cfdbc582f
8 changed files with 31 additions and 33 deletions
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
@ -48,7 +48,7 @@ func selfScraper(scrapeInterval time.Duration) {
|
|||
var bb bytesutil.ByteBuffer
|
||||
var rows prometheus.Rows
|
||||
var mrs []storage.MetricRow
|
||||
var labels []prompb.Label
|
||||
var labels []prompbmarshal.Label
|
||||
t := time.NewTicker(scrapeInterval)
|
||||
f := func(currentTime time.Time, sendStaleMarkers bool) {
|
||||
currentTimestamp := currentTime.UnixNano() / 1e6
|
||||
|
@ -99,11 +99,11 @@ func selfScraper(scrapeInterval time.Duration) {
|
|||
}
|
||||
}
|
||||
|
||||
func addLabel(dst []prompb.Label, key, value string) []prompb.Label {
|
||||
func addLabel(dst []prompbmarshal.Label, key, value string) []prompbmarshal.Label {
|
||||
if len(dst) < cap(dst) {
|
||||
dst = dst[:len(dst)+1]
|
||||
} else {
|
||||
dst = append(dst, prompb.Label{})
|
||||
dst = append(dst, prompbmarshal.Label{})
|
||||
}
|
||||
lb := &dst[len(dst)-1]
|
||||
lb.Name = key
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
||||
|
@ -214,15 +214,15 @@ func processFlags() {
|
|||
func fillStorage(series []vm.TimeSeries) error {
|
||||
var mrs []storage.MetricRow
|
||||
for _, series := range series {
|
||||
var labels []prompb.Label
|
||||
var labels []prompbmarshal.Label
|
||||
for _, lp := range series.LabelPairs {
|
||||
labels = append(labels, prompb.Label{
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: lp.Name,
|
||||
Value: lp.Value,
|
||||
})
|
||||
}
|
||||
if series.Name != "" {
|
||||
labels = append(labels, prompb.Label{
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: series.Name,
|
||||
})
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
@ -30,7 +30,7 @@ type InsertCtx struct {
|
|||
func (ctx *InsertCtx) Reset(rowsLen int) {
|
||||
labels := ctx.Labels
|
||||
for i := range labels {
|
||||
labels[i] = prompb.Label{}
|
||||
labels[i] = prompbmarshal.Label{}
|
||||
}
|
||||
ctx.Labels = labels[:0]
|
||||
|
||||
|
@ -51,7 +51,7 @@ func cleanMetricRow(mr *storage.MetricRow) {
|
|||
mr.MetricNameRaw = nil
|
||||
}
|
||||
|
||||
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte {
|
||||
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompbmarshal.Label) []byte {
|
||||
start := len(ctx.metricNamesBuf)
|
||||
ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...)
|
||||
ctx.metricNamesBuf = storage.MarshalMetricNameRaw(ctx.metricNamesBuf, labels)
|
||||
|
@ -60,7 +60,7 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label)
|
|||
}
|
||||
|
||||
// WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer.
|
||||
func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) error {
|
||||
func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompbmarshal.Label, timestamp int64, value float64) error {
|
||||
metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels)
|
||||
return ctx.addRow(metricNameRaw, timestamp, value)
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, times
|
|||
// WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer.
|
||||
//
|
||||
// It returns metricNameRaw for the given labels if len(metricNameRaw) == 0.
|
||||
func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) ([]byte, error) {
|
||||
func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompbmarshal.Label, timestamp int64, value float64) ([]byte, error) {
|
||||
if len(metricNameRaw) == 0 {
|
||||
metricNameRaw = ctx.marshalMetricNameRaw(nil, labels)
|
||||
}
|
||||
|
@ -106,7 +106,7 @@ func (ctx *InsertCtx) AddLabelBytes(name, value []byte) {
|
|||
// Do not skip labels with empty name, since they are equal to __name__.
|
||||
return
|
||||
}
|
||||
ctx.Labels = append(ctx.Labels, prompb.Label{
|
||||
ctx.Labels = append(ctx.Labels, prompbmarshal.Label{
|
||||
// Do not copy name and value contents for performance reasons.
|
||||
// This reduces GC overhead on the number of objects and allocations.
|
||||
Name: bytesutil.ToUnsafeString(name),
|
||||
|
@ -124,7 +124,7 @@ func (ctx *InsertCtx) AddLabel(name, value string) {
|
|||
// Do not skip labels with empty name, since they are equal to __name__.
|
||||
return
|
||||
}
|
||||
ctx.Labels = append(ctx.Labels, prompb.Label{
|
||||
ctx.Labels = append(ctx.Labels, prompbmarshal.Label{
|
||||
// Do not copy name and value contents for performance reasons.
|
||||
// This reduces GC overhead on the number of objects and allocations.
|
||||
Name: name,
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"flag"
|
||||
"sort"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+
|
||||
|
@ -19,7 +19,7 @@ func (ctx *InsertCtx) SortLabelsIfNeeded() {
|
|||
}
|
||||
}
|
||||
|
||||
type sortedLabels []prompb.Label
|
||||
type sortedLabels []prompbmarshal.Label
|
||||
|
||||
func (sl *sortedLabels) Len() int { return len(*sl) }
|
||||
func (sl *sortedLabels) Less(i, j int) bool {
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
||||
|
@ -150,7 +149,7 @@ type pushCtx struct {
|
|||
Common common.InsertCtx
|
||||
metricNameBuf []byte
|
||||
metricGroupBuf []byte
|
||||
originLabels []prompb.Label
|
||||
originLabels []prompbmarshal.Label
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) reset() {
|
||||
|
@ -160,7 +159,7 @@ func (ctx *pushCtx) reset() {
|
|||
|
||||
originLabels := ctx.originLabels
|
||||
for i := range originLabels {
|
||||
originLabels[i] = prompb.Label{}
|
||||
originLabels[i] = prompbmarshal.Label{}
|
||||
}
|
||||
ctx.originLabels = originLabels[:0]
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -108,7 +107,7 @@ func (ctx *Ctx) Reset() {
|
|||
// ApplyRelabeling applies relabeling to the given labels and returns the result.
|
||||
//
|
||||
// The returned labels are valid until the next call to ApplyRelabeling.
|
||||
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
|
||||
func (ctx *Ctx) ApplyRelabeling(labels []prompbmarshal.Label) []prompbmarshal.Label {
|
||||
pcs := pcsGlobal.Load()
|
||||
if pcs.Len() == 0 && !*usePromCompatibleNaming {
|
||||
// There are no relabeling rules.
|
||||
|
@ -159,7 +158,7 @@ func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
|
|||
name = ""
|
||||
}
|
||||
value := label.Value
|
||||
dst = append(dst, prompb.Label{
|
||||
dst = append(dst, prompbmarshal.Label{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -95,7 +95,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
|
|||
_ = deadline // TODO: use the deadline as in the cluster branch
|
||||
paths := r.Form["path"]
|
||||
var row graphiteparser.Row
|
||||
var labels []prompb.Label
|
||||
var labels []prompbmarshal.Label
|
||||
var b []byte
|
||||
var tagsPool []graphiteparser.Tag
|
||||
mrs := make([]storage.MetricRow, len(paths))
|
||||
|
@ -122,12 +122,12 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
|
|||
canonicalPaths[i] = string(b)
|
||||
|
||||
// Convert parsed metric and tags to labels.
|
||||
labels = append(labels[:0], prompb.Label{
|
||||
labels = append(labels[:0], prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: row.Metric,
|
||||
})
|
||||
for _, tag := range row.Tags {
|
||||
labels = append(labels, prompb.Label{
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: tag.Key,
|
||||
Value: tag.Value,
|
||||
})
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
)
|
||||
|
||||
|
@ -506,7 +506,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) {
|
|||
// MarshalMetricNameRaw marshals labels to dst and returns the result.
|
||||
//
|
||||
// The result must be unmarshaled with MetricName.UnmarshalRaw
|
||||
func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte {
|
||||
func MarshalMetricNameRaw(dst []byte, labels []prompbmarshal.Label) []byte {
|
||||
// Calculate the required space for dst.
|
||||
dstLen := len(dst)
|
||||
dstSize := dstLen
|
||||
|
@ -564,7 +564,7 @@ var (
|
|||
TooLongLabelValues atomic.Uint64
|
||||
)
|
||||
|
||||
func trackDroppedLabels(labels, droppedLabels []prompb.Label) {
|
||||
func trackDroppedLabels(labels, droppedLabels []prompbmarshal.Label) {
|
||||
MetricsWithDroppedLabels.Add(1)
|
||||
select {
|
||||
case <-droppedLabelsLogTicker.C:
|
||||
|
@ -577,7 +577,7 @@ func trackDroppedLabels(labels, droppedLabels []prompb.Label) {
|
|||
}
|
||||
}
|
||||
|
||||
func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) {
|
||||
func trackTruncatedLabels(labels []prompbmarshal.Label, truncated *prompbmarshal.Label) {
|
||||
TooLongLabelValues.Add(1)
|
||||
select {
|
||||
case <-truncatedLabelsLogTicker.C:
|
||||
|
@ -595,8 +595,8 @@ var (
|
|||
truncatedLabelsLogTicker = time.NewTicker(5 * time.Second)
|
||||
)
|
||||
|
||||
func labelsToString(labels []prompb.Label) string {
|
||||
labelsCopy := append([]prompb.Label{}, labels...)
|
||||
func labelsToString(labels []prompbmarshal.Label) string {
|
||||
labelsCopy := append([]prompbmarshal.Label{}, labels...)
|
||||
sort.Slice(labelsCopy, func(i, j int) bool {
|
||||
return string(labelsCopy[i].Name) < string(labelsCopy[j].Name)
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue