lib: consistently use atomic.* types instead of atomic.* functions

See ea9e2b19a5
This commit is contained in:
Aliaksandr Valialkin 2024-02-24 02:07:51 +02:00
parent 61519f6c22
commit d845edc24b
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
39 changed files with 185 additions and 190 deletions

View file

@ -8,7 +8,6 @@ import (
"net/http"
"os"
"strings"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
@ -459,7 +458,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
w.WriteHeader(http.StatusOK)
return true
case "/ready":
if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 {
if rdy := promscrape.PendingScrapeConfigs.Load(); rdy > 0 {
errMsg := fmt.Sprintf("waiting for scrapes to init, left: %d", rdy)
http.Error(w, errMsg, http.StatusTooEarly)
} else {

2
go.mod
View file

@ -8,7 +8,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/VictoriaMetrics/fastcache v1.12.2
github.com/VictoriaMetrics/metrics v1.32.0
github.com/VictoriaMetrics/metrics v1.33.0
github.com/VictoriaMetrics/metricsql v0.74.0
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2/config v1.27.0

4
go.sum
View file

@ -69,8 +69,8 @@ github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys=
github.com/VictoriaMetrics/metrics v1.32.0 h1:r9JK2zndYv0TIxFXLEHwhQqRdnu8/O3cwJiCBX4vJCM=
github.com/VictoriaMetrics/metrics v1.32.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/VictoriaMetrics/metrics v1.33.0 h1:EnkDEaGiL2u95t+W76GfecC/LMYpy+tFrexYzBWQIAc=
github.com/VictoriaMetrics/metrics v1.33.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/VictoriaMetrics/metricsql v0.74.0 h1:bVO7USXBBYEuEHQ3PZg/6216j0DvblZM+Q8sTRECkv0=
github.com/VictoriaMetrics/metricsql v0.74.0/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=

View file

@ -46,14 +46,14 @@ func (p *Part) key() string {
// so it must have an unique key in order to always copy it during
// backup, restore and server-side copy.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5005
id := atomic.AddUint64(&uniqueKeyID, 1)
id := uniqueKeyID.Add(1)
return fmt.Sprintf("unique-%016X", id)
}
// Do not use p.FileSize in the key, since it cannot be properly initialized when resuming the restore for partially restored file
return fmt.Sprintf("%s%016X%016X%016X", p.Path, p.Offset, p.Size, p.ActualSize)
}
var uniqueKeyID uint64
var uniqueKeyID atomic.Uint64
// String returns human-readable representation of the part.
func (p *Part) String() string {

View file

@ -172,14 +172,11 @@ func (c *Cache) cleanPerKeyMisses() {
}
type cache struct {
// Atomically updated fields must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
requests atomic.Uint64
misses atomic.Uint64
// sizeBytes contains an approximate size for all the blocks stored in the cache.
sizeBytes int64
sizeBytes atomic.Int64
// getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes.
getMaxSizeBytes func() int
@ -256,7 +253,7 @@ func (c *cache) RemoveBlocksForPart(p interface{}) {
}
func (c *cache) updateSizeBytes(n int) {
atomic.AddInt64(&c.sizeBytes, int64(n))
c.sizeBytes.Add(int64(n))
}
func (c *cache) cleanPerKeyMisses() {
@ -281,7 +278,7 @@ func (c *cache) cleanByTimeout() {
}
func (c *cache) GetBlock(k Key) Block {
atomic.AddUint64(&c.requests, 1)
c.requests.Add(1)
var e *cacheEntry
c.mu.Lock()
defer c.mu.Unlock()
@ -301,7 +298,7 @@ func (c *cache) GetBlock(k Key) Block {
}
// Slow path - the entry is missing in the cache.
c.perKeyMisses[k]++
atomic.AddUint64(&c.misses, 1)
c.misses.Add(1)
return nil
}
@ -367,7 +364,7 @@ func (c *cache) Len() int {
}
func (c *cache) SizeBytes() int {
return int(atomic.LoadInt64(&c.sizeBytes))
return int(c.sizeBytes.Load())
}
func (c *cache) SizeMaxBytes() int {
@ -375,11 +372,11 @@ func (c *cache) SizeMaxBytes() int {
}
func (c *cache) Requests() uint64 {
return atomic.LoadUint64(&c.requests)
return c.requests.Load()
}
func (c *cache) Misses() uint64 {
return atomic.LoadUint64(&c.misses)
return c.misses.Load()
}
// lastAccessHeap implements heap.Interface

View file

@ -16,11 +16,11 @@ func BenchmarkKeyHashUint64(b *testing.B) {
h := k.hashUint64()
hSum += h
}
atomic.AddUint64(&BenchSink, hSum)
BenchSink.Add(hSum)
})
}
var BenchSink uint64
var BenchSink atomic.Uint64
func BenchmarkCacheGet(b *testing.B) {
c := NewCache(func() int {

View file

@ -56,7 +56,7 @@ func (l *Limiter) MaxItems() int {
// CurrentItems return the current number of items registered in l.
func (l *Limiter) CurrentItems() int {
lm := l.v.Load()
n := atomic.LoadUint64(&lm.currentItems)
n := lm.currentItems.Load()
return int(n)
}
@ -72,7 +72,7 @@ func (l *Limiter) Add(h uint64) bool {
}
type limiter struct {
currentItems uint64
currentItems atomic.Uint64
f *filter
}
@ -83,12 +83,12 @@ func newLimiter(maxItems int) *limiter {
}
func (l *limiter) Add(h uint64) bool {
currentItems := atomic.LoadUint64(&l.currentItems)
currentItems := l.currentItems.Load()
if currentItems >= uint64(l.f.maxItems) {
return l.f.Has(h)
}
if l.f.Add(h) {
atomic.AddUint64(&l.currentItems, 1)
l.currentItems.Add(1)
}
return true
}

View file

@ -17,7 +17,7 @@ func BenchmarkToUnsafeString(b *testing.B) {
n += len(s)
}
}
atomic.AddUint64(&Sink, uint64(n))
Sink.Add(uint64(n))
})
}
@ -33,8 +33,8 @@ func BenchmarkToUnsafeBytes(b *testing.B) {
n += len(s)
}
}
atomic.AddUint64(&Sink, uint64(n))
Sink.Add(uint64(n))
})
}
var Sink uint64
var Sink atomic.Uint64

