mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vmagent/remotewrite: sort labels before sending the series to per-remoteWrite.url queues
This commit is contained in:
parent
9d97f44772
commit
dcac849c1f
3 changed files with 16 additions and 52 deletions
|
@ -128,7 +128,6 @@ func (wr *writeRequest) reset() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wr *writeRequest) flush() {
|
func (wr *writeRequest) flush() {
|
||||||
sortLabelsIfNeeded(wr.tss)
|
|
||||||
wr.wr.Timeseries = wr.tss
|
wr.wr.Timeseries = wr.tss
|
||||||
wr.adjustSampleValues()
|
wr.adjustSampleValues()
|
||||||
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
|
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
xxhash "github.com/cespare/xxhash/v2"
|
xxhash "github.com/cespare/xxhash/v2"
|
||||||
)
|
)
|
||||||
|
@ -38,6 +39,10 @@ var (
|
||||||
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. "+
|
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. "+
|
||||||
"By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. "+
|
"By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. "+
|
||||||
"This option may be used for improving data compression for the stored metrics")
|
"This option may be used for improving data compression for the stored metrics")
|
||||||
|
sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+
|
||||||
|
`This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+
|
||||||
|
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+
|
||||||
|
`Enabled sorting for labels can slow down ingestion performance a bit`)
|
||||||
)
|
)
|
||||||
|
|
||||||
var rwctxs []*remoteWriteCtx
|
var rwctxs []*remoteWriteCtx
|
||||||
|
@ -173,6 +178,7 @@ func Push(wr *prompbmarshal.WriteRequest) {
|
||||||
tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, pcsGlobal)
|
tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, pcsGlobal)
|
||||||
globalRelabelMetricsDropped.Add(tssBlockLen - len(tssBlock))
|
globalRelabelMetricsDropped.Add(tssBlockLen - len(tssBlock))
|
||||||
}
|
}
|
||||||
|
sortLabelsIfNeeded(tssBlock)
|
||||||
for _, rwctx := range rwctxs {
|
for _, rwctx := range rwctxs {
|
||||||
rwctx.Push(tssBlock)
|
rwctx.Push(tssBlock)
|
||||||
}
|
}
|
||||||
|
@ -185,6 +191,16 @@ func Push(wr *prompbmarshal.WriteRequest) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
|
||||||
|
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
|
||||||
|
if !*sortLabels {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := range tss {
|
||||||
|
promrelabel.SortLabels(tss[i].Labels)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total")
|
var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total")
|
||||||
|
|
||||||
type remoteWriteCtx struct {
|
type remoteWriteCtx struct {
|
||||||
|
|
|
@ -1,51 +0,0 @@
|
||||||
package remotewrite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"flag"
|
|
||||||
"sort"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
)
|
|
||||||
|
|
||||||
var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+
|
|
||||||
`This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+
|
|
||||||
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+
|
|
||||||
`Enabled sorting for labels can slow down ingestion performance a bit`)
|
|
||||||
|
|
||||||
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
|
|
||||||
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
|
|
||||||
if !*sortLabels {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// The slc is used for avoiding memory allocation when passing labels to sort.Sort.
|
|
||||||
slc := sortLabelsCtxPool.Get().(*sortLabelsCtx)
|
|
||||||
for i := range tss {
|
|
||||||
slc.labels = tss[i].Labels
|
|
||||||
sort.Sort(&slc.labels)
|
|
||||||
}
|
|
||||||
slc.labels = nil
|
|
||||||
sortLabelsCtxPool.Put(slc)
|
|
||||||
}
|
|
||||||
|
|
||||||
type sortLabelsCtx struct {
|
|
||||||
labels sortedLabels
|
|
||||||
}
|
|
||||||
|
|
||||||
var sortLabelsCtxPool = &sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return &sortLabelsCtx{}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
type sortedLabels []prompbmarshal.Label
|
|
||||||
|
|
||||||
func (sl *sortedLabels) Len() int { return len(*sl) }
|
|
||||||
func (sl *sortedLabels) Less(i, j int) bool {
|
|
||||||
a := *sl
|
|
||||||
return a[i].Name < a[j].Name
|
|
||||||
}
|
|
||||||
func (sl *sortedLabels) Swap(i, j int) {
|
|
||||||
a := *sl
|
|
||||||
a[i], a[j] = a[j], a[i]
|
|
||||||
}
|
|
Loading…
Reference in a new issue