Write build action "output" log to a logfile like for sub-processes

This commit is contained in:
Martchus 2022-02-19 21:26:56 +01:00
parent a7de520549
commit 93afb3883d
12 changed files with 184 additions and 319 deletions

View File

@ -268,7 +268,6 @@ static void printBuildAction(const LibRepoMgr::BuildAction &a, const LibRepoMgr:
}
t.add_row({ "Log files", printListOfStringsAsSubTable(a.logfiles) });
t.add_row({ "Artefacts", printListOfStringsAsSubTable(a.artefacts) });
t.add_row({ "Output", a.output });
t.column(0).format().font_align(tabulate::FontAlign::right);
std::cout << t << '\n';
}

View File

@ -222,8 +222,6 @@ BuildAction &BuildAction::operator=(BuildAction &&other)
status = other.status;
result = other.result;
resultData = std::move(other.resultData);
output = std::move(other.output);
outputMimeType = std::move(other.outputMimeType);
logfiles = std::move(other.logfiles);
artefacts = std::move(other.artefacts);
created = other.created;
@ -236,7 +234,7 @@ BuildAction &BuildAction::operator=(BuildAction &&other)
m_stopHandler = std::function<void(void)>();
m_concludeHandler = std::function<void(void)>();
m_ongoingProcesses.clear();
m_bufferingForSession.clear();
m_outputSession.reset();
m_internalBuildAction = std::move(other.m_internalBuildAction);
return *this;
}
@ -363,15 +361,8 @@ LibPkg::StorageID BuildAction::conclude(BuildActionResult result)
finished = DateTime::gmtNow();
// tell clients waiting for output that it's over
const auto outputStreamingLock = std::unique_lock<std::mutex>(m_outputStreamingMutex);
for (auto i = m_bufferingForSession.begin(); i != m_bufferingForSession.end();) {
if (!i->second->currentlySentBuffers.empty() || !i->second->outstandingBuffersToSend.empty()) {
++i;
continue;
}
boost::beast::net::async_write(i->first->socket(), boost::beast::http::make_chunk_last(),
std::bind(&WebAPI::Session::responded, i->first, std::placeholders::_1, std::placeholders::_2, true));
i = m_bufferingForSession.erase(i);
if (const auto outputStreamingLock = std::unique_lock<std::mutex>(m_outputSessionMutex); m_outputSession) {
m_outputSession->writeEnd();
}
// start globally visible follow-up actions if succeeded

View File

@ -152,7 +152,6 @@ struct LIBREPOMGR_EXPORT BuildActionMessages : public ReflectiveRapidJSON::JsonS
};
class BuildProcessSession;
struct OutputBufferingForSession;
struct ServiceSetup;
struct LIBREPOMGR_EXPORT BuildAction : public std::enable_shared_from_this<BuildAction>,
@ -184,8 +183,8 @@ public:
LibPkg::StorageID start(ServiceSetup &setup);
void assignStartAfter(const std::vector<std::shared_ptr<BuildAction>> &startsAfterBuildActions);
void abort();
void appendOutput(std::string &&output);
void appendOutput(std::string_view output);
void appendOutput(std::string &&output);
template <typename... Args> void appendOutput(Args &&...args);
template <typename... Args> void appendOutput(CppUtilities::EscapeCodes::Phrases phrase, Args &&...args);
LogContext &log();
@ -204,11 +203,6 @@ private:
template <typename InternalBuildActionType> void post();
template <typename Callback> void post(Callback &&codeToRun);
LibPkg::StorageID conclude(BuildActionResult result);
void continueStreamingExistingOutputToSession(std::shared_ptr<WebAPI::Session> session, OutputBufferingForSession &buffering,
const boost::system::error_code &error, std::size_t bytesTransferred);
void continueStreamingNewOutputToSession(std::shared_ptr<WebAPI::Session> session, OutputBufferingForSession &buffering,
const boost::system::error_code &error, std::size_t bytesTransferred);
template <typename OutputType> void appendOutput(OutputType &&output);
public:
IdType id;
@ -228,8 +222,6 @@ public:
std::variant<std::string, std::vector<std::string>, LibPkg::LicenseResult, LibPkg::PackageUpdates, BuildPreparation, BuildProgress,
PackageMovementResult, std::unordered_map<std::string, std::vector<RepositoryProblem>>, BuildActionMessages>
resultData;
std::string output;
std::string outputMimeType = "text/plain";
std::vector<std::string> logfiles;
std::vector<std::string> artefacts;
CppUtilities::DateTime created = CppUtilities::DateTime::gmtNow();
@ -245,8 +237,8 @@ private:
std::function<void(void)> m_concludeHandler;
std::mutex m_processesMutex;
std::unordered_map<std::string, std::shared_ptr<BuildProcessSession>> m_ongoingProcesses;
std::mutex m_outputStreamingMutex;
std::unordered_map<std::shared_ptr<WebAPI::Session>, std::unique_ptr<OutputBufferingForSession>> m_bufferingForSession;
std::mutex m_outputSessionMutex;
std::shared_ptr<BuildProcessSession> m_outputSession;
std::unique_ptr<InternalBuildAction> m_internalBuildAction;
};
@ -309,6 +301,14 @@ template <typename... Args> inline void BuildAction::appendOutput(Args &&...args
appendOutput(CppUtilities::argsToString(std::forward<Args>(args)...));
}
/*!
* \brief Append output (overload needed to prevent endless recursion).
*/
inline void BuildAction::appendOutput(std::string &&output)
{
appendOutput(std::string_view(output));
}
/*!
* \brief Appends the specified arguments to the build action's log and to the overall service log.
*/

