jobqueue.cpp (2906B)
1 #include "jobqueue.h" 2 3 #include <shared_mutex> 4 5 #include "dsp56kEmu/threadtools.h" 6 7 namespace pluginLib::patchDB 8 { 9 JobQueue::JobQueue(std::string _name, const bool _start/* = true*/, const dsp56k::ThreadPriority& _prio/* = dsp56k::ThreadPriority::Normal*/, const uint32_t _threadCount/* = 1*/) 10 : m_name(std::move(_name)) 11 , m_threadPriority(_prio) 12 , m_threadCount(_threadCount) 13 { 14 if (_start) 15 start(); 16 } 17 18 JobQueue::~JobQueue() 19 { 20 destroy(); 21 } 22 23 void JobQueue::start() 24 { 25 if (!m_threads.empty()) 26 return; 27 28 m_destroy = false; 29 30 m_threads.reserve(m_threadCount); 31 32 for(size_t i=0; i<m_threadCount; ++i) 33 { 34 size_t idx = i; 35 m_threads.emplace_back(new std::thread([this, idx] 36 { 37 if (!m_name.empty()) 38 dsp56k::ThreadTools::setCurrentThreadName(m_name + std::to_string(idx)); 39 dsp56k::ThreadTools::setCurrentThreadPriority(m_threadPriority); 40 threadFunc(); 41 })); 42 } 43 } 44 45 void JobQueue::destroy() 46 { 47 if (m_destroy) 48 return; 49 50 { 51 std::unique_lock lock(m_mutexFuncs); 52 m_funcs.emplace_back([this] 53 { 54 m_destroy = true; 55 }); 56 } 57 58 m_cv.notify_all(); 59 60 for (const auto& thread : m_threads) 61 thread->join(); 62 m_threads.clear(); 63 64 m_funcs.clear(); 65 m_emptyCv.notify_all(); 66 } 67 68 void JobQueue::add(std::function<void()>&& _func) 69 { 70 { 71 std::unique_lock lock(m_mutexFuncs); 72 m_funcs.emplace_back(std::move(_func)); 73 } 74 m_cv.notify_one(); 75 } 76 77 size_t JobQueue::size() const 78 { 79 std::unique_lock lock(m_mutexFuncs); 80 return m_funcs.size() + m_numRunning; 81 } 82 83 void JobQueue::waitEmpty() 84 { 85 std::unique_lock lock(m_mutexFuncs); 86 87 m_emptyCv.wait(lock, [this] {return m_funcs.empty() && !m_numRunning; }); 88 } 89 90 size_t JobQueue::pending() const 91 { 92 std::unique_lock lock(m_mutexFuncs); 93 return m_funcs.size(); 94 } 95 96 void JobQueue::threadFunc() 97 { 98 while (!m_destroy) 99 { 100 std::unique_lock lock(m_mutexFuncs); 101 102 m_cv.wait(lock, [this] {return !m_funcs.empty();}); 103 104 const auto func = m_funcs.front(); 105 106 if (m_destroy) 107 return; 108 109 ++m_numRunning; 110 m_funcs.pop_front(); 111 112 lock.unlock(); 113 func(); 114 lock.lock(); 115 116 --m_numRunning; 117 118 if (m_funcs.empty() && !m_numRunning) 119 m_emptyCv.notify_all(); 120 } 121 } 122 123 JobGroup::JobGroup(JobQueue& _queue): m_queue(_queue) 124 { 125 } 126 127 JobGroup::~JobGroup() 128 { 129 wait(); 130 } 131 132 void JobGroup::add(std::function<void()>&& _func) 133 { 134 { 135 std::unique_lock lockCounts(m_mutexCounts); 136 ++m_countEnqueued; 137 } 138 139 auto func = [this, f = std::move(_func)] 140 { 141 f(); 142 onFuncCompleted(); 143 }; 144 145 m_queue.add(func); 146 } 147 148 void JobGroup::wait() 149 { 150 std::unique_lock l(m_mutexCounts); 151 m_completedCv.wait(l, [this] 152 { 153 return m_countCompleted == m_countEnqueued; 154 } 155 ); 156 } 157 158 void JobGroup::onFuncCompleted() 159 { 160 std::unique_lock lockCounts(m_mutexCounts); 161 ++m_countCompleted; 162 163 if (m_countCompleted == m_countEnqueued) 164 { 165 lockCounts.unlock(); 166 m_completedCv.notify_one(); 167 } 168 } 169 }