commit 9b7a9903fa9be60a856764c2d756db6746396424
parent b1619ae3bd59f7fcb9921433a53bf11a54abfd6f
Author: cfillion <cfillion@users.noreply.github.com>
Date: Tue, 7 Feb 2017 19:00:46 -0500
Merge branch 'thread-pool'
Diffstat:
6 files changed, 224 insertions(+), 234 deletions(-)
diff --git a/src/download.cpp b/src/download.cpp
@@ -20,7 +20,6 @@
#include "reapack.hpp"
#include <boost/format.hpp>
-#include <curl/curl.h>
#include <reaper_plugin_functions.h>
@@ -28,15 +27,14 @@ using boost::format;
using namespace std;
static const int DOWNLOAD_TIMEOUT = 15;
-static const int CONCURRENT_DOWNLOADS = 3;
-
-Download::Queue Download::s_finished;
-WDL_Mutex Download::s_mutex;
-size_t Download::s_running = 0;
+// to set the amount of concurrent downloads, change the size of
+// the m_pool member in DownloadQueue
static CURLSH *g_curlShare = nullptr;
static WDL_Mutex g_curlMutex;
+DownloadNotifier *DownloadNotifier::s_instance = nullptr;
+
static void LockCurlMutex(CURL *, curl_lock_data, curl_lock_access, void *)
{
g_curlMutex.Enter();
@@ -56,6 +54,7 @@ void Download::Init()
curl_share_setopt(g_curlShare, CURLSHOPT_LOCKFUNC, LockCurlMutex);
curl_share_setopt(g_curlShare, CURLSHOPT_UNLOCKFUNC, UnlockCurlMutex);
+
curl_share_setopt(g_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
curl_share_setopt(g_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION);
}
@@ -66,91 +65,89 @@ void Download::Cleanup()
curl_global_cleanup();
}
-Download::Download(const string &name, const string &url,
- const NetworkOpts &opts, const int flags)
- : m_name(name), m_url(url), m_opts(opts),
- m_flags(flags), m_threadHandle(nullptr)
+size_t Download::WriteData(char *data, size_t rawsize, size_t nmemb, void *ptr)
{
- reset();
+ const size_t size = rawsize * nmemb;
+
+ static_cast<string *>(ptr)->append(data, size);
+
+ return size;
}
-Download::~Download()
+int Download::UpdateProgress(void *ptr, const double, const double,
+ const double, const double)
{
- wait();
+ return static_cast<Download *>(ptr)->m_abort;
}
-void Download::reset()
+Download::Download(const string &name, const string &url,
+ const NetworkOpts &opts, const int flags)
+ : m_name(name), m_url(url), m_opts(opts), m_flags(flags),
+ m_state(Idle), m_abort(false)
{
- m_state = Idle;
- m_aborted = false;
- m_contents.clear();
- m_progress = 0;
+ DownloadNotifier::get()->start();
}
-void Download::wait()
+Download::~Download()
{
- if(m_threadHandle) {
- WaitForSingleObject(m_threadHandle, INFINITE);
- CloseHandle(m_threadHandle);
- }
+ DownloadNotifier::get()->stop();
}
-void Download::TimerTick()
+void Download::setState(const State state)
{
- Download *dl = Download::NextFinished();
+ m_state = state;
- if(dl)
- dl->finishInMainThread();
+ switch(state) {
+ case Idle:
+ break;
+ case Running:
+ m_onStart();
+ break;
+ case Success:
+ case Failure:
+ case Aborted:
+ m_onFinish();
+ m_cleanupHandler();
+ break;
+ }
}
void Download::start()
{
- if(m_threadHandle)
- return;
-
- reset();
-
- m_state = Running;
- m_onStart();
-
- RegisterStart();
- m_threadHandle = CreateThread(nullptr, 0, Worker, (void *)this, 0, nullptr);
+ DownloadThread *thread = new DownloadThread;
+ thread->push(this);
+ onFinish([thread] { delete thread; });
}
-DWORD WINAPI Download::Worker(void *ptr)
+void Download::exec(CURL *curl)
{
- const auto userAgent = format("ReaPack/%s REAPER/%s")
- % ReaPack::VERSION % GetAppVersion();
+ const auto finish = [&](const State state, const string &contents) {
+ m_contents = contents;
- Download *download = static_cast<Download *>(ptr);
- const NetworkOpts &opts = download->options();
+ DownloadNotifier::get()->notify({this, state});
+ };
- string contents;
+ if(m_abort) {
+ finish(Aborted, "cancelled");
+ return;
+ }
- CURL *curl = curl_easy_init();
- curl_easy_setopt(curl, CURLOPT_URL, download->url().c_str());
- curl_easy_setopt(curl, CURLOPT_USERAGENT, userAgent.str().c_str());
- curl_easy_setopt(curl, CURLOPT_PROXY, opts.proxy.c_str());
- curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, opts.verifyPeer);
+ DownloadNotifier::get()->notify({this, Running});
- curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
- curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, DOWNLOAD_TIMEOUT);
- curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DOWNLOAD_TIMEOUT);
+ string contents;
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
- curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 5);
- curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "");
- curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
- curl_easy_setopt(curl, CURLOPT_SHARE, g_curlShare);
+ curl_easy_setopt(curl, CURLOPT_URL, m_url.c_str());
+ curl_easy_setopt(curl, CURLOPT_PROXY, m_opts.proxy.c_str());
+ curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, m_opts.verifyPeer);
- curl_easy_setopt(curl, CURLOPT_NOPROGRESS, false);
- curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, download);
curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, UpdateProgress);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, &contents);
+ curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, this);
+
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteData);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, &contents);
curl_slist *headers = nullptr;
- if(download->has(NoCacheFlag))
+ if(has(Download::NoCacheFlag))
headers = curl_slist_append(headers, "Cache-Control: no-cache");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
@@ -159,208 +156,174 @@ DWORD WINAPI Download::Worker(void *ptr)
const CURLcode res = curl_easy_perform(curl);
- if(download->isAborted())
- download->finish(Aborted, "aborted by user");
+ if(m_abort)
+ finish(Aborted, "aborted by user");
else if(res != CURLE_OK) {
const auto err = format("%s (%d): %s") % curl_easy_strerror(res) % res % errbuf;
- download->finish(Failure, err.str());
+ finish(Failure, err.str());
}
else
- download->finish(Success, contents);
+ finish(Success, contents);
curl_slist_free_all(headers);
- curl_easy_cleanup(curl);
-
- return 0;
}
-size_t Download::WriteData(char *ptr, size_t rawsize, size_t nmemb, void *data)
+DownloadThread::DownloadThread() : m_exit(false)
{
- const size_t size = rawsize * nmemb;
-
- string *str = static_cast<string *>(data);
- str->append(ptr, size);
-
- return size;
+ m_wake = CreateEvent(nullptr, true, false, AUTO_STR("WakeEvent"));
+ m_thread = CreateThread(nullptr, 0, thread, (void *)this, 0, nullptr);
}
-int Download::UpdateProgress(void *ptr, const double dltotal, const double dlnow,
- const double ultotal, const double ulnow)
+DownloadThread::~DownloadThread()
{
- Download *dl = static_cast<Download *>(ptr);
-
- if(dl->isAborted())
- return 1;
+ // remove all pending downloads then wake the thread to make it exit
+ m_exit = true;
+ SetEvent(m_wake);
- const double total = ultotal + dltotal;
+ WaitForSingleObject(m_thread, INFINITE);
- if(total > 0) {
- const short progress = (short)((ulnow + dlnow / total) * 100);
- dl->setProgress(min(progress, (short)100));
- }
-
- return 0;
+ CloseHandle(m_wake);
+ CloseHandle(m_thread);
}
-void Download::RegisterStart()
+DWORD WINAPI DownloadThread::thread(void *ptr)
{
- WDL_MutexLock lock(&s_mutex);
+ DownloadThread *thread = static_cast<DownloadThread *>(ptr);
+ CURL *curl = curl_easy_init();
- if(!s_running)
- plugin_register("timer", (void*)TimerTick);
+ const auto userAgent = format("ReaPack/%s REAPER/%s")
+ % ReaPack::VERSION % GetAppVersion();
- s_running++;
-}
+ curl_easy_setopt(curl, CURLOPT_USERAGENT, userAgent.str().c_str());
+ curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
+ curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, DOWNLOAD_TIMEOUT);
+ curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DOWNLOAD_TIMEOUT);
+ curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
+ curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 5);
+ curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "");
+ curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
+ curl_easy_setopt(curl, CURLOPT_SHARE, g_curlShare);
+ curl_easy_setopt(curl, CURLOPT_NOPROGRESS, false);
-void Download::MarkAsFinished(Download *dl)
-{
- WDL_MutexLock lock(&s_mutex);
+ while(!thread->m_exit) {
+ while(Download *dl = thread->nextDownload())
+ dl->exec(curl);
+
+ ResetEvent(thread->m_wake);
+ WaitForSingleObject(thread->m_wake, INFINITE);
+ }
+
+ curl_easy_cleanup(curl);
- s_finished.push(dl);
+ return 0;
}
-Download *Download::NextFinished()
+Download *DownloadThread::nextDownload()
{
- WDL_MutexLock lock(&s_mutex);
-
- if(!s_running)
- plugin_register("-timer", (void *)TimerTick);
+ WDL_MutexLock lock(&m_mutex);
- if(s_finished.empty())
+ if(m_queue.empty())
return nullptr;
- Download *dl = s_finished.front();
- s_finished.pop();
-
- s_running--;
-
+ Download *dl = m_queue.front();
+ m_queue.pop();
return dl;
}
-void Download::setProgress(const short percent)
+void DownloadThread::push(Download *dl)
{
WDL_MutexLock lock(&m_mutex);
- m_progress = percent;
+ m_queue.push(dl);
+ SetEvent(m_wake);
}
-void Download::finish(const State state, const string &contents)
+DownloadQueue::~DownloadQueue()
{
- // called from the worker thread
-
- WDL_MutexLock lock(&m_mutex);
-
- m_state = state;
- m_contents = contents;
+ // don't emit DownloadQueue::onAbort from the destructor
+ // which is most likely to cause a crash
+ m_onAbort.disconnect_all_slots();
- MarkAsFinished(this);
+ abort();
}
-void Download::finishInMainThread()
+void DownloadQueue::push(Download *dl)
{
- if(m_threadHandle) {
- CloseHandle(m_threadHandle);
- m_threadHandle = nullptr;
- }
+ m_onPush(dl);
+ m_running.insert(dl);
- m_onFinish();
- m_cleanupHandler();
-}
+ dl->onFinish([=] {
+ m_running.erase(dl);
-auto Download::state() -> State
-{
- WDL_MutexLock lock(&m_mutex);
+ // call m_onDone() only after every onFinish slots ran
+ if(m_running.empty())
+ m_onDone();
+ });
- return m_state;
-}
+ dl->setCleanupHandler([=] { delete dl; });
-const string &Download::contents()
-{
- WDL_MutexLock lock(&m_mutex);
+ auto &thread = m_pool[m_running.size() % m_pool.size()];
+ if(!thread)
+ thread = make_unique<DownloadThread>();
- return m_contents;
+ thread->push(dl);
}
-bool Download::isAborted()
+void DownloadQueue::abort()
{
- WDL_MutexLock lock(&m_mutex);
+ for(Download *dl : m_running)
+ dl->abort();
- return m_aborted;
+ m_onAbort();
}
-short Download::progress()
+DownloadNotifier *DownloadNotifier::get()
{
- WDL_MutexLock lock(&m_mutex);
+ if(!s_instance)
+ s_instance = new DownloadNotifier;
- return m_progress;
+ return s_instance;
}
-void Download::abort()
+void DownloadNotifier::start()
{
- WDL_MutexLock lock(&m_mutex);
-
- m_aborted = true;
+ if(!m_active++)
+ plugin_register("timer", (void *)tick);
}
-DownloadQueue::~DownloadQueue()
+void DownloadNotifier::stop()
{
- // don't emit DownloadQueue::onAbort from the destructor
- // which is most likely to cause a crash
- m_onAbort.disconnect_all_slots();
-
- abort();
+ --m_active;
}
-void DownloadQueue::push(Download *dl)
+void DownloadNotifier::notify(const Notification ¬if)
{
- m_onPush(dl);
-
- dl->onFinish([=] {
- m_running.erase(remove(m_running.begin(), m_running.end(), dl));
-
- start();
- });
-
- dl->setCleanupHandler([=] {
- delete dl;
-
- // call m_onDone() only after every onFinish slots ran
- if(idle())
- m_onDone();
- });
-
- m_queue.push(dl);
+ WDL_MutexLock lock(&m_mutex);
- start();
+ m_queue.push(notif);
}
-void DownloadQueue::start()
+void DownloadNotifier::tick()
{
- while(m_running.size() < CONCURRENT_DOWNLOADS && !m_queue.empty()) {
- Download *dl = m_queue.front();
- m_queue.pop();
+ DownloadNotifier *instance = DownloadNotifier::get();
+ instance->processQueue();
+
+ // doing this in stop() would cause a use after free of m_mutex in processQueue
+ if(!instance->m_active) {
+ plugin_register("-timer", (void *)tick);
- m_running.push_back(dl);
- dl->start();
+ delete s_instance;
+ s_instance = nullptr;
}
}
-void DownloadQueue::abort()
+void DownloadNotifier::processQueue()
{
- for(Download *dl : m_running)
- dl->abort();
-
- clear();
-
- m_onAbort();
-}
+ WDL_MutexLock lock(&m_mutex);
-void DownloadQueue::clear()
-{
while(!m_queue.empty()) {
- Download *download = m_queue.front();
- delete download;
-
+ const Notification ¬if = m_queue.front();
+ notif.first->setState(notif.second);
m_queue.pop();
}
}
diff --git a/src/download.hpp b/src/download.hpp
@@ -20,19 +20,21 @@
#include "config.hpp"
+#include <array>
+#include <atomic>
#include <functional>
#include <queue>
#include <string>
-#include <vector>
+#include <unordered_set>
#include <boost/signals2.hpp>
#include <WDL/mutex.h>
+#include <curl/curl.h>
#include <reaper_plugin.h>
class Download {
public:
- typedef std::queue<Download *> Queue;
typedef boost::signals2::signal<void ()> VoidSignal;
typedef std::function<void ()> CleanupHandler;
@@ -60,55 +62,56 @@ public:
const NetworkOpts &options() const { return m_opts; }
bool has(Flag f) const { return (m_flags & f) != 0; }
- State state();
- const std::string &contents();
- bool isAborted();
- short progress();
+ void setState(State);
+ State state() const { return m_state; }
+ const std::string &contents() { return m_contents; }
void onStart(const VoidSignal::slot_type &slot) { m_onStart.connect(slot); }
void onFinish(const VoidSignal::slot_type &slot) { m_onFinish.connect(slot); }
void setCleanupHandler(const CleanupHandler &cb) { m_cleanupHandler = cb; }
void start();
- void abort();
- void wait();
+ void abort() { m_abort = true; }
+
+ void exec(CURL *);
private:
- static WDL_Mutex s_mutex;
- static Queue s_finished;
- static size_t s_running;
- static void RegisterStart();
- static Download *NextFinished();
- static void MarkAsFinished(Download *);
-
- static void TimerTick();
static size_t WriteData(char *, size_t, size_t, void *);
static int UpdateProgress(void *, double, double, double, double);
- static DWORD WINAPI Worker(void *ptr);
-
- void setProgress(short);
- void finish(const State state, const std::string &contents);
- void finishInMainThread();
- void reset();
std::string m_name;
std::string m_url;
NetworkOpts m_opts;
int m_flags;
- WDL_Mutex m_mutex;
-
- HANDLE m_threadHandle;
State m_state;
- bool m_aborted;
+ std::atomic_bool m_abort;
std::string m_contents;
- short m_progress;
VoidSignal m_onStart;
VoidSignal m_onFinish;
CleanupHandler m_cleanupHandler;
};
+class DownloadThread {
+public:
+ DownloadThread();
+ ~DownloadThread();
+
+ void push(Download *);
+ void clear();
+
+private:
+ static DWORD WINAPI thread(void *);
+ Download *nextDownload();
+
+ HANDLE m_wake;
+ HANDLE m_thread;
+ std::atomic_bool m_exit;
+ WDL_Mutex m_mutex;
+ std::queue<Download *> m_queue;
+};
+
class DownloadQueue {
public:
typedef boost::signals2::signal<void ()> VoidSignal;
@@ -119,24 +122,47 @@ public:
~DownloadQueue();
void push(Download *);
- void start();
void abort();
- bool idle() const { return m_queue.empty() && m_running.empty(); }
+ bool idle() const { return m_running.empty(); }
void onPush(const DownloadSignal::slot_type &slot) { m_onPush.connect(slot); }
void onAbort(const VoidSignal::slot_type &slot) { m_onAbort.connect(slot); }
void onDone(const VoidSignal::slot_type &slot) { m_onDone.connect(slot); }
private:
- void clear();
-
- Download::Queue m_queue;
- std::vector<Download *> m_running;
+ std::array<std::unique_ptr<DownloadThread>, 3> m_pool;
+ std::unordered_set<Download *> m_running;
DownloadSignal m_onPush;
VoidSignal m_onAbort;
VoidSignal m_onDone;
};
+// This singleton class receives state change notifications from a
+// worker thread and applies them in the main thread
+class DownloadNotifier {
+ typedef std::pair<Download *, Download::State> Notification;
+
+public:
+ static DownloadNotifier *get();
+
+ void start();
+ void stop();
+
+ void notify(const Notification &);
+
+private:
+ static DownloadNotifier *s_instance;
+ static void tick();
+
+ DownloadNotifier() : m_active(0) {}
+ ~DownloadNotifier() = default;
+ void processQueue();
+
+ WDL_Mutex m_mutex;
+ size_t m_active;
+ std::queue<Notification> m_queue;
+};
+
#endif
diff --git a/src/reapack.cpp b/src/reapack.cpp
@@ -436,17 +436,20 @@ Transaction *ReaPack::setupTransaction()
&entries, &m_config->install.promptObsolete) == IDOK;
});
- m_tx->setCleanupHandler([=] {
- delete m_tx;
- m_tx = nullptr;
-
- // refresh only once all onFinish slots were ran
- refreshBrowser();
- });
+ m_tx->setCleanupHandler(bind(&ReaPack::teardownTransaction, this));
return m_tx;
}
+void ReaPack::teardownTransaction()
+{
+ delete m_tx;
+ m_tx = nullptr;
+
+ // refresh only once the registry is close
+ refreshBrowser();
+}
+
void ReaPack::refreshManager()
{
if(m_manager)
diff --git a/src/reapack.hpp b/src/reapack.hpp
@@ -91,6 +91,7 @@ private:
void registerSelf();
void doFetchIndex(const Remote &remote, DownloadQueue *, HWND, bool stale);
IndexPtr loadIndex(const Remote &remote, HWND);
+ void teardownTransaction();
std::map<int, ActionCallback> m_actions;
diff --git a/src/task.cpp b/src/task.cpp
@@ -73,9 +73,6 @@ bool InstallTask::start()
void InstallTask::saveSource(Download *dl, const Source *src)
{
- if(dl->state() == Download::Aborted)
- return;
-
const Path &targetPath = src->targetPath();
Path tmpPath(targetPath);
diff --git a/src/transaction.cpp b/src/transaction.cpp
@@ -118,12 +118,12 @@ void Transaction::fetchIndex(const Remote &remote, const function<void()> &cb)
return;
}
- m_downloadQueue.push(dl);
-
dl->onFinish([=] {
if(saveFile(dl, Index::pathFor(dl->name())))
cb();
});
+
+ m_downloadQueue.push(dl);
}
void Transaction::install(const Version *ver, const bool pin)