From 5f5fcab2179f8aec733106542f52479538809e24 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sun, 21 Jan 2024 13:58:27 +0200
Subject: [PATCH] all: call atomic.Load* in front of atomic.CompareAndSwap* at
 places where the atomic.CompareAndSwap* returns false most of the time

This allows avoiding slow inter-CPU synchronization induced by atomic.CompareAndSwap*
---
 app/vmauth/auth_config.go             | 4 +++-
 app/vmselect/prometheus/prometheus.go | 4 +++-
 lib/bloomfilter/filter.go             | 2 ++
 lib/mergeset/table.go                 | 6 +++++-
 lib/storage/storage.go                | 9 +++++++--
 5 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/app/vmauth/auth_config.go b/app/vmauth/auth_config.go
index 413e97a7b..d47f081dd 100644
--- a/app/vmauth/auth_config.go
+++ b/app/vmauth/auth_config.go
@@ -271,8 +271,10 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL {
 		if bu.isBroken() {
 			continue
 		}
-		if atomic.CompareAndSwapInt32(&bu.concurrentRequests, 0, 1) {
+		if atomic.LoadInt32(&bu.concurrentRequests) == 0 {
 			// Fast path - return the backend with zero concurrently executed requests.
+			// Do not use atomic.CompareAndSwapInt32(), since it is much slower on systems with many CPU cores.
+			atomic.AddInt32(&bu.concurrentRequests, 1)
 			return bu
 		}
 	}
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index b65fb159b..645983267 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -332,7 +332,9 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter
 	firstLineSent := uint32(0)
 	writeLineFunc = func(xb *exportBlock, workerID uint) error {
 		bb := sw.getBuffer(workerID)
-		if atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
+		// Use atomic.LoadUint32() in front of atomic.CompareAndSwapUint32() in order to avoid slow inter-CPU synchronization
+		// in fast path after the first line has been already sent.
+		if atomic.LoadUint32(&firstLineOnce) == 0 && atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
 			// Send the first line to sw.bw
 			WriteExportPromAPILine(bb, xb)
 			_, err := sw.bw.Write(bb.B)
diff --git a/lib/bloomfilter/filter.go b/lib/bloomfilter/filter.go
index 07a561bb5..314a7eca9 100644
--- a/lib/bloomfilter/filter.go
+++ b/lib/bloomfilter/filter.go
@@ -79,6 +79,8 @@ func (f *filter) Add(h uint64) bool {
 		w := atomic.LoadUint64(&bits[i])
 		for (w & mask) == 0 {
 			wNew := w | mask
+			// The wNew != w most of the time, so there is no need in using atomic.LoadUint64
+			// in front of atomic.CompareAndSwapUint64 in order to try avoiding slow inter-CPU synchronization.
 			if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {
 				isNew = true
 				break
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index a5420df3c..3fa5e48c5 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -738,7 +738,11 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 		if isFinal {
 			tb.flushCallback()
 		} else {
-			atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
+			// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+			// at fast path when needFlushCallbackCall is already set to 1.
+			if atomic.LoadUint32(&tb.needFlushCallbackCall) == 0 {
+				atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
+			}
 		}
 	}
 }
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index fc2c98907..2363b806d 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -665,14 +665,19 @@ func (s *Storage) startFreeDiskSpaceWatcher() {
 		freeSpaceBytes := fs.MustGetFreeSpace(s.path)
 		if freeSpaceBytes < freeDiskSpaceLimitBytes {
 			// Switch the storage to readonly mode if there is no enough free space left at s.path
-			if atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) {
+			//
+			// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+			// when the storage is already in read-only mode.
+			if atomic.LoadUint32(&s.isReadOnly) == 0 && atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) {
 				// log notification only on state change
 				logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
 					s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
 			}
 			return
 		}
-		if atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
+		// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+		// when the storage isn't in read-only mode.
+		if atomic.LoadUint32(&s.isReadOnly) == 1 && atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
 			logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
 				s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
 		}
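
The patch applies the same idiom at every call site: read the flag with atomic.Load* first and issue atomic.CompareAndSwap* only when the swap has a realistic chance of succeeding, so the common "already set" path stays a plain read that does not force exclusive cache-line ownership to move between CPU cores. The following is a minimal standalone sketch of that idiom, not taken from the patch; the readOnlyFlag type and setReadOnly method are hypothetical names that loosely mirror the s.isReadOnly flag in lib/storage/storage.go:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // readOnlyFlag is a hypothetical stand-in for a 0/1 state flag.
    type readOnlyFlag struct {
    	v uint32
    }

    // setReadOnly reports whether this call performed the 0 -> 1 transition.
    // The atomic.LoadUint32 check keeps the hot path a shared read: once the
    // flag is already 1, no CompareAndSwap is issued at all.
    func (f *readOnlyFlag) setReadOnly() bool {
    	if atomic.LoadUint32(&f.v) != 0 {
    		// Fast path: the flag is already set, skip the CAS.
    		return false
    	}
    	return atomic.CompareAndSwapUint32(&f.v, 0, 1)
    }

    func main() {
    	var f readOnlyFlag
    	fmt.Println(f.setReadOnly()) // true: state changed, CAS executed
    	fmt.Println(f.setReadOnly()) // false: fast path, no CAS issued
    }

The effect is easiest to observe under contention. Below is a hedged micro-benchmark sketch (the file, package, and benchmark names are illustrative and not part of the repository) comparing a bare CAS against load-then-CAS after the flag has settled at 1, runnable with "go test -bench=. -cpu=8":

    package casbench

    import (
    	"sync/atomic"
    	"testing"
    )

    var flag uint32

    // BenchmarkBareCAS issues CompareAndSwap on every iteration; after the
    // first success the CAS keeps failing, yet each attempt still contends
    // for the cache line holding the flag.
    func BenchmarkBareCAS(b *testing.B) {
    	b.RunParallel(func(pb *testing.PB) {
    		for pb.Next() {
    			atomic.CompareAndSwapUint32(&flag, 0, 1)
    		}
    	})
    }

    // BenchmarkLoadThenCAS does a shared read first and skips the CAS once
    // the flag is set, which is the fast path targeted by the patch.
    func BenchmarkLoadThenCAS(b *testing.B) {
    	b.RunParallel(func(pb *testing.PB) {
    		for pb.Next() {
    			if atomic.LoadUint32(&flag) == 0 {
    				atomic.CompareAndSwapUint32(&flag, 0, 1)
    			}
    		}
    	})
    }

Note that this preliminary load is only worthwhile where the CAS fails most of the time; as the lib/bloomfilter/filter.go comment in the patch points out, when the swap usually succeeds the extra load adds no benefit.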