#include "database.h"
#include <db.h>
#include <QDir>
#include <QStandardPaths>
#include <cmath>
#include <cstring>
#include <memory>
#include "../Common/defines.h"
#include "constants.h"

#define DEFAULT_PAGESIZE (1024)
#define DEFAULT_CONCURRENT_TRANSACTIONS (100)
#define DEFAULT_DEADLOCK_DETECTION_SECONDS (10)
// DB_ENV->set_timeout() expects its value in microseconds
#define MICROSECONDS_IN_SECOND (1000000)
#define MEMORY_ONE_MB (1024*1024)

// memory required for locks, db handles and statistics overhead
#define MAX_MEMORY_GBYTES (0)
#ifdef QT_DEBUG
#define MAX_MEMORY_BYTES (2*MEMORY_ONE_MB)
#else
#define MAX_MEMORY_BYTES (4*MEMORY_ONE_MB)
#endif

#define DEFAULT_CACHE_SPLIT_PARTS (5)
#define DEFAULT_CACHE_GBYTES (0)
#define MAX_CACHE_GBYTES (0)
#ifdef QT_DEBUG
#define DEFAULT_CACHE_BYTES (10*MEMORY_ONE_MB)
#define MAX_CACHE_BYTES (100*MEMORY_ONE_MB)
#else
#define DEFAULT_CACHE_BYTES (100*MEMORY_ONE_MB)
#define MAX_CACHE_BYTES (200*MEMORY_ONE_MB)
#endif

// how many times a read buffer is grown after DB_BUFFER_SMALL before giving up
#ifdef QT_DEBUG
#define BUFFER_EXPAND_ATTEMPTS 20
#else
#define BUFFER_EXPAND_ATTEMPTS 5
#endif

namespace Helpers {
    /*
     * Returns the directory that hosts a Berkeley DB environment.
     *
     * When XPIKS_USERDATA_PATH is non-empty the directory is
     * <XPIKS_USERDATA_PATH>/<dbDirName> and it is created if missing;
     * otherwise the current working directory is returned.
     */
    QString ensureDBDirectoryExists(const QString &dbDirName) {
        QString appDataPath = XPIKS_USERDATA_PATH;
        QString path;

        if (!appDataPath.isEmpty()) {
            path = QDir::cleanPath(appDataPath + QDir::separator() + dbDirName);

            QDir dbDir(path);
            if (!dbDir.exists()) {
                LOG_INFO << "Creating db dir" << path;
                if (!QDir().mkpath(path)) {
                    // the subsequent environment open will fail and report details
                    LOG_WARNING << "Failed to create db dir" << path;
                }
            }
        } else {
            path = QDir::currentPath();
        }

        return path;
    }

    /*
     * Wraps a single Berkeley DB database inside an already created environment.
     * maxBufferSize is the initial size of the read buffer used by tryGetKey().
     * The finalize coordinator is "locked" here and "unlocked" in close(), so
     * close() must be called exactly once for every constructed Database.
     */
    Database::Database(__db_env *environment, int maxBufferSize, AsyncCoordinator *finalizeCoordinator):
        m_FinalizeCoordinator(finalizeCoordinator),
        m_Environment(environment),
        m_Database(nullptr),
        m_StartValueBufferSize(maxBufferSize),
        m_IsOpened(false)
    {
        Q_ASSERT(maxBufferSize > 0);
        Q_ASSERT(environment != nullptr);

        AsyncCoordinatorLocker locker(m_FinalizeCoordinator);
        Q_UNUSED(locker);
    }

    Database::~Database() {
        Q_ASSERT(m_IsOpened == false);

        if (m_IsOpened) {
            close();
        }
    }