View File

@ -12,8 +12,6 @@ using namespace CppUtilities::EscapeCodes;
namespace LibRepoMgr {
static OutputBufferingForSession::BufferPoolType outputStreamingBufferPool(OutputBufferingForSession::bufferSize);
void BuildProcessSession::BuffersToWrite::clear()
{
currentlySentBuffers.clear();
@ -70,6 +68,9 @@ void BuildProcessSession::DataForWebSession::writeFileData(
m_descriptor.close(ec);
}
// send file data to web client
if (bytesTransferred > m_bytesToSendFromFile) {
bytesTransferred = m_bytesToSendFromFile;
}
const auto bytesLeftToRead = m_bytesToSendFromFile - bytesTransferred;
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(boost::asio::buffer(*m_fileBuffer, bytesTransferred)),
[this, &filePath, session, bytesLeftToRead, moreToRead = !eof && bytesLeftToRead](
@ -112,14 +113,42 @@ void BuildProcessSession::registerWebSession(std::shared_ptr<WebAPI::Session> &&
void BuildProcessSession::registerNewDataHandler(std::function<void(BuildProcessSession::BufferType, std::size_t)> &&handler)
{
std::unique_lock<std::mutex> lock(m_mutex);
const auto lock = std::lock_guard<std::mutex>(m_mutex);
m_newDataHandlers.emplace_back(std::move(handler));
}
void BuildProcessSession::prpareLogFile()
void BuildProcessSession::writeData(std::string_view data)
{
const auto lock = std::lock_guard<std::mutex>(m_mutex);
while (const auto bufferSize = std::min<std::size_t>(data.size(), m_bufferPool.bufferSize())) {
m_buffer = m_bufferPool.newBuffer();
data.copy(m_buffer->data(), bufferSize);
writeCurrentBuffer(bufferSize);
data = data.substr(bufferSize);
}
}
void BuildProcessSession::writeEnd()
{
m_exited = true;
close();
}
void BuildProcessSession::prepareLogFile()
{
// ensure directory exists
auto path = std::filesystem::path(m_logFilePath);
if (path.has_parent_path()) {
auto ec = std::error_code();
std::filesystem::create_directories(path.parent_path(), ec);
if (ec) {
result.errorCode = std::move(ec);
result.error = CppUtilities::argsToString("unable to create directory \"", path.parent_path(), ": ", ec.message());
return;
}
}
// open logfile and a "file descriptor" for writing in a non-blocking way
boost::beast::error_code ec;
auto ec = boost::beast::error_code();
m_logFile.open(m_logFilePath.data(), boost::beast::file_mode::write, ec);
if (ec) {
result.errorCode = std::error_code(ec.value(), ec.category());
@ -152,35 +181,8 @@ void BuildProcessSession::writeDataFromPipe(boost::system::error_code ec, std::s
}
// write bytes to log file and web clients
if (bytesTransferred) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_logFileBuffers.error) {
if (m_logFileBuffers.currentlySentBuffers.empty()) {
m_logFileBuffers.currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred));
boost::asio::async_write(m_logFileDescriptor, boost::asio::buffer(m_buffer.get(), bytesTransferred),
std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
} else {
m_logFileBuffers.outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred));
}
}
for (auto &[session, sessionInfo] : m_registeredWebSessions) {
if (sessionInfo->error) {
continue;
}
if (sessionInfo->currentlySentBuffers.empty() && !sessionInfo->bytesToSendFromFile()) {
sessionInfo->currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred));
boost::beast::net::async_write(session->socket(),
boost::beast::http::make_chunk(boost::asio::buffer(m_buffer.get(), bytesTransferred)),
std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2,
std::ref(*session), std::ref(*sessionInfo)));
} else {
sessionInfo->outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred));
}
}
for (const auto &handler : m_newDataHandlers) {
if (handler) {
handler(m_buffer, bytesTransferred);
}
}
auto lock = std::lock_guard<std::mutex>(m_mutex);
writeCurrentBuffer(bytesTransferred);
}
// continue reading from the pipe unless there was an error
if (!ec) {
@ -188,23 +190,48 @@ void BuildProcessSession::writeDataFromPipe(boost::system::error_code ec, std::s
return;
}
// stop reading from the pipe if there was an error; close the log file and tell web clients that it's over
if (bytesTransferred) {
return;
if (!bytesTransferred) {
close();
}
std::lock_guard<std::mutex> lock(m_mutex);
if (m_logFileBuffers.outstandingBuffersToSend.empty()) {
boost::system::error_code error;
m_logFile.close(error);
if (error) {
cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush;
}
void BuildProcessSession::writeCurrentBuffer(std::size_t bytesTransferred)
{
// write bytesTransferred bytes from m_buffer to log file
if (!m_logFileBuffers.error) {
if (m_logFileBuffers.currentlySentBuffers.empty()) {
m_logFileBuffers.currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred));
boost::asio::async_write(m_logFileDescriptor, boost::asio::buffer(m_buffer.get(), bytesTransferred),
std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
} else {
m_logFileBuffers.outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred));
}
}
// write bytesTransferred bytes from m_buffer to web sessions
for (auto &[session, sessionInfo] : m_registeredWebSessions) {
if (!sessionInfo->outstandingBuffersToSend.empty()) {
if (sessionInfo->error) {
continue;
}
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(),
std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true));
if (sessionInfo->currentlySentBuffers.empty() && !sessionInfo->bytesToSendFromFile()) {
sessionInfo->currentlySentBuffers.swap(sessionInfo->outstandingBuffersToSend);
sessionInfo->currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred));
sessionInfo->currentlySentBufferRefs.clear();
for (const auto &buffer : sessionInfo->currentlySentBuffers) {
sessionInfo->currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first.get(), buffer.second));
}
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(sessionInfo->currentlySentBufferRefs),
std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2,
std::ref(*session), std::ref(*sessionInfo)));
} else {
sessionInfo->outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred));
}
}
// invoke new data handlers
for (const auto &handler : m_newDataHandlers) {
if (handler) {
handler(m_buffer, bytesTransferred);
}
}
}
@ -214,23 +241,21 @@ void BuildProcessSession::writeNextBufferToLogFile(const boost::system::error_co
CPP_UTILITIES_UNUSED(bytesTransferred)
if (error) {
cerr << Phrases::ErrorMessage << "Error writing to \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush;
std::lock_guard<std::mutex> lock(m_mutex);
auto lock = std::lock_guard<std::mutex>(m_mutex);
m_logFileBuffers.clear();
m_logFileBuffers.error = true;
return;
}
// write more data to the logfile if there's more
{
std::lock_guard<std::mutex> lock(m_mutex);
auto lock = std::lock_guard<std::mutex>(m_mutex);
m_logFileBuffers.currentlySentBuffers.clear();
if (m_logFileBuffers.outstandingBuffersToSend.empty()) {
// close the logfile when the process exited and we've written all the output
if (m_exited.load()) {
boost::system::error_code closeError;
m_logFile.close(closeError);
if (closeError) {
cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << closeError.message() << Phrases::EndFlush;
}
// close the logfile when the process exited and we've written all the output
if (m_logFileBuffers.outstandingBuffersToSend.empty() && m_exited.load()) {
auto closeError = boost::system::error_code();
m_logFile.close(closeError);
if (closeError) {
cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << closeError.message() << Phrases::EndFlush;
}
return;
}
@ -258,14 +283,12 @@ void BuildProcessSession::writeNextBufferToWebSession(
}
// send more data to the client if there's more
{
std::lock_guard<std::mutex> lock(m_mutex);
auto lock = std::lock_guard<std::mutex>(m_mutex);
sessionInfo.currentlySentBuffers.clear();
// tell the client it's over when the process exited and we've sent all the output
if (sessionInfo.outstandingBuffersToSend.empty()) {
if (m_exited.load()) {
boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk_last(),
std::bind(&WebAPI::Session::responded, session.shared_from_this(), std::placeholders::_1, std::placeholders::_2, true));
}
if (sessionInfo.outstandingBuffersToSend.empty() && m_exited.load()) {
boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk_last(),
std::bind(&WebAPI::Session::responded, session.shared_from_this(), std::placeholders::_1, std::placeholders::_2, true));
return;
}
sessionInfo.currentlySentBuffers.swap(sessionInfo.outstandingBuffersToSend);
@ -274,9 +297,30 @@ void BuildProcessSession::writeNextBufferToWebSession(
sessionInfo.currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first.get(), buffer.second));
}
}
boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk(sessionInfo.currentlySentBufferRefs),
std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2,
std::ref(session), std::ref(sessionInfo)));
if (!sessionInfo.currentlySentBufferRefs.empty()) {
boost::beast::net::async_write(session.socket(), boost::beast::http::make_chunk(sessionInfo.currentlySentBufferRefs),
std::bind(&BuildProcessSession::writeNextBufferToWebSession, shared_from_this(), std::placeholders::_1, std::placeholders::_2,
std::ref(session), std::ref(sessionInfo)));
}
}
void BuildProcessSession::close()
{
auto lock = std::lock_guard<std::mutex>(m_mutex);
if (m_logFileBuffers.outstandingBuffersToSend.empty()) {
auto error = boost::system::error_code();
m_logFile.close(error);
if (error) {
cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush;
}
}
for (auto &[session, sessionInfo] : m_registeredWebSessions) {
if (!sessionInfo->outstandingBuffersToSend.empty()) {
continue;
}
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(),
std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true));
}
}
void BuildProcessSession::conclude()
@ -289,8 +333,12 @@ void BuildProcessSession::conclude()
if (!buildAction) {
return;
}
const auto processesLock = std::lock_guard<std::mutex>(buildAction->m_processesMutex);
buildAction->m_ongoingProcesses.erase(m_logFilePath);
if (const auto outputLock = std::lock_guard<std::mutex>(buildAction->m_outputSessionMutex); buildAction->m_outputSession.get() == this) {
buildAction->m_outputSession.reset();
} else {
const auto processesLock = std::lock_guard<std::mutex>(buildAction->m_processesMutex);
buildAction->m_ongoingProcesses.erase(m_logFilePath);
}
}
std::shared_ptr<BuildProcessSession> BuildAction::makeBuildProcess(
@ -330,9 +378,13 @@ void BuildAction::terminateOngoingBuildProcesses()
void BuildAction::streamFile(const WebAPI::Params &params, const std::string &filePath, std::string_view fileMimeType)
{
auto processesLock = std::unique_lock<std::mutex>(m_processesMutex);
auto buildProcess = findBuildProcess(filePath);
processesLock.unlock();
auto buildProcess = std::shared_ptr<BuildProcessSession>();
if (const auto outputLock = std::unique_lock<std::mutex>(m_outputSessionMutex); m_outputSession && m_outputSession->logFilePath() == filePath) {
buildProcess = m_outputSession;
} else {
const auto processesLock = std::unique_lock<std::mutex>(m_processesMutex);
buildProcess = findBuildProcess(filePath);
}
if (!buildProcess) {
// simply send the file if there's no ongoing process writing to it anymore
params.session.respond(filePath.data(), fileMimeType.data(), params.target.path);
@ -353,154 +405,31 @@ void BuildAction::streamFile(const WebAPI::Params &params, const std::string &fi
});
}
void BuildAction::streamOutput(const WebAPI::Params &params, std::size_t offset)
{
if (!m_setup) {
m_setup = &params.setup;
}
auto session = params.session.shared_from_this();
auto chunkResponse = WebAPI::Render::makeChunkResponse(params.request(), "application/octet-stream");
auto outputStreamingLock = std::unique_lock<std::mutex>(m_outputStreamingMutex);
auto &buffersForSession = m_bufferingForSession[session];
if (buffersForSession) {
return; // skip when already streaming to that session
}
buffersForSession = std::make_unique<OutputBufferingForSession>();
auto buildLock = params.setup.building.lockToRead();
buffersForSession->existingOutputSize = output.size();
buffersForSession->bytesSent = offset;
buildLock.unlock();
outputStreamingLock.unlock();
boost::beast::http::async_write_header(params.session.socket(), chunkResponse->serializer,
[buildAction = shared_from_this(), session = std::move(session), &buffering = *buffersForSession, chunkResponse](
const boost::system::error_code &error, std::size_t bytesTransferred) {
CPP_UTILITIES_UNUSED(bytesTransferred)
buildAction->continueStreamingExistingOutputToSession(std::move(session), buffering, error, 0);
});
}
void BuildAction::continueStreamingExistingOutputToSession(std::shared_ptr<WebAPI::Session> session, OutputBufferingForSession &buffering,
const boost::system::error_code &error, std::size_t bytesTransferred)
{
auto outputStreamingLock = std::unique_lock<std::mutex>(m_outputStreamingMutex);
if (error) {
m_bufferingForSession.erase(session);
return;
}
const auto bytesSent = buffering.bytesSent += bytesTransferred;
if (bytesSent >= buffering.existingOutputSize) {
buffering.currentlySentBuffers.clear();
buffering.existingOutputSent = true;
if (!buffering.outstandingBuffersToSend.empty()) {
outputStreamingLock.unlock();
continueStreamingNewOutputToSession(std::move(session), buffering, error, 0);
return;
}
if (isDone()) {
m_bufferingForSession.erase(session);
outputStreamingLock.unlock();
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(),
std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true));
}
return;
}
auto buffer = buffering.currentlySentBuffers.empty() ? outputStreamingBufferPool.newBuffer() : buffering.currentlySentBuffers.front().first;
const auto bytesToCopy = std::min(output.size() - bytesSent, outputStreamingBufferPool.bufferSize());
if (buffering.currentlySentBuffers.empty()) {
buffering.currentlySentBuffers.emplace_back(std::pair(buffer, bytesToCopy));
}
outputStreamingLock.unlock();
auto buildLock = m_setup->building.lockToRead();
output.copy(buffer->data(), bytesToCopy, bytesSent);
buildLock.unlock();
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(boost::asio::buffer(buffer->data(), bytesToCopy)),
std::bind(&BuildAction::continueStreamingExistingOutputToSession, shared_from_this(), session, std::ref(buffering), std::placeholders::_1,
std::placeholders::_2));
}
void BuildAction::continueStreamingNewOutputToSession(std::shared_ptr<WebAPI::Session> session, OutputBufferingForSession &buffering,
const boost::system::error_code &error, std::size_t bytesTransferred)
{
auto outputStreamingLock = std::unique_lock<std::mutex>(m_outputStreamingMutex);
buffering.bytesSent += bytesTransferred;
buffering.currentlySentBuffers.clear();
buffering.currentlySentBufferRefs.clear();
if (error) {
m_bufferingForSession.erase(session);
return;
}
if (buffering.outstandingBuffersToSend.empty()) {
if (isDone()) {
m_bufferingForSession.erase(session);
outputStreamingLock.unlock();
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk_last(),
std::bind(&WebAPI::Session::responded, session, std::placeholders::_1, std::placeholders::_2, true));
}
return;
}
buffering.outstandingBuffersToSend.swap(buffering.currentlySentBuffers);
buffering.currentlySentBufferRefs.reserve(buffering.currentlySentBuffers.size());
for (const auto &currentBuffer : buffering.currentlySentBuffers) {
buffering.currentlySentBufferRefs.emplace_back(boost::asio::buffer(*currentBuffer.first, currentBuffer.second));
}
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(buffering.currentlySentBufferRefs),
std::bind(&BuildAction::continueStreamingNewOutputToSession, shared_from_this(), session, std::ref(buffering), std::placeholders::_1,
std::placeholders::_2));
}
template <typename OutputType> void BuildAction::appendOutput(OutputType &&output)
{
if (output.empty() || !m_setup) {
return;
}
auto lock = m_setup->building.lockToWrite();
this->output.append(output);
lock.unlock();
OutputBufferingForSession::BufferPile buffers;
for (std::size_t offset = 0; offset < output.size(); offset += buffers.back().second) {
const auto bytesToBuffer = std::min(output.size() - offset, outputStreamingBufferPool.bufferSize());
auto buffer = buffers.emplace_back(std::pair(outputStreamingBufferPool.newBuffer(), bytesToBuffer));
output.copy(buffer.first->data(), bytesToBuffer, offset);
}
auto outputStreamingLock = std::unique_lock<std::mutex>(m_outputStreamingMutex);
for (auto &bufferingForSession : m_bufferingForSession) {
auto &buffering = bufferingForSession.second;
auto &currentlySentBuffers = buffering->currentlySentBuffers;
if (currentlySentBuffers.empty() && buffering->existingOutputSent) {
auto &session = bufferingForSession.first;
auto &currentlySentBufferRefs = buffering->currentlySentBufferRefs;
currentlySentBuffers.insert(currentlySentBuffers.end(), buffers.begin(), buffers.end());
for (const auto &buffer : buffers) {
currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first->data(), buffer.second));
}
boost::beast::net::async_write(session->socket(), boost::beast::http::make_chunk(currentlySentBufferRefs),
std::bind(&BuildAction::continueStreamingNewOutputToSession, shared_from_this(), session, std::ref(*buffering), std::placeholders::_1,
std::placeholders::_2));
} else {
auto &outstandingBuffersToSend = buffering->outstandingBuffersToSend;
outstandingBuffersToSend.insert(outstandingBuffersToSend.end(), buffers.begin(), buffers.end());
}
}
}
/*!
* \brief Internally called to append output and spread it to all waiting sessions.
*/
void BuildAction::appendOutput(std::string &&output)
{
appendOutput<std::string>(std::move(output));
}
/*!
* \brief Internally called to append output and spread it to all waiting sessions.
*/
void BuildAction::appendOutput(std::string_view output)
{
appendOutput<std::string_view>(std::forward<std::string_view>(output));
if (output.empty() || !m_setup) {
return;
}
auto outputLock = std::unique_lock<std::mutex>(m_outputSessionMutex);
if (!m_outputSession) {
m_outputSession = std::make_shared<BuildProcessSession>(
this, m_setup->building.ioContext, argsToString("Output of build action ", id), argsToString("logs/build-action-", id, ".log"));
m_outputSession->prepareLogFile();
if (m_outputSession->result.errorCode) {
std::cerr << Phrases::ErrorMessage << "Unable to open output logfile for build action " << id << ": " << m_outputSession->result.error
<< Phrases::EndFlush;
return;
}
const auto buildingLock = m_setup->building.lockToWrite();
logfiles.emplace_back(m_outputSession->logFilePath());
}
if (!m_outputSession->result.errorCode) {
m_outputSession->writeData(output);
}
}
} // namespace LibRepoMgr

