From 08b31c8438f03b5d1eb9acb187a2bbdb8c7a241a Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Sun, 29 Nov 2015 20:51:15 +0300 Subject: [PATCH 01/15] almost worked version for mac os --- cap_read.cpp | 6 +-- epoll.cpp | 85 ++++++++++++++++++---------------- epoll.h | 13 +++--- main_echo_server.cpp | 13 ------ pipe.cpp | 5 +- socket.cpp | 108 ++++++++++++++++++++----------------------- socket.h | 27 ++++++----- throw_error.cpp | 3 +- 8 files changed, 121 insertions(+), 139 deletions(-) diff --git a/cap_read.cpp b/cap_read.cpp index 0ea8942..337e327 100644 --- a/cap_read.cpp +++ b/cap_read.cpp @@ -23,9 +23,9 @@ void read(weak_file_descriptor fdc, void* data, size_t size) std::string read_string(weak_file_descriptor fd, std::size_t size) { // yes, it is slow, but it is the fastest standard compliant way I know to read data into a string - std::vector tmp(size); - read(fd, tmp.data(), size); - return std::string{tmp.begin(), tmp.end()}; + char buff[size]; + read(fd, buff, size); + return std::string{buff, size}; } size_t read_some(weak_file_descriptor fdc, void* data, size_t size) diff --git a/epoll.cpp b/epoll.cpp index 2659efc..0b5ed64 100644 --- a/epoll.cpp +++ b/epoll.cpp @@ -1,73 +1,72 @@ #include "epoll.h" -#include - -#include #include #include #include #include + #include "throw_error.h" using namespace sysapi; epoll::epoll() { - int r = ::epoll_create1(EPOLL_CLOEXEC); + int r = kqueue(); + if (r == -1) - throw_error(errno, "epoll_create1()"); + throw_error(errno, "kqueue()"); assert(r >= 0); - fd_.reset(r); + fd_ = r; } epoll::epoll(epoll&& rhs) - : fd_(std::move(rhs.fd_)) + : fd_(rhs.fd_) {} -epoll& epoll::operator=(epoll rhs) +void epoll::swap(epoll& other) { - swap(rhs); - return *this; + using std::swap; + swap(fd_, other.fd_); } -void epoll::swap(epoll& other) +epoll& epoll::operator=(epoll rhs) { - using ::swap; - swap(fd_, other.fd_); + swap(rhs); + return *this; } void epoll::run() { for (;;) { - std::array ev; - - again: - int r = ::epoll_wait(fd_.getfd(), ev.data(), ev.size(), -1); + int const ev_size = 16; + struct kevent ev[ev_size]; + + int r = kevent(fd_, NULL, 0, ev, ev_size, NULL); if (r < 0) { int err = errno; if (err == EINTR) - goto again; + continue; - throw_error(err, "epoll_wait()"); + throw_error(err, "kevent()"); } assert(r > 0); size_t num_events = static_cast(r); - assert(num_events <= ev.size()); + assert(num_events <= ev_size); - for (auto i = ev.begin(); i != ev.begin() + num_events; ++i) + for (size_t i = 0; i < r; ++i) { try { - epoll_event const& ee = *i; - static_cast(ee.data.ptr)->callback(ee.events); + struct kevent const& ee = ev[i]; + static_cast(ee.udata)->callback(ee); } catch (std::exception const& e) { @@ -81,33 +80,37 @@ void epoll::run() } } -void epoll::add(int fd, uint32_t events, epoll_registration* reg) +void epoll::add(int fd, int16_t events, epoll_registration* reg) { - epoll_event ev = {0, 0}; - ev.data.ptr = reg; - ev.events = events; + struct kevent event; + EV_SET(&event, fd, events, EV_ADD, 0, 0, reg); - int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_ADD, fd, &ev); + int r = kevent(fd_, &event, 1, NULL, 0, NULL); if (r < 0) - throw_error(errno, "epoll_ctl(EPOLL_CTL_ADD)"); + throw_error(errno, "kevent(EV_ADD)"); } -void epoll::modify(int fd, uint32_t events, epoll_registration* reg) +void epoll::modify(int fd, int16_t events, epoll_registration* reg) { - epoll_event ev = {0, 0}; - ev.data.ptr = reg; - ev.events = events; - - int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_MOD, fd, &ev); + struct kevent event; + EV_SET(&event, fd, events, EV_DELETE, 0, 0, reg); + kevent(fd_, &event, 1, NULL, 0, NULL); + + EV_SET(&event, fd, events, EV_ADD, 0, 0, reg); + + int r = kevent(fd_, &event, 1, NULL, 0, NULL); if (r < 0) - throw_error(errno, "epoll_ctl(EPOLL_CTL_MOD)"); + throw_error(errno, "kevent() MOD"); } -void epoll::remove(int fd) +void epoll::remove(int fd, int16_t events) { - int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_DEL, fd, nullptr); + struct kevent event; + EV_SET(&event, fd, events, EV_DELETE, 0, 0, NULL); + + int r = kevent(fd_, &event, 1, NULL, 0, NULL); if (r < 0) - throw_error(errno, "epoll_ctl(EPOLL_CTL_DEL)"); + throw_error(errno, "kevent(EV_DELETE)"); } epoll_registration::epoll_registration() @@ -149,7 +152,7 @@ epoll_registration& epoll_registration::operator=(epoll_registration rhs) return *this; } -void epoll_registration::modify(uint32_t new_events) +void epoll_registration::modify(int16_t new_events) { assert(ep); @@ -174,7 +177,7 @@ void epoll_registration::clear() { if (ep) { - ep->remove(fd); + ep->remove(fd, events); ep = nullptr; fd = -1; events = 0; diff --git a/epoll.h b/epoll.h index 86fec85..98f6fb9 100644 --- a/epoll.h +++ b/epoll.h @@ -5,6 +5,7 @@ #include #include +#include namespace sysapi { @@ -25,19 +26,19 @@ namespace sysapi void run(); private: - void add(int fd, uint32_t events, epoll_registration*); - void modify(int fd, uint32_t events, epoll_registration*); - void remove(int fd); + void add(int fd, int16_t events, epoll_registration*); + void modify(int fd, int16_t events, epoll_registration*); + void remove(int fd, int16_t events); private: - file_descriptor fd_; + int fd_; friend struct epoll_registration; }; struct epoll_registration { - typedef std::function callback_t; + typedef std::function callback_t; epoll_registration(); epoll_registration(epoll&, int fd, uint32_t events, callback_t callback); @@ -47,7 +48,7 @@ namespace sysapi epoll_registration& operator=(epoll_registration); - void modify(uint32_t new_events); + void modify(int16_t new_events); void swap(epoll_registration& other); diff --git a/main_echo_server.cpp b/main_echo_server.cpp index 51bf4ca..bfd2c42 100644 --- a/main_echo_server.cpp +++ b/main_echo_server.cpp @@ -35,30 +35,17 @@ struct echo_server void update() { - if (end_offset == 0) - { assert(start_offset == 0); socket.set_on_read([this] { end_offset = socket.read_some(buf, sizeof buf); assert(start_offset == 0); - update(); - }); - socket.set_on_write(client_socket::on_ready_t{}); - } - else - { - assert(start_offset < end_offset); - socket.set_on_read(client_socket::on_ready_t{}); - socket.set_on_write([this] { start_offset += socket.write_some(buf + start_offset, end_offset - start_offset); if (start_offset == end_offset) { start_offset = 0; end_offset = 0; - update(); } }); - } } private: diff --git a/pipe.cpp b/pipe.cpp index a397a1b..715fc51 100644 --- a/pipe.cpp +++ b/pipe.cpp @@ -8,12 +8,13 @@ pipe_pair make_pipe(bool non_block) { int fds[2]; - int res = pipe2(fds, O_CLOEXEC | (non_block ? O_NONBLOCK : 0)); + + int res = pipe(fds); if (res != 0) { assert(res == -1); - throw_error(errno, "pipe2()"); + throw_error(0, "pipe()"); // undeclarated errno? } return pipe_pair{weak_file_descriptor{fds[0]}, weak_file_descriptor{fds[1]}}; diff --git a/socket.cpp b/socket.cpp index fe72721..280627d 100644 --- a/socket.cpp +++ b/socket.cpp @@ -5,8 +5,8 @@ #include #include "throw_error.h" -#include -#include +//#include +//#include #include "cap_write.h" #include "cap_read.h" @@ -51,14 +51,11 @@ namespace throw_error(errno, "connect()"); } - file_descriptor create_eventfd(bool semaphore) - { - int res = ::eventfd(0, (semaphore ? EFD_SEMAPHORE : 0) | EFD_CLOEXEC | EFD_NONBLOCK); - if (res == -1) - throw_error(errno, "eventfd()"); - - return file_descriptor{res}; - } +// struct kevent create_event() +// { +// struct kevent event; +// return event; +// } } client_socket::client_socket(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) @@ -68,33 +65,25 @@ client_socket::client_socket(sysapi::epoll &ep, file_descriptor fd, on_ready_t o client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) : ep(ep) , fd(std::move(fd)) - , reg(ep, this->fd.getfd(), 0, [this](uint32_t events) { - assert((events & ~(EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR | EPOLLHUP)) == 0); + , reg(ep, this->fd.getfd(), EVFILT_READ, [this](struct kevent event) { + assert(event.filter & EVFILT_READ); bool is_destroyed = false; assert(destroyed == nullptr); destroyed = &is_destroyed; try { - if ((events & EPOLLRDHUP) - || (events & EPOLLERR) - || (events & EPOLLHUP)) + if (event.filter & EVFILT_READ && event.flags & EV_EOF && event.data == 0) { this->on_disconnect(); if (is_destroyed) return; } - if (events & EPOLLIN) + if (event.filter & EVFILT_READ) { this->on_read_ready(); if (is_destroyed) return; } - if (events & EPOLLOUT) - { - this->on_write_ready(); - if (is_destroyed) - return; - } } catch (...) { @@ -120,11 +109,11 @@ void client_socket::set_on_read(on_ready_t on_ready) update_registration(); } -void client_socket::set_on_write(client_socket::on_ready_t on_ready) -{ - pimpl->on_write_ready = on_ready; - update_registration(); -} +//void client_socket::set_on_write(client_socket::on_ready_t on_ready) +//{ +// pimpl->on_write_ready = on_ready; +// update_registration(); +//} size_t client_socket::write_some(const void *data, size_t size) { @@ -146,16 +135,14 @@ client_socket client_socket::connect(sysapi::epoll &ep, const ipv4_endpoint &rem void client_socket::update_registration() { - pimpl->reg.modify((pimpl->on_read_ready ? EPOLLIN : 0) - | (pimpl->on_write_ready ? EPOLLOUT: 0) - | EPOLLRDHUP); + pimpl->reg.modify(EVFILT_READ); } server_socket::server_socket(epoll& ep, on_connected_t on_connected) : fd(make_socket(AF_INET, SOCK_STREAM)) , on_connected(on_connected) - , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { - assert(events == EPOLLIN); + , reg(ep, fd.getfd(), EVFILT_READ, [this](struct kevent event) { + assert(event.filter & EVFILT_READ); this->on_connected(); }) { @@ -165,8 +152,8 @@ server_socket::server_socket(epoll& ep, on_connected_t on_connected) server_socket::server_socket(epoll& ep, ipv4_endpoint local_endpoint, on_connected_t on_connected) : fd(make_socket(AF_INET, SOCK_STREAM)) , on_connected(on_connected) - , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { - assert(events == EPOLLIN); + , reg(ep, fd.getfd(), EVFILT_READ, [this](struct kevent event) { + assert(event.filter & EVFILT_READ); this->on_connected(); }) { @@ -187,31 +174,36 @@ ipv4_endpoint server_socket::local_endpoint() const client_socket server_socket::accept(client_socket::on_ready_t on_disconnect) const { - int res = ::accept4(fd.getfd(), nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC); + struct sockaddr addr; + socklen_t socklen = sizeof(addr); + int res = ::accept(fd.getfd(), &addr, &socklen); if (res == -1) - throw_error(errno, "accept4()"); - + throw_error(errno, "accept()"); + + const int set = 1; + ::setsockopt(res, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); // NOSIGPIPE FOR SEND + return client_socket{reg.get_epoll(), {res}, std::move(on_disconnect)}; } -eventfd::eventfd(epoll& ep, bool semaphore, on_event_t on_event) - : fd(create_eventfd(semaphore)) - , on_event(on_event) - , reg(ep, fd.getfd(), 0, [this] (uint32_t events) { - assert((events & ~EPOLLIN) == 0); - uint64_t tmp; - read(this->fd.getfd(), &tmp, sizeof tmp); - this->on_event(); - }) -{} - -void eventfd::notify(uint64_t increment) -{ - write(fd, &increment, sizeof increment); -} - -void eventfd::set_on_event(eventfd::on_event_t on_event) -{ - this->on_event = on_event; - reg.modify(on_event ? EPOLLIN : 0); -} +//eventfd::eventfd(epoll& ep, on_event_t on_event) +// : event(create_eventfd()) +// , on_event(on_event) +// , reg(ep, fd.getfd(), 0, [this] (struct kevent event) { +// assert((event.filter & EVFILT_READ) == 0); +// uint64_t tmp; +// read(this->fd.getfd(), &tmp, sizeof tmp); +// this->on_event(); +// }) +//{} +// +//void eventfd::notify(uint64_t increment) +//{ +// write(fd, &increment, sizeof increment); +//} +// +//void eventfd::set_on_event(eventfd::on_event_t on_event) +//{ +// this->on_event = on_event; +// reg.modify(on_event ? EVFILT_READ: 0); +//} diff --git a/socket.h b/socket.h index 758cda9..7a58db5 100644 --- a/socket.h +++ b/socket.h @@ -14,7 +14,7 @@ struct client_socket client_socket(epoll& ep, file_descriptor fd, on_ready_t on_disconnect); void set_on_read(on_ready_t on_ready); - void set_on_write(on_ready_t on_ready); +// void set_on_write(on_ready_t on_ready); size_t write_some(void const* data, size_t size); size_t read_some(void* data, size_t size); @@ -58,18 +58,17 @@ struct server_socket epoll_registration reg; }; -struct eventfd -{ - typedef std::function on_event_t; - - eventfd(epoll& ep, bool semaphore, on_event_t on_event); - void notify(uint64_t increment = 1); - void set_on_event(on_event_t on_event); - -private: - file_descriptor fd; - on_event_t on_event; - epoll_registration reg; -}; +//struct eventfd +//{ +// typedef std::function on_event_t; +// +// eventfd(epoll& ep, on_event_t on_event); +// void notify(uint64_t increment = 1); +// void set_on_event(on_event_t on_event); +// +//private: +// on_event_t on_event; +// epoll_registration reg; +//}; #endif // SOCKET_H diff --git a/throw_error.cpp b/throw_error.cpp index 9ef67ad..b9325c2 100644 --- a/throw_error.cpp +++ b/throw_error.cpp @@ -33,8 +33,7 @@ void throw_error [[noreturn]] (int err, char const* action) { std::stringstream ss; ss << action << " failed, error: " << error_enum_name(err); - char tmp[2048]; - char const* err_msg = strerror_r(err, tmp, sizeof tmp); + char const* err_msg = strerror(err); ss << " (" << err << ", " << err_msg << ")"; throw std::runtime_error(ss.str()); } From 1312b466bc1686e5fe73b9bfb2a79dff2c85759d Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sun, 29 Nov 2015 20:55:15 +0300 Subject: [PATCH 02/15] Update socket.h --- socket.h | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/socket.h b/socket.h index 7a58db5..2a4fff9 100644 --- a/socket.h +++ b/socket.h @@ -58,17 +58,4 @@ struct server_socket epoll_registration reg; }; -//struct eventfd -//{ -// typedef std::function on_event_t; -// -// eventfd(epoll& ep, on_event_t on_event); -// void notify(uint64_t increment = 1); -// void set_on_event(on_event_t on_event); -// -//private: -// on_event_t on_event; -// epoll_registration reg; -//}; - #endif // SOCKET_H From b9411d468265a78fe21dca46b24700f5f2ef6ca2 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sun, 29 Nov 2015 20:56:00 +0300 Subject: [PATCH 03/15] Update socket.cpp --- socket.cpp | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/socket.cpp b/socket.cpp index 280627d..d07dc79 100644 --- a/socket.cpp +++ b/socket.cpp @@ -5,8 +5,6 @@ #include #include "throw_error.h" -//#include -//#include #include "cap_write.h" #include "cap_read.h" @@ -50,12 +48,6 @@ namespace if (res == -1) throw_error(errno, "connect()"); } - -// struct kevent create_event() -// { -// struct kevent event; -// return event; -// } } client_socket::client_socket(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) @@ -185,25 +177,3 @@ client_socket server_socket::accept(client_socket::on_ready_t on_disconnect) con return client_socket{reg.get_epoll(), {res}, std::move(on_disconnect)}; } - -//eventfd::eventfd(epoll& ep, on_event_t on_event) -// : event(create_eventfd()) -// , on_event(on_event) -// , reg(ep, fd.getfd(), 0, [this] (struct kevent event) { -// assert((event.filter & EVFILT_READ) == 0); -// uint64_t tmp; -// read(this->fd.getfd(), &tmp, sizeof tmp); -// this->on_event(); -// }) -//{} -// -//void eventfd::notify(uint64_t increment) -//{ -// write(fd, &increment, sizeof increment); -//} -// -//void eventfd::set_on_event(eventfd::on_event_t on_event) -//{ -// this->on_event = on_event; -// reg.modify(on_event ? EVFILT_READ: 0); -//} From 9535c3081fec04f35b832c3d6cf72f92cf7bab5a Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Sun, 29 Nov 2015 21:54:52 +0300 Subject: [PATCH 04/15] unreadable version for linux and mac --- epoll.cpp | 186 +++++++++++++++++++++++++++++++++++++++++-- epoll.h | 66 +++++++++++++++ main_echo_server.cpp | 27 +++++++ pipe.cpp | 12 ++- socket.cpp | 104 +++++++++++++++--------- socket.h | 7 +- 6 files changed, 352 insertions(+), 50 deletions(-) diff --git a/epoll.cpp b/epoll.cpp index 0b5ed64..59fab9c 100644 --- a/epoll.cpp +++ b/epoll.cpp @@ -1,17 +1,23 @@ #include "epoll.h" +#ifndef __APPLE__ +#include + +#include +#endif + #include #include #include #include - #include "throw_error.h" using namespace sysapi; epoll::epoll() { +#ifdef __APPLE__ int r = kqueue(); if (r == -1) @@ -20,28 +26,42 @@ epoll::epoll() assert(r >= 0); fd_ = r; +#else + int r = ::epoll_create1(EPOLL_CLOEXEC); + if (r == -1) + throw_error(errno, "epoll_create1()"); + + assert(r >= 0); + + fd_.reset(r); +#endif } epoll::epoll(epoll&& rhs) +#ifdef __APPLE__ : fd_(rhs.fd_) +#else + : fd_(std::move(rhs.fd_)) +#endif {} -void epoll::swap(epoll& other) -{ - using std::swap; - swap(fd_, other.fd_); -} - epoll& epoll::operator=(epoll rhs) { swap(rhs); return *this; } +void epoll::swap(epoll& other) +{ + using std::swap; + swap(fd_, other.fd_); +} + void epoll::run() { for (;;) { +#ifdef __APPLE__ int const ev_size = 16; struct kevent ev[ev_size]; @@ -77,9 +97,47 @@ void epoll::run() std::cerr << "unknown exception in message loop" << std::endl; } } +#else + std::array ev; + + again: + int r = ::epoll_wait(fd_.getfd(), ev.data(), ev.size(), -1); + + if (r < 0) + { + int err = errno; + + if (err == EINTR) + goto again; + + throw_error(err, "epoll_wait()"); + } + + assert(r > 0); + size_t num_events = static_cast(r); + assert(num_events <= ev.size()); + + for (auto i = ev.begin(); i != ev.begin() + num_events; ++i) + { + try + { + epoll_event const& ee = *i; + static_cast(ee.data.ptr)->callback(ee.events); + } + catch (std::exception const& e) + { + std::cerr << "error: " << e.what() << std::endl; + } + catch (...) + { + std::cerr << "unknown exception in message loop" << std::endl; + } + } +#endif } } +#ifdef __APPLE__ void epoll::add(int fd, int16_t events, epoll_registration* reg) { struct kevent event; @@ -194,3 +252,117 @@ void epoll_registration::update() if (ep) ep->modify(fd, events, this); } + +#else + +void epoll::add(int fd, uint32_t events, epoll_registration* reg) +{ + epoll_event ev = {0, 0}; + ev.data.ptr = reg; + ev.events = events; + + int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_ADD, fd, &ev); + if (r < 0) + throw_error(errno, "epoll_ctl(EPOLL_CTL_ADD)"); +} + +void epoll::modify(int fd, uint32_t events, epoll_registration* reg) +{ + epoll_event ev = {0, 0}; + ev.data.ptr = reg; + ev.events = events; + + int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_MOD, fd, &ev); + if (r < 0) + throw_error(errno, "epoll_ctl(EPOLL_CTL_MOD)"); +} + +void epoll::remove(int fd) +{ + int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_DEL, fd, nullptr); + if (r < 0) + throw_error(errno, "epoll_ctl(EPOLL_CTL_DEL)"); +} + +epoll_registration::epoll_registration() + : ep() + , fd(-1) + , events() +{} + +epoll_registration::epoll_registration(epoll& ep, int fd, uint32_t events, callback_t callback) + : ep(&ep) + , fd(fd) + , events(events) + , callback(std::move(callback)) +{ + ep.add(fd, events, this); +} + +epoll_registration::epoll_registration(epoll_registration&& rhs) + : ep(rhs.ep) + , fd(rhs.fd) + , events(rhs.events) + , callback(std::move(rhs.callback)) +{ + update(); + rhs.ep = nullptr; + rhs.fd = -1; + rhs.events = 0; + rhs.callback = callback_t(); +} + +epoll_registration::~epoll_registration() +{ + clear(); +} + +epoll_registration& epoll_registration::operator=(epoll_registration rhs) +{ + swap(rhs); + return *this; +} + +void epoll_registration::modify(uint32_t new_events) +{ + assert(ep); + + if (events == new_events) + return; + + ep->modify(fd, new_events, this); + events = new_events; +} + +void epoll_registration::swap(epoll_registration& other) +{ + std::swap(ep, other.ep); + std::swap(fd, other.fd); + std::swap(events, other.events); + std::swap(callback, other.callback); + update(); + other.update(); +} + +void epoll_registration::clear() +{ + if (ep) + { + ep->remove(fd); + ep = nullptr; + fd = -1; + events = 0; + } +} + +epoll& epoll_registration::get_epoll() const +{ + return *ep; +} + +void epoll_registration::update() +{ + if (ep) + ep->modify(fd, events, this); +} +#endif diff --git a/epoll.h b/epoll.h index 98f6fb9..e5f4cde 100644 --- a/epoll.h +++ b/epoll.h @@ -5,6 +5,7 @@ #include #include +#ifdef __APPLE__ #include namespace sysapi @@ -68,6 +69,71 @@ namespace sysapi }; } +#else + +namespace sysapi +{ + struct epoll; + struct epoll_registration; + + struct epoll + { + typedef std::function action_t; + epoll(); + epoll(epoll const&) = delete; + epoll(epoll&&); + + epoll& operator=(epoll); + + void swap(epoll& other); + + void run(); + + private: + void add(int fd, uint32_t events, epoll_registration*); + void modify(int fd, uint32_t events, epoll_registration*); + void remove(int fd); + + private: + file_descriptor fd_; + + friend struct epoll_registration; + }; + + struct epoll_registration + { + typedef std::function callback_t; + + epoll_registration(); + epoll_registration(epoll&, int fd, uint32_t events, callback_t callback); + epoll_registration(epoll_registration const&) = delete; + epoll_registration(epoll_registration&&); + ~epoll_registration(); + + epoll_registration& operator=(epoll_registration); + + void modify(uint32_t new_events); + + void swap(epoll_registration& other); + + void clear(); + epoll& get_epoll() const; + + private: + void update(); + + private: + epoll* ep; + int fd; + uint32_t events; + callback_t callback; + + friend struct epoll; + }; +} + +#endif + using sysapi::epoll; using sysapi::epoll_registration; diff --git a/main_echo_server.cpp b/main_echo_server.cpp index bfd2c42..ac0e08d 100644 --- a/main_echo_server.cpp +++ b/main_echo_server.cpp @@ -35,17 +35,44 @@ struct echo_server void update() { +#ifdef __APPLE__ + assert(start_offset == 0); + socket.set_on_read([this] { + end_offset = socket.read_some(buf, sizeof buf); + assert(start_offset == 0); + start_offset += socket.write_some(buf + start_offset, end_offset - start_offset); + if (start_offset == end_offset) + { + start_offset = 0; + end_offset = 0; + } + }); +#else + if (end_offset == 0) + { assert(start_offset == 0); socket.set_on_read([this] { end_offset = socket.read_some(buf, sizeof buf); assert(start_offset == 0); + update(); + }); + socket.set_on_write(client_socket::on_ready_t{}); + } + else + { + assert(start_offset < end_offset); + socket.set_on_read(client_socket::on_ready_t{}); + socket.set_on_write([this] { start_offset += socket.write_some(buf + start_offset, end_offset - start_offset); if (start_offset == end_offset) { start_offset = 0; end_offset = 0; + update(); } }); + } +#endif } private: diff --git a/pipe.cpp b/pipe.cpp index 715fc51..2c9ddbd 100644 --- a/pipe.cpp +++ b/pipe.cpp @@ -8,7 +8,7 @@ pipe_pair make_pipe(bool non_block) { int fds[2]; - +#ifdef __APPLE__ int res = pipe(fds); if (res != 0) @@ -16,6 +16,14 @@ pipe_pair make_pipe(bool non_block) assert(res == -1); throw_error(0, "pipe()"); // undeclarated errno? } - +#else + int res = pipe2(fds, O_CLOEXEC | (non_block ? O_NONBLOCK : 0)); + + if (res != 0) + { + assert(res == -1); + throw_error(errno, "pipe2()"); + } +#endif return pipe_pair{weak_file_descriptor{fds[0]}, weak_file_descriptor{fds[1]}}; } diff --git a/socket.cpp b/socket.cpp index 280627d..974ed60 100644 --- a/socket.cpp +++ b/socket.cpp @@ -5,8 +5,10 @@ #include #include "throw_error.h" -//#include -//#include +#ifndef __APPLE__ +#include +#include +#endif #include "cap_write.h" #include "cap_read.h" @@ -50,12 +52,6 @@ namespace if (res == -1) throw_error(errno, "connect()"); } - -// struct kevent create_event() -// { -// struct kevent event; -// return event; -// } } client_socket::client_socket(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) @@ -65,13 +61,19 @@ client_socket::client_socket(sysapi::epoll &ep, file_descriptor fd, on_ready_t o client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) : ep(ep) , fd(std::move(fd)) +#ifdef __APPLE__ , reg(ep, this->fd.getfd(), EVFILT_READ, [this](struct kevent event) { assert(event.filter & EVFILT_READ); +#else + , reg(ep, this->fd.getfd(), 0, [this](uint32_t events) { + assert((events & ~(EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR | EPOLLHUP)) == 0); +#endif bool is_destroyed = false; assert(destroyed == nullptr); destroyed = &is_destroyed; try { +#ifdef __APPLE__ if (event.filter & EVFILT_READ && event.flags & EV_EOF && event.data == 0) { this->on_disconnect(); @@ -84,6 +86,29 @@ client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_d if (is_destroyed) return; } + +#else + if ((events & EPOLLRDHUP) + || (events & EPOLLERR) + || (events & EPOLLHUP)) + { + this->on_disconnect(); + if (is_destroyed) + return; + } + if (events & EPOLLIN) + { + this->on_read_ready(); + if (is_destroyed) + return; + } + if (events & EPOLLOUT) + { + this->on_write_ready(); + if (is_destroyed) + return; + } +#endif } catch (...) { @@ -108,13 +133,13 @@ void client_socket::set_on_read(on_ready_t on_ready) pimpl->on_read_ready = on_ready; update_registration(); } - -//void client_socket::set_on_write(client_socket::on_ready_t on_ready) -//{ -// pimpl->on_write_ready = on_ready; -// update_registration(); -//} - +#ifndef __APPLE__ +void client_socket::set_on_write(client_socket::on_ready_t on_ready) +{ + pimpl->on_write_ready = on_ready; + update_registration(); +} +#endif size_t client_socket::write_some(const void *data, size_t size) { return ::write_some(pimpl->fd, data, size); @@ -135,14 +160,25 @@ client_socket client_socket::connect(sysapi::epoll &ep, const ipv4_endpoint &rem void client_socket::update_registration() { +#ifdef __APPLE__ pimpl->reg.modify(EVFILT_READ); +#else + pimpl->reg.modify((pimpl->on_read_ready ? EPOLLIN : 0) + | (pimpl->on_write_ready ? EPOLLOUT: 0) + | EPOLLRDHUP); +#endif } server_socket::server_socket(epoll& ep, on_connected_t on_connected) : fd(make_socket(AF_INET, SOCK_STREAM)) , on_connected(on_connected) +#ifdef __APPLE__ , reg(ep, fd.getfd(), EVFILT_READ, [this](struct kevent event) { assert(event.filter & EVFILT_READ); +#else + , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { + assert(events == EPOLLIN); +#endif this->on_connected(); }) { @@ -152,8 +188,13 @@ server_socket::server_socket(epoll& ep, on_connected_t on_connected) server_socket::server_socket(epoll& ep, ipv4_endpoint local_endpoint, on_connected_t on_connected) : fd(make_socket(AF_INET, SOCK_STREAM)) , on_connected(on_connected) - , reg(ep, fd.getfd(), EVFILT_READ, [this](struct kevent event) { - assert(event.filter & EVFILT_READ); +#ifdef __APPLE__ + , reg(ep, fd.getfd(), EVFILT_READ, [this](struct kevent event) { + assert(event.filter & EVFILT_READ); +#else + , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { + assert(events == EPOLLIN); +#endif this->on_connected(); }) { @@ -174,6 +215,7 @@ ipv4_endpoint server_socket::local_endpoint() const client_socket server_socket::accept(client_socket::on_ready_t on_disconnect) const { +#ifdef __APPLE__ struct sockaddr addr; socklen_t socklen = sizeof(addr); int res = ::accept(fd.getfd(), &addr, &socklen); @@ -183,27 +225,11 @@ client_socket server_socket::accept(client_socket::on_ready_t on_disconnect) con const int set = 1; ::setsockopt(res, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); // NOSIGPIPE FOR SEND +#else + int res = ::accept4(fd.getfd(), nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC); + if (res == -1) + throw_error(errno, "accept4()"); + +#endif return client_socket{reg.get_epoll(), {res}, std::move(on_disconnect)}; } - -//eventfd::eventfd(epoll& ep, on_event_t on_event) -// : event(create_eventfd()) -// , on_event(on_event) -// , reg(ep, fd.getfd(), 0, [this] (struct kevent event) { -// assert((event.filter & EVFILT_READ) == 0); -// uint64_t tmp; -// read(this->fd.getfd(), &tmp, sizeof tmp); -// this->on_event(); -// }) -//{} -// -//void eventfd::notify(uint64_t increment) -//{ -// write(fd, &increment, sizeof increment); -//} -// -//void eventfd::set_on_event(eventfd::on_event_t on_event) -//{ -// this->on_event = on_event; -// reg.modify(on_event ? EVFILT_READ: 0); -//} diff --git a/socket.h b/socket.h index 7a58db5..6888824 100644 --- a/socket.h +++ b/socket.h @@ -14,8 +14,11 @@ struct client_socket client_socket(epoll& ep, file_descriptor fd, on_ready_t on_disconnect); void set_on_read(on_ready_t on_ready); -// void set_on_write(on_ready_t on_ready); - +#ifndef __APPLE__ + void set_on_write(on_ready_t on_ready); +#else + void set_on_write(on_ready_t on_ready); +#endif size_t write_some(void const* data, size_t size); size_t read_some(void* data, size_t size); From 889d214f5fde53de5e6c0f59ff91691c4622b179 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Sun, 29 Nov 2015 23:50:11 +0300 Subject: [PATCH 05/15] pipe.* deleted --- pipe.cpp | 29 ----------------------------- pipe.h | 16 ---------------- 2 files changed, 45 deletions(-) delete mode 100644 pipe.cpp delete mode 100644 pipe.h diff --git a/pipe.cpp b/pipe.cpp deleted file mode 100644 index 2c9ddbd..0000000 --- a/pipe.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include "pipe.h" - -#include -#include - -#include "throw_error.h" - -pipe_pair make_pipe(bool non_block) -{ - int fds[2]; -#ifdef __APPLE__ - int res = pipe(fds); - - if (res != 0) - { - assert(res == -1); - throw_error(0, "pipe()"); // undeclarated errno? - } -#else - int res = pipe2(fds, O_CLOEXEC | (non_block ? O_NONBLOCK : 0)); - - if (res != 0) - { - assert(res == -1); - throw_error(errno, "pipe2()"); - } -#endif - return pipe_pair{weak_file_descriptor{fds[0]}, weak_file_descriptor{fds[1]}}; -} diff --git a/pipe.h b/pipe.h deleted file mode 100644 index b66efb6..0000000 --- a/pipe.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef PIPE_H -#define PIPE_H - -#include "file_descriptor.h" -#include "cap_read.h" -#include "cap_write.h" - -struct pipe_pair -{ - weak_file_descriptor out; - weak_file_descriptor in; -}; - -pipe_pair make_pipe(bool non_block); - -#endif // PIPE_H From d71681caaec7ec5a2c47e6b95051ac1dce355ea6 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 30 Nov 2015 15:39:40 +0300 Subject: [PATCH 06/15] Update socket.cpp --- socket.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/socket.cpp b/socket.cpp index 7f0ecef..7033ab1 100644 --- a/socket.cpp +++ b/socket.cpp @@ -157,13 +157,13 @@ void client_socket::set_on_read(on_ready_t on_ready) pimpl->on_read_ready = on_ready; update_registration(); } -#ifndef __APPLE__ + void client_socket::set_on_write(client_socket::on_ready_t on_ready) { pimpl->on_write_ready = on_ready; update_registration(); } -#endif + size_t client_socket::write_some(const void *data, size_t size) { return ::write_some(pimpl->fd, data, size); From b32ca489eab12e1262e99cdf5c520b61b3644600 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Mon, 30 Nov 2015 18:23:09 +0300 Subject: [PATCH 07/15] minor changes --- echo_server.cpp | 4 +-- echo_tester.cpp | 1 + epoll.cpp | 58 +++++++++++++++++++++++------------- epoll.h | 12 ++++---- socket.cpp | 79 ++++++++++++++++++++++++++++++++++--------------- socket.h | 4 --- 6 files changed, 102 insertions(+), 56 deletions(-) diff --git a/echo_server.cpp b/echo_server.cpp index 534274d..942fc0b 100644 --- a/echo_server.cpp +++ b/echo_server.cpp @@ -21,12 +21,12 @@ void echo_server::connection::update() assert(start_offset == 0); update(); }); - socket.set_on_write(client_socket::on_ready_t{}); + socket.set_on_write([this]{}); // otherwise exception will be throwen } else { assert(start_offset < end_offset); - socket.set_on_read(client_socket::on_ready_t{}); + socket.set_on_read([this]{}); socket.set_on_write([this] { start_offset += socket.write_some(buf + start_offset, end_offset - start_offset); if (start_offset == end_offset) diff --git a/echo_tester.cpp b/echo_tester.cpp index 617a666..bb39351 100644 --- a/echo_tester.cpp +++ b/echo_tester.cpp @@ -104,6 +104,7 @@ echo_tester::echo_tester(epoll &ep, ipv4_endpoint remote_endpoint) bool echo_tester::do_step() { + size_t i = rand() % (desired_number_of_connections * 2); if (i < connections.size()) { diff --git a/epoll.cpp b/epoll.cpp index 59fab9c..c56b515 100644 --- a/epoll.cpp +++ b/epoll.cpp @@ -78,15 +78,25 @@ void epoll::run() } assert(r > 0); - size_t num_events = static_cast(r); - assert(num_events <= ev_size); + assert(r <= ev_size); for (size_t i = 0; i < r; ++i) { try { struct kevent const& ee = ev[i]; + if (ev[i].ident == -1) { + continue; + } static_cast(ee.udata)->callback(ee); + if (event_deleted) { + event_deleted = false; + for (size_t j = i + 1; j < r; j++) { + if (ev[j].ident == ev[i].ident) { + ev[j].ident = -1; + } + } + } } catch (std::exception const& e) { @@ -148,17 +158,20 @@ void epoll::add(int fd, int16_t events, epoll_registration* reg) throw_error(errno, "kevent(EV_ADD)"); } -void epoll::modify(int fd, int16_t events, epoll_registration* reg) +void epoll::modify(int fd, std::list events, epoll_registration* reg) { - struct kevent event; - EV_SET(&event, fd, events, EV_DELETE, 0, 0, reg); - kevent(fd_, &event, 1, NULL, 0, NULL); - - EV_SET(&event, fd, events, EV_ADD, 0, 0, reg); - - int r = kevent(fd_, &event, 1, NULL, 0, NULL); - if (r < 0) - throw_error(errno, "kevent() MOD"); + for (auto it = events.begin(); it != events.end(); it++) { + struct kevent event; + EV_SET(&event, fd, *it, EV_DELETE, 0, 0, reg); + kevent(fd_, &event, 1, NULL, 0, NULL); + + EV_SET(&event, fd, *it, EV_ADD, 0, 0, reg); + + int r = kevent(fd_, &event, 1, NULL, 0, NULL); + if (r < 0) + throw_error(errno, "kevent() MOD"); + } + event_deleted = true; } void epoll::remove(int fd, int16_t events) @@ -169,6 +182,8 @@ void epoll::remove(int fd, int16_t events) int r = kevent(fd_, &event, 1, NULL, 0, NULL); if (r < 0) throw_error(errno, "kevent(EV_DELETE)"); + + event_deleted = true; } epoll_registration::epoll_registration() @@ -177,13 +192,15 @@ epoll_registration::epoll_registration() , events() {} -epoll_registration::epoll_registration(epoll& ep, int fd, uint32_t events, callback_t callback) +epoll_registration::epoll_registration(epoll& ep, int fd, std::list events, callback_t callback) : ep(&ep) , fd(fd) , events(events) , callback(std::move(callback)) { - ep.add(fd, events, this); + for (auto it = events.begin(); it != events.end(); it++) { + ep.add(fd, *it, this); + } } epoll_registration::epoll_registration(epoll_registration&& rhs) @@ -195,7 +212,7 @@ epoll_registration::epoll_registration(epoll_registration&& rhs) update(); rhs.ep = nullptr; rhs.fd = -1; - rhs.events = 0; + rhs.events = events; rhs.callback = callback_t(); } @@ -210,13 +227,10 @@ epoll_registration& epoll_registration::operator=(epoll_registration rhs) return *this; } -void epoll_registration::modify(int16_t new_events) +void epoll_registration::modify(std::list new_events) { assert(ep); - if (events == new_events) - return; - ep->modify(fd, new_events, this); events = new_events; } @@ -235,10 +249,12 @@ void epoll_registration::clear() { if (ep) { - ep->remove(fd, events); + for (auto it = events.begin(); it != events.end(); it++) { + ep->remove(fd, *it); + } ep = nullptr; fd = -1; - events = 0; + events = {}; } } diff --git a/epoll.h b/epoll.h index e5f4cde..b84e5cc 100644 --- a/epoll.h +++ b/epoll.h @@ -7,6 +7,7 @@ #include #ifdef __APPLE__ #include +#include namespace sysapi { @@ -28,12 +29,13 @@ namespace sysapi private: void add(int fd, int16_t events, epoll_registration*); - void modify(int fd, int16_t events, epoll_registration*); + void modify(int fd, std::list events, epoll_registration*); void remove(int fd, int16_t events); private: int fd_; - + bool event_deleted = false; + friend struct epoll_registration; }; @@ -42,14 +44,14 @@ namespace sysapi typedef std::function callback_t; epoll_registration(); - epoll_registration(epoll&, int fd, uint32_t events, callback_t callback); + epoll_registration(epoll&, int fd, std::list events, callback_t callback); epoll_registration(epoll_registration const&) = delete; epoll_registration(epoll_registration&&); ~epoll_registration(); epoll_registration& operator=(epoll_registration); - void modify(int16_t new_events); + void modify(std::list new_events); void swap(epoll_registration& other); @@ -62,7 +64,7 @@ namespace sysapi private: epoll* ep; int fd; - uint32_t events; + std::list events; callback_t callback; friend struct epoll; diff --git a/socket.cpp b/socket.cpp index 7f0ecef..aedb2ab 100644 --- a/socket.cpp +++ b/socket.cpp @@ -13,17 +13,39 @@ #include "cap_write.h" #include "cap_read.h" + +#include + namespace { +#ifdef __APPLE__ + int get_fd_flags(int fd); + void set_fd_flags(int fd, int flags); + + file_descriptor make_socket(int domain, int type, bool nonblock) + { + int fd = ::socket(domain, type, 0); + if (fd == -1) + throw_error(errno, "socket()"); + + if (nonblock) { + set_fd_flags(fd, get_fd_flags(fd) | O_NONBLOCK); + } + + return file_descriptor{fd}; + } + +#else file_descriptor make_socket(int domain, int type) { int fd = ::socket(domain, type, 0); if (fd == -1) throw_error(errno, "socket()"); - + return file_descriptor{fd}; } - + +#endif void start_listen(int fd) { int res = ::listen(fd, SOMAXCONN); @@ -54,20 +76,12 @@ namespace throw_error(errno, "connect()"); } - file_descriptor create_eventfd(bool semaphore) - { - int res = ::eventfd(0, (semaphore ? EFD_SEMAPHORE : 0) | EFD_CLOEXEC | EFD_NONBLOCK); - if (res == -1) - throw_error(errno, "eventfd()"); - - return file_descriptor{res}; - } - int get_fd_flags(int fd) { int res = fcntl(fd, F_GETFL, 0); if (res == -1) throw_error(errno, "fcntl(F_GETFL)"); + return res; } void set_fd_flags(int fd, int flags) @@ -86,8 +100,8 @@ client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_d : ep(ep) , fd(std::move(fd)) #ifdef __APPLE__ - , reg(ep, this->fd.getfd(), EVFILT_READ, [this](struct kevent event) { - assert(event.filter & EVFILT_READ); + , reg(ep, this->fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { + assert(event.filter == EVFILT_READ || event.filter == EVFILT_WRITE); #else , reg(ep, this->fd.getfd(), 0, [this](uint32_t events) { assert((events & ~(EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR | EPOLLHUP)) == 0); @@ -98,18 +112,24 @@ client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_d try { #ifdef __APPLE__ - if (event.filter & EVFILT_READ && event.flags & EV_EOF && event.data == 0) + if ((event.filter == EVFILT_READ && event.flags & EV_EOF) || (event.filter == EVFILT_WRITE && event.flags & EV_EOF)) { this->on_disconnect(); if (is_destroyed) return; } - if (event.filter & EVFILT_READ) + if (event.filter == EVFILT_READ) { this->on_read_ready(); if (is_destroyed) return; } + if (event.filter == EVFILT_WRITE) + { + this->on_write_ready(); + if (is_destroyed) + return; + } #else if ((events & EPOLLRDHUP) @@ -139,6 +159,7 @@ client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_d destroyed = nullptr; throw; } + destroyed = nullptr; }) , on_disconnect(std::move(on_disconnect)) @@ -157,13 +178,13 @@ void client_socket::set_on_read(on_ready_t on_ready) pimpl->on_read_ready = on_ready; update_registration(); } -#ifndef __APPLE__ + void client_socket::set_on_write(client_socket::on_ready_t on_ready) { pimpl->on_write_ready = on_ready; update_registration(); } -#endif + size_t client_socket::write_some(const void *data, size_t size) { return ::write_some(pimpl->fd, data, size); @@ -176,7 +197,11 @@ size_t client_socket::read_some(void* data, size_t size) client_socket client_socket::connect(sysapi::epoll &ep, const ipv4_endpoint &remote, on_ready_t on_disconnect) { +#ifdef __APPLE__ + file_descriptor fd = make_socket(AF_INET, SOCK_STREAM, false); +#else file_descriptor fd = make_socket(AF_INET, SOCK_STREAM); +#endif connect_socket(fd.getfd(), remote.port_net, remote.addr_net); set_fd_flags(fd.getfd(), get_fd_flags(fd.getfd()) | O_NONBLOCK); client_socket res{ep, std::move(fd), std::move(on_disconnect)}; @@ -186,7 +211,9 @@ client_socket client_socket::connect(sysapi::epoll &ep, const ipv4_endpoint &rem void client_socket::update_registration() { #ifdef __APPLE__ - pimpl->reg.modify(EVFILT_READ); + + pimpl->reg.modify({EVFILT_READ, EVFILT_WRITE}); + #else pimpl->reg.modify((pimpl->on_read_ready ? EPOLLIN : 0) | (pimpl->on_write_ready ? EPOLLOUT: 0) @@ -195,12 +222,14 @@ void client_socket::update_registration() } server_socket::server_socket(epoll& ep, on_connected_t on_connected) - : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) - , on_connected(on_connected) #ifdef __APPLE__ - , reg(ep, fd.getfd(), EVFILT_READ, [this](struct kevent event) { + : fd(make_socket(AF_INET, SOCK_STREAM, true)) + , on_connected(on_connected) + , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { assert(event.filter & EVFILT_READ); #else + : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) + , on_connected(on_connected) , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { assert(events == EPOLLIN); #endif @@ -211,12 +240,14 @@ server_socket::server_socket(epoll& ep, on_connected_t on_connected) } server_socket::server_socket(epoll& ep, ipv4_endpoint local_endpoint, on_connected_t on_connected) - : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) - , on_connected(on_connected) #ifdef __APPLE__ - , reg(ep, fd.getfd(), EVFILT_READ, [this](struct kevent event) { + : fd(make_socket(AF_INET, SOCK_STREAM, true)) + , on_connected(on_connected) + , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { assert(event.filter & EVFILT_READ); #else + : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) + , on_connected(on_connected) , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { assert(events == EPOLLIN); #endif diff --git a/socket.h b/socket.h index f6b7e67..3fb1bed 100644 --- a/socket.h +++ b/socket.h @@ -14,11 +14,7 @@ struct client_socket client_socket(epoll& ep, file_descriptor fd, on_ready_t on_disconnect); void set_on_read(on_ready_t on_ready); -#ifndef __APPLE__ void set_on_write(on_ready_t on_ready); -#else - void set_on_write(on_ready_t on_ready); -#endif size_t write_some(void const* data, size_t size); size_t read_some(void* data, size_t size); From a925a42c64d00995d252cf06ab42b4a78ecd2360 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Tue, 1 Dec 2015 11:20:28 +0300 Subject: [PATCH 08/15] relocate kqueue --- CMakeLists.txt | 31 ++++-- echo_tester.h | 4 + epoll.cpp | 219 +++---------------------------------------- epoll.h | 71 +------------- kqueue.cpp | 211 +++++++++++++++++++++++++++++++++++++++++ kqueue.hpp | 76 +++++++++++++++ main_echo_server.cpp | 4 + main_echo_test.cpp | 4 + socket.h | 4 + 9 files changed, 340 insertions(+), 284 deletions(-) create mode 100644 kqueue.cpp create mode 100644 kqueue.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 3dda5ca..060793a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,15 +4,28 @@ project(echo_test) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -g") -add_library(common STATIC - address.cpp - cap_read.cpp - cap_write.cpp - epoll.cpp - pipe.cpp - socket.cpp - throw_error.cpp -) +IF(UNIX) + IF(APPLE) + add_library(common STATIC + address.cpp + cap_read.cpp + cap_write.cpp + kqueue.cpp + socket.cpp + throw_error.cpp + ) + ELSE(APPLE) + add_library(common STATIC + address.cpp + cap_read.cpp + cap_write.cpp + epoll.cpp + socket.cpp + throw_error.cpp + ) + ENDIF(APPLE) +ENDIF(UNIX) + add_executable(echo_server main_echo_server.cpp diff --git a/echo_tester.h b/echo_tester.h index 047cd8c..f7dde49 100644 --- a/echo_tester.h +++ b/echo_tester.h @@ -1,7 +1,11 @@ #ifndef ECHO_TESTER_H #define ECHO_TESTER_H +#ifdef __APPLE__ +#include "kqueue.hpp" +#else #include "epoll.h" +#endif #include "address.h" #include "socket.h" #include diff --git a/epoll.cpp b/epoll.cpp index c56b515..f0a14c3 100644 --- a/epoll.cpp +++ b/epoll.cpp @@ -1,11 +1,8 @@ #include "epoll.h" -#ifndef __APPLE__ #include #include -#endif - #include #include #include @@ -17,16 +14,6 @@ using namespace sysapi; epoll::epoll() { -#ifdef __APPLE__ - int r = kqueue(); - - if (r == -1) - throw_error(errno, "kqueue()"); - - assert(r >= 0); - - fd_ = r; -#else int r = ::epoll_create1(EPOLL_CLOEXEC); if (r == -1) throw_error(errno, "epoll_create1()"); @@ -34,15 +21,10 @@ epoll::epoll() assert(r >= 0); fd_.reset(r); -#endif } epoll::epoll(epoll&& rhs) -#ifdef __APPLE__ - : fd_(rhs.fd_) -#else - : fd_(std::move(rhs.fd_)) -#endif +: fd_(std::move(rhs.fd_)) {} epoll& epoll::operator=(epoll rhs) @@ -53,7 +35,7 @@ epoll& epoll::operator=(epoll rhs) void epoll::swap(epoll& other) { - using std::swap; + using ::swap; swap(fd_, other.fd_); } @@ -61,53 +43,6 @@ void epoll::run() { for (;;) { -#ifdef __APPLE__ - int const ev_size = 16; - struct kevent ev[ev_size]; - - int r = kevent(fd_, NULL, 0, ev, ev_size, NULL); - - if (r < 0) - { - int err = errno; - - if (err == EINTR) - continue; - - throw_error(err, "kevent()"); - } - - assert(r > 0); - assert(r <= ev_size); - - for (size_t i = 0; i < r; ++i) - { - try - { - struct kevent const& ee = ev[i]; - if (ev[i].ident == -1) { - continue; - } - static_cast(ee.udata)->callback(ee); - if (event_deleted) { - event_deleted = false; - for (size_t j = i + 1; j < r; j++) { - if (ev[j].ident == ev[i].ident) { - ev[j].ident = -1; - } - } - } - } - catch (std::exception const& e) - { - std::cerr << "error: " << e.what() << std::endl; - } - catch (...) - { - std::cerr << "unknown exception in message loop" << std::endl; - } - } -#else std::array ev; again: @@ -143,134 +78,9 @@ void epoll::run() std::cerr << "unknown exception in message loop" << std::endl; } } -#endif } } -#ifdef __APPLE__ -void epoll::add(int fd, int16_t events, epoll_registration* reg) -{ - struct kevent event; - EV_SET(&event, fd, events, EV_ADD, 0, 0, reg); - - int r = kevent(fd_, &event, 1, NULL, 0, NULL); - if (r < 0) - throw_error(errno, "kevent(EV_ADD)"); -} - -void epoll::modify(int fd, std::list events, epoll_registration* reg) -{ - for (auto it = events.begin(); it != events.end(); it++) { - struct kevent event; - EV_SET(&event, fd, *it, EV_DELETE, 0, 0, reg); - kevent(fd_, &event, 1, NULL, 0, NULL); - - EV_SET(&event, fd, *it, EV_ADD, 0, 0, reg); - - int r = kevent(fd_, &event, 1, NULL, 0, NULL); - if (r < 0) - throw_error(errno, "kevent() MOD"); - } - event_deleted = true; -} - -void epoll::remove(int fd, int16_t events) -{ - struct kevent event; - EV_SET(&event, fd, events, EV_DELETE, 0, 0, NULL); - - int r = kevent(fd_, &event, 1, NULL, 0, NULL); - if (r < 0) - throw_error(errno, "kevent(EV_DELETE)"); - - event_deleted = true; -} - -epoll_registration::epoll_registration() - : ep() - , fd(-1) - , events() -{} - -epoll_registration::epoll_registration(epoll& ep, int fd, std::list events, callback_t callback) - : ep(&ep) - , fd(fd) - , events(events) - , callback(std::move(callback)) -{ - for (auto it = events.begin(); it != events.end(); it++) { - ep.add(fd, *it, this); - } -} - -epoll_registration::epoll_registration(epoll_registration&& rhs) - : ep(rhs.ep) - , fd(rhs.fd) - , events(rhs.events) - , callback(std::move(rhs.callback)) -{ - update(); - rhs.ep = nullptr; - rhs.fd = -1; - rhs.events = events; - rhs.callback = callback_t(); -} - -epoll_registration::~epoll_registration() -{ - clear(); -} - -epoll_registration& epoll_registration::operator=(epoll_registration rhs) -{ - swap(rhs); - return *this; -} - -void epoll_registration::modify(std::list new_events) -{ - assert(ep); - - ep->modify(fd, new_events, this); - events = new_events; -} - -void epoll_registration::swap(epoll_registration& other) -{ - std::swap(ep, other.ep); - std::swap(fd, other.fd); - std::swap(events, other.events); - std::swap(callback, other.callback); - update(); - other.update(); -} - -void epoll_registration::clear() -{ - if (ep) - { - for (auto it = events.begin(); it != events.end(); it++) { - ep->remove(fd, *it); - } - ep = nullptr; - fd = -1; - events = {}; - } -} - -epoll& epoll_registration::get_epoll() const -{ - return *ep; -} - -void epoll_registration::update() -{ - if (ep) - ep->modify(fd, events, this); -} - -#else - void epoll::add(int fd, uint32_t events, epoll_registration* reg) { epoll_event ev = {0, 0}; @@ -301,25 +111,25 @@ void epoll::remove(int fd) } epoll_registration::epoll_registration() - : ep() - , fd(-1) - , events() +: ep() +, fd(-1) +, events() {} epoll_registration::epoll_registration(epoll& ep, int fd, uint32_t events, callback_t callback) - : ep(&ep) - , fd(fd) - , events(events) - , callback(std::move(callback)) +: ep(&ep) +, fd(fd) +, events(events) +, callback(std::move(callback)) { ep.add(fd, events, this); } epoll_registration::epoll_registration(epoll_registration&& rhs) - : ep(rhs.ep) - , fd(rhs.fd) - , events(rhs.events) - , callback(std::move(rhs.callback)) +: ep(rhs.ep) +, fd(rhs.fd) +, events(rhs.events) +, callback(std::move(rhs.callback)) { update(); rhs.ep = nullptr; @@ -380,5 +190,4 @@ void epoll_registration::update() { if (ep) ep->modify(fd, events, this); -} -#endif +} \ No newline at end of file diff --git a/epoll.h b/epoll.h index b84e5cc..5205ff5 100644 --- a/epoll.h +++ b/epoll.h @@ -5,73 +5,6 @@ #include #include -#ifdef __APPLE__ -#include -#include - -namespace sysapi -{ - struct epoll; - struct epoll_registration; - - struct epoll - { - typedef std::function action_t; - epoll(); - epoll(epoll const&) = delete; - epoll(epoll&&); - - epoll& operator=(epoll); - - void swap(epoll& other); - - void run(); - - private: - void add(int fd, int16_t events, epoll_registration*); - void modify(int fd, std::list events, epoll_registration*); - void remove(int fd, int16_t events); - - private: - int fd_; - bool event_deleted = false; - - friend struct epoll_registration; - }; - - struct epoll_registration - { - typedef std::function callback_t; - - epoll_registration(); - epoll_registration(epoll&, int fd, std::list events, callback_t callback); - epoll_registration(epoll_registration const&) = delete; - epoll_registration(epoll_registration&&); - ~epoll_registration(); - - epoll_registration& operator=(epoll_registration); - - void modify(std::list new_events); - - void swap(epoll_registration& other); - - void clear(); - epoll& get_epoll() const; - - private: - void update(); - - private: - epoll* ep; - int fd; - std::list events; - callback_t callback; - - friend struct epoll; - }; -} - -#else namespace sysapi { @@ -134,9 +67,7 @@ namespace sysapi }; } -#endif - using sysapi::epoll; using sysapi::epoll_registration; -#endif +#endif \ No newline at end of file diff --git a/kqueue.cpp b/kqueue.cpp new file mode 100644 index 0000000..0234871 --- /dev/null +++ b/kqueue.cpp @@ -0,0 +1,211 @@ +#include "kqueue.hpp" + +#include +#include +#include +#include + +#include "throw_error.h" + +using namespace sysapi; + +epoll::epoll() +{ + int r = kqueue(); + + if (r == -1) + throw_error(errno, "kqueue()"); + + assert(r >= 0); + + fd_ = r; +} + +epoll::epoll(epoll&& rhs) + : fd_(rhs.fd_) +{} + +epoll& epoll::operator=(epoll rhs) +{ + swap(rhs); + return *this; +} + +void epoll::swap(epoll& other) +{ + using std::swap; + swap(fd_, other.fd_); +} + +void epoll::run() +{ + for (;;) + { + int const ev_size = 16; + struct kevent ev[ev_size]; + + int r = kevent(fd_, NULL, 0, ev, ev_size, NULL); + + if (r < 0) + { + int err = errno; + + if (err == EINTR) + continue; + + throw_error(err, "kevent()"); + } + + assert(r > 0); + assert(r <= ev_size); + + for (size_t i = 0; i < r; ++i) + { + try + { + struct kevent const& ee = ev[i]; + if (ev[i].ident == -1) { + continue; + } + static_cast(ee.udata)->callback(ee); + if (event_deleted) { + event_deleted = false; + for (size_t j = i + 1; j < r; j++) { + if (ev[j].ident == ev[i].ident) { + ev[j].ident = -1; + } + } + } + } + catch (std::exception const& e) + { + std::cerr << "error: " << e.what() << std::endl; + } + catch (...) + { + std::cerr << "unknown exception in message loop" << std::endl; + } + } + } +} + +void epoll::add(int fd, int16_t events, epoll_registration* reg) +{ + struct kevent event; + EV_SET(&event, fd, events, EV_ADD, 0, 0, reg); + + int r = kevent(fd_, &event, 1, NULL, 0, NULL); + if (r < 0) + throw_error(errno, "kevent(EV_ADD)"); +} + +void epoll::modify(int fd, std::list events, epoll_registration* reg) +{ + for (auto it = events.begin(); it != events.end(); it++) { + struct kevent event; + EV_SET(&event, fd, *it, EV_DELETE, 0, 0, reg); + kevent(fd_, &event, 1, NULL, 0, NULL); + + EV_SET(&event, fd, *it, EV_ADD, 0, 0, reg); + + int r = kevent(fd_, &event, 1, NULL, 0, NULL); + if (r < 0) + throw_error(errno, "kevent() MOD"); + } + event_deleted = true; +} + +void epoll::remove(int fd, int16_t events) +{ + struct kevent event; + EV_SET(&event, fd, events, EV_DELETE, 0, 0, NULL); + + int r = kevent(fd_, &event, 1, NULL, 0, NULL); + if (r < 0) + throw_error(errno, "kevent(EV_DELETE)"); + + event_deleted = true; +} + +epoll_registration::epoll_registration() +: ep() +, fd(-1) +, events() +{} + +epoll_registration::epoll_registration(epoll& ep, int fd, std::list events, callback_t callback) +: ep(&ep) +, fd(fd) +, events(events) +, callback(std::move(callback)) +{ + for (auto it = events.begin(); it != events.end(); it++) { + ep.add(fd, *it, this); + } +} + +epoll_registration::epoll_registration(epoll_registration&& rhs) +: ep(rhs.ep) +, fd(rhs.fd) +, events(rhs.events) +, callback(std::move(rhs.callback)) +{ + update(); + rhs.ep = nullptr; + rhs.fd = -1; + rhs.events = events; + rhs.callback = callback_t(); +} + +epoll_registration::~epoll_registration() +{ + clear(); +} + +epoll_registration& epoll_registration::operator=(epoll_registration rhs) +{ + swap(rhs); + return *this; +} + +void epoll_registration::modify(std::list new_events) +{ + assert(ep); + + ep->modify(fd, new_events, this); + events = new_events; +} + +void epoll_registration::swap(epoll_registration& other) +{ + std::swap(ep, other.ep); + std::swap(fd, other.fd); + std::swap(events, other.events); + std::swap(callback, other.callback); + update(); + other.update(); +} + +void epoll_registration::clear() +{ + if (ep) + { + for (auto it = events.begin(); it != events.end(); it++) { + ep->remove(fd, *it); + } + ep = nullptr; + fd = -1; + events = {}; + } +} + +epoll& epoll_registration::get_epoll() const +{ + return *ep; +} + +void epoll_registration::update() +{ + if (ep) + ep->modify(fd, events, this); +} \ No newline at end of file diff --git a/kqueue.hpp b/kqueue.hpp new file mode 100644 index 0000000..d44c701 --- /dev/null +++ b/kqueue.hpp @@ -0,0 +1,76 @@ +#ifndef kqueue_hpp +#define kqueue_hpp + +#include "file_descriptor.h" + +#include +#include +#include +#include + +namespace sysapi +{ + struct epoll; + struct epoll_registration; + + struct epoll + { + typedef std::function action_t; + epoll(); + epoll(epoll const&) = delete; + epoll(epoll&&); + + epoll& operator=(epoll); + + void swap(epoll& other); + + void run(); + + private: + void add(int fd, int16_t events, epoll_registration*); + void modify(int fd, std::list events, epoll_registration*); + void remove(int fd, int16_t events); + + private: + int fd_; + bool event_deleted = false; + + friend struct epoll_registration; + }; + + struct epoll_registration + { + typedef std::function callback_t; + + epoll_registration(); + epoll_registration(epoll&, int fd, std::list events, callback_t callback); + epoll_registration(epoll_registration const&) = delete; + epoll_registration(epoll_registration&&); + ~epoll_registration(); + + epoll_registration& operator=(epoll_registration); + + void modify(std::list new_events); + + void swap(epoll_registration& other); + + void clear(); + epoll& get_epoll() const; + + private: + void update(); + + private: + epoll* ep; + int fd; + std::list events; + callback_t callback; + + friend struct epoll; + }; +} + +using sysapi::epoll; +using sysapi::epoll_registration; + +#endif diff --git a/main_echo_server.cpp b/main_echo_server.cpp index 48bac64..7ac6daf 100644 --- a/main_echo_server.cpp +++ b/main_echo_server.cpp @@ -1,6 +1,10 @@ #include +#ifdef __APPLE__ +#include "kqueue.hpp" +#else #include "epoll.h" +#endif #include "echo_server.h" int main() diff --git a/main_echo_test.cpp b/main_echo_test.cpp index 8b17bee..607903f 100644 --- a/main_echo_test.cpp +++ b/main_echo_test.cpp @@ -1,6 +1,10 @@ #include +#ifdef __APPLE__ +#include "kqueue.hpp" +#else #include "epoll.h" +#endif #include "echo_tester.h" int main(int argc, char* argv[]) diff --git a/socket.h b/socket.h index 3fb1bed..d825859 100644 --- a/socket.h +++ b/socket.h @@ -3,7 +3,11 @@ #include "file_descriptor.h" #include "address.h" +#ifdef __APPLE__ +#include "kqueue.hpp" +#else #include "epoll.h" +#endif #include #include From 308e64abbe69204b7eb5d68c4a33341bcb306337 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Tue, 1 Dec 2015 11:28:58 +0300 Subject: [PATCH 09/15] minor changes --- epoll.cpp | 46 +++++++++++++++++++++++----------------------- epoll.h | 32 ++++++++++++++++---------------- socket.h | 1 + 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/epoll.cpp b/epoll.cpp index f0a14c3..f7fddb3 100644 --- a/epoll.cpp +++ b/epoll.cpp @@ -17,14 +17,14 @@ epoll::epoll() int r = ::epoll_create1(EPOLL_CLOEXEC); if (r == -1) throw_error(errno, "epoll_create1()"); - + assert(r >= 0); - + fd_.reset(r); } epoll::epoll(epoll&& rhs) -: fd_(std::move(rhs.fd_)) + : fd_(std::move(rhs.fd_)) {} epoll& epoll::operator=(epoll rhs) @@ -44,20 +44,20 @@ void epoll::run() for (;;) { std::array ev; - + again: int r = ::epoll_wait(fd_.getfd(), ev.data(), ev.size(), -1); - + if (r < 0) { int err = errno; - + if (err == EINTR) goto again; - + throw_error(err, "epoll_wait()"); } - + assert(r > 0); size_t num_events = static_cast(r); assert(num_events <= ev.size()); @@ -86,7 +86,7 @@ void epoll::add(int fd, uint32_t events, epoll_registration* reg) epoll_event ev = {0, 0}; ev.data.ptr = reg; ev.events = events; - + int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_ADD, fd, &ev); if (r < 0) throw_error(errno, "epoll_ctl(EPOLL_CTL_ADD)"); @@ -97,7 +97,7 @@ void epoll::modify(int fd, uint32_t events, epoll_registration* reg) epoll_event ev = {0, 0}; ev.data.ptr = reg; ev.events = events; - + int r = ::epoll_ctl(fd_.getfd(), EPOLL_CTL_MOD, fd, &ev); if (r < 0) throw_error(errno, "epoll_ctl(EPOLL_CTL_MOD)"); @@ -111,25 +111,25 @@ void epoll::remove(int fd) } epoll_registration::epoll_registration() -: ep() -, fd(-1) -, events() + : ep() + , fd(-1) + , events() {} epoll_registration::epoll_registration(epoll& ep, int fd, uint32_t events, callback_t callback) -: ep(&ep) -, fd(fd) -, events(events) -, callback(std::move(callback)) + : ep(&ep) + , fd(fd) + , events(events) + , callback(std::move(callback)) { ep.add(fd, events, this); } epoll_registration::epoll_registration(epoll_registration&& rhs) -: ep(rhs.ep) -, fd(rhs.fd) -, events(rhs.events) -, callback(std::move(rhs.callback)) + : ep(rhs.ep) + , fd(rhs.fd) + , events(rhs.events) + , callback(std::move(rhs.callback)) { update(); rhs.ep = nullptr; @@ -152,10 +152,10 @@ epoll_registration& epoll_registration::operator=(epoll_registration rhs) void epoll_registration::modify(uint32_t new_events) { assert(ep); - + if (events == new_events) return; - + ep->modify(fd, new_events, this); events = new_events; } diff --git a/epoll.h b/epoll.h index 5205ff5..5de621b 100644 --- a/epoll.h +++ b/epoll.h @@ -10,59 +10,59 @@ namespace sysapi { struct epoll; struct epoll_registration; - + struct epoll { typedef std::function action_t; epoll(); epoll(epoll const&) = delete; epoll(epoll&&); - + epoll& operator=(epoll); - + void swap(epoll& other); - + void run(); - + private: void add(int fd, uint32_t events, epoll_registration*); void modify(int fd, uint32_t events, epoll_registration*); void remove(int fd); - + private: file_descriptor fd_; - + friend struct epoll_registration; }; - + struct epoll_registration { typedef std::function callback_t; - + epoll_registration(); epoll_registration(epoll&, int fd, uint32_t events, callback_t callback); epoll_registration(epoll_registration const&) = delete; epoll_registration(epoll_registration&&); ~epoll_registration(); - + epoll_registration& operator=(epoll_registration); - + void modify(uint32_t new_events); - + void swap(epoll_registration& other); - + void clear(); epoll& get_epoll() const; - + private: void update(); - + private: epoll* ep; int fd; uint32_t events; callback_t callback; - + friend struct epoll; }; } diff --git a/socket.h b/socket.h index d825859..74b41f1 100644 --- a/socket.h +++ b/socket.h @@ -19,6 +19,7 @@ struct client_socket void set_on_read(on_ready_t on_ready); void set_on_write(on_ready_t on_ready); + size_t write_some(void const* data, size_t size); size_t read_some(void* data, size_t size); From 195ba7744d431e932351d8bab006c3ba1fcec751 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Tue, 1 Dec 2015 11:33:22 +0300 Subject: [PATCH 10/15] minor changes --- echo_tester.cpp | 1 - epoll.cpp | 2 +- socket.cpp | 27 ++++++++++++--------------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/echo_tester.cpp b/echo_tester.cpp index bb39351..617a666 100644 --- a/echo_tester.cpp +++ b/echo_tester.cpp @@ -104,7 +104,6 @@ echo_tester::echo_tester(epoll &ep, ipv4_endpoint remote_endpoint) bool echo_tester::do_step() { - size_t i = rand() % (desired_number_of_connections * 2); if (i < connections.size()) { diff --git a/epoll.cpp b/epoll.cpp index f7fddb3..7dd2b5d 100644 --- a/epoll.cpp +++ b/epoll.cpp @@ -61,7 +61,7 @@ void epoll::run() assert(r > 0); size_t num_events = static_cast(r); assert(num_events <= ev.size()); - + for (auto i = ev.begin(); i != ev.begin() + num_events; ++i) { try diff --git a/socket.cpp b/socket.cpp index aedb2ab..748a360 100644 --- a/socket.cpp +++ b/socket.cpp @@ -13,9 +13,6 @@ #include "cap_write.h" #include "cap_read.h" - -#include - namespace { #ifdef __APPLE__ @@ -41,10 +38,10 @@ namespace int fd = ::socket(domain, type, 0); if (fd == -1) throw_error(errno, "socket()"); - + return file_descriptor{fd}; } - + #endif void start_listen(int fd) { @@ -216,8 +213,8 @@ void client_socket::update_registration() #else pimpl->reg.modify((pimpl->on_read_ready ? EPOLLIN : 0) - | (pimpl->on_write_ready ? EPOLLOUT: 0) - | EPOLLRDHUP); + | (pimpl->on_write_ready ? EPOLLOUT: 0) + | EPOLLRDHUP); #endif } @@ -241,15 +238,15 @@ server_socket::server_socket(epoll& ep, on_connected_t on_connected) server_socket::server_socket(epoll& ep, ipv4_endpoint local_endpoint, on_connected_t on_connected) #ifdef __APPLE__ - : fd(make_socket(AF_INET, SOCK_STREAM, true)) - , on_connected(on_connected) - , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { - assert(event.filter & EVFILT_READ); + : fd(make_socket(AF_INET, SOCK_STREAM, true)) + , on_connected(on_connected) + , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { + assert(event.filter & EVFILT_READ); #else - : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) - , on_connected(on_connected) - , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { - assert(events == EPOLLIN); + : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) + , on_connected(on_connected) + , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { + assert(events == EPOLLIN); #endif this->on_connected(); }) From e8bae74503dcb586f95f18be04c7656780219d03 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Tue, 1 Dec 2015 11:41:05 +0300 Subject: [PATCH 11/15] relocate socket --- epoll.cpp | 2 +- epoll.h | 2 +- socket.cpp | 121 ++++++++------------------- socket.h | 18 +++- socket_apple.cpp | 212 +++++++++++++++++++++++++++++++++++++++++++++++ socket_apple.h | 61 ++++++++++++++ 6 files changed, 322 insertions(+), 94 deletions(-) create mode 100644 socket_apple.cpp create mode 100644 socket_apple.h diff --git a/epoll.cpp b/epoll.cpp index 7dd2b5d..2659efc 100644 --- a/epoll.cpp +++ b/epoll.cpp @@ -190,4 +190,4 @@ void epoll_registration::update() { if (ep) ep->modify(fd, events, this); -} \ No newline at end of file +} diff --git a/epoll.h b/epoll.h index 5de621b..86fec85 100644 --- a/epoll.h +++ b/epoll.h @@ -70,4 +70,4 @@ namespace sysapi using sysapi::epoll; using sysapi::epoll_registration; -#endif \ No newline at end of file +#endif diff --git a/socket.cpp b/socket.cpp index 748a360..9727ced 100644 --- a/socket.cpp +++ b/socket.cpp @@ -6,33 +6,13 @@ #include #include "throw_error.h" -#ifndef __APPLE__ #include #include -#endif #include "cap_write.h" #include "cap_read.h" namespace { -#ifdef __APPLE__ - int get_fd_flags(int fd); - void set_fd_flags(int fd, int flags); - - file_descriptor make_socket(int domain, int type, bool nonblock) - { - int fd = ::socket(domain, type, 0); - if (fd == -1) - throw_error(errno, "socket()"); - - if (nonblock) { - set_fd_flags(fd, get_fd_flags(fd) | O_NONBLOCK); - } - - return file_descriptor{fd}; - } - -#else file_descriptor make_socket(int domain, int type) { int fd = ::socket(domain, type, 0); @@ -42,7 +22,6 @@ namespace return file_descriptor{fd}; } -#endif void start_listen(int fd) { int res = ::listen(fd, SOMAXCONN); @@ -73,12 +52,20 @@ namespace throw_error(errno, "connect()"); } + file_descriptor create_eventfd(bool semaphore) + { + int res = ::eventfd(0, (semaphore ? EFD_SEMAPHORE : 0) | EFD_CLOEXEC | EFD_NONBLOCK); + if (res == -1) + throw_error(errno, "eventfd()"); + + return file_descriptor{res}; + } + int get_fd_flags(int fd) { int res = fcntl(fd, F_GETFL, 0); if (res == -1) throw_error(errno, "fcntl(F_GETFL)"); - return res; } void set_fd_flags(int fd, int flags) @@ -96,42 +83,16 @@ client_socket::client_socket(sysapi::epoll &ep, file_descriptor fd, on_ready_t o client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) : ep(ep) , fd(std::move(fd)) -#ifdef __APPLE__ - , reg(ep, this->fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { - assert(event.filter == EVFILT_READ || event.filter == EVFILT_WRITE); -#else , reg(ep, this->fd.getfd(), 0, [this](uint32_t events) { assert((events & ~(EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR | EPOLLHUP)) == 0); -#endif bool is_destroyed = false; assert(destroyed == nullptr); destroyed = &is_destroyed; try { -#ifdef __APPLE__ - if ((event.filter == EVFILT_READ && event.flags & EV_EOF) || (event.filter == EVFILT_WRITE && event.flags & EV_EOF)) - { - this->on_disconnect(); - if (is_destroyed) - return; - } - if (event.filter == EVFILT_READ) - { - this->on_read_ready(); - if (is_destroyed) - return; - } - if (event.filter == EVFILT_WRITE) - { - this->on_write_ready(); - if (is_destroyed) - return; - } - -#else if ((events & EPOLLRDHUP) - || (events & EPOLLERR) - || (events & EPOLLHUP)) + || (events & EPOLLERR) + || (events & EPOLLHUP)) { this->on_disconnect(); if (is_destroyed) @@ -149,14 +110,12 @@ client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_d if (is_destroyed) return; } -#endif } catch (...) { destroyed = nullptr; throw; } - destroyed = nullptr; }) , on_disconnect(std::move(on_disconnect)) @@ -194,11 +153,7 @@ size_t client_socket::read_some(void* data, size_t size) client_socket client_socket::connect(sysapi::epoll &ep, const ipv4_endpoint &remote, on_ready_t on_disconnect) { -#ifdef __APPLE__ - file_descriptor fd = make_socket(AF_INET, SOCK_STREAM, false); -#else file_descriptor fd = make_socket(AF_INET, SOCK_STREAM); -#endif connect_socket(fd.getfd(), remote.port_net, remote.addr_net); set_fd_flags(fd.getfd(), get_fd_flags(fd.getfd()) | O_NONBLOCK); client_socket res{ep, std::move(fd), std::move(on_disconnect)}; @@ -207,29 +162,16 @@ client_socket client_socket::connect(sysapi::epoll &ep, const ipv4_endpoint &rem void client_socket::update_registration() { -#ifdef __APPLE__ - - pimpl->reg.modify({EVFILT_READ, EVFILT_WRITE}); - -#else pimpl->reg.modify((pimpl->on_read_ready ? EPOLLIN : 0) | (pimpl->on_write_ready ? EPOLLOUT: 0) | EPOLLRDHUP); -#endif } server_socket::server_socket(epoll& ep, on_connected_t on_connected) -#ifdef __APPLE__ - : fd(make_socket(AF_INET, SOCK_STREAM, true)) - , on_connected(on_connected) - , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { - assert(event.filter & EVFILT_READ); -#else : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) , on_connected(on_connected) , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { assert(events == EPOLLIN); -#endif this->on_connected(); }) { @@ -237,17 +179,10 @@ server_socket::server_socket(epoll& ep, on_connected_t on_connected) } server_socket::server_socket(epoll& ep, ipv4_endpoint local_endpoint, on_connected_t on_connected) -#ifdef __APPLE__ - : fd(make_socket(AF_INET, SOCK_STREAM, true)) - , on_connected(on_connected) - , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { - assert(event.filter & EVFILT_READ); -#else : fd(make_socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK)) , on_connected(on_connected) , reg(ep, fd.getfd(), EPOLLIN, [this](uint32_t events) { assert(events == EPOLLIN); -#endif this->on_connected(); }) { @@ -268,21 +203,31 @@ ipv4_endpoint server_socket::local_endpoint() const client_socket server_socket::accept(client_socket::on_ready_t on_disconnect) const { -#ifdef __APPLE__ - struct sockaddr addr; - socklen_t socklen = sizeof(addr); - int res = ::accept(fd.getfd(), &addr, &socklen); - if (res == -1) - throw_error(errno, "accept()"); - - const int set = 1; - ::setsockopt(res, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); // NOSIGPIPE FOR SEND - -#else int res = ::accept4(fd.getfd(), nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC); if (res == -1) throw_error(errno, "accept4()"); -#endif return client_socket{reg.get_epoll(), {res}, std::move(on_disconnect)}; } + +eventfd::eventfd(epoll& ep, bool semaphore, on_event_t on_event) + : fd(create_eventfd(semaphore)) + , on_event(on_event) + , reg(ep, fd.getfd(), 0, [this] (uint32_t events) { + assert((events & ~EPOLLIN) == 0); + uint64_t tmp; + read(this->fd.getfd(), &tmp, sizeof tmp); + this->on_event(); + }) +{} + +void eventfd::notify(uint64_t increment) +{ + write(fd, &increment, sizeof increment); +} + +void eventfd::set_on_event(eventfd::on_event_t on_event) +{ + this->on_event = on_event; + reg.modify(on_event ? EPOLLIN : 0); +} diff --git a/socket.h b/socket.h index 74b41f1..758cda9 100644 --- a/socket.h +++ b/socket.h @@ -3,11 +3,7 @@ #include "file_descriptor.h" #include "address.h" -#ifdef __APPLE__ -#include "kqueue.hpp" -#else #include "epoll.h" -#endif #include #include @@ -62,4 +58,18 @@ struct server_socket epoll_registration reg; }; +struct eventfd +{ + typedef std::function on_event_t; + + eventfd(epoll& ep, bool semaphore, on_event_t on_event); + void notify(uint64_t increment = 1); + void set_on_event(on_event_t on_event); + +private: + file_descriptor fd; + on_event_t on_event; + epoll_registration reg; +}; + #endif // SOCKET_H diff --git a/socket_apple.cpp b/socket_apple.cpp new file mode 100644 index 0000000..8b1b149 --- /dev/null +++ b/socket_apple.cpp @@ -0,0 +1,212 @@ +#include "socket_apple.h" + +#include +#include +#include +#include + +#include "throw_error.h" +#include +#include +#include "cap_write.h" +#include "cap_read.h" + +namespace +{ + int get_fd_flags(int fd); + void set_fd_flags(int fd, int flags); + + file_descriptor make_socket(int domain, int type, bool nonblock) + { + int fd = ::socket(domain, type, 0); + if (fd == -1) + throw_error(errno, "socket()"); + + if (nonblock) { + set_fd_flags(fd, get_fd_flags(fd) | O_NONBLOCK); + } + + return file_descriptor{fd}; + } + + void start_listen(int fd) + { + int res = ::listen(fd, SOMAXCONN); + if (res == -1) + throw_error(errno, "listen()"); + } + + void bind_socket(int fd, uint16_t port_net, uint32_t addr_net) + { + sockaddr_in saddr{}; + saddr.sin_family = AF_INET; + saddr.sin_port = port_net; + saddr.sin_addr.s_addr = addr_net; + int res = ::bind(fd, reinterpret_cast(&saddr), sizeof saddr); + if (res == -1) + throw_error(errno, "bind()"); + } + + void connect_socket(int fd, uint16_t port_net, uint32_t addr_net) + { + sockaddr_in saddr{}; + saddr.sin_family = AF_INET; + saddr.sin_port = port_net; + saddr.sin_addr.s_addr = addr_net; + + int res = ::connect(fd, reinterpret_cast(&saddr), sizeof saddr); + if (res == -1) + throw_error(errno, "connect()"); + } + + int get_fd_flags(int fd) + { + int res = fcntl(fd, F_GETFL, 0); + if (res == -1) + throw_error(errno, "fcntl(F_GETFL)"); + return res; + } + + void set_fd_flags(int fd, int flags) + { + int res = fcntl(fd, F_SETFL, flags); + if (res == -1) + throw_error(errno, "fcntl(F_SETFL)"); + } +} + +client_socket::client_socket(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) + : pimpl(new impl(ep, std::move(fd), std::move(on_disconnect))) +{} + +client_socket::impl::impl(sysapi::epoll &ep, file_descriptor fd, on_ready_t on_disconnect) + : ep(ep) + , fd(std::move(fd)) + , reg(ep, this->fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { + assert(event.filter == EVFILT_READ || event.filter == EVFILT_WRITE); + bool is_destroyed = false; + assert(destroyed == nullptr); + destroyed = &is_destroyed; + try + { + if ((event.filter == EVFILT_READ && event.flags & EV_EOF) || (event.filter == EVFILT_WRITE && event.flags & EV_EOF)) + { + this->on_disconnect(); + if (is_destroyed) + return; + } + if (event.filter == EVFILT_READ) + { + this->on_read_ready(); + if (is_destroyed) + return; + } + if (event.filter == EVFILT_WRITE) + { + this->on_write_ready(); + if (is_destroyed) + return; + } + } + catch (...) + { + destroyed = nullptr; + throw; + } + + destroyed = nullptr; + }) + , on_disconnect(std::move(on_disconnect)) + , destroyed(nullptr) +{} + +client_socket::impl::~impl() +{ + if (destroyed) + *destroyed = true; +} + +void client_socket::set_on_read(on_ready_t on_ready) +{ + // TODO: not exception safe + pimpl->on_read_ready = on_ready; + update_registration(); +} + +void client_socket::set_on_write(client_socket::on_ready_t on_ready) +{ + pimpl->on_write_ready = on_ready; + update_registration(); +} + +size_t client_socket::write_some(const void *data, size_t size) +{ + return ::write_some(pimpl->fd, data, size); +} + +size_t client_socket::read_some(void* data, size_t size) +{ + return ::read_some(pimpl->fd, data, size); +} + +client_socket client_socket::connect(sysapi::epoll &ep, const ipv4_endpoint &remote, on_ready_t on_disconnect) +{ + file_descriptor fd = make_socket(AF_INET, SOCK_STREAM, false); + connect_socket(fd.getfd(), remote.port_net, remote.addr_net); + set_fd_flags(fd.getfd(), get_fd_flags(fd.getfd()) | O_NONBLOCK); + client_socket res{ep, std::move(fd), std::move(on_disconnect)}; + return res; +} + +void client_socket::update_registration() +{ + pimpl->reg.modify({EVFILT_READ, EVFILT_WRITE}); +} + +server_socket::server_socket(epoll& ep, on_connected_t on_connected) + : fd(make_socket(AF_INET, SOCK_STREAM, true)) + , on_connected(on_connected) + , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { + assert(event.filter & EVFILT_READ); + this->on_connected(); + }) +{ + start_listen(fd.getfd()); +} + +server_socket::server_socket(epoll& ep, ipv4_endpoint local_endpoint, on_connected_t on_connected) + : fd(make_socket(AF_INET, SOCK_STREAM, true)) + , on_connected(on_connected) + , reg(ep, fd.getfd(), {EVFILT_READ}, [this](struct kevent event) { + assert(event.filter & EVFILT_READ); + this->on_connected(); + }) +{ + bind_socket(fd.getfd(), local_endpoint.port_net, local_endpoint.addr_net); + start_listen(fd.getfd()); +} + +ipv4_endpoint server_socket::local_endpoint() const +{ + sockaddr_in saddr{}; + socklen_t saddr_len = sizeof saddr; + int res = ::getsockname(fd.getfd(), reinterpret_cast(&saddr), &saddr_len); + if (res == -1) + throw_error(errno, "getsockname()"); + assert(saddr_len == sizeof saddr); + return ipv4_endpoint{saddr.sin_port, saddr.sin_addr.s_addr}; +} + +client_socket server_socket::accept(client_socket::on_ready_t on_disconnect) const +{ + struct sockaddr addr; + socklen_t socklen = sizeof(addr); + int res = ::accept(fd.getfd(), &addr, &socklen); + if (res == -1) + throw_error(errno, "accept()"); + + const int set = 1; + ::setsockopt(res, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)); // NOSIGPIPE FOR SEND + + return client_socket{reg.get_epoll(), {res}, std::move(on_disconnect)}; +} diff --git a/socket_apple.h b/socket_apple.h new file mode 100644 index 0000000..205db10 --- /dev/null +++ b/socket_apple.h @@ -0,0 +1,61 @@ +#ifndef SOCKET_APPLE_H +#define SOCKET_APPLE_H + +#include "file_descriptor.h" +#include "address.h" +#include "kqueue.hpp" +#include +#include + +struct client_socket +{ + typedef std::function on_ready_t; + + client_socket(epoll& ep, file_descriptor fd, on_ready_t on_disconnect); + + void set_on_read(on_ready_t on_ready); + void set_on_write(on_ready_t on_ready); + + size_t write_some(void const* data, size_t size); + size_t read_some(void* data, size_t size); + + static client_socket connect(epoll& ep, ipv4_endpoint const& remote, on_ready_t on_disconnect); + +private: + void update_registration(); + +private: + struct impl + { + impl(epoll& ep, file_descriptor fd, on_ready_t on_disconnect); + ~impl(); + + epoll& ep; + file_descriptor fd; + epoll_registration reg; + on_ready_t on_disconnect; + on_ready_t on_read_ready; + on_ready_t on_write_ready; + bool* destroyed; + }; + + std::unique_ptr pimpl; +}; + +struct server_socket +{ + typedef std::function on_connected_t; + + server_socket(epoll& ep, on_connected_t on_connected); + server_socket(epoll& ep, ipv4_endpoint local_endpoint, on_connected_t on_connected); + + ipv4_endpoint local_endpoint() const; + client_socket accept(client_socket::on_ready_t on_disconnect) const; + +private: + file_descriptor fd; + on_connected_t on_connected; + epoll_registration reg; +}; + +#endif // SOCKET_APPLE_H From dfa61d5ed544b95d50d10aae710e643948e40802 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Tue, 1 Dec 2015 11:48:54 +0300 Subject: [PATCH 12/15] correct CMakeList --- CMakeLists.txt | 2 +- echo_server.h | 4 ++++ echo_tester.h | 3 ++- socket_apple.cpp | 2 -- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 060793a..d5cf799 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,7 +11,7 @@ IF(UNIX) cap_read.cpp cap_write.cpp kqueue.cpp - socket.cpp + socket_apple.cpp throw_error.cpp ) ELSE(APPLE) diff --git a/echo_server.h b/echo_server.h index 6cebefc..4fe2557 100644 --- a/echo_server.h +++ b/echo_server.h @@ -2,7 +2,11 @@ #define ECHO_SERVER_H #include +#ifdef __APPLE__ +#include "socket_apple.h" +#else #include "socket.h" +#endif struct echo_server { diff --git a/echo_tester.h b/echo_tester.h index f7dde49..142f4bb 100644 --- a/echo_tester.h +++ b/echo_tester.h @@ -3,11 +3,12 @@ #ifdef __APPLE__ #include "kqueue.hpp" +#include "socket_apple.h" #else #include "epoll.h" +#include "socket.h" #endif #include "address.h" -#include "socket.h" #include struct echo_tester diff --git a/socket_apple.cpp b/socket_apple.cpp index 8b1b149..40a5ddf 100644 --- a/socket_apple.cpp +++ b/socket_apple.cpp @@ -6,8 +6,6 @@ #include #include "throw_error.h" -#include -#include #include "cap_write.h" #include "cap_read.h" From 655925fc56423373d1f506a7d04763005dd9835e Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Fri, 18 Dec 2015 02:40:31 +0300 Subject: [PATCH 13/15] Update file_descriptor.cpp --- file_descriptor.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/file_descriptor.cpp b/file_descriptor.cpp index 7a6fc95..da46791 100644 --- a/file_descriptor.cpp +++ b/file_descriptor.cpp @@ -121,7 +121,11 @@ size_t write_some(weak_file_descriptor fdc, void const* data, std::size_t size) int fd = fdc.getfd(); assert(fd != -1); +#ifdef __APPLE__ ssize_t res = ::send(fd, data, size, 0); +#else + ssize_t res = ::send(fd, data, size, MSG_NOSIGNAL); +#endif if (res == -1) { int err = errno; From 6da76a01f2ec3603ae0de2b7d8dfd3b8b8a799d5 Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Fri, 18 Dec 2015 02:41:35 +0300 Subject: [PATCH 14/15] Update main_echo_server.cpp --- main_echo_server.cpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/main_echo_server.cpp b/main_echo_server.cpp index 14ea2d0..ec8cb1a 100644 --- a/main_echo_server.cpp +++ b/main_echo_server.cpp @@ -9,8 +9,8 @@ int main() { -// try -// { + try + { sysapi::epoll ep; echo_server echo_server(ep, ipv4_endpoint(0, ipv4_address::any())); @@ -18,17 +18,17 @@ int main() std::cout << "bound to " << echo_server_endpoint << std::endl; ep.run(); -// } -// catch (std::exception const& e) -// { -// std::cerr << "error: " << e.what() << std::endl; -// return EXIT_FAILURE; -// } -// catch (...) -// { -// std::cerr << "unknown exception in main" << std::endl; -// return EXIT_FAILURE; -// } + } + catch (std::exception const& e) + { + std::cerr << "error: " << e.what() << std::endl; + return EXIT_FAILURE; + } + catch (...) + { + std::cerr << "unknown exception in main" << std::endl; + return EXIT_FAILURE; + } return EXIT_SUCCESS; } From a5111efda6f51c1402d26ec125603918071bc50c Mon Sep 17 00:00:00 2001 From: Dmitry Kurkin Date: Fri, 18 Dec 2015 02:42:27 +0300 Subject: [PATCH 15/15] Update main_echo_test.cpp --- main_echo_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main_echo_test.cpp b/main_echo_test.cpp index 6b275af..a1af5df 100644 --- a/main_echo_test.cpp +++ b/main_echo_test.cpp @@ -17,7 +17,7 @@ int main(int argc, char* argv[]) return 0; } - ipv4_endpoint endpoint(std::stod("52100"), ipv4_address("0.0.0.0")); + ipv4_endpoint endpoint(std::stod(argv[2]), ipv4_address(argv[1])); std::cout << endpoint << std::endl; sysapi::epoll tep; echo_tester tester{tep, endpoint};