    /*
     * Creates the DB handle and opens (creating if needed) the btree file dbName
     * inside the environment. Returns true on success. On failure the handle
     * (if it was created) stays allocated; close() must still be called.
     */
    bool Database::open(const char *dbName) {
        Q_ASSERT(m_Database == nullptr);
        LOG_DEBUG << dbName;

        bool success = false;
        int result = 0;

        do {
            result = db_create(&m_Database, m_Environment, 0);
            if (result != 0) {
                LOG_WARNING << "Failed creating a database instance:" << db_strerror(result);
                break;
            }

            result = m_Database->set_pagesize(m_Database, DEFAULT_PAGESIZE);
            if (result != 0) {
                LOG_WARNING << "Failed to set pagesize:" << db_strerror(result);
                break;
            }

            u_int32_t db_flags;
            db_flags = DB_CREATE |      // create the database if not exists
                    DB_AUTO_COMMIT |    // Enclose the DB->open() call within a transaction
                    DB_THREAD;          // thread-safety

            result = m_Database->open(m_Database,   /* Pointer to the database */
                                      nullptr,      /* Txn pointer */
                                      dbName,       /* File name */
                                      nullptr,      /* Logical db name */
                                      DB_BTREE,     /* Database type (using btree) */
                                      db_flags,     /* Open flags */
                                      0);           /* File mode. Using defaults */
            if (result != 0) {
                LOG_WARNING << "Failed to open the database" << dbName << db_strerror(result);
                break;
            }

            m_IsOpened = true;
            success = true;
        } while (false);

        return success;
    }

    /*
     * Syncs and closes the DB handle (if one was created) and releases the
     * finalize coordinator taken in the constructor. Safe to call after a
     * failed open(), including when db_create() itself failed.
     */
    void Database::close() {
        LOG_DEBUG << "#";

        // m_Database stays null when db_create() failed inside open()
        if (m_Database != nullptr) {
            // sync and close database
            int result = m_Database->close(m_Database, 0);
            if (result != 0) {
                LOG_WARNING << "Closing database failed:" << db_strerror(result);
            }

            // Berkeley DB: the handle may not be accessed again after DB->close(),
            // regardless of its return value, so never keep it around
            m_Database = nullptr;
        }

        m_IsOpened = false;

        // presumably pairs with the AsyncCoordinatorLocker created in the constructor
        AsyncCoordinatorUnlocker unlocker(m_FinalizeCoordinator);
        Q_UNUSED(unlocker);
    }

    // Flushes cached database pages to disk; failures are only logged.
    void Database::sync() {
        LOG_DEBUG << "#";
        Q_ASSERT(m_Database != nullptr);

        int result = m_Database->sync(m_Database, 0);
        if (result != 0) {
            LOG_WARNING << "Database sync failed" << db_strerror(result);
        }
    }

    /*
     * Preloads database pages into the environment cache.
     * percent (0..100) is the share of the maximum cache size to fill.
     */
    void Database::warmupCache(int percent) {
        LOG_DEBUG << "#";
        Q_ASSERT((0 <= percent) && (percent <= 100));
        Q_ASSERT(m_Database != nullptr);
        Q_ASSERT(m_Environment != nullptr);

        // here and below code from "Warming the memory pool" BerkeleyDB docs
        do {
            u_int32_t pagesize, gbytes, bytes;
            int ret = 0, numcachepages;

            /* Find out how many pages can fit at most in the cache */
            ret = m_Environment->get_mp_pagesize(m_Environment, &pagesize);
            if (ret != 0) {
                LOG_WARNING << "Error retrieving the cache pagesize:" << db_strerror(ret);
                break;
            }

            ret = m_Environment->get_cache_max(m_Environment, &gbytes, &bytes);
            if (ret != 0) {
                LOG_WARNING << "Error retrieving maximum cache size:" << db_strerror(ret);
                break;
            }

            /* Avoid an overflow by first calculating pages per gigabyte. */
            numcachepages = gbytes * ((1024 * 1024 * 1024) / pagesize);
            numcachepages += bytes / pagesize;

            LOG_DEBUG << numcachepages << "can fit";

            const double realPages = std::floor(numcachepages * (static_cast<double>(percent) / 100.0));
            int result = doWarmupCache(static_cast<int>(realPages));
            if (result != 0) {
                LOG_WARNING << "Cache warmup failed";
            } else {
                LOG_INFO << "Cache warmup succeeded";
            }
        } while (false);
    }