View file

@ -13,7 +13,7 @@ import (
// It caches string match results and returns them back on the next calls
// without calling the matchFunc, which may be expensive.
type FastStringMatcher struct {
lastCleanupTime uint64
lastCleanupTime atomic.Uint64
m sync.Map
@ -21,7 +21,7 @@ type FastStringMatcher struct {
}
type fsmEntry struct {
lastAccessTime uint64
lastAccessTime atomic.Uint64
ok bool
}
@ -29,10 +29,11 @@ type fsmEntry struct {
//
// matchFunc must return the same result for the same input.
func NewFastStringMatcher(matchFunc func(s string) bool) *FastStringMatcher {
return &FastStringMatcher{
lastCleanupTime: fasttime.UnixTimestamp(),
fsm := &FastStringMatcher{
matchFunc: matchFunc,
}
fsm.lastCleanupTime.Store(fasttime.UnixTimestamp())
return fsm
}
// Match applies matchFunc to s and returns the result.
@ -46,19 +47,19 @@ func (fsm *FastStringMatcher) Match(s string) bool {
if ok {
// Fast path - s match result is found in the cache.
e := v.(*fsmEntry)
if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
if e.lastAccessTime.Load()+10 < ct {
// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
// in order to improve the fast path speed on systems with many CPU cores.
atomic.StoreUint64(&e.lastAccessTime, ct)
e.lastAccessTime.Store(ct)
}
return e.ok
}
// Slow path - run matchFunc for s and store the result in the cache.
b := fsm.matchFunc(s)
e := &fsmEntry{
lastAccessTime: ct,
ok: b,
}
e.lastAccessTime.Store(ct)
// Make a copy of s in order to limit memory usage to the s length,
// since the s may point to bigger string.
// This also protects from the case when s contains unsafe string, which points to a temporary byte slice.
@ -72,7 +73,7 @@ func (fsm *FastStringMatcher) Match(s string) bool {
deadline := ct - uint64(cacheExpireDuration.Seconds())
m.Range(func(k, v interface{}) bool {
e := v.(*fsmEntry)
if atomic.LoadUint64(&e.lastAccessTime) < deadline {
if e.lastAccessTime.Load() < deadline {
m.Delete(k)
}
return true
@ -82,13 +83,13 @@ func (fsm *FastStringMatcher) Match(s string) bool {
return b
}
func needCleanup(lastCleanupTime *uint64, currentTime uint64) bool {
lct := atomic.LoadUint64(lastCleanupTime)
func needCleanup(lastCleanupTime *atomic.Uint64, currentTime uint64) bool {
lct := lastCleanupTime.Load()
if lct+61 >= currentTime {
return false
}
// Atomically compare and swap the current time with the lastCleanupTime
// in order to guarantee that only a single goroutine out of multiple
// concurrently executing goroutines gets true from the call.
return atomic.CompareAndSwapUint64(lastCleanupTime, lct, currentTime)
return lastCleanupTime.CompareAndSwap(lct, currentTime)
}

View file

@ -2,6 +2,7 @@ package bytesutil
import (
"strings"
"sync/atomic"
"testing"
)
@ -27,18 +28,19 @@ func TestFastStringMatcher(t *testing.T) {
func TestNeedCleanup(t *testing.T) {
f := func(lastCleanupTime, currentTime uint64, resultExpected bool) {
t.Helper()
lct := lastCleanupTime
var lct atomic.Uint64
lct.Store(lastCleanupTime)
result := needCleanup(&lct, currentTime)
if result != resultExpected {
t.Fatalf("unexpected result for needCleanup(%d, %d); got %v; want %v", lastCleanupTime, currentTime, result, resultExpected)
}
if result {
if lct != currentTime {
t.Fatalf("unexpected value for lct; got %d; want currentTime=%d", lct, currentTime)
if n := lct.Load(); n != currentTime {
t.Fatalf("unexpected value for lct; got %d; want currentTime=%d", n, currentTime)
}
} else {
if lct != lastCleanupTime {
t.Fatalf("unexpected value for lct; got %d; want lastCleanupTime=%d", lct, lastCleanupTime)
if n := lct.Load(); n != lastCleanupTime {
t.Fatalf("unexpected value for lct; got %d; want lastCleanupTime=%d", n, lastCleanupTime)
}
}
}

View file

@ -2,7 +2,6 @@ package bytesutil
import (
"strings"
"sync/atomic"
"testing"
)
@ -28,6 +27,6 @@ func benchmarkFastStringMatcher(b *testing.B, s string) {
n++
}
}
atomic.AddUint64(&GlobalSink, n)
GlobalSink.Add(n)
})
}

View file

@ -13,7 +13,7 @@ import (
// It caches transformed strings and returns them back on the next calls
// without calling the transformFunc, which may be expensive.
type FastStringTransformer struct {
lastCleanupTime uint64
lastCleanupTime atomic.Uint64
m sync.Map
@ -21,7 +21,7 @@ type FastStringTransformer struct {
}
type fstEntry struct {
lastAccessTime uint64
lastAccessTime atomic.Uint64
s string
}
@ -29,10 +29,11 @@ type fstEntry struct {
//
// transformFunc must return the same result for the same input.
func NewFastStringTransformer(transformFunc func(s string) string) *FastStringTransformer {
return &FastStringTransformer{
lastCleanupTime: fasttime.UnixTimestamp(),
fst := &FastStringTransformer{
transformFunc: transformFunc,
}
fst.lastCleanupTime.Store(fasttime.UnixTimestamp())
return fst
}
// Transform applies transformFunc to s and returns the result.
@ -52,10 +53,10 @@ func (fst *FastStringTransformer) Transform(s string) string {
if ok {
// Fast path - the transformed s is found in the cache.
e := v.(*fstEntry)
if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
if e.lastAccessTime.Load()+10 < ct {
// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
// in order to improve the fast path speed on systems with many CPU cores.
atomic.StoreUint64(&e.lastAccessTime, ct)
e.lastAccessTime.Store(ct)
}
return e.s
}
@ -72,9 +73,9 @@ func (fst *FastStringTransformer) Transform(s string) string {
sTransformed = s
}
e := &fstEntry{
lastAccessTime: ct,
s: sTransformed,
}
e.lastAccessTime.Store(ct)
fst.m.Store(s, e)
if needCleanup(&fst.lastCleanupTime, ct) {
@ -83,7 +84,7 @@ func (fst *FastStringTransformer) Transform(s string) string {
deadline := ct - uint64(cacheExpireDuration.Seconds())
m.Range(func(k, v interface{}) bool {
e := v.(*fstEntry)
if atomic.LoadUint64(&e.lastAccessTime) < deadline {
if e.lastAccessTime.Load() < deadline {
m.Delete(k)
}
return true

View file

@ -24,8 +24,8 @@ func benchmarkFastStringTransformer(b *testing.B, s string) {
sTransformed := fst.Transform(s)
n += uint64(len(sTransformed))
}
atomic.AddUint64(&GlobalSink, n)
GlobalSink.Add(n)
})
}
var GlobalSink uint64
var GlobalSink atomic.Uint64

View file

@ -42,19 +42,19 @@ func InternString(s string) string {
ct := fasttime.UnixTimestamp()
if v, ok := internStringsMap.Load(s); ok {
e := v.(*ismEntry)
if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
if e.lastAccessTime.Load()+10 < ct {
// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
// in order to improve the fast path speed on systems with many CPU cores.
atomic.StoreUint64(&e.lastAccessTime, ct)
e.lastAccessTime.Store(ct)
}
return e.s
}
// Make a new copy for s in order to remove references from possible bigger string s refers to.
sCopy := strings.Clone(s)
e := &ismEntry{
lastAccessTime: ct,
s: sCopy,
}
e.lastAccessTime.Store(ct)
internStringsMap.Store(sCopy, e)
if needCleanup(&internStringsMapLastCleanupTime, ct) {
@ -63,7 +63,7 @@ func InternString(s string) string {
deadline := ct - uint64(cacheExpireDuration.Seconds())
m.Range(func(k, v interface{}) bool {
e := v.(*ismEntry)
if atomic.LoadUint64(&e.lastAccessTime) < deadline {
if e.lastAccessTime.Load() < deadline {
m.Delete(k)
}
return true
@ -74,11 +74,11 @@ func InternString(s string) string {
}
type ismEntry struct {
lastAccessTime uint64
lastAccessTime atomic.Uint64
s string
}
var (
internStringsMap sync.Map
internStringsMapLastCleanupTime uint64
internStringsMapLastCleanupTime atomic.Uint64
)

View file

@ -51,7 +51,7 @@ func benchmarkAppendDecimalToFloat(b *testing.B, a []int64, scale int16) {
var fa []float64
for pb.Next() {
fa = AppendDecimalToFloat(fa[:0], a, scale)
atomic.AddUint64(&Sink, uint64(len(fa)))
Sink.Add(uint64(len(fa)))
}
})
}
@ -101,7 +101,7 @@ func benchmarkAppendFloatToDecimal(b *testing.B, fa []float64) {
sink += uint64(len(da))
sink += uint64(e)
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -138,10 +138,10 @@ func BenchmarkFromFloat(b *testing.B) {
sink += uint64(v)
sink += uint64(e)
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
})
}
}
var Sink uint64
var Sink atomic.Uint64

View file

@ -18,12 +18,12 @@ func BenchmarkMarshalGaugeArray(b *testing.B) {
if mt != MarshalTypeZSTDNearestDelta {
panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeZSTDNearestDelta))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
var Sink uint64
var Sink atomic.Uint64
func BenchmarkUnmarshalGaugeArray(b *testing.B) {
b.ReportAllocs()
@ -36,7 +36,7 @@ func BenchmarkUnmarshalGaugeArray(b *testing.B) {
if err != nil {
panic(fmt.Errorf("cannot unmarshal gauge array: %w", err))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -68,7 +68,7 @@ func BenchmarkMarshalDeltaConstArray(b *testing.B) {
if mt != MarshalTypeDeltaConst {
panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeDeltaConst))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -84,7 +84,7 @@ func BenchmarkUnmarshalDeltaConstArray(b *testing.B) {
if err != nil {
panic(fmt.Errorf("cannot unmarshal delta const array: %w", err))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -115,7 +115,7 @@ func BenchmarkMarshalConstArray(b *testing.B) {
if mt != MarshalTypeConst {
panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeConst))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -131,7 +131,7 @@ func BenchmarkUnmarshalConstArray(b *testing.B) {
if err != nil {
panic(fmt.Errorf("cannot unmarshal const array: %w", err))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -160,7 +160,7 @@ func BenchmarkMarshalZeroConstArray(b *testing.B) {
if mt != MarshalTypeConst {
panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, MarshalTypeConst))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -176,7 +176,7 @@ func BenchmarkUnmarshalZeroConstArray(b *testing.B) {
if err != nil {
panic(fmt.Errorf("cannot unmarshal zero const array: %w", err))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -199,7 +199,7 @@ func BenchmarkMarshalInt64Array(b *testing.B) {
if mt != benchMarshalType {
panic(fmt.Errorf("unexpected marshal type; got %d; expecting %d", mt, benchMarshalType))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -215,7 +215,7 @@ func BenchmarkUnmarshalInt64Array(b *testing.B) {
if err != nil {
panic(fmt.Errorf("cannot unmarshal int64 array: %w", err))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}

View file

@ -2,7 +2,6 @@ package encoding
import (
"fmt"
"sync/atomic"
"testing"
)
@ -16,7 +15,7 @@ func BenchmarkMarshalUint64(b *testing.B) {
dst = MarshalUint64(dst[:0], sink)
sink += uint64(len(dst))
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -29,7 +28,7 @@ func BenchmarkUnmarshalUint64(b *testing.B) {
v := UnmarshalUint64(testMarshaledUint64Data)
sink += v
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -43,7 +42,7 @@ func BenchmarkMarshalInt64(b *testing.B) {
dst = MarshalInt64(dst[:0], int64(sink))
sink += uint64(len(dst))
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -56,7 +55,7 @@ func BenchmarkUnmarshalInt64(b *testing.B) {
v := UnmarshalInt64(testMarshaledInt64Data)
sink += uint64(v)
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -96,7 +95,7 @@ func benchmarkMarshalVarUint64s(b *testing.B, maxValue uint64) {
dst = MarshalVarUint64s(dst[:0], data)
sink += uint64(len(dst))
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -136,7 +135,7 @@ func benchmarkMarshalVarInt64s(b *testing.B, maxValue int64) {
dst = MarshalVarInt64s(dst[:0], data)
sink += uint64(len(dst))
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -182,7 +181,7 @@ func benchmarkUnmarshalVarUint64s(b *testing.B, maxValue uint64) {
}
sink += uint64(len(dst))
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}
@ -228,7 +227,7 @@ func benchmarkUnmarshalVarInt64s(b *testing.B, maxValue int64) {
}
sink += uint64(len(dst))
}
atomic.AddUint64(&Sink, sink)
Sink.Add(sink)
})
}

View file

@ -2,7 +2,6 @@ package encoding
import (
"fmt"
"sync/atomic"
"testing"
)
@ -21,7 +20,7 @@ func benchmarkMarshalInt64NearestDelta2(b *testing.B, precisionBits uint8) {
var dst []byte
for pb.Next() {
dst, _ = marshalInt64NearestDelta2(dst[:0], benchInt64Array, precisionBits)
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -37,7 +36,7 @@ func BenchmarkUnmarshalInt64NearestDelta2(b *testing.B) {
if err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}

View file

@ -2,7 +2,6 @@ package encoding
import (
"fmt"
"sync/atomic"
"testing"
)
@ -21,7 +20,7 @@ func benchmarkMarshalInt64NearestDelta(b *testing.B, precisionBits uint8) {
var dst []byte
for pb.Next() {
dst, _ = marshalInt64NearestDelta(dst[:0], benchInt64Array, precisionBits)
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}
@ -37,7 +36,7 @@ func BenchmarkUnmarshalInt64NearestDelta(b *testing.B) {
if err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
atomic.AddUint64(&Sink, uint64(len(dst)))
Sink.Add(uint64(len(dst)))
}
})
}

View file

@ -11,18 +11,22 @@ func init() {
defer ticker.Stop()
for tm := range ticker.C {
t := uint64(tm.Unix())
atomic.StoreUint64(&currentTimestamp, t)
currentTimestamp.Store(t)
}
}()
}
var currentTimestamp = uint64(time.Now().Unix())
var currentTimestamp = func() *atomic.Uint64 {
var x atomic.Uint64
x.Store(uint64(time.Now().Unix()))
return &x
}()
// UnixTimestamp returns the current unix timestamp in seconds.
//
// It is faster than time.Now().Unix()
func UnixTimestamp() uint64 {
return atomic.LoadUint64(&currentTimestamp)
return currentTimestamp.Load()
}
// UnixDate returns date from the current unix timestamp.

View file

@ -13,7 +13,7 @@ func BenchmarkUnixTimestamp(b *testing.B) {
for pb.Next() {
ts += UnixTimestamp()
}
atomic.StoreUint64(&Sink, ts)
Sink.Store(ts)
})
}
@ -24,9 +24,9 @@ func BenchmarkTimeNowUnix(b *testing.B) {
for pb.Next() {
ts += uint64(time.Now().Unix())
}
atomic.StoreUint64(&Sink, ts)
Sink.Store(ts)
})
}
// Sink should prevent from code elimination by optimizing compiler
var Sink uint64
var Sink atomic.Uint64

View file

@ -35,7 +35,7 @@ func NewPassword(name, description string) *Password {
// If the flag value is file:///path/to/file or http://host/path ,
// then its contents is automatically re-read from the given file or url
type Password struct {
nextRefreshTimestamp uint64
nextRefreshTimestamp atomic.Uint64
value atomic.Pointer[string]
@ -62,14 +62,14 @@ func (p *Password) maybeRereadPassword() {
return
}
tsCurr := fasttime.UnixTimestamp()
tsNext := atomic.LoadUint64(&p.nextRefreshTimestamp)
tsNext := p.nextRefreshTimestamp.Load()
if tsCurr < tsNext {
// Fast path - nothing to re-read
return
}
// Re-read password from p.sourcePath
atomic.StoreUint64(&p.nextRefreshTimestamp, tsCurr+2)
p.nextRefreshTimestamp.Store(tsCurr + 2)
s, err := fscore.ReadPasswordFromFileOrHTTP(p.sourcePath)
if err != nil {
// cannot use lib/logger, since it can be uninitialized yet
@ -86,7 +86,7 @@ func (p *Password) String() string {
// Set implements flag.Value interface.
func (p *Password) Set(value string) error {
atomic.StoreUint64(&p.nextRefreshTimestamp, 0)
p.nextRefreshTimestamp.Store(0)
switch {
case strings.HasPrefix(value, "file://"):
p.sourcePath = strings.TrimPrefix(value, "file://")

View file

@ -16,7 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
var tmpFileNum uint64
var tmpFileNum atomic.Uint64
// MustSyncPath syncs contents of the given path.
func MustSyncPath(path string) {
@ -62,7 +62,7 @@ func MustWriteAtomic(path string, data []byte, canOverwrite bool) {
}
// Write data to a temporary file.
n := atomic.AddUint64(&tmpFileNum, 1)
n := tmpFileNum.Add(1)
tmpPath := fmt.Sprintf("%s.tmp.%d", path, n)
MustWriteSync(tmpPath, data)
@ -207,7 +207,11 @@ func MustRemoveDirAtomic(dir string) {
MustSyncPath(parentDir)
}
var atomicDirRemoveCounter = uint64(time.Now().UnixNano())
var atomicDirRemoveCounter = func() *atomic.Uint64 {
var x atomic.Uint64
x.Store(uint64(time.Now().UnixNano()))
return &x
}()
// MustReadDir reads directory entries at the given dir.
func MustReadDir(dir string) []os.DirEntry {

View file

@ -5,7 +5,6 @@ package fs
import (
"fmt"
"os"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"golang.org/x/sys/unix"
@ -16,7 +15,7 @@ func freeSpace(stat unix.Statfs_t) uint64 {
}
func mustRemoveDirAtomic(dir string) {
n := atomic.AddUint64(&atomicDirRemoveCounter, 1)
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)

View file

@ -3,7 +3,6 @@ package fs
import (
"fmt"
"os"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"golang.org/x/sys/unix"
@ -14,7 +13,7 @@ func freeSpace(stat unix.Statfs_t) uint64 {
}
func mustRemoveDirAtomic(dir string) {
n := atomic.AddUint64(&atomicDirRemoveCounter, 1)
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)

View file

@ -3,14 +3,13 @@ package fs
import (
"fmt"
"os"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"golang.org/x/sys/unix"
)
func mustRemoveDirAtomic(dir string) {
n := atomic.AddUint64(&atomicDirRemoveCounter, 1)
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)

View file

@ -5,7 +5,6 @@ import (
"os"
"reflect"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -25,7 +24,7 @@ func mustSyncPath(path string) {
}
func mustRemoveDirAtomic(dir string) {
n := atomic.AddUint64(&atomicDirRemoveCounter, 1)
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)

View file

@ -32,8 +32,8 @@ type MustReadAtCloser interface {
// ReaderAt implements rand-access reader.
type ReaderAt struct {
readCalls uint64
readBytes uint64
readCalls atomic.Int64
readBytes atomic.Int64
// path contains the path to the file for reading
path string
@ -81,8 +81,8 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
copy(p, src)
}
if r.useLocalStats {
atomic.AddUint64(&r.readCalls, 1)
atomic.AddUint64(&r.readBytes, uint64(len(p)))
r.readCalls.Add(1)
r.readBytes.Add(int64(len(p)))
} else {
readCalls.Inc()
readBytes.Add(len(p))
@ -119,10 +119,10 @@ func (r *ReaderAt) MustClose() {
}
if r.useLocalStats {
readCalls.Add(int(r.readCalls))
readBytes.Add(int(r.readBytes))
r.readCalls = 0
r.readBytes = 0
readCalls.AddInt64(r.readCalls.Load())
readBytes.AddInt64(r.readBytes.Load())
r.readCalls.Store(0)
r.readBytes.Store(0)
r.useLocalStats = false
}
}

View file

@ -68,7 +68,7 @@ var (
)
type server struct {
shutdownDelayDeadline int64
shutdownDelayDeadline atomic.Int64
s *http.Server
}
@ -225,7 +225,7 @@ func stop(addr string) error {
}
deadline := time.Now().Add(*shutdownDelay).UnixNano()
atomic.StoreInt64(&s.shutdownDelayDeadline, deadline)
s.shutdownDelayDeadline.Store(deadline)
if *shutdownDelay > 0 {
// Sleep for a while until load balancer in front of the server
// notifies that "/health" endpoint returns non-OK responses.
@ -339,7 +339,7 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques
switch r.URL.Path {
case "/health":
h.Set("Content-Type", "text/plain; charset=utf-8")
deadline := atomic.LoadInt64(&s.shutdownDelayDeadline)
deadline := s.shutdownDelayDeadline.Load()
if deadline <= 0 {
w.Write([]byte("OK"))
return

View file

@ -149,14 +149,11 @@ func (c *Cache) cleanByTimeout() {
}
type cache struct {
// Atomically updated fields must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
requests atomic.Uint64
misses atomic.Uint64
// sizeBytes contains an approximate size for all the blocks stored in the cache.
sizeBytes int64
sizeBytes atomic.Int64
// getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes.
getMaxSizeBytes func() int
@ -204,7 +201,7 @@ func newCache(getMaxSizeBytes func() int) *cache {
}
func (c *cache) updateSizeBytes(n int) {
atomic.AddInt64(&c.sizeBytes, int64(n))
c.sizeBytes.Add(int64(n))
}
func (c *cache) cleanByTimeout() {
@ -223,13 +220,13 @@ func (c *cache) cleanByTimeout() {
}
func (c *cache) GetEntry(k string) Entry {
atomic.AddUint64(&c.requests, 1)
c.requests.Add(1)
c.mu.Lock()
defer c.mu.Unlock()
ce := c.m[k]
if ce == nil {
atomic.AddUint64(&c.misses, 1)
c.misses.Add(1)
return nil
}
currentTime := fasttime.UnixTimestamp()
@ -277,7 +274,7 @@ func (c *cache) Len() int {
}
func (c *cache) SizeBytes() int {
return int(atomic.LoadInt64(&c.sizeBytes))
return int(c.sizeBytes.Load())
}
func (c *cache) SizeMaxBytes() int {
@ -285,11 +282,11 @@ func (c *cache) SizeMaxBytes() int {
}
func (c *cache) Requests() uint64 {
return atomic.LoadUint64(&c.requests)
return c.requests.Load()
}
func (c *cache) Misses() uint64 {
return atomic.LoadUint64(&c.misses)
return c.misses.Load()
}
// lastAccessHeap implements heap.Interface

View file

@ -44,10 +44,7 @@ func (cm *connMetrics) init(ms *metrics.Set, group, name, addr string) {
}
type statConn struct {
// Move atomic counters to the top of struct in order to properly align them on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
closeCalls uint64
closeCalls atomic.Uint64
net.Conn
@ -90,7 +87,7 @@ func (sc *statConn) Write(p []byte) (int, error) {
}
func (sc *statConn) Close() error {
n := atomic.AddUint64(&sc.closeCalls, 1)
n := sc.closeCalls.Add(1)
if n > 1 {
// The connection has been already closed.
return nil

View file

@ -104,9 +104,9 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
}
func (aw *apiWatcher) mustStart() {
atomic.AddInt32(&aw.gw.apiWatcherInflightStartCalls, 1)
aw.gw.apiWatcherInflightStartCalls.Add(1)
aw.gw.startWatchersForRole(aw.role, aw)
atomic.AddInt32(&aw.gw.apiWatcherInflightStartCalls, -1)
aw.gw.apiWatcherInflightStartCalls.Add(-1)
}
func (aw *apiWatcher) updateSwosCount(multiplier int, swosByKey map[string][]interface{}) {
@ -214,15 +214,15 @@ func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
type groupWatcher struct {
// The number of in-flight apiWatcher.mustStart() calls for the given groupWatcher.
// This field is used by groupWatchersCleaner() in order to determine when the given groupWatcher can be stopped.
apiWatcherInflightStartCalls int32
apiWatcherInflightStartCalls atomic.Int32
// Old Kubernetes doesn't support /apis/networking.k8s.io/v1/, so /apis/networking.k8s.io/v1beta1/ must be used instead.
// This flag is used for automatic substitution of v1 API path with v1beta1 API path during requests to apiServer.
useNetworkingV1Beta1 uint32
useNetworkingV1Beta1 atomic.Bool
// Old Kubernetes doesn't support /apis/discovery.k8s.io/v1/, so discovery.k8s.io/v1beta1/ must be used instead.
// This flag is used for automatic substitution of v1 API path with v1beta1 API path during requests to apiServer.
useDiscoveryV1Beta1 uint32
useDiscoveryV1Beta1 atomic.Bool
apiServer string
namespaces []string
@ -343,7 +343,7 @@ func groupWatchersCleaner() {
awsTotal += len(uw.aws) + len(uw.awsPending)
}
if awsTotal == 0 && atomic.LoadInt32(&gw.apiWatcherInflightStartCalls) == 0 {
if awsTotal == 0 && gw.apiWatcherInflightStartCalls.Load() == 0 {
// There are no API watchers subscribed to gw and there are no in-flight apiWatcher.mustStart() calls.
// Stop all the urlWatcher instances at gw and drop gw from groupWatchers in this case,
// but do it only on the second iteration in order to reduce urlWatcher churn
@ -471,11 +471,11 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
// doRequest performs http request to the given requestURL.
func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http.Response, error) {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && gw.useNetworkingV1Beta1.Load() {
// Update networking URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1)
}
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 1 {
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && gw.useDiscoveryV1Beta1.Load() {
// Update discovery URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1)
}
@ -491,12 +491,12 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http
return nil, err
}
if resp.StatusCode == http.StatusNotFound {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 {
atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1)
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && !gw.useNetworkingV1Beta1.Load() {
gw.useNetworkingV1Beta1.Store(true)
return gw.doRequest(ctx, requestURL)
}
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 {
atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1)
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && !gw.useDiscoveryV1Beta1.Load() {
gw.useDiscoveryV1Beta1.Store(true)
return gw.doRequest(ctx, requestURL)
}
}

View file

@ -79,9 +79,9 @@ func Stop() {
var (
globalStopChan chan struct{}
scraperWG sync.WaitGroup
// PendingScrapeConfigs - zero value means, that
// all scrapeConfigs are inited and ready for work.
PendingScrapeConfigs int32
// PendingScrapeConfigs - zero value means, that all scrapeConfigs are inited and ready for work.
PendingScrapeConfigs atomic.Int32
// configData contains -promscrape.config data
configData atomic.Pointer[[]byte]
@ -225,7 +225,7 @@ func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteReque
}
func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) {
atomic.AddInt32(&PendingScrapeConfigs, 1)
PendingScrapeConfigs.Add(1)
scfg := &scrapeConfig{
name: name,
pushData: scs.pushData,
@ -292,7 +292,7 @@ func (scfg *scrapeConfig) run(globalStopCh <-chan struct{}) {
}
}
updateScrapeWork(cfg)
atomic.AddInt32(&PendingScrapeConfigs, -1)
PendingScrapeConfigs.Add(-1)
for {

View file

@ -56,7 +56,7 @@ var (
)
type statConn struct {
closed uint64
closed atomic.Int32
net.Conn
}
@ -82,7 +82,7 @@ func (sc *statConn) Write(p []byte) (int, error) {
func (sc *statConn) Close() error {
err := sc.Conn.Close()
if atomic.AddUint64(&sc.closed, 1) == 1 {
if sc.closed.Add(1) == 1 {
conns.Dec()
}
return err

View file

@ -41,7 +41,11 @@ func NewName() string {
}
func nextSnapshotIdx() uint64 {
return atomic.AddUint64(&snapshotIdx, 1)
return snapshotIdx.Add(1)
}
var snapshotIdx = uint64(time.Now().UnixNano())
var snapshotIdx = func() *atomic.Uint64 {
var x atomic.Uint64
x.Store(uint64(time.Now().UnixNano()))
return &x
}()

View file

@ -48,7 +48,7 @@ type Server struct {
wg sync.WaitGroup
// stopFlag is set to true when the server needs to stop.
stopFlag uint32
stopFlag atomic.Bool
concurrencyLimitReached *metrics.Counter
concurrencyLimitTimeout *metrics.Counter
@ -239,11 +239,11 @@ func (s *Server) MustStop() {
}
func (s *Server) setIsStopping() {
atomic.StoreUint32(&s.stopFlag, 1)
s.stopFlag.Store(true)
}
func (s *Server) isStopping() bool {
return atomic.LoadUint32(&s.stopFlag) != 0
return s.stopFlag.Load()
}
func (s *Server) processConn(bc *handshake.BufferedConn) error {

View file

@ -43,7 +43,7 @@ type Cache struct {
// In this case using prev would result in RAM waste,
// it is better to use only curr cache with doubled size.
// After the process of switching, this flag will be set to whole.
mode uint32
mode atomic.Uint32
// The maxBytes value passed to New() or to Load().
maxBytes int
@ -110,7 +110,7 @@ func newCacheInternal(curr, prev *fastcache.Cache, mode, maxBytes int) *Cache {
c.curr.Store(curr)
c.prev.Store(prev)
c.stopCh = make(chan struct{})
c.setMode(mode)
c.mode.Store(uint32(mode))
return &c
}
@ -143,7 +143,7 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
case <-t.C:
}
c.mu.Lock()
if atomic.LoadUint32(&c.mode) != split {
if c.mode.Load() != split {
// Stop the expirationWatcher on non-split mode.
c.mu.Unlock()
return
@ -183,7 +183,7 @@ func (c *Cache) prevCacheWatcher() {
case <-t.C:
}
c.mu.Lock()
if atomic.LoadUint32(&c.mode) != split {
if c.mode.Load() != split {
// Do nothing in non-split mode.
c.mu.Unlock()
return
@ -227,7 +227,7 @@ func (c *Cache) cacheSizeWatcher() {
return
case <-t.C:
}
if c.loadMode() != split {
if c.mode.Load() != split {
continue
}
var cs fastcache.Stats
@ -252,7 +252,7 @@ func (c *Cache) cacheSizeWatcher() {
// 6) drop prev cache
c.mu.Lock()
c.setMode(switching)
c.mode.Store(switching)
prev := c.prev.Load()
curr := c.curr.Load()
c.prev.Store(curr)
@ -280,7 +280,7 @@ func (c *Cache) cacheSizeWatcher() {
}
c.mu.Lock()
c.setMode(whole)
c.mode.Store(whole)
prev = c.prev.Load()
c.prev.Store(fastcache.New(1024))
cs.Reset()
@ -318,15 +318,7 @@ func (c *Cache) Reset() {
updateCacheStatsHistory(&c.csHistory, &cs)
curr.Reset()
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
c.setMode(split)
}
func (c *Cache) setMode(mode int) {
atomic.StoreUint32(&c.mode, uint32(mode))
}
func (c *Cache) loadMode() int {
return int(atomic.LoadUint32(&c.mode))
c.mode.Store(split)
}
// UpdateStats updates fcs with cache stats.
@ -374,7 +366,7 @@ func (c *Cache) Get(dst, key []byte) []byte {
// Fast path - the entry is found in the current cache.
return result
}
if c.loadMode() == whole {
if c.mode.Load() == whole {
// Nothing found.
return result
}
@ -397,7 +389,7 @@ func (c *Cache) Has(key []byte) bool {
if curr.Has(key) {
return true
}
if c.loadMode() == whole {
if c.mode.Load() == whole {
return false
}
prev := c.prev.Load()
@ -428,7 +420,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
// Fast path - the entry is found in the current cache.
return result
}
if c.loadMode() == whole {
if c.mode.Load() == whole {
// Nothing found.
return result
}

View file

@ -42,6 +42,11 @@ func (c *Counter) Add(n int) {
atomic.AddUint64(&c.n, uint64(n))
}
// AddInt64 adds n to c.
func (c *Counter) AddInt64(n int64) {
atomic.AddUint64(&c.n, uint64(n))
}
// Get returns the current value for c.
func (c *Counter) Get() uint64 {
return atomic.LoadUint64(&c.n)

2
vendor/modules.txt vendored
View file

@ -99,7 +99,7 @@ github.com/VictoriaMetrics/easyproto
# github.com/VictoriaMetrics/fastcache v1.12.2
## explicit; go 1.13
github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/metrics v1.32.0
# github.com/VictoriaMetrics/metrics v1.33.0
## explicit; go 1.17
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.74.0