libtasks Documentation  1.6
uwsgi_thrift_async_processor.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #ifndef _UWSGI_THRIFT_ASYNC_PROCESSOR_H_
9 #define _UWSGI_THRIFT_ASYNC_PROCESSOR_H_
10 
11 #include <arpa/inet.h>
12 #include <boost/shared_ptr.hpp>
13 #include <thrift/Thrift.h>
14 #include <thrift/protocol/TBinaryProtocol.h>
15 #include <thrift/TApplicationException.h>
16 #include <unordered_set>
17 #include <future>
18 #include <sstream>
19 
20 #include <tasks/net/uwsgi_task.h>
22 #include <tasks/logging.h>
23 
24 using namespace apache::thrift;
25 using namespace apache::thrift::protocol;
26 using namespace apache::thrift::transport;
27 
28 namespace tasks {
29 namespace net {
30 
31 template <class handler_type>
33  public:
36  using protocol_type = TBinaryProtocol;
37 
38  uwsgi_thrift_async_processor(net::socket& s) : uwsgi_task(s) { tdbg(get_string() << ": ctor" << std::endl); }
39 
40  virtual ~uwsgi_thrift_async_processor() { tdbg(get_string() << ": dtor" << std::endl); }
41 
42  inline std::string get_string() const {
43  std::ostringstream os;
44  os << "uwsgi_thrift_async_processor(" << this << ")";
45  return os.str();
46  }
47 
49  virtual bool handle_request() {
50  boost::shared_ptr<in_transport_type> in_transport(new in_transport_type(request_p()));
51  boost::shared_ptr<out_transport_type> out_transport(new out_transport_type(response_p()));
52  boost::shared_ptr<protocol_type> in_protocol(new protocol_type(in_transport));
53  boost::shared_ptr<protocol_type> out_protocol(new protocol_type(out_transport));
54 
55  // Process message
56  worker* worker = worker::get();
57  m_handler.reset(new handler_type());
58  tdbg(get_string() << ": created handler " << m_handler.get() << std::endl);
59  m_handler->set_uwsgi_task(this);
60  m_handler->on_finish([this, worker, out_protocol] {
61  if (m_handler->error()) {
62  tdbg(get_string() << ": handler " << m_handler.get() << " finished with error(" << m_handler->error()
63  << "): " << m_handler->error_message() << std::endl);
64  write_thrift_error(std::string("Handler Error: ") + m_handler->error_message(),
65  m_handler->service_name(), out_protocol);
66  } else {
67  tdbg(get_string() << ": handler " << m_handler.get() << " finished with no error" << std::endl);
68  // Fill the response back in.
69  out_protocol->writeMessageBegin(m_handler->service_name(), T_REPLY, m_seqid);
70  m_handler->result_base().__isset.success = true;
71  m_handler->result_base().write(out_protocol.get());
72  out_protocol->writeMessageEnd();
73  out_protocol->getTransport()->writeEnd();
74  out_protocol->getTransport()->flush();
75  response().set_status("200 OK");
76  }
77  response().set_header("Content-Type", "application/x-thrift");
78  // Make sure we run in the context of a worker thread
79  if (error_code() == tasks_error::UNSET) {
80  worker->exec_in_worker_ctx([this](struct ev_loop*) { send_response(); });
81  }
82  // Allow cleanup now
83  enable_dispose();
84  });
85 
86  try {
87  std::string fname;
88  TMessageType mtype;
89  in_protocol->readMessageBegin(fname, mtype, m_seqid);
90  if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
91  write_thrift_error("invalid message type", m_handler->service_name(), out_protocol);
92  send_thrift_response();
93  } else if (fname != m_handler->service_name()) {
94  write_thrift_error("invalid method name", m_handler->service_name(), out_protocol);
95  send_thrift_response();
96  } else {
97  // read the args from the request
98  auto args = std::make_shared<typename handler_type::args_t>();
99  args->read(in_protocol.get());
100  in_protocol->readMessageEnd();
101  in_protocol->getTransport()->readEnd();
102  // Make sure the object is kept until the m_handler finishes
103  disable_dispose();
104  // Call the m_handler
105  tdbg(get_string() << ": calling service on handler " << m_handler.get() << std::endl);
106  m_handler->service(args);
107  }
108  } catch (TException& e) {
109  write_thrift_error(std::string("TException: ") + e.what(), m_handler->service_name(), out_protocol);
110  send_thrift_response();
111  }
112 
113  return true;
114  }
115 
117  inline void send_thrift_response() {
118  response().set_header("Content-Type", "application/x-thrift");
119  send_response();
120  }
121 
127  inline void write_thrift_error(std::string msg, std::string service_name,
128  boost::shared_ptr<protocol_type> out_protocol) {
129  response().set_header("X-UWSGI_THRIFT_ASYNC_PROCESSOR_ERROR", msg);
130  response().set_status("400 Bad Request");
131  TApplicationException ae(msg);
132  out_protocol->writeMessageBegin(service_name, T_EXCEPTION, m_seqid);
133  ae.write(out_protocol.get());
134  out_protocol->writeMessageEnd();
135  out_protocol->getTransport()->writeEnd();
136  out_protocol->getTransport()->flush();
137  }
138 
139  private:
140  int32_t m_seqid = 0;
141  std::unique_ptr<handler_type> m_handler;
142 };
143 
144 } // net
145 } // tasks
146 
147 #endif // _UWSGI_THRIFT_ASYNC_PROCESSOR_H_
void write_thrift_error(std::string msg, std::string service_name, boost::shared_ptr< protocol_type > out_protocol)
#define tdbg(m)
Definition: logging.h:54
The socket class.
Definition: socket.h:35
The base class for the uwsgi protocol implementation.
Definition: uwsgi_task.h:30
bool exec_in_worker_ctx(task_func_t f)
Definition: worker.h:89
void send_thrift_response()
Send the thrift response back to the caller.