feat(Core/Database): port TrinityCore database API (#5611)

This commit is contained in:
Kargatum
2021-06-22 11:21:07 +07:00
committed by GitHub
parent 2a2e54d8c5
commit 9ac6fddcae
155 changed files with 5818 additions and 4321 deletions

View File

@@ -1,33 +1,65 @@
/*
* Copyright (C) 2016+ AzerothCore <www.azerothcore.org>
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
* Copyright (C) 2005-2009 MaNGOS <http://getmangos.com/>
* Copyright (C) 2016+ AzerothCore <www.azerothcore.org>, released under GNU GPL v2 license, you may redistribute it and/or modify it under version 2 of the License, or (at your option), any later version.
* Copyright (C) 2021+ WarheadCore <https://github.com/WarheadCore>
*/
#include "DatabaseWorkerPool.h"
#include "DatabaseEnv.h"
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
#include "Implementation/CharacterDatabase.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Log.h"
#include "MySQLPreparedStatement.h"
#include "MySQLWorkaround.h"
#include "PreparedStatement.h"
#include "ProducerConsumerQueue.h"
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "SQLOperation.h"
#include "Transaction.h"
#include <mysqld_error.h>
#ifdef ACORE_DEBUG
#include <boost/stacktrace.hpp>
#include <sstream>
#endif
#define MIN_MYSQL_SERVER_VERSION 50700u
#define MIN_MYSQL_CLIENT_VERSION 50700u
template <class T> DatabaseWorkerPool<T>::DatabaseWorkerPool() :
_mqueue(new ACE_Message_Queue<ACE_SYNCH>(2 * 1024 * 1024, 2 * 1024 * 1024)),
_queue(new ACE_Activation_Queue(_mqueue)),
_async_threads(0),
_synch_threads(0)
class PingOperation : public SQLOperation
{
memset(_connectionCount, 0, sizeof(_connectionCount));
_connections.resize(IDX_SIZE);
//! Operation for idle delaythreads
bool Execute() override
{
m_conn->Ping();
return true;
}
};
template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
: _queue(new ProducerConsumerQueue<SQLOperation*>()),
_async_threads(0), _synch_threads(0)
{
WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "AzerothCore does not support MySQL versions below 5.7");
WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s id %lu) does not match the version id used to compile AzerothCore (id %u)",
mysql_get_client_info(), mysql_get_client_version(), MYSQL_VERSION_ID);
}
template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
_queue->Cancel();
}
template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
uint8 const asyncThreads, uint8 const synchThreads)
uint8 const asyncThreads, uint8 const synchThreads)
{
_connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
@@ -40,22 +72,22 @@ uint32 DatabaseWorkerPool<T>::Open()
{
WPFatal(_connectionInfo.get(), "Connection info was not set!");
LOG_INFO("sql.driver", "Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
LOG_INFO("sql.driver", "Opening DatabasePool '%s'. "
"Asynchronous connections: %u, synchronous connections: %u.",
GetDatabaseName(), _async_threads, _synch_threads);
uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
if (error)
{
return error;
}
error = OpenConnections(IDX_SYNCH, _synch_threads);
if (!error)
{
LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.",
GetDatabaseName(), (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. " SZFMTD
" total connections running.", GetDatabaseName(),
(_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
}
LOG_INFO("sql.driver", " ");
@@ -68,109 +100,59 @@ void DatabaseWorkerPool<T>::Close()
{
LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName());
//! Shuts down delaythreads for this connection pool by underlying deactivate().
//! The next dequeue attempt in the worker thread tasks will result in an error,
//! ultimately ending the worker thread task.
_queue->queue()->close();
//! Closes the actualy MySQL connection.
_connections[IDX_ASYNC].clear();
for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i)
{
T* t = _connections[IDX_ASYNC][i];
DatabaseWorker* worker = t->m_worker;
worker->wait(); //! Block until no more threads are running this task.
delete worker;
t->Close(); //! Closes the actualy MySQL connection.
}
LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.",
LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. "
"Proceeding with synchronous connections.",
GetDatabaseName());
//! Shut down the synchronous connections
//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
//! should only be called after any other thread tasks in the core have exited,
//! meaning there can be no concurrent access at this point.
for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
_connections[IDX_SYNCH][i]->Close();
//! Deletes the ACE_Activation_Queue object and its underlying ACE_Message_Queue
delete _queue;
delete _mqueue;
_connections[IDX_SYNCH].clear();
LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName());
}
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
_connections[type].resize(numConnections);
for (uint8 i = 0; i < numConnections; ++i)
{
T* t;
if (type == IDX_ASYNC)
{
t = new T(_queue, *_connectionInfo);
}
else if (type == IDX_SYNCH)
{
t = new T(*_connectionInfo);
}
else
{
ASSERT(false, "> Incorrect InternalIndex (%u)", static_cast<uint32>(type));
}
_connections[type][i] = t;
++_connectionCount[type];
uint32 error = t->Open();
if (!error)
{
if (mysql_get_server_version(t->GetHandle()) < MIN_MYSQL_SERVER_VERSION)
{
LOG_ERROR("sql.driver", "Not support MySQL versions below 5.7");
error = 1;
}
}
// Failed to open a connection or invalid version, abort and cleanup
if (error)
{
while (_connectionCount[type] != 0)
{
T* t = _connections[type][i--];
delete t;
--_connectionCount[type];
}
return error;
}
}
// Everything is fine
return 0;
}
template <class T>
bool DatabaseWorkerPool<T>::PrepareStatements()
{
for (uint8 i = 0; i < IDX_SIZE; ++i)
for (auto& connections : _connections)
{
for (uint32 c = 0; c < _connectionCount[i]; ++c)
for (auto& connection : connections)
{
T* t = _connections[i][c];
t->LockIfReady();
if (!t->PrepareStatements())
connection->LockIfReady();
if (!connection->PrepareStatements())
{
t->Unlock();
connection->Unlock();
Close();
return false;
}
else
connection->Unlock();
size_t const preparedSize = connection->m_stmts.size();
if (_preparedStatementSize.size() < preparedSize)
_preparedStatementSize.resize(preparedSize);
for (size_t i = 0; i < preparedSize; ++i)
{
t->Unlock();
// already set by another connection
// (each connection only has prepared statements of it's own type sync/async)
if (_preparedStatementSize[i] > 0)
continue;
if (MySQLPreparedStatement* stmt = connection->m_stmts[i].get())
{
uint32 const paramCount = stmt->GetParameterCount();
// TC only supports uint8 indices.
ASSERT(paramCount < std::numeric_limits<uint8>::max());
_preparedStatementSize[i] = static_cast<uint8>(paramCount);
}
}
}
}
@@ -179,74 +161,28 @@ bool DatabaseWorkerPool<T>::PrepareStatements()
}
template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
{
return _connectionInfo->database.c_str();
}
if (!connection)
connection = GetFreeConnection();
template <class T>
void DatabaseWorkerPool<T>::Execute(const char* sql)
{
if (!sql)
return;
BasicStatementTask* task = new BasicStatementTask(sql);
Enqueue(task);
}
template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement* stmt)
{
PreparedStatementTask* task = new PreparedStatementTask(stmt);
Enqueue(task);
}
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(const char* sql)
{
if (!sql)
return;
T* t = GetFreeConnection();
t->Execute(sql);
t->Unlock();
}
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement* stmt)
{
T* t = GetFreeConnection();
t->Execute(stmt);
t->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
}
template <class T>
QueryResult DatabaseWorkerPool<T>::Query(const char* sql, T* conn /* = nullptr*/)
{
if (!conn)
conn = GetFreeConnection();
ResultSet* result = conn->Query(sql);
conn->Unlock();
if (!result || !result->GetRowCount())
ResultSet* result = connection->Query(sql);
connection->Unlock();
if (!result || !result->GetRowCount() || !result->NextRow())
{
delete result;
return QueryResult(nullptr);
}
result->NextRow();
return QueryResult(result);
}
template <class T>
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement* stmt)
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt)
{
T* t = GetFreeConnection();
PreparedResultSet* ret = t->Query(stmt);
t->Unlock();
auto connection = GetFreeConnection();
PreparedResultSet* ret = connection->Query(stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
@@ -261,40 +197,66 @@ PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement* stmt)
}
template <class T>
QueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(const char* sql)
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql)
{
QueryResultFuture res;
BasicStatementTask* task = new BasicStatementTask(sql, res);
BasicStatementTask* task = new BasicStatementTask(sql, true);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
QueryResultFuture result = task->GetFuture();
Enqueue(task);
return res; //! Actual return value has no use yet
return QueryCallback(std::move(result));
}
template <class T>
PreparedQueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement* stmt)
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt)
{
PreparedQueryResultFuture res;
PreparedStatementTask* task = new PreparedStatementTask(stmt, res);
PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
PreparedQueryResultFuture result = task->GetFuture();
Enqueue(task);
return res;
return QueryCallback(std::move(result));
}
template <class T>
QueryResultHolderFuture DatabaseWorkerPool<T>::DelayQueryHolder(SQLQueryHolder* holder)
SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder)
{
QueryResultHolderFuture res;
SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res);
SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
QueryResultHolderFuture result = task->GetFuture();
Enqueue(task);
return res; //! Fool compiler, has no use yet
return { std::move(holder), std::move(result) };
}
template <class T>
SQLTransaction DatabaseWorkerPool<T>::BeginTransaction()
SQLTransaction<T> DatabaseWorkerPool<T>::BeginTransaction()
{
return SQLTransaction(new Transaction);
return std::make_shared<Transaction<T>>();
}
template <class T>
void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction transaction)
void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
{
#ifdef ACORE_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
case 0:
LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
return;
case 1:
LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // ACORE_DEBUG
Enqueue(new TransactionTask(transaction));
}
template <class T>
TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction)
{
#ifdef ACORE_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
@@ -303,38 +265,42 @@ void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction transaction)
switch (transaction->GetSize())
{
case 0:
LOG_INFO("sql.driver", "Transaction contains 0 queries. Not executing.");
return;
LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
break;
case 1:
LOG_INFO("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // ACORE_DEBUG
Enqueue(new TransactionTask(transaction));
TransactionWithResultTask* task = new TransactionWithResultTask(transaction);
TransactionFuture result = task->GetFuture();
Enqueue(task);
return TransactionCallback(std::move(result));
}
template <class T>
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction& transaction)
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction)
{
T* con = GetFreeConnection();
int errorCode = con->ExecuteTransaction(transaction);
T* connection = GetFreeConnection();
int errorCode = connection->ExecuteTransaction(transaction);
if (!errorCode)
{
con->Unlock(); // OK, operation succesful
connection->Unlock(); // OK, operation succesful
return;
}
//! Handle MySQL Errno 1213 without extending deadlock to the core itself
//! TODO: More elegant way
/// @todo More elegant way
if (errorCode == ER_LOCK_DEADLOCK)
{
//todo: handle multiple sync threads deadlocking in a similar way as async threads
uint8 loopBreaker = 5;
for (uint8 i = 0; i < loopBreaker; ++i)
{
if (!con->ExecuteTransaction(transaction))
if (!connection->ExecuteTransaction(transaction))
break;
}
}
@@ -342,20 +308,177 @@ void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction& transaction)
//! Clean up now.
transaction->Cleanup();
con->Unlock();
connection->Unlock();
}
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt)
PreparedStatement<T>* DatabaseWorkerPool<T>::GetPreparedStatement(PreparedStatementIndex index)
{
if (!trans)
Execute(stmt);
else
trans->Append(stmt);
return new PreparedStatement<T>(index, _preparedStatementSize[index]);
}
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction& trans, const char* sql)
void DatabaseWorkerPool<T>::EscapeString(std::string& str)
{
if (str.empty())
return;
char* buf = new char[str.size() * 2 + 1];
EscapeString(buf, str.c_str(), uint32(str.size()));
str = buf;
delete[] buf;
}
template <class T>
void DatabaseWorkerPool<T>::KeepAlive()
{
//! Ping synchronous connections
for (auto& connection : _connections[IDX_SYNCH])
{
if (connection->LockIfReady())
{
connection->Ping();
connection->Unlock();
}
}
//! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
//! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
//! as the sole purpose is to prevent connections from idling.
auto const count = _connections[IDX_ASYNC].size();
for (uint8 i = 0; i < count; ++i)
Enqueue(new PingOperation);
}
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
for (uint8 i = 0; i < numConnections; ++i)
{
// Create the connection
auto connection = [&] {
switch (type)
{
case IDX_ASYNC:
return std::make_unique<T>(_queue.get(), *_connectionInfo);
case IDX_SYNCH:
return std::make_unique<T>(*_connectionInfo);
default:
ABORT();
}
}();
if (uint32 error = connection->Open())
{
// Failed to open a connection or invalid version, abort and cleanup
_connections[type].clear();
return error;
}
else if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
{
LOG_ERROR("sql.driver", "AzerothCore does not support MySQL versions below 5.7");
return 1;
}
else
{
_connections[type].push_back(std::move(connection));
}
}
// Everything is fine
return 0;
}
template <class T>
unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
{
if (!to || !from || !length)
return 0;
return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
}
template <class T>
void DatabaseWorkerPool<T>::Enqueue(SQLOperation* op)
{
_queue->Push(op);
}
template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef ACORE_DEBUG
if (_warnSyncQueries)
{
std::ostringstream ss;
ss << boost::stacktrace::stacktrace();
LOG_WARN("sql.performances", "Sync query at:\n%s", ss.str().c_str());
}
#endif
uint8 i = 0;
auto const num_cons = _connections[IDX_SYNCH].size();
T* connection = nullptr;
//! Block forever until a connection is free
for (;;)
{
connection = _connections[IDX_SYNCH][++i % num_cons].get();
//! Must be matched with t->Unlock() or you will get deadlocks
if (connection->LockIfReady())
break;
}
return connection;
}
template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{
return _connectionInfo->database.c_str();
}
template <class T>
void DatabaseWorkerPool<T>::Execute(char const* sql)
{
if (Acore::IsFormatEmptyOrNull(sql))
return;
BasicStatementTask* task = new BasicStatementTask(sql);
Enqueue(task);
}
template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt)
{
PreparedStatementTask* task = new PreparedStatementTask(stmt);
Enqueue(task);
}
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(char const* sql)
{
if (Acore::IsFormatEmptyOrNull(sql))
return;
T* connection = GetFreeConnection();
connection->Execute(sql);
connection->Unlock();
}
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt)
{
T* connection = GetFreeConnection();
connection->Execute(stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
}
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql)
{
if (!trans)
Execute(sql);
@@ -364,51 +487,14 @@ void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction& trans, const char* s
}
template <class T>
PreparedStatement* DatabaseWorkerPool<T>::GetPreparedStatement(uint32 index)
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt)
{
return new PreparedStatement(index);
if (!trans)
Execute(stmt);
else
trans->Append(stmt);
}
template <class T>
void DatabaseWorkerPool<T>::KeepAlive()
{
//! Ping synchronous connections
for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
{
T* t = _connections[IDX_SYNCH][i];
if (t->LockIfReady())
{
t->Ping();
t->Unlock();
}
}
//! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
//! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
//! as the sole purpose is to prevent connections from idling.
for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i)
Enqueue(new PingOperation);
}
template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
uint8 i = 0;
size_t num_cons = _connectionCount[IDX_SYNCH];
T* t = nullptr;
//! Block forever until a connection is free
for (;;)
{
t = _connections[IDX_SYNCH][++i % num_cons];
//! Must be matched with t->Unlock() or you will get deadlocks
if (t->LockIfReady())
break;
}
return t;
}
template class DatabaseWorkerPool<LoginDatabaseConnection>;
template class DatabaseWorkerPool<WorldDatabaseConnection>;
template class DatabaseWorkerPool<CharacterDatabaseConnection>;
template class AC_DATABASE_API DatabaseWorkerPool<LoginDatabaseConnection>;
template class AC_DATABASE_API DatabaseWorkerPool<WorldDatabaseConnection>;
template class AC_DATABASE_API DatabaseWorkerPool<CharacterDatabaseConnection>;