lib/promscrape: optimize discoveryutils.SanitizeLabelName()

Cache sanitized label names and return them next time.
This reduces the number of allocations and speeds up the SanitizeLabelName()
function for common case when the number of unique label names is smaller than 100k
This commit is contained in:
Aliaksandr Valialkin 2022-08-27 00:12:39 +03:00
parent a2cd79576f
commit c06e7a142c
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
24 changed files with 217 additions and 105 deletions

View file

@ -23,7 +23,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: return shorter error messages to Grafana and to other clients requesting [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) endpoints. This should simplify reading these errors by humans. The long error message with full context is still written to logs.
* FEATURE: add the ability to fine-tune the number of points, which can be generated per each matching time series during [subquery](https://docs.victoriametrics.com/MetricsQL.html#subqueries) evaluation. This can be done with the `-search.maxPointsSubqueryPerTimeseries` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2922).
* FEATURE: improve the performance for relabeling rules with commonly used regular expressions in `regex` and `if` fields such as `some_string`, `prefix.*`, `prefix.+`, `foo|bar|baz`, `.*foo.*` and `.+foo.+`.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve the performance for relabeling rules with commonly used regular expressions in `regex` and `if` fields such as `some_string`, `prefix.*`, `prefix.+`, `foo|bar|baz`, `.*foo.*` and `.+foo.+`.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU usage when discovering big number of [Kubernetes targets](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) with big number of labels and annotations.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to accept [multitenant](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) data via OpenTSDB `/api/put` protocol at `/insert/<tenantID>/opentsdb/api/put` http endpoint if [multitenant support](https://docs.victoriametrics.com/vmagent.html#multitenancy) is enabled at `vmagent`. Thanks to @chengjianyun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3015).
* FEATURE: [monitoring](https://docs.victoriametrics.com/#monitoring): expose `vm_hourly_series_limit_max_series`, `vm_hourly_series_limit_current_series`, `vm_daily_series_limit_max_series` and `vm_daily_series_limit_current_series` metrics when `-search.maxHourlySeries` or `-search.maxDailySeries` limits are set. This allows alerting when the number of unique series reaches the configured limits. See [these docs](https://docs.victoriametrics.com/#cardinality-limiter) for details.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): reduce the amounts of logging at `vmstorage` when `vmselect` connects/disconnects to `vmstorage`.

View file

@ -11,7 +11,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -35,6 +34,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/metrics"
@ -1330,37 +1330,11 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
func internLabelStrings(labels []prompbmarshal.Label) {
for i := range labels {
label := &labels[i]
label.Name = internString(label.Name)
label.Value = internString(label.Value)
label.Name = discoveryutils.InternString(label.Name)
label.Value = discoveryutils.InternString(label.Value)
}
}
func internString(s string) string {
m := internStringsMap.Load().(*sync.Map)
if v, ok := m.Load(s); ok {
sp := v.(*string)
return *sp
}
// Make a new copy for s in order to remove references from possible bigger string s refers to.
sCopy := string(append([]byte{}, s...))
m.Store(sCopy, &sCopy)
n := atomic.AddUint64(&internStringsMapLen, 1)
if n > 100e3 {
atomic.StoreUint64(&internStringsMapLen, 0)
internStringsMap.Store(&sync.Map{})
}
return sCopy
}
var (
internStringsMap atomic.Value
internStringsMapLen uint64
)
func init() {
internStringsMap.Store(&sync.Map{})
}
func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
// See https://www.robustperception.io/life-of-a-label
m := make(map[string][]string)

View file

@ -17,44 +17,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
func TestInternStringSerial(t *testing.T) {
if err := testInternString(t); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
func TestInternStringConcurrent(t *testing.T) {
concurrency := 5
resultCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
resultCh <- testInternString(t)
}()
}
timer := time.NewTimer(5 * time.Second)
for i := 0; i < concurrency; i++ {
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-timer.C:
t.Fatalf("timeout")
}
}
}
func testInternString(t *testing.T) error {
for i := 0; i < 1000; i++ {
s := fmt.Sprintf("foo_%d", i)
s1 := internString(s)
if s != s1 {
return fmt.Errorf("unexpected string returned from internString; got %q; want %q", s1, s)
}
}
return nil
}
func TestMergeLabels(t *testing.T) {
f := func(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string, resultExpected string) {
t.Helper()

View file

@ -94,16 +94,13 @@ func (sn *ServiceNode) appendTargetLabels(ms []map[string]string, serviceName, t
m["__meta_consul_tags"] = tagSeparator + strings.Join(sn.Service.Tags, tagSeparator) + tagSeparator
for k, v := range sn.Node.Meta {
key := discoveryutils.SanitizeLabelName(k)
m["__meta_consul_metadata_"+key] = v
m[discoveryutils.SanitizeLabelName("__meta_consul_metadata_"+k)] = v
}
for k, v := range sn.Service.Meta {
key := discoveryutils.SanitizeLabelName(k)
m["__meta_consul_service_metadata_"+key] = v
m[discoveryutils.SanitizeLabelName("__meta_consul_service_metadata_"+k)] = v
}
for k, v := range sn.Node.TaggedAddresses {
key := discoveryutils.SanitizeLabelName(k)
m["__meta_consul_tagged_address_"+key] = v
m[discoveryutils.SanitizeLabelName("__meta_consul_tagged_address_"+k)] = v
}
ms = append(ms, m)
return ms

View file

@ -107,7 +107,7 @@ func addCommonLabels(m map[string]string, c *container, networkLabels map[string
m["__meta_docker_container_name"] = c.Names[0]
m["__meta_docker_container_network_mode"] = c.HostConfig.NetworkMode
for k, v := range c.Labels {
m["__meta_docker_container_label_"+discoveryutils.SanitizeLabelName(k)] = v
m[discoveryutils.SanitizeLabelName("__meta_docker_container_label_"+k)] = v
}
for k, v := range networkLabels {
m[k] = v

View file

@ -53,7 +53,7 @@ func getNetworkLabelsByNetworkID(networks []network) map[string]map[string]strin
"__meta_docker_network_scope": network.Scope,
}
for k, v := range network.Labels {
m["__meta_docker_network_label_"+discoveryutils.SanitizeLabelName(k)] = v
m[discoveryutils.SanitizeLabelName("__meta_docker_network_label_"+k)] = v
}
ms[network.ID] = m
}

View file

@ -53,7 +53,7 @@ func getNetworkLabelsByNetworkID(networks []network) map[string]map[string]strin
"__meta_dockerswarm_network_scope": network.Scope,
}
for k, v := range network.Labels {
m["__meta_dockerswarm_network_label_"+discoveryutils.SanitizeLabelName(k)] = v
m[discoveryutils.SanitizeLabelName("__meta_dockerswarm_network_label_"+k)] = v
}
ms[network.ID] = m
}

View file

@ -80,7 +80,7 @@ func addNodeLabels(nodes []node, port int) []map[string]string {
"__meta_dockerswarm_node_status": node.Status.State,
}
for k, v := range node.Spec.Labels {
m["__meta_dockerswarm_node_label_"+discoveryutils.SanitizeLabelName(k)] = v
m[discoveryutils.SanitizeLabelName("__meta_dockerswarm_node_label_"+k)] = v
}
ms = append(ms, m)
}

View file

@ -96,7 +96,7 @@ func addServicesLabels(services []service, networksLabels map[string]map[string]
"__meta_dockerswarm_service_updating_status": service.UpdateStatus.State,
}
for k, v := range service.Spec.Labels {
commonLabels["__meta_dockerswarm_service_label_"+discoveryutils.SanitizeLabelName(k)] = v
commonLabels[discoveryutils.SanitizeLabelName("__meta_dockerswarm_service_label_"+k)] = v
}
for _, vip := range service.Endpoint.VirtualIPs {
// skip services without virtual address.

View file

@ -87,7 +87,7 @@ func addTasksLabels(tasks []task, nodesLabels, servicesLabels []map[string]strin
"__meta_dockerswarm_task_state": task.Status.State,
}
for k, v := range task.Spec.ContainerSpec.Labels {
commonLabels["__meta_dockerswarm_container_label_"+discoveryutils.SanitizeLabelName(k)] = v
commonLabels[discoveryutils.SanitizeLabelName("__meta_dockerswarm_container_label_"+k)] = v
}
var svcPorts []portConfig
for i, v := range services {

View file

@ -186,8 +186,7 @@ func (inst *Instance) appendTargetLabels(ms []map[string]string, ownerID string,
if len(t.Key) == 0 || len(t.Value) == 0 {
continue
}
name := discoveryutils.SanitizeLabelName(t.Key)
m["__meta_ec2_tag_"+name] = t.Value
m[discoveryutils.SanitizeLabelName("__meta_ec2_tag_"+t.Key)] = t.Value
}
ms = append(ms, m)
return ms

View file

@ -139,11 +139,11 @@ func addInstanceLabels(apps *applications) []map[string]string {
if len(instance.DataCenterInfo.Name) > 0 {
m["__meta_eureka_app_instance_datacenterinfo_name"] = instance.DataCenterInfo.Name
for _, tag := range instance.DataCenterInfo.Metadata.Items {
m["__meta_eureka_app_instance_datacenterinfo_metadata_"+discoveryutils.SanitizeLabelName(tag.XMLName.Local)] = tag.Content
m[discoveryutils.SanitizeLabelName("__meta_eureka_app_instance_datacenterinfo_metadata_"+tag.XMLName.Local)] = tag.Content
}
}
for _, tag := range instance.Metadata.Items {
m["__meta_eureka_app_instance_metadata_"+discoveryutils.SanitizeLabelName(tag.XMLName.Local)] = tag.Content
m[discoveryutils.SanitizeLabelName("__meta_eureka_app_instance_metadata_"+tag.XMLName.Local)] = tag.Content
}
ms = append(ms, m)
}

View file

@ -150,8 +150,7 @@ func (inst *Instance) appendTargetLabels(ms []map[string]string, project, tagSep
"__meta_gce_zone": inst.Zone,
}
for _, iface := range inst.NetworkInterfaces {
ifaceName := discoveryutils.SanitizeLabelName(iface.Name)
m["__meta_gce_interface_ipv4_"+ifaceName] = iface.NetworkIP
m[discoveryutils.SanitizeLabelName("__meta_gce_interface_ipv4_"+iface.Name)] = iface.NetworkIP
}
if len(inst.Tags.Items) > 0 {
// We surround the separated list with the separator as well. This way regular expressions
@ -159,12 +158,10 @@ func (inst *Instance) appendTargetLabels(ms []map[string]string, project, tagSep
m["__meta_gce_tags"] = tagSeparator + strings.Join(inst.Tags.Items, tagSeparator) + tagSeparator
}
for _, item := range inst.Metadata.Items {
key := discoveryutils.SanitizeLabelName(item.Key)
m["__meta_gce_metadata_"+key] = item.Value
m[discoveryutils.SanitizeLabelName("__meta_gce_metadata_"+item.Key)] = item.Value
}
for _, label := range inst.Labels {
name := discoveryutils.SanitizeLabelName(label.Name)
m["__meta_gce_label_"+name] = label.Value
m[discoveryutils.SanitizeLabelName("__meta_gce_label_"+label.Name)] = label.Value
}
if len(iface.AccessConfigs) > 0 {
ac := iface.AccessConfigs[0]

View file

@ -28,14 +28,12 @@ type ListMeta struct {
func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) {
for _, lb := range om.Labels {
ln := discoveryutils.SanitizeLabelName(lb.Name)
m[prefix+"_label_"+ln] = lb.Value
m[prefix+"_labelpresent_"+ln] = "true"
m[discoveryutils.SanitizeLabelName(prefix+"_label_"+lb.Name)] = lb.Value
m[discoveryutils.SanitizeLabelName(prefix+"_labelpresent_"+lb.Name)] = "true"
}
for _, a := range om.Annotations {
an := discoveryutils.SanitizeLabelName(a.Name)
m[prefix+"_annotation_"+an] = a.Value
m[prefix+"_annotationpresent_"+an] = "true"
m[discoveryutils.SanitizeLabelName(prefix+"_annotation_"+a.Name)] = a.Value
m[discoveryutils.SanitizeLabelName(prefix+"_annotationpresent_"+a.Name)] = "true"
}
}

View file

@ -151,8 +151,8 @@ func getEndpointSliceLabels(eps *EndpointSlice, addr string, ea Endpoint, epp En
m["__meta_kubernetes_endpointslice_endpoint_hostname"] = ea.Hostname
}
for k, v := range ea.Topology {
m["__meta_kubernetes_endpointslice_endpoint_topology_"+discoveryutils.SanitizeLabelName(k)] = v
m["__meta_kubernetes_endpointslice_endpoint_topology_present_"+discoveryutils.SanitizeLabelName(k)] = "true"
m[discoveryutils.SanitizeLabelName("__meta_kubernetes_endpointslice_endpoint_topology_"+k)] = v
m[discoveryutils.SanitizeLabelName("__meta_kubernetes_endpointslice_endpoint_topology_present_"+k)] = "true"
}
return m
}

View file

@ -104,8 +104,7 @@ func (n *Node) getTargetLabels(gw *groupWatcher) []map[string]string {
continue
}
addrTypesUsed[a.Type] = true
ln := discoveryutils.SanitizeLabelName(a.Type)
m["__meta_kubernetes_node_address_"+ln] = a.Address
m[discoveryutils.SanitizeLabelName("__meta_kubernetes_node_address_"+a.Type)] = a.Address
}
return []map[string]string{m}
}

View file

@ -57,7 +57,7 @@ func addInstanceLabels(servers []server, port int) []map[string]string {
"__meta_openstack_instance_flavor": server.Flavor.ID,
}
for k, v := range server.Metadata {
m["__meta_openstack_tag_"+discoveryutils.SanitizeLabelName(k)] = v
m[discoveryutils.SanitizeLabelName("__meta_openstack_tag_"+k)] = v
}
// Traverse server.Addresses in alphabetical order of pool name
// in order to return targets in deterministic order.

View file

@ -51,7 +51,7 @@ func addInstanceLabels(instances []instance) []map[string]string {
"__meta_yandexcloud_folder_id": server.FolderID,
}
for k, v := range server.Labels {
m["__meta_yandexcloud_instance_label_"+discoveryutils.SanitizeLabelName(k)] = v
m[discoveryutils.SanitizeLabelName("__meta_yandexcloud_instance_label_"+k)] = v
}
for _, ni := range server.NetworkInterfaces {

View file

@ -0,0 +1,35 @@
package discoveryutils
import (
"sync"
"sync/atomic"
)
// InternString returns interned s.
//
// This may be needed for reducing the amounts of allocated memory.
func InternString(s string) string {
m := internStringsMap.Load().(*sync.Map)
if v, ok := m.Load(s); ok {
sp := v.(*string)
return *sp
}
// Make a new copy for s in order to remove references from possible bigger string s refers to.
sCopy := string(append([]byte{}, s...))
m.Store(sCopy, &sCopy)
n := atomic.AddUint64(&internStringsMapLen, 1)
if n > 100e3 {
atomic.StoreUint64(&internStringsMapLen, 0)
internStringsMap.Store(&sync.Map{})
}
return sCopy
}
var (
internStringsMap atomic.Value
internStringsMapLen uint64
)
func init() {
internStringsMap.Store(&sync.Map{})
}

View file

@ -0,0 +1,45 @@
package discoveryutils
import (
"fmt"
"testing"
"time"
)
func TestInternStringSerial(t *testing.T) {
if err := testInternString(t); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
func TestInternStringConcurrent(t *testing.T) {
concurrency := 5
resultCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
resultCh <- testInternString(t)
}()
}
timer := time.NewTimer(5 * time.Second)
for i := 0; i < concurrency; i++ {
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-timer.C:
t.Fatalf("timeout")
}
}
}
func testInternString(t *testing.T) error {
for i := 0; i < 1000; i++ {
s := fmt.Sprintf("foo_%d", i)
s1 := InternString(s)
if s != s1 {
return fmt.Errorf("unexpected string returned from internString; got %q; want %q", s1, s)
}
}
return nil
}

View file

@ -1,4 +1,4 @@
package promscrape
package discoveryutils
import (
"fmt"
@ -15,7 +15,7 @@ func BenchmarkInternString(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for _, s := range a {
sResult := internString(s)
sResult := InternString(s)
if sResult != s {
panic(fmt.Sprintf("unexpected string obtained; got %q; want %q", sResult, s))
}

View file

@ -6,6 +6,8 @@ import (
"regexp"
"sort"
"strconv"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
@ -15,13 +17,39 @@ import (
//
// This has been copied from Prometheus sources at util/strutil/strconv.go
func SanitizeLabelName(name string) string {
return invalidLabelCharRE.ReplaceAllString(name, "_")
m := sanitizedLabelNames.Load().(*sync.Map)
v, ok := m.Load(name)
if ok {
// Fast path - the sanitized label name is found in the cache.
sp := v.(*string)
return *sp
}
// Slow path - sanitize name and store it in the cache.
sanitizedName := invalidLabelCharRE.ReplaceAllString(name, "_")
// Make a copy of name in order to limit memory usage to the name length,
// since the name may point to bigger string.
s := string(append([]byte{}, name...))
sp := &sanitizedName
m.Store(s, sp)
n := atomic.AddUint64(&sanitizedLabelNamesLen, 1)
if n > 100e3 {
atomic.StoreUint64(&sanitizedLabelNamesLen, 0)
sanitizedLabelNames.Store(&sync.Map{})
}
return sanitizedName
}
var (
sanitizedLabelNames atomic.Value
sanitizedLabelNamesLen uint64
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)
func init() {
sanitizedLabelNames.Store(&sync.Map{})
}
// JoinHostPort returns host:port.
//
// Host may be dns name, ipv4 or ipv6 address.

View file

@ -0,0 +1,56 @@
package discoveryutils
import (
"fmt"
"testing"
"time"
)
func TestSanitizeLabelNameSerial(t *testing.T) {
if err := testSanitizeLabelName(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
func TestSanitizeLabelNameParallel(t *testing.T) {
goroutines := 5
ch := make(chan error, goroutines)
for i := 0; i < goroutines; i++ {
go func() {
ch <- testSanitizeLabelName()
}()
}
tch := time.After(5 * time.Second)
for i := 0; i < goroutines; i++ {
select {
case <-tch:
t.Fatalf("timeout!")
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
}
}
func testSanitizeLabelName() error {
f := func(name, expectedSanitizedName string) error {
for i := 0; i < 5; i++ {
sanitizedName := SanitizeLabelName(name)
if sanitizedName != expectedSanitizedName {
return fmt.Errorf("unexpected sanitized label name %q; got %q; want %q", name, sanitizedName, expectedSanitizedName)
}
}
return nil
}
if err := f("", ""); err != nil {
return err
}
if err := f("foo", "foo"); err != nil {
return err
}
if err := f("foo-bar/baz", "foo_bar_baz"); err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,21 @@
package discoveryutils
import (
"fmt"
"testing"
)
func BenchmarkSanitizeLabelName(b *testing.B) {
labelName := "foo-bar/baz/aaaa+bbb"
expectedLabelNameSanitized := "foo_bar_baz_aaaa_bbb"
b.SetBytes(1)
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
labelNameSanitized := SanitizeLabelName(labelName)
if labelNameSanitized != expectedLabelNameSanitized {
panic(fmt.Errorf("unexpected sanitized label name; got %q; want %q", labelNameSanitized, expectedLabelNameSanitized))
}
}
})
}