libtasks Documentation  1.6
test_socket.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #include <tasks/dispatcher.h>
9 #include <tasks/net_io_task.h>
10 #include <tasks/worker.h>
11 #include <tasks/net/acceptor.h>
12 #include <tasks/logging.h>
13 #include <string.h>
14 #include <unistd.h>
15 
16 #include "test_socket.h"
17 
18 bool echo_handler::handle_event(tasks::worker* worker, int events) {
19  if (events & EV_READ) {
20  try {
21  std::vector<char> buf(1024);
22  std::size_t bytes = socket().read(&buf[0], buf.size());
23  buf.resize(bytes);
24  m_write_queue.push(std::move(buf));
25  } catch (tasks::net::socket_exception& e) {
26  return false;
27  }
28  }
29  if (events & EV_WRITE) {
30  if (!m_write_queue.empty()) {
31  std::vector<char>& buf = m_write_queue.front();
32  try {
33  std::size_t len = buf.size() - m_write_offset;
34  std::size_t bytes = socket().write(&buf[m_write_offset], len);
35  if (bytes == len) {
36  // buffer send completely
37  m_write_queue.pop();
38  m_write_offset = 0;
39  } else {
40  m_write_offset += bytes;
41  }
42  } catch (tasks::net::socket_exception& e) {
43  return false;
44  }
45  }
46  }
47  if (m_write_queue.empty()) {
48  set_events(EV_READ);
49  update_watcher(worker);
50  } else {
51  set_events(EV_READ | EV_WRITE);
52  update_watcher(worker);
53  }
54  return true;
55 }
56 
58  int port = 22334;
59 
60  // create acceptor
61  auto srv = new tasks::net::acceptor<echo_handler>(port);
62  tasks::dispatcher::instance()->add_event_task(srv);
63 
64  // create client
65  tasks::net::socket client0;
66  client0.set_blocking();
67  bool success = true;
68  try {
69  client0.connect("localhost", port);
70  } catch (tasks::net::socket_exception& e) {
71  success = false;
72  }
73  CPPUNIT_ASSERT(success);
74 
75  tasks::net::socket client1(client0);
76 
77  // read/write data
78  std::string data = "test123456789";
79  std::streamsize bytes = client1.write(data.c_str(), data.length());
80  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
81  std::vector<char> buf(1024);
82  bytes = client1.read(&buf[0], buf.size());
83  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
84  CPPUNIT_ASSERT(strncmp(data.c_str(), &buf[0], data.length()) == 0);
85 
86  client1.close();
87 
88  tasks::dispatcher::instance()->remove_event_task(srv);
89 }
90 
92  int port = 22335;
93 
94  // create srv socket
96  srv->set_blocking();
97  srv->bind(port);
98 
99  // create client socket
101  clnt.set_blocking();
102 
103  // read/write data
104  std::string data = "test123456789";
105  // clnt -> srv
106  std::streamsize bytes = clnt.write(data.c_str(), data.length(), port);
107  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
108  std::vector<char> buf0(1024);
109  bytes = srv->read(&buf0[0], buf0.size());
110  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
111  CPPUNIT_ASSERT(strncmp(data.c_str(), &buf0[0], data.length()) == 0);
112  // srv -> clnt
113  tasks::net::socket srv1(*srv); // copy constructor
114  delete srv;
115  std::vector<char> buf1(1024);
116  bytes = srv1.write(data.c_str(), data.length());
117  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
118  tasks::net::socket clnt1(clnt); // copy constructor
119  bytes = clnt1.read(&buf1[0], buf1.size());
120  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
121  CPPUNIT_ASSERT(strncmp(data.c_str(), &buf1[0], data.length()) == 0);
122 
123  clnt.close();
124  srv1.close();
125 }
126 
128  std::string sockfile = "/tmp/libtasks-test.socket";
129 
130  // create acceptor
131  auto srv = new tasks::net::acceptor<echo_handler>(sockfile);
132  tasks::dispatcher::instance()->add_event_task(srv);
133 
134  std::this_thread::sleep_for(std::chrono::seconds(1));
135 
136  // create client
137  tasks::net::socket client0;
138  client0.set_blocking();
139  bool success = true;
140  try {
141  client0.connect(sockfile);
142  } catch (tasks::net::socket_exception& e) {
143  terr(e.what() << std::endl);
144  success = false;
145  }
146  CPPUNIT_ASSERT(success);
147 
148  tasks::net::socket client1(client0);
149 
150  // read/write data
151  std::string data = "test123456789";
152  std::streamsize bytes = client1.write(data.c_str(), data.length());
153  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
154  std::vector<char> buf(1024);
155  bytes = client1.read(&buf[0], buf.size());
156  CPPUNIT_ASSERT(bytes == static_cast<std::streamsize>(data.length()));
157  CPPUNIT_ASSERT(strncmp(data.c_str(), &buf[0], data.length()) == 0);
158 
159  client1.close();
160 
161  tasks::dispatcher::instance()->remove_event_task(srv);
162 
163  unlink(sockfile.c_str());
164 }
static std::shared_ptr< dispatcher > instance()
Definition: dispatcher.h:75
void bind(int port, std::string ip="")
Definition: socket.cpp:86
std::streamsize write(const char *data, std::size_t len, int port=-1, std::string ip="")
Definition: socket.cpp:225
virtual void close()
Definition: io_base.h:24
#define terr(m)
Definition: logging.h:57
bool handle_event(tasks::worker *worker, int revents)
Definition: echoserver.cpp:28
The socket class.
Definition: socket.h:35
std::streamsize read(char *data, std::size_t len)
Definition: socket.cpp:251
Tasks exception class.
virtual void update_watcher(worker *worker)
Update a watcher in the context of the given worker.
net::socket & socket()
Provide access to the underlying socket object.
Definition: net_io_task.h:35
const char * what() const noexcept
Return the error message.
std::queue< std::vector< char > > m_write_queue
Definition: echoserver.h:46
ssize_t m_write_offset
Definition: echoserver.h:47
void set_blocking()
Set the socket to blocking mode.
Definition: socket.h:61
void connect(std::string path)
Definition: socket.cpp:147
void set_events(int events)
Update the events the object monitors.