    /*
     * Returns true only when the key is present. Lookup errors other than
     * "not found" are logged and reported as "missing" instead of "present".
     */
    bool Database::exists(const QByteArray &key) {
        Q_ASSERT(m_Database != nullptr);

        const int result = checkExists(key);
        if ((result != 0) && (result != DB_NOTFOUND)) {
            LOG_WARNING << "Failed to check existence of" << key << ":" << db_strerror(result);
        }

        return result == 0;
    }

    /*
     * Reads the value stored for key into value. The read buffer starts at
     * m_StartValueBufferSize bytes and is grown (up to BUFFER_EXPAND_ATTEMPTS
     * times) while Berkeley DB reports DB_BUFFER_SMALL.
     * Note: value keeps the full (zero-padded) buffer size, not the stored length.
     * Returns true on success; a missing key returns false without a warning.
     */
    bool Database::tryGetKey(const QByteArray &key, QByteArray &value) {
        Q_ASSERT(m_Database != nullptr);

        int attempts = BUFFER_EXPAND_ATTEMPTS;
        int result = 0;
        int bufferSize = m_StartValueBufferSize;

        while (attempts--) {
            result = doGetKey(key, value, bufferSize);

            if (result != DB_BUFFER_SMALL) {
                break;
            }

            // the stored value did not fit: grow the buffer and retry
            bufferSize = 2*bufferSize + 1;
        }

        // a missing key is an expected outcome; success and retries are not errors
        if ((result != 0) && (result != DB_NOTFOUND) && (result != DB_KEYEMPTY)) {
            LOG_WARNING << "Error reading" << key << "from database:" << db_strerror(result);
        }

        // do not save this for debug in order to test code above
#ifdef QT_NO_DEBUG
        if (result == 0) {
            // if we have concurrency issue here, then anyway eventually
            // max buffer size will be properly adjusted
            // NOTE(review): reads start from m_StartValueBufferSize, so this learned
            // size is not reused by tryGetKey() itself — confirm against database.h
            m_MaxValueBufferSize = bufferSize;
        }
#endif

        bool success = result == 0;
        return success;
    }

    // Stores value under key (overwriting any existing value); logs and returns false on failure.
    bool Database::trySetKey(const QByteArray &key, const QByteArray &value) {
        Q_ASSERT(m_Database != nullptr);

        int result = doSetKey(key, value);
        if (result != 0) {
            LOG_WARNING << "Failed to set a key" << key << "with error:" << db_strerror(result);
        }

        bool success = result == 0;
        return success;
    }

    // Removes key from the database; logs and returns false on failure (including a missing key).
    bool Database::deleteRecord(const QByteArray &key) {
        Q_ASSERT(m_Database != nullptr);

        int result = doDeleteRecord(key);
        if (result != 0) {
            LOG_WARNING << "Failed to delete a key" << key << ":" << db_strerror(result);
        }

        bool success = result == 0;
        return success;
    }

    // Creates and initializes a cursor-based iterator over the whole database.
    std::unique_ptr<Database::Iterator> Database::getIterator() {
        LOG_DEBUG << "#";
        std::unique_ptr<Database::Iterator> it(new Database::Iterator(m_Database));
        it->initialize();
        LOG_DEBUG << "Iterator is valid:" << it->isValid();
        return it;
    }

    // Returns the raw DB->exists() code: 0 when present, DB_NOTFOUND when absent.
    int Database::checkExists(const QByteArray &key) {
        DBT dbtKey;
        memset(&dbtKey, 0, sizeof(DBT));

        dbtKey.data = (void*)key.data();
        dbtKey.size = static_cast<u_int32_t>(key.size());

        int result = m_Database->exists(m_Database, nullptr, &dbtKey, 0);
        return result;
    }

    /*
     * Reads key into value using a caller-owned buffer of bufferSize bytes.
     * Returns the raw DB->get() code (DB_BUFFER_SMALL when the buffer is too small).
     */
    int Database::doGetKey(const QByteArray &key, QByteArray &value, int bufferSize) {
        DBT dbtKey, dbtData;
        memset(&dbtKey, 0, sizeof(DBT));
        memset(&dbtData, 0, sizeof(DBT));

        dbtKey.data = (void*)key.data();
        dbtKey.size = static_cast<u_int32_t>(key.size());

        value.fill(0, bufferSize);

        // with DB_DBT_USERMEM Berkeley DB writes into our buffer of ulen bytes
        dbtData.data = value.data();
        dbtData.ulen = static_cast<u_int32_t>(bufferSize);
        dbtData.flags = DB_DBT_USERMEM;

        int result = m_Database->get(m_Database, nullptr, &dbtKey, &dbtData, 0);
        return result;
    }

