Use io_uring for async file I/O

This commit is contained in:
Martchus 2022-02-20 18:44:07 +01:00
parent 8115721913
commit 334bb1faa9
3 changed files with 74 additions and 33 deletions

View File

@ -90,7 +90,7 @@ set(BOOST_ARGS "REQUIRED;COMPONENTS;system;filesystem;iostreams")
use_package(TARGET_NAME Boost::system PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}")
use_package(TARGET_NAME Boost::filesystem PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}")
use_package(TARGET_NAME Boost::iostreams PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}")
option(BOOST_ASIO_IO_URING OFF "enable use of io_uring")
option(BOOST_ASIO_IO_URING ON "enable use of io_uring")
if (BOOST_ASIO_IO_URING)
list(APPEND META_PUBLIC_COMPILE_DEFINITIONS BOOST_ASIO_HAS_IO_URING BOOST_ASIO_DISABLE_EPOLL)
use_pkg_config_module(PKG_CONFIG_MODULES "liburing" VISIBILITY PUBLIC)

View File

@ -24,34 +24,45 @@ void BuildProcessSession::DataForWebSession::streamFile(
{
error = false;
boost::beast::error_code error;
m_file.open(filePath.data(), boost::beast::file_mode::scan, error);
if (error) {
cerr << Phrases::WarningMessage << "Unable to open \"" << filePath << "\": " << error.message() << Phrases::EndFlush;
#ifdef BOOST_ASIO_HAS_FILE
auto ec = boost::system::error_code();
m_fileStream.open(filePath, boost::asio::stream_file::read_only, ec);
#else
auto error = boost::beast::error_code();
m_file.open(filePath.data(), boost::beast::file_mode::scan, ec);
#endif
if (ec) {
cerr << Phrases::WarningMessage << "Unable to open \"" << filePath << "\": " << ec.message() << Phrases::EndFlush;
return;
}
const auto fileSize = m_file.size(error);
#ifdef BOOST_ASIO_HAS_FILE
const auto fileSize = m_fileStream.size(ec);
#else
const auto fileSize = m_file.size(ec);
#endif
m_bytesToSendFromFile.store(fileSize);
lock.unlock();
if (error) {
cerr << Phrases::WarningMessage << "Unable to determine size of \"" << filePath << "\": " << error.message() << Phrases::EndFlush;
if (ec) {
cerr << Phrases::WarningMessage << "Unable to determine size of \"" << filePath << "\": " << ec.message() << Phrases::EndFlush;
return;
}
m_descriptor.assign(m_file.native_handle(), error);
if (error) {
#ifndef BOOST_ASIO_HAS_FILE
m_fileStream.assign(m_file.native_handle(), ec);
if (ec) {
m_bytesToSendFromFile.store(0);
cerr << Phrases::WarningMessage << "Unable to assign descriptor for \"" << filePath << "\": " << error.message() << Phrases::EndFlush;
cerr << Phrases::WarningMessage << "Unable to assign descriptor for \"" << filePath << "\": " << ec.message() << Phrases::EndFlush;
return;
}
m_descriptor.non_blocking(true, error);
if (error) {
m_fileStream.non_blocking(true, ec);
if (ec) {
m_bytesToSendFromFile.store(0);
cerr << Phrases::WarningMessage << "Unable to set descriptor for \"" << filePath << "\" to non-blocking mode: " << error.message()
cerr << Phrases::WarningMessage << "Unable to set descriptor for \"" << filePath << "\" to non-blocking mode: " << ec.message()
<< Phrases::EndFlush;
return;
}
#endif
m_fileBuffer = m_session.m_bufferPool.newBuffer();
m_descriptor.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(fileSize, m_session.m_bufferPool.bufferSize()))),
m_fileStream.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(fileSize, m_session.m_bufferPool.bufferSize()))),
std::bind(&DataForWebSession::writeFileData, this, std::ref(filePath), std::move(session), std::placeholders::_1, std::placeholders::_2));
}
@ -65,7 +76,7 @@ void BuildProcessSession::DataForWebSession::writeFileData(
return;
} else if (eof) {
boost::system::error_code ec;
m_descriptor.close(ec);
m_fileStream.close(ec);
}
// send file data to web client
if (bytesTransferred > m_bytesToSendFromFile) {
@ -95,7 +106,7 @@ void BuildProcessSession::DataForWebSession::writeFileData(
return;
}
// continue reading if there's more data
m_descriptor.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(bytesLeftToRead, m_session.m_bufferPool.bufferSize()))),
m_fileStream.async_read_some(boost::asio::buffer(*m_fileBuffer, sizeof(std::min(bytesLeftToRead, m_session.m_bufferPool.bufferSize()))),
std::bind(
&DataForWebSession::writeFileData, this, std::ref(filePath), std::move(session), std::placeholders::_1, std::placeholders::_2));
});
@ -148,6 +159,16 @@ void BuildProcessSession::prepareLogFile()
}
}
// open logfile and a "file descriptor" for writing in a non-blocking way
#ifdef BOOST_ASIO_HAS_FILE
auto ec = boost::system::error_code();
m_logFileStream.open(
m_logFilePath, boost::asio::stream_file::write_only | boost::asio::stream_file::create | boost::asio::stream_file::truncate, ec);
if (ec) {
result.errorCode = std::error_code(ec.value(), ec.category());
result.error = CppUtilities::argsToString("unable to open \"", m_logFilePath, ": ", ec.message());
return;
}
#else
auto ec = boost::beast::error_code();
m_logFile.open(m_logFilePath.data(), boost::beast::file_mode::write, ec);
if (ec) {
@ -164,6 +185,7 @@ void BuildProcessSession::prepareLogFile()
result.error = CppUtilities::argsToString("unable to prepare descriptor for \"", m_logFilePath, ": ", e.what());
return;
}
#endif
}
void BuildProcessSession::readMoreFromPipe()
@ -201,7 +223,7 @@ void BuildProcessSession::writeCurrentBuffer(std::size_t bytesTransferred)
if (!m_logFileBuffers.error) {
if (m_logFileBuffers.currentlySentBuffers.empty()) {
m_logFileBuffers.currentlySentBuffers.emplace_back(std::pair(m_buffer, bytesTransferred));
boost::asio::async_write(m_logFileDescriptor, boost::asio::buffer(m_buffer.get(), bytesTransferred),
boost::asio::async_write(m_logFileStream, boost::asio::buffer(m_buffer.get(), bytesTransferred),
std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
} else {
m_logFileBuffers.outstandingBuffersToSend.emplace_back(std::pair(m_buffer, bytesTransferred));
@ -252,11 +274,7 @@ void BuildProcessSession::writeNextBufferToLogFile(const boost::system::error_co
m_logFileBuffers.currentlySentBuffers.clear();
// close the logfile when the process exited and we've written all the output
if (m_logFileBuffers.outstandingBuffersToSend.empty() && m_exited.load()) {
auto closeError = boost::system::error_code();
m_logFile.close(closeError);
if (closeError) {
cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << closeError.message() << Phrases::EndFlush;
}
closeLogFile();
return;
}
m_logFileBuffers.currentlySentBuffers.swap(m_logFileBuffers.outstandingBuffersToSend);
@ -265,7 +283,7 @@ void BuildProcessSession::writeNextBufferToLogFile(const boost::system::error_co
m_logFileBuffers.currentlySentBufferRefs.emplace_back(boost::asio::buffer(buffer.first.get(), buffer.second));
}
}
boost::asio::async_write(m_logFileDescriptor, m_logFileBuffers.currentlySentBufferRefs,
boost::asio::async_write(m_logFileStream, m_logFileBuffers.currentlySentBufferRefs,
std::bind(&BuildProcessSession::writeNextBufferToLogFile, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
@ -304,15 +322,24 @@ void BuildProcessSession::writeNextBufferToWebSession(
}
}
void BuildProcessSession::closeLogFile()
{
auto ec = boost::system::error_code();
#ifdef BOOST_ASIO_HAS_FILE
m_logFileStream.close(ec);
#else
m_logFile.close(ec);
#endif
if (ec) {
cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << ec.message() << Phrases::EndFlush;
}
}
void BuildProcessSession::close()
{
auto lock = std::lock_guard<std::mutex>(m_mutex);
if (m_logFileBuffers.outstandingBuffersToSend.empty()) {
auto error = boost::system::error_code();
m_logFile.close(error);
if (error) {
cerr << Phrases::WarningMessage << "Error closing \"" << m_logFilePath << "\": " << error.message() << Phrases::EndFlush;
}
closeLogFile();
}
for (auto &[session, sessionInfo] : m_registeredWebSessions) {
if (!sessionInfo->outstandingBuffersToSend.empty()) {

View File

@ -19,7 +19,10 @@
#include <unordered_map>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/stream_file.hpp>
#ifndef BOOST_ASIO_HAS_FILE
#include <boost/asio/posix/stream_descriptor.hpp>
#endif
#include <boost/beast/core/file.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/process/extend.hpp>
@ -112,6 +115,12 @@ template <typename StorageType> inline std::size_t BufferPool<StorageType>::stor
return m_buffers.size();
}
#ifdef BOOST_ASIO_HAS_FILE
using AsioFileStream = boost::asio::stream_file;
#else
using AsioFileStream = boost::asio::posix::stream_descriptor;
#endif
/// \brief The BuildProcessSession class spawns a process associated with a build action.
/// The process output is make available as a logfile of the build action allowing live-steaming.
class LIBREPOMGR_EXPORT BuildProcessSession : public std::enable_shared_from_this<BuildProcessSession>, public BaseProcessSession {
@ -154,9 +163,11 @@ private:
BuildProcessSession &m_session;
std::atomic<std::size_t> m_bytesToSendFromFile = 0;
#ifndef BOOST_ASIO_HAS_FILE
boost::beast::file m_file;
#endif
BufferType m_fileBuffer;
boost::asio::posix::stream_descriptor m_descriptor;
AsioFileStream m_fileStream;
};
void readMoreFromPipe();
@ -165,6 +176,7 @@ private:
void writeNextBufferToLogFile(const boost::system::error_code &error, std::size_t bytesTransferred);
void writeNextBufferToWebSession(
const boost::system::error_code &error, std::size_t bytesTransferred, WebAPI::Session &session, BuffersToWrite &sessionInfo);
void closeLogFile();
void close();
void conclude();
@ -174,8 +186,10 @@ private:
BufferType m_buffer;
std::string m_displayName;
std::string m_logFilePath;
#ifndef BOOST_ASIO_HAS_FILE
boost::beast::file m_logFile;
boost::asio::posix::stream_descriptor m_logFileDescriptor;
#endif
AsioFileStream m_logFileStream;
std::mutex m_mutex;
BuffersToWrite m_logFileBuffers;
std::unordered_map<std::shared_ptr<WebAPI::Session>, std::unique_ptr<DataForWebSession>> m_registeredWebSessions;
@ -186,7 +200,7 @@ private:
inline BuildProcessSession::DataForWebSession::DataForWebSession(BuildProcessSession &session)
: m_session(session)
, m_descriptor(session.m_ioContext)
, m_fileStream(session.m_ioContext)
{
}
@ -203,7 +217,7 @@ inline BuildProcessSession::BuildProcessSession(BuildAction *buildAction, boost:
, m_bufferPool(bufferSize)
, m_displayName(std::move(displayName))
, m_logFilePath(std::move(logFilePath))
, m_logFileDescriptor(ioContext)
, m_logFileStream(ioContext)
, m_locks(std::move(locks))
{
}