From 93afb3883d04cb688af4084fc51f5bde047b77cb Mon Sep 17 00:00:00 2001 From: Martchus Date: Sat, 19 Feb 2022 21:26:56 +0100 Subject: [PATCH] Write build action "output" log to a logfile like for sub-processes --- cli/main.cpp | 1 - librepomgr/buildactions/buildaction.cpp | 15 +- librepomgr/buildactions/buildaction.h | 22 +- .../buildactions/buildactionlivestreaming.cpp | 353 +++++++----------- librepomgr/buildactions/buildactionprivate.h | 34 +- librepomgr/buildactions/makelicenseinfo.cpp | 1 - librepomgr/buildactions/subprocess.h | 3 + librepomgr/tests/buildactions.cpp | 16 +- librepomgr/webapi/routes.h | 1 - librepomgr/webapi/routes_buildaction.cpp | 30 -- librepomgr/webapi/server.cpp | 1 - srv/static/js/buildactionspage.js | 26 +- 12 files changed, 184 insertions(+), 319 deletions(-) diff --git a/cli/main.cpp b/cli/main.cpp index a1d7789..0a0e43d 100644 --- a/cli/main.cpp +++ b/cli/main.cpp @@ -268,7 +268,6 @@ static void printBuildAction(const LibRepoMgr::BuildAction &a, const LibRepoMgr: } t.add_row({ "Log files", printListOfStringsAsSubTable(a.logfiles) }); t.add_row({ "Artefacts", printListOfStringsAsSubTable(a.artefacts) }); - t.add_row({ "Output", a.output }); t.column(0).format().font_align(tabulate::FontAlign::right); std::cout << t << '\n'; } diff --git a/librepomgr/buildactions/buildaction.cpp b/librepomgr/buildactions/buildaction.cpp index cc398c9..698e7aa 100644 --- a/librepomgr/buildactions/buildaction.cpp +++ b/librepomgr/buildactions/buildaction.cpp @@ -222,8 +222,6 @@ BuildAction &BuildAction::operator=(BuildAction &&other) status = other.status; result = other.result; resultData = std::move(other.resultData); - output = std::move(other.output); - outputMimeType = std::move(other.outputMimeType); logfiles = std::move(other.logfiles); artefacts = std::move(other.artefacts); created = other.created; @@ -236,7 +234,7 @@ BuildAction &BuildAction::operator=(BuildAction &&other) m_stopHandler = std::function(); m_concludeHandler = std::function(); m_ongoingProcesses.clear(); - m_bufferingForSession.clear(); + m_outputSession.reset(); m_internalBuildAction = std::move(other.m_internalBuildAction); return *this; } @@ -363,15 +361,8 @@ LibPkg::StorageID BuildAction::conclude(BuildActionResult result) finished = DateTime::gmtNow(); // tell clients waiting for output that it's over - const auto outputStreamingLock = std::unique_lock(m_outputStreamingMutex); - for (auto i = m_bufferingForSession.begin(); i != m_bufferingForSession.end();) { - if (!i->second->currentlySentBuffers.empty() || !i->second->outstandingBuffersToSend.empty()) { - ++i; - continue; - } - boost::beast::net::async_write(i->first->socket(), boost::beast::http::make_chunk_last(), - std::bind(&WebAPI::Session::responded, i->first, std::placeholders::_1, std::placeholders::_2, true)); - i = m_bufferingForSession.erase(i); + if (const auto outputStreamingLock = std::unique_lock(m_outputSessionMutex); m_outputSession) { + m_outputSession->writeEnd(); } // start globally visible follow-up actions if succeeded diff --git a/librepomgr/buildactions/buildaction.h b/librepomgr/buildactions/buildaction.h index 1b2ab98..6a06b28 100644 --- a/librepomgr/buildactions/buildaction.h +++ b/librepomgr/buildactions/buildaction.h @@ -152,7 +152,6 @@ struct LIBREPOMGR_EXPORT BuildActionMessages : public ReflectiveRapidJSON::JsonS }; class BuildProcessSession; -struct OutputBufferingForSession; struct ServiceSetup; struct LIBREPOMGR_EXPORT BuildAction : public std::enable_shared_from_this, @@ -184,8 +183,8 @@ public: LibPkg::StorageID start(ServiceSetup &setup); void assignStartAfter(const std::vector> &startsAfterBuildActions); void abort(); - void appendOutput(std::string &&output); void appendOutput(std::string_view output); + void appendOutput(std::string &&output); template void appendOutput(Args &&...args); template void appendOutput(CppUtilities::EscapeCodes::Phrases phrase, Args &&...args); LogContext &log(); @@ -204,11 +203,6 @@ private: template void post(); template void post(Callback &&codeToRun); LibPkg::StorageID conclude(BuildActionResult result); - void continueStreamingExistingOutputToSession(std::shared_ptr session, OutputBufferingForSession &buffering, - const boost::system::error_code &error, std::size_t bytesTransferred); - void continueStreamingNewOutputToSession(std::shared_ptr session, OutputBufferingForSession &buffering, - const boost::system::error_code &error, std::size_t bytesTransferred); - template void appendOutput(OutputType &&output); public: IdType id; @@ -228,8 +222,6 @@ public: std::variant, LibPkg::LicenseResult, LibPkg::PackageUpdates, BuildPreparation, BuildProgress, PackageMovementResult, std::unordered_map>, BuildActionMessages> resultData; - std::string output; - std::string outputMimeType = "text/plain"; std::vector logfiles; std::vector artefacts; CppUtilities::DateTime created = CppUtilities::DateTime::gmtNow(); @@ -245,8 +237,8 @@ private: std::function m_concludeHandler; std::mutex m_processesMutex; std::unordered_map> m_ongoingProcesses; - std::mutex m_outputStreamingMutex; - std::unordered_map, std::unique_ptr> m_bufferingForSession; + std::mutex m_outputSessionMutex; + std::shared_ptr m_outputSession; std::unique_ptr m_internalBuildAction; }; @@ -309,6 +301,14 @@ template inline void BuildAction::appendOutput(Args &&...args appendOutput(CppUtilities::argsToString(std::forward(args)...)); } +/*! + * \brief Append output (overload needed to prevent endless recursion). + */ +inline void BuildAction::appendOutput(std::string &&output) +{ + appendOutput(std::string_view(output)); +} + /*! * \brief Appends the specified arguments to the build action's log and to the overall service log. */ diff --git a/librepomgr/buildactions/buildactionlivestreaming.cpp b/librepomgr/buildactions/buildactionlivestreaming.cpp index 5e1d5b1..4f6ef93 100644 --- a/librepomgr/buildactions/buildactionlivestreaming.cpp +++ b/librepomgr/buildactions/buildactionlivestreaming.cpp @@ -12,8 +12,6 @@ using namespace CppUtilities::EscapeCodes; namespace LibRepoMgr { -static OutputBufferingForSession::BufferPoolType outputStreamingBufferPool(OutputBufferingForSession::bufferSize); - void BuildProcessSession::BuffersToWrite::clear() { currentlySentBuffers.clear(); @@ -70,6 +68,9 @@ void BuildProcessSession::DataForWebSession::writeFileData( m_descriptor.close(ec); } // send file data to web client + if (bytesTransferred > m_bytesToSendFromFile) { + bytesTransferred = m_bytesToSendFromFile; + } const auto bytesLeftToRead = m_bytesToSendFromFile - bytesTransferred; boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(boost::asio::buffer(*m_fileBuffer, bytesTransferred)), [this, &filePath, session, bytesLeftToRead, moreToRead = !eof && bytesLeftToRead]( @@ -112,14 +113,42 @@ void BuildProcessSession::registerWebSession(std::shared_ptr && void BuildProcessSession::registerNewDataHandler(std::function &&handler) { - std::unique_lock lock(m_mutex); + const auto lock = std::lock_guard(m_mutex); m_newDataHandlers.emplace_back(std::move(handler)); } -void BuildProcessSession::prpareLogFile() +void BuildProcessSession::writeData(std::string_view data) { + const auto lock = std::lock_guard(m_mutex); + while (const auto bufferSize = std::min(data.size(), m_bufferPool.bufferSize())) { + m_buffer = m_bufferPool.newBuffer(); + data.copy(m_buffer->data(), bufferSize); + writeCurrentBuffer(bufferSize); + data = data.substr(bufferSize); + } +} + +void BuildProcessSession::writeEnd() +{ + m_exited = true; + close(); +} + +void BuildProcessSession::prepareLogFile() +{ + // ensure directory exists + auto path = std::filesystem::path(m_logFilePath); + if (path.has_parent_path()) { + auto ec = std::error_code(); + std::filesystem::create_directories(path.parent_path(), ec); + if (ec) { + result.errorCode = std::move(ec); + result.error = CppUtilities::argsToString("unable to create directory \"", path.parent_path(), ": ", ec.message()); + return; + } + } // open logfile and a "file descriptor" for writing in a non-blocking way - boost::beast::error_code ec; + auto ec = boost::beast::error_code(); m_logFile.open(m_logFilePath.data(), boost::beast::file_mode::write, ec); if (ec) { result.errorCode = std::error_code(ec.value(), ec.category()); @@ -152,35 +181,8 @@ void BuildProcessSession::writeDataFromPipe(boost::system::error_code ec, std::s } // write bytes to log file and web clients if (bytesTransferred) { - std::lock_guard lock(m_mutex); - if (!m_logFileBuffers.error) { - if (m_logFileBuffers.currentlySentBuffers.empty()) { - m_logFileBuffers.currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred)); - boost::asio::async_write(m_logFileDescriptor, boost::asio::buffer(m_buffer.get(), bytesTransferred), - std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); - } else { - m_logFileBuffers.outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred)); - } - } - for (auto &[session, sessionInfo] : m_registeredWebSessions) { - if (sessionInfo->error) { - continue; - } - if (sessionInfo->currentlySentBuffers.empty() && !sessionInfo->bytesToSendFromFile()) { - sessionInfo->currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred)); - boost::beast::net::async_write(session->socket(), - boost::beast::http::make_chunk(boost::asio::buffer(m_buffer.get(), bytesTransferred)), - std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2, - std::ref(*session), std::ref(*sessionInfo))); - } else { - sessionInfo->outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred)); - } - } - for (const auto &handler : m_newDataHandlers) { - if (handler) { - handler(m_buffer, bytesTransferred); - } - } + auto lock = std::lock_guard(m_mutex); + writeCurrentBuffer(bytesTransferred); } // continue reading from the pipe unless there was an error if (!ec) { @@ -188,23 +190,48 @@ void BuildProcessSession::writeDataFromPipe(boost::system::error_code ec, std::s return; } // stop reading from the pipe if there was an error; close the log file and tell web clients that it's over - if (bytesTransferred) { - return; + if (!bytesTransferred) { + close(); } - std::lock_guard lock(m_mutex); - if (m_logFileBuffers.outstandingBuffersToSend.empty()) { - boost::system::error_code error; - m_logFile.close(error); - if (error) { - cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush; +} + +void BuildProcessSession::writeCurrentBuffer(std::size_t bytesTransferred) +{ + // write bytesTransferred bytes from m_buffer to log file + if (!m_logFileBuffers.error) { + if (m_logFileBuffers.currentlySentBuffers.empty()) { + m_logFileBuffers.currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred)); + boost::asio::async_write(m_logFileDescriptor, boost::asio::buffer(m_buffer.get(), bytesTransferred), + std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } else { + m_logFileBuffers.outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred)); } } + // write bytesTransferred bytes from m_buffer to web sessions for (auto &[session, sessionInfo] : m_registeredWebSessions) { - if (!sessionInfo->outstandingBuffersToSend.empty()) { + if (sessionInfo->error) { continue; } - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(), - std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true)); + if (sessionInfo->currentlySentBuffers.empty() && !sessionInfo->bytesToSendFromFile()) { + sessionInfo->currentlySentBuffers.swap(sessionInfo->outstandingBuffersToSend); + sessionInfo->currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred)); + sessionInfo->currentlySentBufferRefs.clear(); + for (const auto &buffer : sessionInfo->currentlySentBuffers) { + sessionInfo->currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first.get(), buffer.second)); + } + boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(sessionInfo->currentlySentBufferRefs), + std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2, + std::ref(*session), std::ref(*sessionInfo))); + + } else { + sessionInfo->outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred)); + } + } + // invoke new data handlers + for (const auto &handler : m_newDataHandlers) { + if (handler) { + handler(m_buffer, bytesTransferred); + } } } @@ -214,23 +241,21 @@ void BuildProcessSession::writeNextBufferToLogFile(const boost::system::error_co CPP_UTILITIES_UNUSED(bytesTransferred) if (error) { cerr << Phrases::ErrorMessage << "Error writing to \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush; - std::lock_guard lock(m_mutex); + auto lock = std::lock_guard(m_mutex); m_logFileBuffers.clear(); m_logFileBuffers.error = true; return; } // write more data to the logfile if there's more { - std::lock_guard lock(m_mutex); + auto lock = std::lock_guard(m_mutex); m_logFileBuffers.currentlySentBuffers.clear(); - if (m_logFileBuffers.outstandingBuffersToSend.empty()) { - // close the logfile when the process exited and we've written all the output - if (m_exited.load()) { - boost::system::error_code closeError; - m_logFile.close(closeError); - if (closeError) { - cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << closeError.message() << Phrases::EndFlush; - } + // close the logfile when the process exited and we've written all the output + if (m_logFileBuffers.outstandingBuffersToSend.empty() && m_exited.load()) { + auto closeError = boost::system::error_code(); + m_logFile.close(closeError); + if (closeError) { + cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << closeError.message() << Phrases::EndFlush; } return; } @@ -258,14 +283,12 @@ void BuildProcessSession::writeNextBufferToWebSession( } // send more data to the client if there's more { - std::lock_guard lock(m_mutex); + auto lock = std::lock_guard(m_mutex); sessionInfo.currentlySentBuffers.clear(); // tell the client it's over when the process exited and we've sent all the output - if (sessionInfo.outstandingBuffersToSend.empty()) { - if (m_exited.load()) { - boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk_last(), - std::bind(&WebAPI::Session::responded, session.shared_from_this(), std::placeholders::_1, std::placeholders::_2, true)); - } + if (sessionInfo.outstandingBuffersToSend.empty() && m_exited.load()) { + boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk_last(), + std::bind(&WebAPI::Session::responded, session.shared_from_this(), std::placeholders::_1, std::placeholders::_2, true)); return; } sessionInfo.currentlySentBuffers.swap(sessionInfo.outstandingBuffersToSend); @@ -274,9 +297,30 @@ void BuildProcessSession::writeNextBufferToWebSession( sessionInfo.currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first.get(), buffer.second)); } } - boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk(sessionInfo.currentlySentBufferRefs), - std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2, - std::ref(session), std::ref(sessionInfo))); + if (!sessionInfo.currentlySentBufferRefs.empty()) { + boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk(sessionInfo.currentlySentBufferRefs), + std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2, + std::ref(session), std::ref(sessionInfo))); + } +} + +void BuildProcessSession::close() +{ + auto lock = std::lock_guard(m_mutex); + if (m_logFileBuffers.outstandingBuffersToSend.empty()) { + auto error = boost::system::error_code(); + m_logFile.close(error); + if (error) { + cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush; + } + } + for (auto &[session, sessionInfo] : m_registeredWebSessions) { + if (!sessionInfo->outstandingBuffersToSend.empty()) { + continue; + } + boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(), + std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true)); + } } void BuildProcessSession::conclude() @@ -289,8 +333,12 @@ void BuildProcessSession::conclude() if (!buildAction) { return; } - const auto processesLock = std::lock_guard(buildAction->m_processesMutex); - buildAction->m_ongoingProcesses.erase(m_logFilePath); + if (const auto outputLock = std::lock_guard(buildAction->m_outputSessionMutex); buildAction->m_outputSession.get() == this) { + buildAction->m_outputSession.reset(); + } else { + const auto processesLock = std::lock_guard(buildAction->m_processesMutex); + buildAction->m_ongoingProcesses.erase(m_logFilePath); + } } std::shared_ptr BuildAction::makeBuildProcess( @@ -330,9 +378,13 @@ void BuildAction::terminateOngoingBuildProcesses() void BuildAction::streamFile(const WebAPI::Params ¶ms, const std::string &filePath, std::string_view fileMimeType) { - auto processesLock = std::unique_lock(m_processesMutex); - auto buildProcess = findBuildProcess(filePath); - processesLock.unlock(); + auto buildProcess = std::shared_ptr(); + if (const auto outputLock = std::unique_lock(m_outputSessionMutex); m_outputSession && m_outputSession->logFilePath() == filePath) { + buildProcess = m_outputSession; + } else { + const auto processesLock = std::unique_lock(m_processesMutex); + buildProcess = findBuildProcess(filePath); + } if (!buildProcess) { // simply send the file if there's no ongoing process writing to it anymore params.session.respond(filePath.data(), fileMimeType.data(), params.target.path); @@ -353,154 +405,31 @@ void BuildAction::streamFile(const WebAPI::Params ¶ms, const std::string &fi }); } -void BuildAction::streamOutput(const WebAPI::Params ¶ms, std::size_t offset) -{ - if (!m_setup) { - m_setup = ¶ms.setup; - } - auto session = params.session.shared_from_this(); - auto chunkResponse = WebAPI::Render::makeChunkResponse(params.request(), "application/octet-stream"); - auto outputStreamingLock = std::unique_lock(m_outputStreamingMutex); - auto &buffersForSession = m_bufferingForSession[session]; - if (buffersForSession) { - return; // skip when already streaming to that session - } - buffersForSession = std::make_unique(); - auto buildLock = params.setup.building.lockToRead(); - buffersForSession->existingOutputSize = output.size(); - buffersForSession->bytesSent = offset; - buildLock.unlock(); - outputStreamingLock.unlock(); - boost::beast::http::async_write_header(params.session.socket(), chunkResponse->serializer, - [buildAction = shared_from_this(), session = std::move(session), &buffering = *buffersForSession, chunkResponse]( - const boost::system::error_code &error, std::size_t bytesTransferred) { - CPP_UTILITIES_UNUSED(bytesTransferred) - buildAction->continueStreamingExistingOutputToSession(std::move(session), buffering, error, 0); - }); -} - -void BuildAction::continueStreamingExistingOutputToSession(std::shared_ptr session, OutputBufferingForSession &buffering, - const boost::system::error_code &error, std::size_t bytesTransferred) -{ - auto outputStreamingLock = std::unique_lock(m_outputStreamingMutex); - if (error) { - m_bufferingForSession.erase(session); - return; - } - const auto bytesSent = buffering.bytesSent += bytesTransferred; - if (bytesSent >= buffering.existingOutputSize) { - buffering.currentlySentBuffers.clear(); - buffering.existingOutputSent = true; - if (!buffering.outstandingBuffersToSend.empty()) { - outputStreamingLock.unlock(); - continueStreamingNewOutputToSession(std::move(session), buffering, error, 0); - return; - } - if (isDone()) { - m_bufferingForSession.erase(session); - outputStreamingLock.unlock(); - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(), - std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true)); - } - return; - } - auto buffer = buffering.currentlySentBuffers.empty() ? outputStreamingBufferPool.newBuffer() : buffering.currentlySentBuffers.front().first; - const auto bytesToCopy = std::min(output.size() - bytesSent, outputStreamingBufferPool.bufferSize()); - if (buffering.currentlySentBuffers.empty()) { - buffering.currentlySentBuffers.emplace_back(std::pair(buffer, bytesToCopy)); - } - outputStreamingLock.unlock(); - - auto buildLock = m_setup->building.lockToRead(); - output.copy(buffer->data(), bytesToCopy, bytesSent); - buildLock.unlock(); - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(boost::asio::buffer(buffer->data(), bytesToCopy)), - std::bind(&BuildAction::continueStreamingExistingOutputToSession, shared_from_this(), session, std::ref(buffering), std::placeholders::_1, - std::placeholders::_2)); -} - -void BuildAction::continueStreamingNewOutputToSession(std::shared_ptr session, OutputBufferingForSession &buffering, - const boost::system::error_code &error, std::size_t bytesTransferred) -{ - auto outputStreamingLock = std::unique_lock(m_outputStreamingMutex); - buffering.bytesSent += bytesTransferred; - buffering.currentlySentBuffers.clear(); - buffering.currentlySentBufferRefs.clear(); - if (error) { - m_bufferingForSession.erase(session); - return; - } - if (buffering.outstandingBuffersToSend.empty()) { - if (isDone()) { - m_bufferingForSession.erase(session); - outputStreamingLock.unlock(); - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(), - std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true)); - } - return; - } - buffering.outstandingBuffersToSend.swap(buffering.currentlySentBuffers); - buffering.currentlySentBufferRefs.reserve(buffering.currentlySentBuffers.size()); - for (const auto ¤tBuffer : buffering.currentlySentBuffers) { - buffering.currentlySentBufferRefs.emplace_back(boost::asio::buffer(*currentBuffer.first, currentBuffer.second)); - } - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(buffering.currentlySentBufferRefs), - std::bind(&BuildAction::continueStreamingNewOutputToSession, shared_from_this(), session, std::ref(buffering), std::placeholders::_1, - std::placeholders::_2)); -} - -template void BuildAction::appendOutput(OutputType &&output) -{ - if (output.empty() || !m_setup) { - return; - } - - auto lock = m_setup->building.lockToWrite(); - this->output.append(output); - lock.unlock(); - - OutputBufferingForSession::BufferPile buffers; - for (std::size_t offset = 0; offset < output.size(); offset += buffers.back().second) { - const auto bytesToBuffer = std::min(output.size() - offset, outputStreamingBufferPool.bufferSize()); - auto buffer = buffers.emplace_back(std::pair(outputStreamingBufferPool.newBuffer(), bytesToBuffer)); - output.copy(buffer.first->data(), bytesToBuffer, offset); - } - - auto outputStreamingLock = std::unique_lock(m_outputStreamingMutex); - for (auto &bufferingForSession : m_bufferingForSession) { - auto &buffering = bufferingForSession.second; - auto ¤tlySentBuffers = buffering->currentlySentBuffers; - if (currentlySentBuffers.empty() && buffering->existingOutputSent) { - auto &session = bufferingForSession.first; - auto ¤tlySentBufferRefs = buffering->currentlySentBufferRefs; - currentlySentBuffers.insert(currentlySentBuffers.end(), buffers.begin(), buffers.end()); - for (const auto &buffer : buffers) { - currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first->data(), buffer.second)); - } - boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(currentlySentBufferRefs), - std::bind(&BuildAction::continueStreamingNewOutputToSession, shared_from_this(), session, std::ref(*buffering), std::placeholders::_1, - std::placeholders::_2)); - } else { - auto &outstandingBuffersToSend = buffering->outstandingBuffersToSend; - outstandingBuffersToSend.insert(outstandingBuffersToSend.end(), buffers.begin(), buffers.end()); - } - } -} - -/*! - * \brief Internally called to append output and spread it to all waiting sessions. - */ -void BuildAction::appendOutput(std::string &&output) -{ - appendOutput(std::move(output)); -} - /*! * \brief Internally called to append output and spread it to all waiting sessions. */ void BuildAction::appendOutput(std::string_view output) { - appendOutput(std::forward(output)); + if (output.empty() || !m_setup) { + return; + } + + auto outputLock = std::unique_lock(m_outputSessionMutex); + if (!m_outputSession) { + m_outputSession = std::make_shared( + this, m_setup->building.ioContext, argsToString("Output of build action ", id), argsToString("logs/build-action-", id, ".log")); + m_outputSession->prepareLogFile(); + if (m_outputSession->result.errorCode) { + std::cerr << Phrases::ErrorMessage << "Unable to open output logfile for build action " << id << ": " << m_outputSession->result.error + << Phrases::EndFlush; + return; + } + const auto buildingLock = m_setup->building.lockToWrite(); + logfiles.emplace_back(m_outputSession->logFilePath()); + } + if (!m_outputSession->result.errorCode) { + m_outputSession->writeData(output); + } } } // namespace LibRepoMgr diff --git a/librepomgr/buildactions/buildactionprivate.h b/librepomgr/buildactions/buildactionprivate.h index 7559295..07b8b76 100644 --- a/librepomgr/buildactions/buildactionprivate.h +++ b/librepomgr/buildactions/buildactionprivate.h @@ -112,22 +112,6 @@ template inline std::size_t BufferPool::stor return m_buffers.size(); } -/// \brief The OutputBufferingForSession struct holds buffers used by the BuildAction live-streaming. -struct LIBREPOMGR_EXPORT OutputBufferingForSession { - static constexpr std::size_t bufferSize = 4096; - using StorageType = std::array; - using BufferPoolType = BufferPool; - using BufferType = BufferPoolType::BufferType; - using BufferPile = std::vector>; - using BufferRefs = std::vector; - BufferPile currentlySentBuffers; - BufferPile outstandingBuffersToSend; - BufferRefs currentlySentBufferRefs; - std::atomic bytesSent = 0; - std::atomic existingOutputSize = 0; - std::atomic_bool existingOutputSent = false; -}; - /// \brief The BuildProcessSession class spawns a process associated with a build action. /// The process output is make available as a logfile of the build action allowing live-steaming. class LIBREPOMGR_EXPORT BuildProcessSession : public std::enable_shared_from_this, public BaseProcessSession { @@ -138,11 +122,15 @@ public: using BufferType = BufferPoolType::BufferType; explicit BuildProcessSession(BuildAction *buildAction, boost::asio::io_context &ioContext, std::string &&displayName, std::string &&logFilePath, - Handler &&handler, AssociatedLocks &&locks = AssociatedLocks()); + Handler &&handler = Handler(), AssociatedLocks &&locks = AssociatedLocks()); template void launch(ChildArgs &&...childArgs); void registerWebSession(std::shared_ptr &&webSession); void registerNewDataHandler(std::function &&handler); + void prepareLogFile(); + void writeData(std::string_view data); + void writeEnd(); AssociatedLocks &locks(); + const std::string &logFilePath() const; bool hasExited() const; private: @@ -171,12 +159,13 @@ private: boost::asio::posix::stream_descriptor m_descriptor; }; - void prpareLogFile(); void readMoreFromPipe(); - void writeDataFromPipe(boost::system::error_code ec, std::size_t bytesRead); + void writeDataFromPipe(boost::system::error_code ec, std::size_t bytesTransferred); + void writeCurrentBuffer(std::size_t bytesTransferred); void writeNextBufferToLogFile(const boost::system::error_code &error, std::size_t bytesTransferred); void writeNextBufferToWebSession( const boost::system::error_code &error, std::size_t bytesTransferred, WebAPI::Session &session, BuffersToWrite &sessionInfo); + void close(); void conclude(); std::weak_ptr m_buildAction; @@ -224,6 +213,11 @@ inline AssociatedLocks &BuildProcessSession::locks() return m_locks; } +inline const std::string &BuildProcessSession::logFilePath() const +{ + return m_logFilePath; +} + inline bool BuildProcessSession::hasExited() const { return m_exited.load(); @@ -231,7 +225,7 @@ inline bool BuildProcessSession::hasExited() const template void BuildProcessSession::launch(ChildArgs &&...childArgs) { - prpareLogFile(); + prepareLogFile(); if (result.errorCode) { conclude(); return; diff --git a/librepomgr/buildactions/makelicenseinfo.cpp b/librepomgr/buildactions/makelicenseinfo.cpp index ab90c45..01222a3 100644 --- a/librepomgr/buildactions/makelicenseinfo.cpp +++ b/librepomgr/buildactions/makelicenseinfo.cpp @@ -20,7 +20,6 @@ void MakeLicenseInfo::run() std::get>(configReadLock).unlock(); const auto buildActionWriteLock = m_setup.building.lockToWrite(); - m_buildAction->outputMimeType = "application/json"; m_buildAction->resultData = std::move(result); reportResult(result.success ? BuildActionResult::Success : BuildActionResult::Failure); } diff --git a/librepomgr/buildactions/subprocess.h b/librepomgr/buildactions/subprocess.h index 2c6379d..9abe4ab 100644 --- a/librepomgr/buildactions/subprocess.h +++ b/librepomgr/buildactions/subprocess.h @@ -62,6 +62,9 @@ inline BaseProcessSession::BaseProcessSession(boost::asio::io_context &ioContext inline BaseProcessSession::~BaseProcessSession() { + if (!m_handler) { + return; + } boost::asio::post(m_ioContext, [child = std::move(this->child), result = std::move(this->result), handler = std::move(m_handler)]() mutable { handler(std::move(child), std::move(result)); }); diff --git a/librepomgr/tests/buildactions.cpp b/librepomgr/tests/buildactions.cpp index 750154c..93953df 100644 --- a/librepomgr/tests/buildactions.cpp +++ b/librepomgr/tests/buildactions.cpp @@ -230,8 +230,11 @@ void BuildActionsTests::testLogging() m_buildAction->log()(Phrases::ErrorMessage, "some error: ", "message", '\n'); m_buildAction->log()(Phrases::InfoMessage, "info", '\n'); } - CPPUNIT_ASSERT_EQUAL_MESSAGE("messages added to build action output", - "\e[1;31m==> ERROR: \e[0m\e[1msome error: message\n\e[1;37m==> \e[0m\e[1minfo\n"s, m_buildAction->output); + m_buildAction->conclude(BuildActionResult::Success); + m_setup.building.ioContext.run(); + const auto output = readFile("logs/build-action-0.log"); + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "messages added to build action output", "\e[1;31m==> ERROR: \e[0m\e[1msome error: message\n\e[1;37m==> \e[0m\e[1minfo\n"s, output); } /*! @@ -258,7 +261,7 @@ void BuildActionsTests::testProcessSession() */ void BuildActionsTests::testBuildActionProcess() { - m_buildAction = std::make_shared(0, &m_setup); + m_buildAction = std::make_shared(1, &m_setup); const auto scriptPath = testFilePath("scripts/print_some_data.sh"); const auto logFilePath = std::filesystem::path(TestApplication::instance()->workingDirectory()) / "logfile.log"; @@ -269,11 +272,11 @@ void BuildActionsTests::testBuildActionProcess() auto &ioc = m_setup.building.ioContext; auto session = std::make_shared( - m_buildAction.get(), ioc, "test", std::string(logFilePath), [&ioc](boost::process::child &&child, ProcessResult &&result) { + m_buildAction.get(), ioc, "test", std::string(logFilePath), [this](boost::process::child &&child, ProcessResult &&result) { CPPUNIT_ASSERT_EQUAL(std::error_code(), result.errorCode); CPPUNIT_ASSERT_EQUAL(0, result.exitCode); CPPUNIT_ASSERT_GREATER(0, child.native_handle()); - ioc.stop(); + m_buildAction->conclude(BuildActionResult::Success); }); session->launch(scriptPath); session.reset(); @@ -281,11 +284,12 @@ void BuildActionsTests::testBuildActionProcess() const auto logFile = readFile(logFilePath); const auto logLines = splitStringSimple>(logFile, "\r\n"); + const auto output = readFile("logs/build-action-1.log"); CPPUNIT_ASSERT_EQUAL(5002_st, logLines.size()); CPPUNIT_ASSERT_EQUAL("printing some numbers"sv, logLines.front()); CPPUNIT_ASSERT_EQUAL_MESSAGE("trailing line break", ""sv, logLines.back()); CPPUNIT_ASSERT_EQUAL_MESSAGE("last line", "line 5000"sv, logLines[logLines.size() - 2u]); - TESTUTILS_ASSERT_LIKE_FLAGS("PID logged", ".*Launched \"test\", PID\\: [0-9]+.*\n.*"s, std::regex::extended, m_buildAction->output); + TESTUTILS_ASSERT_LIKE_FLAGS("PID logged", ".*Launched \"test\", PID\\: [0-9]+.*\n.*"s, std::regex::extended, output); } /*! diff --git a/librepomgr/webapi/routes.h b/librepomgr/webapi/routes.h index 2312855..c895524 100644 --- a/librepomgr/webapi/routes.h +++ b/librepomgr/webapi/routes.h @@ -18,7 +18,6 @@ LIBREPOMGR_EXPORT void getUnresolved(const Params ¶ms, ResponseHandler &&han LIBREPOMGR_EXPORT void getPackages(const Params ¶ms, ResponseHandler &&handler); LIBREPOMGR_EXPORT void getBuildActions(const Params ¶ms, ResponseHandler &&handler); LIBREPOMGR_EXPORT void getBuildActionDetails(const Params ¶ms, ResponseHandler &&handler); -LIBREPOMGR_EXPORT void getBuildActionOutput(const Params ¶ms, ResponseHandler &&handler); LIBREPOMGR_EXPORT void getBuildActionLogFile(const Params ¶ms, ResponseHandler &&handler); LIBREPOMGR_EXPORT void getBuildActionArtefact(const Params ¶ms, ResponseHandler &&handler); LIBREPOMGR_EXPORT void postLoadPackages(const Params ¶ms, ResponseHandler &&handler); diff --git a/librepomgr/webapi/routes_buildaction.cpp b/librepomgr/webapi/routes_buildaction.cpp index c9b9fb1..e544c22 100644 --- a/librepomgr/webapi/routes_buildaction.cpp +++ b/librepomgr/webapi/routes_buildaction.cpp @@ -100,36 +100,6 @@ void getBuildActionDetails(const Params ¶ms, ResponseHandler &&handler) handler(makeJson(params.request(), jsonDoc, params.target.hasPrettyFlag())); } -void getBuildActionOutput(const Params ¶ms, ResponseHandler &&handler) -{ - const auto offsetParams = params.target.decodeValues("offset"); - std::size_t offset = 0; - if (offsetParams.size() > 1) { - handler(makeBadRequest(params.request(), "the offset parameter must be specified at most once")); - return; - } - if (!offsetParams.empty()) { - try { - offset = stringToNumber(offsetParams.front()); - } catch (const ConversionException &) { - handler(makeBadRequest(params.request(), "the offset must be an unsigned integer")); - return; - } - } - auto buildActionsSearchResult = findBuildActions(params, std::move(handler), false, 1); - if (!buildActionsSearchResult.ok) { - return; - } - auto &buildAction = buildActionsSearchResult.actions.front(); - if (offset > buildAction->output.size()) { - buildActionsSearchResult.lock = std::monostate{}; - handler(makeBadRequest(params.request(), "the offset must not exceed the output size")); - return; - } - buildActionsSearchResult.lock = std::monostate{}; - buildAction->streamOutput(params, offset); -} - static std::string readNameParam(const Params ¶ms, ResponseHandler &&handler) { const auto nameParams = params.target.decodeValues("name"); diff --git a/librepomgr/webapi/server.cpp b/librepomgr/webapi/server.cpp index dc89d9b..e3ec2de 100644 --- a/librepomgr/webapi/server.cpp +++ b/librepomgr/webapi/server.cpp @@ -33,7 +33,6 @@ const Router Server::s_router = { { { http::verb::get, "/api/v0/build-action" }, Route{&Routes::getBuildActions} }, { { http::verb::delete_, "/api/v0/build-action" }, Route{&Routes::deleteBuildActions, UserPermissions::ModifyBuildActions} }, { { http::verb::get, "/api/v0/build-action/details" }, Route{&Routes::getBuildActionDetails, UserPermissions::ReadBuildActionsDetails} }, - { { http::verb::get, "/api/v0/build-action/output" }, Route{&Routes::getBuildActionOutput, UserPermissions::ReadBuildActionsDetails} }, { { http::verb::get, "/api/v0/build-action/logfile" }, Route{&Routes::getBuildActionLogFile, UserPermissions::ReadBuildActionsDetails} }, { { http::verb::get, "/api/v0/build-action/artefact" }, Route{&Routes::getBuildActionArtefact, UserPermissions::ReadBuildActionsDetails} }, { { http::verb::post, "/api/v0/build-action" }, Route{&Routes::postBuildAction, UserPermissions::ModifyBuildActions} }, diff --git a/srv/static/js/buildactionspage.js b/srv/static/js/buildactionspage.js index 23c4d3d..7523d92 100644 --- a/srv/static/js/buildactionspage.js +++ b/srv/static/js/buildactionspage.js @@ -463,8 +463,8 @@ function renderBuildActionDetailsTable(buildActionDetails) { return GenericRendering.renderTableFromJsonObject({ data: buildActionDetails, - displayLabels: ['ID', 'Task', 'Type', 'Status', 'Result', 'Result data', 'Created', 'Started', 'Finished', 'Start after', 'Directory', 'Source repo', 'Destination repo', 'Packages', 'Flags', 'Settings', 'Log files', 'Artefacts', 'Output'], - fieldAccessors: ['id', 'taskName', 'type', 'status', 'result', 'resultData', 'created', 'started', 'finished', 'startAfter', 'directory', 'sourceDbs', 'destinationDbs', 'packageNames', 'flags', 'settings', 'logfiles', 'artefacts', 'output'], + displayLabels: ['ID', 'Task', 'Type', 'Status', 'Result', 'Result data', 'Created', 'Started', 'Finished', 'Start after', 'Directory', 'Source repo', 'Destination repo', 'Packages', 'Flags', 'Settings', 'Log files', 'Artefacts'], + fieldAccessors: ['id', 'taskName', 'type', 'status', 'result', 'resultData', 'created', 'started', 'finished', 'startAfter', 'directory', 'sourceDbs', 'destinationDbs', 'packageNames', 'flags', 'settings', 'logfiles', 'artefacts'], customRenderer: { taskName: function (value) { if (!value) { @@ -602,28 +602,6 @@ function renderBuildActionDetailsTable(buildActionDetails) }, logfiles: renderBuildActionLogFiles, artefacts: renderBuildActionArtefacts, - output: function(value, row) { - const isFinished = row.status === 4; - if (!value && isFinished) { - return GenericRendering.renderNoneInGrey(); - } - const targetElement = document.createElement('div'); - if (isFinished) { - const terminal = Terminal.makeTerminal(); - Terminal.setupTerminalLater(terminal, targetElement, value); - return targetElement; - } - const streamingSetup = setupTerminalForStreaming({ - id: 'output-' + row.id, - targetElement: targetElement, - path: '/build-action/output?id=' + encodeURIComponent(row.id) + '&offset=' + encodeURIComponent(value.length), - }); - streamingSetup.startStreaming(); - const terminal = streamingSetup.terminal(); - terminal.setOption('convertEol', true); - terminal.write(value); - return streamingSetup.elements; - }, }, }); }