Big re-organization of repository [W.I.P]

This commit is contained in:
Yehonal
2016-08-11 20:25:27 +02:00
parent c62a72c0a8
commit 0f85ce1c54
3016 changed files with 1271 additions and 1 deletions

View File

@@ -0,0 +1,310 @@
/*
* Copyright (C)
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _CALLBACK_H
#define _CALLBACK_H
#include <ace/Future.h>
#include <ace/Future_Set.h>
#include "QueryResult.h"
typedef ACE_Future<QueryResult> QueryResultFuture;
typedef ACE_Future<PreparedQueryResult> PreparedQueryResultFuture;
/*! A simple template using ACE_Future to manage callbacks from the thread and object that
issued the request. <ParamType> is variable type of parameter that is used as parameter
for the callback function.
*/
#define CALLBACK_STAGE_INVALID uint8(-1)
template <typename Result, typename ParamType, bool chain = false>
class QueryCallback
{
public:
QueryCallback() : _param(), _stage(chain ? 0 : CALLBACK_STAGE_INVALID) {}
//! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery
void SetFutureResult(ACE_Future<Result> value)
{
_result = value;
}
ACE_Future<Result> GetFutureResult()
{
return _result;
}
int IsReady()
{
return _result.ready();
}
void GetResult(Result& res)
{
_result.get(res);
}
void FreeResult()
{
_result.cancel();
}
void SetParam(ParamType value)
{
_param = value;
}
ParamType GetParam()
{
return _param;
}
//! Resets the stage of the callback chain
void ResetStage()
{
if (!chain)
return;
_stage = 0;
}
//! Advances the callback chain to the next stage, so upper level code can act on its results accordingly
void NextStage()
{
if (!chain)
return;
++_stage;
}
//! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid)
uint8 GetStage()
{
return _stage;
}
//! Resets all underlying variables (param, result and stage)
void Reset()
{
SetParam(NULL);
FreeResult();
ResetStage();
}
private:
ACE_Future<Result> _result;
ParamType _param;
uint8 _stage;
};
template <typename Result, typename ParamType1, typename ParamType2, bool chain = false>
class QueryCallback_2
{
public:
QueryCallback_2() : _stage(chain ? 0 : CALLBACK_STAGE_INVALID) {}
//! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery
void SetFutureResult(ACE_Future<Result> value)
{
_result = value;
}
ACE_Future<Result> GetFutureResult()
{
return _result;
}
int IsReady()
{
return _result.ready();
}
void GetResult(Result& res)
{
_result.get(res);
}
void FreeResult()
{
_result.cancel();
}
void SetFirstParam(ParamType1 value)
{
_param_1 = value;
}
void SetSecondParam(ParamType2 value)
{
_param_2 = value;
}
ParamType1 GetFirstParam()
{
return _param_1;
}
ParamType2 GetSecondParam()
{
return _param_2;
}
//! Resets the stage of the callback chain
void ResetStage()
{
if (!chain)
return;
_stage = 0;
}
//! Advances the callback chain to the next stage, so upper level code can act on its results accordingly
void NextStage()
{
if (!chain)
return;
++_stage;
}
//! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid)
uint8 GetStage()
{
return _stage;
}
//! Resets all underlying variables (param, result and stage)
void Reset()
{
SetFirstParam(0);
SetSecondParam(NULL);
FreeResult();
ResetStage();
}
private:
ACE_Future<Result> _result;
ParamType1 _param_1;
ParamType2 _param_2;
uint8 _stage;
};
template <typename Result, typename ParamType1, typename ParamType2, typename ParamType3, bool chain = false>
class QueryCallback_3
{
public:
QueryCallback_3() : _stage(chain ? 0 : CALLBACK_STAGE_INVALID) {}
//! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery
void SetFutureResult(ACE_Future<Result> value)
{
_result = value;
}
ACE_Future<Result> GetFutureResult()
{
return _result;
}
int IsReady()
{
return _result.ready();
}
void GetResult(Result& res)
{
_result.get(res);
}
void FreeResult()
{
_result.cancel();
}
void SetFirstParam(ParamType1 value)
{
_param_1 = value;
}
void SetSecondParam(ParamType2 value)
{
_param_2 = value;
}
void SetThirdParam(ParamType3 value)
{
_param_3 = value;
}
ParamType1 GetFirstParam()
{
return _param_1;
}
ParamType2 GetSecondParam()
{
return _param_2;
}
ParamType3 GetThirdParam()
{
return _param_3;
}
//! Resets the stage of the callback chain
void ResetStage()
{
if (!chain)
return;
_stage = 0;
}
//! Advances the callback chain to the next stage, so upper level code can act on its results accordingly
void NextStage()
{
if (!chain)
return;
++_stage;
}
//! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid)
uint8 GetStage()
{
return _stage;
}
//! Resets all underlying variables (param, result and stage)
void Reset()
{
SetFirstParam(NULL);
SetSecondParam(NULL);
SetThirdParam(NULL);
FreeResult();
ResetStage();
}
private:
ACE_Future<Result> _result;
ParamType1 _param_1;
ParamType2 _param_2;
ParamType3 _param_3;
uint8 _stage;
};
#endif

