Add async locks
This might be useful to avoid blocking threads in the thread pool just for waiting on a global lock. It might also be useful to allow stopping build actions while they're waiting for a lock.
This commit is contained in:
parent
b1acbee127
commit
c404a092cc
|
@ -3,6 +3,8 @@
|
||||||
|
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
|
#include <functional>
|
||||||
|
#include <list>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <shared_mutex>
|
#include <shared_mutex>
|
||||||
|
|
||||||
|
@ -16,17 +18,23 @@ struct LogContext;
|
||||||
struct GlobalSharedMutex {
|
struct GlobalSharedMutex {
|
||||||
void lock();
|
void lock();
|
||||||
bool try_lock();
|
bool try_lock();
|
||||||
|
void lock_async(std::function<void()> &&callback);
|
||||||
void unlock();
|
void unlock();
|
||||||
|
|
||||||
void lock_shared();
|
void lock_shared();
|
||||||
bool try_lock_shared();
|
bool try_lock_shared();
|
||||||
|
void lock_shared_async(std::function<void()> &&callback);
|
||||||
void unlock_shared();
|
void unlock_shared();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void notify(std::unique_lock<std::mutex> &lock);
|
||||||
|
|
||||||
std::mutex m_mutex;
|
std::mutex m_mutex;
|
||||||
std::condition_variable m_cv;
|
std::condition_variable m_cv;
|
||||||
std::uint32_t m_sharedOwners = 0;
|
std::uint32_t m_sharedOwners = 0;
|
||||||
bool m_exclusivelyOwned = false;
|
bool m_exclusivelyOwned = false;
|
||||||
|
std::list<std::function<void()>> m_sharedCallbacks;
|
||||||
|
std::function<void()> m_exclusiveCallback;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline void GlobalSharedMutex::lock()
|
inline void GlobalSharedMutex::lock()
|
||||||
|
@ -48,12 +56,23 @@ inline bool GlobalSharedMutex::try_lock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void GlobalSharedMutex::lock_async(std::function<void()> &&callback)
|
||||||
|
{
|
||||||
|
auto lock = std::unique_lock<std::mutex>(m_mutex);
|
||||||
|
if (m_sharedOwners || m_exclusivelyOwned) {
|
||||||
|
m_exclusiveCallback = std::move(callback);
|
||||||
|
} else {
|
||||||
|
m_exclusivelyOwned = true;
|
||||||
|
lock.unlock();
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
inline void GlobalSharedMutex::unlock()
|
inline void GlobalSharedMutex::unlock()
|
||||||
{
|
{
|
||||||
auto lock = std::unique_lock<std::mutex>(m_mutex);
|
auto lock = std::unique_lock<std::mutex>(m_mutex);
|
||||||
m_exclusivelyOwned = false;
|
m_exclusivelyOwned = false;
|
||||||
lock.unlock();
|
notify(lock);
|
||||||
m_cv.notify_one();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void GlobalSharedMutex::lock_shared()
|
inline void GlobalSharedMutex::lock_shared()
|
||||||
|
@ -75,13 +94,51 @@ inline bool GlobalSharedMutex::try_lock_shared()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void GlobalSharedMutex::lock_shared_async(std::function<void()> &&callback)
|
||||||
|
{
|
||||||
|
auto lock = std::unique_lock<std::mutex>(m_mutex);
|
||||||
|
if (m_exclusivelyOwned) {
|
||||||
|
m_sharedCallbacks.emplace_back(std::move(callback));
|
||||||
|
} else {
|
||||||
|
++m_sharedOwners;
|
||||||
|
lock.unlock();
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
inline void GlobalSharedMutex::unlock_shared()
|
inline void GlobalSharedMutex::unlock_shared()
|
||||||
{
|
{
|
||||||
auto lock = std::unique_lock<std::mutex>(m_mutex);
|
auto lock = std::unique_lock<std::mutex>(m_mutex);
|
||||||
if (!--m_sharedOwners) {
|
if (!--m_sharedOwners) {
|
||||||
|
notify(lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void GlobalSharedMutex::notify(std::unique_lock<std::mutex> &lock)
|
||||||
|
{
|
||||||
|
// invoke callbacks for lock_shared_async()
|
||||||
|
if (!m_sharedCallbacks.empty() && !m_exclusivelyOwned) {
|
||||||
|
auto callbacks = std::move(m_sharedCallbacks);
|
||||||
|
m_sharedOwners += static_cast<std::uint32_t>(callbacks.size());
|
||||||
|
lock.unlock();
|
||||||
|
for (auto &callback : callbacks) {
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// invoke callbacks for lock_async()
|
||||||
|
if (m_exclusiveCallback) {
|
||||||
|
if (!m_sharedOwners && !m_exclusivelyOwned) {
|
||||||
|
auto callback = std::move(m_exclusiveCallback);
|
||||||
|
m_exclusivelyOwned = true;
|
||||||
|
lock.unlock();
|
||||||
|
callback();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// resume threads blocked in lock() and lock_shared()
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
m_cv.notify_one();
|
m_cv.notify_one();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// \brief A wrapper around a standard lock which logs acquisition/release.
|
/// \brief A wrapper around a standard lock which logs acquisition/release.
|
||||||
|
|
|
@ -21,10 +21,12 @@ using namespace LibRepoMgr;
|
||||||
class UtilsTests : public TestFixture {
|
class UtilsTests : public TestFixture {
|
||||||
CPPUNIT_TEST_SUITE(UtilsTests);
|
CPPUNIT_TEST_SUITE(UtilsTests);
|
||||||
CPPUNIT_TEST(testGlobalLock);
|
CPPUNIT_TEST(testGlobalLock);
|
||||||
|
CPPUNIT_TEST(testGlobalLockAsync);
|
||||||
CPPUNIT_TEST(testLockTable);
|
CPPUNIT_TEST(testLockTable);
|
||||||
CPPUNIT_TEST_SUITE_END();
|
CPPUNIT_TEST_SUITE_END();
|
||||||
|
|
||||||
void testGlobalLock();
|
void testGlobalLock();
|
||||||
|
void testGlobalLockAsync();
|
||||||
void testLockTable();
|
void testLockTable();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
@ -73,6 +75,44 @@ void UtilsTests::testGlobalLock()
|
||||||
mutex.unlock();
|
mutex.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void UtilsTests::testGlobalLockAsync()
|
||||||
|
{
|
||||||
|
auto mutex = GlobalSharedMutex();
|
||||||
|
auto sharedLock1 = false, sharedLock2 = false;
|
||||||
|
mutex.lock_shared_async([&sharedLock1] { sharedLock1 = true; });
|
||||||
|
CPPUNIT_ASSERT(sharedLock1);
|
||||||
|
mutex.lock_shared_async([&sharedLock2] { sharedLock2 = true; }); // locking twice is not a problem, also not from the same thread
|
||||||
|
CPPUNIT_ASSERT(sharedLock2);
|
||||||
|
auto thread1 = std::thread([&mutex] {
|
||||||
|
mutex.unlock_shared(); // unlocking from another thread is ok
|
||||||
|
});
|
||||||
|
auto lock1 = false, lock2 = false;
|
||||||
|
auto thread2 = std::thread([&mutex, &lock2] {
|
||||||
|
mutex.lock();
|
||||||
|
lock2 = true;
|
||||||
|
});
|
||||||
|
mutex.lock_async([&lock1] { lock1 = true; });
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("lock_async() not yet invoked", !lock1);
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("blocking lock() not yet invoked", !lock2);
|
||||||
|
thread1.join();
|
||||||
|
mutex.unlock_shared();
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("lock_async() callback invoked via unlock_shared()", lock1);
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("blocking lock() not yet invoked (async callbacks are handled first)", !lock2);
|
||||||
|
mutex.unlock(); // release async lock so …
|
||||||
|
thread2.join(); // … thread2 is able to acquire the mutex exclusively (and then terminate)
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("try_lock_shared() returns false if mutex exclusively locked", !mutex.try_lock_shared());
|
||||||
|
auto sharedLock3 = false;
|
||||||
|
mutex.lock_shared_async([&sharedLock3] { sharedLock3 = true; });
|
||||||
|
mutex.unlock();
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("lock_async() callback invoked via unlock()", lock1);
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("try_lock_shared() possible if mutex only shared locked", mutex.try_lock_shared());
|
||||||
|
mutex.unlock_shared();
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("try_lock() returns false if mutex has still shared locked", !mutex.try_lock());
|
||||||
|
mutex.unlock_shared();
|
||||||
|
CPPUNIT_ASSERT_MESSAGE("try_lock() possible if mutex not locked", mutex.try_lock());
|
||||||
|
mutex.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
void UtilsTests::testLockTable()
|
void UtilsTests::testLockTable()
|
||||||
{
|
{
|
||||||
auto log = LogContext();
|
auto log = LogContext();
|
||||||
|
|
Loading…
Reference in New Issue