Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
all: call atomic.Load* in front of atomic.CompareAndSwap* at places where the atomic.CompareAndSwap* returns false most of the time
This allows avoiding the slow inter-CPU synchronization induced by atomic.CompareAndSwap*.
parent 6d91d10cbd
commit 5f5fcab217
5 changed files with 20 additions and 5 deletions
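
Why the preceding Load helps: a CompareAndSwap must acquire the target cache line in exclusive state even when the swap fails, so concurrent failing CAS calls keep bouncing the line between CPU cores. A plain atomic load leaves the line in shared state, which every core can read cheaply. Below is a minimal, hypothetical Go benchmark sketching the difference under the condition this commit targets (the flag is already set, so the CAS always fails; the package and names are illustrative, not from this repository):

	package atomicbench

	import (
		"sync/atomic"
		"testing"
	)

	// flag is a hypothetical shared flag that is already set most of the
	// time, mirroring the pattern optimized in this commit.
	var flag uint32 = 1

	// BenchmarkBareCAS always issues CompareAndSwap, which demands exclusive
	// ownership of the cache line even though the swap fails every time.
	func BenchmarkBareCAS(b *testing.B) {
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				atomic.CompareAndSwapUint32(&flag, 0, 1)
			}
		})
	}

	// BenchmarkLoadGuardedCAS checks the flag with a plain atomic load first;
	// the load keeps the cache line shared, so cores do not fight over it.
	func BenchmarkLoadGuardedCAS(b *testing.B) {
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				if atomic.LoadUint32(&flag) == 0 {
					atomic.CompareAndSwapUint32(&flag, 0, 1)
				}
			}
		})
	}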
@@ -271,8 +271,10 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL {
 		if bu.isBroken() {
 			continue
 		}
-		if atomic.CompareAndSwapInt32(&bu.concurrentRequests, 0, 1) {
+		if atomic.LoadInt32(&bu.concurrentRequests) == 0 {
 			// Fast path - return the backend with zero concurrently executed requests.
+			// Do not use atomic.CompareAndSwapInt32(), since it is much slower on systems with many CPU cores.
+			atomic.AddInt32(&bu.concurrentRequests, 1)
 			return bu
 		}
 	}
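
This hunk goes one step further than a Load-guarded CAS: it replaces the CompareAndSwap with a plain atomic.AddInt32, giving up atomicity between the check and the increment. That is a race the least-loaded balancer can tolerate, sketched below (goroutine names are illustrative):

	// Two goroutines may both observe a zero counter and both take the fast path:
	//
	//	g1: atomic.LoadInt32(&bu.concurrentRequests) == 0 -> true
	//	g2: atomic.LoadInt32(&bu.concurrentRequests) == 0 -> true
	//	g1: atomic.AddInt32(&bu.concurrentRequests, 1)    -> counter = 1
	//	g2: atomic.AddInt32(&bu.concurrentRequests, 1)    -> counter = 2
	//
	// Both goroutines pick the same backend, which briefly carries two
	// requests instead of one. For load balancing this is harmless, while
	// every lookup avoids the exclusive cache-line ownership that a
	// CompareAndSwapInt32 would demand.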
@@ -332,7 +332,9 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter
 	firstLineSent := uint32(0)
 	writeLineFunc = func(xb *exportBlock, workerID uint) error {
 		bb := sw.getBuffer(workerID)
-		if atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
+		// Use atomic.LoadUint32() in front of atomic.CompareAndSwapUint32() in order to avoid slow inter-CPU synchronization
+		// in fast path after the first line has been already sent.
+		if atomic.LoadUint32(&firstLineOnce) == 0 && atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
 			// Send the first line to sw.bw
 			WriteExportPromAPILine(bb, xb)
 			_, err := sw.bw.Write(bb.B)
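
The combination `Load == 0 && CompareAndSwap(0, 1)` here acts as a cheap one-shot guard: after the first writer wins, every later call costs only a shared read. A minimal sketch of that guarded-once shape follows; the helper is illustrative, not part of the commit, which inlines the pattern. Unlike sync.Once, callers that lose the race return immediately instead of waiting for f to finish, which matches the first-line semantics above:

	package onceflag

	import "sync/atomic"

	// onceFlag is an illustrative helper, not part of this commit.
	type onceFlag struct {
		done uint32
	}

	// Do runs f at most once. The atomic load keeps the common already-done
	// path free of cache-line ownership traffic; only early callers reach
	// the CompareAndSwap, and exactly one of them runs f.
	func (o *onceFlag) Do(f func()) {
		if atomic.LoadUint32(&o.done) != 0 {
			return // fast path: shared read only
		}
		if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
			f()
		}
	}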
@@ -79,6 +79,8 @@ func (f *filter) Add(h uint64) bool {
 	w := atomic.LoadUint64(&bits[i])
 	for (w & mask) == 0 {
 		wNew := w | mask
+		// The wNew != w most of the time, so there is no need in using atomic.LoadUint64
+		// in front of atomic.CompareAndSwapUint64 in order to try avoiding slow inter-CPU synchronization.
 		if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {
 			isNew = true
 			break
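
This hunk documents the complementary rule rather than changing behavior: in a read-modify-write retry loop the CAS is expected to succeed on the first attempt, so a preceding load would be pure overhead. The general shape of such a loop, as an illustrative standalone function (names are not from this repository):

	// setBit sets mask in *word and reports whether this call set it.
	// The CompareAndSwap below usually succeeds immediately, so a pre-CAS
	// load guard would only add latency here.
	func setBit(word *uint64, mask uint64) bool {
		for {
			w := atomic.LoadUint64(word) // snapshot used to compute the update
			if w&mask != 0 {
				return false // bit already set by another writer
			}
			if atomic.CompareAndSwapUint64(word, w, w|mask) {
				return true // common case: first attempt wins
			}
			// CAS lost a race with a concurrent writer; retry with a fresh snapshot.
		}
	}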
@@ -738,7 +738,11 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 		if isFinal {
 			tb.flushCallback()
 		} else {
-			atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
+			// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+			// at fast path when needFlushCallbackCall is already set to 1.
+			if atomic.LoadUint32(&tb.needFlushCallbackCall) == 0 {
+				atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
+			}
 		}
 	}
 }
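
For context, the guarded CAS on needFlushCallbackCall pairs with a consumer that resets the flag before invoking the callback. That consumer is not shown in this diff; the sketch below is a plausible reconstruction based only on the flag's 0/1 protocol, not the actual code:

	// Hypothetical background worker (not in this diff): it claims the
	// pending callback request with a 1 -> 0 CompareAndSwap, so the producer
	// side above only ever needs the 0 -> 1 transition, and repeating that
	// transition while it is already pending is idempotent.
	if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
		tb.flushCallback()
	}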
@@ -665,14 +665,19 @@ func (s *Storage) startFreeDiskSpaceWatcher() {
 		freeSpaceBytes := fs.MustGetFreeSpace(s.path)
 		if freeSpaceBytes < freeDiskSpaceLimitBytes {
 			// Switch the storage to readonly mode if there is no enough free space left at s.path
-			if atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) {
+			//
+			// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+			// when the storage is already in read-only mode.
+			if atomic.LoadUint32(&s.isReadOnly) == 0 && atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) {
 				// log notification only on state change
 				logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
 					s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
 			}
 			return
 		}
-		if atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
+		// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
+		// when the storage isn't in read-only mode.
+		if atomic.LoadUint32(&s.isReadOnly) == 1 && atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
 			logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
 				s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
 		}
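
Both transitions in the disk-space watcher follow the same shape: load the flag, and attempt the CAS only on an actual state change, so the periodic polling never writes to the cache line while the state is steady. That shared pattern, factored into an illustrative helper (the commit inlines it):

	// setFlag moves an atomic 0/1 flag to v and reports whether the state
	// changed. The initial load keeps the steady-state path read-only; the
	// CompareAndSwap runs only when a transition is actually needed.
	// Illustrative helper, not part of the commit.
	func setFlag(p *uint32, v uint32) bool {
		if atomic.LoadUint32(p) == v {
			return false // already in the desired state: no write traffic
		}
		return atomic.CompareAndSwapUint32(p, 1-v, v)
	}

With such a helper the watcher body would reduce to setFlag(&s.isReadOnly, 1) and setFlag(&s.isReadOnly, 0), logging only when the call returns true.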