diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 0d0453541..a451b5f78 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -8,10 +8,11 @@ assignees: '' --- **Describe the bug** -A clear and concise description of what the bug is. +A clear and concise description of what the bug is. It would be great updating to [the latest avaialble release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) +and verifying whether the bug is reproducible there. It is also recommended reading [troubleshooting docs](https://victoriametrics.github.io/#troubleshooting). **To Reproduce** -Steps to reproduce the behavior +Steps to reproduce the behavior. **Expected behavior** A clear and concise description of what you expected to happen. diff --git a/CODE_OF_CONDUCT_RU.md b/CODE_OF_CONDUCT_RU.md new file mode 100644 index 000000000..26a5ecf4c --- /dev/null +++ b/CODE_OF_CONDUCT_RU.md @@ -0,0 +1,120 @@ + +# Кодекс Поведения участника + +## Наши обязательства + +Мы, как участники, авторы и лидеры обязуемся сделать участие в сообществе +свободным от притеснений для всех, независимо от возраста, телосложения, +видимых или невидимых ограничений способности, этнической принадлежности, +половых признаков, гендерной идентичности и выражения, уровня опыта, +образования, социо-экономического статуса, национальности, внешности, +расы, религии, или сексуальной идентичности и ориентации. + +Мы обещаем действовать и взаимодействовать таким образом, чтобы вносить вклад в открытое, +дружелюбное, многообразное, инклюзивное и здоровое сообщество. 
+ +## Наши стандарты + +Примеры поведения, создающие условия для благоприятных взаимоотношений включают в себя: + +* Проявление доброты и эмпатии к другим участникам проекта +* Уважение к чужой точке зрения и опыту +* Конструктивная критика и принятие конструктивной критики +* Принятие ответственности, принесение извинений тем, кто пострадал от наших ошибок + и извлечение уроков из опыта +* Ориентирование на то, что лучше подходит для сообщества, а не только для нас лично + +Примеры неприемлемого поведения участников включают в себя: + +* Использование выражений или изображений сексуального характера и нежелательное сексуальное внимание или домогательство в любой форме +* Троллинг, оскорбительные или уничижительные комментарии, переход на личности или затрагивание политических убеждений +* Публичное или приватное домогательство +* Публикация личной информации других лиц, например, физического или электронного адреса, без явного разрешения +* Иное поведение, которое обоснованно считать неуместным в профессиональной обстановке + +## Обязанности + +Лидеры сообщества отвечают за разъяснение и применение наших стандартов приемлемого +поведения и будут предпринимать соответствующие и честные меры по исправлению положения +в ответ на любое поведение, которое они сочтут неприемлемым, угрожающим, оскорбительным или вредным. + +Лидеры сообщества обладают правом и обязанностью удалять, редактировать или отклонять +комментарии, коммиты, код, изменения в вики, вопросы и другой вклад, который не совпадает +с Кодексом Поведения, и предоставят причины принятого решения, когда сочтут нужным. + +## Область применения + +Данный Кодекс Поведения применим во всех во всех публичных физических и цифровых пространства сообщества, +а также когда человек официально представляет сообщество в публичных местах. 
+Примеры представления проекта или сообщества включают использование официальной электронной почты, +публикации в официальном аккаунте в социальных сетях, +или упоминания как представителя в онлайн или оффлайн мероприятии. + +## Приведение в исполнение + +О случаях домогательства, а так же оскорбительного или иного другого неприемлемого +поведения можно сообщить ответственным лидерам сообщества с помощью письма на info@victoriametrics.com +Все жалобы будут рассмотрены и расследованы оперативно и беспристрастно. + +Все лидеры сообщества обязаны уважать неприкосновенность частной жизни и личную +неприкосновенность автора сообщения. + +## Руководство по исполнению + +Лидеры сообщества будут следовать следующим Принципам Воздействия в Сообществе, +чтобы определить последствия для тех, кого они считают виновными в нарушении данного Кодекса Поведения: + +### 1. Исправление + +**Общественное влияние**: Использование недопустимой лексики или другое поведение, +считающиеся непрофессиональным или нежелательным в сообществе. + +**Последствия**: Личное, письменное предупреждение от лидеров сообщества, +объясняющее суть нарушения и почему такое поведение +было неуместно. Лидеры сообщества могут попросить принести публичное извинение. + +### 2. Предупреждение + +**Общественное влияние**: Нарушение в результате одного инцидента или серии действий. + +**Последствия**: Предупреждение о последствиях в случае продолжающегося неуместного поведения. +На определенное время не допускается взаимодействие с людьми, вовлеченными в инцидент, +включая незапрошенное взаимодействие +с теми, кто обеспечивает соблюдение Кодекса. Это включает в себя избегание взаимодействия +в публичных пространствах, а так же во внешних каналах, +таких как социальные сети. Нарушение этих правил влечет за собой временный или вечный бан. + +### 3. Временный бан + +**Общественное влияние**: Серьёзное нарушение стандартов сообщества, +включая продолжительное неуместное поведение. 
+ +**Последствия**: Временный запрет (бан) на любое взаимодействие +или публичное общение с сообществом на определенный период времени. +На этот период не допускается публичное или личное взаимодействие с людьми, +вовлеченными в инцидент, включая незапрошенное взаимодействие +с теми, кто обеспечивает соблюдение Кодекса. +Нарушение этих правил влечет за собой вечный бан. + +### 4. Вечный бан + +**Общественное влияние**: Демонстрация систематических нарушений стандартов сообщества, +включая продолжающееся неуместное поведение, домогательство до отдельных лиц, +или проявление агрессии либо пренебрежительного отношения к категориям лиц. + +**Последствия**: Вечный запрет на любое публичное взаимодействие с сообществом. + +## Атрибуция + +Данный Кодекс Поведения основан на [Кодекс Поведения участника][homepage], +версии 2.0, доступной по адресу +https://www.contributor-covenant.org/version/2/0/code_of_conduct.html. + +Принципы Воздействия в Сообществе были вдохновлены [Mozilla's code of conduct +enforcement ladder](https://github.com/mozilla/diversity). + +[homepage]: https://www.contributor-covenant.org + +Ответы на общие вопросы о данном кодексе поведения ищите на странице FAQ: +https://www.contributor-covenant.org/faq. Переводы доступны по адресу +https://www.contributor-covenant.org/translations. 
diff --git a/README.md b/README.md index 3386b7a4c..531ea7522 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ See [features available for enterprise customers](https://victoriametrics.com/en Click on a link in order to read the corresponding case study +* [zhihu](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#zhihu) * [adidas](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adidas) * [CERN](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#cern) * [COLOPL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#colopl) @@ -834,8 +835,9 @@ Time series data can be imported via any supported ingestion protocol: ### How to import data in native format -The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`. -Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format): +The specification of VictoriaMetrics' native format may yet change and is not formally documented yet. So currently we do not recommend that external clients attempt to pack their own metrics in native format file. + +If you have a native format file obtained via [/api/v1/export/native](#how-to-export-data-in-native-format) however this is the most efficient protocol for importing data in. ```bash # Export the data from : @@ -1100,6 +1102,8 @@ VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command- is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. +The recommended value for `-dedup.minScrapeInterval` must equal to `scrape_interval` config from Prometheus configs. 
+ The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical `external_labels` section in their configs, so they write data to the same time series. diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index 6349dd3f3..c74d74768 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -11,7 +11,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -36,7 +35,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() if promscrape.IsDryRun() { *dryRun = true diff --git a/app/vmagent/common/push_ctx.go b/app/vmagent/common/push_ctx.go index 1c4e19d36..e25ddd03a 100644 --- a/app/vmagent/common/push_ctx.go +++ b/app/vmagent/common/push_ctx.go @@ -1,9 +1,9 @@ package common import ( - "runtime" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) @@ -63,4 +63,4 @@ func PutPushCtx(ctx *PushCtx) { } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *PushCtx, runtime.GOMAXPROCS(-1)) +var pushCtxPoolCh = make(chan *PushCtx, cgroup.AvailableCPUs()) diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index b9d1e7c3f..6235e61f9 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -4,12 +4,12 @@ import ( "flag" "io" "net/http" - "runtime" "sync" 
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" @@ -165,4 +165,4 @@ func putPushCtx(ctx *pushCtx) { } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 973e468c4..4f2f269c7 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -21,7 +21,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -68,7 +67,6 @@ func main() { remotewrite.InitSecretFlags() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() if promscrape.IsDryRun() { if err := promscrape.CheckConfig(); err != nil { diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 0fe16e6b4..875694173 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -65,6 +65,9 @@ type relabelConfigs struct { func initLabelsGlobal() { labelsGlobal = nil for _, s := range *unparsedLabelsGlobal { + if len(s) == 0 { + continue + } n := strings.IndexByte(s, '=') if n < 0 { logger.Fatalf("missing '=' in `-remoteWrite.label`. 
It must contain label in the form `name=value`; got %q", s) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 93a146f09..80c742a1b 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -3,10 +3,10 @@ package remotewrite import ( "flag" "fmt" - "runtime" "sync" "sync/atomic" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -43,7 +43,7 @@ var allRelabelConfigs atomic.Value // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. -var maxQueues = runtime.GOMAXPROCS(-1) * 4 +var maxQueues = cgroup.AvailableCPUs() * 4 // InitSecretFlags must be called after flag.Parse and before any logging. func InitSecretFlags() { diff --git a/app/vmalert/main.go b/app/vmalert/main.go index a46715206..18097680a 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -16,7 +16,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remoteread" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -59,7 +58,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() if *dryRun { u, _ := url.Parse("https://victoriametrics.com/") @@ -160,6 +158,9 @@ func newManager(ctx context.Context) (*manager, error) { manager.rr = rr for _, s := range *externalLabels { + if len(s) == 0 { + continue + } n := strings.IndexByte(s, '=') if n < 0 
{ return nil, fmt.Errorf("missing '=' in `-label`. It must contain label in the form `name=value`; got %q", s) diff --git a/app/vmauth/main.go b/app/vmauth/main.go index 30c33c61f..58c89238e 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -28,7 +27,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() logger.Infof("starting vmauth at %q...", *httpListenAddr) startTime := time.Now() initAuthConfig() diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 2e008ad02..13f0f23f2 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -40,7 +39,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() if len(*snapshotCreateURL) > 0 { logger.Infof("Snapshots enabled") diff --git a/app/vminsert/common/insert_ctx_pool.go b/app/vminsert/common/insert_ctx_pool.go index 3f6c057bd..4fb20fb9c 100644 --- a/app/vminsert/common/insert_ctx_pool.go +++ b/app/vminsert/common/insert_ctx_pool.go @@ -1,8 +1,9 @@ package common import ( - "runtime" "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" ) // GetInsertCtx returns InsertCtx from the pool. 
@@ -33,4 +34,4 @@ func PutInsertCtx(ctx *InsertCtx) { } var insertCtxPool sync.Pool -var insertCtxPoolCh = make(chan *InsertCtx, runtime.GOMAXPROCS(-1)) +var insertCtxPoolCh = make(chan *InsertCtx, cgroup.AvailableCPUs()) diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 93a826cd0..b841dd0d2 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -4,12 +4,12 @@ import ( "flag" "io" "net/http" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -175,4 +175,4 @@ func putPushCtx(ctx *pushCtx) { } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index 9b0fc6477..cb36025dc 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -2,11 +2,11 @@ package native import ( "net/http" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -112,4 +112,4 @@ func putPushCtx(ctx *pushCtx) { } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) +var pushCtxPoolCh = 
make(chan *pushCtx, cgroup.AvailableCPUs()) diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 6fbc7ab83..64002b4d4 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -2,11 +2,11 @@ package vmimport import ( "net/http" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -117,4 +117,4 @@ func putPushCtx(ctx *pushCtx) { } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index 895d37b97..c65f54bf5 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -33,7 +32,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() srcFS, err := newSrcFS() if err != nil { diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 3ec78f12e..5847113ca 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -5,7 +5,6 @@ import ( "flag" "fmt" "net/http" - "runtime" "strings" "time" @@ -15,6 +14,7 @@ import ( 
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -32,7 +32,7 @@ var ( ) func getDefaultMaxConcurrentRequests() int { - n := runtime.GOMAXPROCS(-1) + n := cgroup.AvailableCPUs() if n <= 4 { n *= 2 } diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 8af6c3f82..4b907ce04 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -6,7 +6,6 @@ import ( "flag" "fmt" "regexp" - "runtime" "sort" "sync" "sync/atomic" @@ -15,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -180,7 +180,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`) var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`) -var gomaxprocs = runtime.GOMAXPROCS(-1) +var gomaxprocs = cgroup.AvailableCPUs() type packedTimeseries struct { metricName string @@ -272,7 +272,7 @@ func unpackWorker() { // unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine. // // This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system. 
-var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1) +var unpackBatchSize = 8 * cgroup.AvailableCPUs() // Unpack unpacks pts to dst. func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error { @@ -535,11 +535,18 @@ func GetGraphiteTags(filter string, limit int, deadline searchutils.Deadline) ([ } // Substitute "__name__" with "name" for Graphite compatibility for i := range labels { - if labels[i] == "__name__" { + if labels[i] != "__name__" { + continue + } + // Prevent from duplicate `name` tag. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942 + if hasString(labels, "name") { + labels = append(labels[:i], labels[i+1:]...) + } else { labels[i] = "name" sort.Strings(labels) - break } + break } if len(filter) > 0 { labels, err = applyGraphiteRegexpFilter(filter, labels) @@ -553,6 +560,15 @@ func GetGraphiteTags(filter string, limit int, deadline searchutils.Deadline) ([ return labels, nil } +func hasString(a []string, s string) bool { + for _, x := range a { + if x == s { + return true + } + } + return false +} + // GetLabels returns labels until the given deadline. func GetLabels(deadline searchutils.Deadline) ([]string, error) { if deadline.Exceeded() { @@ -794,7 +810,7 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline()) // Start workers that call f in parallel on available CPU cores. 
- gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() workCh := make(chan *exportWork, gomaxprocs*8) var ( errGlobal error diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 8eba99869..e2c5cd704 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "net/http" - "runtime" "sort" "strconv" "strings" @@ -17,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -147,7 +147,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1)) + resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) doneCh := make(chan error) go func() { err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { @@ -374,7 +374,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1)) + resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) doneCh := make(chan error) if !reduceMemUsage { rss, err := netstorage.ProcessSearchQuery(sq, true, deadline) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index d0c6c99ef..bd94207a7 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -4,12 +4,12 @@ import ( "flag" 
"fmt" "math" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -555,7 +555,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr } func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) { - concurrency := runtime.GOMAXPROCS(-1) + concurrency := cgroup.AvailableCPUs() if concurrency > len(tss) { concurrency = len(tss) } @@ -678,7 +678,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, timeseriesLen := rssLen if iafc != nil { // Incremental aggregates require holding only GOMAXPROCS timeseries in memory. - timeseriesLen = runtime.GOMAXPROCS(-1) + timeseriesLen = cgroup.AvailableCPUs() if iafc.ae.Modifier.Op != "" { if iafc.ae.Limit > 0 { // There is an explicit limit on the number of output time series. 
diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 5245c63c3..3bb5f8463 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -508,8 +508,8 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu window := rc.Window if window <= 0 { window = rc.Step - if rc.LookbackDelta > 0 && window > rc.LookbackDelta { - // Implicitly set window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval + if rc.CanDropLastSample && rc.LookbackDelta > 0 && window > rc.LookbackDelta { + // Implicitly window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval // according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784 window = rc.LookbackDelta } diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index ae3268c7f..ad9a7b7e4 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -683,7 +683,7 @@ func TestRollupFuncsLookbackDelta(t *testing.T) { } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{12, nan, nan, nan, 34, 34, nan} + valuesExpected := []float64{99, nan, 44, nan, 32, 34, nan} timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) diff --git a/dashboards/victoriametrics.json b/dashboards/victoriametrics.json index d44e06730..2aefcda75 100644 --- a/dashboards/victoriametrics.json +++ b/dashboards/victoriametrics.json @@ -51,7 +51,7 @@ } ] }, - "description": "Overview for single node VictoriaMetrics v1.40.0 or higher", + "description": "Overview for single node VictoriaMetrics v1.48.0 or higher", "editable": true, "gnetId": 10229, "graphTooltip": 0, @@ -1295,7 +1295,7 @@ "steppedLine": false, "targets": [ { - "expr": "vm_free_disk_space_bytes{job=\"$job\", instance=\"$instance\"} / 
ignoring(path) ((rate(vm_rows_added_to_storage_total{job=\"$job\", instance=\"$instance\"}[1d]) - ignoring(type) rate(vm_deduplicated_samples_total{job=\"$job\", instance=\"$instance\", type=\"merge\"}[1d])) * (sum(vm_data_size_bytes{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"}) / sum(vm_rows{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"})))", + "expr": "vm_free_disk_space_bytes{job=\"$job\", instance=\"$instance\"} / ignoring(path) ((rate(vm_rows_added_to_storage_total{job=\"$job\", instance=\"$instance\"}[1d]) - ignoring(type) rate(vm_deduplicated_samples_total{job=\"$job\", instance=\"$instance\", type=\"merge\"}[1d])) * scalar(sum(vm_data_size_bytes{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"}) / sum(vm_rows{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"})))", "format": "time_series", "hide": false, "interval": "", diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d81781eb6..52d3c722b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,19 @@ # tip +* FEATURE: allow multiple whitespace chars between measurements, fields and timestamp when parsing InfluxDB line protocol. + Though [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) denies multiple whitespace chars between these entities, + some apps improperly put multiple whitespace chars. This workaround allows accepting data from such apps. +* FEATURE: export `vm_promscrape_active_scrapers{type=""}` metric for tracking the number of active scrapers per each service discovery type. +* FEATURE: export `vm_promscrape_scrapers_started_total{type=""}` and `vm_promscrape_scrapers_stopped_total{type=""}` metrics for tracking churn rate for scrapers + per each service discovery type. + +* BUGFIX: vmagent: fix memory leak when big number of targets is discovered via service discovery. +* BUGFIX: vmagent: properly pass `datacenter` filter to Consul API server. 
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574#issuecomment-740454170 +* BUGFIX: properly handle CPU limits set on the host system or host container. The bugfix may result in lower memory usage on systems with CPU limits. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/946 +* BUGFIX: prevent from duplicate `name` tag returned from `/tags/autoComplete/tags` handler. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942 +* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` command-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944 + # [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0) diff --git a/docs/CaseStudies.md b/docs/CaseStudies.md index 3c88fb921..240cdad35 100644 --- a/docs/CaseStudies.md +++ b/docs/CaseStudies.md @@ -6,6 +6,7 @@ and feel free asking for references, reviews and additional case studies from re See also [articles about VictoriaMetrics from our users](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles#third-party-articles-and-slides). +* [zhihu](#zhihu) * [adidas](#adidas) * [CERN](#cern) * [COLOPL](#colopl) @@ -19,6 +20,20 @@ See also [articles about VictoriaMetrics from our users](https://github.com/Vict * [ARNES](#arnes) * [Idealo.de](#idealode) +## zhihu + +[zhihu](https://www.zhihu.com) is the largest Chinese question-and-answer website. 
We use VictoriaMetrics to store and use Graphite metrics, and we shared the [promate](https://github.com/zhihu/promate) solution in our [单机 20 亿指标,知乎 Graphite 极致优化!](https://qcon.infoq.cn/2020/shenzhen/presentation/2881)([slides](https://static001.geekbang.org/con/76/pdf/828698018/file/%E5%8D%95%E6%9C%BA%2020%20%E4%BA%BF%E6%8C%87%E6%A0%87%EF%BC%8C%E7%9F%A5%E4%B9%8E%20Graphite%20%E6%9E%81%E8%87%B4%E4%BC%98%E5%8C%96%EF%BC%81-%E7%86%8A%E8%B1%B9.pdf)) talk at [QCon 2020](https://qcon.infoq.cn/2020/shenzhen/). + +Numbers: + +- Active time series: ~2500 Million +- Datapoints: ~20 Trillion +- Ingestion rate: ~1800k/s +- Disk usage: ~20 TiB +- Index size: ~600 GiB +- The average query rate is ~3k per second (mostly alert queries). +- Query duration: median is ~40ms, 99th percentile is ~100ms. + ## adidas diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 3386b7a4c..531ea7522 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -30,6 +30,7 @@ See [features available for enterprise customers](https://victoriametrics.com/en Click on a link in order to read the corresponding case study +* [zhihu](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#zhihu) * [adidas](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adidas) * [CERN](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#cern) * [COLOPL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#colopl) @@ -834,8 +835,9 @@ Time series data can be imported via any supported ingestion protocol: ### How to import data in native format -The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`. -Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format): +The specification of VictoriaMetrics' native format may yet change and is not formally documented yet. 
So currently we do not recommend that external clients attempt to pack their own metrics in native format file. + +If you have a native format file obtained via [/api/v1/export/native](#how-to-export-data-in-native-format) however this is the most efficient protocol for importing data in. ```bash # Export the data from : @@ -1100,6 +1102,8 @@ VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command- is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. +The recommended value for `-dedup.minScrapeInterval` must equal to `scrape_interval` config from Prometheus configs. + The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical `external_labels` section in their configs, so they write data to the same time series. diff --git a/lib/cgroup/cpu.go b/lib/cgroup/cpu.go index fbfd73238..56690e86d 100644 --- a/lib/cgroup/cpu.go +++ b/lib/cgroup/cpu.go @@ -6,17 +6,21 @@ import ( "runtime" "strconv" "strings" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "sync" ) -// UpdateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var. -// -// This function must be called after logger.Init(). -func UpdateGOMAXPROCSToCPUQuota() { +// AvailableCPUs returns the number of available CPU cores for the app. +func AvailableCPUs() int { + availableCPUsOnce.Do(updateGOMAXPROCSToCPUQuota) + return runtime.GOMAXPROCS(-1) +} + +var availableCPUsOnce sync.Once + +// updateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var. 
+func updateGOMAXPROCSToCPUQuota() { if v := os.Getenv("GOMAXPROCS"); v != "" { // Do not override explicitly set GOMAXPROCS. - logger.Infof("using GOMAXPROCS=%q set via environment variable", v) return } q := getCPUQuota() @@ -28,13 +32,11 @@ func UpdateGOMAXPROCSToCPUQuota() { numCPU := runtime.NumCPU() if gomaxprocs > numCPU { // There is no sense in setting more GOMAXPROCS than the number of available CPU cores. - logger.Infof("cgroup CPU quota=%d exceeds NumCPU=%d; using GOMAXPROCS=NumCPU", gomaxprocs, numCPU) return } if gomaxprocs <= 0 { gomaxprocs = 1 } - logger.Infof("updating GOMAXPROCS to %d according to cgroup CPU quota", gomaxprocs) runtime.GOMAXPROCS(gomaxprocs) } diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go index ae2d9085d..8bd685954 100644 --- a/lib/ingestserver/graphite/server.go +++ b/lib/ingestserver/graphite/server.go @@ -4,12 +4,12 @@ import ( "errors" "io" "net" - "runtime" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { wg.Add(1) diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go index 02bc5c60e..9ed428f27 100644 --- a/lib/ingestserver/influx/server.go +++ b/lib/ingestserver/influx/server.go @@ -4,12 +4,12 @@ import ( "errors" "io" "net" - "runtime" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { wg.Add(1) diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go index c7aca100c..b25251410 100644 --- a/lib/ingestserver/opentsdb/server.go +++ b/lib/ingestserver/opentsdb/server.go @@ -5,12 +5,12 @@ import ( "io" "net" "net/http" - "runtime" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -133,7 +133,7 @@ func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { } func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { wg.Add(1) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 4284bc623..0ab790dcb 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "runtime" "sort" "strings" "sync" @@ -14,6 +13,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -959,7 +959,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 { 
return freeSpace / uint64(mergeWorkersCount) / 4 } -var mergeWorkersCount = runtime.GOMAXPROCS(-1) +var mergeWorkersCount = cgroup.AvailableCPUs() func openParts(path string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. diff --git a/lib/persistentqueue/fastqueue_timing_test.go b/lib/persistentqueue/fastqueue_timing_test.go index 991846ed5..1ec46801b 100644 --- a/lib/persistentqueue/fastqueue_timing_test.go +++ b/lib/persistentqueue/fastqueue_timing_test.go @@ -2,8 +2,9 @@ package persistentqueue import ( "fmt" - "runtime" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" ) func BenchmarkFastQueueThroughputSerial(b *testing.B) { @@ -36,7 +37,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize) mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2, 0) + fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0) defer func() { fq.MustClose() mustDeleteDir(path) diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 007ec267c..f85950818 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -7,6 +7,7 @@ import ( "net/url" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -27,11 +28,12 @@ import ( ) var ( - strictParse = flag.Bool("promscrape.config.strictParse", false, "Whether to allow only supported fields in '-promscrape.config'. "+ - "This option may be used for errors detection in '-promscrape.config' file") + strictParse = flag.Bool("promscrape.config.strictParse", false, "Whether to allow only supported fields in -promscrape.config . "+ + "By default unsupported fields are silently skipped") dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. 
"+ "Returns non-zero exit code on parsing errors and emits these errors to stderr. "+ - "Pass -loggerLevel=ERROR if you don't need to see info messages in the output") + "See also -promscrape.config.strictParse command-line flag. "+ + "Pass -loggerLevel=ERROR if you don't need to see info messages in the output.") dropOriginalLabels = flag.Bool("promscrape.dropOriginalLabels", false, "Whether to drop original labels for scrape targets at /targets and /api/v1/targets pages. "+ "This may be needed for reducing memory usage when original labels for big number of scrape targets occupy big amounts of memory. "+ "Note that this reduces debuggability for improper per-target relabeling configs") @@ -164,7 +166,7 @@ func (cfg *Config) parse(data []byte, path string) error { func unmarshalMaybeStrict(data []byte, dst interface{}) error { data = envtemplate.Replace(data) var err error - if *strictParse || *dryRun { + if *strictParse { err = yaml.UnmarshalStrict(data, dst) } else { err = yaml.Unmarshal(data, dst) @@ -172,8 +174,8 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error { return err } -func getSWSByJob(sws []ScrapeWork) map[string][]ScrapeWork { - m := make(map[string][]ScrapeWork) +func getSWSByJob(sws []*ScrapeWork) map[string][]*ScrapeWork { + m := make(map[string][]*ScrapeWork) for _, sw := range sws { m[sw.jobNameOriginal] = append(m[sw.jobNameOriginal], sw) } @@ -181,9 +183,9 @@ func getSWSByJob(sws []ScrapeWork) map[string][]ScrapeWork { } // getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg. 
-func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -209,9 +211,9 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getOpenStackSDScrapeWork returns `openstack_sd_configs` ScrapeWork from cfg. -func (cfg *Config) getOpenStackSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getOpenStackSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -237,9 +239,9 @@ func (cfg *Config) getOpenStackSDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getDockerSwarmSDScrapeWork returns `dockerswarm_sd_configs` ScrapeWork from cfg. -func (cfg *Config) getDockerSwarmSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getDockerSwarmSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -265,9 +267,9 @@ func (cfg *Config) getDockerSwarmSDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg. 
-func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getConsulSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -293,9 +295,9 @@ func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getEurekaSDScrapeWork returns `eureka_sd_configs` ScrapeWork from cfg. -func (cfg *Config) getEurekaSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getEurekaSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -321,9 +323,9 @@ func (cfg *Config) getEurekaSDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg. -func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getDNSSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -349,9 +351,9 @@ func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg. 
-func (cfg *Config) getEC2SDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getEC2SDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -377,9 +379,9 @@ func (cfg *Config) getEC2SDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg. -func (cfg *Config) getGCESDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getGCESDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { swsPrevByJob := getSWSByJob(prev) - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] dstLen := len(dst) @@ -405,19 +407,18 @@ func (cfg *Config) getGCESDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg. -func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getFileSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { // Create a map for the previous scrape work. 
- swsMapPrev := make(map[string][]ScrapeWork) - for i := range prev { - sw := &prev[i] + swsMapPrev := make(map[string][]*ScrapeWork) + for _, sw := range prev { filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath") if len(filepath) == 0 { logger.Panicf("BUG: missing `__vm_filepath` label") } else { - swsMapPrev[filepath] = append(swsMapPrev[filepath], *sw) + swsMapPrev[filepath] = append(swsMapPrev[filepath], sw) } } - dst := make([]ScrapeWork, 0, len(prev)) + dst := make([]*ScrapeWork, 0, len(prev)) for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] for j := range sc.FileSDConfigs { @@ -429,8 +430,8 @@ func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork { } // getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg. -func (cfg *Config) getStaticScrapeWork() []ScrapeWork { - var dst []ScrapeWork +func (cfg *Config) getStaticScrapeWork() []*ScrapeWork { + var dst []*ScrapeWork for i := range cfg.ScrapeConfigs { sc := &cfg.ScrapeConfigs[i] for j := range sc.StaticConfigs { @@ -528,7 +529,7 @@ type scrapeWorkConfig struct { streamParse bool } -func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendKubernetesScrapeWork(dst []*ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) { targetLabels, err := kubernetes.GetLabels(sdc, baseDir) if err != nil { logger.Errorf("error when discovering kubernetes targets for `job_name` %q: %s; skipping it", swc.jobName, err) @@ -537,7 +538,7 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config"), true } -func appendOpenstackScrapeWork(dst []ScrapeWork, sdc *openstack.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendOpenstackScrapeWork(dst []*ScrapeWork, sdc *openstack.SDConfig, 
baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) { targetLabels, err := openstack.GetLabels(sdc, baseDir) if err != nil { logger.Errorf("error when discovering openstack targets for `job_name` %q: %s; skipping it", swc.jobName, err) @@ -546,7 +547,7 @@ func appendOpenstackScrapeWork(dst []ScrapeWork, sdc *openstack.SDConfig, baseDi return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "openstack_sd_config"), true } -func appendDockerSwarmScrapeWork(dst []ScrapeWork, sdc *dockerswarm.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendDockerSwarmScrapeWork(dst []*ScrapeWork, sdc *dockerswarm.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) { targetLabels, err := dockerswarm.GetLabels(sdc, baseDir) if err != nil { logger.Errorf("error when discovering dockerswarm targets for `job_name` %q: %s; skipping it", swc.jobName, err) @@ -555,7 +556,7 @@ func appendDockerSwarmScrapeWork(dst []ScrapeWork, sdc *dockerswarm.SDConfig, ba return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dockerswarm_sd_config"), true } -func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendConsulScrapeWork(dst []*ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) { targetLabels, err := consul.GetLabels(sdc, baseDir) if err != nil { logger.Errorf("error when discovering consul targets for `job_name` %q: %s; skipping it", swc.jobName, err) @@ -564,7 +565,7 @@ func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir stri return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config"), true } -func appendEurekaScrapeWork(dst []ScrapeWork, sdc *eureka.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendEurekaScrapeWork(dst []*ScrapeWork, sdc *eureka.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, 
bool) { targetLabels, err := eureka.GetLabels(sdc, baseDir) if err != nil { logger.Errorf("error when discovering eureka targets for `job_name` %q: %s; skipping it", swc.jobName, err) @@ -573,7 +574,7 @@ func appendEurekaScrapeWork(dst []ScrapeWork, sdc *eureka.SDConfig, baseDir stri return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "eureka_sd_config"), true } -func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendDNSScrapeWork(dst []*ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) { targetLabels, err := dns.GetLabels(sdc) if err != nil { logger.Errorf("error when discovering dns targets for `job_name` %q: %s; skipping it", swc.jobName, err) @@ -582,7 +583,7 @@ func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkCon return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config"), true } -func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendEC2ScrapeWork(dst []*ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) { targetLabels, err := ec2.GetLabels(sdc) if err != nil { logger.Errorf("error when discovering ec2 targets for `job_name` %q: %s; skipping it", swc.jobName, err) @@ -591,7 +592,7 @@ func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkCon return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config"), true } -func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { +func appendGCEScrapeWork(dst []*ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) { targetLabels, err := gce.GetLabels(sdc) if err != nil { logger.Errorf("error when discovering gce targets for `job_name` %q: %s; skippint it", swc.jobName, err) @@ -600,7 +601,7 @@ func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkCon 
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config"), true } -func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []ScrapeWork { +func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []*ScrapeWork { for _, metaLabels := range targetLabels { target := metaLabels["__address__"] var err error @@ -613,7 +614,7 @@ func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, ta return dst } -func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swsMapPrev map[string][]ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { +func (sdc *FileSDConfig) appendScrapeWork(dst []*ScrapeWork, swsMapPrev map[string][]*ScrapeWork, baseDir string, swc *scrapeWorkConfig) []*ScrapeWork { for _, file := range sdc.Files { pathPattern := getFilepath(baseDir, file) paths := []string{pathPattern} @@ -658,7 +659,7 @@ func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swsMapPrev map[strin return dst } -func (stc *StaticConfig) appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, metaLabels map[string]string) []ScrapeWork { +func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, metaLabels map[string]string) []*ScrapeWork { for _, target := range stc.Targets { if target == "" { // Do not return this error, since other targets may be valid @@ -676,12 +677,14 @@ func (stc *StaticConfig) appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfi return dst } -func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) ([]ScrapeWork, error) { +func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) ([]*ScrapeWork, error) { labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, 
swc.externalLabels, metaLabels, swc.params) var originalLabels []prompbmarshal.Label if !*dropOriginalLabels { originalLabels = append([]prompbmarshal.Label{}, labels...) promrelabel.SortLabels(originalLabels) + // Reduce memory usage by interning all the strings in originalLabels. + internLabelStrings(originalLabels) } labels = promrelabel.ApplyRelabelConfigs(labels, 0, swc.relabelConfigs, false) labels = promrelabel.RemoveMetaLabels(labels[:0], labels) @@ -738,7 +741,9 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex }) promrelabel.SortLabels(labels) } - dst = append(dst, ScrapeWork{ + // Reduce memory usage by interning all the strings in labels. + internLabelStrings(labels) + dst = append(dst, &ScrapeWork{ ID: atomic.AddUint64(&nextScrapeWorkID, 1), ScrapeURL: scrapeURL, ScrapeInterval: swc.scrapeInterval, @@ -762,6 +767,35 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex // Each ScrapeWork has an ID, which is used for locating it when updating its status. var nextScrapeWorkID uint64 +func internLabelStrings(labels []prompbmarshal.Label) { + for i := range labels { + label := &labels[i] + label.Name = internString(label.Name) + label.Value = internString(label.Value) + } +} + +func internString(s string) string { + internStringsMapLock.Lock() + defer internStringsMapLock.Unlock() + + if sInterned, ok := internStringsMap[s]; ok { + return sInterned + } + // Make a new copy for s in order to remove references from possible bigger string s refers to. 
+ sCopy := string(append([]byte{}, s...)) + internStringsMap[sCopy] = sCopy + if len(internStringsMap) > 100e3 { + internStringsMap = make(map[string]string, 100e3) + } + return sCopy +} + +var ( + internStringsMapLock sync.Mutex + internStringsMap = make(map[string]string, 100e3) +) + func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string { // See https://www.robustperception.io/life-of-a-label m := make(map[string][]string) diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go index b215359bc..523b194d6 100644 --- a/lib/promscrape/config_test.go +++ b/lib/promscrape/config_test.go @@ -94,7 +94,7 @@ scrape_configs: } sws := cfg.getStaticScrapeWork() resetNonEssentialFields(sws) - swsExpected := []ScrapeWork{{ + swsExpected := []*ScrapeWork{{ ScrapeURL: "http://black:9115/probe?module=dns_udp_example&target=8.8.8.8", ScrapeInterval: defaultScrapeInterval, ScrapeTimeout: defaultScrapeTimeout, @@ -199,7 +199,7 @@ scrape_configs: } } -func getFileSDScrapeWork(data []byte, path string) ([]ScrapeWork, error) { +func getFileSDScrapeWork(data []byte, path string) ([]*ScrapeWork, error) { var cfg Config if err := cfg.parse(data, path); err != nil { return nil, fmt.Errorf("cannot parse data: %w", err) @@ -207,7 +207,7 @@ func getFileSDScrapeWork(data []byte, path string) ([]ScrapeWork, error) { return cfg.getFileSDScrapeWork(nil), nil } -func getStaticScrapeWork(data []byte, path string) ([]ScrapeWork, error) { +func getStaticScrapeWork(data []byte, path string) ([]*ScrapeWork, error) { var cfg Config if err := cfg.parse(data, path); err != nil { return nil, fmt.Errorf("cannot parse data: %w", err) @@ -440,7 +440,7 @@ scrape_configs: `) } -func resetNonEssentialFields(sws []ScrapeWork) { +func resetNonEssentialFields(sws []*ScrapeWork) { for i := range sws { sws[i].ID = 0 sws[i].OriginalLabels = nil @@ -448,7 +448,7 @@ func resetNonEssentialFields(sws []ScrapeWork) { } func 
TestGetFileSDScrapeWorkSuccess(t *testing.T) { - f := func(data string, expectedSws []ScrapeWork) { + f := func(data string, expectedSws []*ScrapeWork) { t.Helper() sws, err := getFileSDScrapeWork([]byte(data), "non-existing-file") if err != nil { @@ -457,8 +457,7 @@ func TestGetFileSDScrapeWorkSuccess(t *testing.T) { resetNonEssentialFields(sws) // Remove `__vm_filepath` label, since its value depends on the current working dir. - for i := range sws { - sw := &sws[i] + for _, sw := range sws { for j := range sw.Labels { label := &sw.Labels[j] if label.Name == "__vm_filepath" { @@ -475,14 +474,14 @@ scrape_configs: - job_name: foo static_configs: - targets: ["xxx"] -`, []ScrapeWork{}) +`, []*ScrapeWork{}) f(` scrape_configs: - job_name: foo metrics_path: /abc/de file_sd_configs: - files: ["testdata/file_sd.json", "testdata/file_sd*.yml"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://host1:80/abc/de", ScrapeInterval: defaultScrapeInterval, @@ -604,7 +603,7 @@ scrape_configs: } func TestGetStaticScrapeWorkSuccess(t *testing.T) { - f := func(data string, expectedSws []ScrapeWork) { + f := func(data string, expectedSws []*ScrapeWork) { t.Helper() sws, err := getStaticScrapeWork([]byte(data), "non-exsiting-file") if err != nil { @@ -621,7 +620,7 @@ scrape_configs: - job_name: foo static_configs: - targets: ["foo.bar:1234"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics", ScrapeInterval: defaultScrapeInterval, @@ -663,7 +662,7 @@ scrape_configs: - job_name: foo static_configs: - targets: ["foo.bar:1234"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics", ScrapeInterval: defaultScrapeInterval, @@ -733,7 +732,7 @@ scrape_configs: insecure_skip_verify: true static_configs: - targets: [1.2.3.4] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "https://foo.bar:443/foo/bar?p=x%26y&p=%3D", ScrapeInterval: 543 * time.Second, @@ -887,7 +886,7 @@ scrape_configs: x: [keep_me] static_configs: - targets: 
["foo.bar:1234", "drop-this-target"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics?x=keep_me", ScrapeInterval: defaultScrapeInterval, @@ -957,7 +956,7 @@ scrape_configs: replacement: b static_configs: - targets: ["foo.bar:1234"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "mailto://foo.bar:1234/abc.de?a=b", ScrapeInterval: defaultScrapeInterval, @@ -1012,7 +1011,7 @@ scrape_configs: regex: "" static_configs: - targets: ["foo.bar:1234", "xyz"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics", ScrapeInterval: defaultScrapeInterval, @@ -1051,7 +1050,7 @@ scrape_configs: target_label: abc static_configs: - targets: ["foo.bar:1234"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics", ScrapeInterval: defaultScrapeInterval, @@ -1091,7 +1090,7 @@ scrape_configs: password_file: testdata/password.txt static_configs: - targets: ["foo.bar:1234"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics", ScrapeInterval: defaultScrapeInterval, @@ -1130,7 +1129,7 @@ scrape_configs: bearer_token_file: testdata/password.txt static_configs: - targets: ["foo.bar:1234"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics", ScrapeInterval: defaultScrapeInterval, @@ -1175,7 +1174,7 @@ scrape_configs: key_file: testdata/ssl-cert-snakeoil.key static_configs: - targets: ["foo.bar:1234"] -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metrics", ScrapeInterval: defaultScrapeInterval, @@ -1227,7 +1226,7 @@ scrape_configs: __param_a: c __address__: pp job: yyy -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://pp:80/metrics?a=c&a=xy", ScrapeInterval: defaultScrapeInterval, @@ -1290,7 +1289,7 @@ scrape_configs: target_label: instance - target_label: __address__ replacement: 127.0.0.1:9116 # The SNMP exporter's real hostname:port. 
-`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://127.0.0.1:9116/snmp?module=if_mib&target=192.168.1.2", ScrapeInterval: defaultScrapeInterval, @@ -1341,7 +1340,7 @@ scrape_configs: relabel_configs: - replacement: metricspath target_label: __metrics_path__ -`, []ScrapeWork{ +`, []*ScrapeWork{ { ScrapeURL: "http://foo.bar:1234/metricspath", ScrapeInterval: defaultScrapeInterval, @@ -1376,7 +1375,7 @@ scrape_configs: var defaultRegexForRelabelConfig = regexp.MustCompile("^(.*)$") -func equalStaticConfigForScrapeWorks(a, b []ScrapeWork) bool { +func equalStaticConfigForScrapeWorks(a, b []*ScrapeWork) bool { if len(a) != len(b) { return false } diff --git a/lib/promscrape/discovery/consul/watch.go b/lib/promscrape/discovery/consul/watch.go index b56da1d79..a85edccaf 100644 --- a/lib/promscrape/discovery/consul/watch.go +++ b/lib/promscrape/discovery/consul/watch.go @@ -43,7 +43,7 @@ type serviceWatcher struct { // newConsulWatcher creates new watcher and start background service discovery for Consul. 
func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter string) *consulWatcher { - baseQueryArgs := "?sdc=" + url.QueryEscape(datacenter) + baseQueryArgs := "?dc=" + url.QueryEscape(datacenter) if sdc.AllowStale { baseQueryArgs += "&stale" } diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index ccb0d4aaf..0570f95e6 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -95,16 +95,16 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) } scs := newScrapeConfigs(pushData) - scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() }) - scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) }) - scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) }) - scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) }) - scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) }) - scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) }) - scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) }) - scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) }) - scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) }) - scs.add("dockerswarm_sd_configs", *dockerswarmSDCheckInterval, func(cfg 
*Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) }) + scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() }) + scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) }) + scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) }) + scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) }) + scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) }) + scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) }) + scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) }) + scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) }) + scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) }) + scs.add("dockerswarm_sd_configs", *dockerswarmSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) }) sighupCh := procutil.NewSighupChan() @@ -171,7 +171,7 @@ func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConf } } -func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork) { +func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, 
getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) { atomic.AddInt32(&PendingScrapeConfigs, 1) scfg := &scrapeConfig{ name: name, @@ -204,7 +204,7 @@ func (scs *scrapeConfigs) stop() { type scrapeConfig struct { name string pushData func(wr *prompbmarshal.WriteRequest) - getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork + getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork checkInterval time.Duration cfgCh chan *Config stopCh <-chan struct{} @@ -222,7 +222,7 @@ func (scfg *scrapeConfig) run() { } cfg := <-scfg.cfgCh - var swsPrev []ScrapeWork + var swsPrev []*ScrapeWork updateScrapeWork := func(cfg *Config) { sws := scfg.getScrapeWork(cfg, swsPrev) sg.update(sws) @@ -244,20 +244,28 @@ func (scfg *scrapeConfig) run() { } type scraperGroup struct { - name string - wg sync.WaitGroup - mLock sync.Mutex - m map[string]*scraper - pushData func(wr *prompbmarshal.WriteRequest) - changesCount *metrics.Counter + name string + wg sync.WaitGroup + mLock sync.Mutex + m map[string]*scraper + pushData func(wr *prompbmarshal.WriteRequest) + + changesCount *metrics.Counter + activeScrapers *metrics.Counter + scrapersStarted *metrics.Counter + scrapersStopped *metrics.Counter } func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup { sg := &scraperGroup{ - name: name, - m: make(map[string]*scraper), - pushData: pushData, - changesCount: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)), + name: name, + m: make(map[string]*scraper), + pushData: pushData, + + changesCount: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)), + activeScrapers: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_active_scrapers{type=%q}`, name)), + scrapersStarted: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_started_total{type=%q}`, name)), + scrapersStopped: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_stopped_total{type=%q}`, 
name)), } metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name), func() float64 { return float64(tsmGlobal.StatusByGroup(sg.name, true)) @@ -278,15 +286,14 @@ func (sg *scraperGroup) stop() { sg.wg.Wait() } -func (sg *scraperGroup) update(sws []ScrapeWork) { +func (sg *scraperGroup) update(sws []*ScrapeWork) { sg.mLock.Lock() defer sg.mLock.Unlock() additionsCount := 0 deletionsCount := 0 swsMap := make(map[string][]prompbmarshal.Label, len(sws)) - for i := range sws { - sw := &sws[i] + for _, sw := range sws { key := sw.key() originalLabels := swsMap[key] if originalLabels != nil { @@ -308,11 +315,15 @@ func (sg *scraperGroup) update(sws []ScrapeWork) { // Start a scraper for the missing key. sc := newScraper(sw, sg.name, sg.pushData) + sg.activeScrapers.Inc() + sg.scrapersStarted.Inc() sg.wg.Add(1) go func() { defer sg.wg.Done() sc.sw.run(sc.stopCh) tsmGlobal.Unregister(sw) + sg.activeScrapers.Dec() + sg.scrapersStopped.Inc() }() tsmGlobal.Register(sw) sg.m[key] = sc diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 864b2cbe0..58392cd93 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -221,7 +221,9 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) { func (sw *scrapeWork) logError(s string) { if !*suppressScrapeErrors { - logger.ErrorfSkipframes(1, "error when scraping %q from job %q with labels %s: %s", sw.Config.ScrapeURL, sw.Config.Job(), sw.Config.LabelsString(), s) + logger.ErrorfSkipframes(1, "error when scraping %q from job %q with labels %s: %s; "+ + "scrape errors can be disabled by -promscrape.suppressScrapeErrors command-line flag", + sw.Config.ScrapeURL, sw.Config.Job(), sw.Config.LabelsString(), s) } } diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index d62cefb57..73c6dcc94 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -19,6 +19,9 @@ func statStdDial(ctx context.Context, network, addr string) (net.Conn, error) 
{ dialsTotal.Inc() if err != nil { dialErrors.Inc() + if !netutil.TCP6Enabled() { + err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err) + } return nil, err } conns.Inc() @@ -54,7 +57,7 @@ func statDial(addr string) (conn net.Conn, err error) { if err != nil { dialErrors.Inc() if !netutil.TCP6Enabled() { - err = fmt.Errorf("%w; try -enableTCP6 command-line flag", err) + err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err) } return nil, err } diff --git a/lib/promscrape/targetstatus.go b/lib/promscrape/targetstatus.go index 6479d581c..0a75f00e4 100644 --- a/lib/promscrape/targetstatus.go +++ b/lib/promscrape/targetstatus.go @@ -46,24 +46,24 @@ func WriteAPIV1Targets(w io.Writer, state string) { type targetStatusMap struct { mu sync.Mutex - m map[uint64]targetStatus + m map[uint64]*targetStatus } func newTargetStatusMap() *targetStatusMap { return &targetStatusMap{ - m: make(map[uint64]targetStatus), + m: make(map[uint64]*targetStatus), } } func (tsm *targetStatusMap) Reset() { tsm.mu.Lock() - tsm.m = make(map[uint64]targetStatus) + tsm.m = make(map[uint64]*targetStatus) tsm.mu.Unlock() } func (tsm *targetStatusMap) Register(sw *ScrapeWork) { tsm.mu.Lock() - tsm.m[sw.ID] = targetStatus{ + tsm.m[sw.ID] = &targetStatus{ sw: *sw, } tsm.mu.Unlock() @@ -77,7 +77,7 @@ func (tsm *targetStatusMap) Unregister(sw *ScrapeWork) { func (tsm *targetStatusMap) Update(sw *ScrapeWork, group string, up bool, scrapeTime, scrapeDuration int64, err error) { tsm.mu.Lock() - tsm.m[sw.ID] = targetStatus{ + tsm.m[sw.ID] = &targetStatus{ sw: *sw, up: up, scrapeGroup: group, @@ -114,7 +114,7 @@ func (tsm *targetStatusMap) WriteActiveTargetsJSON(w io.Writer) { key := promLabelsString(st.sw.OriginalLabels) kss = append(kss, keyStatus{ key: key, - st: st, + st: *st, }) } tsm.mu.Unlock() @@ -167,7 +167,7 @@ func (tsm *targetStatusMap) WriteHumanReadable(w io.Writer, showOriginalLabels b tsm.mu.Lock() for _, st := range 
tsm.m { job := st.sw.Job() - byJob[job] = append(byJob[job], st) + byJob[job] = append(byJob[job], *st) } tsm.mu.Unlock() @@ -245,7 +245,6 @@ type droppedTarget struct { } func (dt *droppedTargets) Register(originalLabels []prompbmarshal.Label) { - key := promLabelsString(originalLabels) currentTime := fasttime.UnixTimestamp() dt.mu.Lock() diff --git a/lib/protoparser/common/unmarshal_work.go b/lib/protoparser/common/unmarshal_work.go index ac16e289d..049644f15 100644 --- a/lib/protoparser/common/unmarshal_work.go +++ b/lib/protoparser/common/unmarshal_work.go @@ -1,9 +1,9 @@ package common import ( - "runtime" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -25,7 +25,7 @@ func StartUnmarshalWorkers() { if unmarshalWorkCh != nil { logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()") } - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs) unmarshalWorkersWG.Add(gomaxprocs) for i := 0; i < gomaxprocs; i++ { diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 0b318775a..918f40baf 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -6,11 +6,11 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -142,7 +142,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git 
a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 26b490162..93ac45394 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -5,11 +5,11 @@ import ( "flag" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -127,7 +127,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/influx/parser.go b/lib/protoparser/influx/parser.go index e08d42ee2..0e8680697 100644 --- a/lib/protoparser/influx/parser.go +++ b/lib/protoparser/influx/parser.go @@ -69,7 +69,7 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeCh return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s) } measurementTags := s[:n] - s = s[n+1:] + s = stripLeadingWhitespace(s[n+1:]) // Parse measurement and tags var err error @@ -110,7 +110,7 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeCh return tagsPool, fieldsPool, err } r.Fields = fieldsPool[fieldsStart:] - s = s[n+1:] + s = stripLeadingWhitespace(s[n+1:]) // Parse timestamp timestamp, err := fastfloat.ParseInt64(s) @@ -409,3 +409,10 @@ func isInQuote(s string, noEscapeChars bool) bool { s = s[n+1:] } } + +func stripLeadingWhitespace(s string) string { + for len(s) > 0 && s[0] == ' ' { + s = s[1:] + } + return s +} diff --git a/lib/protoparser/influx/parser_test.go b/lib/protoparser/influx/parser_test.go index 6d75d04c0..50c625b9f 100644 --- 
a/lib/protoparser/influx/parser_test.go +++ b/lib/protoparser/influx/parser_test.go @@ -468,6 +468,36 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }, }) + // Superfluous whitespace between tags, fields and timestamps. + f(`cpu_utilization,host=mnsbook-pro.local value=119.8 1607222595591`, &Rows{ + Rows: []Row{{ + Measurement: "cpu_utilization", + Tags: []Tag{{ + Key: "host", + Value: "mnsbook-pro.local", + }}, + Fields: []Field{{ + Key: "value", + Value: 119.8, + }}, + Timestamp: 1607222595591, + }}, + }) + f(`cpu_utilization,host=mnsbook-pro.local value=119.8 1607222595591`, &Rows{ + Rows: []Row{{ + Measurement: "cpu_utilization", + Tags: []Tag{{ + Key: "host", + Value: "mnsbook-pro.local", + }}, + Fields: []Field{{ + Key: "value", + Value: 119.8, + }}, + Timestamp: 1607222595591, + }}, + }) + f("x,y=z,g=p:\\ \\ 5432\\,\\ gp\\ mon\\ [lol]\\ con10\\ cmd5\\ SELECT f=1", &Rows{ Rows: []Row{{ Measurement: "x", diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index fc6bc0215..3da3858b8 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -5,11 +5,11 @@ import ( "flag" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -155,7 +155,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index aaab8da92..14a476e22 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ 
b/lib/protoparser/opentsdb/streamparser.go @@ -5,11 +5,11 @@ import ( "flag" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -126,7 +126,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index ad2d87dd8..edad74dd2 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -6,11 +6,11 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -149,7 +149,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) func getRows() *Rows { v := rowsPool.Get() diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index 2c6aa46c0..5e7563a29 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -4,11 +4,11 @@ import ( "bufio" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -128,7 +128,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index ad447f979..6adb6ae77 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -5,10 +5,10 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" @@ -123,7 +123,7 @@ func putPushCtx(ctx *pushCtx) { } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getWriteRequest() *prompb.WriteRequest { v := writeRequestPool.Get() diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 52063ae2d..183b316e5 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -5,10 +5,10 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -132,7 +132,7 @@ func putStreamContext(ctx *streamContext) { } var 
streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 82ea56617..70b0d0b7b 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -7,7 +7,6 @@ import ( "math/bits" "os" "path/filepath" - "runtime" "sort" "strings" "sync" @@ -16,6 +15,7 @@ import ( "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -77,7 +77,7 @@ const finalPartsToMerge = 3 // The number of shards for rawRow entries per partition. // // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. -var rawRowsShardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8 +var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8 // getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet. func getMaxRawRowsPerPartition() int { @@ -882,8 +882,8 @@ func hasActiveMerges(pws []*partWrapper) bool { } var ( - bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 - smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 + bigMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2 + smallMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2 ) // SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks. 
diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 07d8686b0..3cef89e86 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -8,13 +8,13 @@ import ( "os" "path/filepath" "regexp" - "runtime" "sort" "sync" "sync/atomic" "time" "unsafe" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -873,7 +873,7 @@ var ( // Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation // is CPU bound and sometimes disk IO bound, so there is no sense in running more // than GOMAXPROCS*2 concurrent goroutines for TSID searches. - searchTSIDsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2) + searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2) ) // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache. @@ -1149,7 +1149,7 @@ var ( // Limit the concurrency for data ingestion to GOMAXPROCS, since this operation // is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent // goroutines on data ingestion path. 
- addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)) + addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) addRowsTimeout = 30 * time.Second ) diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index c45c9b334..927b54c40 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -4,11 +4,12 @@ import ( "fmt" "math/rand" "os" - "runtime" "sync" "sync/atomic" "testing" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" ) func TestMain(m *testing.M) { @@ -79,7 +80,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn timestamp := uint64(startTimestamp) var wg sync.WaitGroup - for k := 0; k < runtime.GOMAXPROCS(-1); k++ { + for k := 0; k < cgroup.AvailableCPUs(); k++ { wg.Add(1) go func() { rows := make([]rawRow, rowsPerInsert) diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index ed1ca8bde..bd88dd247 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -4,9 +4,10 @@ import ( "fmt" "math/rand" "os" - "runtime" "testing" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" ) func BenchmarkTableAddRows(b *testing.B) { @@ -57,7 +58,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { close(workCh) doneCh := make(chan struct{}) - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() for j := 0; j < gomaxprocs; j++ { go func(goroutineID int) { diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index 87b11ecd9..c21a331bd 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -1,11 +1,11 @@ package workingsetcache import ( - "runtime" "sync" "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/fastcache" ) @@ -198,7 +198,7 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) { // Save 
safes the cache to filePath. func (c *Cache) Save(filePath string) error { curr := c.curr.Load().(*fastcache.Cache) - concurrency := runtime.GOMAXPROCS(-1) + concurrency := cgroup.AvailableCPUs() return curr.SaveToFileConcurrent(filePath, concurrency) } diff --git a/lib/writeconcurrencylimiter/concurrencylimiter.go b/lib/writeconcurrencylimiter/concurrencylimiter.go index e1e9f4ab5..a91fa686d 100644 --- a/lib/writeconcurrencylimiter/concurrencylimiter.go +++ b/lib/writeconcurrencylimiter/concurrencylimiter.go @@ -4,16 +4,16 @@ import ( "flag" "fmt" "net/http" - "runtime" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" ) var ( - maxConcurrentInserts = flag.Int("maxConcurrentInserts", runtime.GOMAXPROCS(-1)*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+ + maxConcurrentInserts = flag.Int("maxConcurrentInserts", cgroup.AvailableCPUs()*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+ "since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration") maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts") )