libtasks Documentation  1.6
dispatcher.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #include <tasks/dispatcher.h>
9 #include <tasks/worker.h>
10 #include <tasks/executor.h>
11 #include <tasks/task.h>
12 #include <tasks/logging.h>
13 #include <tasks/disposable.h>
14 #include <tasks/event_task.h>
15 #include <tasks/exec_task.h>
16 #include <tasks/cleanup_task.h>
17 #include <cassert>
18 #include <cstdarg>
19 #include <chrono>
20 
21 namespace tasks {
22 
23 std::shared_ptr<dispatcher> dispatcher::m_instance = nullptr;
24 dispatcher::mode dispatcher::m_run_mode = mode::SINGLE_LOOP;
25 
26 dispatcher::dispatcher(uint8_t num_workers)
27  : m_term(false), m_num_workers(num_workers), m_workers_busy(tools::bitset(m_num_workers)), m_rr_worker_id(0) {
28  // Create workers
29  tdbg("dispatcher: number of cpus is " << (int)m_num_workers << std::endl);
30 }
31 
32 void dispatcher::run(int num, ...) {
33  // Start the event loop
34  start();
35 
36  // Start tasks if passed
37  if (num > 0) {
38  va_list tasks;
39  va_start(tasks, num);
40  for (int i = 0; i < num; i++) {
41  tasks::task* t = va_arg(tasks, tasks::task*);
42  add_task(t);
43  }
44  va_end(tasks);
45  }
46 
47  // Now we park this thread until someone calls finish()
48  join();
49 }
50 
51 void dispatcher::run(std::vector<tasks::task*>& tasks) {
52  // Start the event loop
53  start();
54 
55  for (auto t : tasks) {
56  add_task(t);
57  }
58 
59  // Now we park this thread until someone calls finish()
60  join();
61 }
62 
64  // The first thread becomes the leader or each thread gets its own loop
65  struct ev_loop* loop_raw = ev_default_loop(0);
66  for (uint8_t i = 0; i < m_num_workers; i++) {
67  std::unique_ptr<loop_t> loop = nullptr;
69  if (nullptr == loop_raw) {
70  loop_raw = ev_loop_new(0);
71  }
72  assert(loop_raw != nullptr);
73  loop.reset(new loop_t(loop_raw));
74  // Force the next iteration to create a new loop struct.
75  loop_raw = nullptr;
76  }
78  auto w = std::make_shared<worker>(i, loop);
79  assert(nullptr != w);
80  m_workers.push_back(w);
81  }
83  // Wait for the workers to become available
84  for (uint8_t i = 0; i < m_num_workers; i++) {
85  while (!m_workers_busy.test(i)) {
86  std::this_thread::sleep_for(std::chrono::milliseconds(10));
87  }
88  }
89  // Promote the first leader
90  std::unique_ptr<loop_t> loop(new loop_t);
91  loop->ptr = ev_default_loop(0);
93  m_workers[0]->set_event_loop(loop);
94  }
95  m_started = true;
96 }
97 
99  std::unique_lock<std::mutex> lock(m_finish_mutex);
100  while (!m_term && m_finish_cond.wait_for(lock, std::chrono::milliseconds(1000)) == std::cv_status::timeout) {
101  }
102  tdbg("dispatcher: terminating workers" << std::endl);
103  for (auto w : m_workers) {
104  w->terminate();
105  }
106  tdbg("dispatcher: finished" << std::endl);
107 }
108 
109 std::shared_ptr<worker> dispatcher::free_worker() {
110  if (m_num_workers > 1) {
113  m_last_worker_id = id;
114  tdbg("dispatcher: free_worker(" << id << ")" << std::endl);
115  m_workers_busy.unset(id);
116  return m_workers[id];
117  }
118  }
119  return nullptr;
120 }
121 
122 std::shared_ptr<executor> dispatcher::free_executor() {
123  tdbg("searching executor" << std::endl);
124  std::lock_guard<std::mutex> lock(m_executor_mutex);
125  auto i = m_executors.begin();
126  auto end = m_executors.end();
127  while (i != end) {
128  if ((*i)->terminated()) {
129  tdbg("erasing terminated executor " << (*i).get() << std::endl);
130  i = m_executors.erase(i);
131  } else if (!(*i)->busy()) {
132  tdbg("returning executor " << (*i).get() << std::endl);
133  (*i)->set_busy();
134  return *i;
135  } else {
136  i++;
137  }
138  }
139  tdbg("creating executor" << std::endl);
140  auto e = std::make_shared<executor>();
141  m_executors.push_back(e);
142  tdbg("returning executor " << e.get() << std::endl);
143  return e;
144 }
145 
146 void dispatcher::add_free_worker(uint8_t id) {
147  tdbg("dispatcher: add_free_worker(" << (unsigned int)id << ")" << std::endl);
148  m_workers_busy.set(id);
149 }
150 
152  worker* worker = task->assigned_worker();
153  if (nullptr == worker) {
154  switch (m_run_mode) {
155  case mode::MULTI_LOOP:
156  // In multi loop mode we pick a worker by round robin
157  worker = m_workers[m_rr_worker_id++ % m_num_workers].get();
158  break;
159  default:
160  // In single loop mode use the current executing worker
161  worker = worker::get();
162  // If we get called from some other thread than a worker we pick the last active worker
163  if (nullptr == worker) {
164  worker = m_workers[m_last_worker_id].get();
165  }
166  }
167  }
168  return worker;
169 }
170 
172  for (auto& w : m_workers) {
173  terr(w->get_string() << ": number of handled events is " << w->events_count() << std::endl);
174  }
175 }
176 
178  // Check if the system is running
179  if (!m_started) {
180  terr("dispatcher: You have to call dispatcher::start() before dispatcher::add_task()" << std::endl);
181  assert(false);
182  }
183  // The pass object is no event_task. Now find a worker to add it.
184  try {
185  event_task* et = dynamic_cast<event_task*>(task);
186  if (nullptr != et) {
187  add_event_task(et);
188  return;
189  }
190  } catch (std::exception&) {
191  }
192 
193  // Check if the passed task object is an exec_task.
194  try {
195  exec_task* et = dynamic_cast<exec_task*>(task);
196  if (nullptr != et) {
197  add_exec_task(et);
198  return;
199  }
200  } catch (std::exception&) {
201  }
202 
203  terr("dispatcher: task " << task << " is not an event_task nor an exec_task!");
204  assert(false);
205 }
206 
209  tdbg("add_event_task: adding event_task " << task << " using worker " << worker << std::endl);
210  task->init_watcher();
211  task->assign_worker(worker);
212  task->start_watcher(worker);
213 }
214 
216  std::shared_ptr<executor> executor = free_executor();
217  tdbg("add_exec_task: adding exec_task " << task << " using executor " << executor << std::endl);
218  executor->add_task(task);
219 }
220 
222  // The pass object is no event_task. Now find a worker to add it.
223  try {
224  event_task* et = dynamic_cast<event_task*>(task);
225  if (nullptr != et) {
226  remove_event_task(et);
227  return;
228  }
229  } catch (std::exception&) {
230  }
231 
232  // Check if the passed task object is an exec_task.
233  try {
234  exec_task* et = dynamic_cast<exec_task*>(task);
235  if (nullptr != et) {
236  remove_exec_task(et);
237  return;
238  }
239  } catch (std::exception&) {
240  }
241 
242  terr("dispatcher: task " << task << " is not an event_task nor an exec_task!");
243  assert(false);
244 }
245 
247  // If the task implements the disposable interface, we check if it is ready to be
248  // disposed.
249  try {
250  disposable* disp = dynamic_cast<disposable*>(task);
251  if (nullptr != disp) {
253  tdbg("remove_event_task: deposing event_task " << task << " using worker " << worker << std::endl);
254  if (disp->can_dispose()) {
255  disp->dispose(worker);
256  } else {
257  // Try later
258  cleanup_task* ct = new cleanup_task(disp);
259  tdbg("remove_event_task: delayed deposing event_task "
260  << disp << "/" << task << " using worker " << worker << " and cleanup_task " << ct << std::endl);
261  ct->init_watcher();
262  ct->assign_worker(worker);
263  ct->start_watcher(worker);
264  }
265  return;
266  }
267  } catch (std::exception&) {
268  }
269  // No disposable, so delete the task now.
270  tdbg("remove_event_task: deleting event_task " << task << std::endl);
271  delete task;
272 }
273 
275  tdbg("remove_exec_task: deleting exec_task " << task << std::endl);
276  // No disposable object support yet. Just delete it.
277  delete task;
278 }
279 
280 static void handle_signal(struct ev_loop* /* loop */, ev_signal* sig, int /* revents */) {
281  signal_func_t* pfunc = (signal_func_t*) sig->data;
282  assert(nullptr != pfunc);
283  (*pfunc)(sig->signum);
284  delete pfunc;
285 }
286 
288  if (m_instance && m_instance->m_started) {
289  terr("dispatcher: add_signal_handler can't be used after the event loop has been started!");
290  assert(false);
291  }
292  ev_signal* es = new ev_signal();
293  signal_func_t* pfunc = new signal_func_t(func);
294  ev_signal_init(es, handle_signal, sig);
295  es->data = pfunc;
296  ev_signal_start(ev_default_loop(0), es);
297 }
298 
299 } // tasks
std::mutex m_finish_mutex
Definition: dispatcher.h:175
void set(int_type p)
Set a bit.
Definition: bitset.h:45
The base class for any task.
Definition: task.h:19
void remove_exec_task(exec_task *task)
Definition: dispatcher.cpp:274
void add_task(task *task)
Definition: dispatcher.cpp:177
void run(int num,...)
This method starts the system and blocks until terminate() gets called.
Definition: dispatcher.cpp:32
worker * assigned_worker() const
Returns a pointer to the assigned worker.
Definition: event_task.h:68
std::list< std::shared_ptr< executor > > m_executors
All executor threads.
Definition: dispatcher.h:159
virtual void init_watcher()=0
Initialize the underlying watcher object.
tools::bitset m_workers_busy
State of the workers used for maintaining the leader/followers.
Definition: dispatcher.h:165
std::shared_ptr< executor > free_executor()
Find a free executor. If non is found a new executor gets created.
Definition: dispatcher.cpp:122
std::condition_variable m_finish_cond
Condition variable/mutex used to wait for finishing up.
Definition: dispatcher.h:174
#define terr(m)
Definition: logging.h:57
#define tdbg(m)
Definition: logging.h:54
void init_watcher()
Initialize the underlying watcher object.
Definition: timer_task.h:37
Base class for objects/tasks that can be deleted.
Definition: disposable.h:23
static mode m_run_mode
Definition: dispatcher.h:162
static worker * get()
Provide access to the executing worker thread object in the current thread context.
Definition: worker.h:64
void remove_task(task *task)
Remove a task from the system.
Definition: dispatcher.cpp:221
tools::bitset::int_type m_last_worker_id
Definition: dispatcher.h:166
std::function< void(int)> signal_func_t
Definition: dispatcher.h:32
std::vector< data_type >::size_type int_type
Definition: bitset.h:23
static std::shared_ptr< dispatcher > m_instance
Definition: dispatcher.h:151
void print_worker_stats() const
Definition: dispatcher.cpp:171
void start_watcher(worker *worker)
Activate the underlying watcher to listen for I/O or timer events.
Definition: timer_task.cpp:25
Needed to use std::unique_ptr<>
Definition: worker.h:35
void assign_worker(worker *worker)
Definition: event_task.cpp:13
void unset(int_type p)
Unset a bit.
Definition: bitset.h:51
virtual void start_watcher(worker *worker)=0
Activate the underlying watcher to listen for I/O or timer events.
std::atomic< uint8_t > m_rr_worker_id
Definition: dispatcher.h:171
void add_event_task(event_task *task)
Add an event task to the system.
Definition: dispatcher.cpp:207
std::atomic< bool > m_term
Definition: dispatcher.h:152
std::mutex m_executor_mutex
Definition: dispatcher.h:160
bool test(int_type p) const
Test if a bit is set.
Definition: bitset.h:57
std::vector< std::shared_ptr< worker > > m_workers
All worker threads.
Definition: dispatcher.h:155
void add_exec_task(exec_task *task)
Add an exec task to the system.
Definition: dispatcher.cpp:215
worker * get_worker_by_task(event_task *task)
Definition: dispatcher.cpp:151
uint8_t m_num_workers
Definition: dispatcher.h:156
bool next(int_type &idx, int_type start=0) const
Definition: bitset.h:84
void add_free_worker(uint8_t id)
When a worker finishes his work he returns to the free worker queue.
Definition: dispatcher.cpp:146
dispatcher(uint8_t num_workers)
Definition: dispatcher.cpp:26
void start()
Start the event loop. Do not block.
Definition: dispatcher.cpp:63
static void handle_signal(struct ev_loop *, ev_signal *sig, int)
Definition: dispatcher.cpp:280
void join()
Wait for the dispatcher to finish.
Definition: dispatcher.cpp:98
bool can_dispose() const
Check if a task can be disposed or not.
Definition: disposable.h:33
static void add_signal_handler(int sig, signal_func_t func)
Definition: dispatcher.cpp:287
virtual void dispose(worker *)=0
Dispose an object.
void remove_event_task(event_task *task)
Definition: dispatcher.cpp:246
std::shared_ptr< worker > free_worker()
Get a free worker to promote it to the leader.
Definition: dispatcher.cpp:109