app/{vminsert,vmagent}: add -sortLabels command-line option for sorting time series labels before ingesting them in the storage

This option can be useful when samples for the same time series are ingested with distinct order of labels.
For example, metric{k1="v1",k2="v2"} and metric{k2="v2",k1="v1"}.
This commit is contained in:
Aliaksandr Valialkin 2021-03-31 23:12:56 +03:00
parent e1f699bb6c
commit dc9eafcd02
21 changed files with 136 additions and 49 deletions

View file

@ -1324,6 +1324,8 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
* It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need
of tweaking these flag values arises.
* It is recommended inspecting logs during troubleshooting, since they may contain useful information.
* It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases),
since the encountered issue could be already fixed there.
@ -1338,8 +1340,6 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions,
which would start background merge if they had more free disk space.
* It is recommended inspecting logs during troubleshooting, since they may contain useful information.
* VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage.
This may lead to the following "issues":
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to persistent storage
@ -1349,10 +1349,13 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
then it is likely you have too many active time series for the current amount of RAM.
VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics, which could be used as an indicator of low amounts of RAM.
It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve
VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics such as `vm_slow_row_inserts_total` and `vm_slow_metric_name_loads_total`, which could be used
as an indicator of low amounts of RAM. It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve
ingestion and query performance in this case.
* If the order of labels for the same metrics can change over time (e.g. if `metric{k1="v1",k2="v2"}` may become `metric{k2="v2",k1="v1"}`),
then it is recommended running VictoriaMetrics with `-sortLabels` command-line flag in order to reduce memory usage and CPU usage.
* VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion,
then data querying may slow down significantly.
@ -1790,6 +1793,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
The maximum number of CPU cores to use for small merges. Default value is used if set to 0
-snapshotAuthKey string
authKey, which must be passed in query string to /snapshot* pages
-sortLabels
Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-tls

View file

@ -702,6 +702,8 @@ See the docs at https://victoriametrics.github.io/vmagent.html .
-remoteWrite.urlRelabelConfig array
Optional path to relabel config for the corresponding -remoteWrite.url
Supports array of values separated by comma or specified via multiple flags.
-sortLabels
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
-tls
Whether to enable TLS (aka HTTPS) for incoming requests. -tlsCertFile and -tlsKeyFile must be set if -tls is set
-tlsCertFile string

View file

@ -128,6 +128,7 @@ func (wr *writeRequest) reset() {
}
func (wr *writeRequest) flush() {
sortLabelsIfNeeded(wr.tss)
wr.wr.Timeseries = wr.tss
wr.adjustSampleValues()
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())

View file

@ -0,0 +1,51 @@
package remotewrite
import (
"flag"
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+
`This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+
`Enabled sorting for labels can slow down ingestion performance a bit`)
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
if !*sortLabels {
return
}
// The slc is used for avoiding memory allocation when passing labels to sort.Sort.
slc := sortLabelsCtxPool.Get().(*sortLabelsCtx)
for i := range tss {
slc.labels = tss[i].Labels
sort.Sort(&slc.labels)
}
slc.labels = nil
sortLabelsCtxPool.Put(slc)
}
type sortLabelsCtx struct {
labels sortedLabels
}
var sortLabelsCtxPool = &sync.Pool{
New: func() interface{} {
return &sortLabelsCtx{}
},
}
type sortedLabels []prompbmarshal.Label
func (sl *sortedLabels) Len() int { return len(*sl) }
func (sl *sortedLabels) Less(i, j int) bool {
a := *sl
return a[i].Name < a[j].Name
}
func (sl *sortedLabels) Swap(i, j int) {
a := *sl
a[i], a[j] = a[j], a[i]
}

View file

@ -14,7 +14,7 @@ import (
// InsertCtx contains common bits for data points insertion.
type InsertCtx struct {
Labels []prompb.Label
Labels sortedLabels
mrs []storage.MetricRow
metricNamesBuf []byte

View file

@ -0,0 +1,32 @@
package common
import (
"flag"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+
`This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. `+
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. `+
`Enabled sorting for labels can slow down ingestion performance a bit`)
// SortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set
func (ctx *InsertCtx) SortLabelsIfNeeded() {
if *sortLabels {
sort.Sort(&ctx.Labels)
}
}
type sortedLabels []prompb.Label
func (sl *sortedLabels) Len() int { return len(*sl) }
func (sl *sortedLabels) Less(i, j int) bool {
a := *sl
return string(a[i].Name) < string(a[j].Name)
}
func (sl *sortedLabels) Swap(i, j int) {
a := *sl
a[i], a[j] = a[j], a[i]
}

