27 : m_term(false), m_num_workers(num_workers), m_workers_busy(tools::bitset(m_num_workers)), m_rr_worker_id(0) {
40 for (
int i = 0; i < num; i++) {
55 for (
auto t : tasks) {
65 struct ev_loop* loop_raw = ev_default_loop(0);
67 std::unique_ptr<loop_t> loop =
nullptr;
69 if (
nullptr == loop_raw) {
70 loop_raw = ev_loop_new(0);
72 assert(loop_raw !=
nullptr);
73 loop.reset(
new loop_t(loop_raw));
78 auto w = std::make_shared<worker>(i, loop);
86 std::this_thread::sleep_for(std::chrono::milliseconds(10));
90 std::unique_ptr<loop_t> loop(
new loop_t);
91 loop->ptr = ev_default_loop(0);
100 while (!
m_term &&
m_finish_cond.wait_for(lock, std::chrono::milliseconds(1000)) == std::cv_status::timeout) {
102 tdbg(
"dispatcher: terminating workers" << std::endl);
106 tdbg(
"dispatcher: finished" << std::endl);
114 tdbg(
"dispatcher: free_worker(" <<
id <<
")" << std::endl);
123 tdbg(
"searching executor" << std::endl);
128 if ((*i)->terminated()) {
129 tdbg(
"erasing terminated executor " << (*i).get() << std::endl);
131 }
else if (!(*i)->busy()) {
132 tdbg(
"returning executor " << (*i).get() << std::endl);
139 tdbg(
"creating executor" << std::endl);
140 auto e = std::make_shared<executor>();
142 tdbg(
"returning executor " << e.get() << std::endl);
147 tdbg(
"dispatcher: add_free_worker(" << (
unsigned int)
id <<
")" << std::endl);
153 if (
nullptr == worker) {
163 if (
nullptr == worker) {
173 terr(w->get_string() <<
": number of handled events is " << w->events_count() << std::endl);
180 terr(
"dispatcher: You have to call dispatcher::start() before dispatcher::add_task()" << std::endl);
190 }
catch (std::exception&) {
200 }
catch (std::exception&) {
203 terr(
"dispatcher: task " << task <<
" is not an event_task nor an exec_task!");
209 tdbg(
"add_event_task: adding event_task " << task <<
" using worker " << worker << std::endl);
217 tdbg(
"add_exec_task: adding exec_task " << task <<
" using executor " << executor << std::endl);
218 executor->add_task(task);
229 }
catch (std::exception&) {
239 }
catch (std::exception&) {
242 terr(
"dispatcher: task " << task <<
" is not an event_task nor an exec_task!");
251 if (
nullptr != disp) {
253 tdbg(
"remove_event_task: deposing event_task " << task <<
" using worker " << worker << std::endl);
259 tdbg(
"remove_event_task: delayed deposing event_task "
260 << disp <<
"/" << task <<
" using worker " << worker <<
" and cleanup_task " << ct << std::endl);
267 }
catch (std::exception&) {
270 tdbg(
"remove_event_task: deleting event_task " << task << std::endl);
275 tdbg(
"remove_exec_task: deleting exec_task " << task << std::endl);
282 assert(
nullptr != pfunc);
283 (*pfunc)(sig->signum);
289 terr(
"dispatcher: add_signal_handler can't be used after the event loop has been started!");
292 ev_signal* es =
new ev_signal();
296 ev_signal_start(ev_default_loop(0), es);
std::mutex m_finish_mutex
The base class for any task.
void remove_exec_task(exec_task *task)
void add_task(task *task)
void run(int num,...)
This method starts the system and blocks until terminate() gets called.
worker * assigned_worker() const
Returns a pointer to the assigned worker.
std::list< std::shared_ptr< executor > > m_executors
All executor threads.
virtual void init_watcher()=0
Initialize the underlying watcher object.
tools::bitset m_workers_busy
State of the workers used for maintaining the leader/followers.
std::shared_ptr< executor > free_executor()
Find a free executor. If none is found, a new executor gets created.
std::condition_variable m_finish_cond
Condition variable/mutex used to wait for finishing up.
void init_watcher()
Initialize the underlying watcher object.
Base class for objects/tasks that can be deleted.
static worker * get()
Provide access to the executing worker thread object in the current thread context.
void remove_task(task *task)
Remove a task from the system.
tools::bitset::int_type m_last_worker_id
std::function< void(int)> signal_func_t
static std::shared_ptr< dispatcher > m_instance
void print_worker_stats() const
void start_watcher(worker *worker)
Activate the underlying watcher to listen for I/O or timer events.
Needed to use std::unique_ptr<>
void assign_worker(worker *worker)
virtual void start_watcher(worker *worker)=0
Activate the underlying watcher to listen for I/O or timer events.
std::atomic< uint8_t > m_rr_worker_id
void add_event_task(event_task *task)
Add an event task to the system.
std::atomic< bool > m_term
std::mutex m_executor_mutex
std::vector< std::shared_ptr< worker > > m_workers
All worker threads.
void add_exec_task(exec_task *task)
Add an exec task to the system.
worker * get_worker_by_task(event_task *task)
void add_free_worker(uint8_t id)
When a worker finishes its work, it returns to the free worker queue.
dispatcher(uint8_t num_workers)
void start()
Start the event loop. Do not block.
static void handle_signal(struct ev_loop *, ev_signal *sig, int)
void join()
Wait for the dispatcher to finish.
bool can_dispose() const
Check if a task can be disposed or not.
static void add_signal_handler(int sig, signal_func_t func)
virtual void dispose(worker *)=0
Dispose an object.
void remove_event_task(event_task *task)
std::shared_ptr< worker > free_worker()
Get a free worker to promote it to the leader.