gearmulator

Emulation of classic VA synths of the late 90s/2000s that are based on Motorola 56300 family DSPs
Log | Files | Refs | Submodules | README | LICENSE

jobqueue.cpp (2906B)


      1 #include "jobqueue.h"
      2 
      3 #include <shared_mutex>
      4 
      5 #include "dsp56kEmu/threadtools.h"
      6 
      7 namespace pluginLib::patchDB
      8 {
      9 	JobQueue::JobQueue(std::string _name, const bool _start/* = true*/, const dsp56k::ThreadPriority& _prio/* = dsp56k::ThreadPriority::Normal*/, const uint32_t _threadCount/* = 1*/)
     10 	: m_name(std::move(_name))
     11 	, m_threadPriority(_prio)
     12 	, m_threadCount(_threadCount)
     13 	{
     14 		if (_start)
     15 			start();
     16 	}
     17 
     18 	JobQueue::~JobQueue()
     19 	{
     20 		destroy();
     21 	}
     22 
     23 	void JobQueue::start()
     24 	{
     25 		if (!m_threads.empty())
     26 			return;
     27 
     28 		m_destroy = false;
     29 
     30 		m_threads.reserve(m_threadCount);
     31 
     32 		for(size_t i=0; i<m_threadCount; ++i)
     33 		{
     34 			size_t idx = i;
     35 			m_threads.emplace_back(new std::thread([this, idx]
     36 			{
     37 				if (!m_name.empty())
     38 					dsp56k::ThreadTools::setCurrentThreadName(m_name + std::to_string(idx));
     39 				dsp56k::ThreadTools::setCurrentThreadPriority(m_threadPriority);
     40 				threadFunc();
     41 			}));
     42 		}
     43 	}
     44 
     45 	void JobQueue::destroy()
     46 	{
     47 		if (m_destroy)
     48 			return;
     49 
     50 		{
     51 			std::unique_lock lock(m_mutexFuncs);
     52 			m_funcs.emplace_back([this]
     53 			{
     54 				m_destroy = true;
     55 			});
     56 		}
     57 
     58 		m_cv.notify_all();
     59 
     60 		for (const auto& thread : m_threads)
     61 			thread->join();
     62 		m_threads.clear();
     63 
     64 		m_funcs.clear();
     65 		m_emptyCv.notify_all();
     66 	}
     67 
     68 	void JobQueue::add(std::function<void()>&& _func)
     69 	{
     70 		{
     71 			std::unique_lock lock(m_mutexFuncs);
     72 			m_funcs.emplace_back(std::move(_func));
     73 		}
     74 		m_cv.notify_one();
     75 	}
     76 
     77 	size_t JobQueue::size() const
     78 	{
     79 		std::unique_lock lock(m_mutexFuncs);
     80 		return m_funcs.size() + m_numRunning;
     81 	}
     82 
     83 	void JobQueue::waitEmpty()
     84 	{
     85 		std::unique_lock lock(m_mutexFuncs);
     86 
     87 		m_emptyCv.wait(lock, [this] {return m_funcs.empty() && !m_numRunning; });
     88 	}
     89 
     90 	size_t JobQueue::pending() const
     91 	{
     92 		std::unique_lock lock(m_mutexFuncs);
     93 		return m_funcs.size();
     94 	}
     95 
     96 	void JobQueue::threadFunc()
     97 	{
     98 		while (!m_destroy)
     99 		{
    100 			std::unique_lock lock(m_mutexFuncs);
    101 
    102 			m_cv.wait(lock, [this] {return !m_funcs.empty();});
    103 
    104 			const auto func = m_funcs.front();
    105 
    106 			if (m_destroy)
    107 				return;
    108 
    109 			++m_numRunning;
    110 			m_funcs.pop_front();
    111 
    112 			lock.unlock();
    113 			func();
    114 			lock.lock();
    115 
    116 			--m_numRunning;
    117 
    118 			if (m_funcs.empty() && !m_numRunning)
    119 				m_emptyCv.notify_all();
    120 		}
    121 	}
    122 
    123 	JobGroup::JobGroup(JobQueue& _queue): m_queue(_queue)
    124 	{
    125 	}
    126 
    127 	JobGroup::~JobGroup()
    128 	{
    129 		wait();
    130 	}
    131 
    132 	void JobGroup::add(std::function<void()>&& _func)
    133 	{
    134 		{
    135 			std::unique_lock lockCounts(m_mutexCounts);
    136 			++m_countEnqueued;
    137 		}
    138 
    139 		auto func = [this, f = std::move(_func)]
    140 		{
    141 			f();
    142 			onFuncCompleted();
    143 		};
    144 
    145 		m_queue.add(func);
    146 	}
    147 
    148 	void JobGroup::wait()
    149 	{
    150 		std::unique_lock l(m_mutexCounts);
    151 		m_completedCv.wait(l, [this]
    152 			{
    153 				return m_countCompleted == m_countEnqueued;
    154 			}
    155 		);
    156 	}
    157 
    158 	void JobGroup::onFuncCompleted()
    159 	{
    160 		std::unique_lock lockCounts(m_mutexCounts);
    161 		++m_countCompleted;
    162 
    163 		if (m_countCompleted == m_countEnqueued)
    164 		{
    165 			lockCounts.unlock();
    166 			m_completedCv.notify_one();
    167 		}
    168 	}
    169 }