Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2025-02-09 15:27:11 +00:00)
Andrii Chubatiuk
Previously, time series with labels exceeding the configured limits were truncated and written to storage, potentially causing data inconsistency. This could lead to collisions between time series and make it difficult to identify the source due to the truncated labels. This commit changes the behavior:

* Such time series are now rejected outright.
* Rejected time series are logged to stdout, and the corresponding counters are incremented.
* Removes the `vm_too_long_label_values_total`, `vm_too_long_label_names_total` and `vm_metrics_with_dropped_labels_total` metrics.
* Adds the new values `too_many_labels`, `too_long_label_name` and `too_long_label_value` to the `reason` label of the `vm_rows_ignored_total` metric.

Related issues:
- https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6928
- https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7661
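As a rough illustration of the new accounting, per-reason counters could be declared and incremented with the same `github.com/VictoriaMetrics/metrics` package that the handler below already uses. The metric name and the reason values are taken from this commit; the package name and the `rejectRow` helper are hypothetical and not the actual VictoriaMetrics implementation.

```go
package storagelimits

import "github.com/VictoriaMetrics/metrics"

// Per-reason counters for rows rejected because of label limits.
// Metric name and reason values match the commit description;
// the surrounding package and helper are hypothetical.
var (
	rowsIgnoredTooManyLabels     = metrics.NewCounter(`vm_rows_ignored_total{reason="too_many_labels"}`)
	rowsIgnoredTooLongLabelName  = metrics.NewCounter(`vm_rows_ignored_total{reason="too_long_label_name"}`)
	rowsIgnoredTooLongLabelValue = metrics.NewCounter(`vm_rows_ignored_total{reason="too_long_label_value"}`)
)

// rejectRow is a hypothetical hook called instead of truncating labels
// and writing the row, as the old behavior did.
func rejectRow(reason string) {
	switch reason {
	case "too_many_labels":
		rowsIgnoredTooManyLabels.Inc()
	case "too_long_label_name":
		rowsIgnoredTooLongLabelName.Inc()
	case "too_long_label_value":
		rowsIgnoredTooLongLabelValue.Inc()
	}
}
```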
73 lines · 2.1 KiB · Go
package promremotewrite

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
	"github.com/VictoriaMetrics/metrics"
)

var (
	rowsInserted  = metrics.NewCounter(`vm_rows_inserted_total{type="promremotewrite"}`)
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="promremotewrite"}`)
)

// InsertHandler processes remote write for prometheus.
func InsertHandler(req *http.Request) error {
	extraLabels, err := parserCommon.GetExtraLabels(req)
	if err != nil {
		return err
	}
	// The VictoriaMetrics remote write protocol sends zstd-compressed payloads.
	isVMRemoteWrite := req.Header.Get("Content-Encoding") == "zstd"
	return stream.Parse(req.Body, isVMRemoteWrite, func(tss []prompb.TimeSeries) error {
		return insertRows(tss, extraLabels)
	})
}

// insertRows writes the given time series to storage, adding extraLabels to every series.
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
	ctx := common.GetInsertCtx()
	defer common.PutInsertCtx(ctx)

	rowsLen := 0
	for i := range timeseries {
		rowsLen += len(timeseries[i].Samples)
	}
	ctx.Reset(rowsLen)
	rowsTotal := 0
	hasRelabeling := relabel.HasRelabeling()
	for i := range timeseries {
		ts := &timeseries[i]
		rowsTotal += len(ts.Samples)
		ctx.Labels = ctx.Labels[:0]
		srcLabels := ts.Labels
		for _, srcLabel := range srcLabels {
			ctx.AddLabel(srcLabel.Name, srcLabel.Value)
		}
		for j := range extraLabels {
			label := &extraLabels[j]
			ctx.AddLabel(label.Name, label.Value)
		}

		// Skip the time series if its labels cannot be prepared for writing.
		if !ctx.TryPrepareLabels(hasRelabeling) {
			continue
		}
		var metricNameRaw []byte
		var err error
		samples := ts.Samples
		for i := range samples {
			r := &samples[i]
			metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
			if err != nil {
				return err
			}
		}
	}
	rowsInserted.Add(rowsTotal)
	rowsPerInsert.Update(float64(rowsTotal))
	return ctx.FlushBufs()
}
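For orientation, here is a minimal sketch of how a handler with this signature could be mounted on an HTTP endpoint. The real vminsert wires InsertHandler through its own request router, so the path, port and error mapping below are assumptions made for the example.

```go
package main

import (
	"log"
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
)

func main() {
	// Hypothetical wiring: InsertHandler reports parse/insert problems as an error,
	// which is translated here into an HTTP status code.
	http.HandleFunc("/api/v1/write", func(w http.ResponseWriter, r *http.Request) {
		if err := promremotewrite.InsertHandler(r); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
	// Hypothetical listen address; not the actual vminsert configuration.
	log.Fatal(http.ListenAndServe(":8480", nil))
}
```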