LCOV - code coverage report
Current view: top level - src/detail/select - sockets.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 73.7 % 372 274
Test Date: 2026-02-04 14:16:13 Functions: 94.3 % 35 33

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_SELECT
      13              : 
      14              : #include "src/detail/select/sockets.hpp"
      15              : #include "src/detail/endpoint_convert.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : 
      18              : #include <boost/capy/buffers.hpp>
      19              : 
      20              : #include <errno.h>
      21              : #include <fcntl.h>
      22              : #include <netinet/in.h>
      23              : #include <netinet/tcp.h>
      24              : #include <sys/socket.h>
      25              : #include <unistd.h>
      26              : 
      27              : namespace boost::corosio::detail {
      28              : 
      29              : void
      30           80 : select_op::canceller::
      31              : operator()() const noexcept
      32              : {
      33           80 :     op->cancel();
      34           80 : }
      35              : 
      36              : void
      37            0 : select_connect_op::
      38              : cancel() noexcept
      39              : {
      40            0 :     if (socket_impl_)
      41            0 :         socket_impl_->cancel_single_op(*this);
      42              :     else
      43            0 :         request_cancel();
      44            0 : }
      45              : 
      46              : void
      47           80 : select_read_op::
      48              : cancel() noexcept
      49              : {
      50           80 :     if (socket_impl_)
      51           80 :         socket_impl_->cancel_single_op(*this);
      52              :     else
      53            0 :         request_cancel();
      54           80 : }
      55              : 
      56              : void
      57            0 : select_write_op::
      58              : cancel() noexcept
      59              : {
      60            0 :     if (socket_impl_)
      61            0 :         socket_impl_->cancel_single_op(*this);
      62              :     else
      63            0 :         request_cancel();
      64            0 : }
      65              : 
      66              : void
      67         2073 : select_connect_op::
      68              : operator()()
      69              : {
      70         2073 :     stop_cb.reset();
      71              : 
      72         2073 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      73              : 
      74              :     // Cache endpoints on successful connect
      75         2073 :     if (success && socket_impl_)
      76              :     {
      77              :         // Query local endpoint via getsockname (may fail, but remote is always known)
      78         2071 :         endpoint local_ep;
      79         2071 :         sockaddr_in local_addr{};
      80         2071 :         socklen_t local_len = sizeof(local_addr);
      81         2071 :         if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      82         2071 :             local_ep = from_sockaddr_in(local_addr);
      83              :         // Always cache remote endpoint; local may be default if getsockname failed
      84         2071 :         static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
      85              :     }
      86              : 
      87         2073 :     if (ec_out)
      88              :     {
      89         2073 :         if (cancelled.load(std::memory_order_acquire))
      90            0 :             *ec_out = capy::error::canceled;
      91         2073 :         else if (errn != 0)
      92            2 :             *ec_out = make_err(errn);
      93              :         else
      94         2071 :             *ec_out = {};
      95              :     }
      96              : 
      97         2073 :     if (bytes_out)
      98            0 :         *bytes_out = bytes_transferred;
      99              : 
     100              :     // Move to stack before destroying the frame
     101         2073 :     capy::executor_ref saved_ex( std::move( ex ) );
     102         2073 :     capy::coro saved_h( std::move( h ) );
     103         2073 :     impl_ptr.reset();
     104         2073 :     saved_ex.dispatch( saved_h );
     105         2073 : }
     106              : 
     107         4155 : select_socket_impl::
     108         4155 : select_socket_impl(select_socket_service& svc) noexcept
     109         4155 :     : svc_(svc)
     110              : {
     111         4155 : }
     112              : 
     113              : void
     114         4155 : select_socket_impl::
     115              : release()
     116              : {
     117         4155 :     close_socket();
     118         4155 :     svc_.destroy_impl(*this);
     119         4155 : }
     120              : 
     121              : void
     122         2073 : select_socket_impl::
     123              : connect(
     124              :     std::coroutine_handle<> h,
     125              :     capy::executor_ref ex,
     126              :     endpoint ep,
     127              :     std::stop_token token,
     128              :     std::error_code* ec)
     129              : {
     130         2073 :     auto& op = conn_;
     131         2073 :     op.reset();
     132         2073 :     op.h = h;
     133         2073 :     op.ex = ex;
     134         2073 :     op.ec_out = ec;
     135         2073 :     op.fd = fd_;
     136         2073 :     op.target_endpoint = ep;  // Store target for endpoint caching
     137         2073 :     op.start(token, this);
     138              : 
     139         2073 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     140         2073 :     int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
     141              : 
     142         2073 :     if (result == 0)
     143              :     {
     144              :         // Sync success - cache endpoints immediately
     145            0 :         sockaddr_in local_addr{};
     146            0 :         socklen_t local_len = sizeof(local_addr);
     147            0 :         if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     148            0 :             local_endpoint_ = detail::from_sockaddr_in(local_addr);
     149            0 :         remote_endpoint_ = ep;
     150              : 
     151            0 :         op.complete(0, 0);
     152            0 :         op.impl_ptr = shared_from_this();
     153            0 :         svc_.post(&op);
     154            0 :         return;
     155              :     }
     156              : 
     157         2073 :     if (errno == EINPROGRESS)
     158              :     {
     159         2073 :         svc_.work_started();
     160         2073 :         op.impl_ptr = shared_from_this();
     161              : 
     162              :         // Set registering BEFORE register_fd to close the race window where
     163              :         // reactor sees an event before we set registered. The reactor treats
     164              :         // registering the same as registered when claiming the op.
     165         2073 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     166         2073 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     167              : 
     168              :         // Transition to registered. If this fails, reactor or cancel already
     169              :         // claimed the op (state is now unregistered), so we're done. However,
     170              :         // we must still deregister the fd because cancel's deregister_fd may
     171              :         // have run before our register_fd, leaving the fd orphaned.
     172         2073 :         auto expected = select_registration_state::registering;
     173         2073 :         if (!op.registered.compare_exchange_strong(
     174              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     175              :         {
     176            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     177            0 :             return;
     178              :         }
     179              : 
     180              :         // If cancelled was set before we registered, handle it now.
     181         2073 :         if (op.cancelled.load(std::memory_order_acquire))
     182              :         {
     183            0 :             auto prev = op.registered.exchange(
     184              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     185            0 :             if (prev != select_registration_state::unregistered)
     186              :             {
     187            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     188            0 :                 op.impl_ptr = shared_from_this();
     189            0 :                 svc_.post(&op);
     190            0 :                 svc_.work_finished();
     191              :             }
     192              :         }
     193         2073 :         return;
     194              :     }
     195              : 
     196            0 :     op.complete(errno, 0);
     197            0 :     op.impl_ptr = shared_from_this();
     198            0 :     svc_.post(&op);
     199              : }
     200              : 
     201              : std::coroutine_handle<>
     202        81846 : select_socket_impl::
     203              : read_some(
     204              :     std::coroutine_handle<> h,
     205              :     capy::executor_ref ex,
     206              :     io_buffer_param param,
     207              :     std::stop_token token,
     208              :     std::error_code* ec,
     209              :     std::size_t* bytes_out)
     210              : {
     211        81846 :     auto& op = rd_;
     212        81846 :     op.reset();
     213        81846 :     op.h = h;
     214        81846 :     op.ex = ex;
     215        81846 :     op.ec_out = ec;
     216        81846 :     op.bytes_out = bytes_out;
     217        81846 :     op.fd = fd_;
     218        81846 :     op.start(token, this);
     219              : 
     220        81846 :     capy::mutable_buffer bufs[select_read_op::max_buffers];
     221        81846 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
     222              : 
     223        81846 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     224              :     {
     225            1 :         op.empty_buffer_read = true;
     226            1 :         op.complete(0, 0);
     227            1 :         op.impl_ptr = shared_from_this();
     228            1 :         svc_.post(&op);
     229            1 :         return std::noop_coroutine();
     230              :     }
     231              : 
     232       163690 :     for (int i = 0; i < op.iovec_count; ++i)
     233              :     {
     234        81845 :         op.iovecs[i].iov_base = bufs[i].data();
     235        81845 :         op.iovecs[i].iov_len = bufs[i].size();
     236              :     }
     237              : 
     238        81845 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     239              : 
     240        81845 :     if (n > 0)
     241              :     {
     242        81689 :         op.complete(0, static_cast<std::size_t>(n));
     243        81689 :         op.impl_ptr = shared_from_this();
     244        81689 :         svc_.post(&op);
     245        81689 :         return std::noop_coroutine();
     246              :     }
     247              : 
     248          156 :     if (n == 0)
     249              :     {
     250            5 :         op.complete(0, 0);
     251            5 :         op.impl_ptr = shared_from_this();
     252            5 :         svc_.post(&op);
     253            5 :         return std::noop_coroutine();
     254              :     }
     255              : 
     256          151 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     257              :     {
     258          151 :         svc_.work_started();
     259          151 :         op.impl_ptr = shared_from_this();
     260              : 
     261              :         // Set registering BEFORE register_fd to close the race window where
     262              :         // reactor sees an event before we set registered.
     263          151 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     264          151 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     265              : 
     266              :         // Transition to registered. If this fails, reactor or cancel already
     267              :         // claimed the op (state is now unregistered), so we're done. However,
     268              :         // we must still deregister the fd because cancel's deregister_fd may
     269              :         // have run before our register_fd, leaving the fd orphaned.
     270          151 :         auto expected = select_registration_state::registering;
     271          151 :         if (!op.registered.compare_exchange_strong(
     272              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     273              :         {
     274            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     275            0 :             return std::noop_coroutine();
     276              :         }
     277              : 
     278              :         // If cancelled was set before we registered, handle it now.
     279          151 :         if (op.cancelled.load(std::memory_order_acquire))
     280              :         {
     281            0 :             auto prev = op.registered.exchange(
     282              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     283            0 :             if (prev != select_registration_state::unregistered)
     284              :             {
     285            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     286            0 :                 op.impl_ptr = shared_from_this();
     287            0 :                 svc_.post(&op);
     288            0 :                 svc_.work_finished();
     289              :             }
     290              :         }
     291          151 :         return std::noop_coroutine();
     292              :     }
     293              : 
     294            0 :     op.complete(errno, 0);
     295            0 :     op.impl_ptr = shared_from_this();
     296            0 :     svc_.post(&op);
     297            0 :     return std::noop_coroutine();
     298              : }
     299              : 
     300              : std::coroutine_handle<>
     301        81738 : select_socket_impl::
     302              : write_some(
     303              :     std::coroutine_handle<> h,
     304              :     capy::executor_ref ex,
     305              :     io_buffer_param param,
     306              :     std::stop_token token,
     307              :     std::error_code* ec,
     308              :     std::size_t* bytes_out)
     309              : {
     310        81738 :     auto& op = wr_;
     311        81738 :     op.reset();
     312        81738 :     op.h = h;
     313        81738 :     op.ex = ex;
     314        81738 :     op.ec_out = ec;
     315        81738 :     op.bytes_out = bytes_out;
     316        81738 :     op.fd = fd_;
     317        81738 :     op.start(token, this);
     318              : 
     319        81738 :     capy::mutable_buffer bufs[select_write_op::max_buffers];
     320        81738 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
     321              : 
     322        81738 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     323              :     {
     324            1 :         op.complete(0, 0);
     325            1 :         op.impl_ptr = shared_from_this();
     326            1 :         svc_.post(&op);
     327            1 :         return std::noop_coroutine();
     328              :     }
     329              : 
     330       163474 :     for (int i = 0; i < op.iovec_count; ++i)
     331              :     {
     332        81737 :         op.iovecs[i].iov_base = bufs[i].data();
     333        81737 :         op.iovecs[i].iov_len = bufs[i].size();
     334              :     }
     335              : 
     336        81737 :     msghdr msg{};
     337        81737 :     msg.msg_iov = op.iovecs;
     338        81737 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     339              : 
     340        81737 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     341              : 
     342        81737 :     if (n > 0)
     343              :     {
     344        81736 :         op.complete(0, static_cast<std::size_t>(n));
     345        81736 :         op.impl_ptr = shared_from_this();
     346        81736 :         svc_.post(&op);
     347        81736 :         return std::noop_coroutine();
     348              :     }
     349              : 
     350            1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     351              :     {
     352            0 :         svc_.work_started();
     353            0 :         op.impl_ptr = shared_from_this();
     354              : 
     355              :         // Set registering BEFORE register_fd to close the race window where
     356              :         // reactor sees an event before we set registered.
     357            0 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     358            0 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     359              : 
     360              :         // Transition to registered. If this fails, reactor or cancel already
     361              :         // claimed the op (state is now unregistered), so we're done. However,
     362              :         // we must still deregister the fd because cancel's deregister_fd may
     363              :         // have run before our register_fd, leaving the fd orphaned.
     364            0 :         auto expected = select_registration_state::registering;
     365            0 :         if (!op.registered.compare_exchange_strong(
     366              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     367              :         {
     368            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     369            0 :             return std::noop_coroutine();
     370              :         }
     371              : 
     372              :         // If cancelled was set before we registered, handle it now.
     373            0 :         if (op.cancelled.load(std::memory_order_acquire))
     374              :         {
     375            0 :             auto prev = op.registered.exchange(
     376              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     377            0 :             if (prev != select_registration_state::unregistered)
     378              :             {
     379            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     380            0 :                 op.impl_ptr = shared_from_this();
     381            0 :                 svc_.post(&op);
     382            0 :                 svc_.work_finished();
     383              :             }
     384              :         }
     385            0 :         return std::noop_coroutine();
     386              :     }
     387              : 
     388            1 :     op.complete(errno ? errno : EIO, 0);
     389            1 :     op.impl_ptr = shared_from_this();
     390            1 :     svc_.post(&op);
     391            1 :     return std::noop_coroutine();
     392              : }
     393              : 
     394              : std::error_code
     395            3 : select_socket_impl::
     396              : shutdown(tcp_socket::shutdown_type what) noexcept
     397              : {
     398              :     int how;
     399            3 :     switch (what)
     400              :     {
     401            1 :     case tcp_socket::shutdown_receive: how = SHUT_RD;   break;
     402            1 :     case tcp_socket::shutdown_send:    how = SHUT_WR;   break;
     403            1 :     case tcp_socket::shutdown_both:    how = SHUT_RDWR; break;
     404            0 :     default:
     405            0 :         return make_err(EINVAL);
     406              :     }
     407            3 :     if (::shutdown(fd_, how) != 0)
     408            0 :         return make_err(errno);
     409            3 :     return {};
     410              : }
     411              : 
     412              : std::error_code
     413            5 : select_socket_impl::
     414              : set_no_delay(bool value) noexcept
     415              : {
     416            5 :     int flag = value ? 1 : 0;
     417            5 :     if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
     418            0 :         return make_err(errno);
     419            5 :     return {};
     420              : }
     421              : 
     422              : bool
     423            5 : select_socket_impl::
     424              : no_delay(std::error_code& ec) const noexcept
     425              : {
     426            5 :     int flag = 0;
     427            5 :     socklen_t len = sizeof(flag);
     428            5 :     if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
     429              :     {
     430            0 :         ec = make_err(errno);
     431            0 :         return false;
     432              :     }
     433            5 :     ec = {};
     434            5 :     return flag != 0;
     435              : }
     436              : 
     437              : std::error_code
     438            4 : select_socket_impl::
     439              : set_keep_alive(bool value) noexcept
     440              : {
     441            4 :     int flag = value ? 1 : 0;
     442            4 :     if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
     443            0 :         return make_err(errno);
     444            4 :     return {};
     445              : }
     446              : 
     447              : bool
     448            4 : select_socket_impl::
     449              : keep_alive(std::error_code& ec) const noexcept
     450              : {
     451            4 :     int flag = 0;
     452            4 :     socklen_t len = sizeof(flag);
     453            4 :     if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
     454              :     {
     455            0 :         ec = make_err(errno);
     456            0 :         return false;
     457              :     }
     458            4 :     ec = {};
     459            4 :     return flag != 0;
     460              : }
     461              : 
     462              : std::error_code
     463            1 : select_socket_impl::
     464              : set_receive_buffer_size(int size) noexcept
     465              : {
     466            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
     467            0 :         return make_err(errno);
     468            1 :     return {};
     469              : }
     470              : 
     471              : int
     472            3 : select_socket_impl::
     473              : receive_buffer_size(std::error_code& ec) const noexcept
     474              : {
     475            3 :     int size = 0;
     476            3 :     socklen_t len = sizeof(size);
     477            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
     478              :     {
     479            0 :         ec = make_err(errno);
     480            0 :         return 0;
     481              :     }
     482            3 :     ec = {};
     483            3 :     return size;
     484              : }
     485              : 
     486              : std::error_code
     487            1 : select_socket_impl::
     488              : set_send_buffer_size(int size) noexcept
     489              : {
     490            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
     491            0 :         return make_err(errno);
     492            1 :     return {};
     493              : }
     494              : 
     495              : int
     496            3 : select_socket_impl::
     497              : send_buffer_size(std::error_code& ec) const noexcept
     498              : {
     499            3 :     int size = 0;
     500            3 :     socklen_t len = sizeof(size);
     501            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
     502              :     {
     503            0 :         ec = make_err(errno);
     504            0 :         return 0;
     505              :     }
     506            3 :     ec = {};
     507            3 :     return size;
     508              : }
     509              : 
     510              : std::error_code
     511            4 : select_socket_impl::
     512              : set_linger(bool enabled, int timeout) noexcept
     513              : {
     514            4 :     if (timeout < 0)
     515            1 :         return make_err(EINVAL);
     516              :     struct ::linger lg;
     517            3 :     lg.l_onoff = enabled ? 1 : 0;
     518            3 :     lg.l_linger = timeout;
     519            3 :     if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
     520            0 :         return make_err(errno);
     521            3 :     return {};
     522              : }
     523              : 
     524              : tcp_socket::linger_options
     525            3 : select_socket_impl::
     526              : linger(std::error_code& ec) const noexcept
     527              : {
     528            3 :     struct ::linger lg{};
     529            3 :     socklen_t len = sizeof(lg);
     530            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
     531              :     {
     532            0 :         ec = make_err(errno);
     533            0 :         return {};
     534              :     }
     535            3 :     ec = {};
     536            3 :     return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
     537              : }
     538              : 
     539              : void
     540         6333 : select_socket_impl::
     541              : cancel() noexcept
     542              : {
     543         6333 :     std::shared_ptr<select_socket_impl> self;
     544              :     try {
     545         6333 :         self = shared_from_this();
     546            0 :     } catch (const std::bad_weak_ptr&) {
     547            0 :         return;
     548            0 :     }
     549              : 
     550        18999 :     auto cancel_op = [this, &self](select_op& op, int events) {
     551        18999 :         auto prev = op.registered.exchange(
     552              :             select_registration_state::unregistered, std::memory_order_acq_rel);
     553        18999 :         op.request_cancel();
     554        18999 :         if (prev != select_registration_state::unregistered)
     555              :         {
     556           50 :             svc_.scheduler().deregister_fd(fd_, events);
     557           50 :             op.impl_ptr = self;
     558           50 :             svc_.post(&op);
     559           50 :             svc_.work_finished();
     560              :         }
     561        25332 :     };
     562              : 
     563         6333 :     cancel_op(conn_, select_scheduler::event_write);
     564         6333 :     cancel_op(rd_, select_scheduler::event_read);
     565         6333 :     cancel_op(wr_, select_scheduler::event_write);
     566         6333 : }
     567              : 
     568              : void
     569           80 : select_socket_impl::
     570              : cancel_single_op(select_op& op) noexcept
     571              : {
     572              :     // Called from stop_token callback to cancel a specific pending operation.
     573           80 :     auto prev = op.registered.exchange(
     574              :         select_registration_state::unregistered, std::memory_order_acq_rel);
     575           80 :     op.request_cancel();
     576              : 
     577           80 :     if (prev != select_registration_state::unregistered)
     578              :     {
     579              :         // Determine which event type to deregister
     580           54 :         int events = 0;
     581           54 :         if (&op == &conn_ || &op == &wr_)
     582            0 :             events = select_scheduler::event_write;
     583           54 :         else if (&op == &rd_)
     584           54 :             events = select_scheduler::event_read;
     585              : 
     586           54 :         svc_.scheduler().deregister_fd(fd_, events);
     587              : 
     588              :         // Keep impl alive until op completes
     589              :         try {
     590           54 :             op.impl_ptr = shared_from_this();
     591            0 :         } catch (const std::bad_weak_ptr&) {
     592              :             // Impl is being destroyed, op will be orphaned but that's ok
     593            0 :         }
     594              : 
     595           54 :         svc_.post(&op);
     596           54 :         svc_.work_finished();
     597              :     }
     598           80 : }
     599              : 
     600              : void
     601         6239 : select_socket_impl::
     602              : close_socket() noexcept
     603              : {
     604         6239 :     cancel();
     605              : 
     606         6239 :     if (fd_ >= 0)
     607              :     {
     608              :         // Unconditionally remove from registered_fds_ to handle edge cases
     609              :         // where the fd might be registered but cancel() didn't clean it up
     610              :         // due to race conditions.
     611         4155 :         svc_.scheduler().deregister_fd(fd_,
     612              :             select_scheduler::event_read | select_scheduler::event_write);
     613         4155 :         ::close(fd_);
     614         4155 :         fd_ = -1;
     615              :     }
     616              : 
     617              :     // Clear cached endpoints
     618         6239 :     local_endpoint_ = endpoint{};
     619         6239 :     remote_endpoint_ = endpoint{};
     620         6239 : }
     621              : 
     622          120 : select_socket_service::
     623          120 : select_socket_service(capy::execution_context& ctx)
     624          120 :     : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
     625              : {
     626          120 : }
     627              : 
     628          240 : select_socket_service::
     629          120 : ~select_socket_service()
     630              : {
     631          240 : }
     632              : 
     633              : void
     634          120 : select_socket_service::
     635              : shutdown()
     636              : {
     637          120 :     std::lock_guard lock(state_->mutex_);
     638              : 
     639          120 :     while (auto* impl = state_->socket_list_.pop_front())
     640            0 :         impl->close_socket();
     641              : 
     642          120 :     state_->socket_ptrs_.clear();
     643          120 : }
     644              : 
     645              : tcp_socket::socket_impl&
     646         4155 : select_socket_service::
     647              : create_impl()
     648              : {
     649         4155 :     auto impl = std::make_shared<select_socket_impl>(*this);
     650         4155 :     auto* raw = impl.get();
     651              : 
     652              :     {
     653         4155 :         std::lock_guard lock(state_->mutex_);
     654         4155 :         state_->socket_list_.push_back(raw);
     655         4155 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     656         4155 :     }
     657              : 
     658         4155 :     return *raw;
     659         4155 : }
     660              : 
     661              : void
     662         4155 : select_socket_service::
     663              : destroy_impl(tcp_socket::socket_impl& impl)
     664              : {
     665         4155 :     auto* select_impl = static_cast<select_socket_impl*>(&impl);
     666         4155 :     std::lock_guard lock(state_->mutex_);
     667         4155 :     state_->socket_list_.remove(select_impl);
     668         4155 :     state_->socket_ptrs_.erase(select_impl);
     669         4155 : }
     670              : 
     671              : std::error_code
     672         2084 : select_socket_service::
     673              : open_socket(tcp_socket::socket_impl& impl)
     674              : {
     675         2084 :     auto* select_impl = static_cast<select_socket_impl*>(&impl);
     676         2084 :     select_impl->close_socket();
     677              : 
     678         2084 :     int fd = ::socket(AF_INET, SOCK_STREAM, 0);
     679         2084 :     if (fd < 0)
     680            0 :         return make_err(errno);
     681              : 
     682              :     // Set non-blocking and close-on-exec
     683         2084 :     int flags = ::fcntl(fd, F_GETFL, 0);
     684         2084 :     if (flags == -1)
     685              :     {
     686            0 :         int errn = errno;
     687            0 :         ::close(fd);
     688            0 :         return make_err(errn);
     689              :     }
     690         2084 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     691              :     {
     692            0 :         int errn = errno;
     693            0 :         ::close(fd);
     694            0 :         return make_err(errn);
     695              :     }
     696         2084 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     697              :     {
     698            0 :         int errn = errno;
     699            0 :         ::close(fd);
     700            0 :         return make_err(errn);
     701              :     }
     702              : 
     703              :     // Check fd is within select() limits
     704         2084 :     if (fd >= FD_SETSIZE)
     705              :     {
     706            0 :         ::close(fd);
     707            0 :         return make_err(EMFILE);  // Too many open files
     708              :     }
     709              : 
     710         2084 :     select_impl->fd_ = fd;
     711         2084 :     return {};
     712              : }
     713              : 
     714              : void
     715       163537 : select_socket_service::
     716              : post(select_op* op)
     717              : {
     718       163537 :     state_->sched_.post(op);
     719       163537 : }
     720              : 
     721              : void
     722         2224 : select_socket_service::
     723              : work_started() noexcept
     724              : {
     725         2224 :     state_->sched_.work_started();
     726         2224 : }
     727              : 
     728              : void
     729          104 : select_socket_service::
     730              : work_finished() noexcept
     731              : {
     732          104 :     state_->sched_.work_finished();
     733          104 : }
     734              : 
     735              : } // namespace boost::corosio::detail
     736              : 
     737              : #endif // BOOST_COROSIO_HAS_SELECT
        

Generated by: LCOV version 2.3