View file

@ -55,6 +55,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -45,6 +45,7 @@ func insertRows(rows []parser.Row) error {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -117,11 +117,13 @@ func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label)
// Skip metric without labels.
continue
}
ic.SortLabelsIfNeeded()
if err := ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value); err != nil {
return err
}
}
} else {
ic.SortLabelsIfNeeded()
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
labelsLen := len(ic.Labels)
for j := range r.Fields {

View file

@ -65,6 +65,7 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
// Skip metric without labels.
return nil
}
ic.SortLabelsIfNeeded()
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
values := block.Values
timestamps := block.Timestamps

View file

@ -45,6 +45,7 @@ func insertRows(rows []parser.Row) error {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -63,6 +63,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -60,6 +60,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -62,6 +62,7 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
var metricNameRaw []byte
var err error
for i := range ts.Samples {

View file

@ -61,6 +61,7 @@ func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Labe
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
var metricNameRaw []byte
var err error
samples := ts.Samples

View file

@ -67,6 +67,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
// Skip metric without labels.
continue
}
ic.SortLabelsIfNeeded()
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
values := r.Values
timestamps := r.Timestamps

View file

@ -557,9 +557,6 @@ func registerStorageMetrics() {
return float64(m().SearchDelays)
})
metrics.NewGauge(`vm_sorted_row_labels_inserts_total`, func() float64 {
return float64(m().SortedRowLabelsInserts)
})
metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
return float64(m().SlowRowInserts)
})

View file

@ -2,7 +2,7 @@
# tip
* FEATURE: reduce the size of `MetricName -> internal_series_id` cache (aka `vm_cache_size_bytes{type="storage/tsid"}`) when ingesting samples for the same time series with distinct order of labels. For example, `foo{k1="v1",k2="v2"}` and `foo{k2="v2",k1="v1"}` represent a single time series. Previously VictoriaMetrics could need additional memory when ingesting such samples. The number of ingested samples with distinct order of labels for the same time series can be monitored with `vm_sorted_row_labels_inserts_total` metric.
* FEATURE: vminsert and vmagent: add `-sortLabels` command-line flag for sorting metric labels before pushing them to `vmstorage`. This should reduce the size of `MetricName -> internal_series_id` cache (aka `vm_cache_size_bytes{type="storage/tsid"}`) when ingesting samples for the same time series with distinct order of labels. For example, `foo{k1="v1",k2="v2"}` and `foo{k2="v2",k1="v1"}` represent a single time series.
* FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167).

View file

@ -1324,6 +1324,8 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
* It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need
of tweaking these flag values arises.
* It is recommended inspecting logs during troubleshooting, since they may contain useful information.
* It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases),
since the encountered issue could be already fixed there.
@ -1338,8 +1340,6 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions,
which would start background merge if they had more free disk space.
* It is recommended inspecting logs during troubleshooting, since they may contain useful information.
* VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage.
This may lead to the following "issues":
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to persistent storage
@ -1349,10 +1349,13 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
then it is likely you have too many active time series for the current amount of RAM.
VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics, which could be used as an indicator of low amounts of RAM.
It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve
VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics such as `vm_slow_row_inserts_total` and `vm_slow_metric_name_loads_total`, which could be used
as an indicator of low amounts of RAM. It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve
ingestion and query performance in this case.
* If the order of labels for the same metrics can change over time (e.g. if `metric{k1="v1",k2="v2"}` may become `metric{k2="v2",k1="v1"}`),
then it is recommended running VictoriaMetrics with `-sortLabels` command-line flag in order to reduce memory usage and CPU usage.
* VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion,
then data querying may slow down significantly.
@ -1790,6 +1793,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
The maximum number of CPU cores to use for small merges. Default value is used if set to 0
-snapshotAuthKey string
authKey, which must be passed in query string to /snapshot* pages
-sortLabels
Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-tls

