lmdb-safe/lmdb-safe.cc

358 lines
9.5 KiB
C++
Raw Normal View History

2018-12-07 13:52:17 +01:00
#include "lmdb-safe.hh"
2018-12-07 18:17:03 +01:00
#include <fcntl.h>
#include <mutex>
#include <memory>
#include <sys/stat.h>
#include <string.h>
#include <map>
using namespace std;
2018-12-08 14:08:26 +01:00
// Translates an LMDB return code into its human-readable description,
// as reported by mdb_strerror().
static string MDBError(int rc)
{
  const char* const description = mdb_strerror(rc);
  return string(description);
}
// Opens (or, depending on `flags`, creates) the database called `dbname`
// inside the transaction `txn` and stores the resulting handle in d_dbi.
// An empty `dbname` selects the unnamed (default) database.
// `env` is accepted for interface compatibility and is not used here.
// Throws std::runtime_error if mdb_dbi_open() reports an error.
MDBDbi::MDBDbi(MDB_env* env, MDB_txn* txn, const string_view dbname, int flags)
{
  // A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function.

  // mdb_dbi_open() expects a NUL-terminated C string, but a string_view
  // carries no such guarantee (it may be a slice of a larger buffer), so
  // make an owned, terminated copy of the name first.
  std::string name;
  if(!dbname.empty())
    name.assign(dbname.data(), dbname.size());

  int rc = mdb_dbi_open(txn, dbname.empty() ? nullptr : name.c_str(), flags, &d_dbi);
  if(rc)
    throw std::runtime_error("Unable to open named database: " + MDBError(rc));

  // Database names are keys in the unnamed database, and may be read but not written.
}
// Creates an LMDB environment handle, configures it (map size, maximum
// number of named databases) and opens it on `fname`.
//   fname - path handed to mdb_env_open()
//   flags - LMDB environment flags; MDB_NOTLS is always added
//   mode  - permission bits handed to mdb_env_open()
// Throws std::runtime_error if any step fails. The handle is released with
// mdb_env_close() before throwing, so a failed construction does not leak it.
MDBEnv::MDBEnv(const char* fname, int flags, int mode)
{
  if(int rc = mdb_env_create(&d_env))
    throw std::runtime_error("Unable to create database environment: " + MDBError(rc));

  // 16 * 4096 * 244140 = 15,999,959,040 bytes, i.e. roughly 16 GB
  if(mdb_env_set_mapsize(d_env, 16ULL*4096*244140ULL)) {
    mdb_env_close(d_env);
    throw std::runtime_error("setting map size");
  }

  /*
  Various other options may also need to be set before opening the handle, e.g. mdb_env_set_mapsize(), mdb_env_set_maxreaders(), mdb_env_set_maxdbs(),
  */
  if(int rc = mdb_env_set_maxdbs(d_env, 128)) {
    mdb_env_close(d_env);
    throw std::runtime_error("Unable to set maximum number of databases: " + MDBError(rc));
  }

  // we need MDB_NOTLS since we rely on its semantics
  if(int rc = mdb_env_open(d_env, fname, flags | MDB_NOTLS, mode)) {
    // If this function fails, mdb_env_close() must be called to discard the MDB_env handle.
    mdb_env_close(d_env);
    throw std::runtime_error("Unable to open database file "+std::string(fname)+": " + MDBError(rc));
  }
}
void MDBEnv::incROTX()
{
std::lock_guard<std::mutex> l(d_countmutex);
2018-12-08 14:08:26 +01:00
++d_ROtransactionsOut[std::this_thread::get_id()];
}
void MDBEnv::decROTX()
{
std::lock_guard<std::mutex> l(d_countmutex);
2018-12-08 14:08:26 +01:00
--d_ROtransactionsOut[std::this_thread::get_id()];
}
void MDBEnv::incRWTX()
{
std::lock_guard<std::mutex> l(d_countmutex);
2018-12-08 14:08:26 +01:00
++d_RWtransactionsOut[std::this_thread::get_id()];
}
void MDBEnv::decRWTX()
{
std::lock_guard<std::mutex> l(d_countmutex);
2018-12-08 14:08:26 +01:00
--d_RWtransactionsOut[std::this_thread::get_id()];
}
int MDBEnv::getRWTX()
{
std::lock_guard<std::mutex> l(d_countmutex);
2018-12-08 14:08:26 +01:00
return d_RWtransactionsOut[std::this_thread::get_id()];
}
int MDBEnv::getROTX()
{
std::lock_guard<std::mutex> l(d_countmutex);
2018-12-08 14:08:26 +01:00
return d_ROtransactionsOut[std::this_thread::get_id()];
}
std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int flags, int mode)
2018-12-07 18:17:03 +01:00
{
struct Value
{
weak_ptr<MDBEnv> wp;
int flags;
};
static std::map<tuple<dev_t, ino_t>, Value> s_envs;
static std::mutex mut;
struct stat statbuf;
if(stat(fname, &statbuf)) {
if(errno != ENOENT)
throw std::runtime_error("Unable to stat prospective mdb database: "+string(strerror(errno)));
else {
std::lock_guard<std::mutex> l(mut);
auto fresh = std::make_shared<MDBEnv>(fname, flags, mode);
2018-12-07 18:17:03 +01:00
if(stat(fname, &statbuf))
throw std::runtime_error("Unable to stat prospective mdb database: "+string(strerror(errno)));
auto key = std::tie(statbuf.st_dev, statbuf.st_ino);
s_envs[key] = {fresh, flags};
return fresh;
}
}
std::lock_guard<std::mutex> l(mut);
auto key = std::tie(statbuf.st_dev, statbuf.st_ino);
auto iter = s_envs.find(key);
if(iter != s_envs.end()) {
auto sp = iter->second.wp.lock();
if(sp) {
if(iter->second.flags != flags)
throw std::runtime_error("Can't open mdb with differing flags");
return sp;
}
else {
s_envs.erase(iter); // useful if make_shared fails
}
}
2018-12-08 14:08:26 +01:00
auto fresh = std::make_shared<MDBEnv>(fname, flags, mode);
2018-12-07 18:17:03 +01:00
s_envs[key] = {fresh, flags};
return fresh;
}
2018-12-07 13:52:17 +01:00
// Opens the database `dbname` with the given `flags` and returns its handle.
// The open happens inside a short-lived transaction: a read-write one
// (committed before returning) unless the environment was opened with
// MDB_RDONLY, in which case a read-only transaction is used.
// Calls are serialised through d_openmut.
// Throws std::runtime_error if the environment flags cannot be queried, or
// whatever the transaction / database open throws.
MDBDbi MDBEnv::openDB(const string_view dbname, int flags)
{
  unsigned int envflags = 0;
  if(int rc = mdb_env_get_flags(d_env, &envflags))
    throw std::runtime_error("Unable to query environment flags: " + MDBError(rc));
  /*
  This function must not be called from multiple concurrent transactions in the same process. A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function.
  */
  std::lock_guard<std::mutex> l(d_openmut);

  if(!(envflags & MDB_RDONLY)) {
    auto rwt = getRWTransaction();
    MDBDbi ret = rwt.openDB(dbname, flags);
    rwt.commit();
    return ret;
  }

  MDBDbi ret;
  {
    // scoped so the read-only transaction is finished before we return
    auto rot = getROTransaction();
    ret = rot.openDB(dbname, flags);
  }
  return ret;
}
// Begins an LMDB transaction on `env` with the given `flags` (and optional
// `parent`) and returns the raw handle.
// Refuses to start if the calling thread already has a read-only or
// read-write transaction outstanding on this environment.
// On MDB_MAP_RESIZED the new map size is adopted and the begin is retried,
// up to two times. On success the thread's RW counter is incremented.
// Throws std::runtime_error on a duplicate transaction or if mdb_txn_begin()
// ultimately fails.
MDB_txn *MDBRWTransaction::openRWTransaction(MDBEnv *env, MDB_txn *parent, int flags)
{
  MDB_txn *result = nullptr; // initialised, matching openROTransaction
  if(env->getROTX() || env->getRWTX())
    throw std::runtime_error("Duplicate RW transaction");

  for(int tries =0 ; tries < 3; ++tries) { // it might happen twice, who knows
    if(int rc=mdb_txn_begin(env->d_env, parent, flags, &result)) {
      if(rc == MDB_MAP_RESIZED && tries < 2) {
        // "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED.
        // call mdb_env_set_mapsize with a size of zero to adopt the new size."
        mdb_env_set_mapsize(env->d_env, 0);
        continue;
      }
      throw std::runtime_error("Unable to start RW transaction: "+std::string(mdb_strerror(rc)));
    }
    break;
  }
  env->incRWTX();
  return result;
}
// Starts a new top-level (no parent) write transaction on `parent` via
// openRWTransaction(), passes the resulting handle to the MDBROTransaction
// base, and allocates an empty container for tracking RW cursors.
// Any exception thrown by openRWTransaction() propagates to the caller.
MDBRWTransaction::MDBRWTransaction(MDBEnv* parent, int flags):
MDBROTransaction(parent, openRWTransaction(parent, nullptr, flags)),
d_rw_cursors(new decltype(d_rw_cursors)::element_type())
{
}
// A write transaction still open at destruction is aborted, not committed.
// abort() returns early when d_txn is already null (e.g. after commit()).
MDBRWTransaction::~MDBRWTransaction()
{
abort();
}
// Closes all cursors of this transaction and commits it.
// Does nothing further if the transaction has already been finished.
// Throws std::runtime_error if mdb_txn_commit() reports an error; in that
// case the transaction is still considered finished (see below).
void MDBRWTransaction::commit()
{
  closeRORWCursors();
  if (!d_txn) {
    return;
  }

  const int rc = mdb_txn_commit(d_txn);

  // mdb_txn_commit() frees the transaction handle whether or not it succeeds,
  // so the bookkeeping must be released in both cases. If d_txn stayed set,
  // the destructor would abort an already-freed handle, and if the counter
  // stayed raised this thread could never open another transaction.
  d_txn = nullptr;
  environment().decRWTX();

  if(rc) {
    throw std::runtime_error("committing: " + std::string(mdb_strerror(rc)));
  }
}
// Closes all cursors of this transaction and, if it is still open, aborts it
// and releases the calling thread's RW transaction count.
void MDBRWTransaction::abort()
{
  closeRORWCursors();
  if (d_txn) {
    mdb_txn_abort(d_txn);
    environment().decRWTX();
    // clearing the handle keeps the RO destructor from cleaning up the
    // transaction a second time
    d_txn = nullptr;
  }
}
// Wraps an already-begun LMDB transaction handle `txn` belonging to the
// environment `parent`, and allocates an empty container for tracking the
// cursors opened on it. No LMDB call is made here.
MDBROTransaction::MDBROTransaction(MDBEnv *parent, MDB_txn *txn):
d_parent(parent),
d_cursors(new decltype(d_cursors)::element_type()),
d_txn(txn)
{
}
// Begins a read-only LMDB transaction (MDB_RDONLY is OR-ed into `flags`) on
// `env`, optionally nested under `parent`, and returns the raw handle.
// Refuses to start if the calling thread has a read-write transaction
// outstanding on this environment.
// On MDB_MAP_RESIZED the new map size is adopted and the begin is retried,
// up to two times. On success the thread's RO counter is incremented.
// Throws std::runtime_error on refusal or if mdb_txn_begin() ultimately fails.
MDB_txn *MDBROTransaction::openROTransaction(MDBEnv *env, MDB_txn *parent, int flags)
{
  if(env->getRWTX())
    throw std::runtime_error("Duplicate RO transaction");

  /*
  A transaction and its cursors must only be used by a single thread, and a thread may only have a single transaction at a time. If MDB_NOTLS is in use, this does not apply to read-only transactions. */
  MDB_txn *handle = nullptr;
  int retriesDone = 0;
  for(;;) {
    const int rc = mdb_txn_begin(env->d_env, parent, MDB_RDONLY | flags, &handle);
    if(rc == 0)
      break;

    // anything other than a resize, or a resize after two retries, is fatal
    if(rc != MDB_MAP_RESIZED || retriesDone >= 2)
      throw std::runtime_error("Unable to start RO transaction: "+string(mdb_strerror(rc)));

    // "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED.
    // call mdb_env_set_mapsize with a size of zero to adopt the new size."
    mdb_env_set_mapsize(env->d_env, 0);
    ++retriesDone;
  }

  env->incROTX();
  return handle;
}
void MDBROTransaction::closeROCursors()
{
if (!d_cursors) {
return;
}
// we need to move the vector away to ensure that the cursors dont mess with our iteration.
std::vector<MDBROCursor*> buf;
std::swap(*d_cursors, buf);
for (auto &cursor: buf) {
cursor->close();
}
}
// Starts a new top-level (no parent) read-only transaction on `parent` via
// openROTransaction() and delegates to the (parent, txn) constructor.
// Any exception thrown by openROTransaction() propagates to the caller.
MDBROTransaction::MDBROTransaction(MDBEnv *parent, int flags):
MDBROTransaction(parent, openROTransaction(parent, nullptr, flags))
{
}
// A read-only transaction still open at destruction is finished via commit().
MDBROTransaction::~MDBROTransaction()
{
// This is safe because C++ does not dispatch to overrides of virtual methods
// from within a destructor: the call below always runs
// MDBROTransaction::commit(), which is a no-op on the handle when d_txn is
// already null.
commit();
}
// Closes this transaction's read-only cursors and, if the transaction is
// still open, aborts it and releases the calling thread's RO count.
void MDBROTransaction::abort()
{
  closeROCursors();

  // A null d_txn means there is nothing left to finish. Per the original
  // author's note this happens when the object was invalidated earlier
  // (e.g. by moving from it), or when it is an RW transaction that has
  // already cleaned up the handle itself.
  if (!d_txn) {
    return;
  }

  d_parent->decROTX();
  mdb_txn_abort(d_txn);
  d_txn = nullptr;
}
// Closes this transaction's read-only cursors and, if the transaction is
// still open, commits it and releases the calling thread's RO count.
// The result of mdb_txn_commit() is not inspected.
void MDBROTransaction::commit()
{
  closeROCursors();

  // A null d_txn means there is nothing left to finish. Per the original
  // author's note this happens when the object was invalidated earlier
  // (e.g. by moving from it), or when it is an RW transaction that has
  // already cleaned up the handle itself.
  if (!d_txn) {
    return;
  }

  d_parent->decROTX();
  // commit rather than abort: this appears to work better for r/o database opening
  mdb_txn_commit(d_txn);
  d_txn = nullptr;
}
2018-12-09 14:37:08 +01:00
// Empties the database `dbi` within this transaction. mdb_drop() is called
// with its last argument set to 0.
// Throws runtime_error if mdb_drop() reports an error.
void MDBRWTransaction::clear(MDB_dbi dbi)
{
  const int rc = mdb_drop(d_txn, dbi, 0);
  if(rc == 0) {
    return;
  }
  throw runtime_error("Error clearing database: " + MDBError(rc));
}
// Opens a cursor on database `dbi` inside this write transaction and returns
// it wrapped in an MDBRWCursor that is registered with this transaction's
// RW cursor container.
// Throws std::runtime_error if mdb_cursor_open() reports an error.
MDBRWCursor MDBRWTransaction::getRWCursor(const MDBDbi& dbi)
{
  MDB_cursor *cursor = nullptr;
  if(int rc = mdb_cursor_open(d_txn, dbi, &cursor)) {
    // this is the RW cursor path, so the message must not say "RO"
    throw std::runtime_error("Error creating RW cursor: "+std::string(mdb_strerror(rc)));
  }
  return MDBRWCursor(*d_rw_cursors, cursor);
}
// Convenience alias: on a write transaction the default cursor is an RW
// cursor, so this simply forwards to getRWCursor().
MDBRWCursor MDBRWTransaction::getCursor(const MDBDbi &dbi)
{
  return this->getRWCursor(dbi);
}
// Returns a new MDBROTransaction constructed on this environment.
// Exceptions from the MDBROTransaction constructor propagate to the caller.
MDBROTransaction MDBEnv::getROTransaction()
{
return MDBROTransaction(this);
}
// Returns a new MDBRWTransaction constructed on this environment.
// Exceptions from the MDBRWTransaction constructor propagate to the caller.
MDBRWTransaction MDBEnv::getRWTransaction()
{
return MDBRWTransaction(this);
}
void MDBRWTransaction::closeRWCursors()
2018-12-07 13:52:17 +01:00
{
if (!d_rw_cursors) {
return;
}
decltype(d_rw_cursors)::element_type buf;
std::swap(*d_rw_cursors, buf);
for (auto &cursor: buf) {
cursor->close();
}
2018-12-07 13:52:17 +01:00
}
// Convenience alias that forwards to getROCursor() for the database `dbi`.
MDBROCursor MDBROTransaction::getCursor(const MDBDbi& dbi)
{
return getROCursor(dbi);
}
// Opens a cursor on database `dbi` inside this transaction and returns it
// wrapped in an MDBROCursor that is registered with this transaction's
// cursor container.
// Throws std::runtime_error if mdb_cursor_open() reports an error.
MDBROCursor MDBROTransaction::getROCursor(const MDBDbi &dbi)
{
  MDB_cursor *handle;
  if(const int rc = mdb_cursor_open(d_txn, dbi, &handle)) {
    throw std::runtime_error(std::string("Error creating RO cursor: ") + mdb_strerror(rc));
  }
  return MDBROCursor(*d_cursors, handle);
}
2018-12-08 14:08:26 +01:00