libtasks Documentation  1.6
worker.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #include <tasks/worker.h>
9 #include <chrono>
10 
11 namespace tasks {
12 
// Definition of the static per-thread worker pointer; worker::run() stores `this` in it.
// When __clang__ is defined the __thread storage specifier is used instead of thread_local.
13 #ifndef __clang__
14 thread_local worker* worker::m_worker_ptr = nullptr;
15 #else
16 __thread worker* worker::m_worker_ptr = nullptr;
17 #endif
18 
// Constructs a worker with the given id, attaches its async watcher to an event loop and
// spawns the worker thread, which executes worker::run().
//
// id:   stored in m_id.
// loop: if non-null, ownership is moved into m_loop, this worker starts out as the leader
//       and is registered as the loop's userdata. If null, the watcher is started on
//       libev's default loop instead. The assert below requires a non-null loop unless
//       dispatcher::run_mode() is SINGLE_LOOP.
19 worker::worker(uint8_t id, std::unique_ptr<loop_t>& loop) : m_id(id), m_term(false), m_leader(false) {
20  // Initialize the thread's async watcher with tasks_async_callback; it is started on a loop below
21  ev_async_init(&m_signal_watcher, tasks_async_callback);
 // NOTE(review): listing line 22 is absent from this extract -- confirm against the original file.
23 
24  assert(dispatcher::mode::SINGLE_LOOP == dispatcher::run_mode() || nullptr != loop);
25  struct ev_loop* loop_raw = nullptr;
26  if (nullptr != loop) {
 // Take over the caller's loop; the caller's unique_ptr is moved from.
27  m_loop = std::move(loop);
28  m_leader = true;
 // tasks_async_callback reads this userdata back to find the worker.
29  ev_set_userdata(m_loop->ptr, this);
30  loop_raw = m_loop->ptr;
31  } else {
 // No loop handed in: use the default loop. Note that no userdata is set on this path.
32  loop_raw = ev_default_loop(0);
33  }
34  ev_async_start(loop_raw, &m_signal_watcher);
 // The thread is created last, after the watcher has been started.
35  m_thread.reset(new std::thread(&worker::run, this));
36 }
37 
 // NOTE(review): these lines are the tail of worker::~worker() (the index places it at
 // worker.cpp:38). Listing lines 38 and 40 -- the signature and the declaration of `tfq` --
 // are absent from this extract; confirm against the original file.
39  tdbg(get_string() << ": dtor" << std::endl);
 // Frees the object `tfq` points to; what `tfq` refers to is not visible here.
41  delete tfq;
42 }
43 
// Main method of the worker thread.
//
// Publishes this worker in the per-thread m_worker_ptr, marks it free, and then loops until
// m_term is set: a non-leader waits on m_work_cond; a leader runs the event loop and
// processes whatever ended up in m_events_queue. On termination, a worker that is still
// leader stops and destroys its loop.
44 void worker::run() {
45  m_worker_ptr = this;
46  mark_free();
47 
48  while (!m_term) {
49  // Wait until promoted to the leader thread
50  if (!m_leader) {
51  tdbg(get_string() << ": waiting..." << std::endl);
52  std::unique_lock<std::mutex> lock(m_work_mutex);
53  // Use wait_for to check the term flag
 // The loop keeps waiting only while each wait ends in a timeout and neither m_leader nor
 // m_term is set; a wakeup that is not a timeout also ends it. The 100 ms timeout bounds
 // how long a changed flag can go unnoticed.
54  while (m_work_cond.wait_for(lock, std::chrono::milliseconds(100)) == std::cv_status::timeout && !m_leader &&
55  !m_term) {
56  }
57  }
58 
59  // Became leader, so execute the event loop
60  while (m_leader && !m_term) {
61  tdbg(get_string() << ": running event loop" << std::endl);
 // Single pass of the libev loop (EVLOOP_ONESHOT).
62  ev_loop(m_loop->ptr, EVLOOP_ONESHOT);
63  tdbg(get_string() << ": event loop returned" << std::endl);
64  // Check if events got fired
65  if (!m_events_queue.empty()) {
66  tdbg(get_string() << ": executing events" << std::endl);
67  // Now promote the next leader and call the event
68  // handlers
 // NOTE(review): listing line 69 is absent from this extract; presumably the
 // promote_leader() call announced by the comment above -- confirm against the original file.
70  // Handle events
71  while (!m_events_queue.empty()) {
 // NOTE(review): listing line 72 is absent from this extract -- confirm against the original file.
 // The front element is copied and only popped after it has been fully handled.
73  event event = m_events_queue.front();
74  bool cont = event.task->handle_event(this, event.revents);
75  // Trigger the error callbacks if needed.
76  if (event.task->error()) {
77  event.task->notify_error(this);
78  }
79  // If the handler wants to continue to run the task we activate the watcher again. Otherwise we
80  // delete it if it has auto deletion activated.
81  if (cont) {
82  event.task->reset_error();
83  event.task->start_watcher(this);
84  } else {
85  if (event.task->auto_delete()) {
86  event.task->finish(this);
87  }
88  }
89  m_events_queue.pop();
90  }
91  }
92  }
93 
 // Reached when this worker is no longer leader or termination was requested.
94  if (!m_term) {
95  mark_free();
96  } else {
97  // Shutdown, the leader terminates the loop
98  if (m_leader) {
99  ev_unloop(m_loop->ptr, EVUNLOOP_ALL);
100  ev_loop_destroy(m_loop->ptr);
101  }
102  }
103  }
104 }
105 
// Callback for async events (installed on m_signal_watcher by the worker constructor).
//
// Looks up the worker from the loop's userdata, takes all queued functors out of the
// task function queue while holding its mutex, and then runs them outside the lock via
// worker::exec_in_worker_ctx().
//
// loop: the event loop that fired; its userdata must be a tasks::worker* (asserted non-null).
// w:    the async watcher; not used in the lines visible here.
106 void tasks_async_callback(struct ev_loop* loop, ev_async* w, int /* events */) {
107  worker* worker = (tasks::worker*)ev_userdata(loop);
108  assert(nullptr != worker);
 // NOTE(review): listing line 109, which must declare `tfq`, is absent from this extract --
 // confirm against the original file.
110  if (nullptr != tfq) {
111  std::queue<task_func_t> qcpy;
112  {
 // Swap the shared queue into a local one so the mutex is held only for the swap.
113  std::lock_guard<std::mutex> lock(tfq->mutex);
114  tfq->queue.swap(qcpy);
115  }
116  // Execute all queued functors
117  while (!qcpy.empty()) {
 // NOTE(review): the call sits inside assert(); when NDEBUG is defined the whole
 // expression is removed, so the functor would be popped without ever being executed.
118  assert(worker->exec_in_worker_ctx(qcpy.front()));
119  qcpy.pop();
120  }
121  }
122 }
123 
124 } // tasks
void tasks_async_callback(struct ev_loop *loop, ev_async *w, int events)
Callback for async events.
Definition: worker.cpp:106
static mode run_mode()
Definition: dispatcher.h:73
std::mutex mutex
Definition: worker.h:43
uint64_t m_events_count
Definition: worker.h:194
#define tdbg(m)
Definition: logging.h:54
bool error() const
Return true if an error occurred.
Definition: error_base.h:24
std::condition_variable m_work_cond
Definition: worker.h:199
ev_async m_signal_watcher
Definition: worker.h:209
bool auto_delete() const
Returns true if auto deletion is active.
Definition: task.h:46
std::atomic< bool > m_leader
Definition: worker.h:197
std::queue< task_func_t > queue
Definition: worker.h:42
std::queue< event > m_events_queue
Definition: worker.h:200
std::atomic< bool > m_term
Definition: worker.h:196
static thread_local worker * m_worker_ptr
Definition: worker.h:189
std::mutex m_work_mutex
Definition: worker.h:198
std::unique_ptr< loop_t > m_loop
Definition: worker.h:195
bool exec_in_worker_ctx(task_func_t f)
Definition: worker.h:89
void mark_free()
Let the dispatcher know that this thread is free to become a leader and take on some work...
Definition: worker.h:227
int revents
Definition: worker.h:52
std::string get_string() const
Definition: worker.h:66
void run()
Main method of the thread.
Definition: worker.cpp:44
std::unique_ptr< std::thread > m_thread
Definition: worker.h:211
tasks::event_task * task
Definition: worker.h:51
worker(uint8_t id, std::unique_ptr< loop_t > &loop)
Definition: worker.cpp:19
virtual ~worker()
Definition: worker.cpp:38
void promote_leader()
Find a free worker and promote it to become the next leader.
Definition: worker.h:214