reapack

Package manager for REAPER
Log | Files | Refs | Submodules | README | LICENSE

thread.cpp (3642B)


      1 /* ReaPack: Package manager for REAPER
      2  * Copyright (C) 2015-2025  Christian Fillion
      3  *
      4  * This program is free software: you can redistribute it and/or modify
      5  * it under the terms of the GNU Lesser General Public License as published by
      6  * the Free Software Foundation, either version 3 of the License, or
      7  * (at your option) any later version.
      8  *
      9  * This program is distributed in the hope that it will be useful,
     10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     12  * GNU Lesser General Public License for more details.
     13  *
     14  * You should have received a copy of the GNU Lesser General Public License
     15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16  */
     17 
     18 #include "thread.hpp"
     19 
     20 #include <reaper_plugin_functions.h>
     21 
     22 #ifdef _WIN32
     23   typedef void (__stdcall *_tls_callback_type)(HANDLE, DWORD const dwReason, LPVOID);
     24   extern "C" extern const _tls_callback_type __dyn_tls_dtor_callback;
     25 #endif
     26 
     27 ThreadTask::ThreadTask() : m_state(Idle), m_abort(false)
     28 {
     29 }
     30 
     31 ThreadTask::~ThreadTask()
     32 {
     33 }
     34 
     35 void ThreadTask::exec()
     36 {
     37   State state = Idle;
     38 
     39   if(!aborted()) {
     40     onStartAsync();
     41     state = run() ? Success : Failure;
     42   }
     43 
     44   if(aborted()) // may have changed while the task was running
     45     state = Aborted;
     46 
     47   m_state = state;
     48   onFinishAsync();
     49 }
     50 
     51 WorkerThread::WorkerThread() : m_stop(false), m_thread(&WorkerThread::run, this)
     52 {
     53 }
     54 
     55 WorkerThread::~WorkerThread()
     56 {
     57   {
     58     std::lock_guard lock(m_mutex);
     59     m_stop = true;
     60   }
     61 
     62   m_wake.notify_one();
     63   m_thread.join();
     64 }
     65 
     66 void WorkerThread::run()
     67 {
     68   std::unique_lock<std::mutex> lock(m_mutex);
     69 
     70   while(true) {
     71     m_wake.wait(lock, [=] { return !m_queue.empty() || m_stop; });
     72 
     73     if(m_stop)
     74       break;
     75 
     76     ThreadTask *task = m_queue.front();
     77     m_queue.pop();
     78 
     79     lock.unlock();
     80     task->exec();
     81     lock.lock();
     82   }
     83 
     84 #ifdef _WIN32
     85   // HACK: Destruct thread-local storage objects earlier on Windows to avoid a
     86   // possible deadlock when tearing down the cURL context with active HTTPS
     87   // connections on some computers [p=2038163]. InitializeSecurityContext would
     88   // hang forever waiting for a semaphore for undetermined reasons...
     89   //
     90   // Note that the destructors are not called a second time when this function
     91   // is invoked by the C++ runtime during the normal thread shutdown procedure.
     92   __dyn_tls_dtor_callback(nullptr, DLL_THREAD_DETACH, nullptr);
     93 #endif
     94 }
     95 
     96 ThreadTask *WorkerThread::nextTask()
     97 {
     98   std::lock_guard<std::mutex> guard(m_mutex);
     99 
    100   if(m_queue.empty())
    101     return nullptr;
    102 
    103   ThreadTask *task = m_queue.front();
    104   m_queue.pop();
    105   return task;
    106 }
    107 
    108 void WorkerThread::push(ThreadTask *task)
    109 {
    110   std::lock_guard<std::mutex> guard(m_mutex);
    111 
    112   m_queue.push(task);
    113   m_wake.notify_one();
    114 }
    115 
    116 ThreadPool::~ThreadPool()
    117 {
    118   // don't emit ThreadPool::onAbort from the destructor
    119   // which is most likely to cause a crash
    120   onAbort.reset();
    121 
    122   abort();
    123 }
    124 
    125 void ThreadPool::push(ThreadTask *task)
    126 {
    127   onPush(task);
    128   m_running.insert(task);
    129 
    130   task->onFinishAsync >> [=] {
    131     m_running.erase(task);
    132 
    133     // 'this' (captured variable) isn't valid after the delete below
    134     ThreadPool *self = this;
    135     delete task;
    136 
    137     // call m_onDone() only after every onFinish slots ran
    138     if(self->m_running.empty())
    139       self->onDone();
    140   };
    141 
    142   const size_t nextThread = m_running.size() % m_pool.size();
    143   auto &thread = task->concurrent() ? m_pool[nextThread] : m_pool.front();
    144   if(!thread)
    145     thread = std::make_unique<WorkerThread>();
    146 
    147   thread->push(task);
    148 }
    149 
    150 void ThreadPool::abort()
    151 {
    152   for(ThreadTask *task : m_running)
    153     task->abort();
    154 
    155   onAbort();
    156 }