commit 89b4f13cd3c918bbb28c592f46d249197a73fd94
parent 20d2df785cda701dcbb59572a035b97ca19e9409
Author: cfillion <cfillion@users.noreply.github.com>
Date: Thu, 27 Sep 2018 22:17:16 -0400
unify synchronous and asynchronous event handling and drop boost::signal2
Diffstat:
25 files changed, 406 insertions(+), 212 deletions(-)
diff --git a/src/about.cpp b/src/about.cpp
@@ -58,11 +58,11 @@ void About::onInit()
m_desc = createControl<RichEdit>(IDC_ABOUT);
m_menu = createControl<ListView>(IDC_MENU);
- m_menu->onSelect(bind(&About::updateList, this));
+ m_menu->onSelect >> bind(&About::updateList, this);
m_list = createControl<ListView>(IDC_LIST);
- m_list->onContextMenu([=] (Menu &m, int i) { return m_delegate->fillContextMenu(m, i); });
- m_list->onActivate([=] { m_delegate->itemActivated(); });
+ m_list->onFillContextMenu >> [=] (Menu &m, int i) { return m_delegate->fillContextMenu(m, i); };
+ m_list->onActivate >> [=] { m_delegate->itemActivated(); };
setMinimumSize({560, 300});
setAnchor(m_tabs->handle(), AnchorRight | AnchorBottom);
diff --git a/src/api_package.cpp b/src/api_package.cpp
@@ -56,7 +56,7 @@ The repository index is downloaded asynchronously if the cached copy doesn't exi
const vector<Remote> repos = {repo};
tx->fetchIndexes(repos);
- tx->onFinish([=] {
+ tx->onFinish >> [=] {
const auto &indexes = tx->getIndexes(repos);
if(indexes.empty())
return;
@@ -64,7 +64,7 @@ The repository index is downloaded asynchronously if the cached copy doesn't exi
const Package *pkg = indexes.front()->find(entryCopy.category, entryCopy.package);
if(pkg)
g_reapack->about()->setDelegate(make_shared<AboutPackageDelegate>(pkg, entryCopy.version));
- });
+ };
tx->runTasks();
return true;
diff --git a/src/archive_tasks.cpp b/src/archive_tasks.cpp
@@ -81,10 +81,10 @@ bool ExportTask::start()
// because we cannot safely write into the zip from more than one
// thread at the same time.
for(FileCompressor *job : jobs) {
- job->onFinish([=] {
+ job->onFinishAsync >> [=] {
if(job->state() == ThreadTask::Success)
tx()->receipt()->addExport(job->path());
- });
+ };
tx()->threadPool()->push(job);
}
diff --git a/src/browser.cpp b/src/browser.cpp
@@ -69,9 +69,10 @@ void Browser::onInit()
{"Last Update", 105, 0, ListView::TimeType},
});
- m_list->onActivate([=] { aboutPackage(m_list->itemUnderMouse()); });
- m_list->onSelect(bind(&Browser::onSelection, this));
- m_list->onContextMenu(bind(&Browser::fillContextMenu, this, _1, _2));
+ m_list->onActivate >> [=] { aboutPackage(m_list->itemUnderMouse()); };
+ m_list->onSelect >> bind(&Browser::onSelection, this);
+ m_list->onFillContextMenu >> bind(&Browser::fillContextMenu, this,
+ placeholders::_1, placeholders::_2);
m_list->sortByColumn(1);
Dialog::onInit();
@@ -134,25 +135,25 @@ void Browser::onCommand(const int id, const int event)
actionsButton();
break;
case ACTION_LATEST:
- currentDo(bind(&Browser::installLatest, this, _1, true));
+ currentDo(bind(&Browser::installLatest, this, placeholders::_1, true));
break;
case ACTION_LATEST_ALL:
installLatestAll();
break;
case ACTION_REINSTALL:
- currentDo(bind(&Browser::reinstall, this, _1, true));
+ currentDo(bind(&Browser::reinstall, this, placeholders::_1, true));
break;
case ACTION_REINSTALL_ALL:
- selectionDo(bind(&Browser::reinstall, this, _1, false));
+ selectionDo(bind(&Browser::reinstall, this, placeholders::_1, false));
break;
case ACTION_UNINSTALL:
- currentDo(bind(&Browser::uninstall, this, _1, true));
+ currentDo(bind(&Browser::uninstall, this, placeholders::_1, true));
break;
case ACTION_UNINSTALL_ALL:
- selectionDo(bind(&Browser::uninstall, this, _1, false));
+ selectionDo(bind(&Browser::uninstall, this, placeholders::_1, false));
break;
case ACTION_PIN:
- currentDo(bind(&Browser::togglePin, this, _1));
+ currentDo(bind(&Browser::togglePin, this, placeholders::_1));
break;
case ACTION_ABOUT_PKG:
aboutPackage(m_currentIndex);
@@ -161,7 +162,7 @@ void Browser::onCommand(const int id, const int event)
aboutRemote(m_currentIndex);
break;
case ACTION_RESET_ALL:
- selectionDo(bind(&Browser::resetActions, this, _1));
+ selectionDo(bind(&Browser::resetActions, this, placeholders::_1));
break;
case ACTION_COPY:
copy();
@@ -193,7 +194,7 @@ void Browser::onCommand(const int id, const int event)
break;
default:
if(id >> 8 == ACTION_VERSION)
- currentDo(bind(&Browser::installVersion, this, _1, id & 0xff));
+ currentDo(bind(&Browser::installVersion, this, placeholders::_1, id & 0xff));
else if(id >> 8 == ACTION_FILTERTYPE) {
m_typeFilter = static_cast<Package::Type>(id & 0xff);
fillList();
@@ -437,7 +438,7 @@ void Browser::refresh(const bool stale)
m_loadState = Loading;
tx->fetchIndexes(remotes, stale);
- tx->onFinish([=] {
+ tx->onFinish >> [=] {
if(isFirstLoad || isVisible()) {
populate(tx->getIndexes(remotes), tx->registry());
@@ -451,7 +452,7 @@ void Browser::refresh(const bool stale)
// before it could finished fetching the up to date indexes.
close();
}
- });
+ };
tx->runTasks();
}
@@ -642,7 +643,7 @@ void Browser::installLatestAll()
}
}
- selectionDo(bind(&Browser::installLatest, this, _1, false));
+ selectionDo(bind(&Browser::installLatest, this, placeholders::_1, false));
}
void Browser::installLatest(const int index, const bool toggle)
diff --git a/src/download.cpp b/src/download.cpp
@@ -20,6 +20,8 @@
#include "filesystem.hpp"
#include "reapack.hpp"
+#include <cassert>
+
#include <reaper_plugin_functions.h>
using namespace std;
diff --git a/src/event.cpp b/src/event.cpp
@@ -19,36 +19,36 @@
#include <reaper_plugin_functions.h>
-static std::weak_ptr<EventLoop> s_loop;
+static std::weak_ptr<AsyncEventImpl::Loop> s_loop;
-EventLoop::EventLoop()
+AsyncEventImpl::Loop::Loop()
{
plugin_register("timer", reinterpret_cast<void *>(&mainThreadTimer));
}
-EventLoop::~EventLoop()
+AsyncEventImpl::Loop::~Loop()
{
plugin_register("-timer", reinterpret_cast<void *>(&mainThreadTimer));
}
-void EventLoop::mainThreadTimer()
+void AsyncEventImpl::Loop::mainThreadTimer()
{
s_loop.lock()->processQueue();
}
-void EventLoop::push(EventEmitter *emitter, const Event event)
+void AsyncEventImpl::Loop::push(const MainThreadFunc &event, const void *source)
{
std::lock_guard<std::mutex> guard(m_mutex);
- m_queue.insert({emitter, event});
+ m_queue.insert({source, event});
}
-void EventLoop::forget(EventEmitter *emitter)
+void AsyncEventImpl::Loop::forget(const void *source)
{
std::lock_guard<std::mutex> guard(m_mutex);
- m_queue.erase(emitter);
+ m_queue.erase(source);
}
-void EventLoop::processQueue()
+void AsyncEventImpl::Loop::processQueue()
{
decltype(m_queue) events;
@@ -57,25 +57,25 @@ void EventLoop::processQueue()
std::swap(events, m_queue);
}
- for(const auto &[emitter, event] : events)
- emitter->eventHandler(event);
+ for(const auto &[emitter, func] : events)
+ func();
}
-EventEmitter::EventEmitter()
+AsyncEventImpl::Emitter::Emitter()
{
if(s_loop.expired())
- s_loop = m_loop = std::make_shared<EventLoop>();
+ s_loop = m_loop = std::make_shared<Loop>();
else
m_loop = s_loop.lock();
}
-EventEmitter::~EventEmitter()
+AsyncEventImpl::Emitter::~Emitter()
{
if(s_loop.use_count() > 1)
m_loop->forget(this);
}
-void EventEmitter::emit(const Event event)
+void AsyncEventImpl::Emitter::runInMainThread(const MainThreadFunc &event) const
{
- m_loop->push(this, event);
+ m_loop->push(event, this);
}
diff --git a/src/event.hpp b/src/event.hpp
@@ -18,42 +18,122 @@
#ifndef REAPACK_EVENT_HPP
#define REAPACK_EVENT_HPP
+#include <functional>
+#include <future>
#include <map>
#include <memory>
#include <mutex>
+#include <optional>
+#include <vector>
-typedef intptr_t Event;
-class EventEmitter;
+template<class T>
+class Event;
-class EventLoop {
+template<class R, class... Args>
+class Event<R(Args...)> {
public:
- EventLoop();
- ~EventLoop();
+ typedef std::function<R(Args...)> Handler;
+ typedef typename std::conditional<
+ std::is_void<R>::value, void, std::optional<R>>::type ReturnType;
- void push(EventEmitter *, Event);
- void forget(EventEmitter *);
+ Event() = default;
+ Event(const Event &) = delete;
+
+ operator bool() const { return !m_handlers.empty(); }
+ void reset() { m_handlers.clear(); }
+
+ Event<R(Args...)> &operator>>(const Handler &func)
+ {
+ m_handlers.push_back(func);
+ return *this;
+ }
+
+ ReturnType operator()(Args... args) const
+ {
+ if constexpr (std::is_void<R>::value) {
+ for(const auto &func : m_handlers)
+ func(std::forward<Args>(args)...);
+ }
+ else {
+ ReturnType ret;
+ for(const auto &func : m_handlers)
+ ret = func(std::forward<Args>(args)...);
+ return ret;
+ }
+ }
private:
- static void mainThreadTimer();
- void processQueue();
+ std::vector<Handler> m_handlers;
+};
+
+namespace AsyncEventImpl {
+ typedef std::function<void ()> MainThreadFunc;
+
+ class Loop {
+ public:
+ Loop();
+ ~Loop();
+
+ void push(const MainThreadFunc &, const void *source = nullptr);
+ void forget(const void *source);
+
+ private:
+ static void mainThreadTimer();
+ void processQueue();
- std::mutex m_mutex;
- std::multimap<EventEmitter *, Event> m_queue;
+ std::mutex m_mutex;
+ std::multimap<const void *, MainThreadFunc> m_queue;
+ };
+
+ class Emitter {
+ public:
+ Emitter();
+ ~Emitter();
+
+ void runInMainThread(const MainThreadFunc &) const;
+
+ private:
+ std::shared_ptr<Loop> m_loop;
+ };
};
-class EventEmitter {
+template<class T>
+class AsyncEvent;
+
+template<class R, class... Args>
+class AsyncEvent<R(Args...)> : public Event<R(Args...)> {
public:
- EventEmitter();
- virtual ~EventEmitter();
+ using typename Event<R(Args...)>::ReturnType;
+
+ std::future<ReturnType> operator()(Args... args) const
+ {
+ auto promise = std::make_shared<std::promise<ReturnType>>();
+
+ // don't wait until the next timer tick to return nothing if there are no
+ // handlers currently subscribed to the event
+ if(!*this) {
+ if constexpr (std::is_void<R>::value)
+ promise->set_value();
+ else
+ promise->set_value(std::nullopt);
+
+ return promise->get_future();
+ }
- void emit(Event);
+ m_emitter.runInMainThread([=] {
+ if constexpr (std::is_void<R>::value) {
+ Event<R(Args...)>::operator()(args...);
+ promise->set_value();
+ }
+ else
+ promise->set_value(Event<R(Args...)>::operator()(args...));
+ });
-protected:
- friend EventLoop;
- virtual void eventHandler(Event) = 0;
+ return promise->get_future();
+ }
private:
- std::shared_ptr<EventLoop> m_loop;
+ AsyncEventImpl::Emitter m_emitter;
};
#endif
diff --git a/src/import.cpp b/src/import.cpp
@@ -91,8 +91,8 @@ ThreadPool *Import::setupPool()
m_state = OK;
m_pool = new ThreadPool;
- m_pool->onAbort([=] { if(!m_state) m_state = Aborted; });
- m_pool->onDone([=] {
+ m_pool->onAbort >> [=] { if(!m_state) m_state = Aborted; };
+ m_pool->onDone >> [=] {
setWaiting(false);
if(!m_state)
@@ -107,7 +107,7 @@ ThreadPool *Import::setupPool()
close();
else
SetFocus(m_url);
- });
+ };
}
return m_pool;
@@ -132,7 +132,7 @@ void Import::fetch()
MemoryDownload *dl = new MemoryDownload(url, opts);
++index;
- dl->onFinish([=] {
+ dl->onFinishAsync >> [=] {
switch(dl->state()) {
case ThreadTask::Success:
// copy for later use, as `dl` won't be around after this callback
@@ -148,7 +148,7 @@ void Import::fetch()
default:
break;
}
- });
+ };
setupPool()->push(dl);
}
diff --git a/src/install.cpp b/src/install.cpp
@@ -83,13 +83,13 @@ bool InstallTask::start()
void InstallTask::push(ThreadTask *job, const TempPath &path)
{
- job->onStart([=] { m_newFiles.push_back(path); });
- job->onFinish([=] {
+ job->onStartAsync >> [=] { m_newFiles.push_back(path); };
+ job->onFinishAsync >> [=] {
m_waiting.erase(job);
if(job->state() != ThreadTask::Success)
rollback();
- });
+ };
m_waiting.insert(job);
tx()->threadPool()->push(job);
diff --git a/src/listview.cpp b/src/listview.cpp
@@ -24,6 +24,7 @@
#include "win32.hpp"
#include <boost/algorithm/string/case_conv.hpp>
+#include <cassert>
using namespace std;
@@ -491,7 +492,7 @@ bool ListView::onContextMenu(HWND dialog, int x, int y)
Menu menu;
- if(!m_onContextMenu(menu, index).value_or(false))
+ if(!onFillContextMenu(menu, index).value_or(false))
return false;
menu.show(x, y, dialog);
@@ -506,7 +507,7 @@ void ListView::onItemChanged(const LPARAM lParam)
if(info->uChanged & LVIF_STATE)
#endif
- m_onSelect();
+ onSelect();
}
void ListView::onClick(const bool dbclick)
@@ -515,9 +516,9 @@ void ListView::onClick(const bool dbclick)
if(itemUnderMouse(&overIcon) > -1 && currentIndex() > -1) {
if(dbclick)
- m_onActivate();
+ onActivate();
else if(overIcon)
- m_onIconClick();
+ onIconClick();
}
}
diff --git a/src/listview.hpp b/src/listview.hpp
@@ -20,10 +20,10 @@
#include "control.hpp"
+#include "event.hpp"
#include "filter.hpp"
#include "serializer.hpp"
-#include <boost/signals2.hpp>
#include <functional>
#include <optional>
#include <vector>
@@ -106,9 +106,6 @@ public:
typedef std::vector<Column> Columns;
- typedef boost::signals2::signal<void ()> VoidSignal;
- typedef boost::signals2::signal<bool (Menu &, int index)> MenuSignal;
-
ListView(HWND handle, const Columns & = {});
void reserveRows(size_t count) { m_rows.reserve(count); }
@@ -153,10 +150,10 @@ public:
void saveState(Serializer::Data &) const;
void resetColumns();
- void onSelect(const VoidSignal::slot_type &slot) { m_onSelect.connect(slot); }
- void onIconClick(const VoidSignal::slot_type &slot) { m_onIconClick.connect(slot); }
- void onActivate(const VoidSignal::slot_type &slot) { m_onActivate.connect(slot); }
- void onContextMenu(const MenuSignal::slot_type &slot) { m_onContextMenu.connect(slot); }
+ Event<void()> onSelect;
+ Event<void()> onIconClick;
+ Event<void()> onActivate;
+ Event<bool(Menu &, int index)> onFillContextMenu;
protected:
friend Row;
@@ -203,11 +200,6 @@ private:
std::vector<RowPtr> m_rows;
std::optional<Sort> m_sort;
std::optional<Sort> m_defaultSort;
-
- VoidSignal m_onSelect;
- VoidSignal m_onIconClick;
- VoidSignal m_onActivate;
- MenuSignal m_onContextMenu;
};
#endif
diff --git a/src/manager.cpp b/src/manager.cpp
@@ -72,10 +72,11 @@ void Manager::onInit()
});
m_list->enableIcons();
- m_list->onSelect(bind(&Dialog::startTimer, this, 100, TIMER_ABOUT, true));
- m_list->onIconClick(bind(&Manager::toggleEnabled, this));
- m_list->onActivate(bind(&Manager::aboutRepo, this, true));
- m_list->onContextMenu(bind(&Manager::fillContextMenu, this, _1, _2));
+ m_list->onSelect >> bind(&Dialog::startTimer, this, 100, TIMER_ABOUT, true);
+ m_list->onIconClick >> bind(&Manager::toggleEnabled, this);
+ m_list->onActivate >> bind(&Manager::aboutRepo, this, true);
+ m_list->onFillContextMenu >> bind(&Manager::fillContextMenu, this,
+ placeholders::_1, placeholders::_2);
setAnchor(m_list->handle(), AnchorRight | AnchorBottom);
setAnchor(getControl(IDC_IMPORT), AnchorTop | AnchorBottom);
diff --git a/src/obsquery.cpp b/src/obsquery.cpp
@@ -44,12 +44,12 @@ void ObsoleteQuery::onInit()
{"Package", 550}
});
- m_list->onSelect([=] { setEnabled(m_list->hasSelection(), m_okBtn); });
- m_list->onContextMenu([=] (Menu &menu, int) {
+ m_list->onSelect >> [=] { setEnabled(m_list->hasSelection(), m_okBtn); };
+ m_list->onFillContextMenu >> [=] (Menu &menu, int) {
menu.addAction("Select &all", ACTION_SELECT_ALL);
menu.addAction("&Unselect all", ACTION_UNSELECT_ALL);
return true;
- });
+ };
m_list->reserveRows(m_entries->size());
diff --git a/src/progress.cpp b/src/progress.cpp
@@ -30,7 +30,7 @@ Progress::Progress(ThreadPool *pool)
m_pool(pool), m_label(nullptr), m_progress(nullptr),
m_done(0), m_total(0)
{
- m_pool->onPush(bind(&Progress::addTask, this, _1));
+ m_pool->onPush >> bind(&Progress::addTask, this, placeholders::_1);
}
void Progress::onInit()
@@ -75,15 +75,15 @@ void Progress::addTask(ThreadTask *task)
if(!isVisible())
startTimer(100);
- task->onStart([=] {
+ task->onStartAsync >> [=] {
m_current = task->summary();
updateProgress();
- });
+ };
- task->onFinish([=] {
+ task->onFinishAsync >> [=] {
m_done++;
updateProgress();
- });
+ };
}
void Progress::updateProgress()
diff --git a/src/reapack.cpp b/src/reapack.cpp
@@ -32,6 +32,8 @@
#include "transaction.hpp"
#include "win32.hpp"
+#include <cassert>
+
#include <reaper_plugin_functions.h>
using namespace std;
@@ -182,10 +184,10 @@ void ReaPack::uninstall(const Remote &remote)
assert(m_tx);
m_tx->uninstall(remote);
- m_tx->onFinish([=] {
+ m_tx->onFinish >> [=] {
if(!m_tx->isCancelled())
config()->remotes.remove(remote);
- });
+ };
}
void ReaPack::importRemote()
@@ -224,11 +226,11 @@ void ReaPack::about(const Remote &repo, const bool focus)
const vector<Remote> repos = {repo};
tx->fetchIndexes(repos);
- tx->onFinish([=] {
+ tx->onFinish >> [=] {
const auto &indexes = tx->getIndexes(repos);
if(!indexes.empty())
about()->setDelegate(make_shared<AboutIndexDelegate>(indexes.front()), focus);
- });
+ };
tx->runTasks();
}
@@ -285,7 +287,7 @@ Transaction *ReaPack::setupTransaction()
assert(!m_progress);
m_progress = Dialog::Create<Progress>(m_instance, m_mainWindow, m_tx->threadPool());
- m_tx->onFinish([=] {
+ m_tx->onFinish >> [=] {
m_progress.reset();
if(!m_tx->isCancelled() && !m_tx->receipt()->empty()) {
@@ -294,7 +296,7 @@ Transaction *ReaPack::setupTransaction()
Dialog::Show<Report>(m_instance, m_mainWindow, m_tx->receipt());
}
- });
+ };
m_tx->setObsoleteHandler([=] (vector<Registry::Entry> &entries) {
LockDialog aboutLock(m_about.get());
@@ -329,9 +331,9 @@ void ReaPack::commitConfig(bool refresh)
if(m_tx) {
if(refresh) {
m_tx->receipt()->setIndexChanged(); // force browser refresh
- m_tx->onFinish(bind(&ReaPack::refreshManager, this));
+ m_tx->onFinish >> bind(&ReaPack::refreshManager, this);
}
- m_tx->onFinish(bind(&Config::write, &m_config));
+ m_tx->onFinish >> bind(&Config::write, &m_config);
m_tx->runTasks();
}
else {
diff --git a/src/report.cpp b/src/report.cpp
@@ -34,9 +34,9 @@ void Report::onInit()
Dialog::onInit();
m_tabbar = createControl<TabBar>(IDC_TABS, this);
- m_tabbar->onTabChange([=] (const int i) {
+ m_tabbar->onTabChange >> [=] (const int i) {
Win32::setWindowText(getControl(IDC_REPORT), m_pages[i].c_str());
- });
+ };
const ReceiptPage pages[] = {
m_receipt->installedPage(),
diff --git a/src/synchronize.cpp b/src/synchronize.cpp
@@ -46,10 +46,10 @@ bool SynchronizeTask::start()
netConfig, Download::NoCacheFlag);
dl->setName(m_remote.name());
- dl->onFinish([=] {
+ dl->onFinishAsync >> [=] {
if(dl->save())
tx()->receipt()->setIndexChanged();
- });
+ };
tx()->threadPool()->push(dl);
return true;
diff --git a/src/tabbar.cpp b/src/tabbar.cpp
@@ -109,7 +109,7 @@ void TabBar::switchPage()
}
const int index = currentIndex();
- m_onTabChange(index);
+ onTabChange(index);
if(index < 0 || (size_t)index >= m_pages.size()) {
m_lastPage = -1;
diff --git a/src/tabbar.hpp b/src/tabbar.hpp
@@ -20,15 +20,14 @@
#include "control.hpp"
-#include <boost/signals2.hpp>
+#include "event.hpp"
+
#include <vector>
class Dialog;
class TabBar : public Control {
public:
- typedef boost::signals2::signal<void (int index)> TabSignal;
-
typedef std::vector<HWND> Page;
struct Tab { const char *text; Page page; };
typedef std::vector<Tab> Tabs;
@@ -40,7 +39,8 @@ public:
void setFocus();
int count() const;
void clear();
- void onTabChange(const TabSignal::slot_type &slot) { m_onTabChange.connect(slot); }
+
+ Event<void(int index)> onTabChange;
protected:
void onNotify(LPNMHDR, LPARAM) override;
@@ -51,7 +51,6 @@ private:
Dialog *m_parent;
int m_lastPage;
std::vector<Page> m_pages;
- TabSignal m_onTabChange;
};
#endif
diff --git a/src/thread.cpp b/src/thread.cpp
@@ -34,48 +34,20 @@ ThreadTask::~ThreadTask()
{
}
-void ThreadTask::eventHandler(const Event event)
-{
- const State state = static_cast<State>(event);
-
- // The task may have been aborted while the task was running or just before
- // the finish notification got received in the main thread.
- m_state = aborted() ? Aborted : state;
-
- switch(state) {
- case Idle:
- case Queued:
- break;
- case Running:
- m_onStart();
- break;
- case Success:
- case Failure:
- case Aborted:
- m_onFinish();
- break;
- }
-}
-
-void ThreadTask::onFinish(const VoidSignal::slot_type &slot)
-{
- // The task has a slot deleting itself at this point, accepting
- // any more slots at this point is a very bad idea.
- assert(m_state < Queued);
-
- m_onFinish.connect(slot);
-}
-
void ThreadTask::exec()
{
- State state = Aborted;
+ State state = Idle;
if(!aborted()) {
- emit(Running);
+ onStartAsync();
state = run() ? Success : Failure;
}
- emit(state);
+ if(aborted()) // may have changed while the task was running
+ state = Aborted;
+
+ m_state = state;
+ onFinishAsync();
}
WorkerThread::WorkerThread() : m_stop(false), m_thread(&WorkerThread::run, this)
@@ -147,25 +119,25 @@ ThreadPool::~ThreadPool()
{
// don't emit ThreadPool::onAbort from the destructor
// which is most likely to cause a crash
- m_onAbort.disconnect_all_slots();
+ onAbort.reset();
abort();
}
void ThreadPool::push(ThreadTask *task)
{
- m_onPush(task);
+ onPush(task);
m_running.insert(task);
- task->onFinish([=] {
+ task->onFinishAsync >> [=] {
m_running.erase(task);
delete task;
// call m_onDone() only after every onFinish slots ran
if(m_running.empty())
- m_onDone();
- });
+ onDone();
+ };
const size_t nextThread = m_running.size() % m_pool.size();
auto &thread = task->concurrent() ? m_pool[nextThread] : m_pool.front();
@@ -180,5 +152,5 @@ void ThreadPool::abort()
for(ThreadTask *task : m_running)
task->abort();
- m_onAbort();
+ onAbort();
}
diff --git a/src/thread.hpp b/src/thread.hpp
@@ -28,9 +28,7 @@
#include <thread>
#include <unordered_set>
-#include <boost/signals2.hpp>
-
-class ThreadTask : public EventEmitter {
+class ThreadTask {
public:
enum State {
Idle,
@@ -41,8 +39,6 @@ public:
Aborted,
};
- typedef boost::signals2::signal<void ()> VoidSignal;
-
ThreadTask();
virtual ~ThreadTask();
@@ -54,15 +50,14 @@ public:
void setError(const ErrorInfo &err) { m_error = err; }
const ErrorInfo &error() { return m_error; }
- void onStart(const VoidSignal::slot_type &slot) { m_onStart.connect(slot); }
- void onFinish(const VoidSignal::slot_type &slot);
-
bool aborted() const { return m_abort; }
void abort() { m_abort = true; }
+ AsyncEvent<void()> onStartAsync;
+ AsyncEvent<void()> onFinishAsync;
+
protected:
virtual bool run() = 0;
- void eventHandler(Event) override;
void setSummary(const std::string &s) { m_summary = s; }
@@ -71,9 +66,6 @@ private:
State m_state;
ErrorInfo m_error;
std::atomic_bool m_abort;
-
- VoidSignal m_onStart;
- VoidSignal m_onFinish;
};
class WorkerThread {
@@ -98,9 +90,6 @@ private:
class ThreadPool {
public:
- typedef boost::signals2::signal<void ()> VoidSignal;
- typedef boost::signals2::signal<void (ThreadTask *)> TaskSignal;
-
ThreadPool() {}
ThreadPool(const ThreadPool &) = delete;
~ThreadPool();
@@ -110,17 +99,13 @@ public:
bool idle() const { return m_running.empty(); }
- void onPush(const TaskSignal::slot_type &slot) { m_onPush.connect(slot); }
- void onAbort(const VoidSignal::slot_type &slot) { m_onAbort.connect(slot); }
- void onDone(const VoidSignal::slot_type &slot) { m_onDone.connect(slot); }
+ Event<void(ThreadTask *)> onPush;
+ Event<void()> onAbort;
+ Event<void()> onDone;
private:
std::array<std::unique_ptr<WorkerThread>, 3> m_pool;
std::unordered_set<ThreadTask *> m_running;
-
- TaskSignal m_onPush;
- VoidSignal m_onAbort;
- VoidSignal m_onDone;
};
#endif
diff --git a/src/transaction.cpp b/src/transaction.cpp
@@ -26,6 +26,8 @@
#include "remote.hpp"
#include "task.hpp"
+#include <cassert>
+
#include <reaper_plugin_functions.h>
using namespace std;
@@ -33,20 +35,20 @@ using namespace std;
Transaction::Transaction()
: m_isCancelled(false), m_registry(Path::REGISTRY.prependRoot())
{
- m_threadPool.onPush([this] (ThreadTask *task) {
- task->onFinish([=] {
+ m_threadPool.onPush >> [this] (ThreadTask *task) {
+ task->onFinishAsync >> [=] {
if(task->state() == ThreadTask::Failure)
m_receipt.addError(task->error());
- });
- });
+ };
+ };
- m_threadPool.onAbort([this] {
+ m_threadPool.onAbort >> [this] {
m_isCancelled = true;
queue<HostTicket>().swap(m_regQueue);
- });
+ };
// run the next task queue when the current one is done
- m_threadPool.onDone(bind(&Transaction::runTasks, this));
+ m_threadPool.onDone >> bind(&Transaction::runTasks, this);
}
void Transaction::synchronize(const Remote &remote,
@@ -213,7 +215,7 @@ void Transaction::finish()
m_registry.commit();
registerQueued();
- m_onFinish();
+ onFinish();
m_cleanupHandler();
}
diff --git a/src/transaction.hpp b/src/transaction.hpp
@@ -18,12 +18,12 @@
#ifndef REAPACK_TRANSACTION_HPP
#define REAPACK_TRANSACTION_HPP
+#include "event.hpp"
#include "receipt.hpp"
#include "registry.hpp"
#include "task.hpp"
#include "thread.hpp"
-#include <boost/signals2.hpp>
#include <functional>
#include <memory>
#include <optional>
@@ -43,13 +43,11 @@ struct HostTicket { bool add; Registry::Entry entry; Registry::File file; };
class Transaction {
public:
- typedef boost::signals2::signal<void ()> VoidSignal;
typedef std::function<void()> CleanupHandler;
typedef std::function<bool(std::vector<Registry::Entry> &)> ObsoleteHandler;
Transaction();
- void onFinish(const VoidSignal::slot_type &slot) { m_onFinish.connect(slot); }
void setCleanupHandler(const CleanupHandler &cb) { m_cleanupHandler = cb; }
void setObsoleteHandler(const ObsoleteHandler &cb) { m_promptObsolete = cb; }
@@ -72,6 +70,8 @@ public:
Registry *registry() { return &m_registry; }
ThreadPool *threadPool() { return &m_threadPool; }
+ Event<void()> onFinish;
+
protected:
friend SynchronizeTask;
friend InstallTask;
@@ -117,7 +117,6 @@ private:
std::queue<TaskPtr> m_runningTasks;
std::queue<HostTicket> m_regQueue;
- VoidSignal m_onFinish;
CleanupHandler m_cleanupHandler;
ObsoleteHandler m_promptObsolete;
};
diff --git a/test/event.cpp b/test/event.cpp
@@ -4,9 +4,88 @@
#include <reaper_plugin_functions.h>
-static constexpr const char *M = "[event]";
+constexpr const char *M = "[event]";
-TEST_CASE("multiple EventEmitter registers a single timer", M) {
+TEST_CASE("check whether an event has handlers subscribed to it", M) {
+ Event<void()> e;
+ REQUIRE_FALSE(e);
+ e >> []{};
+ REQUIRE(e);
+}
+
+TEST_CASE("remove all handlers subscribed to an event", M) {
+ bool run = false;
+ Event<void()> e;
+ e >> [&]{ run = true; };
+
+ e.reset();
+ CHECK_FALSE(e);
+
+ e();
+ REQUIRE_FALSE(run);
+}
+
+TEST_CASE("Event<void(...)> handlers are run in order", M) {
+ std::vector<int> bucket;
+
+ Event<void()> e;
+ e >> [&] { bucket.push_back(1); }
+ >> [&] { bucket.push_back(2); };
+
+ e();
+
+ REQUIRE(bucket == decltype(bucket){1, 2});
+}
+
+TEST_CASE("Event<R(...)> handlers are run in order", M) {
+ std::vector<int> bucket;
+
+ Event<std::nullptr_t()> e;
+ e >> [&] { bucket.push_back(1); return nullptr; }
+ >> [&] { bucket.push_back(2); return nullptr; };
+
+ e();
+
+ REQUIRE(bucket == decltype(bucket){1, 2});
+}
+
+TEST_CASE("Event<R(...)> returns the last value") {
+ Event<int()> e;
+ REQUIRE_FALSE(e().has_value());
+
+ e >> [] { return 1; }
+ >> [] { return 2; }
+ >> [] { return 3; };
+
+ REQUIRE(e().has_value());
+ REQUIRE(*e() == 3);
+}
+
+TEST_CASE("Event<void(...)> arguments are not copied more than necessary", M) {
+ auto obj = std::make_shared<std::nullptr_t>();
+ Event<void(decltype(obj), decltype(obj))> e;
+
+ e >> [&obj](decltype(obj) a, decltype(obj) b) {
+ // original copy + by-value parameter copies
+ REQUIRE(obj.use_count() == 1 + 2);
+ };
+
+ e(obj, obj);
+};
+
+TEST_CASE("Event<R(...)> arguments are not copied more than necessary", M) {
+ auto obj = std::make_shared<std::nullptr_t>();
+ Event<std::nullptr_t(decltype(obj), decltype(obj))> e;
+
+ e >> [&obj](decltype(obj) a, decltype(obj) b) {
+ REQUIRE(obj.use_count() == 1 + 2);
+ return nullptr;
+ };
+
+ e(obj, obj);
+};
+
+TEST_CASE("multiple AsyncEvent registers a single timer", M) {
static std::vector<std::string> registered;
plugin_register = [](const char *n, void *) {
@@ -15,52 +94,131 @@ TEST_CASE("multiple EventEmitter registers a single timer", M) {
};
{
- struct : EventEmitter { void eventHandler(Event) {} } e1, e2;
- REQUIRE(registered == std::vector<std::string>{"timer"});
+ AsyncEvent<void()> e1, e2;
+ REQUIRE(registered == decltype(registered){"timer"});
}
- REQUIRE(registered == std::vector<std::string>{"timer", "-timer"});
+ REQUIRE(registered == decltype(registered){"timer", "-timer"});
}
-TEST_CASE("EventEmitter calls eventHandler from timer and in order", M) {
+TEST_CASE("AsyncEvent handlers are run asynchronously in order", M) {
static void (*tick)() = nullptr;
plugin_register = [](const char *, void *c) { tick = (void(*)())c; return 0; };
- struct : EventEmitter {
- void eventHandler(Event e) { bucket.push_back(e); }
- std::vector<Event> bucket;
- } e;
+ std::vector<size_t> bucket;
+
+ AsyncEvent<void(size_t)> e;
+ e >> [&](size_t i) { bucket.push_back(i); }
+ >> [&](size_t i) { bucket.push_back(i * 10); };
- e.emit(1);
- e.emit(2);
- e.emit(3);
- CHECK(e.bucket.empty());
+ e(1);
+ e(2);
+ e(3);
+
+ CHECK(bucket.empty());
tick();
- REQUIRE(e.bucket == std::vector<Event>{1, 2, 3});
+ REQUIRE(bucket == decltype(bucket){1, 10, 2, 20, 3, 30});
}
-TEST_CASE("events from deleted EventEmmiter are discarded", M) {
+TEST_CASE("AsyncEvent<void(...)> sets the future's value", M) {
static void (*tick)() = nullptr;
plugin_register = [](const char *, void *c) { tick = (void(*)())c; return 0; };
- struct Mock : EventEmitter { void eventHandler(Event) {} };
+ AsyncEvent<void()> e;
+ e >> []{};
+
+ std::future<void> ret = e();
- Mock a; // keep the timer alive
+ REQUIRE(ret.wait_for(std::chrono::seconds(0)) == std::future_status::timeout);
+ tick();
+ REQUIRE(ret.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
+}
- Mock *b = new Mock();
- b->emit(0);
- delete b;
- memset((void *)b, 0, sizeof(Mock));
+TEST_CASE("AsyncEvent<R(...)> sets the future's value", M) {
+ static void (*tick)() = nullptr;
+ plugin_register = [](const char *, void *c) { tick = (void(*)())c; return 0; };
+
+ AsyncEvent<std::string()> e;
+ e >> []{ return "hello world"; }
+ >> []{ return "foo bar"; };
+
+ std::future<std::optional<std::string>> ret = e();
+
+ REQUIRE(ret.wait_for(std::chrono::seconds(0)) == std::future_status::timeout);
+ tick();
+ REQUIRE(ret.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
+
+ std::optional<std::string> val = ret.get();
+ REQUIRE(val.has_value());
+ REQUIRE(*val == "foo bar");
+}
+
+TEST_CASE("running an AsyncEvent without handlers returns synchronously", M) {
+ plugin_register = [](const char *, void *c) { return 0; };
+
+ AsyncEvent<void()> e1;
+ AsyncEvent<int()> e2;
+
+ auto r1 = e1();
+ auto r2 = e2();
+
+ REQUIRE(r1.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
+ REQUIRE_FALSE(r2.get().has_value());
+}
+
+TEST_CASE("AsyncEvent<void(...)> arguments are not copied more than necessary", M) {
+ static void (*tick)() = nullptr;
+ plugin_register = [](const char *, void *c) { tick = (void(*)())c; return 0; };
+
+ auto obj = std::make_shared<std::nullptr_t>();
+ AsyncEvent<void(decltype(obj), decltype(obj))> e;
+
+ e >> [&obj](decltype(obj) a, decltype(obj) b) {
+ REQUIRE(obj.use_count() == 1 + 2 + 2);
+ };
+
+ e(obj, obj);
+ tick();
+};
+
+TEST_CASE("AsyncEvent<R(...)> arguments are not copied more than necessary", M) {
+ static void (*tick)() = nullptr;
+ plugin_register = [](const char *, void *c) { tick = (void(*)())c; return 0; };
+
+ auto obj = std::make_shared<std::nullptr_t>();
+ AsyncEvent<std::nullptr_t(decltype(obj), decltype(obj))> e;
+
+ e >> [&obj](decltype(obj) a, decltype(obj) b) {
+ // original copy + async copies + by-value parameter copies
+ REQUIRE(obj.use_count() == 1 + 2 + 2);
+ return nullptr;
+ };
+
+ e(obj, obj);
+ tick();
+};
+
+TEST_CASE("deleted not-yet-posted AsyncEvents are discarded", M) {
+ static void (*tick)() = nullptr;
+ plugin_register = [](const char *, void *c) { tick = (void(*)())c; return 0; };
+
+ AsyncEvent<void()> keepTimerAlive;
+
+ {
+ AsyncEvent<void()> e;
+ e();
+ }
tick();
}
-TEST_CASE("deleting EventEmmiter from eventHandler is safe", M) {
+TEST_CASE("deleting AsyncEvent from handler is safe", M) {
static void (*tick)() = nullptr;
plugin_register = [](const char *, void *c) { tick = (void(*)())c; return 0; };
- struct Mock : EventEmitter { void eventHandler(Event) { delete this; } };
+ auto e = new AsyncEvent<void()>();
+ *e >> [e] { delete e; };
+ (*e)();
- (new Mock())->emit(0); // only one event!
tick();
}
diff --git a/vendor/vcpkg-deps.txt b/vendor/vcpkg-deps.txt
@@ -1 +1 @@
-boost-algorithm boost-core boost-logic boost-mpl boost-preprocessor boost-range boost-signals2 catch2 curl sqlite3
+boost-algorithm boost-core boost-logic boost-mpl boost-preprocessor boost-range catch2 curl sqlite3