libtasks Documentation  1.6
disk_io_task.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #include <tasks/worker.h>
9 #include <tasks/disk_io_task.h>
10 #include <tasks/logging.h>
11 #include <unistd.h>
12 
13 namespace tasks {
14 
// Constructs a disk I/O task. The file descriptor, the requested event type
// and the buffer pointer are stored in m_fd, m_events and m_buf respectively;
// the buffer is kept as the raw pointer that was passed in (no copy is made
// here). A debug trace is emitted once the members are initialised.
15 disk_io_task::disk_io_task(int fd, int events, tools::buffer* buf) : m_fd(fd), m_events(events), m_buf(buf) {
16  tdbg(get_string() << ": ctor" << std::endl);
17 }
18 
// Destructor: only emits a debug trace. Neither m_fd nor m_buf is closed or
// released here.
19 disk_io_task::~disk_io_task() { tdbg(get_string() << ": dtor" << std::endl); }
20 
// Performs the I/O operation selected by m_events on m_fd and returns a future
// that carries the resulting byte count (also stored in m_bytes).
//   - EV_READ:  read() into m_buf->ptr_write(), at most m_buf->to_write() bytes.
//   - EV_WRITE: write() from m_buf->ptr_read(), at most m_buf->to_read() bytes.
//   - anything else: m_bytes is set to -1 and an exception object is recorded
//     through set_exception().
// The promise is fulfilled with m_bytes in every case, so a failing read() or
// write() is reported to the caller as the raw return value of that call.
//
// NOTE: listing lines 31, 41 and 48 are absent from this rendering. The
// statement inside `if (m_bytes > 0)`, the beginning of the exception
// construction and the statement following `event e = ...` are therefore not
// visible here and are not described.
21 std::shared_future<std::streamsize> disk_io_task::op() {
22  // run the io op in a separate thread
23  std::promise<std::streamsize> bytes_promise;
// NOTE(review): the future returned by std::async is discarded. The destructor
// of a future obtained from std::async waits for the task to finish, so this
// statement appears to block until the lambda has completed -- confirm whether
// that is intended. That wait is also what keeps the by-reference capture of
// the local bytes_promise valid while the lambda runs.
24  std::async(std::launch::async, [this, &bytes_promise] {
25  switch (m_events) {
26  case EV_READ:
27  tdbg(get_string() << ": calling read()" << std::endl);
28  m_bytes = read(m_fd, m_buf->ptr_write(), m_buf->to_write());
29  tdbg(get_string() << ": read() returned " << m_bytes << std::endl);
// Only a positive byte count enters this branch; its body (listing line 31)
// is not shown in this rendering.
30  if (m_bytes > 0) {
32  }
33  break;
34  case EV_WRITE:
35  tdbg(get_string() << ": calling write()" << std::endl);
36  m_bytes = write(m_fd, m_buf->ptr_read(), m_buf->to_read());
37  tdbg(get_string() << ": write() returned " << m_bytes << std::endl);
38  break;
39  default:
// Unsupported event type: report -1 bytes and record an exception object `e`
// whose message is built below (its declaration, listing line 41, is elided).
40  m_bytes = -1;
42  get_string() + std::string(": events has to be either EV_READ or EV_WRITE"));
43  set_exception(e);
44  }
// Publish the result before the event below is created.
45  bytes_promise.set_value(m_bytes);
46  // fire an event
// Builds an event from this task and its event type; the statement that
// consumes it (listing line 48) is not shown in this rendering.
47  event e = {this, m_events};
49  });
// get_future() yields a std::future, which converts implicitly to the declared
// std::shared_future return type.
50  return bytes_promise.get_future();
51 }
52 
// Event callback. Both parameters are unused: the function only emits a debug
// trace and always returns false.
// NOTE(review): what a false return means to the caller is decided outside
// this file -- confirm against the worker implementation.
53 bool disk_io_task::handle_event(worker* /* worker */, int /* events */) {
54  tdbg(get_string() << ": handle_event" << std::endl);
55  return false;
56 }
57 
// Body of a function whose signature line (listing line 58) is absent from
// this rendering; presumably disk_io_task::dispose(worker* worker), which the
// reference list at the end of this page declares with a nullptr default --
// confirm against the real source.
//
// When no worker is supplied, one is looked up for this task through
// dispatcher::instance(). A callback is then handed to the worker via
// exec_in_worker_ctx(); the callback logs and deletes this object, so the task
// must not be used once the callback has run. The bool returned by
// exec_in_worker_ctx() is ignored.
59  if (nullptr == worker) {
60  worker = dispatcher::instance()->get_worker_by_task(this);
61  }
62  worker->exec_in_worker_ctx([this](struct ev_loop* /* loop */) {
63  tdbg(get_string() << ": disposing disk_io_task" << std::endl);
// Self-deletion: nothing may touch members of this object after this line.
64  delete this;
65  });
66 }
67 
68 } // tasks
static std::shared_ptr< dispatcher > instance()
Definition: dispatcher.h:75
char * ptr_read()
Definition: buffer.h:32
std::streamsize m_bytes
Definition: disk_io_task.h:50
tools::buffer * m_buf
Definition: disk_io_task.h:49
#define tdbg(m)
Definition: logging.h:54
std::string get_string() const
Definition: disk_io_task.h:26
static void add_async_event(event e)
Add an event to the workers queue from a different thread context.
Definition: worker.h:141
void move_ptr_write(std::size_t s)
Definition: buffer.h:58
char * ptr_write()
Definition: buffer.h:30
Tasks exception class.
bool exec_in_worker_ctx(task_func_t f)
Definition: worker.h:89
std::streamsize to_read() const
Definition: buffer.h:68
virtual bool handle_event(worker *worker, int events)
virtual void dispose(worker *worker=nullptr)
void set_exception(tasks_exception &e)
Set an exception to report an error.
Definition: error_base.h:54
std::shared_future< std::streamsize > op()
std::streamsize to_write() const
Definition: buffer.h:66
disk_io_task(int fd, int events, tools::buffer *buf)