diff --git a/librepomgr/CMakeLists.txt b/librepomgr/CMakeLists.txt index c34a917..d1a9a64 100644 --- a/librepomgr/CMakeLists.txt +++ b/librepomgr/CMakeLists.txt @@ -90,7 +90,7 @@ set(BOOST_ARGS "REQUIRED;COMPONENTS;system;filesystem;iostreams") use_package(TARGET_NAME Boost::system PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}") use_package(TARGET_NAME Boost::filesystem PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}") use_package(TARGET_NAME Boost::iostreams PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}") -option(BOOST_ASIO_IO_URING OFF "enable use of io_uring") +option(BOOST_ASIO_IO_URING ON "enable use of io_uring") if (BOOST_ASIO_IO_URING) list(APPEND META_PUBLIC_COMPILE_DEFINITIONS BOOST_ASIO_HAS_IO_URING BOOST_ASIO_DISABLE_EPOLL) use_pkg_config_module(PKG_CONFIG_MODULES "liburing" VISIBILITY PUBLIC) diff --git a/librepomgr/buildactions/buildactionlivestreaming.cpp b/librepomgr/buildactions/buildactionlivestreaming.cpp index 4f6ef93..773e65e 100644 --- a/librepomgr/buildactions/buildactionlivestreaming.cpp +++ b/librepomgr/buildactions/buildactionlivestreaming.cpp @@ -24,34 +24,45 @@ void BuildProcessSession::DataForWebSession::streamFile( { error = false; - boost::beast::error_code error; - m_file.open(filePath.data(), boost::beast::file_mode::scan, error); - if (error) { - cerr << Phrases::WarningMessage << "Unable to open \"" << filePath << "\": " << error.message() << Phrases::EndFlush; +#ifdef BOOST_ASIO_HAS_FILE + auto ec = boost::system::error_code(); + m_fileStream.open(filePath, boost::asio::stream_file::read_only, ec); +#else + auto error = boost::beast::error_code(); + m_file.open(filePath.data(), boost::beast::file_mode::scan, ec); +#endif + if (ec) { + cerr << Phrases::WarningMessage << "Unable to open \"" << filePath << "\": " << ec.message() << Phrases::EndFlush; return; } - const auto fileSize = m_file.size(error); +#ifdef BOOST_ASIO_HAS_FILE + const auto fileSize = m_fileStream.size(ec); +#else + const auto fileSize = m_file.size(ec); +#endif m_bytesToSendFromFile.store(fileSize); lock.unlock(); - if (error) { - cerr << Phrases::WarningMessage << "Unable to determine size of \"" << filePath << "\": " << error.message() << Phrases::EndFlush; + if (ec) { + cerr << Phrases::WarningMessage << "Unable to determine size of \"" << filePath << "\": " << ec.message() << Phrases::EndFlush; return; } - m_descriptor.assign(m_file.native_handle(), error); - if (error) { +#ifndef BOOST_ASIO_HAS_FILE + m_fileStream.assign(m_file.native_handle(), ec); + if (ec) { m_bytesToSendFromFile.store(0); - cerr << Phrases::WarningMessage << "Unable to assign descriptor for \"" << filePath << "\": " << error.message() << Phrases::EndFlush; + cerr << Phrases::WarningMessage << "Unable to assign descriptor for \"" << filePath << "\": " << ec.message() << Phrases::EndFlush; return; } - m_descriptor.non_blocking(true, error); - if (error) { + m_fileStream.non_blocking(true, ec); + if (ec) { m_bytesToSendFromFile.store(0); - cerr << Phrases::WarningMessage << "Unable to set descriptor for \"" << filePath << "\" to non-blocking mode: " << error.message() + cerr << Phrases::WarningMessage << "Unable to set descriptor for \"" << filePath << "\" to non-blocking mode: " << ec.message() << Phrases::EndFlush; return; } +#endif m_fileBuffer = m_session.m_bufferPool.newBuffer(); - m_descriptor.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(fileSize, m_session.m_bufferPool.bufferSize()))), + m_fileStream.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(fileSize, m_session.m_bufferPool.bufferSize()))), std::bind(&DataForWebSession::writeFileData, this, std::ref(filePath), std::move(session), std::placeholders::_1, std::placeholders::_2)); } @@ -65,7 +76,7 @@ void BuildProcessSession::DataForWebSession::writeFileData( return; } else if (eof) { boost::system::error_code ec; - m_descriptor.close(ec); + m_fileStream.close(ec); } // send file data to web client if (bytesTransferred > m_bytesToSendFromFile) { @@ -95,7 +106,7 @@ void BuildProcessSession::DataForWebSession::writeFileData( return; } // continue reading if there's more data - m_descriptor.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(bytesLeftToRead, m_session.m_bufferPool.bufferSize()))), + m_fileStream.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(bytesLeftToRead, m_session.m_bufferPool.bufferSize()))), std::bind( &DataForWebSession::writeFileData, this, std::ref(filePath), std::move(session), std::placeholders::_1, std::placeholders::_2)); }); @@ -148,6 +159,16 @@ void BuildProcessSession::prepareLogFile() } } // open logfile and a "file descriptor" for writing in a non-blocking way +#ifdef BOOST_ASIO_HAS_FILE + auto ec = boost::system::error_code(); + m_logFileStream.open( + m_logFilePath, boost::asio::stream_file::write_only | boost::asio::stream_file::create | boost::asio::stream_file::truncate, ec); + if (ec) { + result.errorCode = std::error_code(ec.value(), ec.category()); + result.error = CppUtilities::argsToString("unable to open \"", m_logFilePath, ": ", ec.message()); + return; + } +#else auto ec = boost::beast::error_code(); m_logFile.open(m_logFilePath.data(), boost::beast::file_mode::write, ec); if (ec) { @@ -164,6 +185,7 @@ void BuildProcessSession::prepareLogFile() result.error = CppUtilities::argsToString("unable to prepare descriptor for \"", m_logFilePath, ": ", e.what()); return; } +#endif } void BuildProcessSession::readMoreFromPipe() @@ -201,7 +223,7 @@ void BuildProcessSession::writeCurrentBuffer(std::size_t bytesTransferred) if (!m_logFileBuffers.error) { if (m_logFileBuffers.currentlySentBuffers.empty()) { m_logFileBuffers.currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred)); - boost::asio::async_write(m_logFileDescriptor, boost::asio::buffer(m_buffer.get(), bytesTransferred), + boost::asio::async_write(m_logFileStream, boost::asio::buffer(m_buffer.get(), bytesTransferred), std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } else { m_logFileBuffers.outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred)); @@ -252,11 +274,7 @@ void BuildProcessSession::writeNextBufferToLogFile(const boost::system::error_co m_logFileBuffers.currentlySentBuffers.clear(); // close the logfile when the process exited and we've written all the output if (m_logFileBuffers.outstandingBuffersToSend.empty() && m_exited.load()) { - auto closeError = boost::system::error_code(); - m_logFile.close(closeError); - if (closeError) { - cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << closeError.message() << Phrases::EndFlush; - } + closeLogFile(); return; } m_logFileBuffers.currentlySentBuffers.swap(m_logFileBuffers.outstandingBuffersToSend); @@ -265,7 +283,7 @@ void BuildProcessSession::writeNextBufferToLogFile(const boost::system::error_co m_logFileBuffers.currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first.get(), buffer.second)); } } - boost::asio::async_write(m_logFileDescriptor, m_logFileBuffers.currentlySentBufferRefs, + boost::asio::async_write(m_logFileStream, m_logFileBuffers.currentlySentBufferRefs, std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } @@ -304,15 +322,24 @@ void BuildProcessSession::writeNextBufferToWebSession( } } +void BuildProcessSession::closeLogFile() +{ + auto ec = boost::system::error_code(); +#ifdef BOOST_ASIO_HAS_FILE + m_logFileStream.close(ec); +#else + m_logFile.close(ec); +#endif + if (ec) { + cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << ec.message() << Phrases::EndFlush; + } +} + void BuildProcessSession::close() { auto lock = std::lock_guard(m_mutex); if (m_logFileBuffers.outstandingBuffersToSend.empty()) { - auto error = boost::system::error_code(); - m_logFile.close(error); - if (error) { - cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush; - } + closeLogFile(); } for (auto &[session, sessionInfo] : m_registeredWebSessions) { if (!sessionInfo->outstandingBuffersToSend.empty()) { diff --git a/librepomgr/buildactions/buildactionprivate.h b/librepomgr/buildactions/buildactionprivate.h index 07b8b76..c7aa022 100644 --- a/librepomgr/buildactions/buildactionprivate.h +++ b/librepomgr/buildactions/buildactionprivate.h @@ -19,7 +19,10 @@ #include #include +#include +#ifndef BOOST_ASIO_HAS_FILE #include +#endif #include #include #include @@ -112,6 +115,12 @@ template inline std::size_t BufferPool::stor return m_buffers.size(); } +#ifdef BOOST_ASIO_HAS_FILE +using AsioFileStream = boost::asio::stream_file; +#else +using AsioFileStream = boost::asio::posix::stream_descriptor; +#endif + /// \brief The BuildProcessSession class spawns a process associated with a build action. /// The process output is make available as a logfile of the build action allowing live-steaming. class LIBREPOMGR_EXPORT BuildProcessSession : public std::enable_shared_from_this, public BaseProcessSession { @@ -154,9 +163,11 @@ private: BuildProcessSession &m_session; std::atomic m_bytesToSendFromFile = 0; +#ifndef BOOST_ASIO_HAS_FILE boost::beast::file m_file; +#endif BufferType m_fileBuffer; - boost::asio::posix::stream_descriptor m_descriptor; + AsioFileStream m_fileStream; }; void readMoreFromPipe(); @@ -165,6 +176,7 @@ private: void writeNextBufferToLogFile(const boost::system::error_code &error, std::size_t bytesTransferred); void writeNextBufferToWebSession( const boost::system::error_code &error, std::size_t bytesTransferred, WebAPI::Session &session, BuffersToWrite &sessionInfo); + void closeLogFile(); void close(); void conclude(); @@ -174,8 +186,10 @@ private: BufferType m_buffer; std::string m_displayName; std::string m_logFilePath; +#ifndef BOOST_ASIO_HAS_FILE boost::beast::file m_logFile; - boost::asio::posix::stream_descriptor m_logFileDescriptor; +#endif + AsioFileStream m_logFileStream; std::mutex m_mutex; BuffersToWrite m_logFileBuffers; std::unordered_map, std::unique_ptr> m_registeredWebSessions; @@ -186,7 +200,7 @@ private: inline BuildProcessSession::DataForWebSession::DataForWebSession(BuildProcessSession &session) : m_session(session) - , m_descriptor(session.m_ioContext) + , m_fileStream(session.m_ioContext) { } @@ -203,7 +217,7 @@ inline BuildProcessSession::BuildProcessSession(BuildAction *buildAction, boost: , m_bufferPool(bufferSize) , m_displayName(std::move(displayName)) , m_logFilePath(std::move(logFilePath)) - , m_logFileDescriptor(ioContext) + , m_logFileStream(ioContext) , m_locks(std::move(locks)) { }