mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect: reduce lock contention for heavy aggregation requests (#5119)
reduce lock contention for heavy aggregation requests previously lock contetion may happen on machine with big number of CPU due to enabled string interning. sync.Map was a choke point for all aggregation requests. Now instead of interning, new string is created. It may increase CPU and memory usage for some cases. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087
This commit is contained in:
parent
b52f1d1f0a
commit
4a50e9400c
8 changed files with 16 additions and 21 deletions
|
@ -8,7 +8,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
@ -130,7 +129,7 @@ func aggrPrepareSeries(argOrig []*timeseries, modifier *metricsql.ModifierExpr,
|
||||||
for i, ts := range arg {
|
for i, ts := range arg {
|
||||||
removeGroupTags(&ts.MetricName, modifier)
|
removeGroupTags(&ts.MetricName, modifier)
|
||||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
if keepOriginal {
|
if keepOriginal {
|
||||||
ts = argOrig[i]
|
ts = argOrig[i]
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
"github.com/VictoriaMetrics/metricsql"
|
"github.com/VictoriaMetrics/metricsql"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -105,7 +104,7 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor
|
||||||
removeGroupTags(&ts.MetricName, &iafc.ae.Modifier)
|
removeGroupTags(&ts.MetricName, &iafc.ae.Modifier)
|
||||||
bb := bbPool.Get()
|
bb := bbPool.Get()
|
||||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
iac := m[k]
|
iac := m[k]
|
||||||
if iac == nil {
|
if iac == nil {
|
||||||
if iafc.ae.Limit > 0 && len(m) >= iafc.ae.Limit {
|
if iafc.ae.Limit > 0 && len(m) >= iafc.ae.Limit {
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
"github.com/VictoriaMetrics/metricsql"
|
"github.com/VictoriaMetrics/metricsql"
|
||||||
|
@ -261,8 +260,7 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
|
||||||
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
|
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
|
||||||
pair, ok := m[string(bb.B)]
|
pair, ok := m[string(bb.B)]
|
||||||
if !ok {
|
if !ok {
|
||||||
k := bytesutil.InternBytes(bb.B)
|
m[string(bb.B)] = &tsPair{
|
||||||
m[k] = &tsPair{
|
|
||||||
left: &tsCopy,
|
left: &tsCopy,
|
||||||
right: tsRight,
|
right: tsRight,
|
||||||
}
|
}
|
||||||
|
@ -524,7 +522,7 @@ func createTimeseriesMapByTagSet(be *metricsql.BinaryOpExpr, left, right []*time
|
||||||
logger.Panicf("BUG: unexpected binary op modifier %q", groupOp)
|
logger.Panicf("BUG: unexpected binary op modifier %q", groupOp)
|
||||||
}
|
}
|
||||||
bb.B = marshalMetricTagsSorted(bb.B[:0], mn)
|
bb.B = marshalMetricTagsSorted(bb.B[:0], mn)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
m[k] = append(m[k], ts)
|
m[k] = append(m[k], ts)
|
||||||
}
|
}
|
||||||
storage.PutMetricName(mn)
|
storage.PutMetricName(mn)
|
||||||
|
|
|
@ -563,8 +563,8 @@ func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
|
||||||
for _, tag := range ts.MetricName.Tags {
|
for _, tag := range ts.MetricName.Tags {
|
||||||
vc, ok := m[string(tag.Key)]
|
vc, ok := m[string(tag.Key)]
|
||||||
if !ok {
|
if !ok {
|
||||||
k := bytesutil.InternBytes(tag.Key)
|
k := string(tag.Key)
|
||||||
v := bytesutil.InternBytes(tag.Value)
|
v := string(tag.Value)
|
||||||
m[k] = &valuesCounter{
|
m[k] = &valuesCounter{
|
||||||
values: map[string]struct{}{
|
values: map[string]struct{}{
|
||||||
v: {},
|
v: {},
|
||||||
|
@ -581,8 +581,7 @@ func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
|
||||||
}
|
}
|
||||||
vc.count++
|
vc.count++
|
||||||
if _, ok := vc.values[string(tag.Value)]; !ok {
|
if _, ok := vc.values[string(tag.Value)]; !ok {
|
||||||
v := bytesutil.InternBytes(tag.Value)
|
vc.values[string(tag.Value)] = struct{}{}
|
||||||
vc.values[v] = struct{}{}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
|
@ -129,7 +128,7 @@ func timeseriesToResult(tss []*timeseries, maySort bool) ([]netstorage.Result, e
|
||||||
bb := bbPool.Get()
|
bb := bbPool.Get()
|
||||||
for i, ts := range tss {
|
for i, ts := range tss {
|
||||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
if _, ok := m[k]; ok {
|
if _, ok := m[k]; ok {
|
||||||
return nil, fmt.Errorf(`duplicate output timeseries: %s`, stringMetricName(&ts.MetricName))
|
return nil, fmt.Errorf(`duplicate output timeseries: %s`, stringMetricName(&ts.MetricName))
|
||||||
}
|
}
|
||||||
|
|
|
@ -442,8 +442,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
|
||||||
defer bbPool.Put(bb)
|
defer bbPool.Put(bb)
|
||||||
for _, ts := range a {
|
for _, ts := range a {
|
||||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
m[string(bb.B)] = ts
|
||||||
m[k] = ts
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rvs := make([]*timeseries, 0, len(a))
|
rvs := make([]*timeseries, 0, len(a))
|
||||||
|
@ -455,7 +454,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
|
||||||
tmp.MetricName.MoveFrom(&tsB.MetricName)
|
tmp.MetricName.MoveFrom(&tsB.MetricName)
|
||||||
|
|
||||||
bb.B = marshalMetricNameSorted(bb.B[:0], &tmp.MetricName)
|
bb.B = marshalMetricNameSorted(bb.B[:0], &tmp.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
tsA := m[k]
|
tsA := m[k]
|
||||||
if tsA == nil {
|
if tsA == nil {
|
||||||
tStart := ec.Start
|
tStart := ec.Start
|
||||||
|
|
|
@ -420,7 +420,7 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
mn.CopyFrom(&ts.MetricName)
|
mn.CopyFrom(&ts.MetricName)
|
||||||
mn.RemoveTag("le")
|
mn.RemoveTag("le")
|
||||||
b = marshalMetricNameSorted(b[:0], &mn)
|
b = marshalMetricNameSorted(b[:0], &mn)
|
||||||
k := bytesutil.InternBytes(b)
|
k := bytesutil.ToUnsafeString(b)
|
||||||
m[k] = append(m[k], x{
|
m[k] = append(m[k], x{
|
||||||
le: le,
|
le: le,
|
||||||
ts: ts,
|
ts: ts,
|
||||||
|
@ -523,7 +523,7 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
|
||||||
ts.MetricName.RemoveTag("le")
|
ts.MetricName.RemoveTag("le")
|
||||||
ts.MetricName.RemoveTag("vmrange")
|
ts.MetricName.RemoveTag("vmrange")
|
||||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
m[k] = append(m[k], x{
|
m[k] = append(m[k], x{
|
||||||
startStr: startStr,
|
startStr: startStr,
|
||||||
endStr: endStr,
|
endStr: endStr,
|
||||||
|
@ -1023,7 +1023,7 @@ func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries {
|
||||||
ts.MetricName.ResetMetricGroup()
|
ts.MetricName.ResetMetricGroup()
|
||||||
ts.MetricName.RemoveTag("le")
|
ts.MetricName.RemoveTag("le")
|
||||||
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
|
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
m[k] = append(m[k], leTimeseries{
|
m[k] = append(m[k], leTimeseries{
|
||||||
le: le,
|
le: le,
|
||||||
ts: ts,
|
ts: ts,
|
||||||
|
@ -1657,7 +1657,7 @@ func transformUnion(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
for _, arg := range args {
|
for _, arg := range args {
|
||||||
for _, ts := range arg {
|
for _, ts := range arg {
|
||||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||||
k := bytesutil.InternBytes(bb.B)
|
k := string(bb.B)
|
||||||
if m[k] {
|
if m[k] {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,8 @@ The sandbox cluster installation is running under the constant load generated by
|
||||||
|
|
||||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044).
|
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044).
|
||||||
|
|
||||||
|
* BUGFIX: [vmselect](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087) for details.
|
||||||
|
|
||||||
## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0)
|
## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0)
|
||||||
|
|
||||||
Released at 2023-10-02
|
Released at 2023-10-02
|
||||||
|
|
Loading…
Reference in a new issue