    /*
     * Writes value under key. The stored size is value.size() + 1 so the
     * terminating '\0' that QByteArray keeps after its data is stored too.
     */
    int Database::doSetKey(const QByteArray &key, const QByteArray &value) {
        DBT dbtKey, dbtData;
        memset(&dbtKey, 0, sizeof(DBT));
        memset(&dbtData, 0, sizeof(DBT));

        dbtKey.data = (void*)key.data();
        dbtKey.size = static_cast<u_int32_t>(key.size());

        dbtData.data = (void*)value.data();
        dbtData.size = static_cast<u_int32_t>(value.size() + 1);

        int result = m_Database->put(m_Database, nullptr, &dbtKey, &dbtData, 0);
        return result;
    }

    // Returns the raw DB->del() code for key.
    int Database::doDeleteRecord(const QByteArray &key) {
        DBT dbtKey;
        memset(&dbtKey, 0, sizeof(DBT));

        dbtKey.data = (void*)key.data();
        dbtKey.size = static_cast<u_int32_t>(key.size());

        int result = m_Database->del(m_Database, nullptr, &dbtKey, 0);
        return result;
    }

    /*
     * Touches up to pagesCount pages of the database file (starting at page 0)
     * so they are loaded into the cache. Returns 0 on success or the first
     * Berkeley DB error encountered.
     */
    int Database::doWarmupCache(int pagesCount) {
        LOG_INFO << pagesCount;
        // here and below code from "Warming the memory pool" BerkeleyDB docs

        DB_MPOOLFILE *mpf = nullptr;
        void *page_addrp = nullptr;
        db_pgno_t page_number = 0;
        int ret = 0;
        int pagecount = 0;

        /*
         * Get the mpool handle
         */
        mpf = m_Database->get_mpf(m_Database);

        /* Load pages until there are no more pages in the database,
         * or until we've put as many pages into the cache as will fit.
         */
        while (pagecount < pagesCount) {
            /*
             * Get the page from the cache. This causes DB to retrieve
             * the page from disk if it isn't already in the cache.
             */
            ret = mpf->get(mpf, &page_number, nullptr, 0, &page_addrp);
            if (ret == DB_PAGE_NOTFOUND) {
                // past the last page of the file: nothing more to load
                break;
            }

            if (ret != 0) {
                LOG_WARNING << "Error retrieving db page:" << page_number << db_strerror(ret);
                return ret;
            }

            /*
             * Put the page back into the cache. This releases the page
             * latch so that the page can be evicted if DB needs more room
             * in the cache at some later time.
             */
            ret = mpf->put(mpf, page_addrp, DB_PRIORITY_UNCHANGED, 0);
            if (ret != 0) {
                LOG_WARNING << "Error putting db page:" << page_number << db_strerror(ret);
                return ret;
            }

            ++page_number;
            ++pagecount;
        }

        LOG_INFO << pagecount << "pages loaded";

        return 0;
    }

    DatabaseManager::DatabaseManager():
        QObject(),
        m_Environment(nullptr)
    {
        QObject::connect(&m_FinalizeCoordinator, &Helpers::AsyncCoordinator::statusReported,
                         this, &DatabaseManager::onReadyToFinalize);
    }

