all: call atomic.Load* in front of atomic.CompareAndSwap* at places where the atomic.CompareAndSwap* returns false most of the time

This allows avoiding slow inter-CPU synchornization induced by atomic.CompareAndSwap*
This commit is contained in:
Aliaksandr Valialkin 2024-01-21 13:58:27 +02:00
parent 3d83f3347d
commit 0b2ea1a7c7
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 20 additions and 5 deletions

View file

@ -271,8 +271,10 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL {
if bu.isBroken() {
continue
}
if atomic.CompareAndSwapInt32(&bu.concurrentRequests, 0, 1) {
if atomic.LoadInt32(&bu.concurrentRequests) == 0 {
// Fast path - return the backend with zero concurrently executed requests.
// Do not use atomic.CompareAndSwapInt32(), since it is much slower on systems with many CPU cores.
atomic.AddInt32(&bu.concurrentRequests, 1)
return bu
}
}

View file

@ -321,7 +321,9 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara
firstLineSent := uint32(0)
writeLineFunc = func(xb *exportBlock, workerID uint) error {
bb := sw.getBuffer(workerID)
if atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
// Use atomic.LoadUint32() in front of atomic.CompareAndSwapUint32() in order to avoid slow inter-CPU synchronization
// in fast path after the first line has been already sent.
if atomic.LoadUint32(&firstLineOnce) == 0 && atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
// Send the first line to sw.bw
WriteExportPromAPILine(bb, xb)
_, err := sw.bw.Write(bb.B)

View file

@ -79,6 +79,8 @@ func (f *filter) Add(h uint64) bool {
w := atomic.LoadUint64(&bits[i])
for (w & mask) == 0 {
wNew := w | mask
// The wNew != w most of the time, so there is no need in using atomic.LoadUint64
// in front of atomic.CompareAndSwapUint64 in order to try avoiding slow inter-CPU synchronization.
if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {
isNew = true
break

View file

@ -738,7 +738,11 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
if isFinal {
tb.flushCallback()
} else {
atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
// at fast path when needFlushCallbackCall is already set to 1.
if atomic.LoadUint32(&tb.needFlushCallbackCall) == 0 {
atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
}
}
}
}

View file

@ -650,14 +650,19 @@ func (s *Storage) startFreeDiskSpaceWatcher() {
freeSpaceBytes := fs.MustGetFreeSpace(s.path)
if freeSpaceBytes < freeDiskSpaceLimitBytes {
// Switch the storage to readonly mode if there is no enough free space left at s.path
if atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) {
//
// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
// when the storage is already in read-only mode.
if atomic.LoadUint32(&s.isReadOnly) == 0 && atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) {
// log notification only on state change
logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
}
return
}
if atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
// when the storage isn't in read-only mode.
if atomic.LoadUint32(&s.isReadOnly) == 1 && atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
}