thread.cpp (3642B)
1 /* ReaPack: Package manager for REAPER 2 * Copyright (C) 2015-2025 Christian Fillion 3 * 4 * This program is free software: you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License as published by 6 * the Free Software Foundation, either version 3 of the License, or 7 * (at your option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * GNU Lesser General Public License for more details. 13 * 14 * You should have received a copy of the GNU Lesser General Public License 15 * along with this program. If not, see <http://www.gnu.org/licenses/>. 16 */ 17 18 #include "thread.hpp" 19 20 #include <reaper_plugin_functions.h> 21 22 #ifdef _WIN32 23 typedef void (__stdcall *_tls_callback_type)(HANDLE, DWORD const dwReason, LPVOID); 24 extern "C" extern const _tls_callback_type __dyn_tls_dtor_callback; 25 #endif 26 27 ThreadTask::ThreadTask() : m_state(Idle), m_abort(false) 28 { 29 } 30 31 ThreadTask::~ThreadTask() 32 { 33 } 34 35 void ThreadTask::exec() 36 { 37 State state = Idle; 38 39 if(!aborted()) { 40 onStartAsync(); 41 state = run() ? Success : Failure; 42 } 43 44 if(aborted()) // may have changed while the task was running 45 state = Aborted; 46 47 m_state = state; 48 onFinishAsync(); 49 } 50 51 WorkerThread::WorkerThread() : m_stop(false), m_thread(&WorkerThread::run, this) 52 { 53 } 54 55 WorkerThread::~WorkerThread() 56 { 57 { 58 std::lock_guard lock(m_mutex); 59 m_stop = true; 60 } 61 62 m_wake.notify_one(); 63 m_thread.join(); 64 } 65 66 void WorkerThread::run() 67 { 68 std::unique_lock<std::mutex> lock(m_mutex); 69 70 while(true) { 71 m_wake.wait(lock, [=] { return !m_queue.empty() || m_stop; }); 72 73 if(m_stop) 74 break; 75 76 ThreadTask *task = m_queue.front(); 77 m_queue.pop(); 78 79 lock.unlock(); 80 task->exec(); 81 lock.lock(); 82 } 83 84 #ifdef _WIN32 85 // HACK: Destruct thread-local storage objects earlier on Windows to avoid a 86 // possible deadlock when tearing down the cURL context with active HTTPS 87 // connections on some computers [p=2038163]. InitializeSecurityContext would 88 // hang forever waiting for a semaphore for undetermined reasons... 89 // 90 // Note that the destructors are not called a second time when this function 91 // is invoked by the C++ runtime during the normal thread shutdown procedure. 92 __dyn_tls_dtor_callback(nullptr, DLL_THREAD_DETACH, nullptr); 93 #endif 94 } 95 96 ThreadTask *WorkerThread::nextTask() 97 { 98 std::lock_guard<std::mutex> guard(m_mutex); 99 100 if(m_queue.empty()) 101 return nullptr; 102 103 ThreadTask *task = m_queue.front(); 104 m_queue.pop(); 105 return task; 106 } 107 108 void WorkerThread::push(ThreadTask *task) 109 { 110 std::lock_guard<std::mutex> guard(m_mutex); 111 112 m_queue.push(task); 113 m_wake.notify_one(); 114 } 115 116 ThreadPool::~ThreadPool() 117 { 118 // don't emit ThreadPool::onAbort from the destructor 119 // which is most likely to cause a crash 120 onAbort.reset(); 121 122 abort(); 123 } 124 125 void ThreadPool::push(ThreadTask *task) 126 { 127 onPush(task); 128 m_running.insert(task); 129 130 task->onFinishAsync >> [=] { 131 m_running.erase(task); 132 133 // 'this' (captured variable) isn't valid after the delete below 134 ThreadPool *self = this; 135 delete task; 136 137 // call m_onDone() only after every onFinish slots ran 138 if(self->m_running.empty()) 139 self->onDone(); 140 }; 141 142 const size_t nextThread = m_running.size() % m_pool.size(); 143 auto &thread = task->concurrent() ? m_pool[nextThread] : m_pool.front(); 144 if(!thread) 145 thread = std::make_unique<WorkerThread>(); 146 147 thread->push(task); 148 } 149 150 void ThreadPool::abort() 151 { 152 for(ThreadTask *task : m_running) 153 task->abort(); 154 155 onAbort(); 156 }