    /*
     * Opens the Berkeley DB environment, escalating through:
     * primary dir -> primary dir with fatal recovery ->
     * failover dir -> failover dir with fatal recovery.
     * Every failed attempt's environment handle is closed before the next one.
     * Returns true when one of the attempts succeeded.
     */
    bool DatabaseManager::initialize() {
        const bool withoutRecovery = false;
        const bool withRecovery = true;
        int result = 0;

        do {
            result = doInitialize(Constants::DB_DIR, withoutRecovery);
            if (result == 0) { break; }

            LOG_WARNING << "Retrying environment initialization with recovery";
            closeEnvironment();

            result = doInitialize(Constants::DB_DIR, withRecovery);
            if (result == 0) {
                LOG_INFO << "Recovery finished successfully!";
                break;
            }

            LOG_WARNING << "Switching to failover DB environment";
            // doInitialize() requires a fresh (null) environment handle
            closeEnvironment();

            result = doInitialize(Constants::DB_DIR_FAILOVER, withoutRecovery);
            if (result == 0) {
                LOG_INFO << "Successfully switched to failover environment";
                break;
            }

            LOG_WARNING << "Retrying environment initialization with recovery";
            closeEnvironment();

            result = doInitialize(Constants::DB_DIR_FAILOVER, withRecovery);
            if (result == 0) {
                LOG_INFO << "Failover recovery finished successfully!";
                break;
            }

            LOG_WARNING << "Failed to initialize the environment";
        } while (false);

        bool success = result == 0;
        return success;
    }

    // Closes every opened database and then the environment itself.
    void DatabaseManager::finalize() {
        LOG_DEBUG << "#";
        closeAll();
        closeEnvironment();
        LOG_INFO << "Finalize finished";
    }

    /*
     * Creates, configures and opens the environment in dbDirName (under the
     * user data path). withRecovery adds DB_RECOVER_FATAL on top of the normal
     * DB_RECOVER that is always requested. Returns 0 or a Berkeley DB error;
     * on failure m_Environment may be left allocated and must be closed.
     */
    int DatabaseManager::doInitialize(const QString &dbDirName, bool withRecovery) {
        Q_ASSERT(m_Environment == nullptr);
        LOG_INFO << "with recovery =" << withRecovery;

        int result = 0;

        do {
            result = db_env_create(&m_Environment, 0);
            if (result != 0) {
                LOG_WARNING << "Failed to create an environment:" << db_strerror(result);
                break;
            }

            LOG_DEBUG << "DB Environment allocated";

            u_int32_t additional_flags;
            additional_flags = DB_AUTO_COMMIT | /*wrap all operations in transactions*/
                    DB_TXN_NOSYNC; /*do not flush write-ahead logs*/

            result = m_Environment->set_flags(m_Environment, additional_flags, 1);
            if (result != 0) {
                LOG_WARNING << "Failed to set additional flags:" << db_strerror(result);
                break;
            }

            // deadlock detection; Berkeley DB timeouts are given in microseconds
            result = m_Environment->set_timeout(m_Environment,
                                                DEFAULT_DEADLOCK_DETECTION_SECONDS * MICROSECONDS_IN_SECOND,
                                                DB_SET_TXN_TIMEOUT);
            if (result != 0) {
                LOG_WARNING << "Failed to set the deadlock detection timeout:" << db_strerror(result);
                break;
            }

            result = m_Environment->set_tx_max(m_Environment, DEFAULT_CONCURRENT_TRANSACTIONS);
            if (result != 0) {
                LOG_WARNING << "Failed to set concurrent transactions number:" << db_strerror(result);
                break;
            }

            result = m_Environment->set_cache_max(m_Environment, MAX_CACHE_GBYTES, MAX_CACHE_BYTES);
            if (result != 0) {
                LOG_WARNING << "Failed to set max cache size:" << db_strerror(result);
                break;
            }

            result = m_Environment->set_cachesize(m_Environment,
                                                  DEFAULT_CACHE_GBYTES,
                                                  DEFAULT_CACHE_BYTES,
                                                  DEFAULT_CACHE_SPLIT_PARTS);
            if (result != 0) {
                LOG_WARNING << "Failed to set cache size:" << db_strerror(result);
                break;
            }

            result = m_Environment->set_memory_max(m_Environment, MAX_MEMORY_GBYTES, MAX_MEMORY_BYTES);
            if (result != 0) {
                LOG_WARNING << "Failed to set memory max size:" << db_strerror(result);
                break;
            }

            LOG_DEBUG << "DB Environment configured";

            QString dbDirPath = ensureDBDirectoryExists(dbDirName);
            QByteArray dbDir = dbDirPath.toUtf8();
            char *dbDirData = dbDir.data();

            u_int32_t env_flags;
            env_flags = 0 |
                    DB_INIT_LOCK |  /* Initialize locking */
                    DB_INIT_LOG |   /* Initialize logging */
                    DB_INIT_MPOOL | /* Initialize the cache */
                    DB_INIT_TXN |   /* Initialize transactions */
                    DB_CREATE |     /* If the environment does not exist, create it. */
                    //DB_PRIVATE |  DO NOT specify if using failcheck or recovery
                    DB_THREAD |     /* Multithreading support */
                    DB_RECOVER |    /* run recovery */
                    DB_USE_ENVIRON; /* allow any paths for db files */

            if (withRecovery) {
                env_flags = env_flags | DB_RECOVER_FATAL;
            }

            result = m_Environment->open(m_Environment, dbDirData, env_flags, 0);
            if (result != 0) {
                LOG_WARNING << "Failed to open an environment:" << db_strerror(result);
                break;
            }

            LOG_DEBUG << "DB Environment opened";
            m_DBDirPath = dbDirPath;
        } while (false);

        return result;
    }

