8 #ifndef _TASKS_WORKER_H_
9 #define _TASKS_WORKER_H_
19 #include <condition_variable>
27 #define ENABLE_ADD_TIME 0
28 #define ADD_TIME_BUCKETS 10
57 worker(uint8_t
id, std::unique_ptr<loop_t>& loop);
61 inline uint8_t
id()
const {
return m_id; }
67 std::ostringstream os;
68 os <<
"worker(" <<
this <<
"," << (
unsigned int)
m_id <<
")";
74 struct ev_loop* loop =
nullptr;
81 loop = ev_default_loop(0);
110 std::lock_guard<std::mutex> lock(tfq->
mutex);
118 m_loop = std::move(loop);
120 ev_set_userdata(m_loop->ptr,
this);
121 m_work_cond.notify_one();
126 tdbg(get_string() <<
": waiting to terminate thread" << std::endl);
128 m_work_cond.notify_one();
131 ev_async_send(loop_ptr(), &m_signal_watcher);
134 tdbg(get_string() <<
": thread done" << std::endl);
143 tdbg(
"worker: adding async event to worker " << w << std::endl);
152 void handle_io_event(ev_io* watcher,
int revents);
155 void handle_timer_event(ev_timer* watcher);
160 #if ENABLE_ADD_TIME == 1
161 inline void add_time(uint64_t idx, uint64_t t) {
176 m_time_total[idx] += t;
178 if (m_time_count[idx] == 5000) {
179 std::cout << get_string() <<
" time(" << idx <<
"): avg " << (double)m_time_total[idx] / 5000 <<
" micros"
181 m_time_total[idx] = 0;
182 m_time_count[idx] = 0;
191 __thread
static worker* m_worker_ptr;
194 uint64_t m_events_count = 0;
202 #if ENABLE_ADD_TIME == 1
221 w->set_event_loop(m_loop);
240 template <
typename EV_t>
243 assert(
nullptr != worker);
246 event event = {task, e};
255 #endif // _TASKS_WORKER_H_
static std::shared_ptr< dispatcher > instance()
void async_call(task_func_t f)
Put a functor into the async work queue of a worker and notify it.
The base class for any task.
std::function< void(struct ev_loop *)> task_func_t
void tasks_async_callback(struct ev_loop *loop, ev_async *w, int events)
Callback for async events.
std::condition_variable m_work_cond
static void add_async_event(event e)
Add an event to the workers queue from a different thread context.
bool signal_call(task_func_t f)
Same as exec_in_ctx(task_func_t f)
static worker * get()
Provide access to the executing worker thread object in the current thread context.
uint8_t m_id
A thread local pointer to the worker thread.
uint8_t id() const
Return the worker id.
ev_async m_signal_watcher
void terminate()
Terminate the worker and wait for it to finish.
std::atomic< bool > m_leader
std::queue< task_func_t > queue
std::queue< event > m_events_queue
std::atomic< bool > m_term
void set_event_loop(std::unique_ptr< loop_t > &loop)
Pass the event loop to the worker.
static thread_local worker * m_worker_ptr
Needed to use std::unique_ptr<>
std::unique_ptr< loop_t > m_loop
bool exec_in_worker_ctx(task_func_t f)
void mark_free()
Let the dispatcher know that this thread is free to become a leader and take on some work...
loop_t(struct ev_loop *p)
std::string get_string() const
void add_event(event e)
Add an event to the workers queue.
std::unique_ptr< std::thread > m_thread
virtual void stop_watcher(worker *worker)=0
Deactivate the underlying watcher.
worker(uint8_t id, std::unique_ptr< loop_t > &loop)
void tasks_event_callback(struct ev_loop *loop, EV_t w, int e)
Callback for I/O events.
void promote_leader()
Find a free worker and promote it to become the next leader.
uint64_t events_count() const
Return the number of events the worker has handled until now.
struct ev_loop * loop_ptr() const
Return the event loop pointer for this worker.