Improve Boost.Process-based SyncthingProcess

* Implement member functions required to run tests
* Add documentation
* Add minor tweaks
This commit is contained in:
Martchus 2021-06-17 20:32:25 +02:00
parent 303cea3658
commit 1bc52ebb0f
5 changed files with 197 additions and 78 deletions

View File

@ -117,11 +117,12 @@ void ApplicationTests::test()
}
syncthingProcess().waitForReadyRead(static_cast<int>(syncthingCheckInterval.totalMilliseconds()));
}
syncthingOutput.append(syncthingProcess().readAll());
const auto newOutput = syncthingProcess().readAll();
clog.write(newOutput.data(), newOutput.size());
syncthingOutput.append(newOutput);
} while (!syncthingOutput.contains("Access the GUI via the following URL"));
setInterleavedOutputEnabledFromEnv();
cout.write(syncthingOutput.data(), syncthingOutput.size());
cout.flush();
}

View File

@ -59,13 +59,9 @@ if (USE_BOOST_PROCESS)
# add Boost::boost target which represents include directory for header-only deps add Boost::filesystem when building for
# Windows as it is needed by Boost.Process there
set(BOOST_ARGS REQUIRED)
if (WIN32)
list(APPEND BOOST_ARGS COMPONENTS filesystem)
endif ()
list(APPEND BOOST_ARGS COMPONENTS filesystem)
use_package(TARGET_NAME Boost::boost PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}")
if (WIN32)
use_package(TARGET_NAME Boost::filesystem PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}")
endif ()
use_package(TARGET_NAME Boost::filesystem PACKAGE_NAME Boost PACKAGE_ARGS "${BOOST_ARGS}")
list(APPEND META_PUBLIC_COMPILE_DEFINITIONS ${META_PROJECT_VARNAME_UPPER}_BOOST_PROCESS)
if (MINGW)
# workaround https://github.com/boostorg/process/issues/96

View File

