libtasks Documentation  1.6
dispatcher.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #ifndef _TASKS_DISPATCHER_H_
9 #define _TASKS_DISPATCHER_H_
10 
11 #include <vector>
12 #include <list>
13 #include <condition_variable>
14 #include <mutex>
15 #include <memory>
16 #include <atomic>
17 #include <unistd.h>
18 
19 #include <tasks/ev_wrapper.h>
20 #include <tasks/tools/bitset.h>
21 #include <tasks/logging.h>
22 
23 namespace tasks {
24 
25 class worker;
26 class executor;
27 class task;
28 class event_task;
29 class exec_task;
30 class disposable;
31 
32 struct signal_data;
33 
34 typedef std::function<void(int)> signal_func_t;
35 
36 class dispatcher {
37  friend class test_exec;
38 
39  public:
40  enum class mode { SINGLE_LOOP, MULTI_LOOP };
41 
42  dispatcher(uint8_t num_workers);
43 
47  static void init_workers(uint8_t num_workers) {
48  if (nullptr == m_instance) {
49  m_instance = std::make_shared<dispatcher>(num_workers);
50  }
51  }
52 
65  static void init_run_mode(mode m) {
66  if (nullptr != m_instance) {
67  terr("ERROR: dispatcher::init_run_mode must be called before anything else!" << std::endl);
68  assert(false);
69  }
70  m_run_mode = m;
71  }
72 
73  static mode run_mode() { return m_run_mode; }
74 
75  static std::shared_ptr<dispatcher> instance() {
76  if (nullptr == m_instance) {
77  // Create as many workers as we have CPU's per default
78  m_instance = std::make_shared<dispatcher>(sysconf(_SC_NPROCESSORS_ONLN));
79  }
80  return m_instance;
81  }
82 
83  static void destroy() {
84  if (nullptr != m_instance) {
85  if (!m_instance->m_term) {
86  m_instance->terminate();
87  m_instance->join();
88  }
89  m_instance.reset();
90  m_instance = nullptr;
91  }
92  }
93 
96  static void add_signal_handler(int sig, signal_func_t func);
97 
99  std::shared_ptr<worker> free_worker();
100 
102  std::shared_ptr<executor> free_executor();
103 
105  void add_free_worker(uint8_t id);
106 
109  inline worker* last_worker() {
110  return m_workers[m_last_worker_id].get();
111  }
112 
116 
119  void add_task(task* task);
124 
126  void remove_task(task* task);
129 
131  [[deprecated]] void run(int num, ...);
132 
134  void run(std::vector<tasks::task*>& tasks);
135 
137  void start();
138 
140  void join();
141 
143  inline void terminate() {
144  m_term = true;
145  m_finish_cond.notify_one();
146  }
147 
148  void print_worker_stats() const;
149 
150  private:
151  static std::shared_ptr<dispatcher> m_instance;
152  std::atomic<bool> m_term;
153 
155  std::vector<std::shared_ptr<worker> > m_workers;
156  uint8_t m_num_workers = 0;
157 
159  std::list<std::shared_ptr<executor> > m_executors;
160  std::mutex m_executor_mutex;
161 
162  static mode m_run_mode;
163 
167 
171  std::atomic<uint8_t> m_rr_worker_id;
172 
174  std::condition_variable m_finish_cond;
175  std::mutex m_finish_mutex;
176 
177  bool m_started = false;
178 };
179 
180 } // tasks
181 
182 #endif // _TASKS_DISPATCHER_H_
static void init_workers(uint8_t num_workers)
Definition: dispatcher.h:47
std::mutex m_finish_mutex
Definition: dispatcher.h:175
static std::shared_ptr< dispatcher > instance()
Definition: dispatcher.h:75
The base class for any task.
Definition: task.h:19
void remove_exec_task(exec_task *task)
Definition: dispatcher.cpp:274
void add_task(task *task)
Definition: dispatcher.cpp:177
void run(int num,...)
This method starts the system and blocks until terminate() gets called.
Definition: dispatcher.cpp:32
static mode run_mode()
Definition: dispatcher.h:73
std::list< std::shared_ptr< executor > > m_executors
All executor threads.
Definition: dispatcher.h:159
tools::bitset m_workers_busy
State of the workers used for maintaining the leader/followers.
Definition: dispatcher.h:165
std::shared_ptr< executor > free_executor()
Find a free executor. If non is found a new executor gets created.
Definition: dispatcher.cpp:122
std::condition_variable m_finish_cond
Condition variable/mutex used to wait for finishing up.
Definition: dispatcher.h:174
#define terr(m)
Definition: logging.h:57
static mode m_run_mode
Definition: dispatcher.h:162
void remove_task(task *task)
Remove a task from the system.
Definition: dispatcher.cpp:221
tools::bitset::int_type m_last_worker_id
Definition: dispatcher.h:166
std::function< void(int)> signal_func_t
Definition: dispatcher.h:32
std::vector< data_type >::size_type int_type
Definition: bitset.h:23
static std::shared_ptr< dispatcher > m_instance
Definition: dispatcher.h:151
void print_worker_stats() const
Definition: dispatcher.cpp:171
std::atomic< uint8_t > m_rr_worker_id
Definition: dispatcher.h:171
void add_event_task(event_task *task)
Add an event task to the system.
Definition: dispatcher.cpp:207
std::atomic< bool > m_term
Definition: dispatcher.h:152
std::mutex m_executor_mutex
Definition: dispatcher.h:160
static void init_run_mode(mode m)
Definition: dispatcher.h:65
A thread safe lock free bitset.
Definition: bitset.h:20
void terminate()
Terminate the workers and die.
Definition: dispatcher.h:143
std::vector< std::shared_ptr< worker > > m_workers
All worker threads.
Definition: dispatcher.h:155
void add_exec_task(exec_task *task)
Add an exec task to the system.
Definition: dispatcher.cpp:215
worker * get_worker_by_task(event_task *task)
Definition: dispatcher.cpp:151
uint8_t m_num_workers
Definition: dispatcher.h:156
void add_free_worker(uint8_t id)
When a worker finishes his work he returns to the free worker queue.
Definition: dispatcher.cpp:146
dispatcher(uint8_t num_workers)
Definition: dispatcher.cpp:26
void start()
Start the event loop. Do not block.
Definition: dispatcher.cpp:63
worker * last_worker()
Definition: dispatcher.h:109
static void destroy()
Definition: dispatcher.h:83
void join()
Wait for the dispatcher to finish.
Definition: dispatcher.cpp:98
static void add_signal_handler(int sig, signal_func_t func)
Definition: dispatcher.cpp:287
void remove_event_task(event_task *task)
Definition: dispatcher.cpp:246
std::shared_ptr< worker > free_worker()
Get a free worker to promote it to the leader.
Definition: dispatcher.cpp:109