View File

@ -112,22 +112,6 @@ template <typename StorageType> inline std::size_t BufferPool<StorageType>::stor
return m_buffers.size();
}
/// \brief The OutputBufferingForSession struct holds buffers used by the BuildAction live-streaming.
struct LIBREPOMGR_EXPORT OutputBufferingForSession {
static constexpr std::size_t bufferSize = 4096;
using StorageType = std::array<char, bufferSize>;
using BufferPoolType = BufferPool<StorageType>;
using BufferType = BufferPoolType::BufferType;
using BufferPile = std::vector<std::pair<BufferType, std::size_t>>;
using BufferRefs = std::vector<boost::asio::const_buffer>;
BufferPile currentlySentBuffers;
BufferPile outstandingBuffersToSend;
BufferRefs currentlySentBufferRefs;
std::atomic<std::size_t> bytesSent = 0;
std::atomic<std::size_t> existingOutputSize = 0;
std::atomic_bool existingOutputSent = false;
};
/// \brief The BuildProcessSession class spawns a process associated with a build action.
/// The process output is make available as a logfile of the build action allowing live-steaming.
class LIBREPOMGR_EXPORT BuildProcessSession : public std::enable_shared_from_this<BuildProcessSession>, public BaseProcessSession {
@ -138,11 +122,15 @@ public:
using BufferType = BufferPoolType::BufferType;
explicit BuildProcessSession(BuildAction *buildAction, boost::asio::io_context &ioContext, std::string &&displayName, std::string &&logFilePath,
Handler &&handler, AssociatedLocks &&locks = AssociatedLocks());
Handler &&handler = Handler(), AssociatedLocks &&locks = AssociatedLocks());
template <typename... ChildArgs> void launch(ChildArgs &&...childArgs);
void registerWebSession(std::shared_ptr<WebAPI::Session> &&webSession);
void registerNewDataHandler(std::function<void(BufferType, std::size_t)> &&handler);
void prepareLogFile();
void writeData(std::string_view data);
void writeEnd();
AssociatedLocks &locks();
const std::string &logFilePath() const;
bool hasExited() const;
private:
@ -171,12 +159,13 @@ private:
boost::asio::posix::stream_descriptor m_descriptor;
};
void prpareLogFile();
void readMoreFromPipe();
void writeDataFromPipe(boost::system::error_code ec, std::size_t bytesRead);
void writeDataFromPipe(boost::system::error_code ec, std::size_t bytesTransferred);
void writeCurrentBuffer(std::size_t bytesTransferred);
void writeNextBufferToLogFile(const boost::system::error_code &error, std::size_t bytesTransferred);
void writeNextBufferToWebSession(
const boost::system::error_code &error, std::size_t bytesTransferred, WebAPI::Session &session, BuffersToWrite &sessionInfo);
void close();
void conclude();
std::weak_ptr<BuildAction> m_buildAction;
@ -224,6 +213,11 @@ inline AssociatedLocks &BuildProcessSession::locks()
return m_locks;
}
inline const std::string &BuildProcessSession::logFilePath() const
{
return m_logFilePath;
}
inline bool BuildProcessSession::hasExited() const
{
return m_exited.load();
@ -231,7 +225,7 @@ inline bool BuildProcessSession::hasExited() const
template <typename... ChildArgs> void BuildProcessSession::launch(ChildArgs &&...childArgs)
{
prpareLogFile();
prepareLogFile();
if (result.errorCode) {
conclude();
return;

View File

@ -20,7 +20,6 @@ void MakeLicenseInfo::run()
std::get<std::shared_lock<std::shared_mutex>>(configReadLock).unlock();
const auto buildActionWriteLock = m_setup.building.lockToWrite();
m_buildAction->outputMimeType = "application/json";
m_buildAction->resultData = std::move(result);
reportResult(result.success ? BuildActionResult::Success : BuildActionResult::Failure);
}

View File

@ -62,6 +62,9 @@ inline BaseProcessSession::BaseProcessSession(boost::asio::io_context &ioContext
inline BaseProcessSession::~BaseProcessSession()
{
if (!m_handler) {
return;
}
boost::asio::post(m_ioContext, [child = std::move(this->child), result = std::move(this->result), handler = std::move(m_handler)]() mutable {
handler(std::move(child), std::move(result));
});

View File

@ -230,8 +230,11 @@ void BuildActionsTests::testLogging()
m_buildAction->log()(Phrases::ErrorMessage, "some error: ", "message", '\n');
m_buildAction->log()(Phrases::InfoMessage, "info", '\n');
}
CPPUNIT_ASSERT_EQUAL_MESSAGE("messages added to build action output",
"\e[1;31m==> ERROR: \e[0m\e[1msome error: message\n\e[1;37m==> \e[0m\e[1minfo\n"s, m_buildAction->output);
m_buildAction->conclude(BuildActionResult::Success);
m_setup.building.ioContext.run();
const auto output = readFile("logs/build-action-0.log");
CPPUNIT_ASSERT_EQUAL_MESSAGE(
"messages added to build action output", "\e[1;31m==> ERROR: \e[0m\e[1msome error: message\n\e[1;37m==> \e[0m\e[1minfo\n"s, output);
}
/*!
@ -258,7 +261,7 @@ void BuildActionsTests::testProcessSession()
*/
void BuildActionsTests::testBuildActionProcess()
{
m_buildAction = std::make_shared<BuildAction>(0, &m_setup);
m_buildAction = std::make_shared<BuildAction>(1, &m_setup);
const auto scriptPath = testFilePath("scripts/print_some_data.sh");
const auto logFilePath = std::filesystem::path(TestApplication::instance()->workingDirectory()) / "logfile.log";
@ -269,11 +272,11 @@ void BuildActionsTests::testBuildActionProcess()
auto &ioc = m_setup.building.ioContext;
auto session = std::make_shared<BuildProcessSession>(
m_buildAction.get(), ioc, "test", std::string(logFilePath), [&ioc](boost::process::child &&child, ProcessResult &&result) {
m_buildAction.get(), ioc, "test", std::string(logFilePath), [this](boost::process::child &&child, ProcessResult &&result) {
CPPUNIT_ASSERT_EQUAL(std::error_code(), result.errorCode);
CPPUNIT_ASSERT_EQUAL(0, result.exitCode);
CPPUNIT_ASSERT_GREATER(0, child.native_handle());
ioc.stop();
m_buildAction->conclude(BuildActionResult::Success);
});
session->launch(scriptPath);
session.reset();
@ -281,11 +284,12 @@ void BuildActionsTests::testBuildActionProcess()
const auto logFile = readFile(logFilePath);
const auto logLines = splitStringSimple<std::vector<std::string_view>>(logFile, "\r\n");
const auto output = readFile("logs/build-action-1.log");
CPPUNIT_ASSERT_EQUAL(5002_st, logLines.size());
CPPUNIT_ASSERT_EQUAL("printing some numbers"sv, logLines.front());
CPPUNIT_ASSERT_EQUAL_MESSAGE("trailing line break", ""sv, logLines.back());
CPPUNIT_ASSERT_EQUAL_MESSAGE("last line", "line 5000"sv, logLines[logLines.size() - 2u]);
TESTUTILS_ASSERT_LIKE_FLAGS("PID logged", ".*Launched \"test\", PID\\: [0-9]+.*\n.*"s, std::regex::extended, m_buildAction->output);
TESTUTILS_ASSERT_LIKE_FLAGS("PID logged", ".*Launched \"test\", PID\\: [0-9]+.*\n.*"s, std::regex::extended, output);
}
/*!

View File

@ -18,7 +18,6 @@ LIBREPOMGR_EXPORT void getUnresolved(const Params &params, ResponseHandler &&han
LIBREPOMGR_EXPORT void getPackages(const Params &params, ResponseHandler &&handler);
LIBREPOMGR_EXPORT void getBuildActions(const Params &params, ResponseHandler &&handler);
LIBREPOMGR_EXPORT void getBuildActionDetails(const Params &params, ResponseHandler &&handler);
LIBREPOMGR_EXPORT void getBuildActionOutput(const Params &params, ResponseHandler &&handler);
LIBREPOMGR_EXPORT void getBuildActionLogFile(const Params &params, ResponseHandler &&handler);
LIBREPOMGR_EXPORT void getBuildActionArtefact(const Params &params, ResponseHandler &&handler);
LIBREPOMGR_EXPORT void postLoadPackages(const Params &params, ResponseHandler &&handler);

View File

@ -100,36 +100,6 @@ void getBuildActionDetails(const Params &params, ResponseHandler &&handler)
handler(makeJson(params.request(), jsonDoc, params.target.hasPrettyFlag()));
}
void getBuildActionOutput(const Params &params, ResponseHandler &&handler)
{
const auto offsetParams = params.target.decodeValues("offset");
std::size_t offset = 0;
if (offsetParams.size() > 1) {
handler(makeBadRequest(params.request(), "the offset parameter must be specified at most once"));
return;
}
if (!offsetParams.empty()) {
try {
offset = stringToNumber<std::size_t>(offsetParams.front());
} catch (const ConversionException &) {
handler(makeBadRequest(params.request(), "the offset must be an unsigned integer"));
return;
}
}
auto buildActionsSearchResult = findBuildActions(params, std::move(handler), false, 1);
if (!buildActionsSearchResult.ok) {
return;
}
auto &buildAction = buildActionsSearchResult.actions.front();
if (offset > buildAction->output.size()) {
buildActionsSearchResult.lock = std::monostate{};
handler(makeBadRequest(params.request(), "the offset must not exceed the output size"));
return;
}
buildActionsSearchResult.lock = std::monostate{};
buildAction->streamOutput(params, offset);
}
static std::string readNameParam(const Params &params, ResponseHandler &&handler)
{
const auto nameParams = params.target.decodeValues("name");

View File

@ -33,7 +33,6 @@ const Router Server::s_router = {
{ { http::verb::get, "/api/v0/build-action" }, Route{&Routes::getBuildActions} },
{ { http::verb::delete_, "/api/v0/build-action" }, Route{&Routes::deleteBuildActions, UserPermissions::ModifyBuildActions} },
{ { http::verb::get, "/api/v0/build-action/details" }, Route{&Routes::getBuildActionDetails, UserPermissions::ReadBuildActionsDetails} },
{ { http::verb::get, "/api/v0/build-action/output" }, Route{&Routes::getBuildActionOutput, UserPermissions::ReadBuildActionsDetails} },
{ { http::verb::get, "/api/v0/build-action/logfile" }, Route{&Routes::getBuildActionLogFile, UserPermissions::ReadBuildActionsDetails} },
{ { http::verb::get, "/api/v0/build-action/artefact" }, Route{&Routes::getBuildActionArtefact, UserPermissions::ReadBuildActionsDetails} },
{ { http::verb::post, "/api/v0/build-action" }, Route{&Routes::postBuildAction, UserPermissions::ModifyBuildActions} },

View File

@ -463,8 +463,8 @@ function renderBuildActionDetailsTable(buildActionDetails)
{
return GenericRendering.renderTableFromJsonObject({
data: buildActionDetails,
displayLabels: ['ID', 'Task', 'Type', 'Status', 'Result', 'Result data', 'Created', 'Started', 'Finished', 'Start after', 'Directory', 'Source repo', 'Destination repo', 'Packages', 'Flags', 'Settings', 'Log files', 'Artefacts', 'Output'],
fieldAccessors: ['id', 'taskName', 'type', 'status', 'result', 'resultData', 'created', 'started', 'finished', 'startAfter', 'directory', 'sourceDbs', 'destinationDbs', 'packageNames', 'flags', 'settings', 'logfiles', 'artefacts', 'output'],
displayLabels: ['ID', 'Task', 'Type', 'Status', 'Result', 'Result data', 'Created', 'Started', 'Finished', 'Start after', 'Directory', 'Source repo', 'Destination repo', 'Packages', 'Flags', 'Settings', 'Log files', 'Artefacts'],
fieldAccessors: ['id', 'taskName', 'type', 'status', 'result', 'resultData', 'created', 'started', 'finished', 'startAfter', 'directory', 'sourceDbs', 'destinationDbs', 'packageNames', 'flags', 'settings', 'logfiles', 'artefacts'],
customRenderer: {
taskName: function (value) {
if (!value) {
@ -602,28 +602,6 @@ function renderBuildActionDetailsTable(buildActionDetails)
},
logfiles: renderBuildActionLogFiles,
artefacts: renderBuildActionArtefacts,
output: function(value, row) {
const isFinished = row.status === 4;
if (!value && isFinished) {
return GenericRendering.renderNoneInGrey();
}
const targetElement = document.createElement('div');
if (isFinished) {
const terminal = Terminal.makeTerminal();
Terminal.setupTerminalLater(terminal, targetElement, value);
return targetElement;
}
const streamingSetup = setupTerminalForStreaming({
id: 'output-' + row.id,
targetElement: targetElement,
path: '/build-action/output?id=' + encodeURIComponent(row.id) + '&offset=' + encodeURIComponent(value.length),
});
streamingSetup.startStreaming();
const terminal = streamingSetup.terminal();
terminal.setOption('convertEol', true);
terminal.write(value);
return streamingSetup.elements;
},
},
});
}