LCOV - code coverage report
Current view: top level - src/detail/epoll - op.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.7 % 111 94
Test Date: 2026-02-04 14:16:13 Functions: 84.2 % 19 16

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
      11              : #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_EPOLL
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/io_object.hpp>
      19              : #include <boost/corosio/endpoint.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/error.hpp>
      23              : #include <system_error>
      24              : 
      25              : #include "src/detail/make_err.hpp"
      26              : #include "src/detail/resume_coro.hpp"
      27              : #include "src/detail/scheduler_op.hpp"
      28              : #include "src/detail/endpoint_convert.hpp"
      29              : 
      30              : #include <unistd.h>
      31              : #include <errno.h>
      32              : 
      33              : #include <atomic>
      34              : #include <cstddef>
      35              : #include <memory>
      36              : #include <optional>
      37              : #include <stop_token>
      38              : 
      39              : #include <netinet/in.h>
      40              : #include <sys/socket.h>
      41              : #include <sys/uio.h>
      42              : 
      43              : /*
      44              :     epoll Operation State
      45              :     =====================
      46              : 
      47              :     Each async I/O operation has a corresponding epoll_op-derived struct that
      48              :     holds the operation's state while it's in flight. The socket impl owns
      49              :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      50              :     operation of each type can be pending per socket at a time.
      51              : 
      52              :     Persistent Registration
      53              :     -----------------------
      54              :     File descriptors are registered with epoll once (via descriptor_data) and
      55              :     stay registered until closed. The descriptor_data tracks which operations
      56              :     are pending (read_op, write_op, connect_op). When an event arrives, the
      57              :     reactor dispatches to the appropriate pending operation.
      58              : 
      59              :     Impl Lifetime Management
      60              :     ------------------------
      61              :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      62              :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      63              :     member holds a shared_ptr to the impl, keeping it alive until the op
      64              :     completes. This is set by cancel() and cleared in operator() after the
      65              :     coroutine is resumed.
      66              : 
      67              :     EOF Detection
      68              :     -------------
      69              :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      70              :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      71              : 
      72              :     SIGPIPE Prevention
      73              :     ------------------
      74              :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      75              :     SIGPIPE when the peer has closed.
      76              : */
      77              : 
      78              : namespace boost::corosio::detail {
      79              : 
      80              : // Forward declarations
      81              : class epoll_socket_impl;
      82              : class epoll_acceptor_impl;
      83              : struct epoll_op;
      84              : 
      85              : /** Per-descriptor state for persistent epoll registration.
      86              : 
      87              :     Tracks pending operations for a file descriptor. The fd is registered
      88              :     once with epoll and stays registered until closed. Events are dispatched
      89              :     to the appropriate pending operation (EPOLLIN -> read_op, etc.).
      90              : 
      91              :     With edge-triggered epoll (EPOLLET), atomic operations are required to
      92              :     synchronize between operation registration and reactor event delivery.
      93              :     The read_ready/write_ready flags cache edge events that arrived before
      94              :     an operation was registered.
      95              : */
      96              : struct descriptor_data
      97              : {
      98              :     /// Currently registered events (EPOLLIN, EPOLLOUT, etc.)
      99              :     std::uint32_t registered_events = 0;
     100              : 
     101              :     /// Pending read operation (nullptr if none)
     102              :     std::atomic<epoll_op*> read_op{nullptr};
     103              : 
     104              :     /// Pending write operation (nullptr if none)
     105              :     std::atomic<epoll_op*> write_op{nullptr};
     106              : 
     107              :     /// Pending connect operation (nullptr if none)
     108              :     std::atomic<epoll_op*> connect_op{nullptr};
     109              : 
     110              :     /// Cached read readiness (edge event arrived before op registered)
     111              :     std::atomic<bool> read_ready{false};
     112              : 
     113              :     /// Cached write readiness (edge event arrived before op registered)
     114              :     std::atomic<bool> write_ready{false};
     115              : 
     116              :     /// The file descriptor
     117              :     int fd = -1;
     118              : 
     119              :     /// Whether this descriptor is managed by persistent registration
     120              :     bool is_registered = false;
     121              : };
     122              : 
     123              : struct epoll_op : scheduler_op
     124              : {
     125              :     struct canceller
     126              :     {
     127              :         epoll_op* op;
     128              :         void operator()() const noexcept;
     129              :     };
     130              : 
     131              :     capy::coro h;
     132              :     capy::executor_ref ex;
     133              :     std::error_code* ec_out = nullptr;
     134              :     std::size_t* bytes_out = nullptr;
     135              : 
     136              :     int fd = -1;
     137              :     int errn = 0;
     138              :     std::size_t bytes_transferred = 0;
     139              : 
     140              :     std::atomic<bool> cancelled{false};
     141              :     std::optional<std::stop_callback<canceller>> stop_cb;
     142              : 
     143              :     // Prevents use-after-free when socket is closed with pending ops.
     144              :     // See "Impl Lifetime Management" in file header.
     145              :     std::shared_ptr<void> impl_ptr;
     146              : 
     147              :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
     148              :     // When stop is requested, we call back to the impl to perform actual I/O cancellation.
     149              :     epoll_socket_impl* socket_impl_ = nullptr;
     150              :     epoll_acceptor_impl* acceptor_impl_ = nullptr;
     151              : 
     152        15954 :     epoll_op()
     153        15954 :     {
     154        15954 :         data_ = this;
     155        15954 :     }
     156              : 
     157       117846 :     void reset() noexcept
     158              :     {
     159       117846 :         fd = -1;
     160       117846 :         errn = 0;
     161       117846 :         bytes_transferred = 0;
     162       117846 :         cancelled.store(false, std::memory_order_relaxed);
     163       117846 :         impl_ptr.reset();
     164       117846 :         socket_impl_ = nullptr;
     165       117846 :         acceptor_impl_ = nullptr;
     166       117846 :     }
     167              : 
     168       112552 :     void operator()() override
     169              :     {
     170       112552 :         stop_cb.reset();
     171              : 
     172       112552 :         if (ec_out)
     173              :         {
     174       112552 :             if (cancelled.load(std::memory_order_acquire))
     175          202 :                 *ec_out = capy::error::canceled;
     176       112350 :             else if (errn != 0)
     177            1 :                 *ec_out = make_err(errn);
     178       112349 :             else if (is_read_operation() && bytes_transferred == 0)
     179            5 :                 *ec_out = capy::error::eof;
     180              :             else
     181       112344 :                 *ec_out = {};
     182              :         }
     183              : 
     184       112552 :         if (bytes_out)
     185       112552 :             *bytes_out = bytes_transferred;
     186              : 
     187              :         // Move to stack before resuming coroutine. The coroutine might close
     188              :         // the socket, releasing the last wrapper ref. If impl_ptr were the
     189              :         // last ref and we destroyed it while still in operator(), we'd have
     190              :         // use-after-free. Moving to local ensures destruction happens at
     191              :         // function exit, after all member accesses are complete.
     192       112552 :         capy::executor_ref saved_ex( std::move( ex ) );
     193       112552 :         capy::coro saved_h( std::move( h ) );
     194       112552 :         auto prevent_premature_destruction = std::move(impl_ptr);
     195       112552 :         resume_coro(saved_ex, saved_h);
     196       112552 :     }
     197              : 
     198        56211 :     virtual bool is_read_operation() const noexcept { return false; }
     199              :     virtual void cancel() noexcept = 0;
     200              : 
     201            0 :     void destroy() override
     202              :     {
     203            0 :         stop_cb.reset();
     204            0 :         impl_ptr.reset();
     205            0 :     }
     206              : 
     207        24374 :     void request_cancel() noexcept
     208              :     {
     209        24374 :         cancelled.store(true, std::memory_order_release);
     210        24374 :     }
     211              : 
     212              :     void start(std::stop_token token)
     213              :     {
     214              :         cancelled.store(false, std::memory_order_release);
     215              :         stop_cb.reset();
     216              :         socket_impl_ = nullptr;
     217              :         acceptor_impl_ = nullptr;
     218              : 
     219              :         if (token.stop_possible())
     220              :             stop_cb.emplace(token, canceller{this});
     221              :     }
     222              : 
     223       115195 :     void start(std::stop_token token, epoll_socket_impl* impl)
     224              :     {
     225       115195 :         cancelled.store(false, std::memory_order_release);
     226       115195 :         stop_cb.reset();
     227       115195 :         socket_impl_ = impl;
     228       115195 :         acceptor_impl_ = nullptr;
     229              : 
     230       115195 :         if (token.stop_possible())
     231          104 :             stop_cb.emplace(token, canceller{this});
     232       115195 :     }
     233              : 
     234         2651 :     void start(std::stop_token token, epoll_acceptor_impl* impl)
     235              :     {
     236         2651 :         cancelled.store(false, std::memory_order_release);
     237         2651 :         stop_cb.reset();
     238         2651 :         socket_impl_ = nullptr;
     239         2651 :         acceptor_impl_ = impl;
     240              : 
     241         2651 :         if (token.stop_possible())
     242            9 :             stop_cb.emplace(token, canceller{this});
     243         2651 :     }
     244              : 
     245       117720 :     void complete(int err, std::size_t bytes) noexcept
     246              :     {
     247       117720 :         errn = err;
     248       117720 :         bytes_transferred = bytes;
     249       117720 :     }
     250              : 
     251            0 :     virtual void perform_io() noexcept {}
     252              : };
     253              : 
     254              : 
     255              : struct epoll_connect_op : epoll_op
     256              : {
     257              :     endpoint target_endpoint;
     258              : 
     259         2643 :     void reset() noexcept
     260              :     {
     261         2643 :         epoll_op::reset();
     262         2643 :         target_endpoint = endpoint{};
     263         2643 :     }
     264              : 
     265         2643 :     void perform_io() noexcept override
     266              :     {
     267              :         // connect() completion status is retrieved via SO_ERROR, not return value
     268         2643 :         int err = 0;
     269         2643 :         socklen_t len = sizeof(err);
     270         2643 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     271            0 :             err = errno;
     272         2643 :         complete(err, 0);
     273         2643 :     }
     274              : 
     275              :     // Defined in sockets.cpp where epoll_socket_impl is complete
     276              :     void operator()() override;
     277              :     void cancel() noexcept override;
     278              : };
     279              : 
     280              : 
     281              : struct epoll_read_op : epoll_op
     282              : {
     283              :     static constexpr std::size_t max_buffers = 16;
     284              :     iovec iovecs[max_buffers];
     285              :     int iovec_count = 0;
     286              :     bool empty_buffer_read = false;
     287              : 
     288        56138 :     bool is_read_operation() const noexcept override
     289              :     {
     290        56138 :         return !empty_buffer_read;
     291              :     }
     292              : 
     293        56336 :     void reset() noexcept
     294              :     {
     295        56336 :         epoll_op::reset();
     296        56336 :         iovec_count = 0;
     297        56336 :         empty_buffer_read = false;
     298        56336 :     }
     299              : 
     300           51 :     void perform_io() noexcept override
     301              :     {
     302           51 :         ssize_t n = ::readv(fd, iovecs, iovec_count);
     303           51 :         if (n >= 0)
     304           51 :             complete(0, static_cast<std::size_t>(n));
     305              :         else
     306            0 :             complete(errno, 0);
     307           51 :     }
     308              : 
     309              :     void cancel() noexcept override;
     310              : };
     311              : 
     312              : 
     313              : struct epoll_write_op : epoll_op
     314              : {
     315              :     static constexpr std::size_t max_buffers = 16;
     316              :     iovec iovecs[max_buffers];
     317              :     int iovec_count = 0;
     318              : 
     319        56216 :     void reset() noexcept
     320              :     {
     321        56216 :         epoll_op::reset();
     322        56216 :         iovec_count = 0;
     323        56216 :     }
     324              : 
     325            0 :     void perform_io() noexcept override
     326              :     {
     327            0 :         msghdr msg{};
     328            0 :         msg.msg_iov = iovecs;
     329            0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     330              : 
     331            0 :         ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
     332            0 :         if (n >= 0)
     333            0 :             complete(0, static_cast<std::size_t>(n));
     334              :         else
     335            0 :             complete(errno, 0);
     336            0 :     }
     337              : 
     338              :     void cancel() noexcept override;
     339              : };
     340              : 
     341              : 
     342              : struct epoll_accept_op : epoll_op
     343              : {
     344              :     int accepted_fd = -1;
     345              :     io_object::io_object_impl* peer_impl = nullptr;
     346              :     io_object::io_object_impl** impl_out = nullptr;
     347              : 
     348         2651 :     void reset() noexcept
     349              :     {
     350         2651 :         epoll_op::reset();
     351         2651 :         accepted_fd = -1;
     352         2651 :         peer_impl = nullptr;
     353         2651 :         impl_out = nullptr;
     354         2651 :     }
     355              : 
     356         2640 :     void perform_io() noexcept override
     357              :     {
     358         2640 :         sockaddr_in addr{};
     359         2640 :         socklen_t addrlen = sizeof(addr);
     360         2640 :         int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
     361              :                                &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
     362              : 
     363         2640 :         if (new_fd >= 0)
     364              :         {
     365         2640 :             accepted_fd = new_fd;
     366         2640 :             complete(0, 0);
     367              :         }
     368              :         else
     369              :         {
     370            0 :             complete(errno, 0);
     371              :         }
     372         2640 :     }
     373              : 
     374              :     // Defined in acceptors.cpp where epoll_acceptor_impl is complete
     375              :     void operator()() override;
     376              :     void cancel() noexcept override;
     377              : };
     378              : 
     379              : } // namespace boost::corosio::detail
     380              : 
     381              : #endif // BOOST_COROSIO_HAS_EPOLL
     382              : 
     383              : #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
        

Generated by: LCOV version 2.3