Merge branch 'master' into Playerbot

This commit is contained in:
Yunfan Li
2025-01-09 19:56:11 +08:00
20 changed files with 346 additions and 109 deletions

View File

@@ -20,51 +20,51 @@
#include <condition_variable>
#include <queue>
#include <atomic>
#include <mutex>
template <typename T>
class ProducerConsumerQueue
{
private:
std::mutex _queueLock;
mutable std::mutex _queueLock;
std::queue<T> _queue;
std::condition_variable _condition;
std::atomic<bool> _cancel;
std::atomic<bool> _shutdown;
std::atomic<bool> _cancel{};
std::atomic<bool> _shutdown{};
public:
ProducerConsumerQueue() : _cancel(false), _shutdown(false) { }
ProducerConsumerQueue() = default;
void Push(const T& value)
{
std::lock_guard<std::mutex> lock(_queueLock);
_queue.push(std::move(value));
{
std::lock_guard<std::mutex> lock(_queueLock);
_queue.push(std::move(value));
}
_condition.notify_one();
}
bool Empty()
bool Empty() const
{
std::lock_guard<std::mutex> lock(_queueLock);
return _queue.empty();
}
[[nodiscard]] std::size_t Size() const
{
std::lock_guard<std::mutex> lock(_queueLock);
return _queue.size();
}
bool Pop(T& value)
{
std::lock_guard<std::mutex> lock(_queueLock);
if (_queue.empty() || _cancel)
return false;
value = _queue.front();
value = std::move(_queue.front());
_queue.pop();
return true;
}
@@ -72,39 +72,30 @@ public:
{
std::unique_lock<std::mutex> lock(_queueLock);
// we could be using .wait(lock, predicate) overload here but it is broken
// https://connect.microsoft.com/VisualStudio/feedback/details/1098841
while (_queue.empty() && !_cancel && !_shutdown)
_condition.wait(lock);
// Wait for the queue to have an element or the cancel/shutdown flag
_condition.wait(lock, [this] { return !_queue.empty() || _cancel || _shutdown; });
if (_queue.empty() || _cancel)
return;
value = _queue.front();
value = std::move(_queue.front());
_queue.pop();
}
// Clears the queue and will immediately stop any consumers
// Clears the queue and immediately stops any consumers.
void Cancel()
{
std::unique_lock<std::mutex> lock(_queueLock);
while (!_queue.empty())
{
std::lock_guard<std::mutex> lock(_queueLock);
while (!_queue.empty()) {
T& value = _queue.front();
DeleteQueuedObject(value);
_queue.pop();
}
_cancel = true;
_condition.notify_all();
}
// Graceful stop, will wait for queue to become empty before stopping consumers
// Graceful stop: waits for the queue to become empty before stopping consumers.
void Shutdown()
{
_shutdown = true;
@@ -113,10 +104,13 @@ public:
private:
template<typename E = T>
typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj) { delete obj; }
typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj)
{
delete obj;
}
template<typename E = T>
typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*packet*/) { }
typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*obj*/) { }
};
#endif

View File

@@ -124,9 +124,9 @@ bool TaskScheduler::IsGroupScheduled(group_t const group)
return _task_holder.IsGroupQueued(group);
}
Milliseconds TaskScheduler::GetNextGroupOcurrence(group_t const group) const
Milliseconds TaskScheduler::GetNextGroupOccurrence(group_t const group) const
{
return std::chrono::duration_cast<std::chrono::milliseconds>(_task_holder.GetNextGroupOcurrence(group) - clock_t::now());
return std::chrono::duration_cast<std::chrono::milliseconds>(_task_holder.GetNextGroupOccurrence(group) - clock_t::now());
}
void TaskScheduler::TaskQueue::Push(TaskContainer&& task)
@@ -194,15 +194,12 @@ bool TaskScheduler::TaskQueue::IsGroupQueued(group_t const group)
return false;
}
TaskScheduler::timepoint_t TaskScheduler::TaskQueue::GetNextGroupOcurrence(group_t const group) const
TaskScheduler::timepoint_t TaskScheduler::TaskQueue::GetNextGroupOccurrence(group_t const group) const
{
TaskScheduler::timepoint_t next = TaskScheduler::timepoint_t::max();
for (auto const& task : container)
{
if (task->IsInGroup(group) && task->_end < next)
next = task->_end;
}
return next;
}
@@ -248,7 +245,7 @@ TaskScheduler::repeated_t TaskContext::GetRepeatCounter() const
return _task->_repeated;
}
TaskScheduler::timepoint_t TaskContext::GetNextOcurrence() const
TaskScheduler::timepoint_t TaskContext::GetNextOccurrence() const
{
return _task->_end;
}

View File

@@ -149,7 +149,7 @@ class TaskScheduler
bool IsGroupQueued(group_t const group);
// Returns the next group occurrence.
TaskScheduler::timepoint_t GetNextGroupOcurrence(group_t const group) const;
TaskScheduler::timepoint_t GetNextGroupOccurrence(group_t const group) const;
bool IsEmpty() const;
};
@@ -377,7 +377,7 @@ public:
}
// Returns the next group occurrence.
Milliseconds GetNextGroupOcurrence(group_t const group) const;
Milliseconds GetNextGroupOccurrence(group_t const group) const;
private:
/// Insert a new task to the enqueued tasks.
@@ -483,7 +483,7 @@ public:
/// Returns the repeat counter which increases every time the task is repeated.
TaskScheduler::repeated_t GetRepeatCounter() const;
TaskScheduler::timepoint_t GetNextOcurrence() const;
TaskScheduler::timepoint_t GetNextOccurrence() const;
/// Repeats the event and sets a new duration.
/// std::chrono::seconds(5) for example.