@ -7,15 +7,18 @@
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/process/async.hpp>
#include <boost/process/async_pipe.hpp>
#include <boost/process/child.hpp>
#include <boost/process/extend.hpp>
#include <boost/process/group.hpp>
#include <boost/process/io.hpp>
#include <boost/process/search_path.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <iostream>
#include <limits>
@ -28,8 +31,9 @@ using namespace CppUtilities;
namespace Data {
/// \cond
#ifdef LIB_SYNCTHING_CONNECTOR_BOOST_PROCESS
/// \brief Holds data related to the process execution via Boost.Process.
/// \remarks A new one is created for each process to be started.
struct SyncthingProcessInternalData : std::enable_shared_from_this<SyncthingProcessInternalData> {
static constexpr std::size_t bufferCapacity = 0x1000;
static_assert(SyncthingProcessInternalData::bufferCapacity <= std::numeric_limits<qint64>::max());
@ -48,12 +52,16 @@ struct SyncthingProcessInternalData : std::enable_shared_from_this<SyncthingProc
boost::process::group group;
boost::process::child child;
boost::process::async_pipe pipe;
std::mutex readMutex;
std::condition_variable readCondVar;
char buffer[bufferCapacity];
std::atomic_size_t bytesBuffered = 0;
std::size_t bytesRead = 0;
QProcess::ProcessState state = QProcess::NotRunning;
};
/// \brief Holds the IO context and thread handles for the process execution via Boost.Process.
/// \remarks A new one is created for each SyncthingProcess instance.
struct SyncthingProcessIOHandler {
explicit SyncthingProcessIOHandler();
~SyncthingProcessIOHandler();
@ -63,7 +71,6 @@ struct SyncthingProcessIOHandler {
std::thread t;
};
#endif
/// \endcond
SyncthingProcess *SyncthingProcess::s_mainInstance = nullptr;
@ -84,9 +91,7 @@ SyncthingProcess::SyncthingProcess(QObject *parent)
{
m_killTimer.setInterval(3000);
m_killTimer.setSingleShot(true);
#ifndef LIB_SYNCTHING_CONNECTOR_BOOST_PROCESS
setProcessChannelMode(QProcess::MergedChannels);
#endif
connect(this, &SyncthingProcess::started, this, &SyncthingProcess::handleStarted);
connect(this, static_cast<void (SyncthingProcess::*)(int exitCode, QProcess::ExitStatus exitStatus)>(&SyncthingProcess::finished), this,
&SyncthingProcess::handleFinished);
@ -228,11 +233,17 @@ void SyncthingProcess::killSyncthing()
kill();
}
/*!
* \brief Keeps track of when the process has been started.
*/
void SyncthingProcess::handleStarted()
{
m_activeSince = DateTime::gmtNow();
}
/*!
* \brief Resets the state after the process has finished; restart it again if requested.
*/
void SyncthingProcess::handleFinished(int exitCode, QProcess::ExitStatus exitStatus)
{
Q_UNUSED(exitCode)
@ -246,6 +257,9 @@ void SyncthingProcess::handleFinished(int exitCode, QProcess::ExitStatus exitSta
}
}
/*!
* \brief Kills the process if it is supposed to be restarted.
*/
void SyncthingProcess::killToRestart()
{
if (!m_program.isEmpty()) {
@ -253,9 +267,8 @@ void SyncthingProcess::killToRestart()
}
}
/// \cond
/// \remarks The functions below are for using Boost.Process (instead of QProcess) to be able to
/// terminate the process better by using a group.
// The functions below are for using Boost.Process (instead of QProcess) to be able to
// terminate the process better by using a group.
#ifdef LIB_SYNCTHING_CONNECTOR_BOOST_PROCESS
SyncthingProcessIOHandler::SyncthingProcessIOHandler()
: ioc()
@ -276,17 +289,20 @@ Data::SyncthingProcessInternalData::SyncthingProcessInternalData(boost::asio::io
{
}
inline Data::SyncthingProcessInternalData::Lock::Lock(const std::weak_ptr<SyncthingProcessInternalData> &weak)
Data::SyncthingProcessInternalData::Lock::Lock(const std::weak_ptr<SyncthingProcessInternalData> &weak)
: process(weak.lock())
, lock(process ? decltype(lock)(process->mutex, std::try_to_lock) : decltype(lock)())
{
}
inline Data::SyncthingProcessInternalData::Lock::operator bool() const
Data::SyncthingProcessInternalData::Lock::operator bool() const
{
return process && lock;
}
/*!
* \brief Internally handles an error.
*/
void SyncthingProcess::handleError(QProcess::ProcessError error, const QString &errorMessage, bool closed)
{
setErrorString(errorMessage);
@ -296,11 +312,24 @@ void SyncthingProcess::handleError(QProcess::ProcessError error, const QString &
}
}
/*!
* \brief Returns the process' state.
* \remarks The process is only considered QProcess::NotRunning if all sub processes it has created have been
* terminated as well.
*/
QProcess::ProcessState SyncthingProcess::state() const
{
return !m_process ? QProcess::NotRunning : m_process->state;
}
/*!
* \brief Starts a new process within a new process group (or job object under Windows) with the specified parameters.
* \remarks
* - The specified \a openMode must be QIODevice::ReadOnly.
* - Only one process can be started at a time. Starting another process while the previous one or any child
* process of it is still running is an error.
* - In case of an error the error strinng is set and errorOccurred() is emitted.
*/
void SyncthingProcess::start(const QString &program, const QStringList &arguments, QIODevice::OpenMode openMode)
{
// ensure the dev is only opened in read-only mode because writing is not implemented here
@ -331,71 +360,88 @@ void SyncthingProcess::start(const QString &program, const QStringList &argument
emit stateChanged(m_process->state = QProcess::Starting);
// convert args
auto prog = program.toStdString();
auto args = std::vector<std::string>();
args.reserve(1 + static_cast<std::size_t>(arguments.size()));
args.emplace_back(program.toStdString());
args.reserve(static_cast<std::size_t>(arguments.size()));
for (const auto &arg : arguments) {
args.emplace_back(arg.toStdString());
}
m_process->program = program;
m_process->arguments = arguments;
// define handler
auto successHandler = boost::process::extend::on_success = [this, maybeProcess = m_process->weak_from_this()](auto &executor) {
std::cerr << EscapeCodes::Phrases::Info << "Launched process, PID: "
<< executor
#ifdef PLATFORM_WINDOWS
.proc_info.dwProcessId
#else
.pid
#endif
<< EscapeCodes::Phrases::End;
if (const auto lock = SyncthingProcessInternalData::Lock(maybeProcess)) {
emit stateChanged(m_process->state = QProcess::Running);
emit started();
}
};
auto exitHandler = boost::process::on_exit = [this, maybeProcess = m_process->weak_from_this()](int rc, const std::error_code &ec) {
const auto lock = SyncthingProcessInternalData::Lock(maybeProcess);
if (!lock) {
return;
}
handleLeftoverProcesses();
emit stateChanged(m_process->state = QProcess::NotRunning);
emit finished(rc, rc != 0 ? QProcess::CrashExit : QProcess::NormalExit);
if (ec) {
const auto msg = ec.message();
std::cerr << EscapeCodes::Phrases::Error << "Launched process " << m_process->child.native_handle() << " exited with error: " << msg
<< EscapeCodes::Phrases::End;
QMetaObject::invokeMethod(this, "handleError", Qt::QueuedConnection, Q_ARG(QProcess::ProcessError, QProcess::Crashed),
Q_ARG(QString, QString::fromStdString(msg)), Q_ARG(bool, false));
}
};
auto errorHandler = boost::process::extend::on_error = [this, maybeProcess = m_process->weak_from_this()](auto &, const std::error_code &ec) {
const auto lock = SyncthingProcessInternalData::Lock(maybeProcess);
if (!lock) {
return;
}
handleLeftoverProcesses();
const auto started = m_process->state == QProcess::Running;
if (m_process->state != QProcess::NotRunning) {
emit stateChanged(m_process->state = QProcess::NotRunning);
}
if (started) {
emit finished(0, QProcess::CrashExit);
}
const auto error = ec == std::errc::timed_out || ec == std::errc::stream_timeout ? QProcess::Timedout : QProcess::Crashed;
const auto msg = ec.message();
std::cerr << EscapeCodes::Phrases::Error << "Unable to launch process: " << msg << EscapeCodes::Phrases::End;
QMetaObject::invokeMethod(this, "handleError", Qt::QueuedConnection, Q_ARG(QProcess::ProcessError, error),
Q_ARG(QString, QString::fromStdString(msg)), Q_ARG(bool, false));
};
// start the process within a new process group (or job object under Windows)
try {
m_process->child = boost::process::child(
m_handler->ioc, m_process->group, args, (boost::process::std_out & boost::process::std_err) > m_process->pipe,
boost::process::extend::on_success =
[this, maybeProcess = m_process->weak_from_this()](auto &executor) {
std::cerr << EscapeCodes::Phrases::Info << "Launched process, PID: "
<< executor
#ifdef PLATFORM_WINDOWS
.proc_info.dwProcessId
#else
.pid
#endif
<< EscapeCodes::Phrases::End;
if (const auto lock = SyncthingProcessInternalData::Lock(maybeProcess)) {
emit stateChanged(m_process->state = QProcess::Running);
emit started();
}
},
boost::process::on_exit =
[this, maybeProcess = m_process->weak_from_this()](int rc, const std::error_code &ec) {
const auto lock = SyncthingProcessInternalData::Lock(maybeProcess);
if (!lock) {
return;
}
handleLeftoverProcesses();
emit stateChanged(m_process->state = QProcess::NotRunning);
emit finished(rc, rc != 0 ? QProcess::CrashExit : QProcess::NormalExit);
if (ec) {
const auto msg = ec.message();
std::cerr << EscapeCodes::Phrases::Error << "Launched process " << m_process->child.native_handle()
<< " exited with error: " << msg << EscapeCodes::Phrases::End;
QMetaObject::invokeMethod(this, "handleError", Qt::QueuedConnection, Q_ARG(QProcess::ProcessError, QProcess::Crashed),
Q_ARG(QString, QString::fromStdString(msg)), Q_ARG(bool, false));
}
},
boost::process::extend::on_error =
[this, maybeProcess = m_process->weak_from_this()](auto &, const std::error_code &ec) {
const auto lock = SyncthingProcessInternalData::Lock(maybeProcess);
if (!lock) {
return;
}
handleLeftoverProcesses();
const auto started = m_process->state == QProcess::Running;
if (m_process->state != QProcess::NotRunning) {
emit stateChanged(m_process->state = QProcess::NotRunning);
}
if (started) {
emit finished(0, QProcess::CrashExit);
}
const auto error = ec == std::errc::timed_out || ec == std::errc::stream_timeout ? QProcess::Timedout : QProcess::Crashed;
const auto msg = ec.message();
std::cerr << EscapeCodes::Phrases::Error << "Unable to launch process: " << msg << EscapeCodes::Phrases::End;
QMetaObject::invokeMethod(this, "handleError", Qt::QueuedConnection, Q_ARG(QProcess::ProcessError, error),
Q_ARG(QString, QString::fromStdString(msg)), Q_ARG(bool, false));
});
auto path = boost::filesystem::path(prog);
if (path.is_relative()) {
path = boost::process::search_path(path);
}
switch (m_mode) {
case QProcess::MergedChannels:
m_process->child = boost::process::child(m_handler->ioc, m_process->group, path, args,
(boost::process::std_out & boost::process::std_err) > m_process->pipe, std::move(successHandler), std::move(exitHandler),
std::move(errorHandler));
break;
case QProcess::ForwardedChannels:
m_process->child = boost::process::child(
m_handler->ioc, m_process->group, path, args, std::move(successHandler), std::move(exitHandler), std::move(errorHandler));
break;
default:
std::cerr << EscapeCodes::Phrases::Error << "Unable to launch process: specified channel mode not supported" << EscapeCodes::Phrases::End;
emit stateChanged(m_process->state = QProcess::NotRunning);
handleError(QProcess::FailedToStart, QStringLiteral("specified channel mode not supported"), false);
return;
}
} catch (const boost::process::process_error &e) {
std::cerr << EscapeCodes::Phrases::Error << "Unable to launch process: " << e.what() << EscapeCodes::Phrases::End;
emit stateChanged(m_process->state = QProcess::NotRunning);
@ -408,6 +454,10 @@ void SyncthingProcess::start(const QString &program, const QStringList &argument
bufferOutput();
}
/*!
* \brief Terminates the launched process and all child processes gracefully if possible.
* \remarks Sends SIGTERM to the process group under POSIX and invokes TerminateJobObject() under Windows.
*/
void SyncthingProcess::terminate()
{
if (!m_process) {
@ -435,6 +485,10 @@ void SyncthingProcess::terminate()
#endif
}
/*!
* \brief Terminates the launched process and all child processes forcefully.
* \remarks Sends SIGKILL to the process group under POSIX and invokes TerminateJobObject() under Windows.
*/
void SyncthingProcess::kill()
{
if (!m_process) {
@ -458,6 +512,9 @@ void SyncthingProcess::kill()
// also in case of a forceful termination.
}
/*!
* \brief Reads data from the pipe into the internal buffer.
*/
void SyncthingProcess::bufferOutput()
{
m_process->pipe.async_read_some(boost::asio::buffer(m_process->buffer, m_process->bufferCapacity),
@ -483,10 +540,14 @@ void SyncthingProcess::bufferOutput()
}
if (!ec || bytesRead) {
emit readyRead();
m_process->readCondVar.notify_all();
}
});
}
/*!
* \brief Terminates all process in the group forcefully and waits until they're gone.
*/
void SyncthingProcess::handleLeftoverProcesses()
{
if (!m_process->group.valid()) {
@ -498,7 +559,10 @@ void SyncthingProcess::handleLeftoverProcesses()
std::cerr << EscapeCodes::Phrases::Error << "Unable to kill leftover processes in group " << m_process->group.native_handle() << ": "
<< ec.message() << EscapeCodes::Phrases::End;
}
m_process->group.wait(ec);
if (!m_process->group.valid()) {
return;
}
m_process->group.wait(ec); // wait until group has terminated: Is this ever required?
if (ec && ec != std::errc::no_such_process) {
std::cerr << EscapeCodes::Phrases::Error << "Unable to wait for leftover processes in group " << m_process->group.native_handle() << ": "
<< ec.message() << EscapeCodes::Phrases::End;
@ -521,14 +585,22 @@ void SyncthingProcess::close()
setOpenMode(QIODevice::NotOpen);
}
/*!
* \brief Returns the exit code of the last process which has been started.
* \remarks Only valid if the process has already terminated.
*/
int SyncthingProcess::exitCode() const
{
return m_process ? m_process->child.exit_code() : 0;
}
/*!
* \brief Waits until the current process and all its child processes have been terminated.
* \param msecs Specifies the number of milliseconds to wait at most or a negative integer to wait infinitely.
*/
bool SyncthingProcess::waitForFinished(int msecs)
{
if (!m_process) {
if (!m_process || !m_process->group.valid()) {
return false;
}
auto ec = std::error_code();
@ -540,16 +612,42 @@ bool SyncthingProcess::waitForFinished(int msecs)
return !ec || ec == std::errc::no_such_process || ec == std::errc::no_child_process;
}
bool SyncthingProcess::waitForReadyRead(int msecs)
{
if (!m_process) {
return false;
}
if (m_process->bytesBuffered) {
return true;
}
auto lock = std::unique_lock<std::mutex>(m_process->readMutex);
if (msecs < 0) {
m_process->readCondVar.wait(lock);
} else {
m_process->readCondVar.wait_for(lock, std::chrono::milliseconds(msecs));
}
return m_process->bytesBuffered;
}
/*!
* \brief Returns the process ID of the last process which has been started.
*/
qint64 SyncthingProcess::processId() const
{
return m_process ? m_process->child.id() : -1;
}
/*!
* \brief Returns the path/executable of the last process which has been started.
*/
QString SyncthingProcess::program() const
{
return m_process ? m_process->program : QString();
}
/*!
* \brief Returns the arguments the last process has been started with.
*/
QStringList SyncthingProcess::arguments() const
{
return m_process ? m_process->arguments : QStringList();
@ -589,6 +687,5 @@ qint64 SyncthingProcess::writeData(const char *data, qint64 len)
return -1;
}
#endif
/// \endcond
} // namespace Data

View File

@ -46,9 +46,12 @@ public:
void close() override;
int exitCode() const;
bool waitForFinished(int msecs = 30000);
bool waitForReadyRead(int msecs) override;
qint64 processId() const;
QString program() const;
QStringList arguments() const;
QProcess::ProcessChannelMode processChannelMode() const;
void setProcessChannelMode(QProcess::ProcessChannelMode mode);
#endif
public Q_SLOTS:
@ -94,6 +97,7 @@ private:
#ifdef LIB_SYNCTHING_CONNECTOR_BOOST_PROCESS
std::shared_ptr<SyncthingProcessInternalData> m_process;
std::unique_ptr<SyncthingProcessIOHandler> m_handler;
QProcess::ProcessChannelMode m_mode;
#endif
bool m_manuallyStopped;
static SyncthingProcess *s_mainInstance;
@ -141,6 +145,27 @@ inline void SyncthingProcess::setMainInstance(SyncthingProcess *mainInstance)
s_mainInstance = mainInstance;
}
#ifdef LIB_SYNCTHING_CONNECTOR_BOOST_PROCESS
/*!
* \brief Returns the QProcess::ProcessChannelMode like QProcess::processChannelMode().
*/
inline QProcess::ProcessChannelMode SyncthingProcess::processChannelMode() const
{
return m_mode;
}
/*!
* \brief Sets the QProcess::ProcessChannelMode like QProcess::setProcessChannelMode().
* \remarks
* - Does not affect an already running process.
* - Supports only QProcess::MergedChannels, QProcess::SeparateChannels and QProcess::ForwardedChannels.
*/
inline void SyncthingProcess::setProcessChannelMode(QProcess::ProcessChannelMode mode)
{
m_mode = mode;
}
#endif
} // namespace Data
#endif // DATA_SYNCTHINGPROCESS_H

View File

@ -36,9 +36,9 @@ SyncthingLauncher::SyncthingLauncher(QObject *parent)
, m_manuallyStopped(true)
, m_emittingOutput(false)
{
connect(&m_process, &SyncthingProcess::readyRead, this, &SyncthingLauncher::handleProcessReadyRead);
connect(&m_process, &SyncthingProcess::readyRead, this, &SyncthingLauncher::handleProcessReadyRead, Qt::QueuedConnection);
connect(&m_process, static_cast<void (SyncthingProcess::*)(int exitCode, QProcess::ExitStatus exitStatus)>(&SyncthingProcess::finished), this,
&SyncthingLauncher::handleProcessFinished);
&SyncthingLauncher::handleProcessFinished, Qt::QueuedConnection);
connect(&m_process, &SyncthingProcess::stateChanged, this, &SyncthingLauncher::handleProcessStateChanged, Qt::QueuedConnection);
connect(&m_process, &SyncthingProcess::errorOccurred, this, &SyncthingLauncher::errorOccurred, Qt::QueuedConnection);
connect(&m_process, &SyncthingProcess::confirmKill, this, &SyncthingLauncher::confirmKill);