View file

@ -702,6 +702,8 @@ See the docs at https://victoriametrics.github.io/vmagent.html .
-remoteWrite.urlRelabelConfig array
Optional path to relabel config for the corresponding -remoteWrite.url
Supports array of values separated by comma or specified via multiple flags.
-sortLabels
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
-tls
Whether to enable TLS (aka HTTPS) for incoming requests. -tlsCertFile and -tlsKeyFile must be set if -tls is set
-tlsCertFile string

View file

@ -48,7 +48,6 @@ type Storage struct {
searchTSIDsConcurrencyLimitReached uint64
searchTSIDsConcurrencyLimitTimeout uint64
sortedRowLabelsInserts uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64
@ -359,7 +358,6 @@ type Metrics struct {
SearchDelays uint64
SortedRowLabelsInserts uint64
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64
@ -429,7 +427,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
m.SortedRowLabelsInserts += atomic.LoadUint64(&s.sortedRowLabelsInserts)
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
@ -1321,8 +1318,6 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
var rowsAddedTotal uint64
// AddRows adds the given mrs to s.
//
// AddRows can modify mrs contents.
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
@ -1447,9 +1442,6 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
prevMetricNameRaw []byte
)
var pmrs *pendingMetricRows
var mn MetricName
var metricNameRawSorted []byte
var sortedRowLabelsInserts uint64
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
// Return only the first error, since it has no sense in returning all errors.
var firstWarn error
@ -1502,40 +1494,22 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
continue
}
// Slower path - sort labels in MetricNameRaw and check the cache again.
// This should limit the number of cache entries for metrics with distinct order of labels to 1.
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
continue
}
mn.sortTags()
metricNameRawSorted = mn.marshalRaw(metricNameRawSorted[:0])
if s.getTSIDFromCache(&r.TSID, metricNameRawSorted) {
// The TSID for the given metricNameRawSorted has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteMetrics code for details.
sortedRowLabelsInserts++
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
continue
}
// Slow path - the TSID is missing in the cache.
// Postpone its search in the loop below.
j--
if pmrs == nil {
pmrs = getPendingMetricRows()
}
if string(mr.MetricNameRaw) != string(metricNameRawSorted) {
mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRawSorted...)
if err := pmrs.addRow(mr); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = err
}
continue
}
pmrs.addRow(mr, &mn)
}
atomic.AddUint64(&s.sortedRowLabelsInserts, sortedRowLabelsInserts)
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
@ -1615,6 +1589,7 @@ type pendingMetricRows struct {
lastMetricNameRaw []byte
lastMetricName []byte
mn MetricName
}
func (pmrs *pendingMetricRows) reset() {
@ -1626,14 +1601,19 @@ func (pmrs *pendingMetricRows) reset() {
pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0]
pmrs.lastMetricNameRaw = nil
pmrs.lastMetricName = nil
pmrs.mn.Reset()
}
func (pmrs *pendingMetricRows) addRow(mr *MetricRow, mn *MetricName) {
func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
// Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric.
if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) {
if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
pmrs.mn.sortTags()
metricNamesBufLen := len(pmrs.metricNamesBuf)
pmrs.metricNamesBuf = mn.Marshal(pmrs.metricNamesBuf)
pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf)
pmrs.lastMetricName = pmrs.metricNamesBuf[metricNamesBufLen:]
pmrs.lastMetricNameRaw = mr.MetricNameRaw
}
@ -1641,6 +1621,7 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow, mn *MetricName) {
MetricName: pmrs.lastMetricName,
mr: *mr,
})
return nil
}
func getPendingMetricRows() *pendingMetricRows {