View File

@@ -0,0 +1,122 @@
#include <ace/Singleton.h>
#include <ace/Thread_Mutex.h>
#include <ace/Log_Msg.h>
#include "Threading.h"
#include "DelayExecutor.h"
DelayExecutor* DelayExecutor::instance()
{
return ACE_Singleton<DelayExecutor, ACE_Thread_Mutex>::instance();
}
DelayExecutor::DelayExecutor()
: pre_svc_hook_(0), post_svc_hook_(0), activated_(false), mqueue_(1*1024*1024, 1*1024*1024), queue_(&mqueue_)
{
}
DelayExecutor::~DelayExecutor()
{
if (pre_svc_hook_)
delete pre_svc_hook_;
if (post_svc_hook_)
delete post_svc_hook_;
deactivate();
}
int DelayExecutor::deactivate()
{
if (!activated())
return -1;
activated(false);
queue_.queue()->deactivate();
wait();
return 0;
}
int DelayExecutor::svc()
{
if (pre_svc_hook_)
pre_svc_hook_->call();
for (;;)
{
ACE_Method_Request* rq = queue_.dequeue();
if (!rq)
break;
rq->call();
delete rq;
}
if (post_svc_hook_)
post_svc_hook_->call();
return 0;
}
int DelayExecutor::start(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook)
{
if (activated())
return -1;
if (num_threads < 1)
return -1;
if (pre_svc_hook_)
delete pre_svc_hook_;
if (post_svc_hook_)
delete post_svc_hook_;
pre_svc_hook_ = pre_svc_hook;
post_svc_hook_ = post_svc_hook;
queue_.queue()->activate();
// pussywizard:
ACE_Based::ThreadPriority tp;
int _priority = tp.getPriority(ACE_Based::Highest);
if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE, num_threads, 0, _priority) == -1)
return -1;
//if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1)
// return -1;
activated(true);
return true;
}
int DelayExecutor::execute(ACE_Method_Request* new_req)
{
if (new_req == NULL)
return -1;
// pussywizard: NULL as param for enqueue - wait until the action is possible!
// new tasks are added to the queue during map update (schedule_update in MapInstanced::Update)
// the queue can be momentarily blocked by map threads constantly waiting for tasks (for (;;) { queue_.dequeue();... } in DelayExecutor::svc())
// so just wait a moment, don't drop the task xDddd
if (queue_.enqueue(new_req, /*(ACE_Time_Value*)&ACE_Time_Value::zero*/ NULL) == -1)
{
delete new_req;
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1);
}
return 0;
}
bool DelayExecutor::activated()
{
return activated_;
}
void DelayExecutor::activated(bool s)
{
activated_ = s;
}

View File

@@ -0,0 +1,38 @@
#ifndef _M_DELAY_EXECUTOR_H
#define _M_DELAY_EXECUTOR_H
#include <ace/Task.h>
#include <ace/Activation_Queue.h>
#include <ace/Method_Request.h>
class DelayExecutor : protected ACE_Task_Base
{
public:
DelayExecutor();
virtual ~DelayExecutor();
static DelayExecutor* instance();
int execute(ACE_Method_Request* new_req);
int start(int num_threads = 1, ACE_Method_Request* pre_svc_hook = NULL, ACE_Method_Request* post_svc_hook = NULL);
int deactivate();
bool activated();
virtual int svc();
private:
ACE_Message_Queue<ACE_SYNCH> mqueue_;
ACE_Activation_Queue queue_;
ACE_Method_Request* pre_svc_hook_;
ACE_Method_Request* post_svc_hook_;
bool activated_;
void activated(bool s);
};
#endif // _M_DELAY_EXECUTOR_H

View File

