libtasks Documentation  1.6
worker.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #ifndef _TASKS_WORKER_H_
9 #define _TASKS_WORKER_H_
10 
11 #include <tasks/dispatcher.h>
12 #include <tasks/event_task.h>
13 #include <tasks/logging.h>
14 #include <tasks/ev_wrapper.h>
15 #include <thread>
16 #include <atomic>
17 #include <memory>
18 #include <mutex>
19 #include <condition_variable>
20 #include <functional>
21 #include <queue>
22 #include <sstream>
23 #include <cassert>
24 #include <vector>
25 
26 // To enable worker::add_time() set this to 1.
27 #define ENABLE_ADD_TIME 0
28 #define ADD_TIME_BUCKETS 10
29 
30 namespace tasks {
31 
32 class event_task;
33 
35 struct loop_t {  // Plain holder for a raw libev loop pointer, so a loop can be handed around inside a std::unique_ptr<loop_t>.
36  struct ev_loop* ptr;  // The wrapped loop; no destructor is defined here, so this struct never destroys the loop.
37  loop_t() : ptr(nullptr) {}  // Empty holder: ptr is nullptr.
38  loop_t(struct ev_loop* p) : ptr(p) {}  // Wraps p. Not marked explicit, so ev_loop* converts implicitly to loop_t.
39 };
40 
42  std::queue<task_func_t> queue;
43  std::mutex mutex;
44 };
45 
50 struct event {  // One queued event for a worker (see worker::add_event). NOTE(review): listing line 51 is absent here; the symbol index places the member `tasks::event_task* task` at worker.h:51.
52  int revents;  // Event flags; tasks_event_callback fills this from the int it receives from the event loop.
53 };
54 
55 class worker {  // A worker backed by its own std::thread (m_thread); while m_leader is set it holds the event loop in m_loop.
56  public:
57  worker(uint8_t id, std::unique_ptr<loop_t>& loop);  // Declared here, defined out of line.
58  virtual ~worker();  // Declared here, defined out of line.
59 
61  inline uint8_t id() const { return m_id; }  // Returns the worker id.
62 
64  static worker* get() { return m_worker_ptr; }  // Returns the thread-local m_worker_ptr of the calling thread.
65 
66  inline std::string get_string() const {  // Formats "worker(<address>,<id>)", used as a prefix in the tdbg log lines below.
67  std::ostringstream os;
68  os << "worker(" << this << "," << (unsigned int)m_id << ")";  // Cast so the uint8_t id is streamed as a number, not as a character.
69  return os.str();
70  }
71 
73  inline struct ev_loop* loop_ptr() const {  // Returns the loop this worker should use, chosen by dispatcher::run_mode().
74  struct ev_loop* loop = nullptr;
75  switch (dispatcher::run_mode()) {  // NOTE(review): the case label (listing line 76) is absent from this listing.
77  assert(nullptr != m_loop);
78  loop = m_loop->ptr;  // In that mode the worker's own loop is used.
79  break;
80  default:
81  loop = ev_default_loop(0);  // Every other mode uses libev's default loop.
82  }
83  return loop;
84  }
85 
90  if (m_leader && this == worker::get()) {  // Body of exec_in_worker_ctx(task_func_t f) per the symbol index (worker.h:89); its signature line is absent from this listing.
91  // The worker is running an event loop and we are running in the workers thread context, now execute the
92  // functor.
93  f(m_loop->ptr);
94  return true;  // true: f was executed directly.
95  } else {
96  async_call(f);
97  return false;  // false: f was queued via async_call instead.
98  }
99  }
100 
102  inline bool signal_call(task_func_t f) {  // Forwards to exec_in_worker_ctx(f) and returns its result.
103  return exec_in_worker_ctx(f);
104  }
105 
107  inline void async_call(task_func_t f) {  // Queues f and signals the loop through m_signal_watcher. NOTE(review): the line declaring tfq (listing line 108) is absent.
109  {
110  std::lock_guard<std::mutex> lock(tfq->mutex);  // Hold the queue mutex only while pushing.
111  tfq->queue.push(f);
112  }
113  ev_async_send(loop_ptr(), &m_signal_watcher);  // Sent after the lock scope has ended.
114  }
115 
117  inline void set_event_loop(std::unique_ptr<loop_t>& loop) {  // Takes over the loop and makes this worker the leader.
118  m_loop = std::move(loop);  // The caller's unique_ptr is moved from.
119  m_leader = true;
120  ev_set_userdata(m_loop->ptr, this);  // Lets loop callbacks recover this worker via ev_userdata(loop).
121  m_work_cond.notify_one();
122  }
123 
125  inline void terminate() {  // Sets m_term, notifies m_work_cond, interrupts the loop if leader, then joins m_thread (blocks until it ends).
126  tdbg(get_string() << ": waiting to terminate thread" << std::endl);
127  m_term = true;
128  m_work_cond.notify_one();
129  if (m_leader) {
130  // interrupt the event loop
131  ev_async_send(loop_ptr(), &m_signal_watcher);
132  }
133  m_thread->join();
134  tdbg(get_string() << ": thread done" << std::endl);
135  }
136 
138  inline void add_event(event e) { m_events_queue.push(e); }  // Pushes e onto m_events_queue; no lock is taken here.
139 
141  static void add_async_event(event e) {  // Queues e from another thread context by posting a functor to dispatcher's last_worker().
142  worker* w = dispatcher::instance()->last_worker();
143  tdbg("worker: adding async event to worker " << w << std::endl);
144  w->async_call([e](struct ev_loop* loop) {  // e is captured by value.
145  // get the executing worker
146  worker* worker = (tasks::worker*)ev_userdata(loop);
147  worker->add_event(e);
148  });
149  }
150 
152  void handle_io_event(ev_io* watcher, int revents);  // Declared here, defined out of line.
153 
155  void handle_timer_event(ev_timer* watcher);  // Declared here, defined out of line.
156 
158  inline uint64_t events_count() const { return m_events_count; }  // Returns m_events_count.
159 
160 #if ENABLE_ADD_TIME == 1
161  inline void add_time(uint64_t idx, uint64_t t) {  // Only compiled when ENABLE_ADD_TIME is 1. NOTE(review): listing lines 162-175 are absent; no bounds check of idx against ADD_TIME_BUCKETS is visible.
176  m_time_total[idx] += t;
177  m_time_count[idx]++;
178  if (m_time_count[idx] == 5000) {  // Every 5000 samples: print the bucket average and reset the bucket.
179  std::cout << get_string() << " time(" << idx << "): avg " << (double)m_time_total[idx] / 5000 << " micros"
180  << std::endl;
181  m_time_total[idx] = 0;
182  m_time_count[idx] = 0;
183  }
184  }
185 #endif
186 
187  private:
188 #ifndef __clang__
189  thread_local static worker* m_worker_ptr;  // Per-thread pointer returned by get().
190 #else
191  __thread static worker* m_worker_ptr;  // Same member, declared with __thread when building with clang.
192 #endif
193  uint8_t m_id;  // Worker id, returned by id().
194  uint64_t m_events_count = 0;  // Returned by events_count().
195  std::unique_ptr<loop_t> m_loop;  // Event loop holder; handed to another worker in promote_leader().
196  std::atomic<bool> m_term;  // Set to true by terminate().
197  std::atomic<bool> m_leader;  // True after set_event_loop(); cleared in promote_leader().
198  std::mutex m_work_mutex;
199  std::condition_variable m_work_cond;  // Notified by set_event_loop() and terminate().
200  std::queue<event> m_events_queue;  // Filled by add_event().
201 
202 #if ENABLE_ADD_TIME == 1
203  uint64_t m_time_total[ADD_TIME_BUCKETS];  // Per-bucket accumulated time, used by add_time().
204  uint64_t m_time_count[ADD_TIME_BUCKETS];  // Per-bucket sample count, used by add_time().
205 #endif
206 
210 
211  std::unique_ptr<std::thread> m_thread;  // Joined in terminate().
212 
214  inline void promote_leader() {  // Hands the loop to a free worker if the dispatcher has one. NOTE(review): listing line 215 is absent, which is why the braces below look unbalanced.
216  std::shared_ptr<worker> w = dispatcher::instance()->free_worker();
217  if (nullptr != w) {
218  // If we find a free worker, we promote it to the next
219  // leader. This thread stays leader otherwise.
220  m_leader = false;
221  w->set_event_loop(m_loop);  // Moves m_loop into w (see set_event_loop).
222  }
223  }
224  }
225 
227  inline void mark_free() {  // Registers this worker's id with the dispatcher as free. NOTE(review): the guarding line (listing line 228) is absent.
229  dispatcher::instance()->add_free_worker(id());
230  }
231  }
232 
234  void run();  // Declared here, defined out of line.
235 };
236 
237 /* CALLBACKS */
238 
240 template <typename EV_t>  // EV_t is a pointer-like watcher type exposing a `data` member (it is used as w->data below).
241 void tasks_event_callback(struct ev_loop* loop, EV_t w, int e) {  // Watcher callback: turns a fired watcher into an `event` queued on the loop's worker.
242  worker* worker = (tasks::worker*)ev_userdata(loop);  // The worker registers itself as loop userdata in worker::set_event_loop.
243  assert(nullptr != worker);
244  event_task* task = (tasks::event_task*)w->data;  // The watcher's data pointer is interpreted as the event_task.
245  task->stop_watcher(worker);  // The watcher is stopped before the event is queued.
246  event event = {task, e};  // Aggregate-initialised with the task and the received flags.
247  worker->add_event(event);
248 }
249 
251 void tasks_async_callback(struct ev_loop* loop, ev_async* w, int events);  // Callback for ev_async watchers; declaration only, the definition is not in this header.
252 
253 } // tasks
254 
255 #endif // _TASKS_WORKER_H_
static std::shared_ptr< dispatcher > instance()
Definition: dispatcher.h:75
void async_call(task_func_t f)
Put a functor into the async work queue of a worker and notify it.
Definition: worker.h:107
The base class for any task.
Definition: task.h:19
std::function< void(struct ev_loop *)> task_func_t
Definition: event_task.h:19
void tasks_async_callback(struct ev_loop *loop, ev_async *w, int events)
Callback for async events.
Definition: worker.cpp:106
static mode run_mode()
Definition: dispatcher.h:73
std::mutex mutex
Definition: worker.h:43
#define tdbg(m)
Definition: logging.h:54
std::condition_variable m_work_cond
Definition: worker.h:199
static void add_async_event(event e)
Add an event to the workers queue from a different thread context.
Definition: worker.h:141
bool signal_call(task_func_t f)
Same as exec_in_worker_ctx(task_func_t f)
Definition: worker.h:102
static worker * get()
Provide access to the executing worker thread object in the current thread context.
Definition: worker.h:64
uint8_t m_id
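The worker id. (The text "A thread local pointer to the worker thread." describes m_worker_ptr, worker.h:189, and was attached to this member by mistake.)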
Definition: worker.h:193
uint8_t id() const
Return the worker id.
Definition: worker.h:61
ev_async m_signal_watcher
Definition: worker.h:209
void terminate()
Terminate the worker and wait for it to finish.
Definition: worker.h:125
std::atomic< bool > m_leader
Definition: worker.h:197
std::queue< task_func_t > queue
Definition: worker.h:42
std::queue< event > m_events_queue
Definition: worker.h:200
std::atomic< bool > m_term
Definition: worker.h:196
void set_event_loop(std::unique_ptr< loop_t > &loop)
Pass the event loop to the worker.
Definition: worker.h:117
static thread_local worker * m_worker_ptr
Definition: worker.h:189
tasks::loop_t
Needed to use std::unique_ptr<>
Definition: worker.h:35
std::mutex m_work_mutex
Definition: worker.h:198
std::unique_ptr< loop_t > m_loop
Definition: worker.h:195
bool exec_in_worker_ctx(task_func_t f)
Definition: worker.h:89
void mark_free()
Let the dispatcher know that this thread is free to become a leader and take on some work...
Definition: worker.h:227
loop_t(struct ev_loop *p)
Definition: worker.h:38
int revents
Definition: worker.h:52
std::string get_string() const
Definition: worker.h:66
#define ADD_TIME_BUCKETS
Definition: worker.h:28
void add_event(event e)
Add an event to the workers queue.
Definition: worker.h:138
std::unique_ptr< std::thread > m_thread
Definition: worker.h:211
virtual void stop_watcher(worker *worker)=0
Deactivate the underlying watcher.
tasks::event_task * task
Definition: worker.h:51
struct ev_loop * ptr
Definition: worker.h:36
worker(uint8_t id, std::unique_ptr< loop_t > &loop)
Definition: worker.cpp:19
void tasks_event_callback(struct ev_loop *loop, EV_t w, int e)
Callback for I/O events.
Definition: worker.h:241
virtual ~worker()
Definition: worker.cpp:38
void promote_leader()
Find a free worker and promote it to become the next leader.
Definition: worker.h:214
uint64_t events_count() const
Return the number of events the worker has handled until now.
Definition: worker.h:158
struct ev_loop * loop_ptr() const
Return the event loop pointer for this worker.
Definition: worker.h:73