app/vmagent: fix a bug with improper relabeling when multiple -remoteWrite.urlRelableConfig args are set

This bug could result in incorrect relabeling and metrics' drop.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
This commit is contained in:
Aliaksandr Valialkin 2020-05-12 22:01:47 +03:00
parent faf92a0965
commit 96e001d254
2 changed files with 19 additions and 3 deletions

View file

@ -87,6 +87,8 @@ func Stop() {
// Push sends wr to remote storage systems set via `-remoteWrite.url`. // Push sends wr to remote storage systems set via `-remoteWrite.url`.
// //
// Each timeseries in wr.Timeseries must contain one sample. // Each timeseries in wr.Timeseries must contain one sample.
//
// Note that wr may be modified by Push due to relabeling.
func Push(wr *prompbmarshal.WriteRequest) { func Push(wr *prompbmarshal.WriteRequest) {
var rctx *relabelCtx var rctx *relabelCtx
if len(prcsGlobal) > 0 || len(labelsGlobal) > 0 { if len(prcsGlobal) > 0 || len(labelsGlobal) > 0 {
@ -128,6 +130,8 @@ type remoteWriteCtx struct {
pss []*pendingSeries pss []*pendingSeries
pssNextIdx uint64 pssNextIdx uint64
tss []prompbmarshal.TimeSeries
relabelMetricsDropped *metrics.Counter relabelMetricsDropped *metrics.Counter
} }
@ -181,6 +185,11 @@ func (rwctx *remoteWriteCtx) MustStop() {
func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
var rctx *relabelCtx var rctx *relabelCtx
if len(rwctx.prcs) > 0 { if len(rwctx.prcs) > 0 {
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 for details.
rwctx.tss = append(rwctx.tss[:0], tss...)
tss = rwctx.tss
rctx = getRelabelCtx() rctx = getRelabelCtx()
tssLen := len(tss) tssLen := len(tss)
tss = rctx.applyRelabeling(tss, nil, rwctx.prcs) tss = rctx.applyRelabeling(tss, nil, rwctx.prcs)
@ -191,5 +200,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
pss[idx].Push(tss) pss[idx].Push(tss)
if rctx != nil { if rctx != nil {
putRelabelCtx(rctx) putRelabelCtx(rctx)
// Zero rwctx.tss in order to free up GC references.
rwctx.tss = prompbmarshal.ResetTimeSeries(rwctx.tss)
} }
} }

View file

@ -21,10 +21,15 @@ func MarshalWriteRequest(dst []byte, wr *WriteRequest) []byte {
// ResetWriteRequest resets wr. // ResetWriteRequest resets wr.
func ResetWriteRequest(wr *WriteRequest) { func ResetWriteRequest(wr *WriteRequest) {
for i := range wr.Timeseries { wr.Timeseries = ResetTimeSeries(wr.Timeseries)
ts := wr.Timeseries[i] }
// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use.
func ResetTimeSeries(tss []TimeSeries) []TimeSeries {
for i := range tss {
ts := tss[i]
ts.Labels = nil ts.Labels = nil
ts.Samples = nil ts.Samples = nil
} }
wr.Timeseries = wr.Timeseries[:0] return tss[:0]
} }