reapack

Package manager for REAPER
Log | Files | Refs | Submodules | README | LICENSE

commit 0eb0446119d0c5bf570ca4534c227e0a318e8f30
parent b1619ae3bd59f7fcb9921433a53bf11a54abfd6f
Author: cfillion <cfillion@users.noreply.github.com>
Date:   Sun,  5 Feb 2017 19:19:45 -0500

download: redesign thread scheduling (now 50% faster with many files)

Diffstat:
Msrc/download.cpp | 373+++++++++++++++++++++++++++++++++++++++++--------------------------------------
Msrc/download.hpp | 75++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
2 files changed, 248 insertions(+), 200 deletions(-)

diff --git a/src/download.cpp b/src/download.cpp @@ -20,7 +20,6 @@ #include "reapack.hpp" #include <boost/format.hpp> -#include <curl/curl.h> #include <reaper_plugin_functions.h> @@ -28,15 +27,12 @@ using boost::format; using namespace std; static const int DOWNLOAD_TIMEOUT = 15; -static const int CONCURRENT_DOWNLOADS = 3; - -Download::Queue Download::s_finished; -WDL_Mutex Download::s_mutex; -size_t Download::s_running = 0; static CURLSH *g_curlShare = nullptr; static WDL_Mutex g_curlMutex; +DownloadNotifier *DownloadNotifier::s_instance = nullptr; + static void LockCurlMutex(CURL *, curl_lock_data, curl_lock_access, void *) { g_curlMutex.Enter(); @@ -56,6 +52,7 @@ void Download::Init() curl_share_setopt(g_curlShare, CURLSHOPT_LOCKFUNC, LockCurlMutex); curl_share_setopt(g_curlShare, CURLSHOPT_UNLOCKFUNC, UnlockCurlMutex); + curl_share_setopt(g_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); curl_share_setopt(g_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); } @@ -66,17 +63,46 @@ void Download::Cleanup() curl_global_cleanup(); } +size_t Download::WriteData(char *ptr, size_t rawsize, size_t nmemb, void *data) +{ + const size_t size = rawsize * nmemb; + + string *str = static_cast<string *>(data); + str->append(ptr, size); + + return size; +} + +int Download::UpdateProgress(void *ptr, const double dltotal, const double dlnow, + const double ultotal, const double ulnow) +{ + Download *dl = static_cast<Download *>(ptr); + + if(dl->isAborted()) + return 1; + + const double total = ultotal + dltotal; + + if(total > 0) { + const short progress = (short)((ulnow + dlnow / total) * 100); + dl->setProgress(min(progress, (short)100)); + } + + return 0; +} + Download::Download(const string &name, const string &url, const NetworkOpts &opts, const int flags) - : m_name(name), m_url(url), m_opts(opts), - m_flags(flags), m_threadHandle(nullptr) + : m_name(name), m_url(url), m_opts(opts), m_flags(flags) { reset(); + + DownloadNotifier::get()->start(); } Download::~Download() { - wait(); + DownloadNotifier::get()->stop(); } void Download::reset() @@ -87,70 +113,93 @@ void Download::reset() m_progress = 0; } -void Download::wait() +void Download::setProgress(const short percent) +{ + WDL_MutexLock lock(&m_mutex); + + m_progress = percent; +} + +void Download::finish(const State state, const string &contents) +{ + DownloadNotifier::get()->notify({this, state}); + + WDL_MutexLock lock(&m_mutex); + m_contents = contents; +} + +void Download::setState(const State state) { - if(m_threadHandle) { - WaitForSingleObject(m_threadHandle, INFINITE); - CloseHandle(m_threadHandle); + m_state = state; + + switch(state) { + case Idle: + break; + case Running: + m_onStart(); + break; + case Success: + case Failure: + case Aborted: + m_onFinish(); + m_cleanupHandler(); + break; } } -void Download::TimerTick() +const string &Download::contents() { - Download *dl = Download::NextFinished(); + WDL_MutexLock lock(&m_mutex); - if(dl) - dl->finishInMainThread(); + return m_contents; } -void Download::start() +bool Download::isAborted() { - if(m_threadHandle) - return; + WDL_MutexLock lock(&m_mutex); - reset(); + return m_aborted; +} - m_state = Running; - m_onStart(); +short Download::progress() +{ + WDL_MutexLock lock(&m_mutex); - RegisterStart(); - m_threadHandle = CreateThread(nullptr, 0, Worker, (void *)this, 0, nullptr); + return m_progress; } -DWORD WINAPI Download::Worker(void *ptr) +void Download::start() { - const auto userAgent = format("ReaPack/%s REAPER/%s") - % ReaPack::VERSION % GetAppVersion(); + DownloadThread *thread = new DownloadThread; + thread->push(this); + onFinish([thread] { delete thread; }); +} - Download *download = static_cast<Download *>(ptr); - const NetworkOpts &opts = download->options(); +void Download::abort() +{ + WDL_MutexLock lock(&m_mutex); - string contents; + m_aborted = true; +} - CURL *curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_URL, download->url().c_str()); - curl_easy_setopt(curl, CURLOPT_USERAGENT, userAgent.str().c_str()); - curl_easy_setopt(curl, CURLOPT_PROXY, opts.proxy.c_str()); - curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, opts.verifyPeer); +void Download::exec(CURL *curl) +{ + DownloadNotifier::get()->notify({this, Running}); - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1); - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, DOWNLOAD_TIMEOUT); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DOWNLOAD_TIMEOUT); + string contents; - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true); - curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 5); - curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, ""); - curl_easy_setopt(curl, CURLOPT_FAILONERROR, true); - curl_easy_setopt(curl, CURLOPT_SHARE, g_curlShare); + curl_easy_setopt(curl, CURLOPT_URL, m_url.c_str()); + curl_easy_setopt(curl, CURLOPT_PROXY, m_opts.proxy.c_str()); + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, m_opts.verifyPeer); - curl_easy_setopt(curl, CURLOPT_NOPROGRESS, false); - curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, download); curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, UpdateProgress); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &contents); + curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, this); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &contents); curl_slist *headers = nullptr; - if(download->has(NoCacheFlag)) + if(has(Download::NoCacheFlag)) headers = curl_slist_append(headers, "Cache-Control: no-cache"); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); @@ -159,208 +208,174 @@ DWORD WINAPI Download::Worker(void *ptr) const CURLcode res = curl_easy_perform(curl); - if(download->isAborted()) - download->finish(Aborted, "aborted by user"); + if(isAborted()) + finish(Aborted, "aborted by user"); else if(res != CURLE_OK) { const auto err = format("%s (%d): %s") % curl_easy_strerror(res) % res % errbuf; - download->finish(Failure, err.str()); + finish(Failure, err.str()); } else - download->finish(Success, contents); + finish(Success, contents); curl_slist_free_all(headers); - curl_easy_cleanup(curl); - - return 0; } -size_t Download::WriteData(char *ptr, size_t rawsize, size_t nmemb, void *data) +DownloadThread::DownloadThread() : m_exit(false) { - const size_t size = rawsize * nmemb; - - string *str = static_cast<string *>(data); - str->append(ptr, size); - - return size; + m_wake = CreateEvent(nullptr, true, false, AUTO_STR("WakeEvent")); + m_thread = CreateThread(nullptr, 0, thread, (void *)this, 0, nullptr); } -int Download::UpdateProgress(void *ptr, const double dltotal, const double dlnow, - const double ultotal, const double ulnow) +DownloadThread::~DownloadThread() { - Download *dl = static_cast<Download *>(ptr); - - if(dl->isAborted()) - return 1; + // remove all pending downloads then wake the thread to make it exit + m_exit = true; + SetEvent(m_wake); - const double total = ultotal + dltotal; + WaitForSingleObject(m_thread, INFINITE); - if(total > 0) { - const short progress = (short)((ulnow + dlnow / total) * 100); - dl->setProgress(min(progress, (short)100)); - } - - return 0; + CloseHandle(m_wake); + CloseHandle(m_thread); } -void Download::RegisterStart() +DWORD WINAPI DownloadThread::thread(void *ptr) { - WDL_MutexLock lock(&s_mutex); + DownloadThread *thread = static_cast<DownloadThread *>(ptr); + CURL *curl = curl_easy_init(); - if(!s_running) - plugin_register("timer", (void*)TimerTick); + const auto userAgent = format("ReaPack/%s REAPER/%s") + % ReaPack::VERSION % GetAppVersion(); - s_running++; -} + curl_easy_setopt(curl, CURLOPT_USERAGENT, userAgent.str().c_str()); + curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1); + curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, DOWNLOAD_TIMEOUT); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DOWNLOAD_TIMEOUT); + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true); + curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 5); + curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, ""); + curl_easy_setopt(curl, CURLOPT_FAILONERROR, true); + curl_easy_setopt(curl, CURLOPT_SHARE, g_curlShare); + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, false); -void Download::MarkAsFinished(Download *dl) -{ - WDL_MutexLock lock(&s_mutex); + while(!thread->m_exit) { + while(Download *dl = thread->nextDownload()) + dl->exec(curl); - s_finished.push(dl); + ResetEvent(thread->m_wake); + WaitForSingleObject(thread->m_wake, INFINITE); + } + + curl_easy_cleanup(curl); + + return 0; } -Download *Download::NextFinished() +Download *DownloadThread::nextDownload() { - WDL_MutexLock lock(&s_mutex); - - if(!s_running) - plugin_register("-timer", (void *)TimerTick); + WDL_MutexLock lock(&m_mutex); - if(s_finished.empty()) + if(m_queue.empty()) return nullptr; - Download *dl = s_finished.front(); - s_finished.pop(); - - s_running--; - + Download *dl = m_queue.front(); + m_queue.pop(); return dl; } -void Download::setProgress(const short percent) +void DownloadThread::push(Download *dl) { WDL_MutexLock lock(&m_mutex); - - m_progress = percent; + m_queue.push(dl); + SetEvent(m_wake); } -void Download::finish(const State state, const string &contents) +DownloadQueue::~DownloadQueue() { - // called from the worker thread - - WDL_MutexLock lock(&m_mutex); - - m_state = state; - m_contents = contents; + // don't emit DownloadQueue::onAbort from the destructor + // which is most likely to cause a crash + m_onAbort.disconnect_all_slots(); - MarkAsFinished(this); + abort(); } -void Download::finishInMainThread() +void DownloadQueue::push(Download *dl) { - if(m_threadHandle) { - CloseHandle(m_threadHandle); - m_threadHandle = nullptr; - } + m_onPush(dl); + m_running.insert(dl); - m_onFinish(); - m_cleanupHandler(); -} + dl->setCleanupHandler([=] { + delete dl; -auto Download::state() -> State -{ - WDL_MutexLock lock(&m_mutex); + m_running.erase(dl); - return m_state; -} + // call m_onDone() only after every onFinish slots ran + if(m_running.empty()) + m_onDone(); + }); -const string &Download::contents() -{ - WDL_MutexLock lock(&m_mutex); + auto &thread = m_pool[m_running.size() % m_pool.size()]; + if(!thread) + thread = make_unique<DownloadThread>(); - return m_contents; + thread->push(dl); } -bool Download::isAborted() +void DownloadQueue::abort() { - WDL_MutexLock lock(&m_mutex); + for(Download *dl : m_running) + dl->abort(); - return m_aborted; + m_onAbort(); } -short Download::progress() +DownloadNotifier::DownloadNotifier() : m_active(0) { - WDL_MutexLock lock(&m_mutex); - - return m_progress; + assert(!s_instance); } -void Download::abort() +DownloadNotifier *DownloadNotifier::get() { - WDL_MutexLock lock(&m_mutex); + if(!s_instance) + s_instance = new DownloadNotifier; - m_aborted = true; + return s_instance; } -DownloadQueue::~DownloadQueue() +void DownloadNotifier::start() { - // don't emit DownloadQueue::onAbort from the destructor - // which is most likely to cause a crash - m_onAbort.disconnect_all_slots(); - - abort(); + if(!m_active++) + plugin_register("timer", (void *)tick); } -void DownloadQueue::push(Download *dl) +void DownloadNotifier::stop() { - m_onPush(dl); - - dl->onFinish([=] { - m_running.erase(remove(m_running.begin(), m_running.end(), dl)); - - start(); - }); - - dl->setCleanupHandler([=] { - delete dl; - - // call m_onDone() only after every onFinish slots ran - if(idle()) - m_onDone(); - }); - - m_queue.push(dl); - - start(); + --m_active; } -void DownloadQueue::start() +void DownloadNotifier::notify(const Notification &notif) { - while(m_running.size() < CONCURRENT_DOWNLOADS && !m_queue.empty()) { - Download *dl = m_queue.front(); - m_queue.pop(); + WDL_MutexLock lock(&m_mutex); - m_running.push_back(dl); - dl->start(); - } + m_queue.push(notif); } -void DownloadQueue::abort() +void DownloadNotifier::tick() { - for(Download *dl : m_running) - dl->abort(); + DownloadNotifier *instance = DownloadNotifier::get(); - clear(); + WDL_MutexLock lock(&instance->m_mutex); - m_onAbort(); -} + while(!instance->m_queue.empty()) { + const Notification notif = instance->m_queue.front(); + instance->m_queue.pop(); -void DownloadQueue::clear() -{ - while(!m_queue.empty()) { - Download *download = m_queue.front(); - delete download; + notif.first->setState(notif.second); + // `this` can be destroyed here + } - m_queue.pop(); + if(!instance->m_active) { + plugin_register("-timer", (void *)tick); + s_instance = nullptr; + delete s_instance; } } diff --git a/src/download.hpp b/src/download.hpp @@ -20,19 +20,21 @@ #include "config.hpp" +#include <array> +#include <atomic> #include <functional> #include <queue> #include <string> -#include <vector> +#include <unordered_set> #include <boost/signals2.hpp> #include <WDL/mutex.h> +#include <curl/curl.h> #include <reaper_plugin.h> class Download { public: - typedef std::queue<Download *> Queue; typedef boost::signals2::signal<void ()> VoidSignal; typedef std::function<void ()> CleanupHandler; @@ -60,7 +62,8 @@ public: const NetworkOpts &options() const { return m_opts; } bool has(Flag f) const { return (m_flags & f) != 0; } - State state(); + void setState(State); + State state() const { return m_state; } const std::string &contents(); bool isAborted(); short progress(); @@ -71,24 +74,15 @@ public: void start(); void abort(); - void wait(); + + void exec(CURL *); private: - static WDL_Mutex s_mutex; - static Queue s_finished; - static size_t s_running; - static void RegisterStart(); - static Download *NextFinished(); - static void MarkAsFinished(Download *); - - static void TimerTick(); static size_t WriteData(char *, size_t, size_t, void *); static int UpdateProgress(void *, double, double, double, double); - static DWORD WINAPI Worker(void *ptr); void setProgress(short); void finish(const State state, const std::string &contents); - void finishInMainThread(); void reset(); std::string m_name; @@ -98,7 +92,6 @@ private: WDL_Mutex m_mutex; - HANDLE m_threadHandle; State m_state; bool m_aborted; std::string m_contents; @@ -109,6 +102,25 @@ private: CleanupHandler m_cleanupHandler; }; +class DownloadThread { +public: + DownloadThread(); + ~DownloadThread(); + + void push(Download *); + void clear(); + +private: + static DWORD WINAPI thread(void *); + Download *nextDownload(); + + HANDLE m_wake; + HANDLE m_thread; + std::atomic_bool m_exit; + WDL_Mutex m_mutex; + std::queue<Download *> m_queue; +}; + class DownloadQueue { public: typedef boost::signals2::signal<void ()> VoidSignal; @@ -119,24 +131,45 @@ public: ~DownloadQueue(); void push(Download *); - void start(); void abort(); - bool idle() const { return m_queue.empty() && m_running.empty(); } + bool idle() const { return m_running.empty(); } void onPush(const DownloadSignal::slot_type &slot) { m_onPush.connect(slot); } void onAbort(const VoidSignal::slot_type &slot) { m_onAbort.connect(slot); } void onDone(const VoidSignal::slot_type &slot) { m_onDone.connect(slot); } private: - void clear(); - - Download::Queue m_queue; - std::vector<Download *> m_running; + std::array<std::unique_ptr<DownloadThread>, 3> m_pool; + std::unordered_set<Download *> m_running; DownloadSignal m_onPush; VoidSignal m_onAbort; VoidSignal m_onDone; }; +// This singleton class receives state change notifications from a +// worker thread and applies them in the main thread +class DownloadNotifier { + typedef std::pair<Download *, Download::State> Notification; + +public: + static DownloadNotifier *get(); + + DownloadNotifier(); + + void start(); + void stop(); + + void notify(const Notification &); + +private: + static DownloadNotifier *s_instance; + static void tick(); + + WDL_Mutex m_mutex; + size_t m_active; + std::queue<Notification> m_queue; +}; + #endif