Allow live-streaming via CLI

This commit is contained in:
Martchus 2021-04-05 21:15:46 +02:00
parent 338674a9e6
commit e47edcc09e
5 changed files with 273 additions and 245 deletions

View File

@ -302,43 +302,58 @@ static void printRawData(const LibRepoMgr::WebClient::Response::body_type::value
std::cout << rawData << '\n'; std::cout << rawData << '\n';
} }
static void handleResponse(const std::string &url, const LibRepoMgr::WebClient::SessionData &data, static void handleResponse(const std::string &url, LibRepoMgr::WebClient::Session &session,
const LibRepoMgr::WebClient::HttpClientError &error, void (*printer)(const LibRepoMgr::WebClient::Response::body_type::value_type &jsonData), const LibRepoMgr::WebClient::HttpClientError &error, void (*printer)(const LibRepoMgr::WebClient::Response::body_type::value_type &jsonData),
int &returnCode) int &returnCode)
{ {
const auto &response = std::get<LibRepoMgr::WebClient::Response>(data.response); auto result = boost::beast::http::status::ok;
const auto &body = response.body(); auto body = std::optional<std::string>();
if (auto *const emptyResponse = std::get_if<LibRepoMgr::WebClient::EmptyResponse>(&session.response)) {
result = emptyResponse->get().result();
} else if (auto *const response = std::get_if<LibRepoMgr::WebClient::Response>(&session.response)) {
result = response->result();
body = std::move(response->body());
}
if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) {
std::cerr << Phrases::ErrorMessage << "Unable to connect: " << error.what() << Phrases::End; std::cerr << Phrases::ErrorMessage << "Unable to connect: " << error.what() << Phrases::End;
std::cerr << Phrases::InfoMessage << "URL was: " << url << Phrases::End; returnCode = 9;
printRawDataForErrorHandling(body);
return;
} }
if (response.result() != boost::beast::http::status::ok) { if (result != boost::beast::http::status::ok) {
std::cerr << Phrases::ErrorMessage << "HTTP request not successful: " << response.result() << " (" std::cerr << Phrases::ErrorMessage << "HTTP request not successful: " << result << " ("
<< static_cast<std::underlying_type_t<decltype(response.result())>>(response.result()) << " response)" << Phrases::End; << static_cast<std::underlying_type_t<decltype(result)>>(result) << " response)" << Phrases::End;
std::cerr << Phrases::InfoMessage << "URL was: " << url << Phrases::End; returnCode = 10;
printRawDataForErrorHandling(body);
return;
} }
try { if (body.has_value()) {
std::invoke(printer, body); if (returnCode) {
} catch (const ReflectiveRapidJSON::JsonDeserializationError &e) { printRawDataForErrorHandling(body.value());
std::cerr << Phrases::ErrorMessage << "Unable to make sense of response: " << ReflectiveRapidJSON::formatJsonDeserializationError(e) } else {
<< Phrases::End; try {
returnCode = 13; std::invoke(printer, body.value());
} catch (const RAPIDJSON_NAMESPACE::ParseResult &e) { } catch (const ReflectiveRapidJSON::JsonDeserializationError &e) {
std::cerr << Phrases::ErrorMessage << "Unable to parse responnse: " << tupleToString(LibRepoMgr::serializeParseError(e)) << Phrases::End; std::cerr << Phrases::ErrorMessage << "Unable to make sense of response: " << ReflectiveRapidJSON::formatJsonDeserializationError(e)
returnCode = 11; << Phrases::End;
} catch (const std::runtime_error &e) { returnCode = 13;
std::cerr << Phrases::ErrorMessage << "Unable to display response: " << e.what() << Phrases::End; } catch (const RAPIDJSON_NAMESPACE::ParseResult &e) {
returnCode = 12; std::cerr << Phrases::ErrorMessage << "Unable to parse responnse: " << tupleToString(LibRepoMgr::serializeParseError(e)) << Phrases::End;
returnCode = 11;
} catch (const std::runtime_error &e) {
std::cerr << Phrases::ErrorMessage << "Unable to display response: " << e.what() << Phrases::End;
returnCode = 12;
}
}
} }
if (returnCode) { if (returnCode) {
std::cerr << Phrases::InfoMessage << "URL was: " << url << std::endl; std::cerr << Phrases::InfoMessage << "URL was: " << url << std::endl;
} }
} }
static void printChunk(const boost::beast::http::chunk_extensions &chunkExtensions, std::string_view chunkData)
{
CPP_UTILITIES_UNUSED(chunkExtensions)
std::cout << chunkData;
}
// helper for turning CLI args into URL query parameters // helper for turning CLI args into URL query parameters
static std::string asQueryParam(const Argument &cliArg, std::string_view paramName = std::string_view()) static std::string asQueryParam(const Argument &cliArg, std::string_view paramName = std::string_view())
@ -376,6 +391,7 @@ int main(int argc, const char *argv[])
// define command-specific parameters // define command-specific parameters
auto verb = boost::beast::http::verb::get; auto verb = boost::beast::http::verb::get;
auto path = std::string(); auto path = std::string();
void (*chunkHandler)(const boost::beast::http::chunk_extensions &chunkExtensions, std::string_view chunkData) = nullptr;
void (*printer)(const LibRepoMgr::WebClient::Response::body_type::value_type &jsonData) = nullptr; void (*printer)(const LibRepoMgr::WebClient::Response::body_type::value_type &jsonData) = nullptr;
// read CLI args // read CLI args
@ -427,6 +443,17 @@ int main(int argc, const char *argv[])
appendAsQueryParam(path, buildActionIdArg, "id"); appendAsQueryParam(path, buildActionIdArg, "id");
}); });
showBuildActionArg.setSubArguments({ &buildActionIdArg }); showBuildActionArg.setSubArguments({ &buildActionIdArg });
auto singleBuildActionIdArg = ConfigValueArgument("id", '\0', "specifies the build action ID", { "ID" });
singleBuildActionIdArg.setImplicit(true);
singleBuildActionIdArg.setRequired(true);
auto streamOutputBuildActionArg = OperationArgument("output", 'o', "stream build action output");
streamOutputBuildActionArg.setCallback([&path, &printer, &chunkHandler, &singleBuildActionIdArg](const ArgumentOccurrence &) {
path = "/api/v0/build-action/output?";
printer = printRawData;
chunkHandler = printChunk;
appendAsQueryParam(path, singleBuildActionIdArg, "id");
});
streamOutputBuildActionArg.setSubArguments({ &singleBuildActionIdArg });
auto createBuildActionArg = OperationArgument("create", '\0', "creates and starts a new build action (or pre-defined task)"); auto createBuildActionArg = OperationArgument("create", '\0', "creates and starts a new build action (or pre-defined task)");
auto taskArg = ConfigValueArgument("task", '\0', "specifies the pre-defined task to run", { "task" }); auto taskArg = ConfigValueArgument("task", '\0', "specifies the pre-defined task to run", { "task" });
auto typeArg = ConfigValueArgument("type", '\0', "specifies the action type", { "type" }); auto typeArg = ConfigValueArgument("type", '\0', "specifies the action type", { "type" });
@ -488,8 +515,8 @@ int main(int argc, const char *argv[])
appendAsQueryParam(path, buildActionIdArg, "id"); appendAsQueryParam(path, buildActionIdArg, "id");
}); });
stopBuildActionArg.setSubArguments({ &buildActionIdArg }); stopBuildActionArg.setSubArguments({ &buildActionIdArg });
actionArg.setSubArguments({ &listActionsArg, &showBuildActionArg, &createBuildActionArg, &deleteBuildActionArg, &cloneBuildActionArg, actionArg.setSubArguments({ &listActionsArg, &showBuildActionArg, &streamOutputBuildActionArg, &createBuildActionArg, &deleteBuildActionArg,
&startBuildActionArg, &stopBuildActionArg }); &cloneBuildActionArg, &startBuildActionArg, &stopBuildActionArg });
auto apiArg = OperationArgument("api", '\0', "Invoke a generic API request:"); auto apiArg = OperationArgument("api", '\0', "Invoke a generic API request:");
auto pathArg = ConfigValueArgument("path", '\0', "specifies the route's path without prefix", { "path/of/route?foo=bar&bar=foo" }); auto pathArg = ConfigValueArgument("path", '\0', "specifies the route's path without prefix", { "path/of/route?foo=bar&bar=foo" });
pathArg.setImplicit(true); pathArg.setImplicit(true);
@ -545,7 +572,7 @@ int main(int argc, const char *argv[])
LibRepoMgr::WebClient::runSessionFromUrl(ioContext, sslContext, url, LibRepoMgr::WebClient::runSessionFromUrl(ioContext, sslContext, url,
std::bind(&handleResponse, std::ref(url), std::placeholders::_1, std::placeholders::_2, rawArg.isPresent() ? printRawData : printer, std::bind(&handleResponse, std::ref(url), std::placeholders::_1, std::placeholders::_2, rawArg.isPresent() ? printRawData : printer,
std::ref(returnCode)), std::ref(returnCode)),
std::string(), config.userName, config.password, verb); std::string(), config.userName, config.password, verb, chunkHandler);
ioContext.run(); ioContext.run();
return returnCode; return returnCode;
} }

