From cae61c85d4cb35b9bdf29d665754194263e499ef Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 5 Apr 2022 20:32:50 +0300 Subject: [PATCH 01/13] vendor: update github.com/VictoriaMetrics/fastcache from v1.9.0 to v1.10.0 --- go.mod | 4 ++-- go.sum | 9 ++++----- .../github.com/VictoriaMetrics/fastcache/fastcache.go | 5 +---- vendor/golang.org/x/sys/unix/syscall_linux_amd64.go | 1 + vendor/golang.org/x/sys/unix/syscall_linux_arm64.go | 1 + vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go | 1 + vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go | 11 +++++++++++ vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go | 11 +++++++++++ .../golang.org/x/sys/unix/zsyscall_linux_riscv64.go | 11 +++++++++++ vendor/modules.txt | 4 ++-- 10 files changed, 45 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 88c813ed7..d90d2430c 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.17 require ( cloud.google.com/go/storage v1.21.0 - github.com/VictoriaMetrics/fastcache v1.9.0 + github.com/VictoriaMetrics/fastcache v1.10.0 // Do not use the original github.com/valyala/fasthttp because of issues // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b @@ -26,7 +26,7 @@ require ( github.com/valyala/quicktemplate v1.7.0 golang.org/x/net v0.0.0-20220403103023-749bd193bc2b golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a - golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb + golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 google.golang.org/api v0.74.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index bc2087a91..a7e67e894 100644 --- a/go.sum +++ b/go.sum @@ -113,8 +113,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/SAP/go-hdb v0.14.1/go.mod h1:7fdQLVC2lER3urZLjZCm0AuMQfApof92n3aylBPEkMo= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod 
h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/VictoriaMetrics/fastcache v1.9.0 h1:oMwsS6c8abz98B7ytAewQ7M1ZN/Im/iwKoE1euaFvhs= -github.com/VictoriaMetrics/fastcache v1.9.0/go.mod h1:otoTS3xu+6IzF/qByjqzjp3rTuzM3Qf0ScU1UTj97iU= +github.com/VictoriaMetrics/fastcache v1.10.0 h1:5hDJnLsKLpnUEToub7ETuRu8RCkb40woBZAUiKonXzY= +github.com/VictoriaMetrics/fastcache v1.10.0/go.mod h1:tjiYeEfYXCqacuvYw/7UoDIeJaNxq6132xHICNP77w8= github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a/ueoLdLL0= github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ= github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0= @@ -1315,12 +1315,11 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb h1:PVGECzEo9Y3uOidtkHGdd347NjLtITfJFO9BxFpmRoo= -golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 h1:D1v9ucDTYBtbz5vNuBbAhIMAGhQhJ6Ym5ah3maMVNX4= +golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go index daa3db773..092ba3719 100644 --- a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go +++ b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go @@ -257,10 +257,7 @@ func (b *bucket) Reset() { putChunk(chunks[i]) chunks[i] = nil } - bm := b.m - for k := range bm { - delete(bm, k) - } + b.m = make(map[uint64]uint64) b.idx = 0 b.gen = 1 atomic.StoreUint64(&b.getCalls, 0) diff --git a/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go b/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go index b945ab254..f5e9d6bef 100644 --- a/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go +++ b/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go @@ -28,6 +28,7 @@ func Lstat(path string, stat *Stat_t) (err error) { return Fstatat(AT_FDCWD, path, stat, AT_SYMLINK_NOFOLLOW) } +//sys MemfdSecret(flags int) (fd int, err error) //sys Pause() (err error) //sys pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64 //sys pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64 diff --git a/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go b/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go index 81db4833a..d83e2c657 100644 --- a/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go +++ b/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go @@ -22,6 +22,7 @@ import "unsafe" //sysnb getrlimit(resource int, rlim *Rlimit) (err error) //sysnb Getuid() (uid int) //sys Listen(s int, n int) (err error) +//sys MemfdSecret(flags int) (fd int, err error) //sys pread(fd int, p []byte, offset int64) (n int, 
err error) = SYS_PREAD64 //sys pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64 //sys Renameat(olddirfd int, oldpath string, newdirfd int, newpath string) (err error) diff --git a/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go b/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go index 8ff7adba0..925a748a3 100644 --- a/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go +++ b/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go @@ -22,6 +22,7 @@ import "unsafe" //sysnb Getrlimit(resource int, rlim *Rlimit) (err error) //sysnb Getuid() (uid int) //sys Listen(s int, n int) (err error) +//sys MemfdSecret(flags int) (fd int, err error) //sys pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64 //sys pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64 //sys Seek(fd int, offset int64, whence int) (off int64, err error) = SYS_LSEEK diff --git a/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go b/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go index c947a4d10..2a0c4aa6a 100644 --- a/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go +++ b/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go @@ -215,6 +215,17 @@ func Listen(s int, n int) (err error) { // THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT +func MemfdSecret(flags int) (fd int, err error) { + r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0) + fd = int(r0) + if e1 != 0 { + err = errnoErr(e1) + } + return +} + +// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT + func Pause() (err error) { _, _, e1 := Syscall(SYS_PAUSE, 0, 0, 0) if e1 != 0 { diff --git a/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go b/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go index dd15284d8..9f8c24e43 100644 --- a/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go +++ b/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go @@ -180,6 +180,17 @@ func Listen(s int, n int) (err error) { // THIS FILE IS 
GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT +func MemfdSecret(flags int) (fd int, err error) { + r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0) + fd = int(r0) + if e1 != 0 { + err = errnoErr(e1) + } + return +} + +// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT + func pread(fd int, p []byte, offset int64) (n int, err error) { var _p0 unsafe.Pointer if len(p) > 0 { diff --git a/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go b/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go index a1a9bcbbd..1239cc2de 100644 --- a/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go +++ b/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go @@ -180,6 +180,17 @@ func Listen(s int, n int) (err error) { // THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT +func MemfdSecret(flags int) (fd int, err error) { + r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0) + fd = int(r0) + if e1 != 0 { + err = errnoErr(e1) + } + return +} + +// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT + func pread(fd int, p []byte, offset int64) (n int, err error) { var _p0 unsafe.Pointer if len(p) > 0 { diff --git a/vendor/modules.txt b/vendor/modules.txt index 5e739fe31..738a6f750 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -16,7 +16,7 @@ cloud.google.com/go/iam cloud.google.com/go/storage cloud.google.com/go/storage/internal cloud.google.com/go/storage/internal/apiv2 -# github.com/VictoriaMetrics/fastcache v1.9.0 +# github.com/VictoriaMetrics/fastcache v1.10.0 ## explicit; go 1.13 github.com/VictoriaMetrics/fastcache # github.com/VictoriaMetrics/fasthttp v1.1.0 @@ -290,7 +290,7 @@ golang.org/x/oauth2/jwt # golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ## explicit golang.org/x/sync/errgroup -# golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb +# golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 ## explicit; go 1.17 golang.org/x/sys/internal/unsafeheader golang.org/x/sys/unix From 
319e910897da00b76bfc5987edc70de971197bda Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 5 Apr 2022 20:37:40 +0300 Subject: [PATCH 02/13] lib/workingsetcache: reuse prev cache after its reset This should reduce memory churn rate --- lib/workingsetcache/cache.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index 6192fb515..8c4c220b0 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -144,17 +144,12 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) { c.mu.Unlock() return } - // Expire prev cache and create fresh curr cache with the same capacity. - // Do not reuse prev cache, since it can occupy too big amounts of memory. + // Reset prev cache and swap it with the curr cache. prev := c.prev.Load().(*fastcache.Cache) - prev.Reset() curr := c.curr.Load().(*fastcache.Cache) c.prev.Store(curr) - // Use c.maxBytes/2 instead of cs.MaxBytesSize for creating new cache, - // since cs.MaxBytesSize may not match c.maxBytes/2, so the created cache - // couldn't be loaded from file with c.maxBytes/2 limit after saving with cs.MaxBytesSize size. - curr = fastcache.New(c.maxBytes / 2) - c.curr.Store(curr) + prev.Reset() + c.curr.Store(prev) c.mu.Unlock() } } @@ -197,9 +192,9 @@ func (c *Cache) cacheSizeWatcher() { c.mu.Lock() c.setMode(switching) prev := c.prev.Load().(*fastcache.Cache) - prev.Reset() curr := c.curr.Load().(*fastcache.Cache) c.prev.Store(curr) + prev.Reset() // use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache // couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit. 
c.curr.Store(fastcache.New(c.maxBytes)) @@ -222,8 +217,8 @@ func (c *Cache) cacheSizeWatcher() { c.mu.Lock() c.setMode(whole) prev = c.prev.Load().(*fastcache.Cache) - prev.Reset() c.prev.Store(fastcache.New(1024)) + prev.Reset() c.mu.Unlock() } From cde1e2ec93b0b47628b47e4ad0861c4beaecde65 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 11:41:09 +0300 Subject: [PATCH 03/13] docs/Release-Guide.md: add missing steps --- docs/Release-Guide.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/Release-Guide.md b/docs/Release-Guide.md index 21fa41b48..5f133cc11 100644 --- a/docs/Release-Guide.md +++ b/docs/Release-Guide.md @@ -6,6 +6,7 @@ sort: 18 ## Release version and Docker images +0. Make sure that the release commits have no security issues. 1. Document all the changes for new release in [CHANGELOG.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md). 2. Create the following release tags: * `git tag -s v1.xx.y` in `master` branch @@ -13,8 +14,9 @@ sort: 18 * `git tag -s v1.xx.y-enterprise` in `enterprise` branch * `git tag -s v1.xx.y-enterprise-cluster` in `enterprise-cluster` branch 3. Run `TAG=v1.xx.y make publish-release`. It will create `*.tar.gz` release archives with the corresponding `_checksums.txt` files inside `bin` directory and publish Docker images for the given `TAG`, `TAG-cluster`, `TAG-enterprise` and `TAG-enterprise-cluster`. -4. Push release tag to : `git push origin v1.xx.y`. -5. Go to , create new release from the pushed tag on step 5 and upload `*.tar.gz` archive with the corresponding `_checksums.txt` from step 2. +4. Push release tags to : `git push origin v1.xx.y` and `git push origin v1.xx.y-cluster`. Do not push `-enterprise` tags to public repository. +5. Go to , create new release from the pushed tag on step 4 and upload `*.tar.gz` archive with the corresponding `_checksums.txt` from step 3. +6. 
Copy the [CHANGELOG](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md) for this release to [releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) page. ## Building snap package From 7da20a4b3fad761f6582fde6672c1a4138cd5e6d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 12:28:54 +0300 Subject: [PATCH 04/13] app/vmagent: reduce the probability of TLS handshake timeout when dialing the remote storage The following actions are taken: - Increase the TLS hashdshake timeout from 5 seconds to 10 seconds - Increase dial timeout from 5 seconds to 30 seconds - Specify DialContext instead of Dial in http.Transport. This allows properly handling the Context arg during dialing the remote storage Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1699 --- app/vmagent/remotewrite/client.go | 4 ++-- app/vmagent/remotewrite/statconn.go | 22 ++++++++++++++++++++-- docs/CHANGELOG.md | 1 + 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 0493758b8..d265ac0ff 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -92,9 +92,9 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste } tlsCfg := authCfg.NewTLSConfig() tr := &http.Transport{ - Dial: statDial, + DialContext: statDial, TLSClientConfig: tlsCfg, - TLSHandshakeTimeout: 5 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, MaxConnsPerHost: 2 * concurrency, MaxIdleConnsPerHost: 2 * concurrency, IdleConnTimeout: time.Minute, diff --git a/app/vmagent/remotewrite/statconn.go b/app/vmagent/remotewrite/statconn.go index 5d597e9bf..9aa064c29 100644 --- a/app/vmagent/remotewrite/statconn.go +++ b/app/vmagent/remotewrite/statconn.go @@ -1,7 +1,9 @@ package remotewrite import ( + "context" "net" + "sync" "sync/atomic" "time" @@ -9,9 +11,25 @@ import ( "github.com/VictoriaMetrics/metrics" ) -func 
statDial(networkUnused, addr string) (conn net.Conn, err error) { +func getStdDialer() *net.Dialer { + stdDialerOnce.Do(func() { + stdDialer = &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: netutil.TCP6Enabled(), + } + }) + return stdDialer +} + +var ( + stdDialer *net.Dialer + stdDialerOnce sync.Once +) + +func statDial(ctx context.Context, networkUnused, addr string) (conn net.Conn, err error) { network := netutil.GetTCPNetwork() - conn, err = net.DialTimeout(network, addr, 5*time.Second) + conn, err = stdDialer.DialContext(ctx, network, addr) dialsTotal.Inc() if err != nil { dialErrors.Inc() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 85a4b010a..cf6cdb785 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -33,6 +33,7 @@ Previously the `-search.maxUniqueTimeseries` command-line flag was used as a glo When using [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), these command-line flags (including `-search.maxUniqueTimeseries`) must be passed to `vmselect` instead of `vmstorage`. +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html) and [vmauth](https://docs.victoriametrics.com/vmauth.html): reduce the probability of `TLS handshake error from XX.XX.XX.XX: EOF` errors when `-remoteWrite.url` points to HTTPS url at `vmauth`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1699). * BUGFIX: return `Content-Type: text/html` response header when requesting `/` HTTP path at VictoriaMetrics components. Previously `text/plain` response header was returned, which could lead to broken page formatting. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2323). 
* BUGFIX: [Graphite Render API](https://docs.victoriametrics.com/#graphite-render-api-usage): accept floating-point values for [maxDataPoints](https://graphite.readthedocs.io/en/stable/render_api.html#maxdatapoints) query arg, since some clients send floating-point values instead of integer values for this arg. From 077193d87caac981c78a528bd3ac2c95b0ff0620 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 13:32:01 +0300 Subject: [PATCH 05/13] lib/cgroup: reduce the default GOGC value from 50% to 30% This reduces memory usage under production workloads by up to 10%, while CPU spent on GC remains roughly the same. The CPU spent on GC can be monitored with go_memstats_gc_cpu_fraction metric --- lib/cgroup/mem.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/cgroup/mem.go b/lib/cgroup/mem.go index 2fa34a11a..f35d3cbad 100644 --- a/lib/cgroup/mem.go +++ b/lib/cgroup/mem.go @@ -19,15 +19,17 @@ func init() { func initGOGC() { if v := os.Getenv("GOGC"); v != "" { - n, err := strconv.Atoi(v) + n, err := strconv.ParseFloat(v, 64) if err != nil { n = 100 } - gogc = n + gogc = int(n) } else { - // Set GOGC to 50% by default if it isn't set yet. - // This should reduce memory usage for typical workloads for VictoriaMetrics components. - gogc = 50 + // Use lower GOGC if it isn't set yet. + // This should reduce memory usage for typical workloads for VictoriaMetrics components + // at the cost of increased CPU usage. + // It is recommended increasing GOGC if go_memstats_gc_cpu_fraction exceeds 0.05 for extended periods of time. 
+ gogc = 30 debug.SetGCPercent(gogc) } } From 50cf74ce4bd5795c4933cdabc11043edfcdb92e4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 13:34:00 +0300 Subject: [PATCH 06/13] lib/storage: reuse sync.WaitGroup objects This reduces GC load by up to 10% according to memory profiling --- lib/storage/index_db.go | 12 ++++++++---- lib/storage/partition.go | 17 ++++++++++++++++- lib/storage/storage.go | 3 ++- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index bab0783d1..9aecc4f86 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -724,7 +724,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time return is.searchTagKeys(tks, maxTagKeys) } var mu sync.Mutex - var wg sync.WaitGroup + wg := getWaitGroup() var errGlobal error for date := minDate; date <= maxDate; date++ { wg.Add(1) @@ -752,6 +752,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time }(date) } wg.Wait() + putWaitGroup(wg) return errGlobal } @@ -926,7 +927,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe return is.searchTagValues(tvs, tagKey, maxTagValues) } var mu sync.Mutex - var wg sync.WaitGroup + wg := getWaitGroup() var errGlobal error for date := minDate; date <= maxDate; date++ { wg.Add(1) @@ -954,6 +955,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe }(date) } wg.Wait() + putWaitGroup(wg) return errGlobal } @@ -1141,7 +1143,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes) } // Query over multiple days in parallel. - var wg sync.WaitGroup + wg := getWaitGroup() var errGlobal error var mu sync.Mutex // protects tvss + errGlobal from concurrent access below. 
for minDate <= maxDate { @@ -1171,6 +1173,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct minDate++ } wg.Wait() + putWaitGroup(wg) return errGlobal } @@ -2446,7 +2449,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set } // Slower path - search for metricIDs for each day in parallel. - var wg sync.WaitGroup + wg := getWaitGroup() var errGlobal error var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below for minDate <= maxDate { @@ -2473,6 +2476,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set minDate++ } wg.Wait() + putWaitGroup(wg) if errGlobal != nil { return errGlobal } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 87efa5b4e..91baae16d 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -481,7 +481,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { func (pt *partition) flushRowsToParts(rows []rawRow) { maxRows := getMaxRawRowsPerShard() - var wg sync.WaitGroup + wg := getWaitGroup() for len(rows) > 0 { n := maxRows if n > len(rows) { @@ -495,8 +495,23 @@ func (pt *partition) flushRowsToParts(rows []rawRow) { rows = rows[n:] } wg.Wait() + putWaitGroup(wg) } +func getWaitGroup() *sync.WaitGroup { + v := wgPool.Get() + if v == nil { + return &sync.WaitGroup{} + } + return v.(*sync.WaitGroup) +} + +func putWaitGroup(wg *sync.WaitGroup) { + wgPool.Put(wg) +} + +var wgPool sync.Pool + func (pt *partition) addRowsPart(rows []rawRow) { if len(rows) == 0 { return diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 848239b6c..95d4f88a8 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -166,10 +166,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details. 
if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") { logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath) - var wg sync.WaitGroup + wg := getWaitGroup() wg.Add(1) fs.MustRemoveAllWithDoneCallback(s.cachePath, wg.Done) wg.Wait() + putWaitGroup(wg) logger.Infof("cache directory at %q has been successfully removed", s.cachePath) } From 569b0d444c54a645e4bc4642a7935c131263703f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 13:56:35 +0300 Subject: [PATCH 07/13] app/vmagent: properly initialize stdDialer This is a follow-up commit for 7da20a4b3fad761f6582fde6672c1a4138cd5e6d Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1699 --- app/vmagent/remotewrite/statconn.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/vmagent/remotewrite/statconn.go b/app/vmagent/remotewrite/statconn.go index 9aa064c29..787aa19f5 100644 --- a/app/vmagent/remotewrite/statconn.go +++ b/app/vmagent/remotewrite/statconn.go @@ -29,7 +29,8 @@ var ( func statDial(ctx context.Context, networkUnused, addr string) (conn net.Conn, err error) { network := netutil.GetTCPNetwork() - conn, err = stdDialer.DialContext(ctx, network, addr) + d := getStdDialer() + conn, err = d.DialContext(ctx, network, addr) dialsTotal.Inc() if err != nil { dialErrors.Inc() From 5acd70109b98a05f20375e0b4bca67ad4176ac23 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 14:00:08 +0300 Subject: [PATCH 08/13] lib/protoparser: remove superflowous memory allocations during protocol parsing --- lib/protoparser/csvimport/streamparser.go | 30 +++++++++++++--------- lib/protoparser/graphite/parser_test.go | 5 +++- lib/protoparser/graphite/streamparser.go | 30 +++++++++++++--------- lib/protoparser/influx/streamparser.go | 30 +++++++++++++--------- lib/protoparser/opentsdb/streamparser.go | 30 +++++++++++++--------- lib/protoparser/prometheus/streamparser.go | 30 
+++++++++++++--------- lib/protoparser/vmimport/streamparser.go | 30 +++++++++++++--------- 7 files changed, 112 insertions(+), 73 deletions(-) diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 7f5d5ed19..45a86a35d 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -45,16 +45,8 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = func(rows []Row) { - if err := callback(rows); err != nil { - ctx.callbackErrLock.Lock() - if ctx.callbackErr == nil { - ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) - } - ctx.callbackErrLock.Unlock() - } - ctx.wg.Done() - } + uw.ctx = ctx + uw.callback = callback uw.cds = cds uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) @@ -153,18 +145,32 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows - callback func(rows []Row) + ctx *streamContext + callback func(rows []Row) error cds []ColumnDescriptor reqBuf []byte } func (uw *unmarshalWork) reset() { uw.rows.Reset() + uw.ctx = nil uw.callback = nil uw.cds = nil uw.reqBuf = uw.reqBuf[:0] } +func (uw *unmarshalWork) runCallback(rows []Row) { + ctx := uw.ctx + if err := uw.callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() +} + // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds) @@ -188,7 +194,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - uw.callback(rows) + uw.runCallback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go index 
e2922548a..3a29be751 100644 --- a/lib/protoparser/graphite/parser_test.go +++ b/lib/protoparser/graphite/parser_test.go @@ -324,7 +324,8 @@ func Test_streamContext_Read(t *testing.T) { } uw := getUnmarshalWork() callbackCalls := 0 - uw.callback = func(rows []Row) { + uw.ctx = ctx + uw.callback = func(rows []Row) error { callbackCalls++ if len(rows) != len(rowsExpected.Rows) { t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) @@ -332,8 +333,10 @@ func Test_streamContext_Read(t *testing.T) { if !reflect.DeepEqual(rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) } + return nil } uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) + ctx.wg.Add(1) uw.Unmarshal() if callbackCalls != 1 { t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls) diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 010771c4d..a4207b5df 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -31,16 +31,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { for ctx.Read() { uw := getUnmarshalWork() - uw.callback = func(rows []Row) { - if err := callback(rows); err != nil { - ctx.callbackErrLock.Lock() - if ctx.callbackErr == nil { - ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) - } - ctx.callbackErrLock.Unlock() - } - ctx.wg.Done() - } + uw.ctx = ctx + uw.callback = callback uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) @@ -138,16 +130,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows - callback func(rows []Row) + ctx *streamContext + callback func(rows []Row) error reqBuf []byte } func (uw *unmarshalWork) reset() { uw.rows.Reset() + uw.ctx = nil uw.callback = nil uw.reqBuf = uw.reqBuf[:0] } +func (uw *unmarshalWork) 
runCallback(rows []Row) { + ctx := uw.ctx + if err := uw.callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() +} + // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) @@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - uw.callback(rows) + uw.runCallback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index 96f865b6a..a3820f62a 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -56,16 +56,8 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = func(db string, rows []Row) { - if err := callback(db, rows); err != nil { - ctx.callbackErrLock.Lock() - if ctx.callbackErr == nil { - ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) - } - ctx.callbackErrLock.Unlock() - } - ctx.wg.Done() - } + uw.ctx = ctx + uw.callback = callback uw.db = db uw.tsMultiplier = tsMultiplier uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf @@ -165,7 +157,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows - callback func(db string, rows []Row) + ctx *streamContext + callback func(db string, rows []Row) error db string tsMultiplier int64 reqBuf []byte @@ -173,12 +166,25 @@ type unmarshalWork struct { func (uw *unmarshalWork) reset() { uw.rows.Reset() + uw.ctx = nil uw.callback = nil uw.db = "" uw.tsMultiplier = 0 uw.reqBuf = uw.reqBuf[:0] } +func (uw *unmarshalWork) runCallback(rows []Row) { + ctx := uw.ctx + if err := uw.callback(uw.db, rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr 
== nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() +} + // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) @@ -225,7 +231,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - uw.callback(uw.db, rows) + uw.runCallback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index c66ad5fca..94e7fc5b8 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -30,16 +30,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = func(rows []Row) { - if err := callback(rows); err != nil { - ctx.callbackErrLock.Lock() - if ctx.callbackErr == nil { - ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) - } - ctx.callbackErrLock.Unlock() - } - ctx.wg.Done() - } + uw.ctx = ctx + uw.callback = callback uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) @@ -137,16 +129,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows - callback func(rows []Row) + ctx *streamContext + callback func(rows []Row) error reqBuf []byte } func (uw *unmarshalWork) reset() { uw.rows.Reset() + uw.ctx = nil uw.callback = nil uw.reqBuf = uw.reqBuf[:0] } +func (uw *unmarshalWork) runCallback(rows []Row) { + ctx := uw.ctx + if err := uw.callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() +} + // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { 
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) @@ -175,7 +181,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - uw.callback(rows) + uw.runCallback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index ef7781950..af093f82f 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -32,16 +32,8 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f for ctx.Read() { uw := getUnmarshalWork() uw.errLogger = errLogger - uw.callback = func(rows []Row) { - if err := callback(rows); err != nil { - ctx.callbackErrLock.Lock() - if ctx.callbackErr == nil { - ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) - } - ctx.callbackErrLock.Unlock() - } - ctx.wg.Done() - } + uw.ctx = ctx + uw.callback = callback uw.defaultTimestamp = defaultTimestamp uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) @@ -140,7 +132,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows - callback func(rows []Row) + ctx *streamContext + callback func(rows []Row) error errLogger func(string) defaultTimestamp int64 reqBuf []byte @@ -148,12 +141,25 @@ type unmarshalWork struct { func (uw *unmarshalWork) reset() { uw.rows.Reset() + uw.ctx = nil uw.callback = nil uw.errLogger = nil uw.defaultTimestamp = 0 uw.reqBuf = uw.reqBuf[:0] } +func (uw *unmarshalWork) runCallback(rows []Row) { + ctx := uw.ctx + if err := uw.callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() +} + // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { if uw.errLogger != nil { @@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - uw.callback(rows) + 
uw.runCallback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 497643090..dbcd84e56 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -34,16 +34,8 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = func(rows []Row) { - if err := callback(rows); err != nil { - ctx.callbackErrLock.Lock() - if ctx.callbackErr == nil { - ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) - } - ctx.callbackErrLock.Unlock() - } - ctx.wg.Done() - } + uw.ctx = ctx + uw.callback = callback uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) @@ -141,16 +133,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows - callback func(rows []Row) + ctx *streamContext + callback func(rows []Row) error reqBuf []byte } func (uw *unmarshalWork) reset() { uw.rows.Reset() + uw.ctx = nil uw.callback = nil uw.reqBuf = uw.reqBuf[:0] } +func (uw *unmarshalWork) runCallback(rows []Row) { + ctx := uw.ctx + if err := uw.callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() +} + // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) @@ -159,7 +165,7 @@ func (uw *unmarshalWork) Unmarshal() { row := &rows[i] rowsRead.Add(len(row.Timestamps)) } - uw.callback(rows) + uw.runCallback(rows) putUnmarshalWork(uw) } From ad35068c3ad1425f1735fd74b6c6612bb0511a6b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 18:48:05 +0300 Subject: [PATCH 09/13] lib/mergeset: skip 
common prefixes when comparing inmemoryBlock items This should improve the performance for items sorting inside inmemoryBlock.MarshalUnsortedData if they have common prefix. While at it, improve the performance for inmemoryBlock.updateCommonPrefix for sorted items. This should improve performance for inmemoryBlock.MarshalSortedData during background merge. --- lib/mergeset/encoding.go | 50 ++++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index b7774d720..07de82286 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -28,9 +28,10 @@ type Item struct { // // The returned bytes representation belongs to data. func (it Item) Bytes(data []byte) []byte { + n := int(it.End - it.Start) sh := (*reflect.SliceHeader)(unsafe.Pointer(&data)) - sh.Cap = int(it.End - it.Start) - sh.Len = int(it.End - it.Start) + sh.Cap = n + sh.Len = n sh.Data += uintptr(it.Start) return data } @@ -48,8 +49,13 @@ func (it Item) String(data []byte) string { func (ib *inmemoryBlock) Len() int { return len(ib.items) } func (ib *inmemoryBlock) Less(i, j int) bool { - data := ib.data items := ib.items + a := items[i] + b := items[j] + cpLen := uint32(len(ib.commonPrefix)) + a.Start += cpLen + b.Start += cpLen + data := ib.data return string(items[i].Bytes(data)) < string(items[j].Bytes(data)) } @@ -59,9 +65,15 @@ func (ib *inmemoryBlock) Swap(i, j int) { } type inmemoryBlock struct { + // commonPrefix contains common prefix for all the items stored in the inmemoryBlock commonPrefix []byte - data []byte - items []Item + + // data contains source data for items + data []byte + + // items contains items stored in inmemoryBlock. + // Every item contains the prefix specified at commonPrefix. 
+ items []Item } func (ib *inmemoryBlock) SizeBytes() int { @@ -74,17 +86,29 @@ func (ib *inmemoryBlock) Reset() { ib.items = ib.items[:0] } -func (ib *inmemoryBlock) updateCommonPrefix() { +func (ib *inmemoryBlock) updateCommonPrefixSorted() { ib.commonPrefix = ib.commonPrefix[:0] - if len(ib.items) == 0 { + items := ib.items + if len(items) == 0 { return } - items := ib.items data := ib.data cp := items[0].Bytes(data) - if len(cp) == 0 { + if len(items) > 1 { + cpLen := commonPrefixLen(cp, items[len(items)-1].Bytes(data)) + cp = cp[:cpLen] + } + ib.commonPrefix = append(ib.commonPrefix[:0], cp...) +} + +func (ib *inmemoryBlock) updateCommonPrefixUnsorted() { + ib.commonPrefix = ib.commonPrefix[:0] + items := ib.items + if len(items) == 0 { return } + data := ib.data + cp := items[0].Bytes(data) for _, it := range items[1:] { cpLen := commonPrefixLen(cp, it.Bytes(data)) if cpLen == 0 { @@ -176,9 +200,11 @@ func (ib *inmemoryBlock) isSorted() bool { // - returns the marshal type used for the encoding. func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) { if !ib.isSorted() { + ib.updateCommonPrefixUnsorted() sort.Sort(ib) + } else { + ib.updateCommonPrefixSorted() } - ib.updateCommonPrefix() return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel) } @@ -197,7 +223,7 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo if isInTest && !ib.isSorted() { logger.Panicf("BUG: %d items must be sorted; items:\n%s", len(ib.items), ib.debugItemsString()) } - ib.updateCommonPrefix() + ib.updateCommonPrefixSorted() return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel) } @@ -218,7 +244,7 @@ func (ib *inmemoryBlock) debugItemsString() string { // Preconditions: // - ib.items must be sorted. -// - updateCommonPrefix must be called. +// - updateCommonPrefix* must be called. 
func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) { if len(ib.items) <= 0 { logger.Panicf("BUG: inmemoryBlock.marshalData must be called on non-empty blocks only") From 7bad7133bc8171830b8639a231a11bece5d016cf Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 19:35:50 +0300 Subject: [PATCH 10/13] lib/mergeset: use more rawItemsShard shards on multi-CPU systems This should improve the scalability for registering new time series on multi-CPU systems --- lib/mergeset/table.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index cb942d4f2..cc5284610 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -125,9 +125,16 @@ type rawItemsShards struct { // The number of shards for rawItems per table. // // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. 
-var rawItemsShardsPerTable = cgroup.AvailableCPUs() +var rawItemsShardsPerTable = func() int { + cpus := cgroup.AvailableCPUs() + multiplier := cpus + if multiplier > 16 { + multiplier = 16 + } + return (cpus * multiplier + 1) / 2 +}() -const maxBlocksPerShard = 512 +const maxBlocksPerShard = 256 func (riss *rawItemsShards) init() { riss.shards = make([]rawItemsShard, rawItemsShardsPerTable) From 57143e94354bea66774c33684164695969d0cff6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Apr 2022 19:49:17 +0300 Subject: [PATCH 11/13] lib/storage: increase the number of rawRowsShard shards on systems with more than 4 CPU cores This should improve data ingestion scalability on systems with many CPU cores --- lib/storage/partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 91baae16d..43439edcc 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -65,7 +65,7 @@ const finalPartsToMerge = 3 // The number of shards for rawRow entries per partition. // // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. -var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8 +var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 3) / 4 // getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet. func getMaxRawRowsPerShard() int { From 2b59fff5260a68f5f033038fa63ff52a97a31f6d Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Wed, 6 Apr 2022 20:24:45 +0200 Subject: [PATCH 12/13] vmalert: fix labels and annotations processing for alerts (#2403) To improve compatibility with Prometheus alerting the order of templates processing has changed. Before, vmalert did all labels processing beforehand. It meant all extra labels (such as `alertname`, `alertgroup` or rule labels) were available in templating. All collisions were resolved in favour of extra labels. 
In Prometheus, only labels from the received metric are available in templating, so no collisions are possible. This change makes vmalert's behaviour similar to Prometheus. For example, consider an alerting rule which is triggered by time series with `alertname` label. In vmalert, this label would be overridden by alerting rule's name everywhere: for alert labels, for annotations, etc. In Prometheus, it would be overridden for alert's labels only, but in annotations the original label value would be available. See more details here https://github.com/prometheus/compliance/issues/80 Signed-off-by: hagen1778 --- app/vmalert/alerting.go | 160 ++++++++++++++++------------- app/vmalert/alerting_test.go | 92 ++++++++--------- app/vmalert/group_test.go | 16 +-- app/vmalert/main.go | 2 +- app/vmalert/manager.go | 4 +- app/vmalert/notifier/alert.go | 4 +- app/vmalert/notifier/alert_test.go | 2 +- 7 files changed, 142 insertions(+), 138 deletions(-) diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index a5b929f86..0f819cbd6 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -141,6 +141,53 @@ func (ar *AlertingRule) ID() uint64 { return ar.RuleID } +type labelSet struct { + // origin labels from series + // used for templating + origin map[string]string + // processed labels with additional data + // used as Alert labels + processed map[string]string +} + +// toLabels converts labels from given Metric +// to labelSet which contains original and processed labels. 
+func (ar *AlertingRule) toLabels(m datasource.Metric, qFn notifier.QueryFn) (*labelSet, error) { + ls := &labelSet{ + origin: make(map[string]string, len(m.Labels)), + processed: make(map[string]string), + } + for _, l := range m.Labels { + // drop __name__ to be consistent with Prometheus alerting + if l.Name == "__name__" { + continue + } + ls.origin[l.Name] = l.Value + ls.processed[l.Name] = l.Value + } + + extraLabels, err := notifier.ExecTemplate(qFn, ar.Labels, notifier.AlertTplData{ + Labels: ls.origin, + Value: m.Values[0], + Expr: ar.Expr, + }) + if err != nil { + return nil, fmt.Errorf("failed to expand labels: %s", err) + } + for k, v := range extraLabels { + ls.processed[k] = v + } + + // set additional labels to identify group and rule name + if ar.Name != "" { + ls.processed[alertNameLabel] = ar.Name + } + if !*disableAlertGroupLabel && ar.GroupName != "" { + ls.processed[alertGroupNameLabel] = ar.GroupName + } + return ls, nil +} + // ExecRange executes alerting rule on the given time range similarly to Exec. // It doesn't update internal states of the Rule and meant to be used just // to get time series for backfilling. 
@@ -155,24 +202,7 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([] return nil, fmt.Errorf("`query` template isn't supported in replay mode") } for _, s := range series { - // set additional labels to identify group and rule Name - if ar.Name != "" { - s.SetLabel(alertNameLabel, ar.Name) - } - if !*disableAlertGroupLabel && ar.GroupName != "" { - s.SetLabel(alertGroupNameLabel, ar.GroupName) - } - // extra labels could contain templates, so we expand them first - labels, err := expandLabels(s, qFn, ar) - if err != nil { - return nil, fmt.Errorf("failed to expand labels: %s", err) - } - for k, v := range labels { - // apply extra labels to datasource - // so the hash key will be consistent on restore - s.SetLabel(k, v) - } - a, err := ar.newAlert(s, time.Time{}, qFn) // initial alert + a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert if err != nil { return nil, fmt.Errorf("failed to create alert: %s", err) } @@ -234,28 +264,15 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal updated := make(map[uint64]struct{}) // update list of active alerts for _, m := range qMetrics { - // set additional labels to identify group and rule name - if ar.Name != "" { - m.SetLabel(alertNameLabel, ar.Name) - } - if !*disableAlertGroupLabel && ar.GroupName != "" { - m.SetLabel(alertGroupNameLabel, ar.GroupName) - } - // extra labels could contain templates, so we expand them first - labels, err := expandLabels(m, qFn, ar) + ls, err := ar.toLabels(m, qFn) if err != nil { return nil, fmt.Errorf("failed to expand labels: %s", err) } - for k, v := range labels { - // apply extra labels to datasource - // so the hash key will be consistent on restore - m.SetLabel(k, v) - } - h := hash(m) + h := hash(ls.processed) if _, ok := updated[h]; ok { // duplicate may be caused by extra labels // conflicting with the metric labels - ar.lastExecError = fmt.Errorf("labels %v: %w", m.Labels, errDuplicate) + 
ar.lastExecError = fmt.Errorf("labels %v: %w", ls.processed, errDuplicate) return nil, ar.lastExecError } updated[h] = struct{}{} @@ -272,14 +289,14 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal a.Value = m.Values[0] // and re-exec template since Value can be used // in annotations - a.Annotations, err = a.ExecTemplate(qFn, ar.Annotations) + a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations) if err != nil { return nil, err } } continue } - a, err := ar.newAlert(m, ar.lastExecTime, qFn) + a, err := ar.newAlert(m, ls, ar.lastExecTime, qFn) if err != nil { ar.lastExecError = err return nil, fmt.Errorf("failed to create alert: %w", err) @@ -315,19 +332,6 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal return ar.toTimeSeries(ts.Unix()), nil } -func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) { - metricLabels := make(map[string]string) - for _, l := range m.Labels { - metricLabels[l.Name] = l.Value - } - tpl := notifier.AlertTplData{ - Labels: metricLabels, - Value: m.Values[0], - Expr: ar.Expr, - } - return notifier.ExecTemplate(q, ar.Labels, tpl) -} - func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries { var tss []prompbmarshal.TimeSeries for _, a := range ar.alerts { @@ -358,42 +362,43 @@ func (ar *AlertingRule) UpdateWith(r Rule) error { } // TODO: consider hashing algorithm in VM -func hash(m datasource.Metric) uint64 { +func hash(labels map[string]string) uint64 { hash := fnv.New64a() - labels := m.Labels - sort.Slice(labels, func(i, j int) bool { - return labels[i].Name < labels[j].Name - }) - for _, l := range labels { + keys := make([]string, 0, len(labels)) + for k := range labels { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { // drop __name__ to be consistent with Prometheus alerting - if l.Name == "__name__" { + if k == "__name__" { continue } - 
hash.Write([]byte(l.Name)) - hash.Write([]byte(l.Value)) + name, value := k, labels[k] + hash.Write([]byte(name)) + hash.Write([]byte(value)) hash.Write([]byte("\xff")) } return hash.Sum64() } -func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) { +func (ar *AlertingRule) newAlert(m datasource.Metric, ls *labelSet, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) { + var err error + if ls == nil { + ls, err = ar.toLabels(m, qFn) + if err != nil { + return nil, fmt.Errorf("failed to expand labels: %s", err) + } + } a := ¬ifier.Alert{ GroupID: ar.GroupID, Name: ar.Name, - Labels: map[string]string{}, + Labels: ls.processed, Value: m.Values[0], ActiveAt: start, Expr: ar.Expr, } - for _, l := range m.Labels { - // drop __name__ to be consistent with Prometheus alerting - if l.Name == "__name__" { - continue - } - a.Labels[l.Name] = l.Value - } - var err error - a.Annotations, err = a.ExecTemplate(qFn, ar.Annotations) + a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations) return a, err } @@ -560,15 +565,26 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb } for _, m := range qMetrics { - a, err := ar.newAlert(m, time.Unix(int64(m.Values[0]), 0), qFn) + ls := &labelSet{ + origin: make(map[string]string, len(m.Labels)), + processed: make(map[string]string, len(m.Labels)), + } + for _, l := range m.Labels { + if l.Name == "__name__" { + continue + } + ls.origin[l.Name] = l.Value + ls.processed[l.Name] = l.Value + } + a, err := ar.newAlert(m, ls, time.Unix(int64(m.Values[0]), 0), qFn) if err != nil { return fmt.Errorf("failed to create alert: %w", err) } - a.ID = hash(m) + a.ID = hash(ls.processed) a.State = notifier.StatePending a.Restored = true ar.alerts[a.ID] = a - logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.Start) + logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.ActiveAt) } return nil } diff 
--git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go index edca8254a..6bbd2dff6 100644 --- a/app/vmalert/alerting_test.go +++ b/app/vmalert/alerting_test.go @@ -315,10 +315,13 @@ func TestAlertingRule_Exec(t *testing.T) { } expAlerts := make(map[uint64]*notifier.Alert) for _, ta := range tc.expAlerts { - labels := ta.labels - labels = append(labels, alertNameLabel) - labels = append(labels, tc.rule.Name) - h := hash(metricWithLabels(t, labels...)) + labels := make(map[string]string) + for i := 0; i < len(ta.labels); i += 2 { + k, v := ta.labels[i], ta.labels[i+1] + labels[k] = v + } + labels[alertNameLabel] = tc.rule.Name + h := hash(labels) expAlerts[h] = ta.alert } for key, exp := range expAlerts { @@ -513,7 +516,7 @@ func TestAlertingRule_Restore(t *testing.T) { ), }, map[uint64]*notifier.Alert{ - hash(datasource.Metric{}): {State: notifier.StatePending, + hash(nil): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(time.Hour)}, }, }, @@ -529,12 +532,12 @@ func TestAlertingRule_Restore(t *testing.T) { ), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, - alertNameLabel, "metric labels", - alertGroupNameLabel, "groupID", - "foo", "bar", - "namespace", "baz", - )): {State: notifier.StatePending, + hash(map[string]string{ + alertNameLabel: "metric labels", + alertGroupNameLabel: "groupID", + "foo": "bar", + "namespace": "baz", + }): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(time.Hour)}, }, }, @@ -550,11 +553,11 @@ func TestAlertingRule_Restore(t *testing.T) { ), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, - "foo", "bar", - "namespace", "baz", - "source", "vm", - )): {State: notifier.StatePending, + hash(map[string]string{ + "foo": "bar", + "namespace": "baz", + "source": "vm", + }): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(time.Hour)}, }, }, @@ -575,11 +578,11 @@ func TestAlertingRule_Restore(t *testing.T) { ), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, "host", 
"localhost-1")): {State: notifier.StatePending, + hash(map[string]string{"host": "localhost-1"}): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(time.Hour)}, - hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending, + hash(map[string]string{"host": "localhost-2"}): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(2 * time.Hour)}, - hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending, + hash(map[string]string{"host": "localhost-3"}): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(3 * time.Hour)}, }, }, @@ -659,7 +662,7 @@ func TestAlertingRule_Template(t *testing.T) { metricWithValueAndLabels(t, 1, "instance", "bar"), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "foo")): { + hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "foo"}): { Annotations: map[string]string{}, Labels: map[string]string{ alertNameLabel: "common", @@ -667,7 +670,7 @@ func TestAlertingRule_Template(t *testing.T) { "instance": "foo", }, }, - hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "bar")): { + hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "bar"}): { Annotations: map[string]string{}, Labels: map[string]string{ alertNameLabel: "common", @@ -682,11 +685,10 @@ func TestAlertingRule_Template(t *testing.T) { Name: "override label", Labels: map[string]string{ "instance": "{{ $labels.instance }}", - "region": "east", }, Annotations: map[string]string{ - "summary": `Too high connection number for "{{ $labels.instance }}" for region {{ $labels.region }}`, - "description": `It is {{ $value }} connections for "{{ $labels.instance }}"`, + "summary": `Too high connection number for "{{ $labels.instance }}"`, + "description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`, }, alerts: 
make(map[uint64]*notifier.Alert), }, @@ -695,64 +697,58 @@ func TestAlertingRule_Template(t *testing.T) { metricWithValueAndLabels(t, 10, "instance", "bar", alertNameLabel, "override"), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): { + hash(map[string]string{alertNameLabel: "override label", "instance": "foo"}): { Labels: map[string]string{ alertNameLabel: "override label", "instance": "foo", - "region": "east", }, Annotations: map[string]string{ - "summary": `Too high connection number for "foo" for region east`, - "description": `It is 2 connections for "foo"`, + "summary": `Too high connection number for "foo"`, + "description": `override: It is 2 connections for "foo"`, }, }, - hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "bar")): { + hash(map[string]string{alertNameLabel: "override label", "instance": "bar"}): { Labels: map[string]string{ alertNameLabel: "override label", "instance": "bar", - "region": "east", }, Annotations: map[string]string{ - "summary": `Too high connection number for "bar" for region east`, - "description": `It is 10 connections for "bar"`, + "summary": `Too high connection number for "bar"`, + "description": `override: It is 10 connections for "bar"`, }, }, }, }, { &AlertingRule{ - Name: "ExtraTemplating", + Name: "OriginLabels", GroupName: "Testing", Labels: map[string]string{ - "name": "alert_{{ $labels.alertname }}", - "group": "group_{{ $labels.alertgroup }}", "instance": "{{ $labels.instance }}", }, Annotations: map[string]string{ - "summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`, - "description": `Alert "{{ $labels.name }}({{ $labels.group }})" for instance {{ $labels.instance }}`, + "summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`, }, alerts: make(map[uint64]*notifier.Alert), }, 
[]datasource.Metric{ - metricWithValueAndLabels(t, 1, "instance", "foo"), + metricWithValueAndLabels(t, 1, + alertNameLabel, "originAlertname", + alertGroupNameLabel, "originGroupname", + "instance", "foo"), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, alertNameLabel, "ExtraTemplating", - "name", "alert_ExtraTemplating", - alertGroupNameLabel, "Testing", - "group", "group_Testing", - "instance", "foo")): { + hash(map[string]string{ + alertNameLabel: "OriginLabels", + alertGroupNameLabel: "Testing", + "instance": "foo"}): { Labels: map[string]string{ - alertNameLabel: "ExtraTemplating", - "name": "alert_ExtraTemplating", + alertNameLabel: "OriginLabels", alertGroupNameLabel: "Testing", - "group": "group_Testing", "instance": "foo", }, Annotations: map[string]string{ - "summary": `Alert "ExtraTemplating(Testing)" for instance foo`, - "description": `Alert "alert_ExtraTemplating(group_Testing)" for instance foo`, + "summary": `Alert "originAlertname(originGroupname)" for instance foo`, }, }, }, diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index d94838b60..8322e65d0 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -174,7 +174,7 @@ func TestGroupStart(t *testing.T) { m2 := metricWithLabels(t, "instance", inst2, "job", job) r := g.Rules[0].(*AlertingRule) - alert1, err := r.newAlert(m1, time.Now(), nil) + alert1, err := r.newAlert(m1, nil, time.Now(), nil) if err != nil { t.Fatalf("faield to create alert: %s", err) } @@ -187,13 +187,9 @@ func TestGroupStart(t *testing.T) { // add service labels alert1.Labels[alertNameLabel] = alert1.Name alert1.Labels[alertGroupNameLabel] = g.Name - var labels1 []string - for k, v := range alert1.Labels { - labels1 = append(labels1, k, v) - } - alert1.ID = hash(metricWithLabels(t, labels1...)) + alert1.ID = hash(alert1.Labels) - alert2, err := r.newAlert(m2, time.Now(), nil) + alert2, err := r.newAlert(m2, nil, time.Now(), nil) if err != nil { t.Fatalf("faield to create alert: 
%s", err) } @@ -206,11 +202,7 @@ func TestGroupStart(t *testing.T) { // add service labels alert2.Labels[alertNameLabel] = alert2.Name alert2.Labels[alertGroupNameLabel] = g.Name - var labels2 []string - for k, v := range alert2.Labels { - labels2 = append(labels2, k, v) - } - alert2.ID = hash(metricWithLabels(t, labels2...)) + alert2.ID = hash(alert2.Labels) finished := make(chan struct{}) fs.add(m1) diff --git a/app/vmalert/main.go b/app/vmalert/main.go index c98650329..9366ad350 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -243,7 +243,7 @@ func getAlertURLGenerator(externalURL *url.URL, externalAlertSource string, vali "tpl": externalAlertSource, } return func(alert notifier.Alert) string { - templated, err := alert.ExecTemplate(nil, m) + templated, err := alert.ExecTemplate(nil, nil, m) if err != nil { logger.Errorf("can not exec source template %s", err) } diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index 7152fd258..3ab70c6f4 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -37,7 +37,7 @@ func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) { g, ok := m.groups[gID] if !ok { - return nil, fmt.Errorf("can't find group with id %q", gID) + return nil, fmt.Errorf("can't find group with id %d", gID) } for _, rule := range g.Rules { ar, ok := rule.(*AlertingRule) @@ -48,7 +48,7 @@ func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) { return apiAlert, nil } } - return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name) + return nil, fmt.Errorf("can't find alert with id %d in group %q", aID, g.Name) } func (m *manager) start(ctx context.Context, groupsCfg []config.Group) error { diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index 44b80eb4a..a1bcae7c1 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -88,8 +88,8 @@ var tplHeaders = []string{ // map of annotations. 
// Every alert could have a different datasource, so function // requires a queryFunction as an argument. -func (a *Alert) ExecTemplate(q QueryFn, annotations map[string]string) (map[string]string, error) { - tplData := AlertTplData{Value: a.Value, Labels: a.Labels, Expr: a.Expr} +func (a *Alert) ExecTemplate(q QueryFn, labels, annotations map[string]string) (map[string]string, error) { + tplData := AlertTplData{Value: a.Value, Labels: labels, Expr: a.Expr} return templateAnnotations(annotations, tplData, funcsWithQuery(q)) } diff --git a/app/vmalert/notifier/alert_test.go b/app/vmalert/notifier/alert_test.go index f8e0c77ee..27b83aac6 100644 --- a/app/vmalert/notifier/alert_test.go +++ b/app/vmalert/notifier/alert_test.go @@ -130,7 +130,7 @@ func TestAlert_ExecTemplate(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - tpl, err := tc.alert.ExecTemplate(qFn, tc.annotations) + tpl, err := tc.alert.ExecTemplate(qFn, tc.alert.Labels, tc.annotations) if err != nil { t.Fatal(err) } From a96eb1632914b5deb21a96813d321bd4954b0f31 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 7 Apr 2022 15:21:17 +0300 Subject: [PATCH 13/13] lib/memory: export `process_memory_limit_bytes` metric, which shows the amounts of memory the current process has access to This metric is equivalent to `vm_available_memory_bytes`, but it has better name, since the metric is related to a process, not VictoriaMetrics itself. Leave `vm_available_memory_bytes` for backwards compatibility. 
--- lib/memory/memory.go | 15 ++++++++++----- lib/mergeset/table.go | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/memory/memory.go b/lib/memory/memory.go index 04e4ff0c6..d403d4a48 100644 --- a/lib/memory/memory.go +++ b/lib/memory/memory.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" ) var ( @@ -14,11 +15,15 @@ var ( allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage`) ) +var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 { + return float64(memoryLimit) +}) + var ( allowedMemory int remainingMemory int + memoryLimit int ) - var once sync.Once func initOnce() { @@ -26,18 +31,18 @@ func initOnce() { // Do not use logger.Panicf here, since logger may be uninitialized yet. 
panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call")) } - mem := sysTotalMemory() + memoryLimit = sysTotalMemory() if allowedBytes.N <= 0 { if *allowedPercent < 1 || *allowedPercent > 200 { logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent) } percent := *allowedPercent / 100 - allowedMemory = int(float64(mem) * percent) - remainingMemory = mem - allowedMemory + allowedMemory = int(float64(memoryLimit) * percent) + remainingMemory = memoryLimit - allowedMemory logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedPercent) } else { allowedMemory = allowedBytes.N - remainingMemory = mem - allowedMemory + remainingMemory = memoryLimit - allowedMemory logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String()) } } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index cc5284610..f0473c9d4 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -131,7 +131,7 @@ var rawItemsShardsPerTable = func() int { if multiplier > 16 { multiplier = 16 } - return (cpus * multiplier + 1) / 2 + return (cpus*multiplier + 1) / 2 }() const maxBlocksPerShard = 256