mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-01 15:33:35 +00:00
refactoring: changed prompb to prompbmarshal everythere where internal series transformations are happening (#7409)
doing similar changes for both vmagent and vminsert (like one in https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7399) ends up with almost same implementations for each of packages instead of having this shared code in one place. one of the reasons is the same Timeseries and Labels structure from different prompb and prompbmarshal packages. My proposal is to use structures from prompb package only to marshal/unmarshal sent/received data, but for internal transformations use only structures from prompbmarshal package Another example, where it already can help to simplify code is streaming aggregation pipeline for vmsingle (now it first marshals prompb.Timeseries to storage.MetricRow and then if streaming aggregation or deduplication is enabled it unmarshals all the series back but to prompbmarshal.Timeseries) The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).
This commit is contained in:
parent
1753c3850b
commit
ed9ab2ea73
6 changed files with 24 additions and 26 deletions
|
@ -10,7 +10,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
||||
|
@ -163,7 +162,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
|
|||
type pushCtx struct {
|
||||
Common netstorage.InsertCtx
|
||||
metricGroupBuf []byte
|
||||
originLabels []prompb.Label
|
||||
originLabels []prompbmarshal.Label
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) reset() {
|
||||
|
@ -172,7 +171,7 @@ func (ctx *pushCtx) reset() {
|
|||
|
||||
originLabels := ctx.originLabels
|
||||
for i := range originLabels {
|
||||
originLabels[i] = prompb.Label{}
|
||||
originLabels[i] = prompbmarshal.Label{}
|
||||
}
|
||||
ctx.originLabels = originLabels[:0]
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
@ -60,7 +60,7 @@ func (ctx *InsertCtx) Reset() {
|
|||
|
||||
labels := ctx.Labels
|
||||
for i := range labels {
|
||||
labels[i] = prompb.Label{}
|
||||
labels[i] = prompbmarshal.Label{}
|
||||
}
|
||||
ctx.Labels = labels[:0]
|
||||
|
||||
|
@ -87,7 +87,7 @@ func (ctx *InsertCtx) AddLabelBytes(name, value []byte) {
|
|||
// Do not skip labels with empty name, since they are equal to __name__.
|
||||
return
|
||||
}
|
||||
ctx.Labels = append(ctx.Labels, prompb.Label{
|
||||
ctx.Labels = append(ctx.Labels, prompbmarshal.Label{
|
||||
// Do not copy name and value contents for performance reasons.
|
||||
// This reduces GC overhead on the number of objects and allocations.
|
||||
Name: bytesutil.ToUnsafeString(name),
|
||||
|
@ -105,7 +105,7 @@ func (ctx *InsertCtx) AddLabel(name, value string) {
|
|||
// Do not skip labels with empty name, since they are equal to __name__.
|
||||
return
|
||||
}
|
||||
ctx.Labels = append(ctx.Labels, prompb.Label{
|
||||
ctx.Labels = append(ctx.Labels, prompbmarshal.Label{
|
||||
// Do not copy name and value contents for performance reasons.
|
||||
// This reduces GC overhead on the number of objects and allocations.
|
||||
Name: name,
|
||||
|
@ -119,7 +119,7 @@ func (ctx *InsertCtx) ApplyRelabeling() {
|
|||
}
|
||||
|
||||
// WriteDataPoint writes (timestamp, value) data point with the given at and labels to ctx buffer.
|
||||
func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
|
||||
func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompbmarshal.Label, timestamp int64, value float64) error {
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(at, labels)
|
||||
return ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
|
||||
|
@ -164,7 +164,7 @@ func (ctx *InsertCtx) FlushBufs() error {
|
|||
// GetStorageNodeIdx returns storage node index for the given at and labels.
|
||||
//
|
||||
// The returned index must be passed to WriteDataPoint.
|
||||
func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
|
||||
func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompbmarshal.Label) int {
|
||||
if len(ctx.snb.sns) == 1 {
|
||||
// Fast path - only a single storage node.
|
||||
return 0
|
||||
|
@ -223,7 +223,7 @@ func (ctx *InsertCtx) GetLocalAuthToken(at *auth.Token) *auth.Token {
|
|||
}
|
||||
cleanLabels := ctx.Labels[len(tmpLabels):]
|
||||
for i := range cleanLabels {
|
||||
cleanLabels[i] = prompb.Label{}
|
||||
cleanLabels[i] = prompbmarshal.Label{}
|
||||
}
|
||||
ctx.Labels = tmpLabels
|
||||
ctx.at.Set(accountID, projectID)
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"flag"
|
||||
"sort"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+
|
||||
|
@ -19,7 +19,7 @@ func (ctx *InsertCtx) SortLabelsIfNeeded() {
|
|||
}
|
||||
}
|
||||
|
||||
type sortedLabels []prompb.Label
|
||||
type sortedLabels []prompbmarshal.Label
|
||||
|
||||
func (sl *sortedLabels) Len() int { return len(*sl) }
|
||||
func (sl *sortedLabels) Less(i, j int) bool {
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -160,7 +159,7 @@ func (ctx *Ctx) Reset() {
|
|||
// ApplyRelabeling applies relabeling to the given labels and returns the result.
|
||||
//
|
||||
// The returned labels are valid until the next call to ApplyRelabeling.
|
||||
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
|
||||
func (ctx *Ctx) ApplyRelabeling(labels []prompbmarshal.Label) []prompbmarshal.Label {
|
||||
pcs := pcsGlobal.Load()
|
||||
if pcs.Len() == 0 && !*usePromCompatibleNaming {
|
||||
// There are no relabeling rules.
|
||||
|
@ -211,7 +210,7 @@ func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
|
|||
name = ""
|
||||
}
|
||||
value := label.Value
|
||||
dst = append(dst, prompb.Label{
|
||||
dst = append(dst, prompbmarshal.Label{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -94,7 +94,7 @@ func registerMetrics(startTime time.Time, at *auth.Token, w http.ResponseWriter,
|
|||
deadline := searchutils.GetDeadlineForQuery(r, startTime)
|
||||
paths := r.Form["path"]
|
||||
var row graphiteparser.Row
|
||||
var labels []prompb.Label
|
||||
var labels []prompbmarshal.Label
|
||||
var b []byte
|
||||
var tagsPool []graphiteparser.Tag
|
||||
mrs := make([]storage.MetricRow, len(paths))
|
||||
|
@ -121,12 +121,12 @@ func registerMetrics(startTime time.Time, at *auth.Token, w http.ResponseWriter,
|
|||
canonicalPaths[i] = string(b)
|
||||
|
||||
// Convert parsed metric and tags to labels.
|
||||
labels = append(labels[:0], prompb.Label{
|
||||
labels = append(labels[:0], prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: row.Metric,
|
||||
})
|
||||
for _, tag := range row.Tags {
|
||||
labels = append(labels, prompb.Label{
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: tag.Key,
|
||||
Value: tag.Value,
|
||||
})
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
)
|
||||
|
||||
|
@ -573,7 +573,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) {
|
|||
// MarshalMetricNameRaw marshals labels to dst and returns the result.
|
||||
//
|
||||
// The result must be unmarshaled with MetricName.UnmarshalRaw
|
||||
func MarshalMetricNameRaw(dst []byte, accountID, projectID uint32, labels []prompb.Label) []byte {
|
||||
func MarshalMetricNameRaw(dst []byte, accountID, projectID uint32, labels []prompbmarshal.Label) []byte {
|
||||
// Calculate the required space for dst.
|
||||
dstLen := len(dst)
|
||||
dstSize := dstLen + 8
|
||||
|
@ -633,7 +633,7 @@ var (
|
|||
TooLongLabelValues atomic.Uint64
|
||||
)
|
||||
|
||||
func trackDroppedLabels(labels, droppedLabels []prompb.Label, accountID, projectID uint32) {
|
||||
func trackDroppedLabels(labels, droppedLabels []prompbmarshal.Label, accountID, projectID uint32) {
|
||||
MetricsWithDroppedLabels.Add(1)
|
||||
select {
|
||||
case <-droppedLabelsLogTicker.C:
|
||||
|
@ -646,7 +646,7 @@ func trackDroppedLabels(labels, droppedLabels []prompb.Label, accountID, project
|
|||
}
|
||||
}
|
||||
|
||||
func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label, accountID, projectID uint32) {
|
||||
func trackTruncatedLabels(labels []prompbmarshal.Label, truncated *prompbmarshal.Label, accountID, projectID uint32) {
|
||||
TooLongLabelValues.Add(1)
|
||||
select {
|
||||
case <-truncatedLabelsLogTicker.C:
|
||||
|
@ -664,8 +664,8 @@ var (
|
|||
truncatedLabelsLogTicker = time.NewTicker(5 * time.Second)
|
||||
)
|
||||
|
||||
func labelsToString(labels []prompb.Label) string {
|
||||
labelsCopy := append([]prompb.Label{}, labels...)
|
||||
func labelsToString(labels []prompbmarshal.Label) string {
|
||||
labelsCopy := append([]prompbmarshal.Label{}, labels...)
|
||||
sort.Slice(labelsCopy, func(i, j int) bool {
|
||||
return string(labelsCopy[i].Name) < string(labelsCopy[j].Name)
|
||||
})
|
||||
|
@ -688,7 +688,7 @@ func labelsToString(labels []prompb.Label) string {
|
|||
}
|
||||
|
||||
// MarshalMetricLabelRaw marshals label to dst.
|
||||
func MarshalMetricLabelRaw(dst []byte, label *prompb.Label) []byte {
|
||||
func MarshalMetricLabelRaw(dst []byte, label *prompbmarshal.Label) []byte {
|
||||
dst = marshalStringFast(dst, label.Name)
|
||||
dst = marshalStringFast(dst, label.Value)
|
||||
return dst
|
||||
|
|
Loading…
Reference in a new issue