View File

@ -40,8 +40,8 @@ constexpr auto aurPort = "443";
void searchAurPackages(LogContext &log, ServiceSetup &setup, const std::string &searchTerm, boost::asio::io_context &ioContext, void searchAurPackages(LogContext &log, ServiceSetup &setup, const std::string &searchTerm, boost::asio::io_context &ioContext,
std::shared_ptr<AurQuerySession> &multiSession) std::shared_ptr<AurQuerySession> &multiSession)
{ {
auto session = std::make_shared<WebClient::SslSession>(ioContext, setup.webServer.sslContext, auto session = std::make_shared<WebClient::Session>(ioContext, setup.webServer.sslContext,
[&log, &setup, multiSession](WebClient::SslSession &session2, const WebClient::HttpClientError &error) mutable { [&log, &setup, multiSession](WebClient::Session &session2, const WebClient::HttpClientError &error) mutable {
if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) {
log(Phrases::ErrorMessage, "Failed to search AUR: ", error.what(), '\n'); log(Phrases::ErrorMessage, "Failed to search AUR: ", error.what(), '\n');
return; return;
@ -88,8 +88,8 @@ std::shared_ptr<AurQuerySession> queryAurPackagesInternal(LogContext &log, Servi
auto multiSession = AurQuerySession::create(ioContext, move(handler)); auto multiSession = AurQuerySession::create(ioContext, move(handler));
for (auto i = packages.cbegin(), end = packages.cend(); i != end;) { for (auto i = packages.cbegin(), end = packages.cend(); i != end;) {
auto session = make_shared<WebClient::SslSession>(ioContext, setup.webServer.sslContext, auto session = make_shared<WebClient::Session>(ioContext, setup.webServer.sslContext,
[&log, &setup, multiSession](WebClient::SslSession &session2, const WebClient::HttpClientError &error) mutable { [&log, &setup, multiSession](WebClient::Session &session2, const WebClient::HttpClientError &error) mutable {
if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) {
log(Phrases::ErrorMessage, "Failed to retrieve AUR packages from RPC: ", error.what(), '\n'); log(Phrases::ErrorMessage, "Failed to retrieve AUR packages from RPC: ", error.what(), '\n');
return; return;
@ -191,8 +191,8 @@ void queryAurSnapshots(LogContext &log, ServiceSetup &setup, const std::vector<A
{ {
CPP_UTILITIES_UNUSED(log) CPP_UTILITIES_UNUSED(log)
for (const auto &params : queryParams) { for (const auto &params : queryParams) {
auto session = std::make_shared<WebClient::SslSession>(ioContext, setup.webServer.sslContext, auto session = std::make_shared<WebClient::Session>(ioContext, setup.webServer.sslContext,
[multiSession, params](WebClient::SslSession &session2, const WebClient::HttpClientError &error) mutable { [multiSession, params](WebClient::Session &session2, const WebClient::HttpClientError &error) mutable {
if (error.errorCode != boost::beast::errc::success && error.errorCode.message() != "stream truncated") { if (error.errorCode != boost::beast::errc::success && error.errorCode.message() != "stream truncated") {
multiSession->addResponse(WebClient::AurSnapshotResult{ .packageName = *params.packageName, multiSession->addResponse(WebClient::AurSnapshotResult{ .packageName = *params.packageName,
.error = "Unable to retrieve AUR snapshot tarball for package " % *params.packageName % ": " + error.what() }); .error = "Unable to retrieve AUR snapshot tarball for package " % *params.packageName % ": " + error.what() });

View File

@ -64,18 +64,18 @@ void queryDatabases(
auto session = runSessionFromUrl( auto session = runSessionFromUrl(
setup.building.ioContext, setup.webServer.sslContext, query.url, setup.building.ioContext, setup.webServer.sslContext, query.url,
[&log, &setup, dbName = std::move(query.databaseName), dbArch = std::move(query.databaseArch), dbQuerySession]( [&log, &setup, dbName = std::move(query.databaseName), dbArch = std::move(query.databaseArch), dbQuerySession](
SessionData data, const WebClient::HttpClientError &error) mutable { Session &session2, const WebClient::HttpClientError &error) mutable {
if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) {
log(Phrases::ErrorMessage, "Error retrieving database file \"", data.destinationFilePath, "\" for ", dbName, ": ", error.what(), log(Phrases::ErrorMessage, "Error retrieving database file \"", session2.destinationFilePath, "\" for ", dbName, ": ", error.what(),
'\n'); '\n');
dbQuerySession->addResponse(std::move(dbName)); dbQuerySession->addResponse(std::move(dbName));
return; return;
} }
const auto &response = get<FileResponse>(data.response); const auto &response = get<FileResponse>(session2.response);
const auto &message = response.get(); const auto &message = response.get();
if (message.result() != boost::beast::http::status::ok) { if (message.result() != boost::beast::http::status::ok) {
log(Phrases::ErrorMessage, "Error retrieving database file \"", data.destinationFilePath, "\" for ", dbName, ": mirror returned ", log(Phrases::ErrorMessage, "Error retrieving database file \"", session2.destinationFilePath, "\" for ", dbName, ": mirror returned ",
message.result_int(), " response\n"); message.result_int(), " response\n");
dbQuerySession->addResponse(std::move(dbName)); dbQuerySession->addResponse(std::move(dbName));
return; return;
@ -118,7 +118,7 @@ void queryDatabases(
try { try {
// load packages // load packages
auto files = extractFiles(data.destinationFilePath, &Database::isFileRelevant); auto files = extractFiles(session2.destinationFilePath, &Database::isFileRelevant);
auto packages = Package::fromDatabaseFile(move(files)); auto packages = Package::fromDatabaseFile(move(files));
// insert packages // insert packages
@ -200,14 +200,14 @@ void cachePackages(LogContext &log, std::shared_ptr<PackageCachingSession> &&pac
log(Phrases::InfoMessage, "Downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\"\n"); log(Phrases::InfoMessage, "Downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\"\n");
runSessionFromUrl( runSessionFromUrl(
packageCachingSession->ioContext(), packageCachingSession->m_sslContext, cachingData->url, packageCachingSession->ioContext(), packageCachingSession->m_sslContext, cachingData->url,
[&log, packageCachingSession, cachingData](SessionData data, const WebClient::HttpClientError &error) mutable { [&log, packageCachingSession, cachingData](Session &session, const WebClient::HttpClientError &error) mutable {
if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) {
const auto msg = std::make_tuple( const auto msg = std::make_tuple(
"Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\": ", error.what()); "Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\": ", error.what());
cachingData->error = tupleToString(msg); cachingData->error = tupleToString(msg);
log(Phrases::ErrorMessage, msg, '\n'); log(Phrases::ErrorMessage, msg, '\n');
} }
const auto &response = get<FileResponse>(data.response); const auto &response = get<FileResponse>(session.response);
const auto &message = response.get(); const auto &message = response.get();
if (message.result() != boost::beast::http::status::ok) { if (message.result() != boost::beast::http::status::ok) {
const auto msg = std::make_tuple("Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, const auto msg = std::make_tuple("Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath,

View File

@ -13,6 +13,7 @@
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
#include <iostream> #include <iostream>
#include <limits>
using namespace std; using namespace std;
using namespace boost::asio; using namespace boost::asio;
@ -30,8 +31,26 @@ HttpClientError::HttpClientError(const char *context, boost::beast::error_code e
{ {
} }
void Session::setChunkHandler(ChunkHandler &&handler)
{
m_chunkProcessing = std::make_unique<ChunkProcessing>();
m_chunkProcessing->onChunkHeader = std::bind(&Session::onChunkHeader, shared_from_this(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
m_chunkProcessing->onChunkBody = std::bind(&Session::onChunkBody, shared_from_this(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
m_chunkProcessing->handler = std::move(handler);
}
void Session::run(const char *host, const char *port, http::verb verb, const char *target, unsigned int version) void Session::run(const char *host, const char *port, http::verb verb, const char *target, unsigned int version)
{ {
// set SNI Hostname (many hosts need this to handshake successfully)
auto *const sslStream = std::get_if<SslStream>(&m_stream);
if (sslStream && !SSL_ctrl(
sslStream->native_handle(), SSL_CTRL_SET_TLSEXT_HOSTNAME, TLSEXT_NAMETYPE_host_name, reinterpret_cast<void *>(const_cast<char *>(host)))) {
m_handler(*this,
HttpClientError(
"setting SNI hostname", boost::beast::error_code{ static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category() }));
return;
}
// set up an HTTP request message // set up an HTTP request message
request.version(version); request.version(version);
request.method(verb); request.method(verb);
@ -49,12 +68,25 @@ void Session::run(const char *host, const char *port, http::verb verb, const cha
m_handler(*this, HttpClientError("opening output file", errorCode)); m_handler(*this, HttpClientError("opening output file", errorCode));
return; return;
} }
} else if (m_chunkProcessing) {
auto &emptyResponse = response.emplace<EmptyResponse>();
emptyResponse.on_chunk_header(m_chunkProcessing->onChunkHeader);
emptyResponse.on_chunk_body(m_chunkProcessing->onChunkBody);
} }
// look up the domain name // look up the domain name
m_resolver.async_resolve(host, port, m_resolver.async_resolve(host, port,
boost::asio::ip::tcp::resolver::canonical_name | boost::asio::ip::tcp::resolver::passive | boost::asio::ip::tcp::resolver::all_matching, boost::asio::ip::tcp::resolver::canonical_name | boost::asio::ip::tcp::resolver::passive | boost::asio::ip::tcp::resolver::all_matching,
std::bind(&Session::resolved, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); std::bind(&Session::resolved, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
inline Session::RawSocket &Session::socket()
{
auto *socket = std::get_if<RawSocket>(&m_stream);
if (!socket) {
socket = &std::get<SslStream>(m_stream).next_layer();
}
return *socket;
} }
void Session::resolved(boost::beast::error_code ec, ip::tcp::resolver::results_type results) void Session::resolved(boost::beast::error_code ec, ip::tcp::resolver::results_type results)
@ -65,7 +97,7 @@ void Session::resolved(boost::beast::error_code ec, ip::tcp::resolver::results_t
} }
// make the connection on the IP address we get from a lookup // make the connection on the IP address we get from a lookup
boost::asio::async_connect(m_socket, results.begin(), results.end(), std::bind(&Session::connected, shared_from_this(), std::placeholders::_1)); boost::asio::async_connect(socket(), results.begin(), results.end(), std::bind(&Session::connected, shared_from_this(), std::placeholders::_1));
} }
void Session::connected(boost::beast::error_code ec) void Session::connected(boost::beast::error_code ec)
@ -75,8 +107,30 @@ void Session::connected(boost::beast::error_code ec)
return; return;
} }
// perform the SSL handshake if (auto *const sslStream = std::get_if<SslStream>(&m_stream)) {
http::async_write(m_socket, request, std::bind(&Session::requested, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); // perform the SSL handshake
sslStream->async_handshake(ssl::stream_base::client, std::bind(&Session::handshakeDone, shared_from_this(), std::placeholders::_1));
} else {
sendRequest();
}
}
void Session::handshakeDone(boost::beast::error_code ec)
{
if (ec) {
m_handler(*this, HttpClientError("SSL handshake", ec));
return;
}
sendRequest();
}
void Session::sendRequest()
{
// send the HTTP request to the remote host
std::visit([this](auto &&stream) {
boost::beast::http::async_write(
stream, request, std::bind(&Session::requested, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}, m_stream);
} }
void Session::requested(boost::beast::error_code ec, std::size_t bytesTransferred) void Session::requested(boost::beast::error_code ec, std::size_t bytesTransferred)
@ -89,11 +143,73 @@ void Session::requested(boost::beast::error_code ec, std::size_t bytesTransferre
// receive the HTTP response // receive the HTTP response
std::visit( std::visit(
[this](auto &&response) { [this](auto &stream, auto &&response) {
http::async_read( if constexpr (std::is_same_v<std::decay_t<decltype(response)>, EmptyResponse>) {
m_socket, m_buffer, response, std::bind(&Session::received, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); http::async_read_header(stream, m_buffer, response, std::bind(&Session::chunkReceived, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
} else {
http::async_read(
stream, m_buffer, response, std::bind(&Session::received, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
}, },
response); m_stream, response);
}
void Session::onChunkHeader(std::uint64_t chunkSize, boost::beast::string_view extensions, boost::beast::error_code &ec)
{
// parse the chunk extensions so we can access them easily
m_chunkProcessing->chunkExtensions.parse(extensions, ec);
if (ec) {
return;
}
if(chunkSize > std::numeric_limits<std::size_t>::max()) {
ec = boost::beast::http::error::body_limit;
return;
}
// make sure we have enough storage, and reset the container for the upcoming chunk
m_chunkProcessing->currentChunk.reserve(static_cast<std::size_t>(chunkSize));
m_chunkProcessing->currentChunk.clear();
}
std::size_t Session::onChunkBody(std::uint64_t bytesLeftInThisChunk, boost::beast::string_view chunkBodyData, boost::beast::error_code &ec)
{
// set the error so that the call to `read` returns if this is the last piece of the chunk body and we can process the chunk
if(bytesLeftInThisChunk == chunkBodyData.size()) {
ec = boost::beast::http::error::end_of_chunk;
}
// append this piece to our container
m_chunkProcessing->currentChunk.append(chunkBodyData.data(), chunkBodyData.size());
return chunkBodyData.size();
// note: The return value informs the parser of how much of the body we
// consumed. We will indicate that we consumed everything passed in.
}
void Session::chunkReceived(boost::beast::error_code ec, std::size_t bytesTransferred)
{
if(ec == boost::beast::http::error::end_of_chunk) {
m_chunkProcessing->handler(m_chunkProcessing->chunkExtensions, m_chunkProcessing->currentChunk);
} else if (ec) {
m_handler(*this, HttpClientError("receiving chunk response", ec));
return;
}
if (!continueReadingChunks()) {
received(ec, bytesTransferred);
}
}
bool Session::continueReadingChunks()
{
auto &parser = std::get<EmptyResponse>(response);
if (parser.is_done()) {
return false;
}
std::visit([this, &parser] (auto &stream) {
boost::beast::http::async_read(stream, m_buffer, parser, std::bind(&Session::chunkReceived, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}, m_stream);
return true;
} }
void Session::received(boost::beast::error_code ec, std::size_t bytesTransferred) void Session::received(boost::beast::error_code ec, std::size_t bytesTransferred)
@ -105,153 +221,28 @@ void Session::received(boost::beast::error_code ec, std::size_t bytesTransferred
} }
// close the stream gracefully // close the stream gracefully
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); if (auto *const sslStream = std::get_if<SslStream>(&m_stream)) {
// perform the SSL handshake
if (ec && ec != boost::beast::errc::not_connected) { sslStream->async_shutdown(std::bind(&Session::closed, shared_from_this(), std::placeholders::_1));
m_handler(*this, HttpClientError("closing connection", ec)); } else if (auto *const socket = std::get_if<RawSocket>(&m_stream)) {
return; socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_handler(*this, ec && ec != boost::beast::errc::not_connected ? HttpClientError("closing connection", ec) : HttpClientError());
} }
// if we get here then the connection is closed gracefully
m_handler(*this, HttpClientError());
} }
void SslSession::run(const char *host, const char *port, http::verb verb, const char *target, unsigned int version) void Session::closed(boost::beast::error_code ec)
{ {
// set SNI Hostname (many hosts need this to handshake successfully) // rationale regarding boost::asio::error::eof: http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
if (!SSL_ctrl( m_handler(*this, ec && ec != boost::asio::error::eof ? HttpClientError("closing connection", ec) : HttpClientError());
m_stream.native_handle(), SSL_CTRL_SET_TLSEXT_HOSTNAME, TLSEXT_NAMETYPE_host_name, reinterpret_cast<void *>(const_cast<char *>(host)))) {
m_handler(*this,
HttpClientError(
"setting SNI hostname", boost::beast::error_code{ static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category() }));
return;
}
// setup an HTTP request message
request.version(version);
request.method(verb);
request.target(target);
request.set(http::field::host, host);
request.set(http::field::user_agent, APP_NAME " " APP_VERSION);
// setup a file response
if (!destinationFilePath.empty()) {
auto &fileResponse = response.emplace<FileResponse>();
boost::beast::error_code errorCode;
fileResponse.body_limit(100 * 1024 * 1024);
fileResponse.get().body().open(destinationFilePath.data(), file_mode::write, errorCode);
if (errorCode != boost::beast::errc::success) {
m_handler(*this, HttpClientError("opening output file", errorCode));
return;
}
}
// look up the domain name
m_resolver.async_resolve(host, port,
boost::asio::ip::tcp::resolver::canonical_name | boost::asio::ip::tcp::resolver::passive | boost::asio::ip::tcp::resolver::all_matching,
std::bind(&SslSession::resolved, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
} }
void SslSession::resolved(boost::beast::error_code ec, ip::tcp::resolver::results_type results) std::variant<std::string, std::shared_ptr<Session>> runSessionFromUrl(
boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, std::string_view url,
Session::Handler &&handler, std::string &&destinationPath,
std::string_view userName, std::string_view password,
boost::beast::http::verb verb, Session::ChunkHandler &&chunkHandler)
{ {
if (ec) { std::string host, port, target;
m_handler(*this, HttpClientError("resolving", ec));
return;
}
// make the connection on the IP address we get from a lookup
boost::asio::async_connect(
m_stream.next_layer(), results.begin(), results.end(), std::bind(&SslSession::connected, shared_from_this(), std::placeholders::_1));
}
void SslSession::connected(boost::beast::error_code ec)
{
if (ec) {
m_handler(*this, HttpClientError("connecting", ec));
return;
}
// perform the SSL handshake
m_stream.async_handshake(ssl::stream_base::client, std::bind(&SslSession::handshakeDone, shared_from_this(), std::placeholders::_1));
}
void SslSession::handshakeDone(boost::beast::error_code ec)
{
if (ec) {
m_handler(*this, HttpClientError("SSL handshake", ec));
return;
}
// send the HTTP request to the remote host
boost::beast::http::async_write(
m_stream, request, std::bind(&SslSession::requested, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
void SslSession::requested(boost::beast::error_code ec, std::size_t bytesTransferred)
{
boost::ignore_unused(bytesTransferred);
if (ec) {
m_handler(*this, HttpClientError("sending request", ec));
return;
}
// receive the HTTP response
std::visit(
[this](auto &&response) {
http::async_read(
m_stream, m_buffer, response, std::bind(&SslSession::received, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
},
response);
}
void SslSession::received(boost::beast::error_code ec, std::size_t bytesTransferred)
{
boost::ignore_unused(bytesTransferred);
if (ec) {
m_handler(*this, HttpClientError("receiving response", ec));
return;
}
// close the stream gracefully
m_stream.async_shutdown(std::bind(&SslSession::closed, shared_from_this(), std::placeholders::_1));
}
void SslSession::closed(boost::beast::error_code ec)
{
if (ec == boost::asio::error::eof) {
// rationale: http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec = {};
}
if (ec) {
m_handler(*this, HttpClientError("closing connection", ec));
return;
}
// if we get here then the connection is closed gracefully
m_handler(*this, HttpClientError());
}
template <typename SessionType, typename... ArgType>
std::variant<string, std::shared_ptr<Session>, std::shared_ptr<SslSession>> runSession(const std::string &host, const std::string &port,
const std::string &target, std::function<void(SessionData, const HttpClientError &)> &&handler, std::string &&destinationPath,
std::string_view userName, std::string_view password, boost::beast::http::verb verb, ArgType &&...args)
{
auto session = make_shared<SessionType>(args..., [handler{ move(handler) }](auto &session2, const HttpClientError &error) mutable {
handler(SessionData{ session2.shared_from_this(), session2.request, session2.response, session2.destinationFilePath }, error);
});
if (!userName.empty()) {
const auto authInfo = userName % ":" + password;
session->request.set(boost::beast::http::field::authorization,
"Basic " + encodeBase64(reinterpret_cast<const std::uint8_t *>(authInfo.data()), static_cast<std::uint32_t>(authInfo.size())));
}
session->destinationFilePath = move(destinationPath);
session->run(host.data(), port.data(), verb, target.data());
return std::variant<string, std::shared_ptr<Session>, std::shared_ptr<SslSession>>(std::move(session));
}
std::variant<string, std::shared_ptr<Session>, std::shared_ptr<SslSession>> runSessionFromUrl(boost::asio::io_context &ioContext,
boost::asio::ssl::context &sslContext, std::string_view url, std::function<void(SessionData, const HttpClientError &)> &&handler,
std::string &&destinationPath, std::string_view userName, std::string_view password, boost::beast::http::verb verb)
{
string host, port, target;
auto ssl = false; auto ssl = false;
if (startsWith(url, "http:")) { if (startsWith(url, "http:")) {
@ -260,7 +251,7 @@ std::variant<string, std::shared_ptr<Session>, std::shared_ptr<SslSession>> runS
url = url.substr(6); url = url.substr(6);
ssl = true; ssl = true;
} else { } else {
return "db mirror for database has unsupported protocol"; return std::string("unsupported protocol");
} }
auto urlParts = splitStringSimple<vector<std::string_view>>(url, "/"); auto urlParts = splitStringSimple<vector<std::string_view>>(url, "/");
@ -285,11 +276,18 @@ std::variant<string, std::shared_ptr<Session>, std::shared_ptr<SslSession>> runS
port = ssl ? "443" : "80"; port = ssl ? "443" : "80";
} }
if (ssl) { auto session = ssl ? std::make_shared<Session>(ioContext, sslContext, std::move(handler)) : std::make_shared<Session>(ioContext, std::move(handler));
return runSession<SslSession>(host, port, target, move(handler), move(destinationPath), userName, password, verb, ioContext, sslContext); if (!userName.empty()) {
} else { const auto authInfo = userName % ":" + password;
return runSession<Session>(host, port, target, move(handler), move(destinationPath), userName, password, verb, ioContext); session->request.set(boost::beast::http::field::authorization,
"Basic " + encodeBase64(reinterpret_cast<const std::uint8_t *>(authInfo.data()), static_cast<std::uint32_t>(authInfo.size())));
} }
session->destinationFilePath = std::move(destinationPath);
if (chunkHandler) {
session->setChunkHandler(std::move(chunkHandler));
}
session->run(host.data(), port.data(), verb, target.data());
return std::variant<std::string, std::shared_ptr<Session>>(std::move(session));
} }
} // namespace WebClient } // namespace WebClient

View File

@ -43,66 +43,50 @@ inline LibRepoMgr::WebClient::HttpClientError::operator bool() const
using Response = WebAPI::Response; using Response = WebAPI::Response;
using FileResponse = boost::beast::http::response_parser<boost::beast::http::file_body>; using FileResponse = boost::beast::http::response_parser<boost::beast::http::file_body>;
using MultiResponse = std::variant<Response, FileResponse>; using EmptyResponse = boost::beast::http::response_parser<boost::beast::http::empty_body>;
using MultiResponse = std::variant<Response, FileResponse, EmptyResponse>;
using Request = boost::beast::http::request<boost::beast::http::empty_body>; using Request = boost::beast::http::request<boost::beast::http::empty_body>;
struct LIBREPOMGR_EXPORT SessionData { struct ChunkProcessing;
std::shared_ptr<void> session;
Request &request;
MultiResponse &response;
std::string &destinationFilePath;
};
class LIBREPOMGR_EXPORT Session : public std::enable_shared_from_this<Session> { class LIBREPOMGR_EXPORT Session : public std::enable_shared_from_this<Session> {
public: public:
using Handler = std::function<void(Session &, const HttpClientError &error)>; using Handler = std::function<void(Session &, const HttpClientError &error)>;
using ChunkHandler = std::function<void(const boost::beast::http::chunk_extensions &chunkExtensions, std::string_view chunkData)>;
template <typename ResponseType = Response> explicit Session(boost::asio::io_context &ioContext, const Handler &handler = Handler()); template <typename ResponseType = Response>
explicit Session(boost::asio::io_context &ioContext, const Handler &handler = Handler());
template <typename ResponseType = Response>
explicit Session(boost::asio::io_context &ioContext, Handler &&handler = Handler());
template <typename ResponseType = Response>
explicit Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler = Handler());
template <typename ResponseType = Response>
explicit Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler);
void setChunkHandler(ChunkHandler &&handler);
void run(const char *host, const char *port, boost::beast::http::verb verb, const char *target, unsigned int version = 11); void run(const char *host, const char *port, boost::beast::http::verb verb, const char *target, unsigned int version = 11);
private: private:
void resolved(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results); using RawSocket = boost::asio::ip::tcp::socket;
void connected(boost::beast::error_code ec); using SslStream = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;
void requested(boost::beast::error_code ec, std::size_t bytesTransferred); struct ChunkProcessing {
void received(boost::beast::error_code ec, std::size_t bytesTransferred); boost::beast::http::chunk_extensions chunkExtensions;
std::string currentChunk;
std::function<void(std::uint64_t chunkSize, boost::beast::string_view extensions, boost::beast::error_code &ec)> onChunkHeader;
std::function<std::size_t(std::uint64_t bytesLeftInThisChunk, boost::beast::string_view chunkBodyData, boost::beast::error_code &ec)> onChunkBody;
Session::ChunkHandler handler;
};
public: RawSocket &socket();
Request request;
MultiResponse response;
std::string destinationFilePath;
private:
boost::asio::ip::tcp::resolver m_resolver;
boost::asio::ip::tcp::socket m_socket;
boost::beast::flat_buffer m_buffer;
Handler m_handler;
};
template <typename ResponseType>
inline Session::Session(boost::asio::io_context &ioContext, const Handler &handler)
: response(ResponseType{})
, m_resolver(ioContext)
, m_socket(ioContext)
, m_handler(handler)
{
}
class LIBREPOMGR_EXPORT SslSession : public std::enable_shared_from_this<SslSession> {
public:
using Handler = std::function<void(SslSession &, const HttpClientError &error)>;
template <typename ResponseType = Response>
explicit SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler = Handler());
template <typename ResponseType = Response>
explicit SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler);
void run(const char *host, const char *port, boost::beast::http::verb verb, const char *target, unsigned int version = 11);
private:
void resolved(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results); void resolved(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results);
void connected(boost::beast::error_code ec); void connected(boost::beast::error_code ec);
void handshakeDone(boost::beast::error_code ec); void handshakeDone(boost::beast::error_code ec);
void sendRequest();
void requested(boost::beast::error_code ec, std::size_t bytesTransferred); void requested(boost::beast::error_code ec, std::size_t bytesTransferred);
void onChunkHeader(std::uint64_t chunkSize, boost::beast::string_view extensions, boost::beast::error_code &ec);
std::size_t onChunkBody(std::uint64_t bytesLeftInThisChunk, boost::beast::string_view chunkBodyData, boost::beast::error_code &ec);
void chunkReceived(boost::beast::error_code ec, std::size_t bytesTransferred);
bool continueReadingChunks();
void received(boost::beast::error_code ec, std::size_t bytesTransferred); void received(boost::beast::error_code ec, std::size_t bytesTransferred);
void closed(boost::beast::error_code ec); void closed(boost::beast::error_code ec);
@ -113,34 +97,53 @@ public:
private: private:
boost::asio::ip::tcp::resolver m_resolver; boost::asio::ip::tcp::resolver m_resolver;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> m_stream; std::variant<RawSocket, SslStream> m_stream;
boost::beast::flat_buffer m_buffer; boost::beast::flat_buffer m_buffer;
std::unique_ptr<ChunkProcessing> m_chunkProcessing;
Handler m_handler; Handler m_handler;
}; };
template <typename ResponseType> template <typename ResponseType>
inline SslSession::SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler) inline Session::Session(boost::asio::io_context &ioContext, const Handler &handler)
: response(ResponseType{}) : response(ResponseType{})
, m_resolver(ioContext) , m_resolver(ioContext)
, m_stream(ioContext, sslContext) , m_stream(RawSocket{ioContext})
, m_handler(handler) , m_handler(handler)
{ {
} }
template <typename ResponseType> template <typename ResponseType>
inline SslSession::SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler) inline Session::Session(boost::asio::io_context &ioContext, Handler &&handler)
: response(ResponseType{}) : response(ResponseType{})
, m_resolver(ioContext) , m_resolver(ioContext)
, m_stream(ioContext, sslContext) , m_stream(RawSocket{ioContext})
, m_handler(std::move(handler))
{
}
template <typename ResponseType>
inline Session::Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler)
: response(ResponseType{})
, m_resolver(ioContext)
, m_stream(SslStream{ioContext, sslContext})
, m_handler(handler) , m_handler(handler)
{ {
} }
LIBREPOMGR_EXPORT std::variant<std::string, std::shared_ptr<Session>, std::shared_ptr<SslSession>> runSessionFromUrl( template <typename ResponseType>
inline Session::Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler)
: response(ResponseType{})
, m_resolver(ioContext)
, m_stream(SslStream{ioContext, sslContext})
, m_handler(std::move(handler))
{
}
LIBREPOMGR_EXPORT std::variant<std::string, std::shared_ptr<Session>> runSessionFromUrl(
boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, std::string_view url, boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, std::string_view url,
std::function<void(SessionData data, const HttpClientError &error)> &&handler, std::string &&destinationPath = std::string(), Session::Handler &&handler, std::string &&destinationPath = std::string(),
std::string_view userName = std::string_view(), std::string_view password = std::string_view(), std::string_view userName = std::string_view(), std::string_view password = std::string_view(),
boost::beast::http::verb verb = boost::beast::http::verb::get); boost::beast::http::verb verb = boost::beast::http::verb::get, Session::ChunkHandler &&chunkHandler = Session::ChunkHandler());
} // namespace WebClient } // namespace WebClient
} // namespace LibRepoMgr } // namespace LibRepoMgr