diff --git a/libpkg/data/database.cpp b/libpkg/data/database.cpp index a4c73df..efa78ff 100644 --- a/libpkg/data/database.cpp +++ b/libpkg/data/database.cpp @@ -25,12 +25,14 @@ struct PackageUpdaterPrivate { using AffectedDeps = std::unordered_multimap; using AffectedLibs = std::unordered_map; - explicit PackageUpdaterPrivate(DatabaseStorage &storage); + explicit PackageUpdaterPrivate(DatabaseStorage &storage, bool clear); void update(const PackageCache::StoreResult &res, const std::shared_ptr &package); void update(const StorageID packageID, bool removed, const std::shared_ptr &package); void submit(const std::string &dependencyName, AffectedDeps::mapped_type &affected, DependencyStorage::RWTransaction &txn); void submit(const std::string &libraryName, AffectedLibs::mapped_type &affected, LibraryDependencyStorage::RWTransaction &txn); + bool clear = false; + std::unique_lock lock; PackageStorage::RWTransaction packagesTxn; AffectedDeps affectedProvidedDeps; AffectedDeps affectedRequiredDeps; @@ -95,10 +97,12 @@ void Database::resetConfiguration() void Database::clearPackages() { - lastUpdate = DateTime(); - if (m_storage) { - m_storage->packageCache.clear(*m_storage); + if (!m_storage) { + return; } + const auto lock = std::unique_lock(m_storage->updateMutex); + m_storage->packageCache.clear(*m_storage); + lastUpdate = DateTime(); } std::vector> Database::findPackages(const std::function &pred) @@ -340,6 +344,7 @@ PackageSpec Database::findPackageWithID(const std::string &packageName) void Database::removePackage(const std::string &packageName) { + const auto lock = std::unique_lock(m_storage->updateMutex); const auto [packageID, package] = m_storage->packageCache.retrieve(*m_storage, packageName); if (package) { removePackageDependencies(packageID, package); @@ -349,6 +354,7 @@ void Database::removePackage(const std::string &packageName) StorageID Database::updatePackage(const std::shared_ptr &package) { + const auto lock = std::unique_lock(m_storage->updateMutex); const auto res = m_storage->packageCache.store(*m_storage, package, false); if (!res.updated) { return res.id; @@ -362,6 +368,7 @@ StorageID Database::updatePackage(const std::shared_ptr &package) StorageID Database::forceUpdatePackage(const std::shared_ptr &package) { + const auto lock = std::unique_lock(m_storage->updateMutex); const auto res = m_storage->packageCache.store(*m_storage, package, true); if (res.oldEntry) { removePackageDependencies(res.id, res.oldEntry); @@ -377,8 +384,7 @@ void Database::replacePackages(const std::vector> &newP package->addDepsAndProvidesFromOtherPackage(*existingPackage); } } - clearPackages(); - auto updater = PackageUpdater(*this); + auto updater = PackageUpdater(*this, true); for (const auto &package : newPackages) { updater.update(package); } @@ -623,15 +629,21 @@ std::string Database::filesPathFromRegularPath() const return ext == std::string::npos ? path : argsToString(std::string_view(path.data(), ext), ".files"); } -PackageUpdaterPrivate::PackageUpdaterPrivate(DatabaseStorage &storage) - : packagesTxn(storage.packages.getRWTransaction()) +PackageUpdaterPrivate::PackageUpdaterPrivate(DatabaseStorage &storage, bool clear) + : clear(clear) + , lock(storage.updateMutex) + , packagesTxn(storage.packages.getRWTransaction()) { + if (clear) { + storage.packageCache.clearCacheOnly(storage); + packagesTxn.clear(); + } } void PackageUpdaterPrivate::update(const PackageCache::StoreResult &res, const std::shared_ptr &package) { update(res.id, false, package); - if (res.oldEntry) { + if (!clear && res.oldEntry) { update(res.id, true, res.oldEntry); } } @@ -731,9 +743,9 @@ void PackageUpdaterPrivate::addLibrary(StorageID packageID, const std::string &l } } -PackageUpdater::PackageUpdater(Database &database) +PackageUpdater::PackageUpdater(Database &database, bool clear) : m_database(database) - , m_d(std::make_unique(*m_database.m_storage)) + , m_d(std::make_unique(*m_database.m_storage, clear)) { } @@ -748,42 +760,57 @@ LibPkg::PackageSpec LibPkg::PackageUpdater::findPackageWithID(const std::string StorageID PackageUpdater::update(const std::shared_ptr &package) { - const auto res = m_database.m_storage->packageCache.store(*m_database.m_storage, m_d->packagesTxn, package); + const auto &storage = m_database.m_storage; + const auto res = storage->packageCache.store(*m_database.m_storage, m_d->packagesTxn, package); m_d->update(res, package); return res.id; } void PackageUpdater::commit() { + const auto &storage = m_database.m_storage; m_d->packagesTxn.commit(); { - auto txn = m_database.m_storage->providedDeps.getRWTransaction(); + auto txn = storage->providedDeps.getRWTransaction(); + if (m_d->clear) { + txn.clear(); + } for (auto &[dependencyName, affected] : m_d->affectedProvidedDeps) { m_d->submit(dependencyName, affected, txn); } txn.commit(); } { - auto txn = m_database.m_storage->requiredDeps.getRWTransaction(); + auto txn = storage->requiredDeps.getRWTransaction(); + if (m_d->clear) { + txn.clear(); + } for (auto &[dependencyName, affected] : m_d->affectedRequiredDeps) { m_d->submit(dependencyName, affected, txn); } txn.commit(); } { - auto txn = m_database.m_storage->providedLibs.getRWTransaction(); + auto txn = storage->providedLibs.getRWTransaction(); + if (m_d->clear) { + txn.clear(); + } for (auto &[libraryName, affected] : m_d->affectedProvidedLibs) { m_d->submit(libraryName, affected, txn); } txn.commit(); } { - auto txn = m_database.m_storage->requiredLibs.getRWTransaction(); + auto txn = storage->requiredLibs.getRWTransaction(); + if (m_d->clear) { + txn.clear(); + } for (auto &[libraryName, affected] : m_d->affectedRequiredLibs) { m_d->submit(libraryName, affected, txn); } txn.commit(); } + m_d->lock.unlock(); } } // namespace LibPkg @@ -793,7 +820,7 @@ namespace ReflectiveRapidJSON { namespace JsonReflector { template <> -LIBPKG_EXPORT void push( +void push( const LibPkg::PackageSearchResult &reflectable, RAPIDJSON_NAMESPACE::Value &value, RAPIDJSON_NAMESPACE::Document::AllocatorType &allocator) { // customize serialization of PackageSearchResult to render as if it was pkg itself with an additional db property @@ -830,7 +857,7 @@ LIBPKG_EXPORT void push( } template <> -LIBPKG_EXPORT void pull(LibPkg::PackageSearchResult &reflectable, +void pull(LibPkg::PackageSearchResult &reflectable, const RAPIDJSON_NAMESPACE::GenericValue> &value, JsonDeserializationErrors *errors) { if (!value.IsObject()) { @@ -862,12 +889,28 @@ LIBPKG_EXPORT void pull(LibPkg::PackageSearchResult ReflectiveRapidJSON::JsonReflector::pull(dbInfo.arch, "dbArch", obj, errors); } +template <> +void push( + const LibPkg::AtomicDateTime &reflectable, RAPIDJSON_NAMESPACE::Value &value, RAPIDJSON_NAMESPACE::Document::AllocatorType &allocator) +{ + push(reflectable.load(), value, allocator); +} + +template <> +void pull(LibPkg::AtomicDateTime &reflectable, + const RAPIDJSON_NAMESPACE::GenericValue> &value, JsonDeserializationErrors *errors) +{ + auto d = CppUtilities::DateTime(); + pull(d, value, errors); + reflectable.store(d); +} + } // namespace JsonReflector namespace BinaryReflector { template <> -LIBPKG_EXPORT void writeCustomType( +void writeCustomType( BinarySerializer &serializer, const LibPkg::PackageSearchResult &packageSearchResult, BinaryVersion version) { if (const auto *const dbInfo = std::get_if(&packageSearchResult.db)) { @@ -881,7 +924,7 @@ LIBPKG_EXPORT void writeCustomType( } template <> -LIBPKG_EXPORT BinaryVersion readCustomType( +BinaryVersion readCustomType( BinaryDeserializer &deserializer, LibPkg::PackageSearchResult &packageSearchResult, BinaryVersion version) { deserializer.read(packageSearchResult.db.emplace().name, version); @@ -889,6 +932,20 @@ LIBPKG_EXPORT BinaryVersion readCustomType( return 0; } +template <> void writeCustomType(BinarySerializer &serializer, const LibPkg::AtomicDateTime &dateTime, BinaryVersion version) +{ + writeCustomType(serializer, dateTime.load(), version); +} + +template <> +BinaryVersion readCustomType(BinaryDeserializer &deserializer, LibPkg::AtomicDateTime &dateTime, BinaryVersion version) +{ + auto d = CppUtilities::DateTime(); + auto v = readCustomType(deserializer, d, version); + dateTime.store(d); + return v; +} + } // namespace BinaryReflector } // namespace ReflectiveRapidJSON diff --git a/libpkg/data/database.h b/libpkg/data/database.h index bfcfcb9..f6f0f8e 100644 --- a/libpkg/data/database.h +++ b/libpkg/data/database.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -100,7 +101,7 @@ struct LIBPKG_EXPORT UnresolvedDependencies : public ReflectiveRapidJSON::JsonSe struct PackageUpdaterPrivate; struct LIBPKG_EXPORT PackageUpdater { - explicit PackageUpdater(Database &database); + explicit PackageUpdater(Database &database, bool clear = false); ~PackageUpdater(); PackageSpec findPackageWithID(const std::string &packageName); @@ -112,6 +113,22 @@ private: std::unique_ptr m_d; }; +struct AtomicDateTime : public std::atomic { + AtomicDateTime(CppUtilities::DateTime value = CppUtilities::DateTime()) + : std::atomic(value) + { + } + AtomicDateTime(AtomicDateTime &&other) + : std::atomic(other.load()) + { + } + AtomicDateTime &operator=(AtomicDateTime &&other) + { + store(other.load()); + return *this; + } +}; + struct LIBPKG_EXPORT Database : public ReflectiveRapidJSON::JsonSerializable, public ReflectiveRapidJSON::BinarySerializable { using PackageVisitorMove = std::function &&)>; // package is invalidated/reused unless moved from!!! using PackageVisitorConst = std::function &)>; @@ -134,8 +151,6 @@ struct LIBPKG_EXPORT Database : public ReflectiveRapidJSON::JsonSerializable> findPackages(const std::function &pred); - void removePackageDependencies(StorageID packageID, const std::shared_ptr &package); - void addPackageDependencies(StorageID packageID, const std::shared_ptr &package); void allPackages(const PackageVisitorMove &visitor); void allPackagesByName(const PackageVisitorByName &visitor); std::size_t packageCount() const; @@ -158,6 +173,11 @@ struct LIBPKG_EXPORT Database : public ReflectiveRapidJSON::JsonSerializable &package); + void addPackageDependencies(StorageID packageID, const std::shared_ptr &package); + +public: std::string name; std::string path; std::string filesPath; @@ -168,7 +188,7 @@ struct LIBPKG_EXPORT Database : public ReflectiveRapidJSON::JsonSerializable dependencies; std::string localPkgDir; std::string localDbDir; - CppUtilities::DateTime lastUpdate; + AtomicDateTime lastUpdate; bool syncFromMirror = false; bool toBeDiscarded = false; @@ -245,6 +265,14 @@ template <> LIBPKG_EXPORT void pull(LibPkg::PackageSearchResult &reflectable, const RAPIDJSON_NAMESPACE::GenericValue> &value, JsonDeserializationErrors *errors); +// declare custom (de)serialization for AtomicDateTime +template <> +LIBPKG_EXPORT void push( + const LibPkg::AtomicDateTime &reflectable, RAPIDJSON_NAMESPACE::Value &value, RAPIDJSON_NAMESPACE::Document::AllocatorType &allocator); +template <> +LIBPKG_EXPORT void pull(LibPkg::AtomicDateTime &reflectable, + const RAPIDJSON_NAMESPACE::GenericValue> &value, JsonDeserializationErrors *errors); + } // namespace JsonReflector namespace BinaryReflector { @@ -256,6 +284,13 @@ template <> LIBPKG_EXPORT BinaryVersion readCustomType( BinaryDeserializer &deserializer, LibPkg::PackageSearchResult &packageSearchResult, BinaryVersion version); +template <> +LIBPKG_EXPORT void writeCustomType( + BinarySerializer &serializer, const LibPkg::AtomicDateTime &packageSearchResult, BinaryVersion version); +template <> +LIBPKG_EXPORT BinaryVersion readCustomType( + BinaryDeserializer &deserializer, LibPkg::AtomicDateTime &packageSearchResult, BinaryVersion version); + } // namespace BinaryReflector } // namespace ReflectiveRapidJSON diff --git a/libpkg/data/storage.cpp b/libpkg/data/storage.cpp index 28bd9b7..d720682 100644 --- a/libpkg/data/storage.cpp +++ b/libpkg/data/storage.cpp @@ -57,7 +57,7 @@ template void StorageCacheEntries: template auto StorageCache::retrieve(Storage &storage, ROTxn *txn, StorageID storageID) -> SpecType { - // check for package in cache + // check for package in cache, should be ok even if the db is being updated const auto ref = typename StorageEntryByID::result_type{ storageID, &storage }; auto lock = std::unique_lock(m_mutex); if (auto *const existingCacheEntry = m_entries.find(ref)) { @@ -67,13 +67,16 @@ auto StorageCache::retrieve(Storage & lock.unlock(); auto entry = std::make_shared(); if (auto id = txn ? txn->get(storageID, *entry) : storage.packages.getROTransaction().get(storageID, *entry)) { - using CacheEntry = typename Entries::StorageEntry; - using CacheRef = typename Entries::Ref; - auto newCacheEntry = CacheEntry(CacheRef(storage, entry), id); - newCacheEntry.entry = entry; - lock = std::unique_lock(m_mutex); - m_entries.insert(std::move(newCacheEntry)); - lock.unlock(); + // try to acquire update lock to avoid update existing cache entries while db is being updated + if (const auto updateLock = std::unique_lock(storage.updateMutex, std::try_to_lock)) { + using CacheEntry = typename Entries::StorageEntry; + using CacheRef = typename Entries::Ref; + auto newCacheEntry = CacheEntry(CacheRef(storage, entry), id); + newCacheEntry.entry = entry; + lock = std::unique_lock(m_mutex); + m_entries.insert(std::move(newCacheEntry)); + lock.unlock(); + } return SpecType(id, entry); } return SpecType(0, std::shared_ptr()); @@ -92,7 +95,7 @@ auto StorageCache::retrieve(Storage & if (entryName.empty()) { return SpecType(0, std::shared_ptr()); } - // check for package in cache + // check for package in cache, should be ok even if the db is being updated using CacheRef = typename Entries::Ref; const auto ref = CacheRef(storage, entryName); auto lock = std::unique_lock(m_mutex); @@ -103,12 +106,15 @@ auto StorageCache::retrieve(Storage & // check for package in storage, populate cache entry auto entry = std::make_shared(); if (auto id = txn ? txn->template get<0>(entryName, *entry) : storage.packages.getROTransaction().template get<0>(entryName, *entry)) { - using CacheEntry = typename Entries::StorageEntry; - auto newCacheEntry = CacheEntry(CacheRef(storage, entry), id); - newCacheEntry.entry = entry; - lock = std::unique_lock(m_mutex); - m_entries.insert(std::move(newCacheEntry)); - lock.unlock(); + // try to acquire update lock to avoid update existing cache entries while db is being updated + if (const auto updateLock = std::unique_lock(storage.updateMutex, std::try_to_lock)) { + using CacheEntry = typename Entries::StorageEntry; + auto newCacheEntry = CacheEntry(CacheRef(storage, entry), id); + newCacheEntry.entry = entry; + lock = std::unique_lock(m_mutex); + m_entries.insert(std::move(newCacheEntry)); + lock.unlock(); + } return SpecType(id, entry); } return SpecType(0, std::shared_ptr()); diff --git a/libpkg/data/storageprivate.h b/libpkg/data/storageprivate.h index 0be0433..0b7a0b8 100644 --- a/libpkg/data/storageprivate.h +++ b/libpkg/data/storageprivate.h @@ -4,6 +4,8 @@ #include "./package.h" #include "./storagegeneric.h" +#include + namespace LibPkg { using PackageStorage = LMDBSafe::TypedDBI>; @@ -50,6 +52,7 @@ struct DatabaseStorage { DependencyStorage requiredDeps; LibraryDependencyStorage providedLibs; LibraryDependencyStorage requiredLibs; + std::mutex updateMutex; // must be acquired to update packages, concurrent reads should still be possible private: std::shared_ptr m_env; diff --git a/librepomgr/buildactions/reloaddatabase.cpp b/librepomgr/buildactions/reloaddatabase.cpp index b6be92a..a203cdb 100644 --- a/librepomgr/buildactions/reloaddatabase.cpp +++ b/librepomgr/buildactions/reloaddatabase.cpp @@ -85,7 +85,7 @@ void ReloadDatabase::run() if (!force) { auto configReadLock2 = m_setup.config.lockToRead(); auto *const destinationDb = m_setup.config.findDatabase(dbName, dbArch); - if (const auto lastUpdate = destinationDb->lastUpdate; lastModified <= lastUpdate) { + if (const auto lastUpdate = destinationDb->lastUpdate.load(); lastModified <= lastUpdate) { configReadLock2.unlock(); m_buildAction->appendOutput(Phrases::InfoMessage, "Skip loading database \"", dbName, '@', dbArch, "\" from local file \"", dbPath, "\"; last modification time <= last update (", lastModified.toString(), '<', '=', @@ -98,7 +98,7 @@ void ReloadDatabase::run() dbFileLock.lock().unlock(); m_buildAction->appendOutput( Phrases::InfoMessage, "Loading database \"", dbName, '@', dbArch, "\" from local file \"", dbPath, "\"\n"); - const auto configLock = m_setup.config.lockToWrite(); + const auto configLock = m_setup.config.lockToRead(); auto *const destinationDb = m_setup.config.findDatabase(dbName, dbArch); if (!destinationDb) { m_buildAction->appendOutput( @@ -123,7 +123,7 @@ void ReloadDatabase::run() // clear AUR cache if (m_toAur) { - auto lock = m_setup.config.lockToWrite(); + auto lock = m_setup.config.lockToRead(); m_setup.config.aur.clearPackages(); lock.unlock(); m_buildAction->log()(Phrases::InfoMessage, "Cleared AUR cache\n"); diff --git a/librepomgr/buildactions/reloadlibrarydependencies.cpp b/librepomgr/buildactions/reloadlibrarydependencies.cpp index 94dc623..138cb5b 100644 --- a/librepomgr/buildactions/reloadlibrarydependencies.cpp +++ b/librepomgr/buildactions/reloadlibrarydependencies.cpp @@ -362,7 +362,7 @@ void ReloadLibraryDependencies::loadPackageInfoFromContents() m_buildAction->appendOutput(Phrases::SuccessMessage, "Adding parsed information to databases ...\n"); std::size_t counter = 0; for (DatabaseToConsider &relevantDb : m_relevantPackagesByDatabase) { - auto configWritelock = m_setup.config.lockToWrite(); // acquire lock within loop to allow intermediate reads + auto configWritelock = m_setup.config.lockToRead(); auto *const db = m_setup.config.findDatabase(relevantDb.name, relevantDb.arch); if (!db) { continue; // the whole database has been removed while we were loading package contents diff --git a/librepomgr/buildactions/repomanagement.cpp b/librepomgr/buildactions/repomanagement.cpp index 7e87133..6f10c66 100644 --- a/librepomgr/buildactions/repomanagement.cpp +++ b/librepomgr/buildactions/repomanagement.cpp @@ -591,7 +591,7 @@ void CleanRepository::run() const auto lastModified = LibPkg::lastModified(dbFile); if (lastModified != db->lastUpdate) { m_messages.errors.emplace_back("The db file's last modification (" % lastModified.toString() % ") does not match the last db update (" - % db->lastUpdate.toString() + % db->lastUpdate.load().toString() + ")."); fatalError = true; } diff --git a/librepomgr/serversetup.cpp b/librepomgr/serversetup.cpp index 8c88c46..cace8eb 100644 --- a/librepomgr/serversetup.cpp +++ b/librepomgr/serversetup.cpp @@ -608,7 +608,7 @@ void ServiceSetup::printDatabases() cerr << Phrases::SuccessMessage << "Found " << config.databases.size() << " databases:" << Phrases::End; for (const auto &db : config.databases) { cerr << Phrases::SubMessage << db.name << "@" << db.arch << ": " << db.packageCount() << " packages, last updated on " - << db.lastUpdate.toString(DateTimeOutputFormat::DateAndTime) << Phrases::End << " - path: " << db.path + << db.lastUpdate.load().toString(DateTimeOutputFormat::DateAndTime) << Phrases::End << " - path: " << db.path << "\n - local db dir: " << db.localDbDir << "\n - local package dir: " << db.localPkgDir << '\n'; } cerr << Phrases::SubMessage << "AUR (" << config.aur.packageCount() << " packages cached)" << Phrases::End; diff --git a/librepomgr/webapi/routes.cpp b/librepomgr/webapi/routes.cpp index 3a11a93..d2403d2 100644 --- a/librepomgr/webapi/routes.cpp +++ b/librepomgr/webapi/routes.cpp @@ -357,7 +357,7 @@ void postLoadPackages(const Params ¶ms, ResponseHandler &&handler) { const auto withFiles = params.target.hasFlag("with-files"); const auto force = params.target.hasFlag("force"); - auto lock = params.setup.config.lockToWrite(); + auto lock = params.setup.config.lockToRead(); params.setup.config.loadAllPackages(withFiles, force); lock.unlock(); handler(makeText(params.request(), "packages loaded")); diff --git a/librepomgr/webclient/aur.cpp b/librepomgr/webclient/aur.cpp index 5e5000a..b6e18b2 100644 --- a/librepomgr/webclient/aur.cpp +++ b/librepomgr/webclient/aur.cpp @@ -52,7 +52,7 @@ void searchAurPackages(LogContext &log, ServiceSetup &setup, const std::string & try { // parse and cache the AUR packages auto packages = Package::fromAurRpcJson(body.data(), body.size(), PackageOrigin::AurRpcSearch); - auto lock = setup.config.lockToWrite(); + auto lock = setup.config.lockToRead(); auto updater = LibPkg::PackageUpdater(setup.config.aur); for (auto &[packageID, package] : packages) { packageID = updater.update(package); @@ -100,7 +100,7 @@ std::shared_ptr queryAurPackagesInternal(LogContext &log, Servi try { // parse and cache the AUR packages auto packagesFromAur = Package::fromAurRpcJson(body.data(), body.size()); - auto lock = setup.config.lockToWrite(); + auto lock = setup.config.lockToRead(); auto updater = PackageUpdater(setup.config.aur); for (auto &[packageID, package] : packagesFromAur) { packageID = updater.update(package); diff --git a/librepomgr/webclient/database.cpp b/librepomgr/webclient/database.cpp index ddea4d0..d4da03f 100644 --- a/librepomgr/webclient/database.cpp +++ b/librepomgr/webclient/database.cpp @@ -100,7 +100,7 @@ void queryDatabases(LogContext &log, ServiceSetup &setup, std::vectorlastUpdate; + const auto lastUpdate = destinationDb->lastUpdate.load(); configReadLock.unlock(); if (lastModified > lastUpdate) { return; @@ -138,7 +138,7 @@ void queryDatabases(LogContext &log, ServiceSetup &setup, std::vectorlastUpdate; lastModified <= lastUpdate) { + if (const auto lastUpdate = destinationDb->lastUpdate.load(); lastModified <= lastUpdate) { configReadLock.unlock(); log(Phrases::InfoMessage, "Skip loading database \"", dbName, '@', dbArch, "\" from mirror response; last modification time <= last update (", lastModified.toString(), @@ -157,9 +157,10 @@ void queryDatabases(LogContext &log, ServiceSetup &setup, std::vector