@@ -0,0 +1,158 @@
/*
* Copyright (C)
* Copyright (C)
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef LOCKEDQUEUE_H
#define LOCKEDQUEUE_H
#include <ace/Guard_T.h>
#include <ace/Thread_Mutex.h>
#include <deque>
#include <assert.h>
#include "Debugging/Errors.h"
namespace ACE_Based
{
template <class T, class LockType, typename StorageType=std::deque<T> >
class LockedQueue
{
//! Lock access to the queue.
LockType _lock;
//! Storage backing the queue.
StorageType _queue;
//! Cancellation flag.
volatile bool _canceled;
public:
//! Create a LockedQueue.
LockedQueue()
: _canceled(false)
{
}
//! Destroy a LockedQueue.
virtual ~LockedQueue()
{
}
//! Adds an item to the queue.
void add(const T& item)
{
lock();
//ASSERT(!this->_canceled);
// throw Cancellation_Exception();
_queue.push_back(item);
unlock();
}
//! Gets the next result in the queue, if any.
bool next(T& result)
{
// ACE_Guard<LockType> g(this->_lock);
ACE_GUARD_RETURN (LockType, g, this->_lock, false);
if (_queue.empty())
return false;
//ASSERT (!_queue.empty() || !this->_canceled);
// throw Cancellation_Exception();
result = _queue.front();
_queue.pop_front();
return true;
}
template<class Checker>
bool next(T& result, Checker& check)
{
ACE_Guard<LockType> g(this->_lock);
if (_queue.empty())
return false;
result = _queue.front();
if (!check.Process(result))
return false;
_queue.pop_front();
return true;
}
//! Peeks at the top of the queue. Check if the queue is empty before calling! Remember to unlock after use if autoUnlock == false.
T& peek(bool autoUnlock = false)
{
lock();
T& result = _queue.front();
if (autoUnlock)
unlock();
return result;
}
//! Cancels the queue.
void cancel()
{
lock();
_canceled = true;
unlock();
}
//! Checks if the queue is cancelled.
bool cancelled()
{
ACE_Guard<LockType> g(this->_lock);
return _canceled;
}
//! Locks the queue for access.
void lock()
{
this->_lock.acquire();
}
//! Unlocks the queue.
void unlock()
{
this->_lock.release();
}
///! Calls pop_front of the queue
void pop_front()
{
ACE_GUARD (LockType, g, this->_lock);
_queue.pop_front();
}
///! Checks if we're empty or not with locks held
bool empty()
{
ACE_GUARD_RETURN (LockType, g, this->_lock, false);
return _queue.empty();
}
};
}
#endif

View File

@@ -0,0 +1,235 @@
/*
* Copyright (C)
* Copyright (C)
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Threading.h"
#include "Errors.h"
#include <ace/OS_NS_unistd.h>
#include <ace/Sched_Params.h>
#include <vector>
using namespace ACE_Based;
ThreadPriority::ThreadPriority()
{
for (int i = Idle; i < MAXPRIORITYNUM; ++i)
m_priority[i] = ACE_THR_PRI_OTHER_DEF;
m_priority[Idle] = ACE_Sched_Params::priority_min(ACE_SCHED_OTHER);
m_priority[Realtime] = ACE_Sched_Params::priority_max(ACE_SCHED_OTHER);
std::vector<int> _tmp;
ACE_Sched_Params::Policy _policy = ACE_SCHED_OTHER;
ACE_Sched_Priority_Iterator pr_iter(_policy);
while (pr_iter.more())
{
_tmp.push_back(pr_iter.priority());
pr_iter.next();
}
ASSERT (!_tmp.empty());
if (_tmp.size() >= MAXPRIORITYNUM)
{
const size_t max_pos = _tmp.size();
size_t min_pos = 1;
size_t norm_pos = 0;
for (size_t i = 0; i < max_pos; ++i)
{
if (_tmp[i] == ACE_THR_PRI_OTHER_DEF)
{
norm_pos = i + 1;
break;
}
}
// since we have only 7(seven) values in enum Priority
// and 3 we know already (Idle, Normal, Realtime) so
// we need to split each list [Idle...Normal] and [Normal...Realtime]
// into pieces
const size_t _divider = 4;
size_t _div = (norm_pos - min_pos) / _divider;
if (_div == 0)
_div = 1;
min_pos = (norm_pos - 1);
m_priority[Low] = _tmp[min_pos -= _div];
m_priority[Lowest] = _tmp[min_pos -= _div ];
_div = (max_pos - norm_pos) / _divider;
if (_div == 0)
_div = 1;
min_pos = norm_pos - 1;
m_priority[High] = _tmp[min_pos += _div];
m_priority[Highest] = _tmp[min_pos += _div];
}
}
int ThreadPriority::getPriority(Priority p) const
{
if (p < Idle)
p = Idle;
if (p > Realtime)
p = Realtime;
return m_priority[p];
}
#define THREADFLAG (THR_NEW_LWP | THR_SCHED_DEFAULT| THR_JOINABLE)
Thread::Thread(): m_iThreadId(0), m_hThreadHandle(0), m_task(0)
{
}
Thread::Thread(Runnable* instance): m_iThreadId(0), m_hThreadHandle(0), m_task(instance)
{
// register reference to m_task to prevent it deeltion until destructor
if (m_task)
m_task->incReference();
bool _start = start();
ASSERT (_start);
}
Thread::~Thread()
{
//Wait();
// deleted runnable object (if no other references)
if (m_task)
m_task->decReference();
}
//initialize Thread's class static member
Thread::ThreadStorage Thread::m_ThreadStorage;
ThreadPriority Thread::m_TpEnum;
bool Thread::start()
{
if (m_task == 0 || m_iThreadId != 0)
return false;
// incRef before spawing the thread, otherwise Thread::ThreadTask() might call decRef and delete m_task
m_task->incReference();
bool res = (ACE_Thread::spawn(&Thread::ThreadTask, (void*)m_task, THREADFLAG, &m_iThreadId, &m_hThreadHandle) == 0);
if (!res)
m_task->decReference();
return res;
}
bool Thread::wait()
{
if (!m_hThreadHandle || !m_task)
return false;
ACE_THR_FUNC_RETURN _value = ACE_THR_FUNC_RETURN(-1);
int _res = ACE_Thread::join(m_hThreadHandle, &_value);
m_iThreadId = 0;
m_hThreadHandle = 0;
return (_res == 0);
}
void Thread::destroy()
{
if (!m_iThreadId || !m_task)
return;
if (ACE_Thread::kill(m_iThreadId, -1) != 0)
return;
m_iThreadId = 0;
m_hThreadHandle = 0;
// reference set at ACE_Thread::spawn
m_task->decReference();
}
void Thread::suspend()
{
ACE_Thread::suspend(m_hThreadHandle);
}
void Thread::resume()
{
ACE_Thread::resume(m_hThreadHandle);
}
ACE_THR_FUNC_RETURN Thread::ThreadTask(void * param)
{
Runnable* _task = (Runnable*)param;
_task->run();
// task execution complete, free referecne added at
_task->decReference();
return (ACE_THR_FUNC_RETURN)0;
}
ACE_thread_t Thread::currentId()
{
return ACE_Thread::self();
}
ACE_hthread_t Thread::currentHandle()
{
ACE_hthread_t _handle;
ACE_Thread::self(_handle);
return _handle;
}
Thread * Thread::current()
{
Thread * _thread = m_ThreadStorage.ts_object();
if (!_thread)
{
_thread = new Thread();
_thread->m_iThreadId = Thread::currentId();
_thread->m_hThreadHandle = Thread::currentHandle();
Thread * _oldValue = m_ThreadStorage.ts_object(_thread);
if (_oldValue)
delete _oldValue;
}
return _thread;
}
void Thread::setPriority(Priority type)
{
int _priority = m_TpEnum.getPriority(type);
int _ok = ACE_Thread::setprio(m_hThreadHandle, _priority);
//remove this ASSERT in case you don't want to know is thread priority change was successful or not
ASSERT (_ok == 0);
}
void Thread::Sleep(unsigned long msecs)
{
ACE_OS::sleep(ACE_Time_Value(0, 1000 * msecs));
}

View File

@@ -0,0 +1,110 @@
/*
* Copyright (C)
* Copyright (C)
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef THREADING_H
#define THREADING_H
#include "Common.h"
#include <ace/ACE.h>
#include <ace/Thread.h>
#include <ace/TSS_T.h>
#include <ace/Atomic_Op.h>
#include <assert.h>
namespace ACE_Based
{
class Runnable
{
public:
virtual ~Runnable() { }
virtual void run() = 0;
void incReference() { ++m_refs; }
void decReference()
{
if (!--m_refs)
delete this;
}
private:
ACE_Atomic_Op<ACE_Thread_Mutex, long> m_refs;
};
enum Priority
{
Idle,
Lowest,
Low,
Normal,
High,
Highest,
Realtime
};
#define MAXPRIORITYNUM (Realtime + 1)
class ThreadPriority
{
public:
ThreadPriority();
int getPriority(Priority p) const;
private:
int m_priority[MAXPRIORITYNUM];
};
class Thread
{
public:
Thread();
explicit Thread(Runnable* instance);
~Thread();
bool start();
bool wait();
void destroy();
void suspend();
void resume();
void setPriority(Priority type);
static void Sleep(unsigned long msecs);
static ACE_thread_t currentId();
static ACE_hthread_t currentHandle();
static Thread * current();
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
static ACE_THR_FUNC_RETURN ThreadTask(void * param);
ACE_thread_t m_iThreadId;
ACE_hthread_t m_hThreadHandle;
Runnable* m_task;
typedef ACE_TSS<Thread> ThreadStorage;
//global object - container for Thread class representation of every thread
static ThreadStorage m_ThreadStorage;
//use this object to determine current OS thread priority values mapped to enum Priority{ }
static ThreadPriority m_TpEnum;
};
}
#endif