libtasks Documentation  1.6
echoserver.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #include <vector>
9 
10 #include <tasks/dispatcher.h>
11 #include <tasks/logging.h>
12 #include <tasks/ev_wrapper.h>
13 #include <tasks/net/acceptor.h>
14 #include <tasks/net/socket.h>
15 
16 #include <echoserver.h>
17 
18 #ifdef PROFILER
19 #include <google/profiler.h>
20 #endif
21 
22 using namespace tasks;
23 using namespace tasks::net;
24 
25 std::atomic<int> stats::m_req_count;
26 std::atomic<int> stats::m_clients;
27 
29  if (events & EV_READ) {
30  try {
31  std::vector<char> buf(1024);
32  std::size_t bytes = socket().read(&buf[0], buf.size());
33  tdbg("echo_handler: read " << bytes << " bytes" << std::endl);
34  buf.resize(bytes);
35  m_write_queue.push(std::move(buf));
36  } catch (socket_exception& e) {
37  terr("echo_handler::handle_event: " << e.what() << std::endl);
38  return false;
39  }
40  }
41  if (events & EV_WRITE) {
42  if (!m_write_queue.empty()) {
43  std::vector<char>& buf = m_write_queue.front();
44  try {
45  std::size_t len = buf.size() - m_write_offset;
46  std::size_t bytes = socket().write(&buf[m_write_offset], len);
47  tdbg("echo_handler: wrote " << bytes << " bytes" << std::endl);
48  if (bytes == len) {
49  // buffer send completely
50  m_write_queue.pop();
51  m_write_offset = 0;
53  } else {
54  m_write_offset += bytes;
55  }
56  } catch (socket_exception& e) {
57  terr("echo_handler::handle_event: " << e.what() << std::endl);
58  return false;
59  }
60  }
61  }
62  if (m_write_queue.empty()) {
63  set_events(EV_READ);
64  update_watcher(worker);
65  } else {
66  set_events(EV_READ | EV_WRITE);
67  update_watcher(worker);
68  }
69  return true;
70 }
71 
73  std::time_t now = std::time(nullptr);
74  std::time_t diff = now - m_last;
75  int count;
76  count = m_req_count.exchange(0, std::memory_order_relaxed);
77  m_last = now;
78  int qps = count / diff;
79  std::cout << qps << " req/s, num of clients " << m_clients << std::endl;
80  dispatcher::instance()->print_worker_stats();
81  return true;
82 }
83 
84 void handle_signal(int sig) {
85  terr("Got signal " << sig << std::endl);
86  dispatcher::instance()->terminate();
87 }
88 
89 int main(int argc, char** argv) {
90 #ifdef PROFILER
91  ProfilerStart("echoserver.prof");
92 #endif
93  stats s;
94  acceptor<echo_handler> srv(12345);
95  // dispatcher::init_workers(1);
97  auto tasks = std::vector<task*>{&srv, &s};
98  dispatcher::instance()->run(tasks);
99 #ifdef PROFILER
100  ProfilerStop();
101 #endif
102  return 0;
103 }
static std::shared_ptr< dispatcher > instance()
Definition: dispatcher.h:75
std::streamsize write(const char *data, std::size_t len, int port=-1, std::string ip="")
Definition: socket.cpp:225
#define terr(m)
Definition: logging.h:57
#define tdbg(m)
Definition: logging.h:54
bool handle_event(tasks::worker *worker, int revents)
Definition: echoserver.cpp:28
int main(int argc, char **argv)
Definition: echoserver.cpp:89
bool handle_event(tasks::worker *, int revents)
Definition: echoserver.cpp:72
The socket class.
Definition: socket.h:35
std::streamsize read(char *data, std::size_t len)
Definition: socket.cpp:251
Tasks execption class.
const char * what() const noexcept
Return the error message.
static std::atomic< int > m_req_count
Definition: echoserver.h:32
static std::atomic< int > m_clients
Definition: echoserver.h:33
static void handle_signal(struct ev_loop *, ev_signal *sig, int)
Definition: dispatcher.cpp:280
static void inc_req()
Definition: echoserver.h:25
static void add_signal_handler(int sig, signal_func_t func)
Definition: dispatcher.cpp:287