fix mixup with mode & flags parameters, implement transaction restart in case of resize
This commit is contained in:
parent
348e9bd730
commit
ac9da997a0
|
@ -23,8 +23,10 @@ Among the things to keep in mind when using LMDB natively:
|
|||
* When opening a named database, no other threads may do that at the same time
|
||||
* Cursors within RO transactions need freeing, but cursors within RW
|
||||
transactions must not be freed.
|
||||
* A new transaction may indicate the database has grown, and you need to
|
||||
restart the transaction then.
|
||||
|
||||
Breaking these rules causes no immediate errors, but does lead to silent
|
||||
Breaking these rules may cause no immediate errors, but can lead to silent
|
||||
data corruption, missing updates, or random crashes. Again, this is not an
|
||||
actual bug in LMDB, it means that LMDB expects you to use it according to
|
||||
its exact rules. And who are we to disagree?
|
||||
|
|
57
lmdb-safe.cc
57
lmdb-safe.cc
|
@ -22,12 +22,11 @@ MDBDbi::MDBDbi(MDB_env* env, MDB_txn* txn, const char* dbname, int flags)
|
|||
// Database names are keys in the unnamed database, and may be read but not written.
|
||||
}
|
||||
|
||||
MDBEnv::MDBEnv(const char* fname, int mode, int flags)
|
||||
MDBEnv::MDBEnv(const char* fname, int flags, int mode)
|
||||
{
|
||||
mdb_env_create(&d_env);
|
||||
if(mdb_env_set_mapsize(d_env, 4ULL*4096*244140ULL)) // 4GB
|
||||
throw std::runtime_error("setting map size");
|
||||
|
||||
/*
|
||||
Various other options may also need to be set before opening the handle, e.g. mdb_env_set_mapsize(), mdb_env_set_maxreaders(), mdb_env_set_maxdbs(),
|
||||
*/
|
||||
|
@ -35,7 +34,7 @@ Various other options may also need to be set before opening the handle, e.g. md
|
|||
mdb_env_set_maxdbs(d_env, 128);
|
||||
|
||||
// we need MDB_NOTLS since we rely on its semantics
|
||||
if(int rc=mdb_env_open(d_env, fname, mode, flags | MDB_NOTLS)) {
|
||||
if(int rc=mdb_env_open(d_env, fname, flags | MDB_NOTLS, mode)) {
|
||||
// If this function fails, mdb_env_close() must be called to discard the MDB_env handle.
|
||||
mdb_env_close(d_env);
|
||||
throw std::runtime_error("Unable to open database file "+std::string(fname)+": " + MDBError(rc));
|
||||
|
@ -78,7 +77,7 @@ int MDBEnv::getROTX()
|
|||
}
|
||||
|
||||
|
||||
std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags)
|
||||
std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int flags, int mode)
|
||||
{
|
||||
struct Value
|
||||
{
|
||||
|
@ -95,7 +94,7 @@ std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags)
|
|||
throw std::runtime_error("Unable to stat prospective mdb database: "+string(strerror(errno)));
|
||||
else {
|
||||
std::lock_guard<std::mutex> l(mut);
|
||||
auto fresh = std::make_shared<MDBEnv>(fname, mode, flags);
|
||||
auto fresh = std::make_shared<MDBEnv>(fname, flags, mode);
|
||||
if(stat(fname, &statbuf))
|
||||
throw std::runtime_error("Unable to stat prospective mdb database: "+string(strerror(errno)));
|
||||
auto key = std::tie(statbuf.st_dev, statbuf.st_ino);
|
||||
|
@ -120,7 +119,7 @@ std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags)
|
|||
}
|
||||
}
|
||||
|
||||
auto fresh = std::make_shared<MDBEnv>(fname, mode, flags);
|
||||
auto fresh = std::make_shared<MDBEnv>(fname, flags, mode);
|
||||
s_envs[key] = {fresh, flags};
|
||||
|
||||
return fresh;
|
||||
|
@ -147,6 +146,52 @@ MDBDbi MDBEnv::openDB(const char* dbname, int flags)
|
|||
return rwt.openDB(dbname, flags);
|
||||
}
|
||||
|
||||
MDBRWTransaction::MDBRWTransaction(MDBEnv* parent, int flags) : d_parent(parent)
|
||||
{
|
||||
if(d_parent->getROTX() || d_parent->getRWTX())
|
||||
throw std::runtime_error("Duplicate transaction");
|
||||
|
||||
for(int tries =0 ; tries < 3; ++tries) { // it might happen twice, who knows
|
||||
if(int rc=mdb_txn_begin(d_parent->d_env, 0, flags, &d_txn)) {
|
||||
if(rc == MDB_MAP_RESIZED && tries < 2) {
|
||||
// "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED.
|
||||
// call mdb_env_set_mapsize with a size of zero to adopt the new size."
|
||||
mdb_env_set_mapsize(d_parent->d_env, 0);
|
||||
continue;
|
||||
}
|
||||
throw std::runtime_error("Unable to start RW transaction: "+std::string(mdb_strerror(rc)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
d_parent->incRWTX();
|
||||
}
|
||||
|
||||
MDBROTransaction::MDBROTransaction(MDBEnv* parent, int flags) : d_parent(parent)
|
||||
{
|
||||
if(d_parent->getRWTX())
|
||||
throw std::runtime_error("Duplicate transaction");
|
||||
|
||||
/*
|
||||
A transaction and its cursors must only be used by a single thread, and a thread may only have a single transaction at a time. If MDB_NOTLS is in use, this does not apply to read-only transactions. */
|
||||
|
||||
for(int tries =0 ; tries < 3; ++tries) { // it might happen twice, who knows
|
||||
if(int rc=mdb_txn_begin(d_parent->d_env, 0, MDB_RDONLY | flags, &d_txn)) {
|
||||
if(rc == MDB_MAP_RESIZED && tries < 2) {
|
||||
// "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED.
|
||||
// call mdb_env_set_mapsize with a size of zero to adopt the new size."
|
||||
mdb_env_set_mapsize(d_parent->d_env, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
throw std::runtime_error("Unable to start RO transaction: "+string(mdb_strerror(rc)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
d_parent->incROTX();
|
||||
}
|
||||
|
||||
|
||||
|
||||
void MDBRWTransaction::clear(MDB_dbi dbi)
|
||||
{
|
||||
if(int rc = mdb_drop(d_txn, dbi, 0)) {
|
||||
|
|
39
lmdb-safe.hh
39
lmdb-safe.hh
|
@ -43,10 +43,19 @@ class MDBROTransaction;
|
|||
class MDBEnv
|
||||
{
|
||||
public:
|
||||
MDBEnv(const char* fname, int mode, int flags);
|
||||
MDBEnv(const char* fname, int flags, int mode);
|
||||
|
||||
~MDBEnv()
|
||||
{
|
||||
for(auto& a : d_RWtransactionsOut) {
|
||||
if(a.second)
|
||||
cout << "thread " <<a.first<<" had "<<a.second<<" RW transactions open"<<endl;
|
||||
}
|
||||
for(auto& a : d_ROtransactionsOut) {
|
||||
if(a.second)
|
||||
cout << "thread " <<a.first<<" had "<<a.second<<" RO transactions open"<<endl;
|
||||
}
|
||||
|
||||
// Only a single thread may call this function. All transactions, databases, and cursors must already be closed before calling this function
|
||||
mdb_env_close(d_env);
|
||||
// but, elsewhere, docs say database handles do not need to be closed?
|
||||
|
@ -76,26 +85,14 @@ private:
|
|||
std::map<std::thread::id, int> d_ROtransactionsOut;
|
||||
};
|
||||
|
||||
std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags);
|
||||
std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int flags, int mode);
|
||||
|
||||
class MDBROCursor;
|
||||
|
||||
class MDBROTransaction
|
||||
{
|
||||
public:
|
||||
explicit MDBROTransaction(MDBEnv* parent, int flags=0) : d_parent(parent)
|
||||
{
|
||||
if(d_parent->getRWTX())
|
||||
throw std::runtime_error("Duplicate transaction");
|
||||
|
||||
/*
|
||||
A transaction and its cursors must only be used by a single thread, and a thread may only have a single transaction at a time. If MDB_NOTLS is in use, this does not apply to read-only transactions. */
|
||||
|
||||
if(mdb_txn_begin(d_parent->d_env, 0, MDB_RDONLY | flags, &d_txn))
|
||||
throw std::runtime_error("Unable to start RO transaction");
|
||||
d_parent->incROTX();
|
||||
}
|
||||
|
||||
explicit MDBROTransaction(MDBEnv* parent, int flags=0);
|
||||
|
||||
MDBROTransaction(MDBROTransaction&& rhs)
|
||||
{
|
||||
|
@ -211,15 +208,7 @@ class MDBRWCursor;
|
|||
class MDBRWTransaction
|
||||
{
|
||||
public:
|
||||
explicit MDBRWTransaction(MDBEnv* parent, int flags=0) : d_parent(parent)
|
||||
{
|
||||
if(d_parent->getROTX() || d_parent->getRWTX())
|
||||
throw std::runtime_error("Duplicate transaction");
|
||||
|
||||
if(int rc=mdb_txn_begin(d_parent->d_env, 0, flags, &d_txn))
|
||||
throw std::runtime_error("Unable to start RW transaction: "+std::string(mdb_strerror(rc)));
|
||||
d_parent->incRWTX();
|
||||
}
|
||||
explicit MDBRWTransaction(MDBEnv* parent, int flags=0);
|
||||
|
||||
MDBRWTransaction(MDBRWTransaction&& rhs)
|
||||
{
|
||||
|
@ -298,7 +287,7 @@ public:
|
|||
int get(MDB_dbi dbi, const MDB_val& key, MDB_val& val)
|
||||
{
|
||||
if(!d_txn)
|
||||
throw std::runtime_error("Attempt to use a closed transaction for get");
|
||||
throw std::runtime_error("Attempt to use a closed RW transaction for get");
|
||||
|
||||
int rc = mdb_get(d_txn, dbi, (MDB_val*)&key, &val);
|
||||
if(rc && rc != MDB_NOTFOUND)
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
#include "lmdb-safe.hh"
|
||||
#include <sstream>
|
||||
#include <boost/archive/binary_oarchive.hpp>
|
||||
#include <boost/archive/binary_iarchive.hpp>
|
||||
|
||||
|
||||
struct Record
|
||||
{
|
||||
|
||||
friend class boost::serialization::access;
|
||||
template<class Archive>
|
||||
void serialize(Archive & ar, const unsigned int version)
|
||||
{
|
||||
ar & id & domain_id & name & type & ttl & content & enabled & auth;
|
||||
}
|
||||
|
||||
unsigned int id;
|
||||
unsigned int domain_id; // needs index
|
||||
std::string name; // needs index
|
||||
std::string type;
|
||||
unsigned int ttl{0};
|
||||
std::string content;
|
||||
bool enabled{true};
|
||||
bool auth{true};
|
||||
|
||||
};
|
||||
|
||||
struct MDBVal
|
||||
{
|
||||
MDBVal(unsigned int v) : d_v(v)
|
||||
{
|
||||
d_mdbval.mv_size = sizeof(d_v);
|
||||
d_mdbval.mv_data = &d_v;
|
||||
}
|
||||
|
||||
MDBVal(const std::string& str) : d_str(str)
|
||||
{
|
||||
d_mdbval.mv_size = str.size();
|
||||
d_mdbval.mv_data = (void*)str.c_str();
|
||||
}
|
||||
operator MDB_val&()
|
||||
{
|
||||
return d_mdbval;
|
||||
}
|
||||
unsigned int d_v;
|
||||
std::string d_str;
|
||||
MDB_val d_mdbval;
|
||||
};
|
||||
|
||||
static unsigned int getMaxID(MDBRWTransaction& txn, MDBDbi& dbi)
|
||||
{
|
||||
auto cursor = txn.getCursor(dbi);
|
||||
MDB_val maxidval, maxcontent;
|
||||
unsigned int maxid{0};
|
||||
if(!cursor.get(maxidval, maxcontent, MDB_LAST)) {
|
||||
memcpy(&maxid, maxidval.mv_data, 4);
|
||||
}
|
||||
return maxid;
|
||||
}
|
||||
|
||||
static void store(MDBRWTransaction& txn, MDBDbi& records, MDBDbi& domainidx, MDBDbi&nameidx, const Record& r)
|
||||
{
|
||||
ostringstream oss;
|
||||
boost::archive::binary_oarchive oa(oss,boost::archive::no_header );
|
||||
oa << r;
|
||||
|
||||
txn.put(records, MDBVal(r.id), MDBVal(oss.str()), MDB_APPEND);
|
||||
txn.put(domainidx, MDBVal(r.domain_id), MDBVal(r.id));
|
||||
txn.put(nameidx, MDBVal(r.name), MDBVal(r.id));
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
auto env = getMDBEnv("pdns", 0, 0600);
|
||||
auto records = env->openDB("records", MDB_INTEGERKEY | MDB_CREATE );
|
||||
auto domainidx = env->openDB("domainidx", MDB_INTEGERKEY | MDB_DUPFIXED | MDB_DUPSORT | MDB_CREATE);
|
||||
auto nameidx = env->openDB("nameidx", MDB_DUPFIXED | MDB_DUPSORT | MDB_CREATE);
|
||||
|
||||
auto txn = env->getRWTransaction();
|
||||
|
||||
/*
|
||||
txn.clear(records);
|
||||
txn.clear(domainidx);
|
||||
txn.clear(domainidx);
|
||||
txn.clear(nameidx);
|
||||
*/
|
||||
|
||||
unsigned int maxid=getMaxID(txn, records);
|
||||
unsigned int maxdomainid=getMaxID(txn, domainidx);
|
||||
|
||||
cout<<"Maxid = "<<maxid<<", Max domain ID = "<<maxdomainid<<endl;
|
||||
|
||||
string prefix(argv[1]);
|
||||
auto lim=atoi(argv[2]);
|
||||
for(int n=0; n < lim; ++n) {
|
||||
string domain(prefix+std::to_string(n)+".com");
|
||||
Record r;
|
||||
r.id=++maxid;
|
||||
r.domain_id = ++maxdomainid;
|
||||
r.name = domain;
|
||||
r.ttl = 3600;
|
||||
r.type = "SOA";
|
||||
r.content = "ns1.powerdns.com ahu.powerdns.com 1";
|
||||
|
||||
store(txn, records, domainidx, nameidx, r);
|
||||
|
||||
r.id=++maxid;
|
||||
r.type="NS";
|
||||
r.content="ns1.powerdns.com";
|
||||
store(txn, records, domainidx, nameidx, r);
|
||||
|
||||
|
||||
r.id=++maxid;
|
||||
r.type="A";
|
||||
r.content="1.2.3.4";
|
||||
store(txn, records, domainidx, nameidx, r);
|
||||
|
||||
r.id=++maxid;
|
||||
r.type="AAAA";
|
||||
r.content="::1";
|
||||
store(txn, records, domainidx, nameidx, r);
|
||||
|
||||
r.id=++maxid;
|
||||
r.type="CAA";
|
||||
r.content="letsencrypt.org";
|
||||
store(txn, records, domainidx, nameidx, r);
|
||||
|
||||
|
||||
r.id=++maxid;
|
||||
r.type="AAAA";
|
||||
r.name="www."+domain;
|
||||
r.content="::1";
|
||||
store(txn, records, domainidx, nameidx, r);
|
||||
|
||||
r.id=++maxid;
|
||||
r.type="A";
|
||||
r.name="www."+domain;
|
||||
r.content="127.0.0.1";
|
||||
store(txn, records, domainidx, nameidx, r);
|
||||
}
|
||||
|
||||
txn.commit();
|
||||
|
||||
auto rotxn = env->getROTransaction();
|
||||
auto rotxn2 = env->getROTransaction();
|
||||
|
||||
auto rocursor = rotxn.getCursor(nameidx);
|
||||
|
||||
MDB_val data;
|
||||
int count = 0;
|
||||
while(!rocursor.get(MDBVal("www.powerdns.com"), data, count ? MDB_NEXT_DUP : MDB_SET)) {
|
||||
unsigned int id;
|
||||
memcpy(&id, data.mv_data, 4);
|
||||
cout<<"Got something: id="<<id<<endl;
|
||||
MDB_val record;
|
||||
|
||||
if(!rotxn.get(records, data, record)) {
|
||||
Record test;
|
||||
stringstream istr{std::string((char*)record.mv_data, record.mv_size)};
|
||||
boost::archive::binary_iarchive oi(istr,boost::archive::no_header );
|
||||
oi >> test;
|
||||
cout <<"Record: "<<test.name<<" "<<test.type <<" " <<test.ttl<<" "<<test.content<<endl;
|
||||
}
|
||||
else {
|
||||
cout<<"Did not find anything for id "<<id<<endl;
|
||||
}
|
||||
++count;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue