feat(Core/Database): implement db loader (#4431)

This commit is contained in:
Kargatum
2021-04-12 15:09:13 +07:00
committed by GitHub
parent 81301c67d9
commit 53ce87d0f7
15 changed files with 426 additions and 226 deletions

View File

@@ -12,7 +12,9 @@
template <class T> DatabaseWorkerPool<T>::DatabaseWorkerPool() :
_mqueue(new ACE_Message_Queue<ACE_SYNCH>(2 * 1024 * 1024, 2 * 1024 * 1024)),
_queue(new ACE_Activation_Queue(_mqueue))
_queue(new ACE_Activation_Queue(_mqueue)),
_async_threads(0),
_synch_threads(0)
{
memset(_connectionCount, 0, sizeof(_connectionCount));
_connections.resize(IDX_SIZE);
@@ -22,45 +24,39 @@ template <class T> DatabaseWorkerPool<T>::DatabaseWorkerPool() :
}
template <class T>
bool DatabaseWorkerPool<T>::Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
uint8 const asyncThreads, uint8 const synchThreads)
{
bool res = true;
_connectionInfo = MySQLConnectionInfo(infoString);
_connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
_async_threads = asyncThreads;
_synch_threads = synchThreads;
}
template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{
WPFatal(_connectionInfo.get(), "Connection info was not set!");
sLog->outSQLDriver("Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
GetDatabaseName(), async_threads, synch_threads);
GetDatabaseName(), _async_threads, _synch_threads);
//! Open asynchronous connections (delayed operations)
_connections[IDX_ASYNC].resize(async_threads);
for (uint8 i = 0; i < async_threads; ++i)
uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
if (error)
{
T* t = new T(_queue, _connectionInfo);
res &= t->Open();
if (res) // only check mysql version if connection is valid
WPFatal(mysql_get_server_version(t->GetHandle()) >= MIN_MYSQL_SERVER_VERSION, "AzerothCore does not support MySQL versions below 5.7");
_connections[IDX_ASYNC][i] = t;
++_connectionCount[IDX_ASYNC];
return error;
}
//! Open synchronous connections (direct, blocking operations)
_connections[IDX_SYNCH].resize(synch_threads);
for (uint8 i = 0; i < synch_threads; ++i)
error = OpenConnections(IDX_SYNCH, _synch_threads);
if (!error)
{
T* t = new T(_connectionInfo);
res &= t->Open();
_connections[IDX_SYNCH][i] = t;
++_connectionCount[IDX_SYNCH];
sLog->outSQLDriver("DatabasePool '%s' opened successfully. %u total connections running.",
GetDatabaseName(), (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
}
if (res)
sLog->outSQLDriver("DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(),
(_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
else
sLog->outError("DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile "
"for specific errors.", GetDatabaseName());
return res;
return error;
}
template <class T>
@@ -99,6 +95,91 @@ void DatabaseWorkerPool<T>::Close()
sLog->outSQLDriver("All connections on DatabasePool '%s' closed.", GetDatabaseName());
}
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
_connections[type].resize(numConnections);
for (uint8 i = 0; i < numConnections; ++i)
{
T* t;
if (type == IDX_ASYNC)
{
t = new T(_queue, *_connectionInfo);
}
else if (type == IDX_SYNCH)
{
t = new T(*_connectionInfo);
}
else
{
ASSERT(false, "> Incorrect InternalIndex (%u)", static_cast<uint32>(type));
}
_connections[type][i] = t;
++_connectionCount[type];
uint32 error = t->Open();
if (!error)
{
if (mysql_get_server_version(t->GetHandle()) < MIN_MYSQL_SERVER_VERSION)
{
sLog->outSQLDriver("Not support MySQL versions below 5.7");
error = 1;
}
}
// Failed to open a connection or invalid version, abort and cleanup
if (error)
{
while (_connectionCount[type] != 0)
{
T* t = _connections[type][i--];
delete t;
--_connectionCount[type];
}
return error;
}
}
// Everything is fine
return 0;
}
template <class T>
bool DatabaseWorkerPool<T>::PrepareStatements()
{
for (uint8 i = 0; i < IDX_SIZE; ++i)
{
for (uint32 c = 0; c < _connectionCount[i]; ++c)
{
T* t = _connections[i][c];
t->LockIfReady();
if (!t->PrepareStatements())
{
t->Unlock();
Close();
return false;
}
else
{
t->Unlock();
}
}
}
return true;
}
template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{
return _connectionInfo->database.c_str();
}
template <class T>
void DatabaseWorkerPool<T>::Execute(const char* sql)
{