From 7e1dd8ab9d792b9c6250e0bf4c5b9cbadaeaf529 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 24 Feb 2024 02:07:51 +0200 Subject: [PATCH] lib: consistently use atomic.* types instead of atomic.* functions See ea9e2b19a5fa8aecc25c9da10fad8f4c1c58df38 --- app/vmagent/main.go | 3 +- app/vminsert/main.go | 3 +- go.mod | 2 +- go.sum | 4 +-- lib/backup/common/part.go | 4 +-- lib/blockcache/blockcache.go | 21 ++++++------- lib/blockcache/blockcache_timing_test.go | 4 +-- lib/bloomfilter/limiter.go | 8 ++--- lib/bytesutil/bytesutil_timing_test.go | 6 ++-- lib/bytesutil/fast_string_matcher.go | 27 +++++++++-------- lib/bytesutil/fast_string_matcher_test.go | 12 ++++---- .../fast_string_matcher_timing_test.go | 3 +- lib/bytesutil/fast_string_transformer.go | 21 ++++++------- .../fast_string_transformer_timing_test.go | 4 +-- lib/bytesutil/internstring.go | 14 ++++----- lib/decimal/decimal_timing_test.go | 8 ++--- lib/encoding/encoding_timing_test.go | 22 +++++++------- lib/encoding/int_timing_test.go | 17 +++++------ lib/encoding/nearest_delta2_timing_test.go | 5 ++-- lib/encoding/nearest_delta_timing_test.go | 5 ++-- lib/fasttime/fasttime.go | 10 +++++-- lib/fasttime/fasttime_timing_test.go | 6 ++-- lib/flagutil/password.go | 8 ++--- lib/fs/fs.go | 10 +++++-- lib/fs/fs_nix.go | 3 +- lib/fs/fs_openbsd.go | 3 +- lib/fs/fs_solaris.go | 3 +- lib/fs/fs_windows.go | 3 +- lib/fs/reader_at.go | 16 +++++----- lib/httpserver/httpserver.go | 6 ++-- lib/lrucache/lrucache.go | 21 ++++++------- lib/netutil/conn.go | 7 ++--- .../discovery/kubernetes/api_watcher.go | 24 +++++++-------- lib/promscrape/scraper.go | 10 +++---- lib/promscrape/statconn.go | 4 +-- lib/snapshot/snapshotutil/snapshotutil.go | 8 +++-- lib/workingsetcache/cache.go | 30 +++++++------------ .../VictoriaMetrics/metrics/counter.go | 5 ++++ vendor/modules.txt | 2 +- 39 files changed, 183 insertions(+), 189 deletions(-) diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 7eedc3a24..e7c9b92a8 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "strings" - "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport" @@ -459,7 +458,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { w.WriteHeader(http.StatusOK) return true case "/ready": - if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 { + if rdy := promscrape.PendingScrapeConfigs.Load(); rdy > 0 { errMsg := fmt.Sprintf("waiting for scrapes to init, left: %d", rdy) http.Error(w, errMsg, http.StatusTooEarly) } else { diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 78fee4ead..a5d54553c 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "strings" - "sync/atomic" "time" "github.com/VictoriaMetrics/metrics" @@ -354,7 +353,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { w.WriteHeader(http.StatusOK) return true case "/ready": - if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 { + if rdy := promscrape.PendingScrapeConfigs.Load(); rdy > 0 { errMsg := fmt.Sprintf("waiting for scrape config to init targets, configs left: %d", rdy) http.Error(w, errMsg, http.StatusTooEarly) } else { diff --git a/go.mod b/go.mod index 01a23707b..d021148c0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 github.com/VictoriaMetrics/easyproto v0.1.4 github.com/VictoriaMetrics/fastcache v1.12.2 - github.com/VictoriaMetrics/metrics v1.32.0 + github.com/VictoriaMetrics/metrics v1.33.0 github.com/VictoriaMetrics/metricsql v0.74.0 github.com/aws/aws-sdk-go-v2 v1.25.0 github.com/aws/aws-sdk-go-v2/config v1.27.0 diff --git a/go.sum b/go.sum index d4874d507..8e7976aa6 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,8 @@ github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys= -github.com/VictoriaMetrics/metrics v1.32.0 h1:r9JK2zndYv0TIxFXLEHwhQqRdnu8/O3cwJiCBX4vJCM= -github.com/VictoriaMetrics/metrics v1.32.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= +github.com/VictoriaMetrics/metrics v1.33.0 h1:EnkDEaGiL2u95t+W76GfecC/LMYpy+tFrexYzBWQIAc= +github.com/VictoriaMetrics/metrics v1.33.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/VictoriaMetrics/metricsql v0.74.0 h1:bVO7USXBBYEuEHQ3PZg/6216j0DvblZM+Q8sTRECkv0= github.com/VictoriaMetrics/metricsql v0.74.0/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= diff --git a/lib/backup/common/part.go b/lib/backup/common/part.go index 8b9c68e31..84e3d1d70 100644 --- a/lib/backup/common/part.go +++ b/lib/backup/common/part.go @@ -46,14 +46,14 @@ func (p *Part) key() string { // so it must have an unique key in order to always copy it during // backup, restore and server-side copy. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5005 - id := atomic.AddUint64(&uniqueKeyID, 1) + id := uniqueKeyID.Add(1) return fmt.Sprintf("unique-%016X", id) } // Do not use p.FileSize in the key, since it cannot be properly initialized when resuming the restore for partially restored file return fmt.Sprintf("%s%016X%016X%016X", p.Path, p.Offset, p.Size, p.ActualSize) } -var uniqueKeyID uint64 +var uniqueKeyID atomic.Uint64 // String returns human-readable representation of the part. func (p *Part) String() string { diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go index 9192ffd70..c331f4227 100644 --- a/lib/blockcache/blockcache.go +++ b/lib/blockcache/blockcache.go @@ -172,14 +172,11 @@ func (c *Cache) cleanPerKeyMisses() { } type cache struct { - // Atomically updated fields must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - requests uint64 - misses uint64 + requests atomic.Uint64 + misses atomic.Uint64 // sizeBytes contains an approximate size for all the blocks stored in the cache. - sizeBytes int64 + sizeBytes atomic.Int64 // getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes. getMaxSizeBytes func() int @@ -256,7 +253,7 @@ func (c *cache) RemoveBlocksForPart(p interface{}) { } func (c *cache) updateSizeBytes(n int) { - atomic.AddInt64(&c.sizeBytes, int64(n)) + c.sizeBytes.Add(int64(n)) } func (c *cache) cleanPerKeyMisses() { @@ -281,7 +278,7 @@ func (c *cache) cleanByTimeout() { } func (c *cache) GetBlock(k Key) Block { - atomic.AddUint64(&c.requests, 1) + c.requests.Add(1) var e *cacheEntry c.mu.Lock() defer c.mu.Unlock() @@ -301,7 +298,7 @@ func (c *cache) GetBlock(k Key) Block { } // Slow path - the entry is missing in the cache. c.perKeyMisses[k]++ - atomic.AddUint64(&c.misses, 1) + c.misses.Add(1) return nil } @@ -367,7 +364,7 @@ func (c *cache) Len() int { } func (c *cache) SizeBytes() int { - return int(atomic.LoadInt64(&c.sizeBytes)) + return int(c.sizeBytes.Load()) } func (c *cache) SizeMaxBytes() int { @@ -375,11 +372,11 @@ func (c *cache) SizeMaxBytes() int { } func (c *cache) Requests() uint64 { - return atomic.LoadUint64(&c.requests) + return c.requests.Load() } func (c *cache) Misses() uint64 { - return atomic.LoadUint64(&c.misses) + return c.misses.Load() } // lastAccessHeap implements heap.Interface diff --git a/lib/blockcache/blockcache_timing_test.go b/lib/blockcache/blockcache_timing_test.go index 776a29585..0fe8735ae 100644 --- a/lib/blockcache/blockcache_timing_test.go +++ b/lib/blockcache/blockcache_timing_test.go @@ -16,11 +16,11 @@ func BenchmarkKeyHashUint64(b *testing.B) { h := k.hashUint64() hSum += h } - atomic.AddUint64(&BenchSink, hSum) + BenchSink.Add(hSum) }) } -var BenchSink uint64 +var BenchSink atomic.Uint64 func BenchmarkCacheGet(b *testing.B) { c := NewCache(func() int { diff --git a/lib/bloomfilter/limiter.go b/lib/bloomfilter/limiter.go index 70069ddd8..117846c34 100644 --- a/lib/bloomfilter/limiter.go +++ b/lib/bloomfilter/limiter.go @@ -56,7 +56,7 @@ func (l *Limiter) MaxItems() int { // CurrentItems return the current number of items registered in l. func (l *Limiter) CurrentItems() int { lm := l.v.Load() - n := atomic.LoadUint64(&lm.currentItems) + n := lm.currentItems.Load() return int(n) } @@ -72,7 +72,7 @@ func (l *Limiter) Add(h uint64) bool { } type limiter struct { - currentItems uint64 + currentItems atomic.Uint64 f *filter } @@ -83,12 +83,12 @@ func newLimiter(maxItems int) *limiter { } func (l *limiter) Add(h uint64) bool { - currentItems := atomic.LoadUint64(&l.currentItems) + currentItems := l.currentItems.Load() if currentItems >= uint64(l.f.maxItems) { return l.f.Has(h) } if l.f.Add(h) { - atomic.AddUint64(&l.currentItems, 1) + l.currentItems.Add(1) } return true } diff --git a/lib/bytesutil/bytesutil_timing_test.go b/lib/bytesutil/bytesutil_timing_test.go index c140a50fe..6ba8b0d90 100644 --- a/lib/bytesutil/bytesutil_timing_test.go +++ b/lib/bytesutil/bytesutil_timing_test.go @@ -17,7 +17,7 @@ func BenchmarkToUnsafeString(b *testing.B) { n += len(s) } } - atomic.AddUint64(&Sink, uint64(n)) + Sink.Add(uint64(n)) }) } @@ -33,8 +33,8 @@ func BenchmarkToUnsafeBytes(b *testing.B) { n += len(s) } } - atomic.AddUint64(&Sink, uint64(n)) + Sink.Add(uint64(n)) }) } -var Sink uint64 +var Sink atomic.Uint64 diff --git a/lib/bytesutil/fast_string_matcher.go b/lib/bytesutil/fast_string_matcher.go index 1b8d02b93..5d2b0643e 100644 --- a/lib/bytesutil/fast_string_matcher.go +++ b/lib/bytesutil/fast_string_matcher.go @@ -13,7 +13,7 @@ import ( // It caches string match results and returns them back on the next calls // without calling the matchFunc, which may be expensive. type FastStringMatcher struct { - lastCleanupTime uint64 + lastCleanupTime atomic.Uint64 m sync.Map @@ -21,7 +21,7 @@ type FastStringMatcher struct { } type fsmEntry struct { - lastAccessTime uint64 + lastAccessTime atomic.Uint64 ok bool } @@ -29,10 +29,11 @@ type fsmEntry struct { // // matchFunc must return the same result for the same input. func NewFastStringMatcher(matchFunc func(s string) bool) *FastStringMatcher { - return &FastStringMatcher{ - lastCleanupTime: fasttime.UnixTimestamp(), - matchFunc: matchFunc, + fsm := &FastStringMatcher{ + matchFunc: matchFunc, } + fsm.lastCleanupTime.Store(fasttime.UnixTimestamp()) + return fsm } // Match applies matchFunc to s and returns the result. @@ -46,19 +47,19 @@ func (fsm *FastStringMatcher) Match(s string) bool { if ok { // Fast path - s match result is found in the cache. e := v.(*fsmEntry) - if atomic.LoadUint64(&e.lastAccessTime)+10 < ct { + if e.lastAccessTime.Load()+10 < ct { // Reduce the frequency of e.lastAccessTime update to once per 10 seconds // in order to improve the fast path speed on systems with many CPU cores. - atomic.StoreUint64(&e.lastAccessTime, ct) + e.lastAccessTime.Store(ct) } return e.ok } // Slow path - run matchFunc for s and store the result in the cache. b := fsm.matchFunc(s) e := &fsmEntry{ - lastAccessTime: ct, - ok: b, + ok: b, } + e.lastAccessTime.Store(ct) // Make a copy of s in order to limit memory usage to the s length, // since the s may point to bigger string. // This also protects from the case when s contains unsafe string, which points to a temporary byte slice. @@ -72,7 +73,7 @@ func (fsm *FastStringMatcher) Match(s string) bool { deadline := ct - uint64(cacheExpireDuration.Seconds()) m.Range(func(k, v interface{}) bool { e := v.(*fsmEntry) - if atomic.LoadUint64(&e.lastAccessTime) < deadline { + if e.lastAccessTime.Load() < deadline { m.Delete(k) } return true @@ -82,13 +83,13 @@ func (fsm *FastStringMatcher) Match(s string) bool { return b } -func needCleanup(lastCleanupTime *uint64, currentTime uint64) bool { - lct := atomic.LoadUint64(lastCleanupTime) +func needCleanup(lastCleanupTime *atomic.Uint64, currentTime uint64) bool { + lct := lastCleanupTime.Load() if lct+61 >= currentTime { return false } // Atomically compare and swap the current time with the lastCleanupTime // in order to guarantee that only a single goroutine out of multiple // concurrently executing goroutines gets true from the call. - return atomic.CompareAndSwapUint64(lastCleanupTime, lct, currentTime) + return lastCleanupTime.CompareAndSwap(lct, currentTime) } diff --git a/lib/bytesutil/fast_string_matcher_test.go b/lib/bytesutil/fast_string_matcher_test.go index f9c7e29f0..3643f7547 100644 --- a/lib/bytesutil/fast_string_matcher_test.go +++ b/lib/bytesutil/fast_string_matcher_test.go @@ -2,6 +2,7 @@ package bytesutil import ( "strings" + "sync/atomic" "testing" ) @@ -27,18 +28,19 @@ func TestFastStringMatcher(t *testing.T) { func TestNeedCleanup(t *testing.T) { f := func(lastCleanupTime, currentTime uint64, resultExpected bool) { t.Helper() - lct := lastCleanupTime + var lct atomic.Uint64 + lct.Store(lastCleanupTime) result := needCleanup(&lct, currentTime) if result != resultExpected { t.Fatalf("unexpected result for needCleanup(%d, %d); got %v; want %v", lastCleanupTime, currentTime, result, resultExpected) } if result { - if lct != currentTime { - t.Fatalf("unexpected value for lct; got %d; want currentTime=%d", lct, currentTime) + if n := lct.Load(); n != currentTime { + t.Fatalf("unexpected value for lct; got %d; want currentTime=%d", n, currentTime) } } else { - if lct != lastCleanupTime { - t.Fatalf("unexpected value for lct; got %d; want lastCleanupTime=%d", lct, lastCleanupTime) + if n := lct.Load(); n != lastCleanupTime { + t.Fatalf("unexpected value for lct; got %d; want lastCleanupTime=%d", n, lastCleanupTime) } } } diff --git a/lib/bytesutil/fast_string_matcher_timing_test.go b/lib/bytesutil/fast_string_matcher_timing_test.go index 7ccd51eed..49e6708c0 100644 --- a/lib/bytesutil/fast_string_matcher_timing_test.go +++ b/lib/bytesutil/fast_string_matcher_timing_test.go @@ -2,7 +2,6 @@ package bytesutil import ( "strings" - "sync/atomic" "testing" ) @@ -28,6 +27,6 @@ func benchmarkFastStringMatcher(b *testing.B, s string) { n++ } } - atomic.AddUint64(&GlobalSink, n) + GlobalSink.Add(n) }) } diff --git a/lib/bytesutil/fast_string_transformer.go b/lib/bytesutil/fast_string_transformer.go index 4dcc930d1..342071cc9 100644 --- a/lib/bytesutil/fast_string_transformer.go +++ b/lib/bytesutil/fast_string_transformer.go @@ -13,7 +13,7 @@ import ( // It caches transformed strings and returns them back on the next calls // without calling the transformFunc, which may be expensive. type FastStringTransformer struct { - lastCleanupTime uint64 + lastCleanupTime atomic.Uint64 m sync.Map @@ -21,7 +21,7 @@ type FastStringTransformer struct { } type fstEntry struct { - lastAccessTime uint64 + lastAccessTime atomic.Uint64 s string } @@ -29,10 +29,11 @@ type fstEntry struct { // // transformFunc must return the same result for the same input. func NewFastStringTransformer(transformFunc func(s string) string) *FastStringTransformer { - return &FastStringTransformer{ - lastCleanupTime: fasttime.UnixTimestamp(), - transformFunc: transformFunc, + fst := &FastStringTransformer{ + transformFunc: transformFunc, } + fst.lastCleanupTime.Store(fasttime.UnixTimestamp()) + return fst } // Transform applies transformFunc to s and returns the result. @@ -52,10 +53,10 @@ func (fst *FastStringTransformer) Transform(s string) string { if ok { // Fast path - the transformed s is found in the cache. e := v.(*fstEntry) - if atomic.LoadUint64(&e.lastAccessTime)+10 < ct { + if e.lastAccessTime.Load()+10 < ct { // Reduce the frequency of e.lastAccessTime update to once per 10 seconds // in order to improve the fast path speed on systems with many CPU cores. - atomic.StoreUint64(&e.lastAccessTime, ct) + e.lastAccessTime.Store(ct) } return e.s } @@ -72,9 +73,9 @@ func (fst *FastStringTransformer) Transform(s string) string { sTransformed = s } e := &fstEntry{ - lastAccessTime: ct, - s: sTransformed, + s: sTransformed, } + e.lastAccessTime.Store(ct) fst.m.Store(s, e) if needCleanup(&fst.lastCleanupTime, ct) { @@ -83,7 +84,7 @@ func (fst *FastStringTransformer) Transform(s string) string { deadline := ct - uint64(cacheExpireDuration.Seconds()) m.Range(func(k, v interface{}) bool { e := v.(*fstEntry) - if atomic.LoadUint64(&e.lastAccessTime) < deadline { + if e.lastAccessTime.Load() < deadline { m.Delete(k) } return true diff --git a/lib/bytesutil/fast_string_transformer_timing_test.go b/lib/bytesutil/fast_string_transformer_timing_test.go index b23fdbc05..793682934 100644 --- a/lib/bytesutil/fast_string_transformer_timing_test.go +++ b/lib/bytesutil/fast_string_transformer_timing_test.go @@ -24,8 +24,8 @@ func benchmarkFastStringTransformer(b *testing.B, s string) { sTransformed := fst.Transform(s) n += uint64(len(sTransformed)) } - atomic.AddUint64(&GlobalSink, n) + GlobalSink.Add(n) }) } -var GlobalSink uint64 +var GlobalSink atomic.Uint64 diff --git a/lib/bytesutil/internstring.go b/lib/bytesutil/internstring.go index a9521245e..e62445ec3 100644 --- a/lib/bytesutil/internstring.go +++ b/lib/bytesutil/internstring.go @@ -42,19 +42,19 @@ func InternString(s string) string { ct := fasttime.UnixTimestamp() if v, ok := internStringsMap.Load(s); ok { e := v.(*ismEntry) - if atomic.LoadUint64(&e.lastAccessTime)+10 < ct { + if e.lastAccessTime.Load()+10 < ct { // Reduce the frequency of e.lastAccessTime update to once per 10 seconds // in order to improve the fast path speed on systems with many CPU cores. - atomic.StoreUint64(&e.lastAccessTime, ct) + e.lastAccessTime.Store(ct) } return e.s } // Make a new copy for s in order to remove references from possible bigger string s refers to. sCopy := strings.Clone(s) e := &ismEntry{ - lastAccessTime: ct, - s: sCopy, + s: sCopy, } + e.lastAccessTime.Store(ct) internStringsMap.Store(sCopy, e) if needCleanup(&internStringsMapLastCleanupTime, ct) { @@ -63,7 +63,7 @@ func InternString(s string) string { deadline := ct - uint64(cacheExpireDuration.Seconds()) m.Range(func(k, v interface{}) bool { e := v.(*ismEntry) - if atomic.LoadUint64(&e.lastAccessTime) < deadline { + if e.lastAccessTime.Load() < deadline { m.Delete(k) } return true @@ -74,11 +74,11 @@ func InternString(s string) string { } type ismEntry struct { - lastAccessTime uint64 + lastAccessTime atomic.Uint64 s string } var ( internStringsMap sync.Map - internStringsMapLastCleanupTime uint64 + internStringsMapLastCleanupTime atomic.Uint64 ) diff --git a/lib/decimal/decimal_timing_test.go b/lib/decimal/decimal_timing_test.go index 9dce15dee..061a18ef3 100644 --- a/lib/decimal/decimal_timing_test.go +++ b/lib/decimal/decimal_timing_test.go @@ -51,7 +51,7 @@ func benchmarkAppendDecimalToFloat(b *testing.B, a []int64, scale int16) { var fa []float64 for pb.Next() { fa = AppendDecimalToFloat(fa[:0], a, scale) - atomic.AddUint64(&Sink, uint64(len(fa))) + Sink.Add(uint64(len(fa))) } }) } @@ -101,7 +101,7 @@ func benchmarkAppendFloatToDecimal(b *testing.B, fa []float64) { sink += uint64(len(da)) sink += uint64(e) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -138,10 +138,10 @@ func BenchmarkFromFloat(b *testing.B) { sink += uint64(v) sink += uint64(e) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) }) } } -var Sink uint64 +var Sink atomic.Uint64 diff --git a/lib/encoding/encoding_timing_test.go b/lib/encoding/encoding_timing_test.go index fbfed7cf6..76843bd19 100644 --- a/lib/encoding/encoding_timing_test.go +++ b/lib/encoding/encoding_timing_test.go @@ -18,12 +18,12 @@ func BenchmarkMarshalGaugeArray(b *testing.B) { if mt != MarshalTypeZSTDNearestDelta { panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeZSTDNearestDelta)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } -var Sink uint64 +var Sink atomic.Uint64 func BenchmarkUnmarshalGaugeArray(b *testing.B) { b.ReportAllocs() @@ -36,7 +36,7 @@ func BenchmarkUnmarshalGaugeArray(b *testing.B) { if err != nil { panic(fmt.Errorf("cannot unmarshal gauge array: %w", err)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -68,7 +68,7 @@ func BenchmarkMarshalDeltaConstArray(b *testing.B) { if mt != MarshalTypeDeltaConst { panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeDeltaConst)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -84,7 +84,7 @@ func BenchmarkUnmarshalDeltaConstArray(b *testing.B) { if err != nil { panic(fmt.Errorf("cannot unmarshal delta const array: %w", err)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -115,7 +115,7 @@ func BenchmarkMarshalConstArray(b *testing.B) { if mt != MarshalTypeConst { panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeConst)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -131,7 +131,7 @@ func BenchmarkUnmarshalConstArray(b *testing.B) { if err != nil { panic(fmt.Errorf("cannot unmarshal const array: %w", err)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -160,7 +160,7 @@ func BenchmarkMarshalZeroConstArray(b *testing.B) { if mt != MarshalTypeConst { panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeConst)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -176,7 +176,7 @@ func BenchmarkUnmarshalZeroConstArray(b *testing.B) { if err != nil { panic(fmt.Errorf("cannot unmarshal zero const array: %w", err)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -199,7 +199,7 @@ func BenchmarkMarshalInt64Array(b *testing.B) { if mt != benchMarshalType { panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, benchMarshalType)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -215,7 +215,7 @@ func BenchmarkUnmarshalInt64Array(b *testing.B) { if err != nil { panic(fmt.Errorf("cannot unmarshal int64 array: %w", err)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } diff --git a/lib/encoding/int_timing_test.go b/lib/encoding/int_timing_test.go index 0beb2ba49..1316ef514 100644 --- a/lib/encoding/int_timing_test.go +++ b/lib/encoding/int_timing_test.go @@ -2,7 +2,6 @@ package encoding import ( "fmt" - "sync/atomic" "testing" ) @@ -16,7 +15,7 @@ func BenchmarkMarshalUint64(b *testing.B) { dst = MarshalUint64(dst[:0], sink) sink += uint64(len(dst)) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -29,7 +28,7 @@ func BenchmarkUnmarshalUint64(b *testing.B) { v := UnmarshalUint64(testMarshaledUint64Data) sink += v } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -43,7 +42,7 @@ func BenchmarkMarshalInt64(b *testing.B) { dst = MarshalInt64(dst[:0], int64(sink)) sink += uint64(len(dst)) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -56,7 +55,7 @@ func BenchmarkUnmarshalInt64(b *testing.B) { v := UnmarshalInt64(testMarshaledInt64Data) sink += uint64(v) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -96,7 +95,7 @@ func benchmarkMarshalVarUint64s(b *testing.B, maxValue uint64) { dst = MarshalVarUint64s(dst[:0], data) sink += uint64(len(dst)) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -136,7 +135,7 @@ func benchmarkMarshalVarInt64s(b *testing.B, maxValue int64) { dst = MarshalVarInt64s(dst[:0], data) sink += uint64(len(dst)) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -182,7 +181,7 @@ func benchmarkUnmarshalVarUint64s(b *testing.B, maxValue uint64) { } sink += uint64(len(dst)) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } @@ -228,7 +227,7 @@ func benchmarkUnmarshalVarInt64s(b *testing.B, maxValue int64) { } sink += uint64(len(dst)) } - atomic.AddUint64(&Sink, sink) + Sink.Add(sink) }) } diff --git a/lib/encoding/nearest_delta2_timing_test.go b/lib/encoding/nearest_delta2_timing_test.go index ee58e4c83..b10196168 100644 --- a/lib/encoding/nearest_delta2_timing_test.go +++ b/lib/encoding/nearest_delta2_timing_test.go @@ -2,7 +2,6 @@ package encoding import ( "fmt" - "sync/atomic" "testing" ) @@ -21,7 +20,7 @@ func benchmarkMarshalInt64NearestDelta2(b *testing.B, precisionBits uint8) { var dst []byte for pb.Next() { dst, _ = marshalInt64NearestDelta2(dst[:0], benchInt64Array, precisionBits) - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -37,7 +36,7 @@ func BenchmarkUnmarshalInt64NearestDelta2(b *testing.B) { if err != nil { panic(fmt.Errorf("unexpected error: %w", err)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } diff --git a/lib/encoding/nearest_delta_timing_test.go b/lib/encoding/nearest_delta_timing_test.go index 3eb0c3126..383e7e147 100644 --- a/lib/encoding/nearest_delta_timing_test.go +++ b/lib/encoding/nearest_delta_timing_test.go @@ -2,7 +2,6 @@ package encoding import ( "fmt" - "sync/atomic" "testing" ) @@ -21,7 +20,7 @@ func benchmarkMarshalInt64NearestDelta(b *testing.B, precisionBits uint8) { var dst []byte for pb.Next() { dst, _ = marshalInt64NearestDelta(dst[:0], benchInt64Array, precisionBits) - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } @@ -37,7 +36,7 @@ func BenchmarkUnmarshalInt64NearestDelta(b *testing.B) { if err != nil { panic(fmt.Errorf("unexpected error: %w", err)) } - atomic.AddUint64(&Sink, uint64(len(dst))) + Sink.Add(uint64(len(dst))) } }) } diff --git a/lib/fasttime/fasttime.go b/lib/fasttime/fasttime.go index f50a27f7f..cf44a51de 100644 --- a/lib/fasttime/fasttime.go +++ b/lib/fasttime/fasttime.go @@ -11,18 +11,22 @@ func init() { defer ticker.Stop() for tm := range ticker.C { t := uint64(tm.Unix()) - atomic.StoreUint64(¤tTimestamp, t) + currentTimestamp.Store(t) } }() } -var currentTimestamp = uint64(time.Now().Unix()) +var currentTimestamp = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().Unix())) + return &x +}() // UnixTimestamp returns the current unix timestamp in seconds. // // It is faster than time.Now().Unix() func UnixTimestamp() uint64 { - return atomic.LoadUint64(¤tTimestamp) + return currentTimestamp.Load() } // UnixDate returns date from the current unix timestamp. diff --git a/lib/fasttime/fasttime_timing_test.go b/lib/fasttime/fasttime_timing_test.go index bd27085c8..becda781b 100644 --- a/lib/fasttime/fasttime_timing_test.go +++ b/lib/fasttime/fasttime_timing_test.go @@ -13,7 +13,7 @@ func BenchmarkUnixTimestamp(b *testing.B) { for pb.Next() { ts += UnixTimestamp() } - atomic.StoreUint64(&Sink, ts) + Sink.Store(ts) }) } @@ -24,9 +24,9 @@ func BenchmarkTimeNowUnix(b *testing.B) { for pb.Next() { ts += uint64(time.Now().Unix()) } - atomic.StoreUint64(&Sink, ts) + Sink.Store(ts) }) } // Sink should prevent from code elimination by optimizing compiler -var Sink uint64 +var Sink atomic.Uint64 diff --git a/lib/flagutil/password.go b/lib/flagutil/password.go index 8aff738f9..0830e8bbf 100644 --- a/lib/flagutil/password.go +++ b/lib/flagutil/password.go @@ -35,7 +35,7 @@ func NewPassword(name, description string) *Password { // If the flag value is file:///path/to/file or http://host/path , // then its contents is automatically re-read from the given file or url type Password struct { - nextRefreshTimestamp uint64 + nextRefreshTimestamp atomic.Uint64 value atomic.Pointer[string] @@ -62,14 +62,14 @@ func (p *Password) maybeRereadPassword() { return } tsCurr := fasttime.UnixTimestamp() - tsNext := atomic.LoadUint64(&p.nextRefreshTimestamp) + tsNext := p.nextRefreshTimestamp.Load() if tsCurr < tsNext { // Fast path - nothing to re-read return } // Re-read password from p.sourcePath - atomic.StoreUint64(&p.nextRefreshTimestamp, tsCurr+2) + p.nextRefreshTimestamp.Store(tsCurr + 2) s, err := fscore.ReadPasswordFromFileOrHTTP(p.sourcePath) if err != nil { // cannot use lib/logger, since it can be uninitialized yet @@ -86,7 +86,7 @@ func (p *Password) String() string { // Set implements flag.Value interface. func (p *Password) Set(value string) error { - atomic.StoreUint64(&p.nextRefreshTimestamp, 0) + p.nextRefreshTimestamp.Store(0) switch { case strings.HasPrefix(value, "file://"): p.sourcePath = strings.TrimPrefix(value, "file://") diff --git a/lib/fs/fs.go b/lib/fs/fs.go index a3521b051..980066b10 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -16,7 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -var tmpFileNum uint64 +var tmpFileNum atomic.Uint64 // MustSyncPath syncs contents of the given path. func MustSyncPath(path string) { @@ -62,7 +62,7 @@ func MustWriteAtomic(path string, data []byte, canOverwrite bool) { } // Write data to a temporary file. - n := atomic.AddUint64(&tmpFileNum, 1) + n := tmpFileNum.Add(1) tmpPath := fmt.Sprintf("%s.tmp.%d", path, n) MustWriteSync(tmpPath, data) @@ -207,7 +207,11 @@ func MustRemoveDirAtomic(dir string) { MustSyncPath(parentDir) } -var atomicDirRemoveCounter = uint64(time.Now().UnixNano()) +var atomicDirRemoveCounter = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().UnixNano())) + return &x +}() // MustReadDir reads directory entries at the given dir. func MustReadDir(dir string) []os.DirEntry { diff --git a/lib/fs/fs_nix.go b/lib/fs/fs_nix.go index fed424edb..634727f04 100644 --- a/lib/fs/fs_nix.go +++ b/lib/fs/fs_nix.go @@ -5,7 +5,6 @@ package fs import ( "fmt" "os" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "golang.org/x/sys/unix" @@ -16,7 +15,7 @@ func freeSpace(stat unix.Statfs_t) uint64 { } func mustRemoveDirAtomic(dir string) { - n := atomic.AddUint64(&atomicDirRemoveCounter, 1) + n := atomicDirRemoveCounter.Add(1) tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n) if err := os.Rename(dir, tmpDir); err != nil { logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err) diff --git a/lib/fs/fs_openbsd.go b/lib/fs/fs_openbsd.go index 6f3d4904e..2f4574ea6 100644 --- a/lib/fs/fs_openbsd.go +++ b/lib/fs/fs_openbsd.go @@ -3,7 +3,6 @@ package fs import ( "fmt" "os" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "golang.org/x/sys/unix" @@ -14,7 +13,7 @@ func freeSpace(stat unix.Statfs_t) uint64 { } func mustRemoveDirAtomic(dir string) { - n := atomic.AddUint64(&atomicDirRemoveCounter, 1) + n := atomicDirRemoveCounter.Add(1) tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n) if err := os.Rename(dir, tmpDir); err != nil { logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err) diff --git a/lib/fs/fs_solaris.go b/lib/fs/fs_solaris.go index 923f69e65..b3d880446 100644 --- a/lib/fs/fs_solaris.go +++ b/lib/fs/fs_solaris.go @@ -3,14 +3,13 @@ package fs import ( "fmt" "os" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "golang.org/x/sys/unix" ) func mustRemoveDirAtomic(dir string) { - n := atomic.AddUint64(&atomicDirRemoveCounter, 1) + n := atomicDirRemoveCounter.Add(1) tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n) if err := os.Rename(dir, tmpDir); err != nil { logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err) diff --git a/lib/fs/fs_windows.go b/lib/fs/fs_windows.go index f455c78cf..af2076afd 100644 --- a/lib/fs/fs_windows.go +++ b/lib/fs/fs_windows.go @@ -5,7 +5,6 @@ import ( "os" "reflect" "sync" - "sync/atomic" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -25,7 +24,7 @@ func mustSyncPath(path string) { } func mustRemoveDirAtomic(dir string) { - n := atomic.AddUint64(&atomicDirRemoveCounter, 1) + n := atomicDirRemoveCounter.Add(1) tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n) if err := os.Rename(dir, tmpDir); err != nil { logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err) diff --git a/lib/fs/reader_at.go b/lib/fs/reader_at.go index 379c05a9e..061edae8b 100644 --- a/lib/fs/reader_at.go +++ b/lib/fs/reader_at.go @@ -32,8 +32,8 @@ type MustReadAtCloser interface { // ReaderAt implements rand-access reader. type ReaderAt struct { - readCalls uint64 - readBytes uint64 + readCalls atomic.Int64 + readBytes atomic.Int64 // path contains the path to the file for reading path string @@ -81,8 +81,8 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) { copy(p, src) } if r.useLocalStats { - atomic.AddUint64(&r.readCalls, 1) - atomic.AddUint64(&r.readBytes, uint64(len(p))) + r.readCalls.Add(1) + r.readBytes.Add(int64(len(p))) } else { readCalls.Inc() readBytes.Add(len(p)) @@ -119,10 +119,10 @@ func (r *ReaderAt) MustClose() { } if r.useLocalStats { - readCalls.Add(int(r.readCalls)) - readBytes.Add(int(r.readBytes)) - r.readCalls = 0 - r.readBytes = 0 + readCalls.AddInt64(r.readCalls.Load()) + readBytes.AddInt64(r.readBytes.Load()) + r.readCalls.Store(0) + r.readBytes.Store(0) r.useLocalStats = false } } diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index 1f9f9fb1b..45ea74903 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -68,7 +68,7 @@ var ( ) type server struct { - shutdownDelayDeadline int64 + shutdownDelayDeadline atomic.Int64 s *http.Server } @@ -225,7 +225,7 @@ func stop(addr string) error { } deadline := time.Now().Add(*shutdownDelay).UnixNano() - atomic.StoreInt64(&s.shutdownDelayDeadline, deadline) + s.shutdownDelayDeadline.Store(deadline) if *shutdownDelay > 0 { // Sleep for a while until load balancer in front of the server // notifies that "/health" endpoint returns non-OK responses. @@ -339,7 +339,7 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques switch r.URL.Path { case "/health": h.Set("Content-Type", "text/plain; charset=utf-8") - deadline := atomic.LoadInt64(&s.shutdownDelayDeadline) + deadline := s.shutdownDelayDeadline.Load() if deadline <= 0 { w.Write([]byte("OK")) return diff --git a/lib/lrucache/lrucache.go b/lib/lrucache/lrucache.go index 3d545cb4a..e3199e95e 100644 --- a/lib/lrucache/lrucache.go +++ b/lib/lrucache/lrucache.go @@ -149,14 +149,11 @@ func (c *Cache) cleanByTimeout() { } type cache struct { - // Atomically updated fields must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - requests uint64 - misses uint64 + requests atomic.Uint64 + misses atomic.Uint64 // sizeBytes contains an approximate size for all the blocks stored in the cache. - sizeBytes int64 + sizeBytes atomic.Int64 // getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes. getMaxSizeBytes func() int @@ -204,7 +201,7 @@ func newCache(getMaxSizeBytes func() int) *cache { } func (c *cache) updateSizeBytes(n int) { - atomic.AddInt64(&c.sizeBytes, int64(n)) + c.sizeBytes.Add(int64(n)) } func (c *cache) cleanByTimeout() { @@ -223,13 +220,13 @@ func (c *cache) cleanByTimeout() { } func (c *cache) GetEntry(k string) Entry { - atomic.AddUint64(&c.requests, 1) + c.requests.Add(1) c.mu.Lock() defer c.mu.Unlock() ce := c.m[k] if ce == nil { - atomic.AddUint64(&c.misses, 1) + c.misses.Add(1) return nil } currentTime := fasttime.UnixTimestamp() @@ -277,7 +274,7 @@ func (c *cache) Len() int { } func (c *cache) SizeBytes() int { - return int(atomic.LoadInt64(&c.sizeBytes)) + return int(c.sizeBytes.Load()) } func (c *cache) SizeMaxBytes() int { @@ -285,11 +282,11 @@ func (c *cache) SizeMaxBytes() int { } func (c *cache) Requests() uint64 { - return atomic.LoadUint64(&c.requests) + return c.requests.Load() } func (c *cache) Misses() uint64 { - return atomic.LoadUint64(&c.misses) + return c.misses.Load() } // lastAccessHeap implements heap.Interface diff --git a/lib/netutil/conn.go b/lib/netutil/conn.go index 651752f74..41aefb0b9 100644 --- a/lib/netutil/conn.go +++ b/lib/netutil/conn.go @@ -44,10 +44,7 @@ func (cm *connMetrics) init(ms *metrics.Set, group, name, addr string) { } type statConn struct { - // Move atomic counters to the top of struct in order to properly align them on 32-bit arch. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - - closeCalls uint64 + closeCalls atomic.Uint64 net.Conn @@ -90,7 +87,7 @@ func (sc *statConn) Write(p []byte) (int, error) { } func (sc *statConn) Close() error { - n := atomic.AddUint64(&sc.closeCalls, 1) + n := sc.closeCalls.Add(1) if n > 1 { // The connection has been already closed. return nil diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 48544f264..f1c4470b2 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -104,9 +104,9 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc } func (aw *apiWatcher) mustStart() { - atomic.AddInt32(&aw.gw.apiWatcherInflightStartCalls, 1) + aw.gw.apiWatcherInflightStartCalls.Add(1) aw.gw.startWatchersForRole(aw.role, aw) - atomic.AddInt32(&aw.gw.apiWatcherInflightStartCalls, -1) + aw.gw.apiWatcherInflightStartCalls.Add(-1) } func (aw *apiWatcher) updateSwosCount(multiplier int, swosByKey map[string][]interface{}) { @@ -214,15 +214,15 @@ func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { type groupWatcher struct { // The number of in-flight apiWatcher.mustStart() calls for the given groupWatcher. // This field is used by groupWatchersCleaner() in order to determine when the given groupWatcher can be stopped. - apiWatcherInflightStartCalls int32 + apiWatcherInflightStartCalls atomic.Int32 // Old Kubernetes doesn't support /apis/networking.k8s.io/v1/, so /apis/networking.k8s.io/v1beta1/ must be used instead. // This flag is used for automatic substitution of v1 API path with v1beta1 API path during requests to apiServer. - useNetworkingV1Beta1 uint32 + useNetworkingV1Beta1 atomic.Bool // Old Kubernetes doesn't support /apis/discovery.k8s.io/v1/, so discovery.k8s.io/v1beta1/ must be used instead. // This flag is used for automatic substitution of v1 API path with v1beta1 API path during requests to apiServer. - useDiscoveryV1Beta1 uint32 + useDiscoveryV1Beta1 atomic.Bool apiServer string namespaces []string @@ -343,7 +343,7 @@ func groupWatchersCleaner() { awsTotal += len(uw.aws) + len(uw.awsPending) } - if awsTotal == 0 && atomic.LoadInt32(&gw.apiWatcherInflightStartCalls) == 0 { + if awsTotal == 0 && gw.apiWatcherInflightStartCalls.Load() == 0 { // There are no API watchers subscribed to gw and there are no in-flight apiWatcher.mustStart() calls. // Stop all the urlWatcher instances at gw and drop gw from groupWatchers in this case, // but do it only on the second iteration in order to reduce urlWatcher churn @@ -471,11 +471,11 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { // doRequest performs http request to the given requestURL. func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http.Response, error) { - if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 { + if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && gw.useNetworkingV1Beta1.Load() { // Update networking URL for old Kubernetes API, which supports only v1beta1 path. requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1) } - if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 1 { + if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && gw.useDiscoveryV1Beta1.Load() { // Update discovery URL for old Kubernetes API, which supports only v1beta1 path. requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1) } @@ -491,12 +491,12 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http return nil, err } if resp.StatusCode == http.StatusNotFound { - if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 { - atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1) + if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && !gw.useNetworkingV1Beta1.Load() { + gw.useNetworkingV1Beta1.Store(true) return gw.doRequest(ctx, requestURL) } - if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 { - atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1) + if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && !gw.useDiscoveryV1Beta1.Load() { + gw.useDiscoveryV1Beta1.Store(true) return gw.doRequest(ctx, requestURL) } } diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 183dcd0e1..87b52a5b3 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -79,9 +79,9 @@ func Stop() { var ( globalStopChan chan struct{} scraperWG sync.WaitGroup - // PendingScrapeConfigs - zero value means, that - // all scrapeConfigs are inited and ready for work. - PendingScrapeConfigs int32 + + // PendingScrapeConfigs - zero value means, that all scrapeConfigs are inited and ready for work. + PendingScrapeConfigs atomic.Int32 // configData contains -promscrape.config data configData atomic.Pointer[[]byte] @@ -225,7 +225,7 @@ func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteReque } func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) { - atomic.AddInt32(&PendingScrapeConfigs, 1) + PendingScrapeConfigs.Add(1) scfg := &scrapeConfig{ name: name, pushData: scs.pushData, @@ -292,7 +292,7 @@ func (scfg *scrapeConfig) run(globalStopCh <-chan struct{}) { } } updateScrapeWork(cfg) - atomic.AddInt32(&PendingScrapeConfigs, -1) + PendingScrapeConfigs.Add(-1) for { diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index 6167646da..296f8d1d6 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -56,7 +56,7 @@ var ( ) type statConn struct { - closed uint64 + closed atomic.Int32 net.Conn } @@ -82,7 +82,7 @@ func (sc *statConn) Write(p []byte) (int, error) { func (sc *statConn) Close() error { err := sc.Conn.Close() - if atomic.AddUint64(&sc.closed, 1) == 1 { + if sc.closed.Add(1) == 1 { conns.Dec() } return err diff --git a/lib/snapshot/snapshotutil/snapshotutil.go b/lib/snapshot/snapshotutil/snapshotutil.go index 9c55fe4ac..77dcf910f 100644 --- a/lib/snapshot/snapshotutil/snapshotutil.go +++ b/lib/snapshot/snapshotutil/snapshotutil.go @@ -41,7 +41,11 @@ func NewName() string { } func nextSnapshotIdx() uint64 { - return atomic.AddUint64(&snapshotIdx, 1) + return snapshotIdx.Add(1) } -var snapshotIdx = uint64(time.Now().UnixNano()) +var snapshotIdx = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().UnixNano())) + return &x +}() diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index cc27fcb08..16dffe310 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -43,7 +43,7 @@ type Cache struct { // In this case using prev would result in RAM waste, // it is better to use only curr cache with doubled size. // After the process of switching, this flag will be set to whole. - mode uint32 + mode atomic.Uint32 // The maxBytes value passed to New() or to Load(). maxBytes int @@ -110,7 +110,7 @@ func newCacheInternal(curr, prev *fastcache.Cache, mode, maxBytes int) *Cache { c.curr.Store(curr) c.prev.Store(prev) c.stopCh = make(chan struct{}) - c.setMode(mode) + c.mode.Store(uint32(mode)) return &c } @@ -143,7 +143,7 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) { case <-t.C: } c.mu.Lock() - if atomic.LoadUint32(&c.mode) != split { + if c.mode.Load() != split { // Stop the expirationWatcher on non-split mode. c.mu.Unlock() return @@ -183,7 +183,7 @@ func (c *Cache) prevCacheWatcher() { case <-t.C: } c.mu.Lock() - if atomic.LoadUint32(&c.mode) != split { + if c.mode.Load() != split { // Do nothing in non-split mode. c.mu.Unlock() return @@ -227,7 +227,7 @@ func (c *Cache) cacheSizeWatcher() { return case <-t.C: } - if c.loadMode() != split { + if c.mode.Load() != split { continue } var cs fastcache.Stats @@ -252,7 +252,7 @@ func (c *Cache) cacheSizeWatcher() { // 6) drop prev cache c.mu.Lock() - c.setMode(switching) + c.mode.Store(switching) prev := c.prev.Load() curr := c.curr.Load() c.prev.Store(curr) @@ -280,7 +280,7 @@ func (c *Cache) cacheSizeWatcher() { } c.mu.Lock() - c.setMode(whole) + c.mode.Store(whole) prev = c.prev.Load() c.prev.Store(fastcache.New(1024)) cs.Reset() @@ -318,15 +318,7 @@ func (c *Cache) Reset() { updateCacheStatsHistory(&c.csHistory, &cs) curr.Reset() // Reset the mode to `split` in the hope the working set size becomes smaller after the reset. - c.setMode(split) -} - -func (c *Cache) setMode(mode int) { - atomic.StoreUint32(&c.mode, uint32(mode)) -} - -func (c *Cache) loadMode() int { - return int(atomic.LoadUint32(&c.mode)) + c.mode.Store(split) } // UpdateStats updates fcs with cache stats. @@ -374,7 +366,7 @@ func (c *Cache) Get(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if c.loadMode() == whole { + if c.mode.Load() == whole { // Nothing found. return result } @@ -397,7 +389,7 @@ func (c *Cache) Has(key []byte) bool { if curr.Has(key) { return true } - if c.loadMode() == whole { + if c.mode.Load() == whole { return false } prev := c.prev.Load() @@ -428,7 +420,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if c.loadMode() == whole { + if c.mode.Load() == whole { // Nothing found. return result } diff --git a/vendor/github.com/VictoriaMetrics/metrics/counter.go b/vendor/github.com/VictoriaMetrics/metrics/counter.go index 25973a8a5..1076e80c2 100644 --- a/vendor/github.com/VictoriaMetrics/metrics/counter.go +++ b/vendor/github.com/VictoriaMetrics/metrics/counter.go @@ -42,6 +42,11 @@ func (c *Counter) Add(n int) { atomic.AddUint64(&c.n, uint64(n)) } +// AddInt64 adds n to c. +func (c *Counter) AddInt64(n int64) { + atomic.AddUint64(&c.n, uint64(n)) +} + // Get returns the current value for c. func (c *Counter) Get() uint64 { return atomic.LoadUint64(&c.n) diff --git a/vendor/modules.txt b/vendor/modules.txt index d183be4bf..878000848 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -99,7 +99,7 @@ github.com/VictoriaMetrics/easyproto # github.com/VictoriaMetrics/fastcache v1.12.2 ## explicit; go 1.13 github.com/VictoriaMetrics/fastcache -# github.com/VictoriaMetrics/metrics v1.32.0 +# github.com/VictoriaMetrics/metrics v1.33.0 ## explicit; go 1.17 github.com/VictoriaMetrics/metrics # github.com/VictoriaMetrics/metricsql v0.74.0