    /*
     * Closes the environment handle, if any, and always forgets it afterwards
     * (Berkeley DB forbids reusing a DB_ENV handle after DB_ENV->close(),
     * whatever it returned). Returns 0 or the close error.
     */
    int DatabaseManager::closeEnvironment() {
        LOG_DEBUG << "#";

        // db_env_create() may have failed, leaving nothing to close
        if (m_Environment == nullptr) {
            LOG_DEBUG << "Environment is not allocated";
            return 0;
        }

        int result = m_Environment->close(m_Environment, 0);
        if (result != 0) {
            LOG_WARNING << "Failed to close an environment:" << db_strerror(result);
        } else {
            LOG_DEBUG << "Environment closed";
        }

        m_Environment = nullptr;

        return result;
    }

    /*
     * Opens (creating if needed) database dbName inside the environment.
     * Returns the opened database (also tracked for closeAll()), or an empty
     * pointer when opening failed.
     */
    std::shared_ptr<Database> DatabaseManager::openDatabase(const char *dbName) {
        LOG_DEBUG << dbName;
        Q_ASSERT(m_Environment != nullptr);

        std::shared_ptr<Database> db(new Database(m_Environment, DEFAULT_READ_BUFFER_START_SIZE, &m_FinalizeCoordinator));
        bool openSucceded = db->open(dbName);
        if (!openSucceded) {
            LOG_WARNING << "Failed to open" << dbName;
            // close() releases the half-created handle and the finalize coordinator
            db->close();
            db.reset();
        } else {
            LOG_INFO << "Opened" << dbName << "database";
            m_DatabaseArray.push_back(db);
        }

        return db;
    }

    // Forces a transaction checkpoint of the environment; failures are only logged.
    void DatabaseManager::createCheckpoint() {
        LOG_DEBUG << "#";
        Q_ASSERT(m_Environment != nullptr);

        int result = m_Environment->txn_checkpoint(m_Environment, 0, 0, 0);
        if (result != 0) {
            LOG_WARNING << "Failed to checkpoint environment:" << db_strerror(result);
        } else {
            LOG_INFO << "Checkpoint created";
        }
    }

    void DatabaseManager::prepareToFinalize() {
        LOG_DEBUG << "#";
        m_FinalizeCoordinator.allBegun();
    }

    // Slot connected to AsyncCoordinator::statusReported in the constructor.
    void DatabaseManager::onReadyToFinalize(int status) {
        LOG_INFO << status;
        finalize();
    }

    void DatabaseManager::closeAll() {
        LOG_DEBUG << "#";
        Q_ASSERT(m_Environment != nullptr);

        for (auto &db: m_DatabaseArray) {
            db->close();
        }

        LOG_INFO << "Databases closed";
    }

    Database::Iterator::Iterator(__db *database):
        m_Database(database),
        m_Cursor(nullptr),
        m_CurrentKey(new DBT()),
        m_CurrentValue(new DBT()),
        m_IsValid(false),
        m_IsInitialized(false)
    {
        Q_ASSERT(database != nullptr);
    }

