From dcac849c1ffb4909e745cf51634fe1849b7278aa Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 20 May 2021 02:12:36 +0300 Subject: [PATCH] app/vmagent/remotewrite: sort labels before sending the series to per-remoteWrite.url queues --- app/vmagent/remotewrite/pendingseries.go | 1 - app/vmagent/remotewrite/remotewrite.go | 16 ++++++++ app/vmagent/remotewrite/sort_labels.go | 51 ------------------------ 3 files changed, 16 insertions(+), 52 deletions(-) delete mode 100644 app/vmagent/remotewrite/sort_labels.go diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 05c5453315..a5dff6c7d6 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -128,7 +128,6 @@ func (wr *writeRequest) reset() { } func (wr *writeRequest) flush() { - sortLabelsIfNeeded(wr.tss) wr.wr.Timeseries = wr.tss wr.adjustSampleValues() atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 09ec041028..de07aa7b48 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/metrics" xxhash "github.com/cespare/xxhash/v2" ) @@ -38,6 +39,10 @@ var ( "Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. "+ "By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. "+ "This option may be used for improving data compression for the stored metrics") + sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+ + `This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+ + `For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+ + `Enabled sorting for labels can slow down ingestion performance a bit`) ) var rwctxs []*remoteWriteCtx @@ -173,6 +178,7 @@ func Push(wr *prompbmarshal.WriteRequest) { tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, pcsGlobal) globalRelabelMetricsDropped.Add(tssBlockLen - len(tssBlock)) } + sortLabelsIfNeeded(tssBlock) for _, rwctx := range rwctxs { rwctx.Push(tssBlock) } @@ -185,6 +191,16 @@ func Push(wr *prompbmarshal.WriteRequest) { } } +// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. +func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) { + if !*sortLabels { + return + } + for i := range tss { + promrelabel.SortLabels(tss[i].Labels) + } +} + var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total") type remoteWriteCtx struct { diff --git a/app/vmagent/remotewrite/sort_labels.go b/app/vmagent/remotewrite/sort_labels.go deleted file mode 100644 index e9ec252bd9..0000000000 --- a/app/vmagent/remotewrite/sort_labels.go +++ /dev/null @@ -1,51 +0,0 @@ -package remotewrite - -import ( - "flag" - "sort" - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" -) - -var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+ - `This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+ - `For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+ - `Enabled sorting for labels can slow down ingestion performance a bit`) - -// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. -func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) { - if !*sortLabels { - return - } - // The slc is used for avoiding memory allocation when passing labels to sort.Sort. - slc := sortLabelsCtxPool.Get().(*sortLabelsCtx) - for i := range tss { - slc.labels = tss[i].Labels - sort.Sort(&slc.labels) - } - slc.labels = nil - sortLabelsCtxPool.Put(slc) -} - -type sortLabelsCtx struct { - labels sortedLabels -} - -var sortLabelsCtxPool = &sync.Pool{ - New: func() interface{} { - return &sortLabelsCtx{} - }, -} - -type sortedLabels []prompbmarshal.Label - -func (sl *sortedLabels) Len() int { return len(*sl) } -func (sl *sortedLabels) Less(i, j int) bool { - a := *sl - return a[i].Name < a[j].Name -} -func (sl *sortedLabels) Swap(i, j int) { - a := *sl - a[i], a[j] = a[j], a[i] -}