diff --git a/src/shared/Database/DatabaseMysql.cpp b/src/shared/Database/DatabaseMysql.cpp index 2b555d8..ca5a1f8 100644 --- a/src/shared/Database/DatabaseMysql.cpp +++ b/src/shared/Database/DatabaseMysql.cpp @@ -279,6 +279,7 @@ bool DatabaseMysql::Execute(const char *sql) // don't use queued execution if it has not been initialized if (!m_threadBody) return DirectExecute(sql); + nMutex.acquire(); tranThread = ACE_Based::Thread::current(); // owner of this transaction TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) @@ -291,6 +292,7 @@ bool DatabaseMysql::Execute(const char *sql) m_threadBody->Delay(new SqlStatement(sql)); } + nMutex.release(); return true; } @@ -356,6 +358,7 @@ bool DatabaseMysql::BeginTransaction() return true; // transaction started } + nMutex.acquire(); tranThread = ACE_Based::Thread::current(); // owner of this transaction TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) @@ -365,6 +368,7 @@ bool DatabaseMysql::BeginTransaction() m_tranQueues[tranThread] = new SqlTransaction(); + nMutex.release(); return true; } @@ -373,27 +377,31 @@ bool DatabaseMysql::CommitTransaction() if (!mMysql) return false; + bool _res = false; + // don't use queued execution if it has not been initialized if (!m_threadBody) { if (tranThread != ACE_Based::Thread::current()) return false; - bool _res = _TransactionCmd("COMMIT"); + _res = _TransactionCmd("COMMIT"); tranThread = NULL; mMutex.release(); return _res; } + nMutex.acquire(); tranThread = ACE_Based::Thread::current(); TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) { m_threadBody->Delay(i->second); - i->second = NULL; - return true; + m_tranQueues.erase(i); + _res = true; } - else - return false; + + nMutex.release(); + return _res; } bool DatabaseMysql::RollbackTransaction() @@ -412,13 +420,16 @@ bool DatabaseMysql::RollbackTransaction() return _res; } + nMutex.acquire(); tranThread = ACE_Based::Thread::current(); TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) { delete i->second; i->second = NULL; + m_tranQueues.erase(i); } + nMutex.release(); return true; } diff --git a/src/shared/Database/DatabaseMysql.h b/src/shared/Database/DatabaseMysql.h index 43baddf..6d27f5a 100644 --- a/src/shared/Database/DatabaseMysql.h +++ b/src/shared/Database/DatabaseMysql.h @@ -65,7 +65,8 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database // must be call before finish thread run void ThreadEnd(); private: - ACE_Thread_Mutex mMutex; + ACE_Thread_Mutex mMutex; // For thread safe operations between core and mySQL server + ACE_Thread_Mutex nMutex; // For thread safe operations on m_transQueues ACE_Based::Thread * tranThread; diff --git a/src/shared/Database/SqlDelayThread.cpp b/src/shared/Database/SqlDelayThread.cpp index e131100..115f296 100644 --- a/src/shared/Database/SqlDelayThread.cpp +++ b/src/shared/Database/SqlDelayThread.cpp @@ -30,21 +30,22 @@ void SqlDelayThread::run() mysql_thread_init(); #endif - const uint32 loopSleepms = 10; + //lets wait for next async task no more than 2 secs... + ACE_Time_Value _time(2); + const uint32 loopSleepms = 10; const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall()/loopSleepms; - uint32 loopCounter = 0; while (m_running) { // if the running state gets turned off while sleeping // empty the queue before exiting - ACE_Based::Thread::Sleep(loopSleepms); - SqlOperation* s = NULL; - while (m_sqlQueue.next(s)) + ACE_Based::Thread::Sleep(loopSleepms); + SqlAsyncTask * s = dynamic_cast (m_sqlQueue.dequeue(/*&_time*/)); + if(s) { - s->Execute(m_dbEngine); + s->call(); delete s; } if((loopCounter++) >= pingEveryLoop) @@ -62,4 +63,11 @@ void SqlDelayThread::run() void SqlDelayThread::Stop() { m_running = false; + m_sqlQueue.queue()->deactivate(); +} + +bool SqlDelayThread::Delay(SqlOperation* sql) +{ + int res = m_sqlQueue.enqueue(new SqlAsyncTask(m_dbEngine, sql)); + return (res != -1); } diff --git a/src/shared/Database/SqlDelayThread.h b/src/shared/Database/SqlDelayThread.h index 0b3e633..0d21034 100644 --- a/src/shared/Database/SqlDelayThread.h +++ b/src/shared/Database/SqlDelayThread.h @@ -20,7 +20,7 @@ #define __SQLDELAYTHREAD_H #include "ace/Thread_Mutex.h" -#include "LockedQueue.h" +#include "ace/Activation_Queue.h" #include "Threading.h" @@ -29,7 +29,7 @@ class SqlOperation; class SqlDelayThread : public ACE_Based::Runnable { - typedef ACE_Based::LockedQueue SqlQueue; + typedef ACE_Activation_Queue SqlQueue; private: SqlQueue m_sqlQueue; ///< Queue of SQL statements @@ -41,7 +41,7 @@ class SqlDelayThread : public ACE_Based::Runnable SqlDelayThread(Database* db); ///< Put sql statement to delay queue - bool Delay(SqlOperation* sql) { m_sqlQueue.add(sql); return true; } + bool Delay(SqlOperation* sql); virtual void Stop(); ///< Stop event virtual void run(); ///< Main Thread loop diff --git a/src/shared/Database/SqlOperations.cpp b/src/shared/Database/SqlOperations.cpp index 25933a3..04a25aa 100644 --- a/src/shared/Database/SqlOperations.cpp +++ b/src/shared/Database/SqlOperations.cpp @@ -31,29 +31,36 @@ void SqlStatement::Execute(Database *db) void SqlTransaction::Execute(Database *db) { + m_Mutex.acquire(); if(m_queue.empty()) + { + m_Mutex.release(); return; + } db->DirectExecute("START TRANSACTION"); while(!m_queue.empty()) { char *sql = const_cast(m_queue.front()); - m_queue.pop(); if(!db->DirectExecute(sql)) { delete [] sql; + m_queue.pop(); db->DirectExecute("ROLLBACK"); while(!m_queue.empty()) { delete [] (const_cast(m_queue.front())); m_queue.pop(); } + m_Mutex.release(); return; } delete [] sql; + m_queue.pop(); } db->DirectExecute("COMMIT"); + m_Mutex.release(); } /// ---- ASYNC QUERIES ---- diff --git a/src/shared/Database/SqlOperations.h b/src/shared/Database/SqlOperations.h index fa9437c..4a9e831 100644 --- a/src/shared/Database/SqlOperations.h +++ b/src/shared/Database/SqlOperations.h @@ -22,6 +22,7 @@ #include "Common.h" #include "ace/Thread_Mutex.h" +#include "ace/Method_Request.h" #include "LockedQueue.h" #include #include "Utilities/Callback.h" @@ -55,9 +56,17 @@ class SqlTransaction : public SqlOperation { private: std::queue m_queue; + ACE_Thread_Mutex m_Mutex; public: SqlTransaction() {} - void DelayExecute(const char *sql) { m_queue.push(mangos_strdup(sql)); } + void DelayExecute(const char *sql) + { + m_Mutex.acquire(); + char* _sql = mangos_strdup(sql); + if (_sql) + m_queue.push(_sql); + m_Mutex.release(); + } void Execute(Database *db); }; @@ -117,4 +126,30 @@ class SqlQueryHolderEx : public SqlOperation : m_holder(holder), m_callback(callback), m_queue(queue) {} void Execute(Database *db); }; + +class SqlAsyncTask : public ACE_Method_Request +{ + public: + SqlAsyncTask(Database * db, SqlOperation * op) : m_db(db), m_op(op){} + ~SqlAsyncTask() + { + if (!m_op) + return; + + delete m_op; + m_op = NULL; + } + int call() + { + if(this == NULL || !m_db || !m_op) + return -1; + + m_op->Execute(m_db); + return 0; + } + private: + Database * m_db; + SqlOperation * m_op; +}; + #endif //__SQLOPERATIONS_H diff --git a/src/shared/LockedQueue.h b/src/shared/LockedQueue.h index b3adb13..e31d389 100644 --- a/src/shared/LockedQueue.h +++ b/src/shared/LockedQueue.h @@ -68,7 +68,8 @@ namespace ACE_Based //! Gets the next result in the queue, if any. bool next(T& result) { - ACE_Guard g(this->_lock); + //ACE_Guard g(this->_lock); + ACE_GUARD_RETURN (LockType, g, this->_lock, false); if (_queue.empty()) return false; @@ -121,6 +122,20 @@ namespace ACE_Based { this->_lock.release(); } + + ///! Calls pop_front of the queue + void pop_front() + { + ACE_GUARD (LockType, g, this->_lock); + _queue.pop_front(); + } + + ///! Checks if we're empty or not with locks held + bool empty() + { + ACE_GUARD_RETURN (LockType, g, this->_lock, false); + return _queue.empty(); + } }; } #endif