diff --git a/librepomgr/buildactions/buildaction.cpp b/librepomgr/buildactions/buildaction.cpp index 698e7aa..3f544ec 100644 --- a/librepomgr/buildactions/buildaction.cpp +++ b/librepomgr/buildactions/buildaction.cpp @@ -1,3 +1,4 @@ + #include "./buildactionprivate.h" #include "../webapi/session.h" @@ -167,7 +168,7 @@ const std::string &InternalBuildAction::findSetting(const std::string_view &sett void InternalBuildAction::reportError(std::string &&error) { const auto buildActionLock = m_setup.building.lockToWrite(); - m_buildAction->resultData = move(error); + m_buildAction->resultData = std::move(error); m_buildAction->conclude(BuildActionResult::Failure); } @@ -228,10 +229,10 @@ BuildAction &BuildAction::operator=(BuildAction &&other) started = other.started; finished = other.finished; startAfter = std::move(other.startAfter); - m_log = std::move(other.m_log); + m_log = LogContext(this); m_setup = other.m_setup; m_aborted = false; - m_stopHandler = std::function(); + m_stopHandler = std::bind(&BuildAction::terminateOngoingBuildProcesses, this); m_concludeHandler = std::function(); m_ongoingProcesses.clear(); m_outputSession.reset(); @@ -360,11 +361,6 @@ LibPkg::StorageID BuildAction::conclude(BuildActionResult result) this->result = result; finished = DateTime::gmtNow(); - // tell clients waiting for output that it's over - if (const auto outputStreamingLock = std::unique_lock(m_outputSessionMutex); m_outputSession) { - m_outputSession->writeEnd(); - } - // start globally visible follow-up actions if succeeded if (result == BuildActionResult::Success && m_setup) { const auto followUps = m_setup->building.followUpBuildActions(id); @@ -376,6 +372,15 @@ LibPkg::StorageID BuildAction::conclude(BuildActionResult result) // note: Not cleaning up the follow-up actions here because at some point I might implement recursive restarting. } + // detatch build process sessions + if (const auto lock = std::unique_lock(m_outputSessionMutex); m_outputSession) { + m_outputSession->writeEnd(); // tell clients waiting for output that it's over + m_outputSession.reset(); + } + if (const auto lock = std::unique_lock(m_processesMutex)) { + m_ongoingProcesses.clear(); + } + // write build action to persistent storage // TODO: should this also be done in the middle of the execution to have some "save points"? auto id = LibPkg::StorageID(); diff --git a/librepomgr/buildactions/buildaction.h b/librepomgr/buildactions/buildaction.h index 6a06b28..89f6cdd 100644 --- a/librepomgr/buildactions/buildaction.h +++ b/librepomgr/buildactions/buildaction.h @@ -180,6 +180,7 @@ public: bool hasSucceeded() const; static bool haveSucceeded(const std::vector> &buildActions); bool isAborted() const; + const std::atomic_bool &aborted() const; LibPkg::StorageID start(ServiceSetup &setup); void assignStartAfter(const std::vector> &startsAfterBuildActions); void abort(); @@ -267,6 +268,11 @@ inline bool BuildAction::isAborted() const return m_aborted.load(); } +inline const std::atomic_bool &BuildAction::aborted() const +{ + return m_aborted; +} + inline LogContext &BuildAction::log() { return m_log; diff --git a/librepomgr/buildactions/buildactionlivestreaming.cpp b/librepomgr/buildactions/buildactionlivestreaming.cpp index 9632727..88c6e5c 100644 --- a/librepomgr/buildactions/buildactionlivestreaming.cpp +++ b/librepomgr/buildactions/buildactionlivestreaming.cpp @@ -19,8 +19,8 @@ void BuildProcessSession::BuffersToWrite::clear() outstandingBuffersToSend.clear(); } -void BuildProcessSession::DataForWebSession::streamFile( - const std::string &filePath, std::shared_ptr &&session, std::unique_lock &&lock) +void BuildProcessSession::DataForWebSession::streamFile(const std::string &filePath, const std::shared_ptr &processSession, + const std::shared_ptr &webSession, std::unique_lock &&lock) { error = false; @@ -61,13 +61,14 @@ void BuildProcessSession::DataForWebSession::streamFile( return; } #endif - m_fileBuffer = m_session.m_bufferPool.newBuffer(); - m_fileStream.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(fileSize, m_session.m_bufferPool.bufferSize()))), - std::bind(&DataForWebSession::writeFileData, this, std::ref(filePath), std::move(session), std::placeholders::_1, std::placeholders::_2)); + m_fileBuffer = processSession->m_bufferPool.newBuffer(); + m_fileStream.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(fileSize, processSession->m_bufferPool.bufferSize()))), + [this, &filePath, processSession, webSession]( + auto &error, auto bytesTransferred) { writeFileData(filePath, processSession, webSession, error, bytesTransferred); }); } -void BuildProcessSession::DataForWebSession::writeFileData( - const std::string &filePath, std::shared_ptr session, const boost::system::error_code &readError, size_t bytesTransferred) +void BuildProcessSession::DataForWebSession::writeFileData(const std::string &filePath, const std::shared_ptr &processSession, + const std::shared_ptr &webSession, const boost::system::error_code &readError, size_t bytesTransferred) { // handle error const auto eof = readError == boost::asio::error::eof; @@ -83,14 +84,14 @@ void BuildProcessSession::DataForWebSession::writeFileData( bytesTransferred = m_bytesToSendFromFile; } const auto bytesLeftToRead = m_bytesToSendFromFile - bytesTransferred; - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(boost::asio::buffer(*m_fileBuffer, bytesTransferred)), - [this, &filePath, session, bytesLeftToRead, moreToRead = !eof && bytesLeftToRead]( + boost::beast::net::async_write(webSession->socket(), boost::beast::http::make_chunk(boost::asio::buffer(*m_fileBuffer, bytesTransferred)), + [this, &filePath, processSession, webSession, bytesLeftToRead, moreToRead = !eof && bytesLeftToRead]( boost::system::error_code ecWebClient, std::size_t bytesTransferredToWebClient) { // handle error CPP_UTILITIES_UNUSED(bytesTransferredToWebClient) if (ecWebClient) { cerr << Phrases::WarningMessage << "Error sending \"" << filePath << "\" to client: " << ecWebClient.message() << Phrases::EndFlush; - std::lock_guard lock(m_session.m_mutex); + std::lock_guard lock(processSession->m_mutex); clear(); error = true; m_bytesToSendFromFile.store(0); @@ -99,16 +100,18 @@ void BuildProcessSession::DataForWebSession::writeFileData( m_bytesToSendFromFile.store(bytesLeftToRead); // tell the client it's over if there is nothing more to read if (!moreToRead) { - if (m_session.m_exited.load()) { - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(), - std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true)); + if (processSession->m_exited.load()) { + boost::beast::net::async_write(webSession->socket(), boost::beast::http::make_chunk_last(), + std::bind(&WebAPI::Session::responded, webSession, std::placeholders::_1, std::placeholders::_2, true)); } return; } // continue reading if there's more data - m_fileStream.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(bytesLeftToRead, m_session.m_bufferPool.bufferSize()))), - std::bind( - &DataForWebSession::writeFileData, this, std::ref(filePath), std::move(session), std::placeholders::_1, std::placeholders::_2)); + m_fileStream.async_read_some( + boost::asio::buffer(*m_fileBuffer, sizeof(std::min(bytesLeftToRead, processSession->m_bufferPool.bufferSize()))), + [this, &filePath, processSession, webSession](auto &readError2, auto bytesTransferred2) { + writeFileData(filePath, processSession, webSession, readError2, bytesTransferred2); + }); }); } @@ -117,9 +120,9 @@ void BuildProcessSession::registerWebSession(std::shared_ptr && std::unique_lock lock(m_mutex); auto &sessionInfo = m_registeredWebSessions[webSession]; if (!sessionInfo) { - sessionInfo = std::make_unique(*this); + sessionInfo = std::make_unique(m_ioContext); } - sessionInfo->streamFile(m_logFilePath, std::move(webSession), std::move(lock)); + sessionInfo->streamFile(m_logFilePath, shared_from_this(), std::move(webSession), std::move(lock)); } void BuildProcessSession::registerNewDataHandler(std::function &&handler) @@ -283,8 +286,10 @@ void BuildProcessSession::writeNextBufferToLogFile(const boost::system::error_co m_logFileBuffers.currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first.get(), buffer.second)); } } - boost::asio::async_write(m_logFileStream, m_logFileBuffers.currentlySentBufferRefs, - std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + if (!m_logFileBuffers.currentlySentBufferRefs.empty()) { + boost::asio::async_write(m_logFileStream, m_logFileBuffers.currentlySentBufferRefs, + std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } } void BuildProcessSession::writeNextBufferToWebSession( @@ -356,15 +361,14 @@ void BuildProcessSession::conclude() m_exited = true; // detach from build action - auto buildAction = m_buildAction.lock(); - if (!buildAction) { + if (!m_buildAction) { return; } - if (const auto outputLock = std::lock_guard(buildAction->m_outputSessionMutex); buildAction->m_outputSession.get() == this) { - buildAction->m_outputSession.reset(); + if (const auto outputLock = std::lock_guard(m_buildAction->m_outputSessionMutex); m_buildAction->m_outputSession.get() == this) { + m_buildAction->m_outputSession.reset(); } else { - const auto processesLock = std::lock_guard(buildAction->m_processesMutex); - buildAction->m_ongoingProcesses.erase(m_logFilePath); + const auto processesLock = std::lock_guard(m_buildAction->m_processesMutex); + m_buildAction->m_ongoingProcesses.erase(m_logFilePath); } } diff --git a/librepomgr/buildactions/buildactionprivate.h b/librepomgr/buildactions/buildactionprivate.h index c7aa022..25baca0 100644 --- a/librepomgr/buildactions/buildactionprivate.h +++ b/librepomgr/buildactions/buildactionprivate.h @@ -153,15 +153,15 @@ private: void clear(); }; struct DataForWebSession : public BuffersToWrite { - explicit DataForWebSession(BuildProcessSession &session); - void streamFile(const std::string &filePath, std::shared_ptr &&session, std::unique_lock &&lock); + explicit DataForWebSession(boost::asio::io_context &ioc); + void streamFile(const std::string &filePath, const std::shared_ptr &processSession, + const std::shared_ptr &webSession, std::unique_lock &&lock); std::size_t bytesToSendFromFile() const; private: - void writeFileData(const std::string &filePath, std::shared_ptr session, const boost::system::error_code &error, - std::size_t bytesTransferred); + void writeFileData(const std::string &filePath, const std::shared_ptr &processSession, + const std::shared_ptr &webSession, const boost::system::error_code &error, std::size_t bytesTransferred); - BuildProcessSession &m_session; std::atomic m_bytesToSendFromFile = 0; #ifndef BOOST_ASIO_HAS_FILE boost::beast::file m_file; @@ -180,7 +180,7 @@ private: void close(); void conclude(); - std::weak_ptr m_buildAction; + std::shared_ptr m_buildAction; boost::process::async_pipe m_pipe; BufferPoolType m_bufferPool; BufferType m_buffer; @@ -198,9 +198,8 @@ private: std::atomic_bool m_exited = false; }; -inline BuildProcessSession::DataForWebSession::DataForWebSession(BuildProcessSession &session) - : m_session(session) - , m_fileStream(session.m_ioContext) +inline BuildProcessSession::DataForWebSession::DataForWebSession(boost::asio::io_context &ioc) + : m_fileStream(ioc) { } @@ -249,8 +248,9 @@ template void BuildProcessSession::launch(ChildArgs &&.. m_ioContext, group, std::forward(childArgs)..., (boost::process::std_out & boost::process::std_err) > m_pipe, boost::process::extend::on_success = [session = shared_from_this()](auto &executor) { - if (const auto buildAction = session->m_buildAction.lock()) { - buildAction->appendOutput(CppUtilities::EscapeCodes::Phrases::InfoMessage, "Launched \"", session->m_displayName, "\", PID: ", + if (session->m_buildAction) { + session->m_buildAction->appendOutput(CppUtilities::EscapeCodes::Phrases::InfoMessage, "Launched \"", session->m_displayName, + "\", PID: ", executor #ifdef PLATFORM_WINDOWS .proc_info.dwProcessId diff --git a/librepomgr/buildactions/reloadlibrarydependencies.cpp b/librepomgr/buildactions/reloadlibrarydependencies.cpp index fc70836..42d5808 100644 --- a/librepomgr/buildactions/reloadlibrarydependencies.cpp +++ b/librepomgr/buildactions/reloadlibrarydependencies.cpp @@ -241,10 +241,11 @@ void LibRepoMgr::ReloadLibraryDependencies::downloadPackagesFromMirror() } m_buildAction->appendOutput(Phrases::SuccessMessage, "Downloading ", packagesWhichNeedCaching, " binary packages from mirror ...\n"); - WebClient::cachePackages(m_buildAction->log(), - std::make_shared(m_cachingData, m_setup.building.ioContext, m_setup.webServer.sslContext, - std::bind(&ReloadLibraryDependencies::loadPackageInfoFromContents, this)), - m_packageDownloadSizeLimit ? std::make_optional(m_packageDownloadSizeLimit) : std::nullopt); + auto session = std::make_shared(m_cachingData, m_setup.building.ioContext, m_setup.webServer.sslContext, + std::bind(&ReloadLibraryDependencies::loadPackageInfoFromContents, this)); + session->aborted = &m_buildAction->aborted(); + WebClient::cachePackages( + m_buildAction->log(), std::move(session), m_packageDownloadSizeLimit ? std::make_optional(m_packageDownloadSizeLimit) : std::nullopt); } void ReloadLibraryDependencies::loadPackageInfoFromContents() diff --git a/librepomgr/webclient/database.cpp b/librepomgr/webclient/database.cpp index 904c150..ddea4d0 100644 --- a/librepomgr/webclient/database.cpp +++ b/librepomgr/webclient/database.cpp @@ -228,7 +228,8 @@ PackageCachingDataForPackage *PackageCachingSession::getCurrentDataAndSelectNext void cachePackages(LogContext &log, std::shared_ptr &&packageCachingSession, std::optional bodyLimit, std::size_t maxParallelDownloads) { - for (std::size_t startedDownloads = 0; startedDownloads < maxParallelDownloads; ++startedDownloads) { + for (std::size_t startedDownloads = 0; + startedDownloads < maxParallelDownloads && (!packageCachingSession->aborted || !*packageCachingSession->aborted); ++startedDownloads) { auto *const cachingData = packageCachingSession->getCurrentDataAndSelectNext(); if (!cachingData) { return; @@ -243,7 +244,7 @@ void cachePackages(LogContext &log, std::shared_ptr &&pac cachingData->error = tupleToString(msg); log(Phrases::ErrorMessage, msg, '\n'); } - const auto &response = get(session.response); + const auto &response = std::get(session.response); const auto &message = response.get(); if (message.result() != boost::beast::http::status::ok) { const auto msg = std::make_tuple("Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, diff --git a/librepomgr/webclient/database.h b/librepomgr/webclient/database.h index 51fa345..62180ea 100644 --- a/librepomgr/webclient/database.h +++ b/librepomgr/webclient/database.h @@ -55,6 +55,8 @@ struct PackageCachingSession : public MultiSession { explicit PackageCachingSession( PackageCachingDataForSession &data, boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, HandlerType &&handler); + const std::atomic_bool *aborted = nullptr; + private: void selectNextPackage(); PackageCachingDataForPackage *getCurrentDataAndSelectNext();