diff --git a/cli/main.cpp b/cli/main.cpp index 6e6de09..c2c557a 100644 --- a/cli/main.cpp +++ b/cli/main.cpp @@ -302,43 +302,58 @@ static void printRawData(const LibRepoMgr::WebClient::Response::body_type::value std::cout << rawData << '\n'; } -static void handleResponse(const std::string &url, const LibRepoMgr::WebClient::SessionData &data, +static void handleResponse(const std::string &url, LibRepoMgr::WebClient::Session &session, const LibRepoMgr::WebClient::HttpClientError &error, void (*printer)(const LibRepoMgr::WebClient::Response::body_type::value_type &jsonData), int &returnCode) { - const auto &response = std::get(data.response); - const auto &body = response.body(); + auto result = boost::beast::http::status::ok; + auto body = std::optional(); + if (auto *const emptyResponse = std::get_if(&session.response)) { + result = emptyResponse->get().result(); + } else if (auto *const response = std::get_if(&session.response)) { + result = response->result(); + body = std::move(response->body()); + } + if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { std::cerr << Phrases::ErrorMessage << "Unable to connect: " << error.what() << Phrases::End; - std::cerr << Phrases::InfoMessage << "URL was: " << url << Phrases::End; - printRawDataForErrorHandling(body); - return; + returnCode = 9; } - if (response.result() != boost::beast::http::status::ok) { - std::cerr << Phrases::ErrorMessage << "HTTP request not successful: " << response.result() << " (" - << static_cast>(response.result()) << " response)" << Phrases::End; - std::cerr << Phrases::InfoMessage << "URL was: " << url << Phrases::End; - printRawDataForErrorHandling(body); - return; + if (result != boost::beast::http::status::ok) { + std::cerr << Phrases::ErrorMessage << "HTTP request not successful: " << result << " (" + << static_cast>(result) << " response)" << Phrases::End; + returnCode = 10; } - try { - std::invoke(printer, body); - } catch (const ReflectiveRapidJSON::JsonDeserializationError &e) { - std::cerr << Phrases::ErrorMessage << "Unable to make sense of response: " << ReflectiveRapidJSON::formatJsonDeserializationError(e) - << Phrases::End; - returnCode = 13; - } catch (const RAPIDJSON_NAMESPACE::ParseResult &e) { - std::cerr << Phrases::ErrorMessage << "Unable to parse responnse: " << tupleToString(LibRepoMgr::serializeParseError(e)) << Phrases::End; - returnCode = 11; - } catch (const std::runtime_error &e) { - std::cerr << Phrases::ErrorMessage << "Unable to display response: " << e.what() << Phrases::End; - returnCode = 12; + if (body.has_value()) { + if (returnCode) { + printRawDataForErrorHandling(body.value()); + } else { + try { + std::invoke(printer, body.value()); + } catch (const ReflectiveRapidJSON::JsonDeserializationError &e) { + std::cerr << Phrases::ErrorMessage << "Unable to make sense of response: " << ReflectiveRapidJSON::formatJsonDeserializationError(e) + << Phrases::End; + returnCode = 13; + } catch (const RAPIDJSON_NAMESPACE::ParseResult &e) { + std::cerr << Phrases::ErrorMessage << "Unable to parse responnse: " << tupleToString(LibRepoMgr::serializeParseError(e)) << Phrases::End; + returnCode = 11; + } catch (const std::runtime_error &e) { + std::cerr << Phrases::ErrorMessage << "Unable to display response: " << e.what() << Phrases::End; + returnCode = 12; + } + } } if (returnCode) { std::cerr << Phrases::InfoMessage << "URL was: " << url << std::endl; } } +static void printChunk(const boost::beast::http::chunk_extensions &chunkExtensions, std::string_view chunkData) +{ + CPP_UTILITIES_UNUSED(chunkExtensions) + std::cout << chunkData; +} + // helper for turning CLI args into URL query parameters static std::string asQueryParam(const Argument &cliArg, std::string_view paramName = std::string_view()) @@ -376,6 +391,7 @@ int main(int argc, const char *argv[]) // define command-specific parameters auto verb = boost::beast::http::verb::get; auto path = std::string(); + void (*chunkHandler)(const boost::beast::http::chunk_extensions &chunkExtensions, std::string_view chunkData) = nullptr; void (*printer)(const LibRepoMgr::WebClient::Response::body_type::value_type &jsonData) = nullptr; // read CLI args @@ -427,6 +443,17 @@ int main(int argc, const char *argv[]) appendAsQueryParam(path, buildActionIdArg, "id"); }); showBuildActionArg.setSubArguments({ &buildActionIdArg }); + auto singleBuildActionIdArg = ConfigValueArgument("id", '\0', "specifies the build action ID", { "ID" }); + singleBuildActionIdArg.setImplicit(true); + singleBuildActionIdArg.setRequired(true); + auto streamOutputBuildActionArg = OperationArgument("output", 'o', "stream build action output"); + streamOutputBuildActionArg.setCallback([&path, &printer, &chunkHandler, &singleBuildActionIdArg](const ArgumentOccurrence &) { + path = "/api/v0/build-action/output?"; + printer = printRawData; + chunkHandler = printChunk; + appendAsQueryParam(path, singleBuildActionIdArg, "id"); + }); + streamOutputBuildActionArg.setSubArguments({ &singleBuildActionIdArg }); auto createBuildActionArg = OperationArgument("create", '\0', "creates and starts a new build action (or pre-defined task)"); auto taskArg = ConfigValueArgument("task", '\0', "specifies the pre-defined task to run", { "task" }); auto typeArg = ConfigValueArgument("type", '\0', "specifies the action type", { "type" }); @@ -488,8 +515,8 @@ int main(int argc, const char *argv[]) appendAsQueryParam(path, buildActionIdArg, "id"); }); stopBuildActionArg.setSubArguments({ &buildActionIdArg }); - actionArg.setSubArguments({ &listActionsArg, &showBuildActionArg, &createBuildActionArg, &deleteBuildActionArg, &cloneBuildActionArg, - &startBuildActionArg, &stopBuildActionArg }); + actionArg.setSubArguments({ &listActionsArg, &showBuildActionArg, &streamOutputBuildActionArg, &createBuildActionArg, &deleteBuildActionArg, + &cloneBuildActionArg, &startBuildActionArg, &stopBuildActionArg }); auto apiArg = OperationArgument("api", '\0', "Invoke a generic API request:"); auto pathArg = ConfigValueArgument("path", '\0', "specifies the route's path without prefix", { "path/of/route?foo=bar&bar=foo" }); pathArg.setImplicit(true); @@ -545,7 +572,7 @@ int main(int argc, const char *argv[]) LibRepoMgr::WebClient::runSessionFromUrl(ioContext, sslContext, url, std::bind(&handleResponse, std::ref(url), std::placeholders::_1, std::placeholders::_2, rawArg.isPresent() ? printRawData : printer, std::ref(returnCode)), - std::string(), config.userName, config.password, verb); + std::string(), config.userName, config.password, verb, chunkHandler); ioContext.run(); return returnCode; } diff --git a/librepomgr/webclient/aur.cpp b/librepomgr/webclient/aur.cpp index 679407b..dbff26d 100644 --- a/librepomgr/webclient/aur.cpp +++ b/librepomgr/webclient/aur.cpp @@ -40,8 +40,8 @@ constexpr auto aurPort = "443"; void searchAurPackages(LogContext &log, ServiceSetup &setup, const std::string &searchTerm, boost::asio::io_context &ioContext, std::shared_ptr &multiSession) { - auto session = std::make_shared(ioContext, setup.webServer.sslContext, - [&log, &setup, multiSession](WebClient::SslSession &session2, const WebClient::HttpClientError &error) mutable { + auto session = std::make_shared(ioContext, setup.webServer.sslContext, + [&log, &setup, multiSession](WebClient::Session &session2, const WebClient::HttpClientError &error) mutable { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { log(Phrases::ErrorMessage, "Failed to search AUR: ", error.what(), '\n'); return; @@ -88,8 +88,8 @@ std::shared_ptr queryAurPackagesInternal(LogContext &log, Servi auto multiSession = AurQuerySession::create(ioContext, move(handler)); for (auto i = packages.cbegin(), end = packages.cend(); i != end;) { - auto session = make_shared(ioContext, setup.webServer.sslContext, - [&log, &setup, multiSession](WebClient::SslSession &session2, const WebClient::HttpClientError &error) mutable { + auto session = make_shared(ioContext, setup.webServer.sslContext, + [&log, &setup, multiSession](WebClient::Session &session2, const WebClient::HttpClientError &error) mutable { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { log(Phrases::ErrorMessage, "Failed to retrieve AUR packages from RPC: ", error.what(), '\n'); return; @@ -191,8 +191,8 @@ void queryAurSnapshots(LogContext &log, ServiceSetup &setup, const std::vector(ioContext, setup.webServer.sslContext, - [multiSession, params](WebClient::SslSession &session2, const WebClient::HttpClientError &error) mutable { + auto session = std::make_shared(ioContext, setup.webServer.sslContext, + [multiSession, params](WebClient::Session &session2, const WebClient::HttpClientError &error) mutable { if (error.errorCode != boost::beast::errc::success && error.errorCode.message() != "stream truncated") { multiSession->addResponse(WebClient::AurSnapshotResult{ .packageName = *params.packageName, .error = "Unable to retrieve AUR snapshot tarball for package " % *params.packageName % ": " + error.what() }); diff --git a/librepomgr/webclient/database.cpp b/librepomgr/webclient/database.cpp index a0f6152..bf11ea6 100644 --- a/librepomgr/webclient/database.cpp +++ b/librepomgr/webclient/database.cpp @@ -64,18 +64,18 @@ void queryDatabases( auto session = runSessionFromUrl( setup.building.ioContext, setup.webServer.sslContext, query.url, [&log, &setup, dbName = std::move(query.databaseName), dbArch = std::move(query.databaseArch), dbQuerySession]( - SessionData data, const WebClient::HttpClientError &error) mutable { + Session &session2, const WebClient::HttpClientError &error) mutable { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { - log(Phrases::ErrorMessage, "Error retrieving database file \"", data.destinationFilePath, "\" for ", dbName, ": ", error.what(), + log(Phrases::ErrorMessage, "Error retrieving database file \"", session2.destinationFilePath, "\" for ", dbName, ": ", error.what(), '\n'); dbQuerySession->addResponse(std::move(dbName)); return; } - const auto &response = get(data.response); + const auto &response = get(session2.response); const auto &message = response.get(); if (message.result() != boost::beast::http::status::ok) { - log(Phrases::ErrorMessage, "Error retrieving database file \"", data.destinationFilePath, "\" for ", dbName, ": mirror returned ", + log(Phrases::ErrorMessage, "Error retrieving database file \"", session2.destinationFilePath, "\" for ", dbName, ": mirror returned ", message.result_int(), " response\n"); dbQuerySession->addResponse(std::move(dbName)); return; @@ -118,7 +118,7 @@ void queryDatabases( try { // load packages - auto files = extractFiles(data.destinationFilePath, &Database::isFileRelevant); + auto files = extractFiles(session2.destinationFilePath, &Database::isFileRelevant); auto packages = Package::fromDatabaseFile(move(files)); // insert packages @@ -200,14 +200,14 @@ void cachePackages(LogContext &log, std::shared_ptr &&pac log(Phrases::InfoMessage, "Downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\"\n"); runSessionFromUrl( packageCachingSession->ioContext(), packageCachingSession->m_sslContext, cachingData->url, - [&log, packageCachingSession, cachingData](SessionData data, const WebClient::HttpClientError &error) mutable { + [&log, packageCachingSession, cachingData](Session &session, const WebClient::HttpClientError &error) mutable { if (error.errorCode != boost::beast::errc::success && error.errorCode != boost::asio::ssl::error::stream_truncated) { const auto msg = std::make_tuple( "Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, "\": ", error.what()); cachingData->error = tupleToString(msg); log(Phrases::ErrorMessage, msg, '\n'); } - const auto &response = get(data.response); + const auto &response = get(session.response); const auto &message = response.get(); if (message.result() != boost::beast::http::status::ok) { const auto msg = std::make_tuple("Error downloading \"", cachingData->url, "\" to \"", cachingData->destinationFilePath, diff --git a/librepomgr/webclient/session.cpp b/librepomgr/webclient/session.cpp index a44aba6..3a8263a 100644 --- a/librepomgr/webclient/session.cpp +++ b/librepomgr/webclient/session.cpp @@ -13,6 +13,7 @@ #include #include +#include using namespace std; using namespace boost::asio; @@ -30,8 +31,26 @@ HttpClientError::HttpClientError(const char *context, boost::beast::error_code e { } +void Session::setChunkHandler(ChunkHandler &&handler) +{ + m_chunkProcessing = std::make_unique(); + m_chunkProcessing->onChunkHeader = std::bind(&Session::onChunkHeader, shared_from_this(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + m_chunkProcessing->onChunkBody = std::bind(&Session::onChunkBody, shared_from_this(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + m_chunkProcessing->handler = std::move(handler); +} + void Session::run(const char *host, const char *port, http::verb verb, const char *target, unsigned int version) { + // set SNI Hostname (many hosts need this to handshake successfully) + auto *const sslStream = std::get_if(&m_stream); + if (sslStream && !SSL_ctrl( + sslStream->native_handle(), SSL_CTRL_SET_TLSEXT_HOSTNAME, TLSEXT_NAMETYPE_host_name, reinterpret_cast(const_cast(host)))) { + m_handler(*this, + HttpClientError( + "setting SNI hostname", boost::beast::error_code{ static_cast(::ERR_get_error()), boost::asio::error::get_ssl_category() })); + return; + } + // set up an HTTP request message request.version(version); request.method(verb); @@ -49,12 +68,25 @@ void Session::run(const char *host, const char *port, http::verb verb, const cha m_handler(*this, HttpClientError("opening output file", errorCode)); return; } + } else if (m_chunkProcessing) { + auto &emptyResponse = response.emplace(); + emptyResponse.on_chunk_header(m_chunkProcessing->onChunkHeader); + emptyResponse.on_chunk_body(m_chunkProcessing->onChunkBody); } // look up the domain name m_resolver.async_resolve(host, port, boost::asio::ip::tcp::resolver::canonical_name | boost::asio::ip::tcp::resolver::passive | boost::asio::ip::tcp::resolver::all_matching, - std::bind(&Session::resolved, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + std::bind(&Session::resolved, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); +} + +inline Session::RawSocket &Session::socket() +{ + auto *socket = std::get_if(&m_stream); + if (!socket) { + socket = &std::get(m_stream).next_layer(); + } + return *socket; } void Session::resolved(boost::beast::error_code ec, ip::tcp::resolver::results_type results) @@ -65,7 +97,7 @@ void Session::resolved(boost::beast::error_code ec, ip::tcp::resolver::results_t } // make the connection on the IP address we get from a lookup - boost::asio::async_connect(m_socket, results.begin(), results.end(), std::bind(&Session::connected, shared_from_this(), std::placeholders::_1)); + boost::asio::async_connect(socket(), results.begin(), results.end(), std::bind(&Session::connected, shared_from_this(), std::placeholders::_1)); } void Session::connected(boost::beast::error_code ec) @@ -75,8 +107,30 @@ void Session::connected(boost::beast::error_code ec) return; } - // perform the SSL handshake - http::async_write(m_socket, request, std::bind(&Session::requested, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + if (auto *const sslStream = std::get_if(&m_stream)) { + // perform the SSL handshake + sslStream->async_handshake(ssl::stream_base::client, std::bind(&Session::handshakeDone, shared_from_this(), std::placeholders::_1)); + } else { + sendRequest(); + } +} + +void Session::handshakeDone(boost::beast::error_code ec) +{ + if (ec) { + m_handler(*this, HttpClientError("SSL handshake", ec)); + return; + } + sendRequest(); +} + +void Session::sendRequest() +{ + // send the HTTP request to the remote host + std::visit([this](auto &&stream) { + boost::beast::http::async_write( + stream, request, std::bind(&Session::requested, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + }, m_stream); } void Session::requested(boost::beast::error_code ec, std::size_t bytesTransferred) @@ -89,11 +143,73 @@ void Session::requested(boost::beast::error_code ec, std::size_t bytesTransferre // receive the HTTP response std::visit( - [this](auto &&response) { - http::async_read( - m_socket, m_buffer, response, std::bind(&Session::received, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + [this](auto &stream, auto &&response) { + if constexpr (std::is_same_v, EmptyResponse>) { + http::async_read_header(stream, m_buffer, response, std::bind(&Session::chunkReceived, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } else { + http::async_read( + stream, m_buffer, response, std::bind(&Session::received, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } }, - response); + m_stream, response); +} + +void Session::onChunkHeader(std::uint64_t chunkSize, boost::beast::string_view extensions, boost::beast::error_code &ec) +{ + // parse the chunk extensions so we can access them easily + m_chunkProcessing->chunkExtensions.parse(extensions, ec); + if (ec) { + return; + } + + if(chunkSize > std::numeric_limits::max()) { + ec = boost::beast::http::error::body_limit; + return; + } + + // make sure we have enough storage, and reset the container for the upcoming chunk + m_chunkProcessing->currentChunk.reserve(static_cast(chunkSize)); + m_chunkProcessing->currentChunk.clear(); +} + +std::size_t Session::onChunkBody(std::uint64_t bytesLeftInThisChunk, boost::beast::string_view chunkBodyData, boost::beast::error_code &ec) +{ + // set the error so that the call to `read` returns if this is the last piece of the chunk body and we can process the chunk + if(bytesLeftInThisChunk == chunkBodyData.size()) { + ec = boost::beast::http::error::end_of_chunk; + } + + // append this piece to our container + m_chunkProcessing->currentChunk.append(chunkBodyData.data(), chunkBodyData.size()); + + return chunkBodyData.size(); + // note: The return value informs the parser of how much of the body we + // consumed. We will indicate that we consumed everything passed in. +} + +void Session::chunkReceived(boost::beast::error_code ec, std::size_t bytesTransferred) +{ + if(ec == boost::beast::http::error::end_of_chunk) { + m_chunkProcessing->handler(m_chunkProcessing->chunkExtensions, m_chunkProcessing->currentChunk); + } else if (ec) { + m_handler(*this, HttpClientError("receiving chunk response", ec)); + return; + } + if (!continueReadingChunks()) { + received(ec, bytesTransferred); + } +} + +bool Session::continueReadingChunks() +{ + auto &parser = std::get(response); + if (parser.is_done()) { + return false; + } + std::visit([this, &parser] (auto &stream) { + boost::beast::http::async_read(stream, m_buffer, parser, std::bind(&Session::chunkReceived, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + }, m_stream); + return true; } void Session::received(boost::beast::error_code ec, std::size_t bytesTransferred) @@ -105,153 +221,28 @@ void Session::received(boost::beast::error_code ec, std::size_t bytesTransferred } // close the stream gracefully - m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - - if (ec && ec != boost::beast::errc::not_connected) { - m_handler(*this, HttpClientError("closing connection", ec)); - return; + if (auto *const sslStream = std::get_if(&m_stream)) { + // perform the SSL handshake + sslStream->async_shutdown(std::bind(&Session::closed, shared_from_this(), std::placeholders::_1)); + } else if (auto *const socket = std::get_if(&m_stream)) { + socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + m_handler(*this, ec && ec != boost::beast::errc::not_connected ? HttpClientError("closing connection", ec) : HttpClientError()); } - // if we get here then the connection is closed gracefully - m_handler(*this, HttpClientError()); } -void SslSession::run(const char *host, const char *port, http::verb verb, const char *target, unsigned int version) +void Session::closed(boost::beast::error_code ec) { - // set SNI Hostname (many hosts need this to handshake successfully) - if (!SSL_ctrl( - m_stream.native_handle(), SSL_CTRL_SET_TLSEXT_HOSTNAME, TLSEXT_NAMETYPE_host_name, reinterpret_cast(const_cast(host)))) { - m_handler(*this, - HttpClientError( - "setting SNI hostname", boost::beast::error_code{ static_cast(::ERR_get_error()), boost::asio::error::get_ssl_category() })); - return; - } - - // setup an HTTP request message - request.version(version); - request.method(verb); - request.target(target); - request.set(http::field::host, host); - request.set(http::field::user_agent, APP_NAME " " APP_VERSION); - - // setup a file response - if (!destinationFilePath.empty()) { - auto &fileResponse = response.emplace(); - boost::beast::error_code errorCode; - fileResponse.body_limit(100 * 1024 * 1024); - fileResponse.get().body().open(destinationFilePath.data(), file_mode::write, errorCode); - if (errorCode != boost::beast::errc::success) { - m_handler(*this, HttpClientError("opening output file", errorCode)); - return; - } - } - - // look up the domain name - m_resolver.async_resolve(host, port, - boost::asio::ip::tcp::resolver::canonical_name | boost::asio::ip::tcp::resolver::passive | boost::asio::ip::tcp::resolver::all_matching, - std::bind(&SslSession::resolved, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + // rationale regarding boost::asio::error::eof: http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + m_handler(*this, ec && ec != boost::asio::error::eof ? HttpClientError("closing connection", ec) : HttpClientError()); } -void SslSession::resolved(boost::beast::error_code ec, ip::tcp::resolver::results_type results) +std::variant> runSessionFromUrl( + boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, std::string_view url, + Session::Handler &&handler, std::string &&destinationPath, + std::string_view userName, std::string_view password, + boost::beast::http::verb verb, Session::ChunkHandler &&chunkHandler) { - if (ec) { - m_handler(*this, HttpClientError("resolving", ec)); - return; - } - - // make the connection on the IP address we get from a lookup - boost::asio::async_connect( - m_stream.next_layer(), results.begin(), results.end(), std::bind(&SslSession::connected, shared_from_this(), std::placeholders::_1)); -} - -void SslSession::connected(boost::beast::error_code ec) -{ - if (ec) { - m_handler(*this, HttpClientError("connecting", ec)); - return; - } - - // perform the SSL handshake - m_stream.async_handshake(ssl::stream_base::client, std::bind(&SslSession::handshakeDone, shared_from_this(), std::placeholders::_1)); -} - -void SslSession::handshakeDone(boost::beast::error_code ec) -{ - if (ec) { - m_handler(*this, HttpClientError("SSL handshake", ec)); - return; - } - - // send the HTTP request to the remote host - boost::beast::http::async_write( - m_stream, request, std::bind(&SslSession::requested, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); -} - -void SslSession::requested(boost::beast::error_code ec, std::size_t bytesTransferred) -{ - boost::ignore_unused(bytesTransferred); - if (ec) { - m_handler(*this, HttpClientError("sending request", ec)); - return; - } - - // receive the HTTP response - std::visit( - [this](auto &&response) { - http::async_read( - m_stream, m_buffer, response, std::bind(&SslSession::received, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); - }, - response); -} - -void SslSession::received(boost::beast::error_code ec, std::size_t bytesTransferred) -{ - boost::ignore_unused(bytesTransferred); - if (ec) { - m_handler(*this, HttpClientError("receiving response", ec)); - return; - } - - // close the stream gracefully - m_stream.async_shutdown(std::bind(&SslSession::closed, shared_from_this(), std::placeholders::_1)); -} - -void SslSession::closed(boost::beast::error_code ec) -{ - if (ec == boost::asio::error::eof) { - // rationale: http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error - ec = {}; - } - if (ec) { - m_handler(*this, HttpClientError("closing connection", ec)); - return; - } - // if we get here then the connection is closed gracefully - m_handler(*this, HttpClientError()); -} - -template -std::variant, std::shared_ptr> runSession(const std::string &host, const std::string &port, - const std::string &target, std::function &&handler, std::string &&destinationPath, - std::string_view userName, std::string_view password, boost::beast::http::verb verb, ArgType &&...args) -{ - auto session = make_shared(args..., [handler{ move(handler) }](auto &session2, const HttpClientError &error) mutable { - handler(SessionData{ session2.shared_from_this(), session2.request, session2.response, session2.destinationFilePath }, error); - }); - if (!userName.empty()) { - const auto authInfo = userName % ":" + password; - session->request.set(boost::beast::http::field::authorization, - "Basic " + encodeBase64(reinterpret_cast(authInfo.data()), static_cast(authInfo.size()))); - } - session->destinationFilePath = move(destinationPath); - session->run(host.data(), port.data(), verb, target.data()); - return std::variant, std::shared_ptr>(std::move(session)); -} - -std::variant, std::shared_ptr> runSessionFromUrl(boost::asio::io_context &ioContext, - boost::asio::ssl::context &sslContext, std::string_view url, std::function &&handler, - std::string &&destinationPath, std::string_view userName, std::string_view password, boost::beast::http::verb verb) -{ - string host, port, target; + std::string host, port, target; auto ssl = false; if (startsWith(url, "http:")) { @@ -260,7 +251,7 @@ std::variant, std::shared_ptr> runS url = url.substr(6); ssl = true; } else { - return "db mirror for database has unsupported protocol"; + return std::string("unsupported protocol"); } auto urlParts = splitStringSimple>(url, "/"); @@ -285,11 +276,18 @@ std::variant, std::shared_ptr> runS port = ssl ? "443" : "80"; } - if (ssl) { - return runSession(host, port, target, move(handler), move(destinationPath), userName, password, verb, ioContext, sslContext); - } else { - return runSession(host, port, target, move(handler), move(destinationPath), userName, password, verb, ioContext); + auto session = ssl ? std::make_shared(ioContext, sslContext, std::move(handler)) : std::make_shared(ioContext, std::move(handler)); + if (!userName.empty()) { + const auto authInfo = userName % ":" + password; + session->request.set(boost::beast::http::field::authorization, + "Basic " + encodeBase64(reinterpret_cast(authInfo.data()), static_cast(authInfo.size()))); } + session->destinationFilePath = std::move(destinationPath); + if (chunkHandler) { + session->setChunkHandler(std::move(chunkHandler)); + } + session->run(host.data(), port.data(), verb, target.data()); + return std::variant>(std::move(session)); } } // namespace WebClient diff --git a/librepomgr/webclient/session.h b/librepomgr/webclient/session.h index 835cde4..cb8ae62 100644 --- a/librepomgr/webclient/session.h +++ b/librepomgr/webclient/session.h @@ -43,66 +43,50 @@ inline LibRepoMgr::WebClient::HttpClientError::operator bool() const using Response = WebAPI::Response; using FileResponse = boost::beast::http::response_parser; -using MultiResponse = std::variant; +using EmptyResponse = boost::beast::http::response_parser; +using MultiResponse = std::variant; using Request = boost::beast::http::request; -struct LIBREPOMGR_EXPORT SessionData { - std::shared_ptr session; - Request &request; - MultiResponse &response; - std::string &destinationFilePath; -}; +struct ChunkProcessing; class LIBREPOMGR_EXPORT Session : public std::enable_shared_from_this { public: using Handler = std::function; + using ChunkHandler = std::function; - template explicit Session(boost::asio::io_context &ioContext, const Handler &handler = Handler()); + template + explicit Session(boost::asio::io_context &ioContext, const Handler &handler = Handler()); + template + explicit Session(boost::asio::io_context &ioContext, Handler &&handler = Handler()); + template + explicit Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler = Handler()); + template + explicit Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler); + void setChunkHandler(ChunkHandler &&handler); void run(const char *host, const char *port, boost::beast::http::verb verb, const char *target, unsigned int version = 11); private: - void resolved(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results); - void connected(boost::beast::error_code ec); - void requested(boost::beast::error_code ec, std::size_t bytesTransferred); - void received(boost::beast::error_code ec, std::size_t bytesTransferred); + using RawSocket = boost::asio::ip::tcp::socket; + using SslStream = boost::asio::ssl::stream; + struct ChunkProcessing { + boost::beast::http::chunk_extensions chunkExtensions; + std::string currentChunk; + std::function onChunkHeader; + std::function onChunkBody; + Session::ChunkHandler handler; + }; -public: - Request request; - MultiResponse response; - std::string destinationFilePath; + RawSocket &socket(); -private: - boost::asio::ip::tcp::resolver m_resolver; - boost::asio::ip::tcp::socket m_socket; - boost::beast::flat_buffer m_buffer; - Handler m_handler; -}; - -template -inline Session::Session(boost::asio::io_context &ioContext, const Handler &handler) - : response(ResponseType{}) - , m_resolver(ioContext) - , m_socket(ioContext) - , m_handler(handler) -{ -} - -class LIBREPOMGR_EXPORT SslSession : public std::enable_shared_from_this { -public: - using Handler = std::function; - - template - explicit SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler = Handler()); - template - explicit SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler); - - void run(const char *host, const char *port, boost::beast::http::verb verb, const char *target, unsigned int version = 11); - -private: void resolved(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results); void connected(boost::beast::error_code ec); void handshakeDone(boost::beast::error_code ec); + void sendRequest(); void requested(boost::beast::error_code ec, std::size_t bytesTransferred); + void onChunkHeader(std::uint64_t chunkSize, boost::beast::string_view extensions, boost::beast::error_code &ec); + std::size_t onChunkBody(std::uint64_t bytesLeftInThisChunk, boost::beast::string_view chunkBodyData, boost::beast::error_code &ec); + void chunkReceived(boost::beast::error_code ec, std::size_t bytesTransferred); + bool continueReadingChunks(); void received(boost::beast::error_code ec, std::size_t bytesTransferred); void closed(boost::beast::error_code ec); @@ -113,34 +97,53 @@ public: private: boost::asio::ip::tcp::resolver m_resolver; - boost::asio::ssl::stream m_stream; + std::variant m_stream; boost::beast::flat_buffer m_buffer; + std::unique_ptr m_chunkProcessing; Handler m_handler; }; template -inline SslSession::SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler) +inline Session::Session(boost::asio::io_context &ioContext, const Handler &handler) : response(ResponseType{}) , m_resolver(ioContext) - , m_stream(ioContext, sslContext) + , m_stream(RawSocket{ioContext}) , m_handler(handler) { } template -inline SslSession::SslSession(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler) +inline Session::Session(boost::asio::io_context &ioContext, Handler &&handler) : response(ResponseType{}) , m_resolver(ioContext) - , m_stream(ioContext, sslContext) + , m_stream(RawSocket{ioContext}) + , m_handler(std::move(handler)) +{ +} + +template +inline Session::Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, const Handler &handler) + : response(ResponseType{}) + , m_resolver(ioContext) + , m_stream(SslStream{ioContext, sslContext}) , m_handler(handler) { } -LIBREPOMGR_EXPORT std::variant, std::shared_ptr> runSessionFromUrl( +template +inline Session::Session(boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, Handler &&handler) + : response(ResponseType{}) + , m_resolver(ioContext) + , m_stream(SslStream{ioContext, sslContext}) + , m_handler(std::move(handler)) +{ +} + +LIBREPOMGR_EXPORT std::variant> runSessionFromUrl( boost::asio::io_context &ioContext, boost::asio::ssl::context &sslContext, std::string_view url, - std::function &&handler, std::string &&destinationPath = std::string(), + Session::Handler &&handler, std::string &&destinationPath = std::string(), std::string_view userName = std::string_view(), std::string_view password = std::string_view(), - boost::beast::http::verb verb = boost::beast::http::verb::get); + boost::beast::http::verb verb = boost::beast::http::verb::get, Session::ChunkHandler &&chunkHandler = Session::ChunkHandler()); } // namespace WebClient } // namespace LibRepoMgr