libtasks Documentation 1.6
test_disk_io_task.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014 ADTECH GmbH
3  * Licensed under MIT (https://github.com/adtechlabs/libtasks/blob/master/COPYING)
4  *
5  * Author: Andreas Pohl
6  */
7 
8 #include <test_disk_io_task.h>
9 #include <tasks/dispatcher.h>
10 #include <tasks/tools/buffer.h>
11 #include <tasks/disk_io_task.h>
12 #include <ostream>
13 #include <fcntl.h>
14 
17  std::ostream os(&buf);
18  os << "test1"
19  << "test2" << std::endl << "test3" << 12345 << std::endl;
20  buf.write("test4\ntest5\n", 12);
21 
22  std::atomic<uint16_t> count(m_total);
23 
24  for (int i = 0; i < m_total; i++) {
25  std::string fname = "/tmp/disk_io_test" + std::to_string(i);
26  int fd = open(fname.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
27  CPPUNIT_ASSERT(fd > -1);
28 
29  tasks::disk_io_task* task = new tasks::disk_io_task(fd, EV_WRITE, &buf);
30  task->on_finish([this, task, fd, &buf, &count] {
31  close(fd);
32  // check the return of write()
33  CPPUNIT_ASSERT(task->bytes() == static_cast<std::streamsize>(buf.size()));
34  CPPUNIT_ASSERT(task->bytes() == 34);
35  count--;
36  });
37 
39  }
40 
41  while (count > 0) {
42  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
43  }
44 }
45 
47  std::atomic<uint16_t> count(m_total);
48 
49  // test future
50  std::string fname = "/tmp/disk_io_test0";
51  int fd = open(fname.c_str(), O_RDONLY);
52  CPPUNIT_ASSERT(fd > -1);
54  tasks::disk_io_task* task = new tasks::disk_io_task(fd, EV_READ, buf);
55  auto future_bytes = tasks::disk_io_task::add_task(task);
56  CPPUNIT_ASSERT(future_bytes.get() == 34);
57  close(fd);
58  delete buf;
59 
60  for (int i = 0; i < m_total; i++) {
61  std::string fname = "/tmp/disk_io_test" + std::to_string(i);
62  int fd = open(fname.c_str(), O_RDONLY);
63  CPPUNIT_ASSERT(fd > -1);
64 
66  tasks::disk_io_task* task = new tasks::disk_io_task(fd, EV_READ, buf);
67  task->on_finish([this, task, fd, buf, &count, fname] {
68  close(fd);
69  // check the return of read()
70  CPPUNIT_ASSERT(task->bytes() == 34);
71  delete buf;
72  unlink(fname.c_str());
73  count--;
74  });
75 
77  }
78 
79  while (count > 0) {
80  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
81  }
82 }
static std::shared_future< std::streamsize > add_task(disk_io_task *task)
Definition: disk_io_task.h:41
std::size_t size() const
Definition: buffer.h:70
void on_finish(finish_func_worker_t f)
Definition: task.h:56
std::streamsize bytes() const
Definition: disk_io_task.h:32
std::streamsize write(const char_type *data, std::streamsize size)
Definition: buffer.h:91