    Database::Iterator::~Iterator() {
        if (m_Cursor != nullptr) {
            m_Cursor->close(m_Cursor);
        }

        // deleting a null pointer is a no-op
        delete m_CurrentKey;
        delete m_CurrentValue;
    }

    /*
     * Advances to the next key/value pair. Returns false (and the iterator
     * becomes invalid) at the end of the database or on error.
     */
    bool Database::Iterator::moveNext() {
        Q_ASSERT(m_IsInitialized);
        if (!m_IsValid) { return false; }

        const bool success = doMoveNext();
        m_IsValid = success;
        return success;
    }

    // Opens the cursor; the iterator is valid only if the cursor was created.
    void Database::Iterator::initialize() {
        Q_ASSERT(!m_IsInitialized);

        u_int32_t flags = 0;
        // optimize for bulk operations to continue on the
        // same database page as the previous operation
        flags |= DB_CURSOR_BULK;

        int result = m_Database->cursor(m_Database, nullptr, &m_Cursor, flags);
        if (result != 0) {
            LOG_WARNING << "Failed to create a cursor:" << db_strerror(result);
            m_IsValid = false;
        } else {
            m_IsValid = true;
        }

        m_IsInitialized = true;
    }

    /*
     * Moves the cursor one step, growing the key/value buffers (up to
     * BUFFER_EXPAND_ATTEMPTS times) while Berkeley DB reports DB_BUFFER_SMALL.
     */
    bool Database::Iterator::doMoveNext() {
        int attempts = BUFFER_EXPAND_ATTEMPTS;
        int result = 0;
        int keyBufferSize = m_KeyStartBufferSize;
        int valueBufferSize = m_ValueStartBufferSize;

        while (attempts--) {
            result = moveCursor(keyBufferSize, valueBufferSize);

            if (result != DB_BUFFER_SMALL) {
                break;
            }

            keyBufferSize = 2*keyBufferSize + 1;
            valueBufferSize = 2*valueBufferSize + 1;
        }

        if (result == DB_NOTFOUND) {
            LOG_INFO << "Reached the end of the database";
        } else if (result != 0) {
            LOG_WARNING << "Error moving cursor through database:" << db_strerror(result);
        }

        // do not save this for debug in order to test code above
#ifdef QT_NO_DEBUG
        if (result == 0) {
            // if we have concurrency issue here, then anyway eventually
            // max buffer size will be properly adjusted
            m_KeyStartBufferSize = keyBufferSize;
            m_ValueStartBufferSize = valueBufferSize;
        }
#endif

        bool success = result == 0;
        return success;
    }

    /*
     * Fetches the next key/value pair into m_KeyBuffer / m_ValueBuffer.
     * Returns the raw DBC->get() code.
     */
    int Database::Iterator::moveCursor(int keyBufferSize, int valueBufferSize) {
        memset(m_CurrentKey, 0, sizeof(DBT));
        memset(m_CurrentValue, 0, sizeof(DBT));

        // with DB_DBT_USERMEM the capacity of the caller buffer goes into ulen;
        // Berkeley DB fills in size with the actual length of the returned item
        m_KeyBuffer.fill(0, keyBufferSize);
        m_CurrentKey->data = (void*)m_KeyBuffer.data();
        m_CurrentKey->ulen = static_cast<u_int32_t>(m_KeyBuffer.size());
        m_CurrentKey->flags = DB_DBT_USERMEM;

        // ----

        m_ValueBuffer.fill(0, valueBufferSize);
        m_CurrentValue->data = (void*)m_ValueBuffer.data();
        m_CurrentValue->ulen = static_cast<u_int32_t>(m_ValueBuffer.size());
        m_CurrentValue->flags = DB_DBT_USERMEM;

        u_int32_t flags = 0;
        // the cursor is moved to the next key/data pair of the database, and that pair is returned.
        // In the presence of duplicate key values, the value of the key may not change.
        flags |= DB_NEXT;

        int result = m_Cursor->get(m_Cursor, m_CurrentKey, m_CurrentValue, flags);
        return result;
    }
}