Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-12-11 12:10:21 +02:00
commit 501fd8efd9
57 changed files with 467 additions and 209 deletions

View file

@ -8,10 +8,11 @@ assignees: ''
--- ---
**Describe the bug** **Describe the bug**
A clear and concise description of what the bug is. A clear and concise description of what the bug is. It would be great updating to [the latest avaialble release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases)
and verifying whether the bug is reproducible there. It is also recommended reading [troubleshooting docs](https://victoriametrics.github.io/#troubleshooting).
**To Reproduce** **To Reproduce**
Steps to reproduce the behavior Steps to reproduce the behavior.
**Expected behavior** **Expected behavior**
A clear and concise description of what you expected to happen. A clear and concise description of what you expected to happen.

120
CODE_OF_CONDUCT_RU.md Normal file
View file

@ -0,0 +1,120 @@
# Кодекс Поведения участника
## Наши обязательства
Мы, как участники, авторы и лидеры обязуемся сделать участие в сообществе
свободным от притеснений для всех, независимо от возраста, телосложения,
видимых или невидимых ограничений способности, этнической принадлежности,
половых признаков, гендерной идентичности и выражения, уровня опыта,
образования, социо-экономического статуса, национальности, внешности,
расы, религии, или сексуальной идентичности и ориентации.
Мы обещаем действовать и взаимодействовать таким образом, чтобы вносить вклад в открытое,
дружелюбное, многообразное, инклюзивное и здоровое сообщество.
## Наши стандарты
Примеры поведения, создающие условия для благоприятных взаимоотношений включают в себя:
* Проявление доброты и эмпатии к другим участникам проекта
* Уважение к чужой точке зрения и опыту
* Конструктивная критика и принятие конструктивной критики
* Принятие ответственности, принесение извинений тем, кто пострадал от наших ошибок
и извлечение уроков из опыта
* Ориентирование на то, что лучше подходит для сообщества, а не только для нас лично
Примеры неприемлемого поведения участников включают в себя:
* Использование выражений или изображений сексуального характера и нежелательное сексуальное внимание или домогательство в любой форме
* Троллинг, оскорбительные или уничижительные комментарии, переход на личности или затрагивание политических убеждений
* Публичное или приватное домогательство
* Публикация личной информации других лиц, например, физического или электронного адреса, без явного разрешения
* Иное поведение, которое обоснованно считать неуместным в профессиональной обстановке
## Обязанности
Лидеры сообщества отвечают за разъяснение и применение наших стандартов приемлемого
поведения и будут предпринимать соответствующие и честные меры по исправлению положения
в ответ на любое поведение, которое они сочтут неприемлемым, угрожающим, оскорбительным или вредным.
Лидеры сообщества обладают правом и обязанностью удалять, редактировать или отклонять
комментарии, коммиты, код, изменения в вики, вопросы и другой вклад, который не совпадает
с Кодексом Поведения, и предоставят причины принятого решения, когда сочтут нужным.
## Область применения
Данный Кодекс Поведения применим во всех во всех публичных физических и цифровых пространства сообщества,
а также когда человек официально представляет сообщество в публичных местах.
Примеры представления проекта или сообщества включают использование официальной электронной почты,
публикации в официальном аккаунте в социальных сетях,
или упоминания как представителя в онлайн или оффлайн мероприятии.
## Приведение в исполнение
О случаях домогательства, а так же оскорбительного или иного другого неприемлемого
поведения можно сообщить ответственным лидерам сообщества с помощью письма на info@victoriametrics.com
Все жалобы будут рассмотрены и расследованы оперативно и беспристрастно.
Все лидеры сообщества обязаны уважать неприкосновенность частной жизни и личную
неприкосновенность автора сообщения.
## Руководство по исполнению
Лидеры сообщества будут следовать следующим Принципам Воздействия в Сообществе,
чтобы определить последствия для тех, кого они считают виновными в нарушении данного Кодекса Поведения:
### 1. Исправление
**Общественное влияние**: Использование недопустимой лексики или другое поведение,
считающиеся непрофессиональным или нежелательным в сообществе.
**Последствия**: Личное, письменное предупреждение от лидеров сообщества,
объясняющее суть нарушения и почему такое поведение
было неуместно. Лидеры сообщества могут попросить принести публичное извинение.
### 2. Предупреждение
**Общественное влияние**: Нарушение в результате одного инцидента или серии действий.
**Последствия**: Предупреждение о последствиях в случае продолжающегося неуместного поведения.
На определенное время не допускается взаимодействие с людьми, вовлеченными в инцидент,
включая незапрошенное взаимодействие
с теми, кто обеспечивает соблюдение Кодекса. Это включает в себя избегание взаимодействия
в публичных пространствах, а так же во внешних каналах,
таких как социальные сети. Нарушение этих правил влечет за собой временный или вечный бан.
### 3. Временный бан
**Общественное влияние**: Серьёзное нарушение стандартов сообщества,
включая продолжительное неуместное поведение.
**Последствия**: Временный запрет (бан) на любое взаимодействие
или публичное общение с сообществом на определенный период времени.
На этот период не допускается публичное или личное взаимодействие с людьми,
вовлеченными в инцидент, включая незапрошенное взаимодействие
с теми, кто обеспечивает соблюдение Кодекса.
Нарушение этих правил влечет за собой вечный бан.
### 4. Вечный бан
**Общественное влияние**: Демонстрация систематических нарушений стандартов сообщества,
включая продолжающееся неуместное поведение, домогательство до отдельных лиц,
или проявление агрессии либо пренебрежительного отношения к категориям лиц.
**Последствия**: Вечный запрет на любое публичное взаимодействие с сообществом.
## Атрибуция
Данный Кодекс Поведения основан на [Кодекс Поведения участника][homepage],
версии 2.0, доступной по адресу
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
Принципы Воздействия в Сообществе были вдохновлены [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).
[homepage]: https://www.contributor-covenant.org
Ответы на общие вопросы о данном кодексе поведения ищите на странице FAQ:
https://www.contributor-covenant.org/faq. Переводы доступны по адресу
https://www.contributor-covenant.org/translations.

View file

@ -30,6 +30,7 @@ See [features available for enterprise customers](https://victoriametrics.com/en
Click on a link in order to read the corresponding case study Click on a link in order to read the corresponding case study
* [zhihu](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#zhihu)
* [adidas](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adidas) * [adidas](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adidas)
* [CERN](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#cern) * [CERN](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#cern)
* [COLOPL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#colopl) * [COLOPL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#colopl)
@ -834,8 +835,9 @@ Time series data can be imported via any supported ingestion protocol:
### How to import data in native format ### How to import data in native format
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`. The specification of VictoriaMetrics' native format may yet change and is not formally documented yet. So currently we do not recommend that external clients attempt to pack their own metrics in native format file.
Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format):
If you have a native format file obtained via [/api/v1/export/native](#how-to-export-data-in-native-format) however this is the most efficient protocol for importing data in.
```bash ```bash
# Export the data from <source-victoriametrics>: # Export the data from <source-victoriametrics>:
@ -1100,6 +1102,8 @@ VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-
is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points
on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept.
The recommended value for `-dedup.minScrapeInterval` must equal to `scrape_interval` config from Prometheus configs.
The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair
write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical
`external_labels` section in their configs, so they write data to the same time series. `external_labels` section in their configs, so they write data to the same time series.

View file

@ -11,7 +11,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -36,7 +35,6 @@ func main() {
envflag.Parse() envflag.Parse()
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if promscrape.IsDryRun() { if promscrape.IsDryRun() {
*dryRun = true *dryRun = true

View file

@ -1,9 +1,9 @@
package common package common
import ( import (
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
) )
@ -63,4 +63,4 @@ func PutPushCtx(ctx *PushCtx) {
} }
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *PushCtx, runtime.GOMAXPROCS(-1)) var pushCtxPoolCh = make(chan *PushCtx, cgroup.AvailableCPUs())

View file

@ -4,12 +4,12 @@ import (
"flag" "flag"
"io" "io"
"net/http" "net/http"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
@ -165,4 +165,4 @@ func putPushCtx(ctx *pushCtx) {
} }
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())

View file

@ -21,7 +21,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -68,7 +67,6 @@ func main() {
remotewrite.InitSecretFlags() remotewrite.InitSecretFlags()
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if promscrape.IsDryRun() { if promscrape.IsDryRun() {
if err := promscrape.CheckConfig(); err != nil { if err := promscrape.CheckConfig(); err != nil {

View file

@ -65,6 +65,9 @@ type relabelConfigs struct {
func initLabelsGlobal() { func initLabelsGlobal() {
labelsGlobal = nil labelsGlobal = nil
for _, s := range *unparsedLabelsGlobal { for _, s := range *unparsedLabelsGlobal {
if len(s) == 0 {
continue
}
n := strings.IndexByte(s, '=') n := strings.IndexByte(s, '=')
if n < 0 { if n < 0 {
logger.Fatalf("missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s) logger.Fatalf("missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s)

View file

@ -3,10 +3,10 @@ package remotewrite
import ( import (
"flag" "flag"
"fmt" "fmt"
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -43,7 +43,7 @@ var allRelabelConfigs atomic.Value
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers. // since it may lead to high memory usage due to big number of buffers.
var maxQueues = runtime.GOMAXPROCS(-1) * 4 var maxQueues = cgroup.AvailableCPUs() * 4
// InitSecretFlags must be called after flag.Parse and before any logging. // InitSecretFlags must be called after flag.Parse and before any logging.
func InitSecretFlags() { func InitSecretFlags() {

View file

@ -16,7 +16,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remoteread" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -59,7 +58,6 @@ func main() {
envflag.Parse() envflag.Parse()
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if *dryRun { if *dryRun {
u, _ := url.Parse("https://victoriametrics.com/") u, _ := url.Parse("https://victoriametrics.com/")
@ -160,6 +158,9 @@ func newManager(ctx context.Context) (*manager, error) {
manager.rr = rr manager.rr = rr
for _, s := range *externalLabels { for _, s := range *externalLabels {
if len(s) == 0 {
continue
}
n := strings.IndexByte(s, '=') n := strings.IndexByte(s, '=')
if n < 0 { if n < 0 {
return nil, fmt.Errorf("missing '=' in `-label`. It must contain label in the form `name=value`; got %q", s) return nil, fmt.Errorf("missing '=' in `-label`. It must contain label in the form `name=value`; got %q", s)

View file

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -28,7 +27,6 @@ func main() {
envflag.Parse() envflag.Parse()
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
logger.Infof("starting vmauth at %q...", *httpListenAddr) logger.Infof("starting vmauth at %q...", *httpListenAddr)
startTime := time.Now() startTime := time.Now()
initAuthConfig() initAuthConfig()

View file

@ -12,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -40,7 +39,6 @@ func main() {
envflag.Parse() envflag.Parse()
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if len(*snapshotCreateURL) > 0 { if len(*snapshotCreateURL) > 0 {
logger.Infof("Snapshots enabled") logger.Infof("Snapshots enabled")

View file

@ -1,8 +1,9 @@
package common package common
import ( import (
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
) )
// GetInsertCtx returns InsertCtx from the pool. // GetInsertCtx returns InsertCtx from the pool.
@ -33,4 +34,4 @@ func PutInsertCtx(ctx *InsertCtx) {
} }
var insertCtxPool sync.Pool var insertCtxPool sync.Pool
var insertCtxPoolCh = make(chan *InsertCtx, runtime.GOMAXPROCS(-1)) var insertCtxPoolCh = make(chan *InsertCtx, cgroup.AvailableCPUs())

View file

@ -4,12 +4,12 @@ import (
"flag" "flag"
"io" "io"
"net/http" "net/http"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@ -175,4 +175,4 @@ func putPushCtx(ctx *pushCtx) {
} }
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())

View file

@ -2,11 +2,11 @@ package native
import ( import (
"net/http" "net/http"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -112,4 +112,4 @@ func putPushCtx(ctx *pushCtx) {
} }
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())

View file

@ -2,11 +2,11 @@ package vmimport
import ( import (
"net/http" "net/http"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -117,4 +117,4 @@ func putPushCtx(ctx *pushCtx) {
} }
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())

View file

@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -33,7 +32,6 @@ func main() {
envflag.Parse() envflag.Parse()
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
srcFS, err := newSrcFS() srcFS, err := newSrcFS()
if err != nil { if err != nil {

View file

@ -5,7 +5,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"net/http" "net/http"
"runtime"
"strings" "strings"
"time" "time"
@ -15,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -32,7 +32,7 @@ var (
) )
func getDefaultMaxConcurrentRequests() int { func getDefaultMaxConcurrentRequests() int {
n := runtime.GOMAXPROCS(-1) n := cgroup.AvailableCPUs()
if n <= 4 { if n <= 4 {
n *= 2 n *= 2
} }

View file

@ -6,7 +6,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"regexp" "regexp"
"runtime"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -15,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@ -180,7 +180,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`) var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`) var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
var gomaxprocs = runtime.GOMAXPROCS(-1) var gomaxprocs = cgroup.AvailableCPUs()
type packedTimeseries struct { type packedTimeseries struct {
metricName string metricName string
@ -272,7 +272,7 @@ func unpackWorker() {
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine. // unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
// //
// This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system. // This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system.
var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1) var unpackBatchSize = 8 * cgroup.AvailableCPUs()
// Unpack unpacks pts to dst. // Unpack unpacks pts to dst.
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error { func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
@ -535,11 +535,18 @@ func GetGraphiteTags(filter string, limit int, deadline searchutils.Deadline) ([
} }
// Substitute "__name__" with "name" for Graphite compatibility // Substitute "__name__" with "name" for Graphite compatibility
for i := range labels { for i := range labels {
if labels[i] == "__name__" { if labels[i] != "__name__" {
continue
}
// Prevent from duplicate `name` tag.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
if hasString(labels, "name") {
labels = append(labels[:i], labels[i+1:]...)
} else {
labels[i] = "name" labels[i] = "name"
sort.Strings(labels) sort.Strings(labels)
break
} }
break
} }
if len(filter) > 0 { if len(filter) > 0 {
labels, err = applyGraphiteRegexpFilter(filter, labels) labels, err = applyGraphiteRegexpFilter(filter, labels)
@ -553,6 +560,15 @@ func GetGraphiteTags(filter string, limit int, deadline searchutils.Deadline) ([
return labels, nil return labels, nil
} }
func hasString(a []string, s string) bool {
for _, x := range a {
if x == s {
return true
}
}
return false
}
// GetLabels returns labels until the given deadline. // GetLabels returns labels until the given deadline.
func GetLabels(deadline searchutils.Deadline) ([]string, error) { func GetLabels(deadline searchutils.Deadline) ([]string, error) {
if deadline.Exceeded() { if deadline.Exceeded() {
@ -794,7 +810,7 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func
sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline()) sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
// Start workers that call f in parallel on available CPU cores. // Start workers that call f in parallel on available CPU cores.
gomaxprocs := runtime.GOMAXPROCS(-1) gomaxprocs := cgroup.AvailableCPUs()
workCh := make(chan *exportWork, gomaxprocs*8) workCh := make(chan *exportWork, gomaxprocs*8)
var ( var (
errGlobal error errGlobal error

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
"runtime"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -17,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -147,7 +147,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
bw := bufferedwriter.Get(w) bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw) defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1)) resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
doneCh := make(chan error) doneCh := make(chan error)
go func() { go func() {
err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
@ -374,7 +374,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
bw := bufferedwriter.Get(w) bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw) defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1)) resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
doneCh := make(chan error) doneCh := make(chan error)
if !reduceMemUsage { if !reduceMemUsage {
rss, err := netstorage.ProcessSearchQuery(sq, true, deadline) rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)

View file

@ -4,12 +4,12 @@ import (
"flag" "flag"
"fmt" "fmt"
"math" "math"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@ -555,7 +555,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr
} }
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) { func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
concurrency := runtime.GOMAXPROCS(-1) concurrency := cgroup.AvailableCPUs()
if concurrency > len(tss) { if concurrency > len(tss) {
concurrency = len(tss) concurrency = len(tss)
} }
@ -678,7 +678,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
timeseriesLen := rssLen timeseriesLen := rssLen
if iafc != nil { if iafc != nil {
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory. // Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
timeseriesLen = runtime.GOMAXPROCS(-1) timeseriesLen = cgroup.AvailableCPUs()
if iafc.ae.Modifier.Op != "" { if iafc.ae.Modifier.Op != "" {
if iafc.ae.Limit > 0 { if iafc.ae.Limit > 0 {
// There is an explicit limit on the number of output time series. // There is an explicit limit on the number of output time series.

View file

@ -508,8 +508,8 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
window := rc.Window window := rc.Window
if window <= 0 { if window <= 0 {
window = rc.Step window = rc.Step
if rc.LookbackDelta > 0 && window > rc.LookbackDelta { if rc.CanDropLastSample && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
// Implicitly set window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval // Implicitly window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784 // according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
window = rc.LookbackDelta window = rc.LookbackDelta
} }

View file

@ -683,7 +683,7 @@ func TestRollupFuncsLookbackDelta(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{12, nan, nan, nan, 34, 34, nan} valuesExpected := []float64{99, nan, 44, nan, 32, 34, nan}
timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140} timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })

View file

@ -51,7 +51,7 @@
} }
] ]
}, },
"description": "Overview for single node VictoriaMetrics v1.40.0 or higher", "description": "Overview for single node VictoriaMetrics v1.48.0 or higher",
"editable": true, "editable": true,
"gnetId": 10229, "gnetId": 10229,
"graphTooltip": 0, "graphTooltip": 0,
@ -1295,7 +1295,7 @@
"steppedLine": false, "steppedLine": false,
"targets": [ "targets": [
{ {
"expr": "vm_free_disk_space_bytes{job=\"$job\", instance=\"$instance\"} / ignoring(path) ((rate(vm_rows_added_to_storage_total{job=\"$job\", instance=\"$instance\"}[1d]) - ignoring(type) rate(vm_deduplicated_samples_total{job=\"$job\", instance=\"$instance\", type=\"merge\"}[1d])) * (sum(vm_data_size_bytes{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"}) / sum(vm_rows{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"})))", "expr": "vm_free_disk_space_bytes{job=\"$job\", instance=\"$instance\"} / ignoring(path) ((rate(vm_rows_added_to_storage_total{job=\"$job\", instance=\"$instance\"}[1d]) - ignoring(type) rate(vm_deduplicated_samples_total{job=\"$job\", instance=\"$instance\", type=\"merge\"}[1d])) * scalar(sum(vm_data_size_bytes{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"}) / sum(vm_rows{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"})))",
"format": "time_series", "format": "time_series",
"hide": false, "hide": false,
"interval": "", "interval": "",

View file

@ -2,6 +2,19 @@
# tip # tip
* FEATURE: allow multiple whitespace chars between measurements, fields and timestamp when parsing InfluxDB line protocol.
Though [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) denies multiple whitespace chars between these entities,
some apps improperly put multiple whitespace chars. This workaround allows accepting data from such apps.
* FEATURE: export `vm_promscrape_active_scrapers{type="<sd_type>"}` metric for tracking the number of active scrapers per each service discovery type.
* FEATURE: export `vm_promscrape_scrapers_started_total{type="<sd_type>"}` and `vm_promscrape_scrapers_stopped_total{type="<sd_type>"}` metrics for tracking churn rate for scrapers
per each service discovery type.
* BUGFIX: vmagent: fix memory leak when big number of targets is discovered via service discovery.
* BUGFIX: vmagent: properly pass `datacenter` filter to Consul API server. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574#issuecomment-740454170
* BUGFIX: properly handle CPU limits set on the host system or host container. The bugfix may result in lower memory usage on systems with CPU limits. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/946
* BUGFIX: prevent from duplicate `name` tag returned from `/tags/autoComplete/tags` handler. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944
# [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0) # [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0)

View file

@ -6,6 +6,7 @@ and feel free asking for references, reviews and additional case studies from re
See also [articles about VictoriaMetrics from our users](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles#third-party-articles-and-slides). See also [articles about VictoriaMetrics from our users](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles#third-party-articles-and-slides).
* [zhihu](#zhihu)
* [adidas](#adidas) * [adidas](#adidas)
* [CERN](#cern) * [CERN](#cern)
* [COLOPL](#colopl) * [COLOPL](#colopl)
@ -19,6 +20,20 @@ See also [articles about VictoriaMetrics from our users](https://github.com/Vict
* [ARNES](#arnes) * [ARNES](#arnes)
* [Idealo.de](#idealode) * [Idealo.de](#idealode)
## zhihu
[zhihu](https://www.zhihu.com) is the largest chinese question-and-answer website. We use VictoriaMetrics to store and use Graphite metrics, and we shared the [promate](https://github.com/zhihu/promate) solution in our [单机 20 亿指标,知乎 Graphite 极致优化!](https://qcon.infoq.cn/2020/shenzhen/presentation/2881)([slides](https://static001.geekbang.org/con/76/pdf/828698018/file/%E5%8D%95%E6%9C%BA%2020%20%E4%BA%BF%E6%8C%87%E6%A0%87%EF%BC%8C%E7%9F%A5%E4%B9%8E%20Graphite%20%E6%9E%81%E8%87%B4%E4%BC%98%E5%8C%96%EF%BC%81-%E7%86%8A%E8%B1%B9.pdf)) talk at [QCon 2020](https://qcon.infoq.cn/2020/shenzhen/).
Numbers:
- Active time series: ~2500 Million
- Datapoints: ~20 Trillion
- Ingestion rate: ~1800k/s
- Disk usage: ~20 TiB
- Index size: ~600 GiB
- The average query rate is ~3k per second (mostly alert queries).
- Query duration: median is ~40ms, 99th percentile is ~100ms.
## adidas ## adidas

View file

@ -30,6 +30,7 @@ See [features available for enterprise customers](https://victoriametrics.com/en
Click on a link in order to read the corresponding case study Click on a link in order to read the corresponding case study
* [zhihu](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#zhihu)
* [adidas](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adidas) * [adidas](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adidas)
* [CERN](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#cern) * [CERN](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#cern)
* [COLOPL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#colopl) * [COLOPL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#colopl)
@ -834,8 +835,9 @@ Time series data can be imported via any supported ingestion protocol:
### How to import data in native format ### How to import data in native format
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`. The specification of VictoriaMetrics' native format may yet change and is not formally documented yet. So currently we do not recommend that external clients attempt to pack their own metrics in native format file.
Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format):
If you have a native format file obtained via [/api/v1/export/native](#how-to-export-data-in-native-format) however this is the most efficient protocol for importing data in.
```bash ```bash
# Export the data from <source-victoriametrics>: # Export the data from <source-victoriametrics>:
@ -1100,6 +1102,8 @@ VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-
is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points
on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept.
The recommended value for `-dedup.minScrapeInterval` must equal to `scrape_interval` config from Prometheus configs.
The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair
write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical
`external_labels` section in their configs, so they write data to the same time series. `external_labels` section in their configs, so they write data to the same time series.

View file

@ -6,17 +6,21 @@ import (
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// UpdateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var. // AvailableCPUs returns the number of available CPU cores for the app.
// func AvailableCPUs() int {
// This function must be called after logger.Init(). availableCPUsOnce.Do(updateGOMAXPROCSToCPUQuota)
func UpdateGOMAXPROCSToCPUQuota() { return runtime.GOMAXPROCS(-1)
}
var availableCPUsOnce sync.Once
// updateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var.
func updateGOMAXPROCSToCPUQuota() {
if v := os.Getenv("GOMAXPROCS"); v != "" { if v := os.Getenv("GOMAXPROCS"); v != "" {
// Do not override explicitly set GOMAXPROCS. // Do not override explicitly set GOMAXPROCS.
logger.Infof("using GOMAXPROCS=%q set via environment variable", v)
return return
} }
q := getCPUQuota() q := getCPUQuota()
@ -28,13 +32,11 @@ func UpdateGOMAXPROCSToCPUQuota() {
numCPU := runtime.NumCPU() numCPU := runtime.NumCPU()
if gomaxprocs > numCPU { if gomaxprocs > numCPU {
// There is no sense in setting more GOMAXPROCS than the number of available CPU cores. // There is no sense in setting more GOMAXPROCS than the number of available CPU cores.
logger.Infof("cgroup CPU quota=%d exceeds NumCPU=%d; using GOMAXPROCS=NumCPU", gomaxprocs, numCPU)
return return
} }
if gomaxprocs <= 0 { if gomaxprocs <= 0 {
gomaxprocs = 1 gomaxprocs = 1
} }
logger.Infof("updating GOMAXPROCS to %d according to cgroup CPU quota", gomaxprocs)
runtime.GOMAXPROCS(gomaxprocs) runtime.GOMAXPROCS(gomaxprocs)
} }

View file

@ -4,12 +4,12 @@ import (
"errors" "errors"
"io" "io"
"net" "net"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) {
} }
func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) {
gomaxprocs := runtime.GOMAXPROCS(-1) gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ { for i := 0; i < gomaxprocs; i++ {
wg.Add(1) wg.Add(1)

View file

@ -4,12 +4,12 @@ import (
"errors" "errors"
"io" "io"
"net" "net"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) {
} }
func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) {
gomaxprocs := runtime.GOMAXPROCS(-1) gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ { for i := 0; i < gomaxprocs; i++ {
wg.Add(1) wg.Add(1)

View file

@ -5,12 +5,12 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
@ -133,7 +133,7 @@ func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) {
} }
func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) {
gomaxprocs := runtime.GOMAXPROCS(-1) gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ { for i := 0; i < gomaxprocs; i++ {
wg.Add(1) wg.Add(1)

View file

@ -6,7 +6,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -14,6 +13,7 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -959,7 +959,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 {
return freeSpace / uint64(mergeWorkersCount) / 4 return freeSpace / uint64(mergeWorkersCount) / 4
} }
var mergeWorkersCount = runtime.GOMAXPROCS(-1) var mergeWorkersCount = cgroup.AvailableCPUs()
func openParts(path string) ([]*partWrapper, error) { func openParts(path string) ([]*partWrapper, error) {
// The path can be missing after restoring from backup, so create it if needed. // The path can be missing after restoring from backup, so create it if needed.

View file

@ -2,8 +2,9 @@ package persistentqueue
import ( import (
"fmt" "fmt"
"runtime"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
) )
func BenchmarkFastQueueThroughputSerial(b *testing.B) { func BenchmarkFastQueueThroughputSerial(b *testing.B) {
@ -36,7 +37,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount) b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize) path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
mustDeleteDir(path) mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2, 0) fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0)
defer func() { defer func() {
fq.MustClose() fq.MustClose()
mustDeleteDir(path) mustDeleteDir(path)

View file

@ -7,6 +7,7 @@ import (
"net/url" "net/url"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -27,11 +28,12 @@ import (
) )
var ( var (
strictParse = flag.Bool("promscrape.config.strictParse", false, "Whether to allow only supported fields in '-promscrape.config'. "+ strictParse = flag.Bool("promscrape.config.strictParse", false, "Whether to allow only supported fields in -promscrape.config . "+
"This option may be used for errors detection in '-promscrape.config' file") "By default unsupported fields are silently skipped")
dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+ dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+
"Returns non-zero exit code on parsing errors and emits these errors to stderr. "+ "Returns non-zero exit code on parsing errors and emits these errors to stderr. "+
"Pass -loggerLevel=ERROR if you don't need to see info messages in the output") "See also -promscrape.config.strictParse command-line flag. "+
"Pass -loggerLevel=ERROR if you don't need to see info messages in the output.")
dropOriginalLabels = flag.Bool("promscrape.dropOriginalLabels", false, "Whether to drop original labels for scrape targets at /targets and /api/v1/targets pages. "+ dropOriginalLabels = flag.Bool("promscrape.dropOriginalLabels", false, "Whether to drop original labels for scrape targets at /targets and /api/v1/targets pages. "+
"This may be needed for reducing memory usage when original labels for big number of scrape targets occupy big amounts of memory. "+ "This may be needed for reducing memory usage when original labels for big number of scrape targets occupy big amounts of memory. "+
"Note that this reduces debuggability for improper per-target relabeling configs") "Note that this reduces debuggability for improper per-target relabeling configs")
@ -164,7 +166,7 @@ func (cfg *Config) parse(data []byte, path string) error {
func unmarshalMaybeStrict(data []byte, dst interface{}) error { func unmarshalMaybeStrict(data []byte, dst interface{}) error {
data = envtemplate.Replace(data) data = envtemplate.Replace(data)
var err error var err error
if *strictParse || *dryRun { if *strictParse {
err = yaml.UnmarshalStrict(data, dst) err = yaml.UnmarshalStrict(data, dst)
} else { } else {
err = yaml.Unmarshal(data, dst) err = yaml.Unmarshal(data, dst)
@ -172,8 +174,8 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error {
return err return err
} }
func getSWSByJob(sws []ScrapeWork) map[string][]ScrapeWork { func getSWSByJob(sws []*ScrapeWork) map[string][]*ScrapeWork {
m := make(map[string][]ScrapeWork) m := make(map[string][]*ScrapeWork)
for _, sw := range sws { for _, sw := range sws {
m[sw.jobNameOriginal] = append(m[sw.jobNameOriginal], sw) m[sw.jobNameOriginal] = append(m[sw.jobNameOriginal], sw)
} }
@ -181,9 +183,9 @@ func getSWSByJob(sws []ScrapeWork) map[string][]ScrapeWork {
} }
// getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg. // getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -209,9 +211,9 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getOpenStackSDScrapeWork returns `openstack_sd_configs` ScrapeWork from cfg. // getOpenStackSDScrapeWork returns `openstack_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getOpenStackSDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getOpenStackSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -237,9 +239,9 @@ func (cfg *Config) getOpenStackSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getDockerSwarmSDScrapeWork returns `dockerswarm_sd_configs` ScrapeWork from cfg. // getDockerSwarmSDScrapeWork returns `dockerswarm_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getDockerSwarmSDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getDockerSwarmSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -265,9 +267,9 @@ func (cfg *Config) getDockerSwarmSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg. // getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getConsulSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -293,9 +295,9 @@ func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getEurekaSDScrapeWork returns `eureka_sd_configs` ScrapeWork from cfg. // getEurekaSDScrapeWork returns `eureka_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getEurekaSDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getEurekaSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -321,9 +323,9 @@ func (cfg *Config) getEurekaSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg. // getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getDNSSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -349,9 +351,9 @@ func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg. // getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getEC2SDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getEC2SDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -377,9 +379,9 @@ func (cfg *Config) getEC2SDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg. // getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getGCESDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getGCESDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev) swsPrevByJob := getSWSByJob(prev)
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst) dstLen := len(dst)
@ -405,19 +407,18 @@ func (cfg *Config) getGCESDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg. // getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork { func (cfg *Config) getFileSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
// Create a map for the previous scrape work. // Create a map for the previous scrape work.
swsMapPrev := make(map[string][]ScrapeWork) swsMapPrev := make(map[string][]*ScrapeWork)
for i := range prev { for _, sw := range prev {
sw := &prev[i]
filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath") filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath")
if len(filepath) == 0 { if len(filepath) == 0 {
logger.Panicf("BUG: missing `__vm_filepath` label") logger.Panicf("BUG: missing `__vm_filepath` label")
} else { } else {
swsMapPrev[filepath] = append(swsMapPrev[filepath], *sw) swsMapPrev[filepath] = append(swsMapPrev[filepath], sw)
} }
} }
dst := make([]ScrapeWork, 0, len(prev)) dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
for j := range sc.FileSDConfigs { for j := range sc.FileSDConfigs {
@ -429,8 +430,8 @@ func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
} }
// getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg. // getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg.
func (cfg *Config) getStaticScrapeWork() []ScrapeWork { func (cfg *Config) getStaticScrapeWork() []*ScrapeWork {
var dst []ScrapeWork var dst []*ScrapeWork
for i := range cfg.ScrapeConfigs { for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i] sc := &cfg.ScrapeConfigs[i]
for j := range sc.StaticConfigs { for j := range sc.StaticConfigs {
@ -528,7 +529,7 @@ type scrapeWorkConfig struct {
streamParse bool streamParse bool
} }
func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendKubernetesScrapeWork(dst []*ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := kubernetes.GetLabels(sdc, baseDir) targetLabels, err := kubernetes.GetLabels(sdc, baseDir)
if err != nil { if err != nil {
logger.Errorf("error when discovering kubernetes targets for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering kubernetes targets for `job_name` %q: %s; skipping it", swc.jobName, err)
@ -537,7 +538,7 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config"), true
} }
func appendOpenstackScrapeWork(dst []ScrapeWork, sdc *openstack.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendOpenstackScrapeWork(dst []*ScrapeWork, sdc *openstack.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := openstack.GetLabels(sdc, baseDir) targetLabels, err := openstack.GetLabels(sdc, baseDir)
if err != nil { if err != nil {
logger.Errorf("error when discovering openstack targets for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering openstack targets for `job_name` %q: %s; skipping it", swc.jobName, err)
@ -546,7 +547,7 @@ func appendOpenstackScrapeWork(dst []ScrapeWork, sdc *openstack.SDConfig, baseDi
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "openstack_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "openstack_sd_config"), true
} }
func appendDockerSwarmScrapeWork(dst []ScrapeWork, sdc *dockerswarm.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendDockerSwarmScrapeWork(dst []*ScrapeWork, sdc *dockerswarm.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := dockerswarm.GetLabels(sdc, baseDir) targetLabels, err := dockerswarm.GetLabels(sdc, baseDir)
if err != nil { if err != nil {
logger.Errorf("error when discovering dockerswarm targets for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering dockerswarm targets for `job_name` %q: %s; skipping it", swc.jobName, err)
@ -555,7 +556,7 @@ func appendDockerSwarmScrapeWork(dst []ScrapeWork, sdc *dockerswarm.SDConfig, ba
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dockerswarm_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dockerswarm_sd_config"), true
} }
func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendConsulScrapeWork(dst []*ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := consul.GetLabels(sdc, baseDir) targetLabels, err := consul.GetLabels(sdc, baseDir)
if err != nil { if err != nil {
logger.Errorf("error when discovering consul targets for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering consul targets for `job_name` %q: %s; skipping it", swc.jobName, err)
@ -564,7 +565,7 @@ func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir stri
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config"), true
} }
func appendEurekaScrapeWork(dst []ScrapeWork, sdc *eureka.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendEurekaScrapeWork(dst []*ScrapeWork, sdc *eureka.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := eureka.GetLabels(sdc, baseDir) targetLabels, err := eureka.GetLabels(sdc, baseDir)
if err != nil { if err != nil {
logger.Errorf("error when discovering eureka targets for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering eureka targets for `job_name` %q: %s; skipping it", swc.jobName, err)
@ -573,7 +574,7 @@ func appendEurekaScrapeWork(dst []ScrapeWork, sdc *eureka.SDConfig, baseDir stri
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "eureka_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "eureka_sd_config"), true
} }
func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendDNSScrapeWork(dst []*ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := dns.GetLabels(sdc) targetLabels, err := dns.GetLabels(sdc)
if err != nil { if err != nil {
logger.Errorf("error when discovering dns targets for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering dns targets for `job_name` %q: %s; skipping it", swc.jobName, err)
@ -582,7 +583,7 @@ func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkCon
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config"), true
} }
func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendEC2ScrapeWork(dst []*ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := ec2.GetLabels(sdc) targetLabels, err := ec2.GetLabels(sdc)
if err != nil { if err != nil {
logger.Errorf("error when discovering ec2 targets for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering ec2 targets for `job_name` %q: %s; skipping it", swc.jobName, err)
@ -591,7 +592,7 @@ func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkCon
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config"), true
} }
func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendGCEScrapeWork(dst []*ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) ([]*ScrapeWork, bool) {
targetLabels, err := gce.GetLabels(sdc) targetLabels, err := gce.GetLabels(sdc)
if err != nil { if err != nil {
logger.Errorf("error when discovering gce targets for `job_name` %q: %s; skippint it", swc.jobName, err) logger.Errorf("error when discovering gce targets for `job_name` %q: %s; skippint it", swc.jobName, err)
@ -600,7 +601,7 @@ func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkCon
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config"), true return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config"), true
} }
func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []ScrapeWork { func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []*ScrapeWork {
for _, metaLabels := range targetLabels { for _, metaLabels := range targetLabels {
target := metaLabels["__address__"] target := metaLabels["__address__"]
var err error var err error
@ -613,7 +614,7 @@ func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, ta
return dst return dst
} }
func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swsMapPrev map[string][]ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { func (sdc *FileSDConfig) appendScrapeWork(dst []*ScrapeWork, swsMapPrev map[string][]*ScrapeWork, baseDir string, swc *scrapeWorkConfig) []*ScrapeWork {
for _, file := range sdc.Files { for _, file := range sdc.Files {
pathPattern := getFilepath(baseDir, file) pathPattern := getFilepath(baseDir, file)
paths := []string{pathPattern} paths := []string{pathPattern}
@ -658,7 +659,7 @@ func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swsMapPrev map[strin
return dst return dst
} }
func (stc *StaticConfig) appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, metaLabels map[string]string) []ScrapeWork { func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, metaLabels map[string]string) []*ScrapeWork {
for _, target := range stc.Targets { for _, target := range stc.Targets {
if target == "" { if target == "" {
// Do not return this error, since other targets may be valid // Do not return this error, since other targets may be valid
@ -676,12 +677,14 @@ func (stc *StaticConfig) appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfi
return dst return dst
} }
func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) ([]ScrapeWork, error) { func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) ([]*ScrapeWork, error) {
labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params) labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
var originalLabels []prompbmarshal.Label var originalLabels []prompbmarshal.Label
if !*dropOriginalLabels { if !*dropOriginalLabels {
originalLabels = append([]prompbmarshal.Label{}, labels...) originalLabels = append([]prompbmarshal.Label{}, labels...)
promrelabel.SortLabels(originalLabels) promrelabel.SortLabels(originalLabels)
// Reduce memory usage by interning all the strings in originalLabels.
internLabelStrings(originalLabels)
} }
labels = promrelabel.ApplyRelabelConfigs(labels, 0, swc.relabelConfigs, false) labels = promrelabel.ApplyRelabelConfigs(labels, 0, swc.relabelConfigs, false)
labels = promrelabel.RemoveMetaLabels(labels[:0], labels) labels = promrelabel.RemoveMetaLabels(labels[:0], labels)
@ -738,7 +741,9 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex
}) })
promrelabel.SortLabels(labels) promrelabel.SortLabels(labels)
} }
dst = append(dst, ScrapeWork{ // Reduce memory usage by interning all the strings in labels.
internLabelStrings(labels)
dst = append(dst, &ScrapeWork{
ID: atomic.AddUint64(&nextScrapeWorkID, 1), ID: atomic.AddUint64(&nextScrapeWorkID, 1),
ScrapeURL: scrapeURL, ScrapeURL: scrapeURL,
ScrapeInterval: swc.scrapeInterval, ScrapeInterval: swc.scrapeInterval,
@ -762,6 +767,35 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex
// Each ScrapeWork has an ID, which is used for locating it when updating its status. // Each ScrapeWork has an ID, which is used for locating it when updating its status.
var nextScrapeWorkID uint64 var nextScrapeWorkID uint64
func internLabelStrings(labels []prompbmarshal.Label) {
for i := range labels {
label := &labels[i]
label.Name = internString(label.Name)
label.Value = internString(label.Value)
}
}
func internString(s string) string {
internStringsMapLock.Lock()
defer internStringsMapLock.Unlock()
if sInterned, ok := internStringsMap[s]; ok {
return sInterned
}
// Make a new copy for s in order to remove references from possible bigger string s refers to.
sCopy := string(append([]byte{}, s...))
internStringsMap[sCopy] = sCopy
if len(internStringsMap) > 100e3 {
internStringsMap = make(map[string]string, 100e3)
}
return sCopy
}
var (
internStringsMapLock sync.Mutex
internStringsMap = make(map[string]string, 100e3)
)
func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string { func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
// See https://www.robustperception.io/life-of-a-label // See https://www.robustperception.io/life-of-a-label
m := make(map[string][]string) m := make(map[string][]string)

View file

@ -94,7 +94,7 @@ scrape_configs:
} }
sws := cfg.getStaticScrapeWork() sws := cfg.getStaticScrapeWork()
resetNonEssentialFields(sws) resetNonEssentialFields(sws)
swsExpected := []ScrapeWork{{ swsExpected := []*ScrapeWork{{
ScrapeURL: "http://black:9115/probe?module=dns_udp_example&target=8.8.8.8", ScrapeURL: "http://black:9115/probe?module=dns_udp_example&target=8.8.8.8",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
ScrapeTimeout: defaultScrapeTimeout, ScrapeTimeout: defaultScrapeTimeout,
@ -199,7 +199,7 @@ scrape_configs:
} }
} }
func getFileSDScrapeWork(data []byte, path string) ([]ScrapeWork, error) { func getFileSDScrapeWork(data []byte, path string) ([]*ScrapeWork, error) {
var cfg Config var cfg Config
if err := cfg.parse(data, path); err != nil { if err := cfg.parse(data, path); err != nil {
return nil, fmt.Errorf("cannot parse data: %w", err) return nil, fmt.Errorf("cannot parse data: %w", err)
@ -207,7 +207,7 @@ func getFileSDScrapeWork(data []byte, path string) ([]ScrapeWork, error) {
return cfg.getFileSDScrapeWork(nil), nil return cfg.getFileSDScrapeWork(nil), nil
} }
func getStaticScrapeWork(data []byte, path string) ([]ScrapeWork, error) { func getStaticScrapeWork(data []byte, path string) ([]*ScrapeWork, error) {
var cfg Config var cfg Config
if err := cfg.parse(data, path); err != nil { if err := cfg.parse(data, path); err != nil {
return nil, fmt.Errorf("cannot parse data: %w", err) return nil, fmt.Errorf("cannot parse data: %w", err)
@ -440,7 +440,7 @@ scrape_configs:
`) `)
} }
func resetNonEssentialFields(sws []ScrapeWork) { func resetNonEssentialFields(sws []*ScrapeWork) {
for i := range sws { for i := range sws {
sws[i].ID = 0 sws[i].ID = 0
sws[i].OriginalLabels = nil sws[i].OriginalLabels = nil
@ -448,7 +448,7 @@ func resetNonEssentialFields(sws []ScrapeWork) {
} }
func TestGetFileSDScrapeWorkSuccess(t *testing.T) { func TestGetFileSDScrapeWorkSuccess(t *testing.T) {
f := func(data string, expectedSws []ScrapeWork) { f := func(data string, expectedSws []*ScrapeWork) {
t.Helper() t.Helper()
sws, err := getFileSDScrapeWork([]byte(data), "non-existing-file") sws, err := getFileSDScrapeWork([]byte(data), "non-existing-file")
if err != nil { if err != nil {
@ -457,8 +457,7 @@ func TestGetFileSDScrapeWorkSuccess(t *testing.T) {
resetNonEssentialFields(sws) resetNonEssentialFields(sws)
// Remove `__vm_filepath` label, since its value depends on the current working dir. // Remove `__vm_filepath` label, since its value depends on the current working dir.
for i := range sws { for _, sw := range sws {
sw := &sws[i]
for j := range sw.Labels { for j := range sw.Labels {
label := &sw.Labels[j] label := &sw.Labels[j]
if label.Name == "__vm_filepath" { if label.Name == "__vm_filepath" {
@ -475,14 +474,14 @@ scrape_configs:
- job_name: foo - job_name: foo
static_configs: static_configs:
- targets: ["xxx"] - targets: ["xxx"]
`, []ScrapeWork{}) `, []*ScrapeWork{})
f(` f(`
scrape_configs: scrape_configs:
- job_name: foo - job_name: foo
metrics_path: /abc/de metrics_path: /abc/de
file_sd_configs: file_sd_configs:
- files: ["testdata/file_sd.json", "testdata/file_sd*.yml"] - files: ["testdata/file_sd.json", "testdata/file_sd*.yml"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://host1:80/abc/de", ScrapeURL: "http://host1:80/abc/de",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -604,7 +603,7 @@ scrape_configs:
} }
func TestGetStaticScrapeWorkSuccess(t *testing.T) { func TestGetStaticScrapeWorkSuccess(t *testing.T) {
f := func(data string, expectedSws []ScrapeWork) { f := func(data string, expectedSws []*ScrapeWork) {
t.Helper() t.Helper()
sws, err := getStaticScrapeWork([]byte(data), "non-exsiting-file") sws, err := getStaticScrapeWork([]byte(data), "non-exsiting-file")
if err != nil { if err != nil {
@ -621,7 +620,7 @@ scrape_configs:
- job_name: foo - job_name: foo
static_configs: static_configs:
- targets: ["foo.bar:1234"] - targets: ["foo.bar:1234"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics", ScrapeURL: "http://foo.bar:1234/metrics",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -663,7 +662,7 @@ scrape_configs:
- job_name: foo - job_name: foo
static_configs: static_configs:
- targets: ["foo.bar:1234"] - targets: ["foo.bar:1234"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics", ScrapeURL: "http://foo.bar:1234/metrics",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -733,7 +732,7 @@ scrape_configs:
insecure_skip_verify: true insecure_skip_verify: true
static_configs: static_configs:
- targets: [1.2.3.4] - targets: [1.2.3.4]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "https://foo.bar:443/foo/bar?p=x%26y&p=%3D", ScrapeURL: "https://foo.bar:443/foo/bar?p=x%26y&p=%3D",
ScrapeInterval: 543 * time.Second, ScrapeInterval: 543 * time.Second,
@ -887,7 +886,7 @@ scrape_configs:
x: [keep_me] x: [keep_me]
static_configs: static_configs:
- targets: ["foo.bar:1234", "drop-this-target"] - targets: ["foo.bar:1234", "drop-this-target"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics?x=keep_me", ScrapeURL: "http://foo.bar:1234/metrics?x=keep_me",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -957,7 +956,7 @@ scrape_configs:
replacement: b replacement: b
static_configs: static_configs:
- targets: ["foo.bar:1234"] - targets: ["foo.bar:1234"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "mailto://foo.bar:1234/abc.de?a=b", ScrapeURL: "mailto://foo.bar:1234/abc.de?a=b",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1012,7 +1011,7 @@ scrape_configs:
regex: "" regex: ""
static_configs: static_configs:
- targets: ["foo.bar:1234", "xyz"] - targets: ["foo.bar:1234", "xyz"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics", ScrapeURL: "http://foo.bar:1234/metrics",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1051,7 +1050,7 @@ scrape_configs:
target_label: abc target_label: abc
static_configs: static_configs:
- targets: ["foo.bar:1234"] - targets: ["foo.bar:1234"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics", ScrapeURL: "http://foo.bar:1234/metrics",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1091,7 +1090,7 @@ scrape_configs:
password_file: testdata/password.txt password_file: testdata/password.txt
static_configs: static_configs:
- targets: ["foo.bar:1234"] - targets: ["foo.bar:1234"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics", ScrapeURL: "http://foo.bar:1234/metrics",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1130,7 +1129,7 @@ scrape_configs:
bearer_token_file: testdata/password.txt bearer_token_file: testdata/password.txt
static_configs: static_configs:
- targets: ["foo.bar:1234"] - targets: ["foo.bar:1234"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics", ScrapeURL: "http://foo.bar:1234/metrics",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1175,7 +1174,7 @@ scrape_configs:
key_file: testdata/ssl-cert-snakeoil.key key_file: testdata/ssl-cert-snakeoil.key
static_configs: static_configs:
- targets: ["foo.bar:1234"] - targets: ["foo.bar:1234"]
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metrics", ScrapeURL: "http://foo.bar:1234/metrics",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1227,7 +1226,7 @@ scrape_configs:
__param_a: c __param_a: c
__address__: pp __address__: pp
job: yyy job: yyy
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://pp:80/metrics?a=c&a=xy", ScrapeURL: "http://pp:80/metrics?a=c&a=xy",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1290,7 +1289,7 @@ scrape_configs:
target_label: instance target_label: instance
- target_label: __address__ - target_label: __address__
replacement: 127.0.0.1:9116 # The SNMP exporter's real hostname:port. replacement: 127.0.0.1:9116 # The SNMP exporter's real hostname:port.
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://127.0.0.1:9116/snmp?module=if_mib&target=192.168.1.2", ScrapeURL: "http://127.0.0.1:9116/snmp?module=if_mib&target=192.168.1.2",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1341,7 +1340,7 @@ scrape_configs:
relabel_configs: relabel_configs:
- replacement: metricspath - replacement: metricspath
target_label: __metrics_path__ target_label: __metrics_path__
`, []ScrapeWork{ `, []*ScrapeWork{
{ {
ScrapeURL: "http://foo.bar:1234/metricspath", ScrapeURL: "http://foo.bar:1234/metricspath",
ScrapeInterval: defaultScrapeInterval, ScrapeInterval: defaultScrapeInterval,
@ -1376,7 +1375,7 @@ scrape_configs:
var defaultRegexForRelabelConfig = regexp.MustCompile("^(.*)$") var defaultRegexForRelabelConfig = regexp.MustCompile("^(.*)$")
func equalStaticConfigForScrapeWorks(a, b []ScrapeWork) bool { func equalStaticConfigForScrapeWorks(a, b []*ScrapeWork) bool {
if len(a) != len(b) { if len(a) != len(b) {
return false return false
} }

View file

@ -43,7 +43,7 @@ type serviceWatcher struct {
// newConsulWatcher creates new watcher and start background service discovery for Consul. // newConsulWatcher creates new watcher and start background service discovery for Consul.
func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter string) *consulWatcher { func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter string) *consulWatcher {
baseQueryArgs := "?sdc=" + url.QueryEscape(datacenter) baseQueryArgs := "?dc=" + url.QueryEscape(datacenter)
if sdc.AllowStale { if sdc.AllowStale {
baseQueryArgs += "&stale" baseQueryArgs += "&stale"
} }

View file

@ -95,16 +95,16 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
} }
scs := newScrapeConfigs(pushData) scs := newScrapeConfigs(pushData)
scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() }) scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) }) scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) }) scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) }) scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) }) scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) }) scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) }) scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) }) scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) }) scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
scs.add("dockerswarm_sd_configs", *dockerswarmSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) }) scs.add("dockerswarm_sd_configs", *dockerswarmSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) })
sighupCh := procutil.NewSighupChan() sighupCh := procutil.NewSighupChan()
@ -171,7 +171,7 @@ func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConf
} }
} }
func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork) { func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) {
atomic.AddInt32(&PendingScrapeConfigs, 1) atomic.AddInt32(&PendingScrapeConfigs, 1)
scfg := &scrapeConfig{ scfg := &scrapeConfig{
name: name, name: name,
@ -204,7 +204,7 @@ func (scs *scrapeConfigs) stop() {
type scrapeConfig struct { type scrapeConfig struct {
name string name string
pushData func(wr *prompbmarshal.WriteRequest) pushData func(wr *prompbmarshal.WriteRequest)
getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
checkInterval time.Duration checkInterval time.Duration
cfgCh chan *Config cfgCh chan *Config
stopCh <-chan struct{} stopCh <-chan struct{}
@ -222,7 +222,7 @@ func (scfg *scrapeConfig) run() {
} }
cfg := <-scfg.cfgCh cfg := <-scfg.cfgCh
var swsPrev []ScrapeWork var swsPrev []*ScrapeWork
updateScrapeWork := func(cfg *Config) { updateScrapeWork := func(cfg *Config) {
sws := scfg.getScrapeWork(cfg, swsPrev) sws := scfg.getScrapeWork(cfg, swsPrev)
sg.update(sws) sg.update(sws)
@ -249,7 +249,11 @@ type scraperGroup struct {
mLock sync.Mutex mLock sync.Mutex
m map[string]*scraper m map[string]*scraper
pushData func(wr *prompbmarshal.WriteRequest) pushData func(wr *prompbmarshal.WriteRequest)
changesCount *metrics.Counter changesCount *metrics.Counter
activeScrapers *metrics.Counter
scrapersStarted *metrics.Counter
scrapersStopped *metrics.Counter
} }
func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup { func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup {
@ -257,7 +261,11 @@ func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest))
name: name, name: name,
m: make(map[string]*scraper), m: make(map[string]*scraper),
pushData: pushData, pushData: pushData,
changesCount: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)), changesCount: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)),
activeScrapers: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_active_scrapers{type=%q}`, name)),
scrapersStarted: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_started_total{type=%q}`, name)),
scrapersStopped: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_stopped_total{type=%q}`, name)),
} }
metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name), func() float64 { metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name), func() float64 {
return float64(tsmGlobal.StatusByGroup(sg.name, true)) return float64(tsmGlobal.StatusByGroup(sg.name, true))
@ -278,15 +286,14 @@ func (sg *scraperGroup) stop() {
sg.wg.Wait() sg.wg.Wait()
} }
func (sg *scraperGroup) update(sws []ScrapeWork) { func (sg *scraperGroup) update(sws []*ScrapeWork) {
sg.mLock.Lock() sg.mLock.Lock()
defer sg.mLock.Unlock() defer sg.mLock.Unlock()
additionsCount := 0 additionsCount := 0
deletionsCount := 0 deletionsCount := 0
swsMap := make(map[string][]prompbmarshal.Label, len(sws)) swsMap := make(map[string][]prompbmarshal.Label, len(sws))
for i := range sws { for _, sw := range sws {
sw := &sws[i]
key := sw.key() key := sw.key()
originalLabels := swsMap[key] originalLabels := swsMap[key]
if originalLabels != nil { if originalLabels != nil {
@ -308,11 +315,15 @@ func (sg *scraperGroup) update(sws []ScrapeWork) {
// Start a scraper for the missing key. // Start a scraper for the missing key.
sc := newScraper(sw, sg.name, sg.pushData) sc := newScraper(sw, sg.name, sg.pushData)
sg.activeScrapers.Inc()
sg.scrapersStarted.Inc()
sg.wg.Add(1) sg.wg.Add(1)
go func() { go func() {
defer sg.wg.Done() defer sg.wg.Done()
sc.sw.run(sc.stopCh) sc.sw.run(sc.stopCh)
tsmGlobal.Unregister(sw) tsmGlobal.Unregister(sw)
sg.activeScrapers.Dec()
sg.scrapersStopped.Inc()
}() }()
tsmGlobal.Register(sw) tsmGlobal.Register(sw)
sg.m[key] = sc sg.m[key] = sc

View file

@ -221,7 +221,9 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
func (sw *scrapeWork) logError(s string) { func (sw *scrapeWork) logError(s string) {
if !*suppressScrapeErrors { if !*suppressScrapeErrors {
logger.ErrorfSkipframes(1, "error when scraping %q from job %q with labels %s: %s", sw.Config.ScrapeURL, sw.Config.Job(), sw.Config.LabelsString(), s) logger.ErrorfSkipframes(1, "error when scraping %q from job %q with labels %s: %s; "+
"scrape errors can be disabled by -promscrape.suppressScrapeErrors command-line flag",
sw.Config.ScrapeURL, sw.Config.Job(), sw.Config.LabelsString(), s)
} }
} }

View file

@ -19,6 +19,9 @@ func statStdDial(ctx context.Context, network, addr string) (net.Conn, error) {
dialsTotal.Inc() dialsTotal.Inc()
if err != nil { if err != nil {
dialErrors.Inc() dialErrors.Inc()
if !netutil.TCP6Enabled() {
err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err)
}
return nil, err return nil, err
} }
conns.Inc() conns.Inc()
@ -54,7 +57,7 @@ func statDial(addr string) (conn net.Conn, err error) {
if err != nil { if err != nil {
dialErrors.Inc() dialErrors.Inc()
if !netutil.TCP6Enabled() { if !netutil.TCP6Enabled() {
err = fmt.Errorf("%w; try -enableTCP6 command-line flag", err) err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err)
} }
return nil, err return nil, err
} }

View file

@ -46,24 +46,24 @@ func WriteAPIV1Targets(w io.Writer, state string) {
type targetStatusMap struct { type targetStatusMap struct {
mu sync.Mutex mu sync.Mutex
m map[uint64]targetStatus m map[uint64]*targetStatus
} }
func newTargetStatusMap() *targetStatusMap { func newTargetStatusMap() *targetStatusMap {
return &targetStatusMap{ return &targetStatusMap{
m: make(map[uint64]targetStatus), m: make(map[uint64]*targetStatus),
} }
} }
func (tsm *targetStatusMap) Reset() { func (tsm *targetStatusMap) Reset() {
tsm.mu.Lock() tsm.mu.Lock()
tsm.m = make(map[uint64]targetStatus) tsm.m = make(map[uint64]*targetStatus)
tsm.mu.Unlock() tsm.mu.Unlock()
} }
func (tsm *targetStatusMap) Register(sw *ScrapeWork) { func (tsm *targetStatusMap) Register(sw *ScrapeWork) {
tsm.mu.Lock() tsm.mu.Lock()
tsm.m[sw.ID] = targetStatus{ tsm.m[sw.ID] = &targetStatus{
sw: *sw, sw: *sw,
} }
tsm.mu.Unlock() tsm.mu.Unlock()
@ -77,7 +77,7 @@ func (tsm *targetStatusMap) Unregister(sw *ScrapeWork) {
func (tsm *targetStatusMap) Update(sw *ScrapeWork, group string, up bool, scrapeTime, scrapeDuration int64, err error) { func (tsm *targetStatusMap) Update(sw *ScrapeWork, group string, up bool, scrapeTime, scrapeDuration int64, err error) {
tsm.mu.Lock() tsm.mu.Lock()
tsm.m[sw.ID] = targetStatus{ tsm.m[sw.ID] = &targetStatus{
sw: *sw, sw: *sw,
up: up, up: up,
scrapeGroup: group, scrapeGroup: group,
@ -114,7 +114,7 @@ func (tsm *targetStatusMap) WriteActiveTargetsJSON(w io.Writer) {
key := promLabelsString(st.sw.OriginalLabels) key := promLabelsString(st.sw.OriginalLabels)
kss = append(kss, keyStatus{ kss = append(kss, keyStatus{
key: key, key: key,
st: st, st: *st,
}) })
} }
tsm.mu.Unlock() tsm.mu.Unlock()
@ -167,7 +167,7 @@ func (tsm *targetStatusMap) WriteHumanReadable(w io.Writer, showOriginalLabels b
tsm.mu.Lock() tsm.mu.Lock()
for _, st := range tsm.m { for _, st := range tsm.m {
job := st.sw.Job() job := st.sw.Job()
byJob[job] = append(byJob[job], st) byJob[job] = append(byJob[job], *st)
} }
tsm.mu.Unlock() tsm.mu.Unlock()
@ -245,7 +245,6 @@ type droppedTarget struct {
} }
func (dt *droppedTargets) Register(originalLabels []prompbmarshal.Label) { func (dt *droppedTargets) Register(originalLabels []prompbmarshal.Label) {
key := promLabelsString(originalLabels) key := promLabelsString(originalLabels)
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
dt.mu.Lock() dt.mu.Lock()

View file

@ -1,9 +1,9 @@
package common package common
import ( import (
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
@ -25,7 +25,7 @@ func StartUnmarshalWorkers() {
if unmarshalWorkCh != nil { if unmarshalWorkCh != nil {
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()") logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
} }
gomaxprocs := runtime.GOMAXPROCS(-1) gomaxprocs := cgroup.AvailableCPUs()
unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs) unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs)
unmarshalWorkersWG.Add(gomaxprocs) unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ { for i := 0; i < gomaxprocs; i++ {

View file

@ -6,11 +6,11 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"runtime"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -142,7 +142,7 @@ func putStreamContext(ctx *streamContext) {
} }
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows Rows

View file

@ -5,11 +5,11 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"runtime"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -127,7 +127,7 @@ func putStreamContext(ctx *streamContext) {
} }
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows Rows

View file

@ -69,7 +69,7 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeCh
return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s) return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s)
} }
measurementTags := s[:n] measurementTags := s[:n]
s = s[n+1:] s = stripLeadingWhitespace(s[n+1:])
// Parse measurement and tags // Parse measurement and tags
var err error var err error
@ -110,7 +110,7 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeCh
return tagsPool, fieldsPool, err return tagsPool, fieldsPool, err
} }
r.Fields = fieldsPool[fieldsStart:] r.Fields = fieldsPool[fieldsStart:]
s = s[n+1:] s = stripLeadingWhitespace(s[n+1:])
// Parse timestamp // Parse timestamp
timestamp, err := fastfloat.ParseInt64(s) timestamp, err := fastfloat.ParseInt64(s)
@ -409,3 +409,10 @@ func isInQuote(s string, noEscapeChars bool) bool {
s = s[n+1:] s = s[n+1:]
} }
} }
func stripLeadingWhitespace(s string) string {
for len(s) > 0 && s[0] == ' ' {
s = s[1:]
}
return s
}

View file

@ -468,6 +468,36 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}, },
}) })
// Superfluous whitespace between tags, fields and timestamps.
f(`cpu_utilization,host=mnsbook-pro.local value=119.8 1607222595591`, &Rows{
Rows: []Row{{
Measurement: "cpu_utilization",
Tags: []Tag{{
Key: "host",
Value: "mnsbook-pro.local",
}},
Fields: []Field{{
Key: "value",
Value: 119.8,
}},
Timestamp: 1607222595591,
}},
})
f(`cpu_utilization,host=mnsbook-pro.local value=119.8 1607222595591`, &Rows{
Rows: []Row{{
Measurement: "cpu_utilization",
Tags: []Tag{{
Key: "host",
Value: "mnsbook-pro.local",
}},
Fields: []Field{{
Key: "value",
Value: 119.8,
}},
Timestamp: 1607222595591,
}},
})
f("x,y=z,g=p:\\ \\ 5432\\,\\ gp\\ mon\\ [lol]\\ con10\\ cmd5\\ SELECT f=1", &Rows{ f("x,y=z,g=p:\\ \\ 5432\\,\\ gp\\ mon\\ [lol]\\ con10\\ cmd5\\ SELECT f=1", &Rows{
Rows: []Row{{ Rows: []Row{{
Measurement: "x", Measurement: "x",

View file

@ -5,11 +5,11 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"runtime"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -155,7 +155,7 @@ func putStreamContext(ctx *streamContext) {
} }
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows Rows

View file

@ -5,11 +5,11 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"runtime"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -126,7 +126,7 @@ func putStreamContext(ctx *streamContext) {
} }
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows Rows

View file

@ -6,11 +6,11 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"runtime"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -149,7 +149,7 @@ func putStreamContext(ctx *streamContext) {
} }
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
func getRows() *Rows { func getRows() *Rows {
v := rowsPool.Get() v := rowsPool.Get()

View file

@ -4,11 +4,11 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"io" "io"
"runtime"
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -128,7 +128,7 @@ func putStreamContext(ctx *streamContext) {
} }
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows Rows

View file

@ -5,10 +5,10 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
@ -123,7 +123,7 @@ func putPushCtx(ctx *pushCtx) {
} }
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getWriteRequest() *prompb.WriteRequest { func getWriteRequest() *prompb.WriteRequest {
v := writeRequestPool.Get() v := writeRequestPool.Get()

View file

@ -5,10 +5,10 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -132,7 +132,7 @@ func putStreamContext(ctx *streamContext) {
} }
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows Rows

View file

@ -7,7 +7,6 @@ import (
"math/bits" "math/bits"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -16,6 +15,7 @@ import (
"unsafe" "unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -77,7 +77,7 @@ const finalPartsToMerge = 3
// The number of shards for rawRow entries per partition. // The number of shards for rawRow entries per partition.
// //
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawRowsShardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8 var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8
// getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet. // getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerPartition() int { func getMaxRawRowsPerPartition() int {
@ -882,8 +882,8 @@ func hasActiveMerges(pws []*partWrapper) bool {
} }
var ( var (
bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 bigMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2
smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 smallMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2
) )
// SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks. // SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.

View file

@ -8,13 +8,13 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"runtime"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"unsafe" "unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -873,7 +873,7 @@ var (
// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation // Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
// is CPU bound and sometimes disk IO bound, so there is no sense in running more // is CPU bound and sometimes disk IO bound, so there is no sense in running more
// than GOMAXPROCS*2 concurrent goroutines for TSID searches. // than GOMAXPROCS*2 concurrent goroutines for TSID searches.
searchTSIDsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2) searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2)
) )
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache. // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
@ -1149,7 +1149,7 @@ var (
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation // Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent // is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
// goroutines on data ingestion path. // goroutines on data ingestion path.
addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)) addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
addRowsTimeout = 30 * time.Second addRowsTimeout = 30 * time.Second
) )

View file

@ -4,11 +4,12 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
@ -79,7 +80,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
timestamp := uint64(startTimestamp) timestamp := uint64(startTimestamp)
var wg sync.WaitGroup var wg sync.WaitGroup
for k := 0; k < runtime.GOMAXPROCS(-1); k++ { for k := 0; k < cgroup.AvailableCPUs(); k++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
rows := make([]rawRow, rowsPerInsert) rows := make([]rawRow, rowsPerInsert)

View file

@ -4,9 +4,10 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
"runtime"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
) )
func BenchmarkTableAddRows(b *testing.B) { func BenchmarkTableAddRows(b *testing.B) {
@ -57,7 +58,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
close(workCh) close(workCh)
doneCh := make(chan struct{}) doneCh := make(chan struct{})
gomaxprocs := runtime.GOMAXPROCS(-1) gomaxprocs := cgroup.AvailableCPUs()
for j := 0; j < gomaxprocs; j++ { for j := 0; j < gomaxprocs; j++ {
go func(goroutineID int) { go func(goroutineID int) {

View file

@ -1,11 +1,11 @@
package workingsetcache package workingsetcache
import ( import (
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
) )
@ -198,7 +198,7 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) {
// Save safes the cache to filePath. // Save safes the cache to filePath.
func (c *Cache) Save(filePath string) error { func (c *Cache) Save(filePath string) error {
curr := c.curr.Load().(*fastcache.Cache) curr := c.curr.Load().(*fastcache.Cache)
concurrency := runtime.GOMAXPROCS(-1) concurrency := cgroup.AvailableCPUs()
return curr.SaveToFileConcurrent(filePath, concurrency) return curr.SaveToFileConcurrent(filePath, concurrency)
} }

View file

@ -4,16 +4,16 @@ import (
"flag" "flag"
"fmt" "fmt"
"net/http" "net/http"
"runtime"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var ( var (
maxConcurrentInserts = flag.Int("maxConcurrentInserts", runtime.GOMAXPROCS(-1)*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+ maxConcurrentInserts = flag.Int("maxConcurrentInserts", cgroup.AvailableCPUs()*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+
"since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration") "since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration")
maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts") maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts")
) )