some docs, basic example

bert hubert 2018-12-08 14:08:26 +01:00
parent e664945846
commit c40719f671
6 changed files with 409 additions and 90 deletions


@ -3,11 +3,11 @@ LIBS=lmdb-LMDB_0.9.22/libraries/liblmdb/liblmdb.a
INCLUDES=-Ilmdb-LMDB_0.9.22/libraries/liblmdb
#LIBS=-llmdb
CXXFLAGS:=-std=gnu++11 -Wall -O2 -MMD -MP -ggdb -pthread $(INCLUDES)
CXXFLAGS:=-std=gnu++17 -Wall -O2 -MMD -MP -ggdb -pthread $(INCLUDES) # -fsanitize=address -fno-omit-frame-pointer
CFLAGS:= -Wall -O2 -MMD -MP -ggdb
PROGRAMS = lmdb-test
PROGRAMS = lmdb-test basic-example
all: $(PROGRAMS)
@ -18,4 +18,7 @@ clean:
lmdb-test: lmdb-test.o lmdb-safe.o
g++ -std=gnu++11 $^ -o $@ -pthread $(LIBS)
g++ -std=gnu++17 $^ -o $@ -pthread $(LIBS) #-lasan
basic-example: basic-example.o lmdb-safe.o
g++ -std=gnu++17 $^ -o $@ -pthread $(LIBS)


@ -1,2 +1,89 @@
# lmdb-safe
A safe modern C++ wrapper of lmdb
A safe, modern and performant C++ wrapper for LMDB. For now it requires
C++17; support for C++11 will return soon. MIT licensed.
[LMDB](http://www.lmdb.tech/doc/index.html) is an outrageously fast
key/value store with semantics that make it highly interesting for many
applications. Of specific note, besides speed, is the full support for
transactions and read/write concurrency. LMDB is also famed for its
robustness, but only **when used correctly**.
The design of LMDB is elegant and simple, which aids both performance
and stability. The downside of this elegant design is a plethora of rules
that need to be followed to not break things. In other words, LMDB delivers
great things but only if you use it exactly right.
Among the things to keep in mind when using LMDB natively:
* Never open a database file more than once anywhere in your process
* Never open more than one transaction within a thread
* .. unless they are all Read Only and have MDB_NOTLS set
* When opening a named database, no other threads may do that at the same time
* Cursors within RO transactions need freeing, but cursors within RW
transactions must not be freed.
Breaking these rules causes no errors, but does lead to silent data
corruption, missing updates, or random crashes.
This LMDB library aims to deliver the full LMDB performance while
programmatically making sure the LMDB semantics are adhered to, with very
limited overhead.
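To illustrate what enforcing such a rule can look like, here is a minimal sketch of per-thread transaction bookkeeping: a mutex-protected map from thread id to the number of open read/write transactions. The class `TxnLedger` and its method names are hypothetical and exist only for this illustration; the library's own counters may be organised differently.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <stdexcept>
#include <thread>

// Hypothetical illustration, not part of lmdb-safe: tracks how many RW
// transactions each thread currently has open.
class TxnLedger
{
public:
  // Register a new RW transaction for the calling thread.
  // Throws if this thread already has one open.
  void beginRW()
  {
    std::lock_guard<std::mutex> l(d_mutex);
    int& count = d_rwOut[std::this_thread::get_id()];
    if(count)
      throw std::runtime_error("Duplicate transaction");
    ++count;
  }

  // Unregister the calling thread's RW transaction (on commit or abort).
  void endRW()
  {
    std::lock_guard<std::mutex> l(d_mutex);
    int& count = d_rwOut[std::this_thread::get_id()];
    assert(count > 0);
    --count;
  }

  // Number of RW transactions the calling thread has open.
  int openRW()
  {
    std::lock_guard<std::mutex> l(d_mutex);
    return d_rwOut[std::this_thread::get_id()];
  }

private:
  std::mutex d_mutex;                      // protects the map below
  std::map<std::thread::id, int> d_rwOut;  // open RW transactions per thread
};
```

The check costs one mutex acquisition and one map lookup per transaction start, which is small compared to the work a transaction usually does.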
Most common LMDB functionality is wrapped within this library but the native
MDB handles are all available should you want to use functionality we did
not (yet) cater for.
# Example
The following example has no overhead compared to native LMDB, but already
exhibits several ways in which lmdb-safe is easier and safer to use:
```
auto env = getMDBEnv("./database", 0, 0600);
auto dbi = env->openDB("example", MDB_CREATE);
auto txn = env->getRWTransaction();
```
The first line requests an LMDB environment for a database hosted in
`./database`. **Within LMDB, it is not allowed to open a database file more
than once**, not even from other threads, not even when using a different LMDB
handle. `getMDBEnv` keeps a registry of LMDB environments, keyed to the
exact inode. If another part of your process requests access to the same
inode, it will get the same environment.
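The idea behind such a registry can be sketched without LMDB at all. The example below keys a map on the `(st_dev, st_ino)` pair returned by POSIX `stat()` and stores weak pointers, so that two different spellings of the same path yield the same shared object. `FakeEnv` and `getSharedEnv` are hypothetical names used only here; this is a sketch of the technique under those assumptions, not the library's code.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <tuple>
#include <sys/stat.h>
#include <sys/types.h>

// Hypothetical stand-in for an environment object.
struct FakeEnv
{
  std::string path;
};

// Returns one shared FakeEnv per underlying file, however the path is spelled.
std::shared_ptr<FakeEnv> getSharedEnv(const std::string& path)
{
  static std::mutex mut;
  static std::map<std::tuple<dev_t, ino_t>, std::weak_ptr<FakeEnv>> registry;

  struct stat statbuf;
  if(stat(path.c_str(), &statbuf))
    throw std::runtime_error("Unable to stat " + path);

  std::lock_guard<std::mutex> l(mut);
  std::tuple<dev_t, ino_t> key(statbuf.st_dev, statbuf.st_ino);

  auto iter = registry.find(key);
  if(iter != registry.end()) {
    if(auto sp = iter->second.lock())
      return sp;           // still alive: hand out the existing instance
    registry.erase(iter);  // expired entry: drop it and create a new one
  }

  auto fresh = std::make_shared<FakeEnv>(FakeEnv{path});
  registry[key] = fresh;
  return fresh;
}
```

Storing `std::weak_ptr` means the registry itself never keeps an environment alive: once the last user drops its `shared_ptr`, the next request creates a fresh instance.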
On the second line, a database is opened within our environment. The
semantics of opening or creating a database within LMDB are tricky. With
some loss of generality, `MDBEnv::openDB` will create a transaction for you
to open the database, and close it too. Most of the time this is what you
want. It is also possible to open a database within a transaction manually.
The third line opens a read/write transaction using the Resource Acquisition
Is Initialization (RAII) technique. If `txn` goes out of scope, the
transaction is aborted automatically. To commit or abort, use `commit()` or
`abort()`, after which going out of scope has no further effect.
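The RAII behaviour described above can be sketched with a small stand-in class that records what happens to it in a log instead of talking to LMDB. `SketchTxn` is a hypothetical name for this illustration only; it shows the pattern, not the actual `MDBRWTransaction` implementation.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a transaction: it appends "commit" or "abort"
// to a log so the RAII behaviour can be observed.
class SketchTxn
{
public:
  explicit SketchTxn(std::vector<std::string>& log) : d_log(log), d_open(true) {}

  // A transaction has a single owner, so copying is disabled.
  SketchTxn(const SketchTxn&) = delete;
  SketchTxn& operator=(const SketchTxn&) = delete;

  ~SketchTxn()
  {
    if(d_open)
      d_log.push_back("abort");  // never finished explicitly: roll back
  }

  void commit()
  {
    assert(d_open);
    d_log.push_back("commit");
    d_open = false;  // the destructor now has nothing left to do
  }

  void abort()
  {
    assert(d_open);
    d_log.push_back("abort");
    d_open = false;
  }

private:
  std::vector<std::string>& d_log;
  bool d_open;
};
```

Because the destructor defaults to aborting, an exception thrown halfway through a series of writes leaves no partial changes behind.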
```
txn.put(dbi, "lmdb", "great");

string_view data;
if(!txn.get(dbi, "lmdb", data)) {
  cout<< "Within RW transaction, found that lmdb = " << data <<endl;
}
else
  cout<<"Found nothing" << endl;

txn.commit();
```
LMDB is so fast because it does not copy data unless it really needs to.
Memory bandwidth is a huge determinant of performance on modern CPUs. This
wrapper follows the same philosophy: using modern C++, it can seamlessly
pass around 'views' on data without copying them. Using these techniques,
the call to `txn.put()` stores the value "great" under the key "lmdb",
without making additional copies.
We employ the same technique to request the value of "lmdb", which is made
available to us as a read-only view, straight onto the memory mapped data on
disk.
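A view is just a pointer plus a length, which is why converting between a `std::string_view` and a size/pointer pair involves no copying. The sketch below uses a hypothetical `SketchVal` struct that only mirrors the general shape of LMDB's `MDB_val`; the helper names `toVal` and `toView` are likewise assumptions made for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Hypothetical stand-in with the same shape as a size + pointer value type.
struct SketchVal
{
  std::size_t size;
  const void* data;
};

// Wrap a view as a size/pointer pair; no bytes are copied.
inline SketchVal toVal(std::string_view sv)
{
  return SketchVal{sv.size(), sv.data()};
}

// Turn a size/pointer pair back into a read-only view on the same bytes.
inline std::string_view toView(const SketchVal& v)
{
  assert(v.data != nullptr || v.size == 0);
  return std::string_view(static_cast<const char*>(v.data), v.size);
}
```

The resulting view is only valid for as long as the underlying memory is; for data read from LMDB that generally means until the transaction ends or the data is modified.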
In the final line, we commit the transaction, after which the new data also
becomes visible to other threads and processes.

basic-example.cc Normal file

@ -0,0 +1,37 @@
#include "lmdb-safe.hh"

void checkLMDB(MDBEnv* env, MDBDbi dbi)
{
  auto rotxn = env->getROTransaction();
  string_view data;
  if(!rotxn.get(dbi, "lmdb", data)) {
    cout<< "Outside RW transaction, found that lmdb = " << data <<endl;
  }
  else
    cout<<"Outside RW transaction, found nothing" << endl;
}

int main()
{
  auto env = getMDBEnv("./database", 0, 0600);
  auto dbi = env->openDB("example", MDB_CREATE);
  auto txn = env->getRWTransaction();

  txn.put(dbi, "lmdb", "great");

  string_view data;
  if(!txn.get(dbi, "lmdb", data)) {
    cout<< "Within RW transaction, found that lmdb = " << data <<endl;
  }
  else
    cout<<"Found nothing" << endl;

  std::thread elsewhere(checkLMDB, env.get(), dbi);
  elsewhere.join();

  txn.commit();

  cout<<"Committed data"<<endl;

  checkLMDB(env.get(), dbi);
}


@ -6,6 +6,78 @@
#include <string.h>
#include <map>
static string MDBError(int rc)
{
return mdb_strerror(rc);
}
MDBDbi::MDBDbi(MDB_env* env, MDB_txn* txn, const char* dbname, int flags)
{
// A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function.
int rc = mdb_dbi_open(txn, dbname, flags, &d_dbi);
if(rc)
throw std::runtime_error("Unable to open database: " + MDBError(rc));
// Database names are keys in the unnamed database, and may be read but not written.
}
MDBEnv::MDBEnv(const char* fname, int mode, int flags)
{
mdb_env_create(&d_env);
if(mdb_env_set_mapsize(d_env, 4096*2000000ULL))
throw std::runtime_error("setting map size");
/*
Various other options may also need to be set before opening the handle, e.g. mdb_env_set_mapsize(), mdb_env_set_maxreaders(), mdb_env_set_maxdbs(),
*/
mdb_env_set_maxdbs(d_env, 128);
// we need MDB_NOTLS since we rely on its semantics
if(int rc=mdb_env_open(d_env, fname, mode, flags | MDB_NOTLS)) {
// If this function fails, mdb_env_close() must be called to discard the MDB_env handle.
mdb_env_close(d_env);
throw std::runtime_error("Unable to open database: " + MDBError(rc));
}
}
void MDBEnv::incROTX()
{
std::lock_guard<std::mutex> l(d_mutex);
++d_ROtransactionsOut[std::this_thread::get_id()];
}
void MDBEnv::decROTX()
{
std::lock_guard<std::mutex> l(d_mutex);
--d_ROtransactionsOut[std::this_thread::get_id()];
}
void MDBEnv::incRWTX()
{
std::lock_guard<std::mutex> l(d_mutex);
++d_RWtransactionsOut[std::this_thread::get_id()];
}
void MDBEnv::decRWTX()
{
std::lock_guard<std::mutex> l(d_mutex);
--d_RWtransactionsOut[std::this_thread::get_id()];
}
int MDBEnv::getRWTX()
{
std::lock_guard<std::mutex> l(d_mutex);
return d_RWtransactionsOut[std::this_thread::get_id()];
}
int MDBEnv::getROTX()
{
std::lock_guard<std::mutex> l(d_mutex);
return d_ROtransactionsOut[std::this_thread::get_id()];
}
std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags)
{
struct Value
@ -23,7 +95,6 @@ std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags)
throw std::runtime_error("Unable to stat prospective mdb database: "+string(strerror(errno)));
else {
std::lock_guard<std::mutex> l(mut);
cout<<"Making a fresh one, file did not exist yet"<<endl;
auto fresh = std::make_shared<MDBEnv>(fname, mode, flags);
if(stat(fname, &statbuf))
throw std::runtime_error("Unable to stat prospective mdb database: "+string(strerror(errno)));
@ -37,24 +108,18 @@ std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags)
auto key = std::tie(statbuf.st_dev, statbuf.st_ino);
auto iter = s_envs.find(key);
if(iter != s_envs.end()) {
cout<<"Found something!"<<endl;
auto sp = iter->second.wp.lock();
if(sp) {
if(iter->second.flags != flags)
throw std::runtime_error("Can't open mdb with differing flags");
cout<<"It was live!"<<endl;
return sp;
}
else {
cout<<"It was dead already"<<endl;
s_envs.erase(iter); // useful if make_shared fails
}
}
else
cout<<"Found nothing"<<endl;
cout<<"Making a fresh one"<<endl;
auto fresh = std::make_shared<MDBEnv>(fname, mode, flags);
s_envs[key] = {fresh, flags};
@ -104,3 +169,24 @@ MDBROCursor MDBROTransaction::getCursor(const MDBDbi& dbi)
{
return MDBROCursor(this, dbi);
}
void MDBRWTransaction::put(MDB_dbi dbi, string_view key, string_view val, int flags)
{
put(dbi, MDB_val{key.size(), (void*)&key[0]}, MDB_val{val.size(), (void*)&val[0]}, flags);
}
int MDBRWTransaction::get(MDB_dbi dbi, string_view key, string_view& val)
{
MDB_val res;
int rc = get(dbi, MDB_val{key.size(), (void*)&key[0]}, res);
val=string_view((char*)res.mv_data, res.mv_size);
return rc;
}
int MDBROTransaction::get(MDB_dbi dbi, string_view key, string_view& val)
{
MDB_val res;
int rc = get(dbi, MDB_val{key.size(), (void*)&key[0]}, res);
val=string_view((char*)res.mv_data, res.mv_size);
return rc;
}


@ -5,16 +5,12 @@
#include <map>
#include <thread>
#include <memory>
#include <mutex>
using namespace std;
/* open issues:
*
* - opening a DBI is still exceptionally painful to get right, especially in a
* multi-threaded world
* - we're not yet protecting you against opening a file twice
* - we are not yet protecting you correctly against opening multiple transactions in 1 thread
* - error reporting is bad
* - missing convenience functions (string_view, string)
*/
@ -26,38 +22,12 @@ The error strategy. Anything that "should never happen" turns into an exception.
Thread safety: we are as safe as lmdb. You can talk to MDBEnv from as many threads as you want
*/
/** MDBDbi is our only 'value type' object, as 1) a dbi is actually an integer
and 2) per LMDB documentation, we never close it. */
class MDBDbi
{
public:
explicit MDBDbi(MDB_env* env, MDB_txn* txn, const char* dbname, int flags)
: d_env(env), d_txn(txn)
{
// A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function.
int rc = mdb_dbi_open(txn, dbname, flags, &d_dbi);
if(rc)
throw std::runtime_error("Unable to open database: "+string(mdb_strerror(rc)));
// Database names are keys in the unnamed database, and may be read but not written.
}
MDBDbi(MDBDbi&& rhs)
{
d_dbi = rhs.d_dbi;
d_env = rhs.d_env;
d_txn = rhs.d_txn;
rhs.d_env = 0;
rhs.d_txn = 0;
}
~MDBDbi()
{
if(d_env)
mdb_dbi_close(d_env, d_dbi);
}
explicit MDBDbi(MDB_env* env, MDB_txn* txn, const char* dbname, int flags);
operator const MDB_dbi&() const
{
@ -65,40 +35,15 @@ public:
}
MDB_dbi d_dbi;
MDB_env* d_env;
MDB_txn* d_txn;
};
class MDBRWTransaction;
class MDBROTransaction;
class MDBEnv
{
public:
MDBEnv(const char* fname, int mode, int flags)
{
mdb_env_create(&d_env); // there is no close
if(mdb_env_set_mapsize(d_env, 4096*2000000ULL))
throw std::runtime_error("setting map size");
/*
Various other options may also need to be set before opening the handle, e.g. mdb_env_set_mapsize(), mdb_env_set_maxreaders(), mdb_env_set_maxdbs(),
*/
mdb_env_set_maxdbs(d_env, 128);
// TODO: check if fname is open somewhere already (under lock)
// we need MDB_NOTLS since we rely on its semantics
if(mdb_env_open(d_env, fname, mode, flags | MDB_NOTLS)) {
// If this function fails, mdb_env_close() must be called to discard the MDB_env handle.
mdb_env_close(d_env);
throw std::runtime_error("Unable to open database");
}
}
MDBEnv(const char* fname, int mode, int flags);
~MDBEnv()
{
@ -117,9 +62,17 @@ Various other options may also need to be set before opening the handle, e.g. md
return d_env;
}
MDB_env* d_env;
int getRWTX();
void incRWTX();
void decRWTX();
int getROTX();
void incROTX();
void decROTX();
private:
std::mutex d_mutex;
std::map<std::thread::id, int> d_RWtransactionsOut;
std::map<std::thread::id, int> d_ROtransactionsOut;
};
std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int mode, int flags);
@ -131,7 +84,7 @@ class MDBROTransaction
public:
explicit MDBROTransaction(MDBEnv* parent, int flags=0) : d_parent(parent)
{
if(d_parent->d_RWtransactionsOut[std::this_thread::get_id()])
if(d_parent->getRWTX())
throw std::runtime_error("Duplicate transaction");
/*
@ -139,7 +92,7 @@ public:
if(mdb_txn_begin(d_parent->d_env, 0, MDB_RDONLY | flags, &d_txn))
throw std::runtime_error("Unable to start RO transaction");
++d_parent->d_ROtransactionsOut[std::this_thread::get_id()];
d_parent->incROTX();
}
@ -155,23 +108,28 @@ public:
{
// this does not free cursors
mdb_txn_reset(d_txn);
--d_parent->d_ROtransactionsOut[std::this_thread::get_id()];
d_parent->decROTX();
}
void renew()
{
if(d_parent->d_RWtransactionsOut[std::this_thread::get_id()])
if(d_parent->getROTX())
throw std::runtime_error("Duplicate transaction");
d_parent->d_ROtransactionsOut[std::this_thread::get_id()]++;
if(mdb_txn_renew(d_txn))
throw std::runtime_error("Renewing transaction");
d_parent->incROTX();
}
int get(MDB_dbi dbi, const MDB_val& key, MDB_val& val)
{
if(!d_txn)
throw std::runtime_error("Attempt to use a closed RO transaction for get");
return mdb_get(d_txn, dbi, (MDB_val*)&key, &val);
}
int get(MDB_dbi dbi, string_view key, string_view& val);
// this is something you can do, readonly
MDBDbi openDB(const char* dbname, int flags)
@ -184,7 +142,7 @@ public:
~MDBROTransaction()
{
if(d_txn) {
--d_parent->d_ROtransactionsOut[std::this_thread::get_id()];
d_parent->decROTX();
mdb_txn_commit(d_txn); // this appears to work better than abort for r/o database opening
}
}
@ -236,7 +194,7 @@ public:
{
return mdb_cursor_get(d_cursor, &key, &data, op);
}
MDB_cursor* d_cursor;
MDBROTransaction* d_parent;
};
@ -250,11 +208,12 @@ class MDBRWTransaction
public:
explicit MDBRWTransaction(MDBEnv* parent, int flags=0) : d_parent(parent)
{
if(d_parent->d_ROtransactionsOut[std::this_thread::get_id()] || d_parent->d_RWtransactionsOut[std::this_thread::get_id()])
if(d_parent->getROTX() || d_parent->getRWTX())
throw std::runtime_error("Duplicate transaction");
++d_parent->d_RWtransactionsOut[std::this_thread::get_id()];
if(int rc=mdb_txn_begin(d_parent->d_env, 0, flags, &d_txn))
throw std::runtime_error("Unable to start RW transaction: "+std::string(mdb_strerror(rc)));
d_parent->incRWTX();
}
MDBRWTransaction(MDBRWTransaction&& rhs)
@ -281,9 +240,9 @@ public:
~MDBRWTransaction()
{
if(d_txn) {
--d_parent->d_RWtransactionsOut[std::this_thread::get_id()];
d_parent->decRWTX();
closeCursors();
mdb_txn_abort(d_txn);
mdb_txn_abort(d_txn); // XXX check response?
}
}
void closeCursors();
@ -294,7 +253,7 @@ public:
if(mdb_txn_commit(d_txn)) {
throw std::runtime_error("committing");
}
--d_parent->d_RWtransactionsOut[std::this_thread::get_id()];
d_parent->decRWTX();
d_txn=0;
}
@ -302,18 +261,22 @@ public:
void abort()
{
closeCursors();
mdb_txn_abort(d_txn);
mdb_txn_abort(d_txn); // XXX check error?
d_txn = 0;
--d_parent->d_RWtransactionsOut[std::this_thread::get_id()];
d_parent->decRWTX();
}
void put(MDB_dbi dbi, const MDB_val& key, const MDB_val& val, int flags)
void put(MDB_dbi dbi, const MDB_val& key, const MDB_val& val, int flags=0)
{
if(!d_txn)
throw std::runtime_error("Attempt to use a closed RW transaction for put");
int rc;
if((rc=mdb_put(d_txn, dbi, (MDB_val*)&key, (MDB_val*)&val, flags)))
throw std::runtime_error("putting data: " + std::string(mdb_strerror(rc)));
}
void put(MDB_dbi dbi, string_view key, string_view val, int flags=0);
int del(MDB_dbi dbi, const MDB_val& key)
{
@ -327,9 +290,14 @@ public:
int get(MDB_dbi dbi, const MDB_val& key, MDB_val& val)
{
if(!d_txn)
throw std::runtime_error("Attempt to use a closed transaction for get");
return mdb_get(d_txn, dbi, (MDB_val*)&key, &val);
}
int get(MDB_dbi dbi, string_view key, string_view& val);
MDBDbi openDB(const char* dbname, int flags)
{
@ -401,7 +369,7 @@ public:
return mdb_cursor_get(d_cursor, &key, &data, op);
}
int put(MDB_val& key, MDB_val& data, int flags)
int put(MDB_val& key, MDB_val& data, int flags=0)
{
return mdb_cursor_put(d_cursor, &key, &data, flags);
}


@ -4,6 +4,7 @@
#include "lmdb-safe.hh"
#include <unistd.h>
#include <thread>
#include <vector>
static void closeTest()
{
@ -31,9 +32,145 @@ static void closeTest()
}
void doPuts(int tid)
try
{
auto env = getMDBEnv("./database", 0, 0600);
MDBDbi dbi = env->openDB("ahu", MDB_CREATE);
for(int n=0; n < 15; ++n) {
auto txn = env->getRWTransaction();
int val = n + 1000*tid;
txn.put(dbi, {sizeof(val), (char*)&val},
{sizeof(val), (char*)&val});
txn.commit();
cout << "Done with transaction "<<n<<" in thread " << tid<<endl;
}
cout<<"Done with thread "<<tid<<endl;
}
catch(std::exception& e)
{
cout<<"in thread "<<tid<<": "<<e.what()<<endl;
throw;
}
void doGets(int tid)
try
{
auto env = getMDBEnv("./database", 0, 0600);
MDBDbi dbi = env->openDB("ahu", MDB_CREATE);
for(int n=0; n < 15; ++n) {
auto txn = env->getROTransaction();
int val = n + 1000*tid;
MDB_val res;
if(txn.get(dbi, {sizeof(val), (char*)&val},
res)) {
throw std::runtime_error("no record");
}
cout << "Done with readtransaction "<<n<<" in thread " << tid<<endl;
}
cout<<"Done with read thread "<<tid<<endl;
}
catch(std::exception& e)
{
cout<<"in thread "<<tid<<": "<<e.what()<<endl;
throw;
}
struct MDBVal
{
MDBVal(int v) : d_v(v)
{
d_mdbval.mv_size=sizeof(v);
d_mdbval.mv_data = &d_v;
}
operator const MDB_val&()
{
return d_mdbval;
}
int d_v;
MDB_val d_mdbval;
};
void doFill()
{
auto env = getMDBEnv("./database", 0, 0600);
MDBDbi dbi = env->openDB("ahu", MDB_CREATE);
for(int n = 0; n < 20; ++n) {
auto txn = env->getRWTransaction();
for(int j=0; j < 1000000; ++j) {
MDBVal mv(n*1000000+j);
txn.put(dbi, mv, mv, 0);
}
txn.commit();
}
cout<<"Done filling"<<endl;
}
void doMeasure()
{
auto env = getMDBEnv("./database", 0, 0600);
MDBDbi dbi = env->openDB("ahu", MDB_CREATE);
for(;;) {
for(int n = 0; n < 20; ++n) {
auto txn = env->getROTransaction();
unsigned int count=0;
for(int j=0; j < 1000000; ++j) {
MDBVal mv(n*1000000+j);
MDB_val res;
if(!txn.get(dbi, mv, res))
++count;
}
cout<<count<<" ";
cout.flush();
if(!count)
break;
}
cout<<endl;
}
}
int main(int argc, char** argv)
{
cout<<std::this_thread::get_id()<<endl;
std::thread t1(doMeasure);
std::thread t2(doFill);
t1.join();
t2.join();
}
/*
auto env = getMDBEnv("./database", 0, 0600);
MDBDbi dbi = env->openDB("ahu", MDB_CREATE);
vector<std::thread> threads;
for(int n=0; n < 100; ++n) {
std::thread t(doPuts, n);
threads.emplace_back(std::move(t));
}
for(auto& t: threads) {
t.join();
}
threads.clear();
for(int n=0; n < 100; ++n) {
std::thread t(doGets, n);
threads.emplace_back(std::move(t));
}
for(auto& t: threads) {
t.join();
}
return 0;
}
closeTest();
auto env = getMDBEnv("./database", 0, 0600);
@ -116,3 +253,4 @@ int main(int argc, char** argv)
}
txn.commit();
}
*/