commit 85e18580919496378ea5e89edf67b23d6d9392f8
parent 8783b9746ede48dc2709ab2ddbb212eb1a22eab5
Author: cfillion <cfillion@users.noreply.github.com>
Date: Sat, 16 Sep 2017 19:46:23 -0400
refactor synchronization and index loading so that it waits until runTasks
Diffstat:
7 files changed, 190 insertions(+), 123 deletions(-)
diff --git a/src/filesystem.cpp b/src/filesystem.cpp
@@ -179,6 +179,16 @@ bool FS::exists(const Path &path, const bool dir)
return false;
}
+bool FS::allFilesExists(const set<Path> &paths)
+{
+ for(const Path &path : paths) {
+ if(!exists(path))
+ return false;
+ }
+
+ return true;
+}
+
bool FS::mkdir(const Path &path)
{
if(exists(path, true))
diff --git a/src/filesystem.hpp b/src/filesystem.hpp
@@ -18,6 +18,7 @@
#ifndef REAPACK_FILESYSTEM_HPP
#define REAPACK_FILESYSTEM_HPP
+#include <set>
#include <string>
class Path;
@@ -34,6 +35,7 @@ namespace FS {
bool removeRecursive(const Path &);
bool mtime(const Path &, time_t *);
bool exists(const Path &, bool dir = false);
+ bool allFilesExists(const std::set<Path> &);
bool mkdir(const Path &);
const char *lastError();
diff --git a/src/task.cpp b/src/task.cpp
@@ -32,6 +32,81 @@ Task::Task(Transaction *tx) : m_tx(tx)
{
}
+SynchronizeTask::SynchronizeTask(const Remote &remote, const bool fullSync,
+ const InstallOpts &opts, Transaction *tx)
+ : Task(tx), m_remote(remote), m_indexPath(Index::pathFor(m_remote.name())),
+ m_opts(opts), m_fullSync(fullSync)
+{
+}
+
+bool SynchronizeTask::start()
+{
+ printf("start\n");
+ const auto &netConfig = g_reapack->config()->network;
+
+ time_t mtime = 0, now = time(nullptr);
+ FS::mtime(m_indexPath, &mtime);
+
+ const time_t threshold = netConfig.staleThreshold;
+ if(!m_fullSync && mtime && (!threshold || mtime > now - threshold))
+ return true;
+
+ auto dl = new FileDownload(m_indexPath, m_remote.url(),
+ netConfig, Download::NoCacheFlag);
+ dl->setName(m_remote.name());
+
+ dl->onFinish([=] {
+ if(dl->save())
+ tx()->receipt()->setIndexChanged();
+ });
+
+ tx()->threadPool()->push(dl);
+ return true;
+}
+
+void SynchronizeTask::commit()
+{
+ if(!FS::exists(m_indexPath))
+ return;
+
+ const IndexPtr &index = tx()->loadIndex(m_remote); // TODO: reuse m_indexPath
+ if(!index || !m_fullSync)
+ return;
+
+ for(const Package *pkg : index->packages())
+ synchronize(pkg);
+
+ if(m_opts.promptObsolete && !m_remote.isProtected()) {
+ for(const auto &entry : tx()->registry()->getEntries(m_remote.name())) {
+ if(!entry.pinned && !index->find(entry.category, entry.package))
+ tx()->addObsolete(entry);
+ }
+ }
+}
+
+void SynchronizeTask::synchronize(const Package *pkg)
+{
+ const auto &entry = tx()->registry()->getEntry(pkg);
+
+ if(!entry && !m_opts.autoInstall)
+ return;
+
+ const Version *latest = pkg->lastVersion(m_opts.bleedingEdge, entry.version);
+
+ if(!latest)
+ return;
+
+ if(entry.version == latest->name()) {
+ if(FS::allFilesExists(latest->files()))
+ return; // latest version is really installed, nothing to do here!
+ }
+ else if(entry.pinned || latest->name() < entry.version)
+ return;
+
+ printf("installing\n");
+ tx()->install(latest, entry);
+}
+
InstallTask::InstallTask(const Version *ver, const bool pin,
const Registry::Entry &re, const ArchiveReaderPtr &reader, Transaction *tx)
: Task(tx), m_version(ver), m_pin(pin), m_oldEntry(move(re)), m_reader(reader),
diff --git a/src/task.hpp b/src/task.hpp
@@ -18,8 +18,10 @@
#ifndef REAPACK_TASK_HPP
#define REAPACK_TASK_HPP
+#include "config.hpp"
#include "path.hpp"
#include "registry.hpp"
+#include "remote.hpp"
#include <set>
#include <unordered_set>
@@ -31,6 +33,7 @@ class Source;
class ThreadTask;
class Transaction;
class Version;
+struct InstallOpts;
typedef std::shared_ptr<ArchiveReader> ArchiveReaderPtr;
typedef std::shared_ptr<const Index> IndexPtr;
@@ -54,6 +57,24 @@ private:
Transaction *m_tx;
};
+class SynchronizeTask : public Task {
+public:
+ SynchronizeTask(const Remote &remote, bool fullSync,
+ const InstallOpts &, Transaction *);
+
+protected:
+ bool start() override;
+ void commit() override;
+
+private:
+ void synchronize(const Package *);
+
+ Remote m_remote;
+ Path m_indexPath;
+ InstallOpts m_opts;
+ bool m_fullSync;
+};
+
class InstallTask : public Task {
public:
InstallTask(const Version *ver, bool pin, const Registry::Entry &,
diff --git a/src/transaction.cpp b/src/transaction.cpp
@@ -46,7 +46,7 @@ Transaction::Transaction()
queue<HostTicket>().swap(m_regQueue);
});
- // run tasks after fetching indexes
+ // run the next task queue when the current one is done
m_threadPool.onDone(bind(&Transaction::runTasks, this));
}
@@ -61,47 +61,13 @@ void Transaction::synchronize(const Remote &remote,
InstallOpts opts = g_reapack->config()->install;
opts.autoInstall = remote.autoInstall(forceAutoInstall.value_or(opts.autoInstall));
- fetchIndex(remote, true, [=] (const IndexPtr &ri) {
- for(const Package *pkg : ri->packages())
- synchronize(pkg, opts);
-
- if(opts.promptObsolete && !remote.isProtected()) {
- for(const Registry::Entry &entry : m_registry.getEntries(ri->name())) {
- if(!entry.pinned && !ri->find(entry.category, entry.package))
- m_obsolete.insert(entry);
- }
- }
- });
-}
-
-void Transaction::synchronize(const Package *pkg, const InstallOpts &opts)
-{
- const auto ®Entry = m_registry.getEntry(pkg);
-
- if(!regEntry && !opts.autoInstall)
- return;
-
- const Version *latest = pkg->lastVersion(opts.bleedingEdge, regEntry.version);
-
- // don't crash nor install a pre-release if autoInstall is on with
- // bleedingEdge mode off and there is no stable release
- if(!latest)
- return;
-
- if(regEntry.version == latest->name()) {
- if(allFilesExists(latest->files()))
- return; // latest version is really installed, nothing to do here!
- }
- else if(regEntry.pinned || latest->name() < regEntry.version)
- return;
-
- m_nextQueue.push(make_shared<InstallTask>(latest, false, regEntry, nullptr, this));
+ m_nextQueue.push(make_shared<SynchronizeTask>(remote, true, opts, this));
}
void Transaction::fetchIndexes(const vector<Remote> &remotes, const bool stale)
{
for(const Remote &remote : remotes)
- fetchIndex(remote, stale);
+ m_nextQueue.push(make_shared<SynchronizeTask>(remote, false, InstallOpts{}, this));
}
vector<IndexPtr> Transaction::getIndexes(const vector<Remote> &remotes) const
@@ -117,43 +83,6 @@ vector<IndexPtr> Transaction::getIndexes(const vector<Remote> &remotes) const
return indexes;
}
-void Transaction::fetchIndex(const Remote &remote, const bool stale,
- const function<void (const IndexPtr &)> &cb)
-{
- const auto &netConfig = g_reapack->config()->network;
-
- const auto load = [=] {
- const IndexPtr ri = loadIndex(remote);
- if(cb && ri)
- cb(ri);
- };
-
- const Path &path = Index::pathFor(remote.name());
- time_t mtime = 0, now = time(nullptr);
- FS::mtime(path, &mtime);
-
- const time_t threshold = netConfig.staleThreshold;
- if(!stale && mtime && (!threshold || mtime > now - threshold)) {
- load();
- return;
- }
-
- auto dl = new FileDownload(path, remote.url(), netConfig, Download::NoCacheFlag);
- dl->setName(remote.name());
-
- dl->onFinish([=] {
- if(dl->save())
- m_receipt.setIndexChanged();
- else
- m_receipt.addError({FS::lastError(), dl->path().target().join()});
-
- if(FS::exists(path))
- load(); // try to load anyway, even on failure
- });
-
- m_threadPool.push(dl);
-}
-
IndexPtr Transaction::loadIndex(const Remote &remote)
{
const auto &it = m_indexes.find(remote.name());
@@ -172,10 +101,15 @@ IndexPtr Transaction::loadIndex(const Remote &remote)
}
}
-void Transaction::install(const Version *ver,
+void Transaction::install(const Version *ver, const bool pin,
+ const ArchiveReaderPtr &reader)
+{
+ install(ver, m_registry.getEntry(ver->package()), pin, reader);
+}
+
+void Transaction::install(const Version *ver, const Registry::Entry &oldEntry,
const bool pin, const ArchiveReaderPtr &reader)
{
- const auto &oldEntry = m_registry.getEntry(ver->package());
m_nextQueue.push(make_shared<InstallTask>(ver, pin, oldEntry, reader, this));
}
@@ -211,44 +145,49 @@ void Transaction::exportArchive(const string &path)
bool Transaction::runTasks()
{
- if(!m_nextQueue.empty()) {
- m_taskQueues.push(m_nextQueue);
- TaskQueue().swap(m_nextQueue);
- }
+ do {
+ if(!m_nextQueue.empty()) {
+ m_taskQueues.push(m_nextQueue);
+ TaskQueue().swap(m_nextQueue);
+ }
- if(!commitTasks())
- return false; // we're downloading indexes for synchronization
- else if(m_isCancelled) {
- finish();
- return true;
- }
+ if(!commitTasks())
+ return false; // we're downloading indexes for synchronization
+ else if(m_isCancelled) {
+ finish();
+ return true;
+ }
- promptObsolete();
+ promptObsolete();
- while(!m_taskQueues.empty()) {
- m_registry.savepoint();
+ while(!m_taskQueues.empty()) {
+ runQueue(m_taskQueues.front());
+ m_taskQueues.pop();
- auto &queue = m_taskQueues.front();
+ if(!commitTasks())
+ return false; // if the tasks didn't finish immediately (downloading)
+ }
+ } while(!m_nextQueue.empty()); // restart if a task's commit() added new tasks
+
+ finish(); // we're done!
- while(!queue.empty()) {
- const TaskPtr &task = queue.top();
+ return true;
+}
- if(task->start())
- m_runningTasks.push(task);
+void Transaction::runQueue(TaskQueue &queue)
+{
+ m_registry.savepoint();
- queue.pop();
- }
+ while(!queue.empty()) {
+ const TaskPtr &task = queue.top();
- m_registry.restore();
- m_taskQueues.pop();
+ if(task->start())
+ m_runningTasks.push(task);
- if(!commitTasks()) // if the tasks didn't finish immediately (downloading)
- return false;
+ queue.pop();
}
- finish(); // we're done!
-
- return true;
+ m_registry.restore();
}
bool Transaction::commitTasks()
@@ -279,16 +218,6 @@ void Transaction::finish()
m_cleanupHandler();
}
-bool Transaction::allFilesExists(const set<Path> &list) const
-{
- for(const Path &path : list) {
- if(!FS::exists(path))
- return false;
- }
-
- return true;
-}
-
void Transaction::registerAll(const bool add, const Registry::Entry &entry)
{
// don't actually do anything until commit() – which will calls registerQueued
diff --git a/src/transaction.hpp b/src/transaction.hpp
@@ -31,9 +31,11 @@
#include <unordered_set>
class ArchiveReader;
+class InstallTask;
class Path;
class Remote;
-struct InstallOpts;
+class SynchronizeTask;
+class UninstallTask;
typedef std::shared_ptr<Task> TaskPtr;
@@ -56,6 +58,8 @@ public:
void synchronize(const Remote &,
boost::optional<bool> forceAutoInstall = boost::none);
void install(const Version *, bool pin = false, const ArchiveReaderPtr & = nullptr);
+ void install(const Version *, const Registry::Entry &oldEntry,
+ bool pin = false, const ArchiveReaderPtr & = nullptr);
void setPinned(const Registry::Entry &, bool pinned);
void uninstall(const Remote &);
void uninstall(const Registry::Entry &);
@@ -68,6 +72,13 @@ public:
Registry *registry() { return &m_registry; }
ThreadPool *threadPool() { return &m_threadPool; }
+protected:
+ friend SynchronizeTask;
+ friend InstallTask;
+ friend UninstallTask;
+
+ IndexPtr loadIndex(const Remote &);
+ void addObsolete(const Registry::Entry &e) { m_obsolete.insert(e); }
void registerAll(bool add, const Registry::Entry &);
void registerFile(const HostTicket &t) { m_regQueue.push(t); }
@@ -83,15 +94,11 @@ private:
typedef std::priority_queue<TaskPtr,
std::vector<TaskPtr>, CompareTask> TaskQueue;
- void fetchIndex(const Remote &, bool stale,
- const std::function<void (const IndexPtr &)> & = {});
- IndexPtr loadIndex(const Remote &);
- void synchronize(const Package *, const InstallOpts &);
- bool allFilesExists(const std::set<Path> &) const;
void registerQueued();
void registerScript(const HostTicket &, bool isLast);
void inhibit(const Remote &);
void promptObsolete();
+ void runQueue(TaskQueue &queue);
bool commitTasks();
void finish();
diff --git a/test/filesystem.cpp b/test/filesystem.cpp
@@ -27,9 +27,32 @@ TEST_CASE("file modification time", M) {
TEST_CASE("file exists", M) {
UseRootPath root(RIPATH);
- REQUIRE(FS::exists(Index::pathFor("Новая папка")));
- REQUIRE_FALSE(FS::exists(Index::pathFor("Новая папка"), true));
+ SECTION("file") {
+ const Path &file = Index::pathFor("Новая папка");
+ REQUIRE(FS::exists(file));
+ REQUIRE_FALSE(FS::exists(file, true));
+ }
+
+ SECTION("directory") {
+ REQUIRE_FALSE(FS::exists(Path("ReaPack")));
+ REQUIRE(FS::exists(Path("ReaPack"), true));
+ }
+}
+
+TEST_CASE("all files exists", M) {
+ UseRootPath root(RIPATH);
+
+ REQUIRE(FS::allFilesExists({}));
+
+ REQUIRE(FS::allFilesExists({
+ Index::pathFor("future_version"),
+ Index::pathFor("broken"),
+ }));
+
+ REQUIRE_FALSE(FS::allFilesExists({
+ Index::pathFor("future_version"),
+ Index::pathFor("not_found"),
+ }));
- REQUIRE_FALSE(FS::exists(Path("ReaPack")));
- REQUIRE(FS::exists(Path("ReaPack"), true));
+ REQUIRE_FALSE(FS::allFilesExists({Path("ReaPack")})); // directory
}