Use `sendfile()` to speed up copying

This commit is contained in:
Martchus 2019-08-14 21:57:17 +02:00
parent 7a5a02976a
commit ec891b48f6
2 changed files with 153 additions and 12 deletions

View File

@ -6,12 +6,23 @@
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#if defined(CPP_UTILITIES_USE_NATIVE_FILE_BUFFER) && defined(PLATFORM_LINUX)
#define CPP_UTILITIES_USE_SEND_FILE
#include <c++utilities/conversion/stringbuilder.h>
#include <errno.h>
#include <sys/sendfile.h>
#include <algorithm>
#include <cstring>
#endif
namespace CppUtilities { namespace CppUtilities {
/*! /*!
* \class CopyHelper * \class CopyHelper
* \brief The CopyHelper class helps to copy bytes from one stream to another. * \brief The CopyHelper class helps to copy bytes from one stream to another.
* \tparam Specifies the buffer size. * \tparam Specifies the chunk/buffer size.
*/ */
template <std::size_t bufferSize> class CPP_UTILITIES_EXPORT CopyHelper { template <std::size_t bufferSize> class CPP_UTILITIES_EXPORT CopyHelper {
public: public:
@ -55,8 +66,8 @@ template <std::size_t bufferSize> void CopyHelper<bufferSize>::copy(std::istream
* \brief Copies \a count bytes from \a input to \a output. The procedure might be aborted and * \brief Copies \a count bytes from \a input to \a output. The procedure might be aborted and
* progress updates will be reported. * progress updates will be reported.
* *
* Copying is aborted when \a isAborted returns true. The current progress is reported by calling * Before processing the next chunk \a isAborted is checked and the copying aborted if it returns true. Before processing the next chunk
* the specified \a callback function. * \a callback is invoked to report the current progress.
* *
* \remarks Set an exception mask using std::ios::exceptions() to get a std::ios_base::failure exception * \remarks Set an exception mask using std::ios::exceptions() to get a std::ios_base::failure exception
* when an IO error occurs. * when an IO error occurs.
@ -85,10 +96,35 @@ void CopyHelper<bufferSize>::callbackCopy(std::istream &input, std::ostream &out
* \remarks * \remarks
* - Set an exception mask using std::ios::exceptions() to get a std::ios_base::failure exception * - Set an exception mask using std::ios::exceptions() to get a std::ios_base::failure exception
* when an IO error occurs. * when an IO error occurs.
* - Possibly uses native APIs such as POSIX sendfile() to improve the speed (not implemented yet). * - Possibly uses native APIs such as POSIX sendfile() to improve the speed.
*/ */
template <std::size_t bufferSize> void CopyHelper<bufferSize>::copy(NativeFileStream &input, NativeFileStream &output, std::uint64_t count) template <std::size_t bufferSize> void CopyHelper<bufferSize>::copy(NativeFileStream &input, NativeFileStream &output, std::uint64_t count)
{ {
#ifdef CPP_UTILITIES_USE_SEND_FILE
if (output.fileDescriptor() != -1 && input.fileDescriptor() != -1) {
const auto inputTellg = output.tellg();
const auto inputTellp = output.tellp();
const auto outputTellg = output.tellg();
const auto outputTellp = output.tellp();
input.flush();
output.flush();
const auto totalBytes = static_cast<std::streamoff>(count);
while (count) {
const auto bytesCopied = ::sendfile64(output.fileDescriptor(), input.fileDescriptor(), nullptr, count);
if (bytesCopied < 0) {
throw std::ios_base::failure(argsToString("sendfile64() failed: ", std::strerror(errno)));
}
count -= static_cast<std::size_t>(bytesCopied);
}
input.sync();
output.sync();
output.seekg(outputTellg + totalBytes);
output.seekp(outputTellp + totalBytes);
input.seekg(inputTellg + totalBytes);
input.seekp(inputTellp + totalBytes);
return;
}
#endif
copy(static_cast<std::istream &>(input), static_cast<std::ostream &>(output), count); copy(static_cast<std::istream &>(input), static_cast<std::ostream &>(output), count);
} }
@ -96,17 +132,47 @@ template <std::size_t bufferSize> void CopyHelper<bufferSize>::copy(NativeFileSt
* \brief Copies \a count bytes from \a input to \a output. The procedure might be aborted and * \brief Copies \a count bytes from \a input to \a output. The procedure might be aborted and
* progress updates will be reported. * progress updates will be reported.
* *
* Copying is aborted when \a isAborted returns true. The current progress is reported by calling * Before processing the next chunk \a isAborted is checked and the copying aborted if it returns true. Before processing the next chunk
* the specified \a callback function. * \a callback is invoked to report the current progress.
* *
* - Set an exception mask using std::ios::exceptions() to get a std::ios_base::failure exception * - Set an exception mask using std::ios::exceptions() to get a std::ios_base::failure exception
* when an IO error occurs. * when an IO error occurs.
* - Possibly uses native APIs such as POSIX sendfile() to improve the speed (not implemented yet). * - Possibly uses native APIs such as POSIX sendfile() to improve the speed.
*/ */
template <std::size_t bufferSize> template <std::size_t bufferSize>
void CopyHelper<bufferSize>::callbackCopy(NativeFileStream &input, NativeFileStream &output, std::uint64_t count, void CopyHelper<bufferSize>::callbackCopy(NativeFileStream &input, NativeFileStream &output, std::uint64_t count,
const std::function<bool(void)> &isAborted, const std::function<void(double)> &callback) const std::function<bool(void)> &isAborted, const std::function<void(double)> &callback)
{ {
#ifdef CPP_UTILITIES_USE_SEND_FILE
if (output.fileDescriptor() != -1 && input.fileDescriptor() != -1) {
const auto inputTellg = output.tellg();
const auto inputTellp = output.tellp();
const auto outputTellg = output.tellg();
const auto outputTellp = output.tellp();
input.flush();
output.flush();
const auto totalBytes = static_cast<std::streamoff>(count);
while (count) {
const auto bytesCopied = ::sendfile64(output.fileDescriptor(), input.fileDescriptor(), nullptr, std::min(count, bufferSize));
if (bytesCopied < 0) {
throw std::ios_base::failure(argsToString("sendfile64() failed: ", std::strerror(errno)));
}
count -= static_cast<std::uint64_t>(bytesCopied);
if (isAborted()) {
return;
}
callback(static_cast<double>(totalBytes - static_cast<std::streamoff>(count)) / static_cast<double>(totalBytes));
}
input.sync();
output.sync();
output.seekg(outputTellg + totalBytes);
output.seekp(outputTellp + totalBytes);
input.seekg(inputTellg + totalBytes);
input.seekp(inputTellp + totalBytes);
callback(1.0);
return;
}
#endif
callbackCopy(static_cast<std::istream &>(input), static_cast<std::ostream &>(output), count, isAborted, callback); callbackCopy(static_cast<std::istream &>(input), static_cast<std::ostream &>(output), count, isAborted, callback);
} }

View File

@ -67,6 +67,7 @@ class IoTests : public TestFixture {
CPPUNIT_TEST(testIniFile); CPPUNIT_TEST(testIniFile);
CPPUNIT_TEST(testAdvancedIniFile); CPPUNIT_TEST(testAdvancedIniFile);
CPPUNIT_TEST(testCopy); CPPUNIT_TEST(testCopy);
CPPUNIT_TEST(testCopyWithNativeFileStream);
CPPUNIT_TEST(testReadFile); CPPUNIT_TEST(testReadFile);
CPPUNIT_TEST(testWriteFile); CPPUNIT_TEST(testWriteFile);
CPPUNIT_TEST(testAnsiEscapeCodes); CPPUNIT_TEST(testAnsiEscapeCodes);
@ -87,6 +88,7 @@ public:
void testIniFile(); void testIniFile();
void testAdvancedIniFile(); void testAdvancedIniFile();
void testCopy(); void testCopy();
void testCopyWithNativeFileStream();
void testReadFile(); void testReadFile();
void testWriteFile(); void testWriteFile();
void testAnsiEscapeCodes(); void testAnsiEscapeCodes();
@ -499,23 +501,96 @@ void IoTests::testAdvancedIniFile()
void IoTests::testCopy() void IoTests::testCopy()
{ {
// prepare streams // prepare streams
fstream testFile; auto testFile = std::fstream();
testFile.exceptions(ios_base::failbit | ios_base::badbit); testFile.exceptions(ios_base::failbit | ios_base::badbit);
testFile.open(testFilePath("some_data"), ios_base::in | ios_base::binary); testFile.open(testFilePath("some_data"), ios_base::in | ios_base::binary);
stringstream outputStream(ios_base::in | ios_base::out | ios_base::binary); auto outputStream = std::stringstream(ios_base::in | ios_base::out | ios_base::binary);
outputStream.exceptions(ios_base::failbit | ios_base::badbit); outputStream.exceptions(ios_base::failbit | ios_base::badbit);
// copy // copy
CopyHelper<13> copyHelper; auto copyHelper = CopyHelper<13>();
copyHelper.copy(testFile, outputStream, 50); copyHelper.copy(testFile, outputStream, 50);
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(0), outputStream.tellg());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), outputStream.tellp());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), testFile.tellg());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), testFile.tellp());
// test // verify
testFile.seekg(0); testFile.seekg(0);
for (auto i = 0; i < 50; ++i) { for (auto i = 0; i < 50; ++i) {
CPPUNIT_ASSERT(testFile.get() == outputStream.get()); CPPUNIT_ASSERT_EQUAL(testFile.get(), outputStream.get());
} }
} }
/*!
* \brief Tests CopyHelper in combination with NativeFileStream.
*/
void IoTests::testCopyWithNativeFileStream()
{
// prepare streams
auto testFile = NativeFileStream();
testFile.exceptions(ios_base::failbit | ios_base::badbit);
testFile.open(testFilePath("some_data"), ios_base::in | ios_base::binary);
auto outputPath = workingCopyPath("copied_data", WorkingCopyMode::Cleanup);
auto outputStream = NativeFileStream();
outputStream.exceptions(ios_base::failbit | ios_base::badbit);
outputStream.open(outputPath, ios_base::out | ios_base::binary);
// copy
auto copyHelper = CopyHelper<13>();
copyHelper.copy(testFile, outputStream, 50);
CPPUNIT_ASSERT(outputStream.is_open());
CPPUNIT_ASSERT(testFile.is_open());
// test seek positions (the expected values are from a run with normal std::fstream)
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), outputStream.tellg());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), outputStream.tellp());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), testFile.tellg());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), testFile.tellp());
// add a few more bytes to output stream (to be sure it is still usable) and re-open for reading
const auto aFewMoreBytes = "a few more bytes"sv;
outputStream << aFewMoreBytes;
outputStream.close();
outputStream.open(outputPath, ios_base::in | ios_base::binary);
// verify
testFile.seekg(0);
for (auto i = 0; i < 50; ++i) {
CPPUNIT_ASSERT_EQUAL(testFile.get(), outputStream.get());
}
auto tail = std::string(aFewMoreBytes.size(), '0');
outputStream.read(tail.data(), static_cast<std::streamsize>(tail.size()));
CPPUNIT_ASSERT_EQUAL(aFewMoreBytes, std::string_view(tail.data(), tail.size()));
// repeat with callback version
auto percentage = 0.0;
const auto isAborted = [] { return false; };
const auto callback = [&percentage] (double p) { percentage = p; };
testFile.seekg(0);
outputStream.open(outputPath, ios_base::out | ios_base::trunc | ios_base::binary);
copyHelper.callbackCopy(testFile, outputStream, 50, isAborted, callback);
CPPUNIT_ASSERT_EQUAL(1.0, percentage);
// verify again
CPPUNIT_ASSERT(outputStream.is_open());
CPPUNIT_ASSERT(testFile.is_open());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), outputStream.tellg());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), outputStream.tellp());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), testFile.tellg());
CPPUNIT_ASSERT_EQUAL(static_cast<std::fstream::pos_type>(50), testFile.tellp());
outputStream << aFewMoreBytes;
outputStream.close();
outputStream.open(outputPath, ios_base::in | ios_base::binary);
testFile.seekg(0);
for (auto i = 0; i < 50; ++i) {
CPPUNIT_ASSERT_EQUAL(testFile.get(), outputStream.get());
}
tail.assign(aFewMoreBytes.size(), '0');
outputStream.read(tail.data(), static_cast<std::streamsize>(tail.size()));
CPPUNIT_ASSERT_EQUAL(aFewMoreBytes, std::string_view(tail.data(), tail.size()));
}
/*! /*!
* \brief Tests readFile(). * \brief Tests readFile().
*/ */