Add locking system so build actions can acquire named locks
This commit is contained in:
parent
120fbf993b
commit
bc993f1d78
|
@ -9,6 +9,8 @@ namespace LibPkg {
|
||||||
struct Lockable {
|
struct Lockable {
|
||||||
[[nodiscard]] std::shared_lock<std::shared_mutex> lockToRead() const;
|
[[nodiscard]] std::shared_lock<std::shared_mutex> lockToRead() const;
|
||||||
[[nodiscard]] std::unique_lock<std::shared_mutex> lockToWrite();
|
[[nodiscard]] std::unique_lock<std::shared_mutex> lockToWrite();
|
||||||
|
[[nodiscard]] std::shared_lock<std::shared_mutex> tryLockToRead() const;
|
||||||
|
[[nodiscard]] std::unique_lock<std::shared_mutex> tryLockToWrite();
|
||||||
[[nodiscard]] std::unique_lock<std::shared_mutex> lockToWrite(std::shared_lock<std::shared_mutex> &readLock);
|
[[nodiscard]] std::unique_lock<std::shared_mutex> lockToWrite(std::shared_lock<std::shared_mutex> &readLock);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -25,6 +27,16 @@ inline std::unique_lock<std::shared_mutex> Lockable::lockToWrite()
|
||||||
return std::unique_lock<std::shared_mutex>(m_mutex);
|
return std::unique_lock<std::shared_mutex>(m_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline std::shared_lock<std::shared_mutex> Lockable::tryLockToRead() const
|
||||||
|
{
|
||||||
|
return std::shared_lock<std::shared_mutex>(m_mutex, std::try_to_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline std::unique_lock<std::shared_mutex> Lockable::tryLockToWrite()
|
||||||
|
{
|
||||||
|
return std::unique_lock<std::shared_mutex>(m_mutex, std::try_to_lock);
|
||||||
|
}
|
||||||
|
|
||||||
inline std::unique_lock<std::shared_mutex> Lockable::lockToWrite(std::shared_lock<std::shared_mutex> &readLock)
|
inline std::unique_lock<std::shared_mutex> Lockable::lockToWrite(std::shared_lock<std::shared_mutex> &readLock)
|
||||||
{
|
{
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
|
|
|
@ -60,6 +60,8 @@ class Session;
|
||||||
|
|
||||||
struct InternalBuildAction;
|
struct InternalBuildAction;
|
||||||
|
|
||||||
|
using AssociatedLocks = std::vector<std::variant<std::shared_lock<std::shared_mutex>, std::unique_lock<std::shared_mutex>>>;
|
||||||
|
|
||||||
struct LIBREPOMGR_EXPORT PackageBuildData : public ReflectiveRapidJSON::JsonSerializable<PackageBuildData>,
|
struct LIBREPOMGR_EXPORT PackageBuildData : public ReflectiveRapidJSON::JsonSerializable<PackageBuildData>,
|
||||||
public ReflectiveRapidJSON::BinarySerializable<PackageBuildData> {
|
public ReflectiveRapidJSON::BinarySerializable<PackageBuildData> {
|
||||||
std::string existingVersion;
|
std::string existingVersion;
|
||||||
|
@ -200,7 +202,8 @@ public:
|
||||||
void setStopHandler(std::function<void(void)> &&stopHandler);
|
void setStopHandler(std::function<void(void)> &&stopHandler);
|
||||||
void setConcludeHandler(std::function<void(void)> &&concludeHandler);
|
void setConcludeHandler(std::function<void(void)> &&concludeHandler);
|
||||||
std::shared_ptr<BuildProcessSession> findBuildProcess(const std::string &filePath);
|
std::shared_ptr<BuildProcessSession> findBuildProcess(const std::string &filePath);
|
||||||
std::shared_ptr<BuildProcessSession> makeBuildProcess(std::string &&displayName, std::string &&logFilePath, ProcessHandler &&handler);
|
std::shared_ptr<BuildProcessSession> makeBuildProcess(
|
||||||
|
std::string &&displayName, std::string &&logFilePath, ProcessHandler &&handler, AssociatedLocks &&locks = AssociatedLocks());
|
||||||
void terminateOngoingBuildProcesses();
|
void terminateOngoingBuildProcesses();
|
||||||
void streamFile(const WebAPI::Params ¶ms, const std::string &filePath, std::string_view fileMimeType);
|
void streamFile(const WebAPI::Params ¶ms, const std::string &filePath, std::string_view fileMimeType);
|
||||||
void streamOutput(const WebAPI::Params ¶ms, std::size_t offset = 0);
|
void streamOutput(const WebAPI::Params ¶ms, std::size_t offset = 0);
|
||||||
|
|
|
@ -328,7 +328,8 @@ void BufferSearch::operator()(const BuildProcessSession::BufferType &buffer, std
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<BuildProcessSession> BuildAction::makeBuildProcess(std::string &&displayName, std::string &&logFilePath, ProcessHandler &&handler)
|
std::shared_ptr<BuildProcessSession> BuildAction::makeBuildProcess(
|
||||||
|
std::string &&displayName, std::string &&logFilePath, ProcessHandler &&handler, AssociatedLocks &&locks)
|
||||||
{
|
{
|
||||||
const auto processesLock = std::lock_guard<std::mutex>(m_processesMutex);
|
const auto processesLock = std::lock_guard<std::mutex>(m_processesMutex);
|
||||||
auto &process = m_ongoingProcesses[logFilePath];
|
auto &process = m_ongoingProcesses[logFilePath];
|
||||||
|
@ -342,8 +343,8 @@ std::shared_ptr<BuildProcessSession> BuildAction::makeBuildProcess(std::string &
|
||||||
logfiles.emplace_back(logFilePath);
|
logfiles.emplace_back(logFilePath);
|
||||||
}
|
}
|
||||||
buildLock.unlock();
|
buildLock.unlock();
|
||||||
return process
|
return process = make_shared<BuildProcessSession>(
|
||||||
= make_shared<BuildProcessSession>(this, m_setup->building.ioContext, std::move(displayName), std::move(logFilePath), std::move(handler));
|
this, m_setup->building.ioContext, std::move(displayName), std::move(logFilePath), std::move(handler), std::move(locks));
|
||||||
}
|
}
|
||||||
|
|
||||||
void BuildAction::terminateOngoingBuildProcesses()
|
void BuildAction::terminateOngoingBuildProcesses()
|
||||||
|
|
|
@ -133,11 +133,12 @@ public:
|
||||||
using BufferPoolType = BufferPool<StorageType>;
|
using BufferPoolType = BufferPool<StorageType>;
|
||||||
using BufferType = BufferPoolType::BufferType;
|
using BufferType = BufferPoolType::BufferType;
|
||||||
|
|
||||||
explicit BuildProcessSession(
|
explicit BuildProcessSession(BuildAction *buildAction, boost::asio::io_context &ioContext, std::string &&displayName, std::string &&logFilePath,
|
||||||
BuildAction *buildAction, boost::asio::io_context &ioContext, std::string &&displayName, std::string &&logFilePath, Handler &&handler);
|
Handler &&handler, AssociatedLocks &&locks = AssociatedLocks());
|
||||||
template <typename... ChildArgs> void launch(ChildArgs &&...childArgs);
|
template <typename... ChildArgs> void launch(ChildArgs &&...childArgs);
|
||||||
void registerWebSession(std::shared_ptr<WebAPI::Session> &&webSession);
|
void registerWebSession(std::shared_ptr<WebAPI::Session> &&webSession);
|
||||||
void registerNewDataHandler(std::function<void(BufferType, std::size_t)> &&handler);
|
void registerNewDataHandler(std::function<void(BufferType, std::size_t)> &&handler);
|
||||||
|
void assignLocks(AssociatedLocks &&locks = AssociatedLocks());
|
||||||
bool hasExited() const;
|
bool hasExited() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -186,6 +187,7 @@ private:
|
||||||
BuffersToWrite m_logFileBuffers;
|
BuffersToWrite m_logFileBuffers;
|
||||||
std::unordered_map<std::shared_ptr<WebAPI::Session>, std::unique_ptr<DataForWebSession>> m_registeredWebSessions;
|
std::unordered_map<std::shared_ptr<WebAPI::Session>, std::unique_ptr<DataForWebSession>> m_registeredWebSessions;
|
||||||
std::function<void(BufferType, std::size_t)> m_newDataHandler;
|
std::function<void(BufferType, std::size_t)> m_newDataHandler;
|
||||||
|
AssociatedLocks m_locks;
|
||||||
std::atomic_bool m_exited = false;
|
std::atomic_bool m_exited = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -201,17 +203,23 @@ inline std::size_t BuildProcessSession::DataForWebSession::bytesToSendFromFile()
|
||||||
}
|
}
|
||||||
|
|
||||||
inline BuildProcessSession::BuildProcessSession(BuildAction *buildAction, boost::asio::io_context &ioContext, std::string &&displayName,
|
inline BuildProcessSession::BuildProcessSession(BuildAction *buildAction, boost::asio::io_context &ioContext, std::string &&displayName,
|
||||||
std::string &&logFilePath, BaseProcessSession::Handler &&handler)
|
std::string &&logFilePath, BaseProcessSession::Handler &&handler, AssociatedLocks &&locks)
|
||||||
: BaseProcessSession(ioContext, std::move(handler))
|
: BaseProcessSession(ioContext, std::move(handler))
|
||||||
, m_buildAction(buildAction ? buildAction->weak_from_this() : std::weak_ptr<BuildAction>())
|
, m_buildAction(buildAction ? buildAction->weak_from_this() : std::weak_ptr<BuildAction>())
|
||||||
, m_pipe(ioContext)
|
, m_pipe(ioContext)
|
||||||
, m_bufferPool(bufferSize)
|
, m_bufferPool(bufferSize)
|
||||||
, m_displayName(displayName)
|
, m_displayName(std::move(displayName))
|
||||||
, m_logFilePath(std::move(logFilePath))
|
, m_logFilePath(std::move(logFilePath))
|
||||||
, m_logFileDescriptor(ioContext)
|
, m_logFileDescriptor(ioContext)
|
||||||
|
, m_locks(std::move(locks))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void BuildProcessSession::assignLocks(AssociatedLocks &&locks)
|
||||||
|
{
|
||||||
|
m_locks = std::move(locks);
|
||||||
|
}
|
||||||
|
|
||||||
inline bool BuildProcessSession::hasExited() const
|
inline bool BuildProcessSession::hasExited() const
|
||||||
{
|
{
|
||||||
return m_exited.load();
|
return m_exited.load();
|
||||||
|
|
|
@ -343,6 +343,10 @@ void ServiceSetup::loadConfigFiles(bool restoreStateAndDiscardDatabases)
|
||||||
deduplicateVector(db.mirrors);
|
deduplicateVector(db.mirrors);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clear unused locks because locks might not be useful anymore with the new config (e.g. the lock was for a
|
||||||
|
// repository directory which has been removed)
|
||||||
|
locks.clear(); // FIXME: Do this regularly to avoid the locks table growing potentially endlessly?
|
||||||
|
|
||||||
// log the most important config values
|
// log the most important config values
|
||||||
cerr << Phrases::InfoMessage << "Working directory: " << workingDirectory << Phrases::EndFlush;
|
cerr << Phrases::InfoMessage << "Working directory: " << workingDirectory << Phrases::EndFlush;
|
||||||
cerr << Phrases::InfoMessage << "Package cache directory: " << building.packageCacheDir << Phrases::EndFlush;
|
cerr << Phrases::InfoMessage << "Package cache directory: " << building.packageCacheDir << Phrases::EndFlush;
|
||||||
|
@ -648,6 +652,17 @@ void ServiceSetup::run()
|
||||||
saveState();
|
saveState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ServiceSetup::Locks::clear()
|
||||||
|
{
|
||||||
|
const auto lock = std::lock_guard(m_mutex);
|
||||||
|
for (auto i = m_locksByName.begin(), end = m_locksByName.end(); i != end; ++i) {
|
||||||
|
if (auto lock2 = i->second.tryLockToWrite()) { // check whether nobody holds the lock anymore
|
||||||
|
lock2.unlock(); // ~shared_mutex(): The behavior is undefined if the mutex is owned by any thread [...].
|
||||||
|
m_locksByName.erase(i); // we can be sure no other thead aquires i->second in the meantime because we're holding m_mutex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ServiceStatus::ServiceStatus(const ServiceSetup &setup)
|
ServiceStatus::ServiceStatus(const ServiceSetup &setup)
|
||||||
: version(applicationInfo.version)
|
: version(applicationInfo.version)
|
||||||
, config(setup.config.computeStatus())
|
, config(setup.config.computeStatus())
|
||||||
|
|
|
@ -120,6 +120,17 @@ struct LIBREPOMGR_EXPORT ServiceSetup : public LibPkg::Lockable {
|
||||||
UserPermissions authenticate(std::string_view authorizationHeader) const;
|
UserPermissions authenticate(std::string_view authorizationHeader) const;
|
||||||
} auth;
|
} auth;
|
||||||
|
|
||||||
|
struct LIBREPOMGR_EXPORT Locks {
|
||||||
|
[[nodiscard]] std::shared_lock<std::shared_mutex> acquireToRead(const std::string &lockName);
|
||||||
|
[[nodiscard]] std::unique_lock<std::shared_mutex> acquireToWrite(const std::string &lockName);
|
||||||
|
[[nodiscard]] std::unique_lock<std::shared_mutex> acquireToWrite(std::shared_lock<std::shared_mutex> &readLock, const std::string &lockName);
|
||||||
|
void clear();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::mutex m_mutex;
|
||||||
|
std::unordered_map<std::string, LibPkg::Lockable> m_locksByName;
|
||||||
|
} locks;
|
||||||
|
|
||||||
void loadConfigFiles(bool restoreStateAndDiscardDatabases);
|
void loadConfigFiles(bool restoreStateAndDiscardDatabases);
|
||||||
void printDatabases();
|
void printDatabases();
|
||||||
std::string_view cacheFilePath() const;
|
std::string_view cacheFilePath() const;
|
||||||
|
@ -136,6 +147,25 @@ inline std::shared_ptr<BuildAction> ServiceSetup::BuildSetup::getBuildAction(Bui
|
||||||
return id < actions.size() ? actions[id] : nullptr;
|
return id < actions.size() ? actions[id] : nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline std::shared_lock<std::shared_mutex> ServiceSetup::Locks::acquireToRead(const std::string &lockName)
|
||||||
|
{
|
||||||
|
const auto lock = std::lock_guard(m_mutex);
|
||||||
|
return m_locksByName[lockName].lockToRead();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline std::unique_lock<std::shared_mutex> ServiceSetup::Locks::acquireToWrite(const std::string &lockName)
|
||||||
|
{
|
||||||
|
const auto lock = std::lock_guard(m_mutex);
|
||||||
|
return m_locksByName[lockName].lockToWrite();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline std::unique_lock<std::shared_mutex> ServiceSetup::Locks::acquireToWrite(
|
||||||
|
std::shared_lock<std::shared_mutex> &readLock, const std::string &lockName)
|
||||||
|
{
|
||||||
|
readLock.unlock();
|
||||||
|
return acquireToWrite(lockName);
|
||||||
|
}
|
||||||
|
|
||||||
struct LIBREPOMGR_EXPORT ServiceStatus : public ReflectiveRapidJSON::JsonSerializable<ServiceStatus> {
|
struct LIBREPOMGR_EXPORT ServiceStatus : public ReflectiveRapidJSON::JsonSerializable<ServiceStatus> {
|
||||||
ServiceStatus(const ServiceSetup &setup);
|
ServiceStatus(const ServiceSetup &setup);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue