LCOV - code coverage report
Current view: top level - src/detail/epoll - acceptors.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 79.9 % 219 175
Test Date: 2026-02-04 14:16:13 Functions: 95.0 % 20 19

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/acceptors.hpp"
      15              : #include "src/detail/epoll/sockets.hpp"
      16              : #include "src/detail/endpoint_convert.hpp"
      17              : #include "src/detail/make_err.hpp"
      18              : 
      19              : #include <errno.h>
      20              : #include <netinet/in.h>
      21              : #include <sys/epoll.h>
      22              : #include <sys/socket.h>
      23              : #include <unistd.h>
      24              : 
      25              : namespace boost::corosio::detail {
      26              : 
      27              : void
      28            6 : epoll_accept_op::
      29              : cancel() noexcept
      30              : {
      31            6 :     if (acceptor_impl_)
      32            6 :         acceptor_impl_->cancel_single_op(*this);
      33              :     else
      34            0 :         request_cancel();
      35            6 : }
      36              : 
      37              : void
      38         2651 : epoll_accept_op::
      39              : operator()()
      40              : {
      41         2651 :     stop_cb.reset();
      42              : 
      43         2651 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      44              : 
      45         2651 :     if (ec_out)
      46              :     {
      47         2651 :         if (cancelled.load(std::memory_order_acquire))
      48            9 :             *ec_out = capy::error::canceled;
      49         2642 :         else if (errn != 0)
      50            0 :             *ec_out = make_err(errn);
      51              :         else
      52         2642 :             *ec_out = {};
      53              :     }
      54              : 
      55         2651 :     if (success && accepted_fd >= 0)
      56              :     {
      57         2642 :         if (acceptor_impl_)
      58              :         {
      59         2642 :             auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
      60         2642 :                 ->service().socket_service();
      61         2642 :             if (socket_svc)
      62              :             {
      63         2642 :                 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
      64         2642 :                 impl.set_socket(accepted_fd);
      65              : 
      66              :                 // Register accepted socket with epoll (edge-triggered mode)
      67         2642 :                 impl.desc_data_.fd = accepted_fd;
      68         2642 :                 impl.desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
      69         2642 :                 impl.desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
      70         2642 :                 impl.desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
      71         2642 :                 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_data_);
      72              : 
      73         2642 :                 sockaddr_in local_addr{};
      74         2642 :                 socklen_t local_len = sizeof(local_addr);
      75         2642 :                 sockaddr_in remote_addr{};
      76         2642 :                 socklen_t remote_len = sizeof(remote_addr);
      77              : 
      78         2642 :                 endpoint local_ep, remote_ep;
      79         2642 :                 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      80         2642 :                     local_ep = from_sockaddr_in(local_addr);
      81         2642 :                 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
      82         2642 :                     remote_ep = from_sockaddr_in(remote_addr);
      83              : 
      84         2642 :                 impl.set_endpoints(local_ep, remote_ep);
      85              : 
      86         2642 :                 if (impl_out)
      87         2642 :                     *impl_out = &impl;
      88              : 
      89         2642 :                 accepted_fd = -1;
      90              :             }
      91              :             else
      92              :             {
      93            0 :                 if (ec_out && !*ec_out)
      94            0 :                     *ec_out = make_err(ENOENT);
      95            0 :                 ::close(accepted_fd);
      96            0 :                 accepted_fd = -1;
      97            0 :                 if (impl_out)
      98            0 :                     *impl_out = nullptr;
      99              :             }
     100              :         }
     101              :         else
     102              :         {
     103            0 :             ::close(accepted_fd);
     104            0 :             accepted_fd = -1;
     105            0 :             if (impl_out)
     106            0 :                 *impl_out = nullptr;
     107              :         }
     108         2642 :     }
     109              :     else
     110              :     {
     111            9 :         if (accepted_fd >= 0)
     112              :         {
     113            0 :             ::close(accepted_fd);
     114            0 :             accepted_fd = -1;
     115              :         }
     116              : 
     117            9 :         if (peer_impl)
     118              :         {
     119            0 :             peer_impl->release();
     120            0 :             peer_impl = nullptr;
     121              :         }
     122              : 
     123            9 :         if (impl_out)
     124            9 :             *impl_out = nullptr;
     125              :     }
     126              : 
     127              :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     128         2651 :     capy::executor_ref saved_ex( std::move( ex ) );
     129         2651 :     capy::coro saved_h( std::move( h ) );
     130         2651 :     auto prevent_premature_destruction = std::move(impl_ptr);
     131         2651 :     saved_ex.dispatch( saved_h );
     132         2651 : }
     133              : 
     134           66 : epoll_acceptor_impl::
     135           66 : epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
     136           66 :     : svc_(svc)
     137              : {
     138           66 : }
     139              : 
     140              : void
     141            0 : epoll_acceptor_impl::
     142              : update_epoll_events() noexcept
     143              : {
     144            0 :     svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
     145            0 : }
     146              : 
     147              : void
     148           66 : epoll_acceptor_impl::
     149              : release()
     150              : {
     151           66 :     close_socket();
     152           66 :     svc_.destroy_acceptor_impl(*this);
     153           66 : }
     154              : 
     155              : void
     156         2651 : epoll_acceptor_impl::
     157              : accept(
     158              :     std::coroutine_handle<> h,
     159              :     capy::executor_ref ex,
     160              :     std::stop_token token,
     161              :     std::error_code* ec,
     162              :     io_object::io_object_impl** impl_out)
     163              : {
     164         2651 :     auto& op = acc_;
     165         2651 :     op.reset();
     166         2651 :     op.h = h;
     167         2651 :     op.ex = ex;
     168         2651 :     op.ec_out = ec;
     169         2651 :     op.impl_out = impl_out;
     170         2651 :     op.fd = fd_;
     171         2651 :     op.start(token, this);
     172              : 
     173         2651 :     sockaddr_in addr{};
     174         2651 :     socklen_t addrlen = sizeof(addr);
     175         2651 :     int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
     176              :                              &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
     177              : 
     178         2651 :     if (accepted >= 0)
     179              :     {
     180            2 :         desc_data_.read_ready.store(false, std::memory_order_relaxed);
     181            2 :         op.accepted_fd = accepted;
     182            2 :         op.complete(0, 0);
     183            2 :         op.impl_ptr = shared_from_this();
     184            2 :         svc_.post(&op);
     185         2651 :         return;
     186              :     }
     187              : 
     188         2649 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     189              :     {
     190         2649 :         svc_.work_started();
     191         2649 :         op.impl_ptr = shared_from_this();
     192              : 
     193         2649 :         desc_data_.read_op.store(&op, std::memory_order_release);
     194              :         std::atomic_thread_fence(std::memory_order_seq_cst);
     195              : 
     196         2649 :         if (desc_data_.read_ready.exchange(false, std::memory_order_acquire))
     197              :         {
     198            0 :             auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     199            0 :             if (claimed)
     200              :             {
     201            0 :                 claimed->perform_io();
     202            0 :                 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
     203              :                 {
     204            0 :                     claimed->errn = 0;
     205            0 :                     desc_data_.read_op.store(claimed, std::memory_order_release);
     206              :                 }
     207              :                 else
     208              :                 {
     209            0 :                     svc_.post(claimed);
     210            0 :                     svc_.work_finished();
     211              :                 }
     212            0 :                 return;
     213              :             }
     214              :         }
     215              : 
     216         2649 :         if (op.cancelled.load(std::memory_order_acquire))
     217              :         {
     218            0 :             auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     219            0 :             if (claimed)
     220              :             {
     221            0 :                 svc_.post(claimed);
     222            0 :                 svc_.work_finished();
     223              :             }
     224              :         }
     225         2649 :         return;
     226              :     }
     227              : 
     228            0 :     op.complete(errno, 0);
     229            0 :     op.impl_ptr = shared_from_this();
     230            0 :     svc_.post(&op);
     231              : }
     232              : 
     233              : void
     234          133 : epoll_acceptor_impl::
     235              : cancel() noexcept
     236              : {
     237          133 :     std::shared_ptr<epoll_acceptor_impl> self;
     238              :     try {
     239          133 :         self = shared_from_this();
     240            0 :     } catch (const std::bad_weak_ptr&) {
     241            0 :         return;
     242            0 :     }
     243              : 
     244          133 :     acc_.request_cancel();
     245              :     // Use atomic exchange - only one of cancellation or reactor will succeed
     246          133 :     auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     247          133 :     if (claimed == &acc_)
     248              :     {
     249            3 :         acc_.impl_ptr = self;
     250            3 :         svc_.post(&acc_);
     251            3 :         svc_.work_finished();
     252              :     }
     253          133 : }
     254              : 
     255              : void
     256            6 : epoll_acceptor_impl::
     257              : cancel_single_op(epoll_op& op) noexcept
     258              : {
     259            6 :     op.request_cancel();
     260              : 
     261              :     // Use atomic exchange - only one of cancellation or reactor will succeed
     262            6 :     auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
     263            6 :     if (claimed == &op)
     264              :     {
     265              :         try {
     266            6 :             op.impl_ptr = shared_from_this();
     267            0 :         } catch (const std::bad_weak_ptr&) {}
     268            6 :         svc_.post(&op);
     269            6 :         svc_.work_finished();
     270              :     }
     271            6 : }
     272              : 
     273              : void
     274          132 : epoll_acceptor_impl::
     275              : close_socket() noexcept
     276              : {
     277          132 :     cancel();
     278              : 
     279          132 :     if (fd_ >= 0)
     280              :     {
     281           57 :         if (desc_data_.registered_events != 0)
     282           57 :             svc_.scheduler().deregister_descriptor(fd_);
     283           57 :         ::close(fd_);
     284           57 :         fd_ = -1;
     285              :     }
     286              : 
     287          132 :     desc_data_.fd = -1;
     288          132 :     desc_data_.is_registered = false;
     289          132 :     desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
     290          132 :     desc_data_.read_ready.store(false, std::memory_order_relaxed);
     291          132 :     desc_data_.write_ready.store(false, std::memory_order_relaxed);
     292          132 :     desc_data_.registered_events = 0;
     293              : 
     294              :     // Clear cached endpoint
     295          132 :     local_endpoint_ = endpoint{};
     296          132 : }
     297              : 
     298          184 : epoll_acceptor_service::
     299          184 : epoll_acceptor_service(capy::execution_context& ctx)
     300          184 :     : ctx_(ctx)
     301          184 :     , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
     302              : {
     303          184 : }
     304              : 
     305          368 : epoll_acceptor_service::
     306          184 : ~epoll_acceptor_service()
     307              : {
     308          368 : }
     309              : 
     310              : void
     311          184 : epoll_acceptor_service::
     312              : shutdown()
     313              : {
     314          184 :     std::lock_guard lock(state_->mutex_);
     315              : 
     316          184 :     while (auto* impl = state_->acceptor_list_.pop_front())
     317            0 :         impl->close_socket();
     318              : 
     319          184 :     state_->acceptor_ptrs_.clear();
     320          184 : }
     321              : 
     322              : tcp_acceptor::acceptor_impl&
     323           66 : epoll_acceptor_service::
     324              : create_acceptor_impl()
     325              : {
     326           66 :     auto impl = std::make_shared<epoll_acceptor_impl>(*this);
     327           66 :     auto* raw = impl.get();
     328              : 
     329           66 :     std::lock_guard lock(state_->mutex_);
     330           66 :     state_->acceptor_list_.push_back(raw);
     331           66 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     332              : 
     333           66 :     return *raw;
     334           66 : }
     335              : 
     336              : void
     337           66 : epoll_acceptor_service::
     338              : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
     339              : {
     340           66 :     auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
     341           66 :     std::lock_guard lock(state_->mutex_);
     342           66 :     state_->acceptor_list_.remove(epoll_impl);
     343           66 :     state_->acceptor_ptrs_.erase(epoll_impl);
     344           66 : }
     345              : 
     346              : std::error_code
     347           66 : epoll_acceptor_service::
     348              : open_acceptor(
     349              :     tcp_acceptor::acceptor_impl& impl,
     350              :     endpoint ep,
     351              :     int backlog)
     352              : {
     353           66 :     auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
     354           66 :     epoll_impl->close_socket();
     355              : 
     356           66 :     int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
     357           66 :     if (fd < 0)
     358            0 :         return make_err(errno);
     359              : 
     360           66 :     int reuse = 1;
     361           66 :     ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
     362              : 
     363           66 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     364           66 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
     365              :     {
     366            9 :         int errn = errno;
     367            9 :         ::close(fd);
     368            9 :         return make_err(errn);
     369              :     }
     370              : 
     371           57 :     if (::listen(fd, backlog) < 0)
     372              :     {
     373            0 :         int errn = errno;
     374            0 :         ::close(fd);
     375            0 :         return make_err(errn);
     376              :     }
     377              : 
     378           57 :     epoll_impl->fd_ = fd;
     379              : 
     380              :     // Register fd with epoll (edge-triggered mode)
     381           57 :     epoll_impl->desc_data_.fd = fd;
     382           57 :     epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
     383           57 :     scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
     384              : 
     385              :     // Cache the local endpoint (queries OS for ephemeral port if port was 0)
     386           57 :     sockaddr_in local_addr{};
     387           57 :     socklen_t local_len = sizeof(local_addr);
     388           57 :     if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     389           57 :         epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
     390              : 
     391           57 :     return {};
     392              : }
     393              : 
     394              : void
     395           11 : epoll_acceptor_service::
     396              : post(epoll_op* op)
     397              : {
     398           11 :     state_->sched_.post(op);
     399           11 : }
     400              : 
     401              : void
     402         2649 : epoll_acceptor_service::
     403              : work_started() noexcept
     404              : {
     405         2649 :     state_->sched_.work_started();
     406         2649 : }
     407              : 
     408              : void
     409            9 : epoll_acceptor_service::
     410              : work_finished() noexcept
     411              : {
     412            9 :     state_->sched_.work_finished();
     413            9 : }
     414              : 
     415              : epoll_socket_service*
     416         2642 : epoll_acceptor_service::
     417              : socket_service() const noexcept
     418              : {
     419         2642 :     auto* svc = ctx_.find_service<detail::socket_service>();
     420         2642 :     return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
     421              : }
     422              : 
     423              : } // namespace boost::corosio::detail
     424              : 
     425              : #endif // BOOST_COROSIO_HAS_EPOLL
        

Generated by: LCOV version 2.3