mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape: optimize discoveryutils.SanitizeLabelName()
Cache sanitized label names and return them next time. This reduces the number of allocations and speeds up the SanitizeLabelName() function for common case when the number of unique label names is smaller than 100k
This commit is contained in:
parent
1c7db1c8fd
commit
86394b4179
24 changed files with 217 additions and 105 deletions
|
@ -23,7 +23,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
|
|
||||||
* FEATURE: return shorter error messages to Grafana and to other clients requesting [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) endpoints. This should simplify reading these errors by humans. The long error message with full context is still written to logs.
|
* FEATURE: return shorter error messages to Grafana and to other clients requesting [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) endpoints. This should simplify reading these errors by humans. The long error message with full context is still written to logs.
|
||||||
* FEATURE: add the ability to fine-tune the number of points, which can be generated per each matching time series during [subquery](https://docs.victoriametrics.com/MetricsQL.html#subqueries) evaluation. This can be done with the `-search.maxPointsSubqueryPerTimeseries` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2922).
|
* FEATURE: add the ability to fine-tune the number of points, which can be generated per each matching time series during [subquery](https://docs.victoriametrics.com/MetricsQL.html#subqueries) evaluation. This can be done with the `-search.maxPointsSubqueryPerTimeseries` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2922).
|
||||||
* FEATURE: improve the performance for relabeling rules with commonly used regular expressions in `regex` and `if` fields such as `some_string`, `prefix.*`, `prefix.+`, `foo|bar|baz`, `.*foo.*` and `.+foo.+`.
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve the performance for relabeling rules with commonly used regular expressions in `regex` and `if` fields such as `some_string`, `prefix.*`, `prefix.+`, `foo|bar|baz`, `.*foo.*` and `.+foo.+`.
|
||||||
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU usage when discovering big number of [Kubernetes targets](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) with big number of labels and annotations.
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to accept [multitenant](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) data via OpenTSDB `/api/put` protocol at `/insert/<tenantID>/opentsdb/api/put` http endpoint if [multitenant support](https://docs.victoriametrics.com/vmagent.html#multitenancy) is enabled at `vmagent`. Thanks to @chengjianyun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3015).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to accept [multitenant](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) data via OpenTSDB `/api/put` protocol at `/insert/<tenantID>/opentsdb/api/put` http endpoint if [multitenant support](https://docs.victoriametrics.com/vmagent.html#multitenancy) is enabled at `vmagent`. Thanks to @chengjianyun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3015).
|
||||||
* FEATURE: [monitoring](https://docs.victoriametrics.com/#monitoring): expose `vm_hourly_series_limit_max_series`, `vm_hourly_series_limit_current_series`, `vm_daily_series_limit_max_series` and `vm_daily_series_limit_current_series` metrics when `-search.maxHourlySeries` or `-search.maxDailySeries` limits are set. This allows alerting when the number of unique series reaches the configured limits. See [these docs](https://docs.victoriametrics.com/#cardinality-limiter) for details.
|
* FEATURE: [monitoring](https://docs.victoriametrics.com/#monitoring): expose `vm_hourly_series_limit_max_series`, `vm_hourly_series_limit_current_series`, `vm_daily_series_limit_max_series` and `vm_daily_series_limit_current_series` metrics when `-search.maxHourlySeries` or `-search.maxDailySeries` limits are set. This allows alerting when the number of unique series reaches the configured limits. See [these docs](https://docs.victoriametrics.com/#cardinality-limiter) for details.
|
||||||
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): reduce the amounts of logging at `vmstorage` when `vmselect` connects/disconnects to `vmstorage`.
|
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): reduce the amounts of logging at `vmstorage` when `vmselect` connects/disconnects to `vmstorage`.
|
||||||
|
|
|
@ -11,7 +11,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
@ -35,6 +34,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
@ -1330,37 +1330,11 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
|
||||||
func internLabelStrings(labels []prompbmarshal.Label) {
|
func internLabelStrings(labels []prompbmarshal.Label) {
|
||||||
for i := range labels {
|
for i := range labels {
|
||||||
label := &labels[i]
|
label := &labels[i]
|
||||||
label.Name = internString(label.Name)
|
label.Name = discoveryutils.InternString(label.Name)
|
||||||
label.Value = internString(label.Value)
|
label.Value = discoveryutils.InternString(label.Value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func internString(s string) string {
|
|
||||||
m := internStringsMap.Load().(*sync.Map)
|
|
||||||
if v, ok := m.Load(s); ok {
|
|
||||||
sp := v.(*string)
|
|
||||||
return *sp
|
|
||||||
}
|
|
||||||
// Make a new copy for s in order to remove references from possible bigger string s refers to.
|
|
||||||
sCopy := string(append([]byte{}, s...))
|
|
||||||
m.Store(sCopy, &sCopy)
|
|
||||||
n := atomic.AddUint64(&internStringsMapLen, 1)
|
|
||||||
if n > 100e3 {
|
|
||||||
atomic.StoreUint64(&internStringsMapLen, 0)
|
|
||||||
internStringsMap.Store(&sync.Map{})
|
|
||||||
}
|
|
||||||
return sCopy
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
internStringsMap atomic.Value
|
|
||||||
internStringsMapLen uint64
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
internStringsMap.Store(&sync.Map{})
|
|
||||||
}
|
|
||||||
|
|
||||||
func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
|
func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
|
||||||
// See https://www.robustperception.io/life-of-a-label
|
// See https://www.robustperception.io/life-of-a-label
|
||||||
m := make(map[string][]string)
|
m := make(map[string][]string)
|
||||||
|
|
|
@ -17,44 +17,6 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestInternStringSerial(t *testing.T) {
|
|
||||||
if err := testInternString(t); err != nil {
|
|
||||||
t.Fatalf("unexpected error: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInternStringConcurrent(t *testing.T) {
|
|
||||||
concurrency := 5
|
|
||||||
resultCh := make(chan error, concurrency)
|
|
||||||
for i := 0; i < concurrency; i++ {
|
|
||||||
go func() {
|
|
||||||
resultCh <- testInternString(t)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
timer := time.NewTimer(5 * time.Second)
|
|
||||||
for i := 0; i < concurrency; i++ {
|
|
||||||
select {
|
|
||||||
case err := <-resultCh:
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %s", err)
|
|
||||||
}
|
|
||||||
case <-timer.C:
|
|
||||||
t.Fatalf("timeout")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testInternString(t *testing.T) error {
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
s := fmt.Sprintf("foo_%d", i)
|
|
||||||
s1 := internString(s)
|
|
||||||
if s != s1 {
|
|
||||||
return fmt.Errorf("unexpected string returned from internString; got %q; want %q", s1, s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMergeLabels(t *testing.T) {
|
func TestMergeLabels(t *testing.T) {
|
||||||
f := func(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string, resultExpected string) {
|
f := func(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string, resultExpected string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
|
@ -94,16 +94,13 @@ func (sn *ServiceNode) appendTargetLabels(ms []map[string]string, serviceName, t
|
||||||
m["__meta_consul_tags"] = tagSeparator + strings.Join(sn.Service.Tags, tagSeparator) + tagSeparator
|
m["__meta_consul_tags"] = tagSeparator + strings.Join(sn.Service.Tags, tagSeparator) + tagSeparator
|
||||||
|
|
||||||
for k, v := range sn.Node.Meta {
|
for k, v := range sn.Node.Meta {
|
||||||
key := discoveryutils.SanitizeLabelName(k)
|
m[discoveryutils.SanitizeLabelName("__meta_consul_metadata_"+k)] = v
|
||||||
m["__meta_consul_metadata_"+key] = v
|
|
||||||
}
|
}
|
||||||
for k, v := range sn.Service.Meta {
|
for k, v := range sn.Service.Meta {
|
||||||
key := discoveryutils.SanitizeLabelName(k)
|
m[discoveryutils.SanitizeLabelName("__meta_consul_service_metadata_"+k)] = v
|
||||||
m["__meta_consul_service_metadata_"+key] = v
|
|
||||||
}
|
}
|
||||||
for k, v := range sn.Node.TaggedAddresses {
|
for k, v := range sn.Node.TaggedAddresses {
|
||||||
key := discoveryutils.SanitizeLabelName(k)
|
m[discoveryutils.SanitizeLabelName("__meta_consul_tagged_address_"+k)] = v
|
||||||
m["__meta_consul_tagged_address_"+key] = v
|
|
||||||
}
|
}
|
||||||
ms = append(ms, m)
|
ms = append(ms, m)
|
||||||
return ms
|
return ms
|
||||||
|
|
|
@ -107,7 +107,7 @@ func addCommonLabels(m map[string]string, c *container, networkLabels map[string
|
||||||
m["__meta_docker_container_name"] = c.Names[0]
|
m["__meta_docker_container_name"] = c.Names[0]
|
||||||
m["__meta_docker_container_network_mode"] = c.HostConfig.NetworkMode
|
m["__meta_docker_container_network_mode"] = c.HostConfig.NetworkMode
|
||||||
for k, v := range c.Labels {
|
for k, v := range c.Labels {
|
||||||
m["__meta_docker_container_label_"+discoveryutils.SanitizeLabelName(k)] = v
|
m[discoveryutils.SanitizeLabelName("__meta_docker_container_label_"+k)] = v
|
||||||
}
|
}
|
||||||
for k, v := range networkLabels {
|
for k, v := range networkLabels {
|
||||||
m[k] = v
|
m[k] = v
|
||||||
|
|
|
@ -53,7 +53,7 @@ func getNetworkLabelsByNetworkID(networks []network) map[string]map[string]strin
|
||||||
"__meta_docker_network_scope": network.Scope,
|
"__meta_docker_network_scope": network.Scope,
|
||||||
}
|
}
|
||||||
for k, v := range network.Labels {
|
for k, v := range network.Labels {
|
||||||
m["__meta_docker_network_label_"+discoveryutils.SanitizeLabelName(k)] = v
|
m[discoveryutils.SanitizeLabelName("__meta_docker_network_label_"+k)] = v
|
||||||
}
|
}
|
||||||
ms[network.ID] = m
|
ms[network.ID] = m
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ func getNetworkLabelsByNetworkID(networks []network) map[string]map[string]strin
|
||||||
"__meta_dockerswarm_network_scope": network.Scope,
|
"__meta_dockerswarm_network_scope": network.Scope,
|
||||||
}
|
}
|
||||||
for k, v := range network.Labels {
|
for k, v := range network.Labels {
|
||||||
m["__meta_dockerswarm_network_label_"+discoveryutils.SanitizeLabelName(k)] = v
|
m[discoveryutils.SanitizeLabelName("__meta_dockerswarm_network_label_"+k)] = v
|
||||||
}
|
}
|
||||||
ms[network.ID] = m
|
ms[network.ID] = m
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ func addNodeLabels(nodes []node, port int) []map[string]string {
|
||||||
"__meta_dockerswarm_node_status": node.Status.State,
|
"__meta_dockerswarm_node_status": node.Status.State,
|
||||||
}
|
}
|
||||||
for k, v := range node.Spec.Labels {
|
for k, v := range node.Spec.Labels {
|
||||||
m["__meta_dockerswarm_node_label_"+discoveryutils.SanitizeLabelName(k)] = v
|
m[discoveryutils.SanitizeLabelName("__meta_dockerswarm_node_label_"+k)] = v
|
||||||
}
|
}
|
||||||
ms = append(ms, m)
|
ms = append(ms, m)
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,7 @@ func addServicesLabels(services []service, networksLabels map[string]map[string]
|
||||||
"__meta_dockerswarm_service_updating_status": service.UpdateStatus.State,
|
"__meta_dockerswarm_service_updating_status": service.UpdateStatus.State,
|
||||||
}
|
}
|
||||||
for k, v := range service.Spec.Labels {
|
for k, v := range service.Spec.Labels {
|
||||||
commonLabels["__meta_dockerswarm_service_label_"+discoveryutils.SanitizeLabelName(k)] = v
|
commonLabels[discoveryutils.SanitizeLabelName("__meta_dockerswarm_service_label_"+k)] = v
|
||||||
}
|
}
|
||||||
for _, vip := range service.Endpoint.VirtualIPs {
|
for _, vip := range service.Endpoint.VirtualIPs {
|
||||||
// skip services without virtual address.
|
// skip services without virtual address.
|
||||||
|
|
|
@ -87,7 +87,7 @@ func addTasksLabels(tasks []task, nodesLabels, servicesLabels []map[string]strin
|
||||||
"__meta_dockerswarm_task_state": task.Status.State,
|
"__meta_dockerswarm_task_state": task.Status.State,
|
||||||
}
|
}
|
||||||
for k, v := range task.Spec.ContainerSpec.Labels {
|
for k, v := range task.Spec.ContainerSpec.Labels {
|
||||||
commonLabels["__meta_dockerswarm_container_label_"+discoveryutils.SanitizeLabelName(k)] = v
|
commonLabels[discoveryutils.SanitizeLabelName("__meta_dockerswarm_container_label_"+k)] = v
|
||||||
}
|
}
|
||||||
var svcPorts []portConfig
|
var svcPorts []portConfig
|
||||||
for i, v := range services {
|
for i, v := range services {
|
||||||
|
|
|
@ -186,8 +186,7 @@ func (inst *Instance) appendTargetLabels(ms []map[string]string, ownerID string,
|
||||||
if len(t.Key) == 0 || len(t.Value) == 0 {
|
if len(t.Key) == 0 || len(t.Value) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
name := discoveryutils.SanitizeLabelName(t.Key)
|
m[discoveryutils.SanitizeLabelName("__meta_ec2_tag_"+t.Key)] = t.Value
|
||||||
m["__meta_ec2_tag_"+name] = t.Value
|
|
||||||
}
|
}
|
||||||
ms = append(ms, m)
|
ms = append(ms, m)
|
||||||
return ms
|
return ms
|
||||||
|
|
|
@ -139,11 +139,11 @@ func addInstanceLabels(apps *applications) []map[string]string {
|
||||||
if len(instance.DataCenterInfo.Name) > 0 {
|
if len(instance.DataCenterInfo.Name) > 0 {
|
||||||
m["__meta_eureka_app_instance_datacenterinfo_name"] = instance.DataCenterInfo.Name
|
m["__meta_eureka_app_instance_datacenterinfo_name"] = instance.DataCenterInfo.Name
|
||||||
for _, tag := range instance.DataCenterInfo.Metadata.Items {
|
for _, tag := range instance.DataCenterInfo.Metadata.Items {
|
||||||
m["__meta_eureka_app_instance_datacenterinfo_metadata_"+discoveryutils.SanitizeLabelName(tag.XMLName.Local)] = tag.Content
|
m[discoveryutils.SanitizeLabelName("__meta_eureka_app_instance_datacenterinfo_metadata_"+tag.XMLName.Local)] = tag.Content
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, tag := range instance.Metadata.Items {
|
for _, tag := range instance.Metadata.Items {
|
||||||
m["__meta_eureka_app_instance_metadata_"+discoveryutils.SanitizeLabelName(tag.XMLName.Local)] = tag.Content
|
m[discoveryutils.SanitizeLabelName("__meta_eureka_app_instance_metadata_"+tag.XMLName.Local)] = tag.Content
|
||||||
}
|
}
|
||||||
ms = append(ms, m)
|
ms = append(ms, m)
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,8 +150,7 @@ func (inst *Instance) appendTargetLabels(ms []map[string]string, project, tagSep
|
||||||
"__meta_gce_zone": inst.Zone,
|
"__meta_gce_zone": inst.Zone,
|
||||||
}
|
}
|
||||||
for _, iface := range inst.NetworkInterfaces {
|
for _, iface := range inst.NetworkInterfaces {
|
||||||
ifaceName := discoveryutils.SanitizeLabelName(iface.Name)
|
m[discoveryutils.SanitizeLabelName("__meta_gce_interface_ipv4_"+iface.Name)] = iface.NetworkIP
|
||||||
m["__meta_gce_interface_ipv4_"+ifaceName] = iface.NetworkIP
|
|
||||||
}
|
}
|
||||||
if len(inst.Tags.Items) > 0 {
|
if len(inst.Tags.Items) > 0 {
|
||||||
// We surround the separated list with the separator as well. This way regular expressions
|
// We surround the separated list with the separator as well. This way regular expressions
|
||||||
|
@ -159,12 +158,10 @@ func (inst *Instance) appendTargetLabels(ms []map[string]string, project, tagSep
|
||||||
m["__meta_gce_tags"] = tagSeparator + strings.Join(inst.Tags.Items, tagSeparator) + tagSeparator
|
m["__meta_gce_tags"] = tagSeparator + strings.Join(inst.Tags.Items, tagSeparator) + tagSeparator
|
||||||
}
|
}
|
||||||
for _, item := range inst.Metadata.Items {
|
for _, item := range inst.Metadata.Items {
|
||||||
key := discoveryutils.SanitizeLabelName(item.Key)
|
m[discoveryutils.SanitizeLabelName("__meta_gce_metadata_"+item.Key)] = item.Value
|
||||||
m["__meta_gce_metadata_"+key] = item.Value
|
|
||||||
}
|
}
|
||||||
for _, label := range inst.Labels {
|
for _, label := range inst.Labels {
|
||||||
name := discoveryutils.SanitizeLabelName(label.Name)
|
m[discoveryutils.SanitizeLabelName("__meta_gce_label_"+label.Name)] = label.Value
|
||||||
m["__meta_gce_label_"+name] = label.Value
|
|
||||||
}
|
}
|
||||||
if len(iface.AccessConfigs) > 0 {
|
if len(iface.AccessConfigs) > 0 {
|
||||||
ac := iface.AccessConfigs[0]
|
ac := iface.AccessConfigs[0]
|
||||||
|
|
|
@ -28,14 +28,12 @@ type ListMeta struct {
|
||||||
|
|
||||||
func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) {
|
func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) {
|
||||||
for _, lb := range om.Labels {
|
for _, lb := range om.Labels {
|
||||||
ln := discoveryutils.SanitizeLabelName(lb.Name)
|
m[discoveryutils.SanitizeLabelName(prefix+"_label_"+lb.Name)] = lb.Value
|
||||||
m[prefix+"_label_"+ln] = lb.Value
|
m[discoveryutils.SanitizeLabelName(prefix+"_labelpresent_"+lb.Name)] = "true"
|
||||||
m[prefix+"_labelpresent_"+ln] = "true"
|
|
||||||
}
|
}
|
||||||
for _, a := range om.Annotations {
|
for _, a := range om.Annotations {
|
||||||
an := discoveryutils.SanitizeLabelName(a.Name)
|
m[discoveryutils.SanitizeLabelName(prefix+"_annotation_"+a.Name)] = a.Value
|
||||||
m[prefix+"_annotation_"+an] = a.Value
|
m[discoveryutils.SanitizeLabelName(prefix+"_annotationpresent_"+a.Name)] = "true"
|
||||||
m[prefix+"_annotationpresent_"+an] = "true"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -151,8 +151,8 @@ func getEndpointSliceLabels(eps *EndpointSlice, addr string, ea Endpoint, epp En
|
||||||
m["__meta_kubernetes_endpointslice_endpoint_hostname"] = ea.Hostname
|
m["__meta_kubernetes_endpointslice_endpoint_hostname"] = ea.Hostname
|
||||||
}
|
}
|
||||||
for k, v := range ea.Topology {
|
for k, v := range ea.Topology {
|
||||||
m["__meta_kubernetes_endpointslice_endpoint_topology_"+discoveryutils.SanitizeLabelName(k)] = v
|
m[discoveryutils.SanitizeLabelName("__meta_kubernetes_endpointslice_endpoint_topology_"+k)] = v
|
||||||
m["__meta_kubernetes_endpointslice_endpoint_topology_present_"+discoveryutils.SanitizeLabelName(k)] = "true"
|
m[discoveryutils.SanitizeLabelName("__meta_kubernetes_endpointslice_endpoint_topology_present_"+k)] = "true"
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,8 +104,7 @@ func (n *Node) getTargetLabels(gw *groupWatcher) []map[string]string {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
addrTypesUsed[a.Type] = true
|
addrTypesUsed[a.Type] = true
|
||||||
ln := discoveryutils.SanitizeLabelName(a.Type)
|
m[discoveryutils.SanitizeLabelName("__meta_kubernetes_node_address_"+a.Type)] = a.Address
|
||||||
m["__meta_kubernetes_node_address_"+ln] = a.Address
|
|
||||||
}
|
}
|
||||||
return []map[string]string{m}
|
return []map[string]string{m}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ func addInstanceLabels(servers []server, port int) []map[string]string {
|
||||||
"__meta_openstack_instance_flavor": server.Flavor.ID,
|
"__meta_openstack_instance_flavor": server.Flavor.ID,
|
||||||
}
|
}
|
||||||
for k, v := range server.Metadata {
|
for k, v := range server.Metadata {
|
||||||
m["__meta_openstack_tag_"+discoveryutils.SanitizeLabelName(k)] = v
|
m[discoveryutils.SanitizeLabelName("__meta_openstack_tag_"+k)] = v
|
||||||
}
|
}
|
||||||
// Traverse server.Addresses in alphabetical order of pool name
|
// Traverse server.Addresses in alphabetical order of pool name
|
||||||
// in order to return targets in deterministic order.
|
// in order to return targets in deterministic order.
|
||||||
|
|
|
@ -51,7 +51,7 @@ func addInstanceLabels(instances []instance) []map[string]string {
|
||||||
"__meta_yandexcloud_folder_id": server.FolderID,
|
"__meta_yandexcloud_folder_id": server.FolderID,
|
||||||
}
|
}
|
||||||
for k, v := range server.Labels {
|
for k, v := range server.Labels {
|
||||||
m["__meta_yandexcloud_instance_label_"+discoveryutils.SanitizeLabelName(k)] = v
|
m[discoveryutils.SanitizeLabelName("__meta_yandexcloud_instance_label_"+k)] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ni := range server.NetworkInterfaces {
|
for _, ni := range server.NetworkInterfaces {
|
||||||
|
|
35
lib/promscrape/discoveryutils/internstring.go
Normal file
35
lib/promscrape/discoveryutils/internstring.go
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
package discoveryutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// InternString returns interned s.
|
||||||
|
//
|
||||||
|
// This may be needed for reducing the amounts of allocated memory.
|
||||||
|
func InternString(s string) string {
|
||||||
|
m := internStringsMap.Load().(*sync.Map)
|
||||||
|
if v, ok := m.Load(s); ok {
|
||||||
|
sp := v.(*string)
|
||||||
|
return *sp
|
||||||
|
}
|
||||||
|
// Make a new copy for s in order to remove references from possible bigger string s refers to.
|
||||||
|
sCopy := string(append([]byte{}, s...))
|
||||||
|
m.Store(sCopy, &sCopy)
|
||||||
|
n := atomic.AddUint64(&internStringsMapLen, 1)
|
||||||
|
if n > 100e3 {
|
||||||
|
atomic.StoreUint64(&internStringsMapLen, 0)
|
||||||
|
internStringsMap.Store(&sync.Map{})
|
||||||
|
}
|
||||||
|
return sCopy
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
internStringsMap atomic.Value
|
||||||
|
internStringsMapLen uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
internStringsMap.Store(&sync.Map{})
|
||||||
|
}
|
45
lib/promscrape/discoveryutils/internstring_test.go
Normal file
45
lib/promscrape/discoveryutils/internstring_test.go
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
package discoveryutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInternStringSerial(t *testing.T) {
|
||||||
|
if err := testInternString(t); err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInternStringConcurrent(t *testing.T) {
|
||||||
|
concurrency := 5
|
||||||
|
resultCh := make(chan error, concurrency)
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
go func() {
|
||||||
|
resultCh <- testInternString(t)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(5 * time.Second)
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
select {
|
||||||
|
case err := <-resultCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
case <-timer.C:
|
||||||
|
t.Fatalf("timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testInternString(t *testing.T) error {
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
s := fmt.Sprintf("foo_%d", i)
|
||||||
|
s1 := InternString(s)
|
||||||
|
if s != s1 {
|
||||||
|
return fmt.Errorf("unexpected string returned from internString; got %q; want %q", s1, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package promscrape
|
package discoveryutils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -15,7 +15,7 @@ func BenchmarkInternString(b *testing.B) {
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
for _, s := range a {
|
for _, s := range a {
|
||||||
sResult := internString(s)
|
sResult := InternString(s)
|
||||||
if sResult != s {
|
if sResult != s {
|
||||||
panic(fmt.Sprintf("unexpected string obtained; got %q; want %q", sResult, s))
|
panic(fmt.Sprintf("unexpected string obtained; got %q; want %q", sResult, s))
|
||||||
}
|
}
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
)
|
)
|
||||||
|
@ -15,13 +17,39 @@ import (
|
||||||
//
|
//
|
||||||
// This has been copied from Prometheus sources at util/strutil/strconv.go
|
// This has been copied from Prometheus sources at util/strutil/strconv.go
|
||||||
func SanitizeLabelName(name string) string {
|
func SanitizeLabelName(name string) string {
|
||||||
return invalidLabelCharRE.ReplaceAllString(name, "_")
|
m := sanitizedLabelNames.Load().(*sync.Map)
|
||||||
|
v, ok := m.Load(name)
|
||||||
|
if ok {
|
||||||
|
// Fast path - the sanitized label name is found in the cache.
|
||||||
|
sp := v.(*string)
|
||||||
|
return *sp
|
||||||
|
}
|
||||||
|
// Slow path - sanitize name and store it in the cache.
|
||||||
|
sanitizedName := invalidLabelCharRE.ReplaceAllString(name, "_")
|
||||||
|
// Make a copy of name in order to limit memory usage to the name length,
|
||||||
|
// since the name may point to bigger string.
|
||||||
|
s := string(append([]byte{}, name...))
|
||||||
|
sp := &sanitizedName
|
||||||
|
m.Store(s, sp)
|
||||||
|
n := atomic.AddUint64(&sanitizedLabelNamesLen, 1)
|
||||||
|
if n > 100e3 {
|
||||||
|
atomic.StoreUint64(&sanitizedLabelNamesLen, 0)
|
||||||
|
sanitizedLabelNames.Store(&sync.Map{})
|
||||||
|
}
|
||||||
|
return sanitizedName
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
sanitizedLabelNames atomic.Value
|
||||||
|
sanitizedLabelNamesLen uint64
|
||||||
|
|
||||||
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
sanitizedLabelNames.Store(&sync.Map{})
|
||||||
|
}
|
||||||
|
|
||||||
// JoinHostPort returns host:port.
|
// JoinHostPort returns host:port.
|
||||||
//
|
//
|
||||||
// Host may be dns name, ipv4 or ipv6 address.
|
// Host may be dns name, ipv4 or ipv6 address.
|
||||||
|
|
56
lib/promscrape/discoveryutils/utils_test.go
Normal file
56
lib/promscrape/discoveryutils/utils_test.go
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
package discoveryutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSanitizeLabelNameSerial(t *testing.T) {
|
||||||
|
if err := testSanitizeLabelName(); err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSanitizeLabelNameParallel(t *testing.T) {
|
||||||
|
goroutines := 5
|
||||||
|
ch := make(chan error, goroutines)
|
||||||
|
for i := 0; i < goroutines; i++ {
|
||||||
|
go func() {
|
||||||
|
ch <- testSanitizeLabelName()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
tch := time.After(5 * time.Second)
|
||||||
|
for i := 0; i < goroutines; i++ {
|
||||||
|
select {
|
||||||
|
case <-tch:
|
||||||
|
t.Fatalf("timeout!")
|
||||||
|
case err := <-ch:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testSanitizeLabelName() error {
|
||||||
|
f := func(name, expectedSanitizedName string) error {
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
sanitizedName := SanitizeLabelName(name)
|
||||||
|
if sanitizedName != expectedSanitizedName {
|
||||||
|
return fmt.Errorf("unexpected sanitized label name %q; got %q; want %q", name, sanitizedName, expectedSanitizedName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := f("", ""); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f("foo", "foo"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f("foo-bar/baz", "foo_bar_baz"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
21
lib/promscrape/discoveryutils/utils_timing_test.go
Normal file
21
lib/promscrape/discoveryutils/utils_timing_test.go
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package discoveryutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkSanitizeLabelName(b *testing.B) {
|
||||||
|
labelName := "foo-bar/baz/aaaa+bbb"
|
||||||
|
expectedLabelNameSanitized := "foo_bar_baz_aaaa_bbb"
|
||||||
|
b.SetBytes(1)
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
labelNameSanitized := SanitizeLabelName(labelName)
|
||||||
|
if labelNameSanitized != expectedLabelNameSanitized {
|
||||||
|
panic(fmt.Errorf("unexpected sanitized label name; got %q; want %q", labelNameSanitized, expectedLabelNameSanitized))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in a new issue