From aac3648b5c92c9d47431f45b36184afd6666a989 Mon Sep 17 00:00:00 2001 From: Kargatum Date: Tue, 12 Nov 2019 08:13:40 +0700 Subject: [PATCH] feat(Core/DBLayer): move DatabaseWorkerPool into it's own translation unit (#2417) --- src/common/Database/DatabaseWorkerPool.cpp | 329 ++++++++++++++++ src/common/Database/DatabaseWorkerPool.h | 416 ++++----------------- 2 files changed, 399 insertions(+), 346 deletions(-) create mode 100644 src/common/Database/DatabaseWorkerPool.cpp diff --git a/src/common/Database/DatabaseWorkerPool.cpp b/src/common/Database/DatabaseWorkerPool.cpp new file mode 100644 index 000000000..cd0421422 --- /dev/null +++ b/src/common/Database/DatabaseWorkerPool.cpp @@ -0,0 +1,329 @@ +/* + * Copyright (C) 2016+ AzerothCore + * Copyright (C) 2008-2016 TrinityCore + * Copyright (C) 2005-2009 MaNGOS + */ + +#include "DatabaseWorkerPool.h" +#include "DatabaseEnv.h" + +#define MIN_MYSQL_SERVER_VERSION 50600u +#define MIN_MYSQL_CLIENT_VERSION 50600u + +template +DatabaseWorkerPool::DatabaseWorkerPool() : +_mqueue(new ACE_Message_Queue(2*1024*1024, 2*1024*1024)), +_queue(new ACE_Activation_Queue(_mqueue)) +{ + memset(_connectionCount, 0, sizeof(_connectionCount)); + _connections.resize(IDX_SIZE); + + WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); + WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "AzerothCore does not support MySQL versions below 5.6"); +} + +template +bool DatabaseWorkerPool::Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads) +{ + bool res = true; + _connectionInfo = MySQLConnectionInfo(infoString); + + sLog->outSQLDriver("Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.", + GetDatabaseName(), async_threads, synch_threads); + + //! Open asynchronous connections (delayed operations) + _connections[IDX_ASYNC].resize(async_threads); + for (uint8 i = 0; i < async_threads; ++i) + { + T* t = new T(_queue, _connectionInfo); + res &= t->Open(); + if (res) // only check mysql version if connection is valid + WPFatal(mysql_get_server_version(t->GetHandle()) >= MIN_MYSQL_SERVER_VERSION, "AzerothCore does not support MySQL versions below 5.6"); + + _connections[IDX_ASYNC][i] = t; + ++_connectionCount[IDX_ASYNC]; + } + + //! Open synchronous connections (direct, blocking operations) + _connections[IDX_SYNCH].resize(synch_threads); + for (uint8 i = 0; i < synch_threads; ++i) + { + T* t = new T(_connectionInfo); + res &= t->Open(); + _connections[IDX_SYNCH][i] = t; + ++_connectionCount[IDX_SYNCH]; + } + + if (res) + sLog->outSQLDriver("DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(), + (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC])); + else + sLog->outError("DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile " + "for specific errors.", GetDatabaseName()); + + return res; +} + +template +void DatabaseWorkerPool::Close() +{ + sLog->outSQLDriver("Closing down DatabasePool '%s'.", GetDatabaseName()); + + //! Shuts down delaythreads for this connection pool by underlying deactivate(). + //! The next dequeue attempt in the worker thread tasks will result in an error, + //! ultimately ending the worker thread task. + _queue->queue()->close(); + + for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) + { + T* t = _connections[IDX_ASYNC][i]; + DatabaseWorker* worker = t->m_worker; + worker->wait(); //! Block until no more threads are running this task. + delete worker; + t->Close(); //! Closes the actualy MySQL connection. + } + + sLog->outSQLDriver("Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.", + GetDatabaseName()); + + //! Shut down the synchronous connections + //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close + //! should only be called after any other thread tasks in the core have exited, + //! meaning there can be no concurrent access at this point. + for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) + _connections[IDX_SYNCH][i]->Close(); + + //! Deletes the ACE_Activation_Queue object and its underlying ACE_Message_Queue + delete _queue; + delete _mqueue; + + sLog->outSQLDriver("All connections on DatabasePool '%s' closed.", GetDatabaseName()); +} + +template +void DatabaseWorkerPool::Execute(const char* sql) +{ + if (!sql) + return; + + BasicStatementTask* task = new BasicStatementTask(sql); + Enqueue(task); +} + +template +void DatabaseWorkerPool::Execute(PreparedStatement* stmt) +{ + PreparedStatementTask* task = new PreparedStatementTask(stmt); + Enqueue(task); +} + +template +void DatabaseWorkerPool::DirectExecute(const char* sql) +{ + if (!sql) + return; + + T* t = GetFreeConnection(); + t->Execute(sql); + t->Unlock(); +} + +template +void DatabaseWorkerPool::DirectExecute(PreparedStatement* stmt) +{ + T* t = GetFreeConnection(); + t->Execute(stmt); + t->Unlock(); + + //! Delete proxy-class. Not needed anymore + delete stmt; +} + +template +QueryResult DatabaseWorkerPool::Query(const char* sql, T* conn /* = nullptr*/) +{ + if (!conn) + conn = GetFreeConnection(); + + ResultSet* result = conn->Query(sql); + conn->Unlock(); + if (!result || !result->GetRowCount()) + { + delete result; + return QueryResult(NULL); + } + + result->NextRow(); + return QueryResult(result); +} + +template +PreparedQueryResult DatabaseWorkerPool::Query(PreparedStatement* stmt) +{ + T* t = GetFreeConnection(); + PreparedResultSet* ret = t->Query(stmt); + t->Unlock(); + + //! Delete proxy-class. Not needed anymore + delete stmt; + + if (!ret || !ret->GetRowCount()) + { + delete ret; + return PreparedQueryResult(NULL); + } + + return PreparedQueryResult(ret); +} + +template +QueryResultFuture DatabaseWorkerPool::AsyncQuery(const char* sql) +{ + QueryResultFuture res; + BasicStatementTask* task = new BasicStatementTask(sql, res); + Enqueue(task); + return res; //! Actual return value has no use yet +} + +template +PreparedQueryResultFuture DatabaseWorkerPool::AsyncQuery(PreparedStatement* stmt) +{ + PreparedQueryResultFuture res; + PreparedStatementTask* task = new PreparedStatementTask(stmt, res); + Enqueue(task); + return res; +} + +template +QueryResultHolderFuture DatabaseWorkerPool::DelayQueryHolder(SQLQueryHolder* holder) +{ + QueryResultHolderFuture res; + SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res); + Enqueue(task); + return res; //! Fool compiler, has no use yet +} + +template +SQLTransaction DatabaseWorkerPool::BeginTransaction() +{ + return SQLTransaction(new Transaction); +} + +template +void DatabaseWorkerPool::CommitTransaction(SQLTransaction transaction) +{ + #ifdef TRINITY_DEBUG + //! Only analyze transaction weaknesses in Debug mode. + //! Ideally we catch the faults in Debug mode and then correct them, + //! so there's no need to waste these CPU cycles in Release mode. + switch (transaction->GetSize()) + { + case 0: + sLog->outSQLDriver("Transaction contains 0 queries. Not executing."); + return; + case 1: + sLog->outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); + break; + default: + break; + } + #endif // TRINITY_DEBUG + + Enqueue(new TransactionTask(transaction)); +} + +template +void DatabaseWorkerPool::DirectCommitTransaction(SQLTransaction& transaction) +{ + T* con = GetFreeConnection(); + if (con->ExecuteTransaction(transaction)) + { + con->Unlock(); // OK, operation succesful + return; + } + + //! Handle MySQL Errno 1213 without extending deadlock to the core itself + //! TODO: More elegant way + if (con->GetLastError() == 1213) + { + uint8 loopBreaker = 5; + for (uint8 i = 0; i < loopBreaker; ++i) + { + if (con->ExecuteTransaction(transaction)) + break; + } + } + + //! Clean up now. + transaction->Cleanup(); + + con->Unlock(); +} + +template +void DatabaseWorkerPool::ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt) +{ + if (trans.null()) + Execute(stmt); + else + trans->Append(stmt); +} + +template +void DatabaseWorkerPool::ExecuteOrAppend(SQLTransaction& trans, const char* sql) +{ + if (trans.null()) + Execute(sql); + else + trans->Append(sql); +} + +template +PreparedStatement* DatabaseWorkerPool::GetPreparedStatement(uint32 index) +{ + return new PreparedStatement(index); +} + +template +void DatabaseWorkerPool::KeepAlive() +{ + //! Ping synchronous connections + for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) + { + T* t = _connections[IDX_SYNCH][i]; + if (t->LockIfReady()) + { + t->Ping(); + t->Unlock(); + } + } + + //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request + //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter + //! as the sole purpose is to prevent connections from idling. + for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i) + Enqueue(new PingOperation); +} + +template +T* DatabaseWorkerPool::GetFreeConnection() +{ + uint8 i = 0; + size_t num_cons = _connectionCount[IDX_SYNCH]; + T* t = nullptr; + + //! Block forever until a connection is free + for (;;) + { + t = _connections[IDX_SYNCH][++i % num_cons]; + //! Must be matched with t->Unlock() or you will get deadlocks + if (t->LockIfReady()) + break; + } + + return t; +} + +template class DatabaseWorkerPool; +template class DatabaseWorkerPool; +template class DatabaseWorkerPool; diff --git a/src/common/Database/DatabaseWorkerPool.h b/src/common/Database/DatabaseWorkerPool.h index f1cddd1e2..53f46ce5d 100644 --- a/src/common/Database/DatabaseWorkerPool.h +++ b/src/common/Database/DatabaseWorkerPool.h @@ -19,9 +19,7 @@ #include "QueryResult.h" #include "QueryHolder.h" #include "AdhocStatement.h" - -#define MIN_MYSQL_SERVER_VERSION 50100u -#define MIN_MYSQL_CLIENT_VERSION 50100u +#include "StringFormat.h" class PingOperation : public SQLOperation { @@ -38,94 +36,13 @@ class DatabaseWorkerPool { public: /* Activity state */ - DatabaseWorkerPool() : - _mqueue(new ACE_Message_Queue(2*1024*1024, 2*1024*1024)), - _queue(new ACE_Activation_Queue(_mqueue)) - { - memset(_connectionCount, 0, sizeof(_connectionCount)); - _connections.resize(IDX_SIZE); + DatabaseWorkerPool(); - WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); - WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "AzerothCore does not support MySQL versions below 5.1"); - } + ~DatabaseWorkerPool() { } - ~DatabaseWorkerPool() - { - } + bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads); - bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads) - { - bool res = true; - _connectionInfo = MySQLConnectionInfo(infoString); - - sLog->outSQLDriver("Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.", - GetDatabaseName(), async_threads, synch_threads); - - //! Open asynchronous connections (delayed operations) - _connections[IDX_ASYNC].resize(async_threads); - for (uint8 i = 0; i < async_threads; ++i) - { - T* t = new T(_queue, _connectionInfo); - res &= t->Open(); - if (res) // only check mysql version if connection is valid - WPFatal(mysql_get_server_version(t->GetHandle()) >= MIN_MYSQL_SERVER_VERSION, "AzerothCore does not support MySQL versions below 5.1"); - _connections[IDX_ASYNC][i] = t; - ++_connectionCount[IDX_ASYNC]; - } - - //! Open synchronous connections (direct, blocking operations) - _connections[IDX_SYNCH].resize(synch_threads); - for (uint8 i = 0; i < synch_threads; ++i) - { - T* t = new T(_connectionInfo); - res &= t->Open(); - _connections[IDX_SYNCH][i] = t; - ++_connectionCount[IDX_SYNCH]; - } - - if (res) - sLog->outSQLDriver("DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(), - (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC])); - else - sLog->outError("DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile " - "for specific errors.", GetDatabaseName()); - return res; - } - - void Close() - { - sLog->outSQLDriver("Closing down DatabasePool '%s'.", GetDatabaseName()); - - //! Shuts down delaythreads for this connection pool by underlying deactivate(). - //! The next dequeue attempt in the worker thread tasks will result in an error, - //! ultimately ending the worker thread task. - _queue->queue()->close(); - - for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) - { - T* t = _connections[IDX_ASYNC][i]; - DatabaseWorker* worker = t->m_worker; - worker->wait(); //! Block until no more threads are running this task. - delete worker; - t->Close(); //! Closes the actualy MySQL connection. - } - - sLog->outSQLDriver("Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.", - GetDatabaseName()); - - //! Shut down the synchronous connections - //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close - //! should only be called after any other thread tasks in the core have exited, - //! meaning there can be no concurrent access at this point. - for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) - _connections[IDX_SYNCH][i]->Close(); - - //! Deletes the ACE_Activation_Queue object and its underlying ACE_Message_Queue - delete _queue; - delete _mqueue; - - sLog->outSQLDriver("All connections on DatabasePool '%s' closed.", GetDatabaseName()); - } + void Close(); /** Delayed one-way statement methods. @@ -133,38 +50,22 @@ class DatabaseWorkerPool //! Enqueues a one-way SQL operation in string format that will be executed asynchronously. //! This method should only be used for queries that are only executed once, e.g during startup. - void Execute(const char* sql) - { - if (!sql) - return; - - BasicStatementTask* task = new BasicStatementTask(sql); - Enqueue(task); - } + void Execute(const char* sql); //! Enqueues a one-way SQL operation in string format -with variable args- that will be executed asynchronously. //! This method should only be used for queries that are only executed once, e.g during startup. - void PExecute(const char* sql, ...) + template + void PExecute(Format&& sql, Args&&... args) { - if (!sql) + if (ACORE::IsFormatEmptyOrNull(sql)) return; - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - Execute(szQuery); + Execute(ACORE::StringFormat(std::forward(sql), std::forward(args)...).c_str()); } //! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously. //! Statement must be prepared with CONNECTION_ASYNC flag. - void Execute(PreparedStatement* stmt) - { - PreparedStatementTask* task = new PreparedStatementTask(stmt); - Enqueue(task); - } + void Execute(PreparedStatement* stmt); /** Direct synchronous one-way statement methods. @@ -172,43 +73,22 @@ class DatabaseWorkerPool //! Directly executes a one-way SQL operation in string format, that will block the calling thread until finished. //! This method should only be used for queries that are only executed once, e.g during startup. - void DirectExecute(const char* sql) - { - if (!sql) - return; - - T* t = GetFreeConnection(); - t->Execute(sql); - t->Unlock(); - } + void DirectExecute(const char* sql); //! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished. //! This method should only be used for queries that are only executed once, e.g during startup. - void DirectPExecute(const char* sql, ...) + template + void DirectPExecute(Format&& sql, Args&&... args) { - if (!sql) + if (ACORE::IsFormatEmptyOrNull(sql)) return; - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - return DirectExecute(szQuery); + DirectExecute(ACORE::StringFormat(std::forward(sql), std::forward(args)...).c_str()); } //! Directly executes a one-way SQL operation in prepared statement format, that will block the calling thread until finished. //! Statement must be prepared with the CONNECTION_SYNCH flag. - void DirectExecute(PreparedStatement* stmt) - { - T* t = GetFreeConnection(); - t->Execute(stmt); - t->Unlock(); - - //! Delete proxy-class. Not needed anymore - delete stmt; - } + void DirectExecute(PreparedStatement* stmt); /** Synchronous query (with resultset) methods. @@ -216,75 +96,34 @@ class DatabaseWorkerPool //! Directly executes an SQL query in string format that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. - QueryResult Query(const char* sql, T* conn = NULL) + QueryResult Query(const char* sql, T* conn = nullptr); + + //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished. + //! Returns reference counted auto pointer, no need for manual memory management in upper level code. + template + QueryResult PQuery(Format&& sql, T* conn, Args&&... args) { - if (!conn) - conn = GetFreeConnection(); + if (ACORE::IsFormatEmptyOrNull(sql)) + return QueryResult(nullptr); - ResultSet* result = conn->Query(sql); - conn->Unlock(); - if (!result || !result->GetRowCount()) - { - delete result; - return QueryResult(NULL); - } - - result->NextRow(); - return QueryResult(result); + return Query(ACORE::StringFormat(std::forward(sql), std::forward(args)...).c_str(), conn); } //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. - QueryResult PQuery(const char* sql, T* conn, ...) + template + QueryResult PQuery(Format&& sql, Args&&... args) { - if (!sql) - return QueryResult(NULL); + if (ACORE::IsFormatEmptyOrNull(sql)) + return QueryResult(nullptr); - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, conn); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - return Query(szQuery, conn); - } - - //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished. - //! Returns reference counted auto pointer, no need for manual memory management in upper level code. - QueryResult PQuery(const char* sql, ...) - { - if (!sql) - return QueryResult(NULL); - - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - return Query(szQuery); + return Query(ACORE::StringFormat(std::forward(sql), std::forward(args)...).c_str()); } //! Directly executes an SQL query in prepared format that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. //! Statement must be prepared with CONNECTION_SYNCH flag. - PreparedQueryResult Query(PreparedStatement* stmt) - { - T* t = GetFreeConnection(); - PreparedResultSet* ret = t->Query(stmt); - t->Unlock(); - - //! Delete proxy-class. Not needed anymore - delete stmt; - - if (!ret || !ret->GetRowCount()) - { - delete ret; - return PreparedQueryResult(NULL); - } - - return PreparedQueryResult(ret); - } + PreparedQueryResult Query(PreparedStatement* stmt); /** Asynchronous query (with resultset) methods. @@ -292,132 +131,52 @@ class DatabaseWorkerPool //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. - QueryResultFuture AsyncQuery(const char* sql) - { - QueryResultFuture res; - BasicStatementTask* task = new BasicStatementTask(sql, res); - Enqueue(task); - return res; //! Actual return value has no use yet - } + QueryResultFuture AsyncQuery(const char* sql); //! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. - QueryResultFuture AsyncPQuery(const char* sql, ...) + template + QueryResultFuture AsyncPQuery(Format&& sql, Args&&... args) { - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); + if (ACORE::IsFormatEmptyOrNull(sql)) + return QueryResult(nullptr); - return AsyncQuery(szQuery); + return AsyncQuery(ACORE::StringFormat(std::forward(sql), std::forward(args)...).c_str()); } //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. //! Statement must be prepared with CONNECTION_ASYNC flag. - PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt) - { - PreparedQueryResultFuture res; - PreparedStatementTask* task = new PreparedStatementTask(stmt, res); - Enqueue(task); - return res; - } + PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt); //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture //! return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. //! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag. - QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) - { - QueryResultHolderFuture res; - SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res); - Enqueue(task); - return res; //! Fool compiler, has no use yet - } + QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder); /** Transaction context methods. */ //! Begins an automanaged transaction pointer that will automatically rollback if not commited. (Autocommit=0) - SQLTransaction BeginTransaction() - { - return SQLTransaction(new Transaction); - } + SQLTransaction BeginTransaction(); //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations //! were appended to the transaction will be respected during execution. - void CommitTransaction(SQLTransaction transaction) - { - #ifdef TRINITY_DEBUG - //! Only analyze transaction weaknesses in Debug mode. - //! Ideally we catch the faults in Debug mode and then correct them, - //! so there's no need to waste these CPU cycles in Release mode. - switch (transaction->GetSize()) - { - case 0: - sLog->outSQLDriver("Transaction contains 0 queries. Not executing."); - return; - case 1: - sLog->outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); - break; - default: - break; - } - #endif // TRINITY_DEBUG - - Enqueue(new TransactionTask(transaction)); - } + void CommitTransaction(SQLTransaction transaction); //! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations //! were appended to the transaction will be respected during execution. - void DirectCommitTransaction(SQLTransaction& transaction) - { - T* con = GetFreeConnection(); - if (con->ExecuteTransaction(transaction)) - { - con->Unlock(); // OK, operation succesful - return; - } - - //! Handle MySQL Errno 1213 without extending deadlock to the core itself - //! TODO: More elegant way - if (con->GetLastError() == 1213) - { - uint8 loopBreaker = 5; - for (uint8 i = 0; i < loopBreaker; ++i) - { - if (con->ExecuteTransaction(transaction)) - break; - } - } - - //! Clean up now. - transaction->Cleanup(); - - con->Unlock(); - } + void DirectCommitTransaction(SQLTransaction& transaction); //! Method used to execute prepared statements in a diverse context. //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone. - void ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt) - { - if (trans.null()) - Execute(stmt); - else - trans->Append(stmt); - } + void ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt); //! Method used to execute ad-hoc statements in a diverse context. //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone. - void ExecuteOrAppend(SQLTransaction& trans, const char* sql) - { - if (trans.null()) - Execute(sql); - else - trans->Append(sql); - } + void ExecuteOrAppend(SQLTransaction& trans, const char* sql); /** Other @@ -426,51 +185,10 @@ class DatabaseWorkerPool //! Automanaged (internally) pointer to a prepared statement object for usage in upper level code. //! Pointer is deleted in this->DirectExecute(PreparedStatement*), this->Query(PreparedStatement*) or PreparedStatementTask::~PreparedStatementTask. //! This object is not tied to the prepared statement on the MySQL context yet until execution. - PreparedStatement* GetPreparedStatement(uint32 index) - { - return new PreparedStatement(index); - } + PreparedStatement* GetPreparedStatement(uint32 index); //! Apply escape string'ing for current collation. (utf8) - void EscapeString(std::string& str) - { - if (str.empty()) - return; - - char* buf = new char[str.size()*2+1]; - EscapeString(buf, str.c_str(), str.size()); - str = buf; - delete[] buf; - } - - //! Keeps all our MySQL connections alive, prevent the server from disconnecting us. - void KeepAlive() - { - //! Ping synchronous connections - for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) - { - T* t = _connections[IDX_SYNCH][i]; - if (t->LockIfReady()) - { - t->Ping(); - t->Unlock(); - } - } - - //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request - //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter - //! as the sole purpose is to prevent connections from idling. - for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i) - Enqueue(new PingOperation); - } - - char const* GetDatabaseName() const - { - return _connectionInfo.database.c_str(); - } - - private: - unsigned long EscapeString(char *to, const char *from, unsigned long length) + unsigned long EscapeString(char* to, const char* from, unsigned long length) { if (!to || !from || !length) return 0; @@ -478,6 +196,27 @@ class DatabaseWorkerPool return mysql_real_escape_string(_connections[IDX_SYNCH][0]->GetHandle(), to, from, length); } + //! Keeps all our MySQL connections alive, prevent the server from disconnecting us. + void KeepAlive(); + + char const* GetDatabaseName() const + { + return _connectionInfo.database.c_str(); + } + + void EscapeString(std::string& str) + { + if (str.empty()) + return; + + char* buf = new char[str.size() * 2 + 1]; + EscapeString(buf, str.c_str(), str.size()); + str = buf; + delete[] buf; + } + + private: + void Enqueue(SQLOperation* op) { _queue->enqueue(op); @@ -485,22 +224,7 @@ class DatabaseWorkerPool //! Gets a free connection in the synchronous connection pool. //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks. - T* GetFreeConnection() - { - uint8 i = 0; - size_t num_cons = _connectionCount[IDX_SYNCH]; - T* t = NULL; - //! Block forever until a connection is free - for (;;) - { - t = _connections[IDX_SYNCH][++i % num_cons]; - //! Must be matched with t->Unlock() or you will get deadlocks - if (t->LockIfReady()) - break; - } - - return t; - } + T* GetFreeConnection(); private: enum _internalIndex @@ -512,7 +236,7 @@ class DatabaseWorkerPool ACE_Message_Queue* _mqueue; ACE_Activation_Queue* _queue; //! Queue shared by async worker threads. - std::vector< std::vector > _connections; + std::vector> _connections; uint32 _connectionCount[2]; //! Counter of MySQL connections; MySQLConnectionInfo _connectionInfo; };