libtasks Documentation  1.6
test_uwsgi_thrift_async.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #include <arpa/inet.h>
9 #include <csignal>
10 #include <thrift/protocol/TBinaryProtocol.h>
11 #include <thrift/transport/THttpClient.h>
12 #include <thrift/transport/TSocket.h>
13 #include <boost/shared_ptr.hpp>
14 
15 #include <tasks/net/acceptor.h>
17 #include <tasks/logging.h>
18 #include <tasks/exec.h>
19 
21 
22 std::atomic<bool> g_finished(false);
23 
24 void ip_service_async1::service(std::shared_ptr<args_t> args) {
25  key_value_type kv;
26  id_name_type val;
27  if (args->ipv4 == 123456789) {
28  kv.key.id = 1;
29  kv.key.name = "city";
30  val.id = 123456;
31  val.name = "Berlin";
32  kv.values.push_back(val);
33  result().key_values.push_back(kv);
34  kv.values.clear();
35 
36  kv.key.id = 2;
37  kv.key.name = "country";
38  val.id = 3345677;
39  val.name = "Germany";
40  kv.values.push_back(val);
41  result().key_values.push_back(kv);
42  // it could take some time to finish, so sleep a bit
43  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
44  } else {
45  set_error("wrong ip address");
46  }
47  finish();
48 }
49 
50 void ip_service_async2::service(std::shared_ptr<args_t> args) {
51  tdbg("ip_service_async2::service(" << this << "): entered" << std::endl);
52  // run in a different thread
53  tasks::exec([this, args] {
54  key_value_type kv;
55  id_name_type val;
56  if (args->ipv4 == 123456789) {
57  kv.key.id = 1;
58  kv.key.name = "city";
59  val.id = 123456;
60  val.name = "Berlin";
61  kv.values.push_back(val);
62  result().key_values.push_back(kv);
63  kv.values.clear();
64 
65  kv.key.id = 2;
66  kv.key.name = "country";
67  val.id = 3345677;
68  val.name = "Germany";
69  kv.values.push_back(val);
70  result().key_values.push_back(kv);
71  // it could take some time to finish, so sleep a bit
72  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
73  } else {
74  set_error("wrong ip address");
75  }
76  finish();
77  g_finished = true;
78  tdbg("ip_service_async2::service(" << this << "): finished" << std::endl);
79  });
80 }
81 
85  request("/test2");
86 }
87 
91  request("/test3");
92 }
93 
97 
98  using namespace apache::thrift::protocol;
99  using namespace apache::thrift::transport;
100  boost::shared_ptr<THttpClient> transport(new THttpClient("Localhost", 18080, "/test4"));
101  boost::shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
102  IpServiceClient client(protocol);
103 
104  // set a low receive timeout
105  struct Socket : THttpClient {
106  // can access protected member
107  static boost::shared_ptr<TTransport> get(THttpClient* x) { return x->*(&Socket::transport_); }
108  };
109  boost::shared_ptr<TSocket> tsock = boost::dynamic_pointer_cast<TSocket>(Socket::get(transport.get()));
110 
111  tsock->setRecvTimeout(100);
112  g_finished = false;
113  try {
114  transport->open();
115 
116  int32_t ip = 123456789;
117  ipv6_type ipv6;
118  response_type r;
119  client.lookup(r, ip, ipv6);
120 
121  CPPUNIT_ASSERT_MESSAGE("TTransportException expected", false);
122  } catch (TTransportException& e) {
123  CPPUNIT_ASSERT_MESSAGE(e.what(), std::string(e.what()).find("timed out") != std::string::npos);
124  transport->close();
125  }
126 
127  while (!g_finished) {
128  std::this_thread::sleep_for(std::chrono::seconds(1));
129  }
130  std::this_thread::sleep_for(std::chrono::seconds(1));
131 
132  tsock->setRecvTimeout(800);
133  g_finished = false;
134  try {
135  transport->open();
136 
137  int32_t ip = 123456789;
138  ipv6_type ipv6;
139  response_type r;
140  client.lookup(r, ip, ipv6);
141 
142  CPPUNIT_ASSERT_MESSAGE("TTransportException expected", false);
143  } catch (TTransportException& e) {
144  CPPUNIT_ASSERT_MESSAGE(e.what(), std::string(e.what()).find("timed out") != std::string::npos);
145  transport->close();
146  }
147 
148  while (!g_finished) {
149  std::this_thread::sleep_for(std::chrono::seconds(1));
150  }
151  std::this_thread::sleep_for(std::chrono::seconds(1));
152 }
153 
154 void test_uwsgi_thrift_async::request(std::string url) {
155  using namespace apache::thrift::protocol;
156  using namespace apache::thrift::transport;
157  boost::shared_ptr<THttpClient> transport(new THttpClient("Localhost", 18080, url));
158  boost::shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
159  IpServiceClient client(protocol);
160 
161  try {
162  transport->open();
163 
164  int32_t ip = 123456789;
165  ipv6_type ipv6;
166  response_type r;
167  client.lookup(r, ip, ipv6);
168 
169  CPPUNIT_ASSERT(r.key_values.size() == 2);
170  CPPUNIT_ASSERT(r.key_values[0].key.id == 1);
171  CPPUNIT_ASSERT(r.key_values[0].key.name == "city");
172  CPPUNIT_ASSERT(r.key_values[0].values.size() == 1);
173  CPPUNIT_ASSERT(r.key_values[0].values[0].id == 123456);
174  CPPUNIT_ASSERT(r.key_values[0].values[0].name == "Berlin");
175  CPPUNIT_ASSERT(r.key_values[1].key.id == 2);
176  CPPUNIT_ASSERT(r.key_values[1].key.name == "country");
177  CPPUNIT_ASSERT(r.key_values[1].values.size() == 1);
178  CPPUNIT_ASSERT(r.key_values[1].values[0].id == 3345677);
179  CPPUNIT_ASSERT(r.key_values[1].values[0].name == "Germany");
180 
181  transport->close();
182  } catch (TTransportException& e) {
183  CPPUNIT_ASSERT_MESSAGE(std::string("TTransportException: ") + e.what(), false);
184  }
185 
186  try {
187  transport->open();
188 
189  int32_t ip = 0;
190  ipv6_type ipv6;
191  response_type r;
192  client.lookup(r, ip, ipv6);
193 
194  transport->close();
195 
196  CPPUNIT_ASSERT_MESSAGE("TTransportException expected", false);
197  } catch (TTransportException& e) {
198  CPPUNIT_ASSERT(e.what() == std::string("Bad Status: HTTP/1.1"));
199  }
200 }
std::unique_ptr< net_io_task > m_srv3
static void add_task(net_io_task *task)
Definition: net_io_task.cpp:40
auto result() -> decltype((m_result.success))
Return a reference to the thrift result object. A thrift handler uses this to return data...
#define tdbg(m)
Definition: logging.h:54
void service(std::shared_ptr< args_t > args)
std::unique_ptr< net_io_task > m_srv2
void service(std::shared_ptr< args_t > args)
std::atomic< bool > g_finished(false)
void exec(exec_task::func_t f)
Execute code in a separate executor thread.
Definition: exec.cpp:14
std::unique_ptr< net_io_task > m_srv1
void finish()
Async handlers call this method to trigger the processor callback.