From 3f261f22c981cbdc31ce079f0fd06789e78053e3 Mon Sep 17 00:00:00 2001
From: Liam <byteslice@airmail.cc>
Date: Thu, 9 Mar 2023 20:58:37 -0500
Subject: [PATCH] vk_scheduler: split work queue waits and execution waits
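
Previously a single work_mutex guarded both the work queue and chunk
execution, with two condition variables (work_cv, wait_cv) signalling the
two states. Split this into queue_mutex, which protects work_queue, and
execution_mutex, which the worker thread holds while executing a chunk,
with a single event_cv used for both events.

WaitWorker() now drains the queue and then acquires execution_mutex,
taking the locks in the same order as WorkerThread() to avoid deadlock.
Finish() no longer calls WaitWorker(); waiting on the presubmit tick is
sufficient, and Flush() continues to only hand work to the worker thread.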

---
 .../renderer_vulkan/vk_scheduler.cpp          | 92 +++++++++++++------
 src/video_core/renderer_vulkan/vk_scheduler.h |  6 +-
 2 files changed, 66 insertions(+), 32 deletions(-)
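
Reviewer note (not part of the commit message): the block below is a minimal,
self-contained sketch of the lock-handoff pattern that WorkerThread() and
WaitWorker() rely on. It uses hypothetical names (WorkerSketch, Push, RunOne,
WaitIdle) and plain std::thread/std::condition_variable instead of the
scheduler's jthread/stop_token machinery; it is an illustration of the locking
order, not code from this patch. The point it demonstrates: because the worker
acquires the execution lock before releasing the queue lock, a thread that has
observed an empty queue and then taken the execution lock can be sure the last
popped work item has finished running.

// Sketch only: hypothetical names, simplified threading primitives.
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class WorkerSketch {
public:
    void Push(std::function<void()> task) {
        {
            std::scoped_lock ql{queue_mutex};
            tasks.push(std::move(task));
        }
        event_cv.notify_all();
    }

    // Mirrors Scheduler::WaitWorker: drain the queue, then take the execution
    // lock so that any already-popped task must have finished running.
    void WaitIdle() {
        std::unique_lock ql{queue_mutex};
        event_cv.wait(ql, [this] { return tasks.empty(); });
        std::unique_lock el{execution_mutex};
    }

    // Mirrors one iteration of Scheduler::WorkerThread: take the execution
    // lock before releasing the queue lock, then run the task outside the
    // queue lock but inside the execution lock.
    void RunOne() {
        std::function<void()> task;
        std::unique_lock ql{queue_mutex};
        event_cv.wait(ql, [this] { return !tasks.empty(); });
        task = std::move(tasks.front());
        tasks.pop();
        event_cv.notify_all();

        std::unique_lock el{execution_mutex};
        ql.unlock();
        task();
    }

private:
    std::mutex queue_mutex;
    std::mutex execution_mutex;
    std::condition_variable event_cv;
    std::queue<std::function<void()>> tasks;
};

int main() {
    WorkerSketch sketch;
    std::atomic<int> executed{0};

    std::thread worker{[&] {
        for (int i = 0; i < 3; ++i) {
            sketch.RunOne();
        }
    }};

    for (int i = 0; i < 3; ++i) {
        sketch.Push([&] { executed.fetch_add(1); });
    }

    // After WaitIdle returns, all three tasks have been popped and have
    // finished executing, so the counter must read 3.
    sketch.WaitIdle();
    const bool ok = executed.load() == 3;

    worker.join();
    return ok ? 0 : 1;
}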

diff --git a/src/video_core/renderer_vulkan/vk_scheduler.cpp b/src/video_core/renderer_vulkan/vk_scheduler.cpp
index e03685af10..c636a16256 100644
--- a/src/video_core/renderer_vulkan/vk_scheduler.cpp
+++ b/src/video_core/renderer_vulkan/vk_scheduler.cpp
@@ -47,14 +47,15 @@ Scheduler::Scheduler(const Device& device_, StateTracker& state_tracker_)
 Scheduler::~Scheduler() = default;
 
 void Scheduler::Flush(VkSemaphore signal_semaphore, VkSemaphore wait_semaphore) {
+    // When flushing, we only send data to the worker thread; no waiting is necessary.
     SubmitExecution(signal_semaphore, wait_semaphore);
     AllocateNewContext();
 }
 
 void Scheduler::Finish(VkSemaphore signal_semaphore, VkSemaphore wait_semaphore) {
+    // When finishing, we need to wait for the submission to have executed on the device.
     const u64 presubmit_tick = CurrentTick();
     SubmitExecution(signal_semaphore, wait_semaphore);
-    WaitWorker();
     Wait(presubmit_tick);
     AllocateNewContext();
 }
@@ -63,8 +64,13 @@ void Scheduler::WaitWorker() {
     MICROPROFILE_SCOPE(Vulkan_WaitForWorker);
     DispatchWork();
 
-    std::unique_lock lock{work_mutex};
-    wait_cv.wait(lock, [this] { return work_queue.empty(); });
+    // Ensure the queue is drained.
+    std::unique_lock ql{queue_mutex};
+    event_cv.wait(ql, [this] { return work_queue.empty(); });
+
+    // Now wait for any in-flight execution to finish by acquiring execution_mutex.
+    // The locks must be taken in the same order as in WorkerThread to avoid deadlock.
+    std::unique_lock el{execution_mutex};
 }
 
 void Scheduler::DispatchWork() {
@@ -72,10 +78,10 @@ void Scheduler::DispatchWork() {
         return;
     }
     {
-        std::scoped_lock lock{work_mutex};
+        std::scoped_lock ql{queue_mutex};
         work_queue.push(std::move(chunk));
     }
-    work_cv.notify_one();
+    event_cv.notify_all();
     AcquireNewChunk();
 }
 
@@ -137,30 +143,55 @@ bool Scheduler::UpdateRescaling(bool is_rescaling) {
 
 void Scheduler::WorkerThread(std::stop_token stop_token) {
     Common::SetCurrentThreadName("VulkanWorker");
-    do {
-        std::unique_ptr<CommandChunk> work;
-        bool has_submit{false};
-        {
-            std::unique_lock lock{work_mutex};
-            if (work_queue.empty()) {
-                wait_cv.notify_all();
-            }
-            Common::CondvarWait(work_cv, lock, stop_token, [&] { return !work_queue.empty(); });
-            if (stop_token.stop_requested()) {
-                continue;
-            }
-            work = std::move(work_queue.front());
-            work_queue.pop();
 
-            has_submit = work->HasSubmit();
+    const auto TryPopQueue{[this](auto& work) -> bool {
+        if (work_queue.empty()) {
+            return false;
+        }
+
+        work = std::move(work_queue.front());
+        work_queue.pop();
+        event_cv.notify_all();
+        return true;
+    }};
+
+    while (!stop_token.stop_requested()) {
+        std::unique_ptr<CommandChunk> work;
+
+        {
+            std::unique_lock lk{queue_mutex};
+
+            // Wait for work.
+            Common::CondvarWait(event_cv, lk, stop_token, [&] { return TryPopQueue(work); });
+
+            // If we've been asked to stop, we're done.
+            if (stop_token.stop_requested()) {
+                return;
+            }
+
+            // Exchange lock ownership so that we acquire the execution lock before
+            // the queue lock is released. A thread that has drained the queue and
+            // then takes execution_mutex is thus guaranteed this chunk has executed.
+            std::exchange(lk, std::unique_lock{execution_mutex});
+
+            // Perform the work, tracking whether the chunk was a submission
+            // before executing.
+            const bool has_submit = work->HasSubmit();
             work->ExecuteAll(current_cmdbuf);
+
+            // If the chunk was a submission, reallocate the command buffer.
+            if (has_submit) {
+                AllocateWorkerCommandBuffer();
+            }
         }
-        if (has_submit) {
-            AllocateWorkerCommandBuffer();
+
+        {
+            std::scoped_lock rl{reserve_mutex};
+
+            // Recycle the chunk back to the reserve.
+            chunk_reserve.emplace_back(std::move(work));
         }
-        std::scoped_lock reserve_lock{reserve_mutex};
-        chunk_reserve.push_back(std::move(work));
-    } while (!stop_token.stop_requested());
+    }
 }
 
 void Scheduler::AllocateWorkerCommandBuffer() {
@@ -289,13 +320,16 @@ void Scheduler::EndRenderPass() {
 }
 
 void Scheduler::AcquireNewChunk() {
-    std::scoped_lock lock{reserve_mutex};
+    std::scoped_lock rl{reserve_mutex};
+
     if (chunk_reserve.empty()) {
+        // If we don't have anything reserved, we need to make a new chunk.
         chunk = std::make_unique<CommandChunk>();
-        return;
+    } else {
+        // Otherwise, we can just take from the reserve.
+        chunk = std::move(chunk_reserve.back());
+        chunk_reserve.pop_back();
     }
-    chunk = std::move(chunk_reserve.back());
-    chunk_reserve.pop_back();
 }
 
 } // namespace Vulkan
diff --git a/src/video_core/renderer_vulkan/vk_scheduler.h b/src/video_core/renderer_vulkan/vk_scheduler.h
index bd4cb0f7e4..8d75ce9877 100644
--- a/src/video_core/renderer_vulkan/vk_scheduler.h
+++ b/src/video_core/renderer_vulkan/vk_scheduler.h
@@ -232,10 +232,10 @@ private:
 
     std::queue<std::unique_ptr<CommandChunk>> work_queue;
     std::vector<std::unique_ptr<CommandChunk>> chunk_reserve;
+    std::mutex execution_mutex;
     std::mutex reserve_mutex;
-    std::mutex work_mutex;
-    std::condition_variable_any work_cv;
-    std::condition_variable wait_cv;
+    std::mutex queue_mutex;
+    std::condition_variable_any event_cv;
     std::jthread worker_thread;
 };