libs/corosio/src/corosio/src/detail/epoll/sockets.hpp

91.2% Lines (31/34) 85.7% Functions (18/21) 100.0% Branches (4/4)
libs/corosio/src/corosio/src/detail/epoll/sockets.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/tcp_socket.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/ex/execution_context.hpp>
21 #include "src/detail/intrusive.hpp"
22 #include "src/detail/socket_service.hpp"
23
24 #include "src/detail/epoll/op.hpp"
25 #include "src/detail/epoll/scheduler.hpp"
26
27 #include <coroutine>
28 #include <memory>
29 #include <mutex>
30 #include <unordered_map>
31
32 /*
33 epoll Socket Implementation
34 ===========================
35
36 Each I/O operation follows the same pattern:
37 1. Try the syscall immediately (non-blocking socket)
38 2. If it succeeds or fails with a real error, post to completion queue
39 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
40
41 This "try first" approach avoids unnecessary epoll round-trips for
42 operations that can complete immediately (common for small reads/writes
43 on fast local connections).
44
45 One-Shot Registration
46 ---------------------
47 We use one-shot epoll registration: each operation registers, waits for
48 one event, then unregisters. This simplifies the state machine since we
49 don't need to track whether an fd is currently registered or handle
50 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
51 simplicity is worth it.
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 epoll_socket_service owns all socket impls. destroy_impl() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
83 class epoll_socket_service;
84 class epoll_socket_impl;
85
86 /** Initiator coroutine for read operations.
87
88 This coroutine receives control via symmetric transfer after the caller
89 has fully suspended, then initiates the actual I/O. Uses cached frame
90 allocation to avoid per-operation heap allocations.
91 */
92 struct read_initiator
93 {
94 struct promise_type
95 {
96 epoll_socket_impl* impl;
97
98 /// Reuse cached frame to avoid per-operation heap allocation.
99 56335 static void* operator new(std::size_t n, void*& cached, epoll_socket_impl*)
100 {
101
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 56295 times.
56335 if (!cached)
102 40 cached = ::operator new(n);
103 56335 return cached;
104 }
105
106 /// No-op - frame memory freed in socket destructor.
107 56335 static void operator delete(void*) noexcept {}
108
109 56335 std::suspend_always initial_suspend() noexcept { return {}; }
110 56335 std::suspend_always final_suspend() noexcept { return {}; }
111
112 56335 read_initiator get_return_object()
113 {
114 56335 return {std::coroutine_handle<promise_type>::from_promise(*this)};
115 }
116
117 56335 void return_void() {}
118 void unhandled_exception() { std::terminate(); }
119 };
120
121 using handle_type = std::coroutine_handle<promise_type>;
122 handle_type h;
123 };
124
125 /** Initiator coroutine for write operations.
126
127 This coroutine receives control via symmetric transfer after the caller
128 has fully suspended, then initiates the actual I/O. Uses cached frame
129 allocation to avoid per-operation heap allocations.
130 */
131 struct write_initiator
132 {
133 struct promise_type
134 {
135 epoll_socket_impl* impl;
136
137 /// Reuse cached frame to avoid per-operation heap allocation.
138 56215 static void* operator new(std::size_t n, void*& cached, epoll_socket_impl*)
139 {
140
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 56177 times.
56215 if (!cached)
141 38 cached = ::operator new(n);
142 56215 return cached;
143 }
144
145 /// No-op - frame memory freed in socket destructor.
146 56215 static void operator delete(void*) noexcept {}
147
148 56215 std::suspend_always initial_suspend() noexcept { return {}; }
149 56215 std::suspend_always final_suspend() noexcept { return {}; }
150
151 56215 write_initiator get_return_object()
152 {
153 56215 return {std::coroutine_handle<promise_type>::from_promise(*this)};
154 }
155
156 56215 void return_void() {}
157 void unhandled_exception() { std::terminate(); }
158 };
159
160 using handle_type = std::coroutine_handle<promise_type>;
161 handle_type h;
162 };
163
164 // Coroutine factory functions (defined in sockets.cpp)
165 read_initiator make_read_initiator(void*& cached, epoll_socket_impl* impl);
166 write_initiator make_write_initiator(void*& cached, epoll_socket_impl* impl);
167
168 /// Socket implementation for epoll backend.
169 class epoll_socket_impl
170 : public tcp_socket::socket_impl
171 , public std::enable_shared_from_this<epoll_socket_impl>
172 , public intrusive_list<epoll_socket_impl>::node
173 {
174 friend class epoll_socket_service;
175
176 public:
177 explicit epoll_socket_impl(epoll_socket_service& svc) noexcept;
178 ~epoll_socket_impl();
179
180 void release() override;
181
182 void connect(
183 std::coroutine_handle<>,
184 capy::executor_ref,
185 endpoint,
186 std::stop_token,
187 std::error_code*) override;
188
189 std::coroutine_handle<> read_some(
190 std::coroutine_handle<>,
191 capy::executor_ref,
192 io_buffer_param,
193 std::stop_token,
194 std::error_code*,
195 std::size_t*) override;
196
197 std::coroutine_handle<> write_some(
198 std::coroutine_handle<>,
199 capy::executor_ref,
200 io_buffer_param,
201 std::stop_token,
202 std::error_code*,
203 std::size_t*) override;
204
205 std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
206
207 native_handle_type native_handle() const noexcept override { return fd_; }
208
209 // Socket options
210 std::error_code set_no_delay(bool value) noexcept override;
211 bool no_delay(std::error_code& ec) const noexcept override;
212
213 std::error_code set_keep_alive(bool value) noexcept override;
214 bool keep_alive(std::error_code& ec) const noexcept override;
215
216 std::error_code set_receive_buffer_size(int size) noexcept override;
217 int receive_buffer_size(std::error_code& ec) const noexcept override;
218
219 std::error_code set_send_buffer_size(int size) noexcept override;
220 int send_buffer_size(std::error_code& ec) const noexcept override;
221
222 std::error_code set_linger(bool enabled, int timeout) noexcept override;
223 tcp_socket::linger_options linger(std::error_code& ec) const noexcept override;
224
225 16 endpoint local_endpoint() const noexcept override { return local_endpoint_; }
226 16 endpoint remote_endpoint() const noexcept override { return remote_endpoint_; }
227 bool is_open() const noexcept { return fd_ >= 0; }
228 void cancel() noexcept override;
229 void cancel_single_op(epoll_op& op) noexcept;
230 void close_socket() noexcept;
231 void update_epoll_events() noexcept;
232 2642 void set_socket(int fd) noexcept { fd_ = fd; }
233 5284 void set_endpoints(endpoint local, endpoint remote) noexcept
234 {
235 5284 local_endpoint_ = local;
236 5284 remote_endpoint_ = remote;
237 5284 }
238
239 epoll_connect_op conn_;
240 epoll_read_op rd_;
241 epoll_write_op wr_;
242
243 /// Per-descriptor state for persistent epoll registration
244 descriptor_data desc_data_;
245
246 void* read_initiator_frame_ = nullptr;
247 void* write_initiator_frame_ = nullptr;
248 read_initiator::handle_type read_initiator_handle_;
249 write_initiator::handle_type write_initiator_handle_;
250
251 /// Execute the read I/O operation (called by initiator coroutine).
252 void do_read_io();
253
254 /// Execute the write I/O operation (called by initiator coroutine).
255 void do_write_io();
256
257 private:
258 epoll_socket_service& svc_;
259 int fd_ = -1;
260 endpoint local_endpoint_;
261 endpoint remote_endpoint_;
262 };
263
264 /** State for epoll socket service. */
265 class epoll_socket_state
266 {
267 public:
268 184 explicit epoll_socket_state(epoll_scheduler& sched) noexcept
269 184 : sched_(sched)
270 {
271 184 }
272
273 epoll_scheduler& sched_;
274 std::mutex mutex_;
275 intrusive_list<epoll_socket_impl> socket_list_;
276 std::unordered_map<epoll_socket_impl*, std::shared_ptr<epoll_socket_impl>> socket_ptrs_;
277 };
278
279 /** epoll socket service implementation.
280
281 Inherits from socket_service to enable runtime polymorphism.
282 Uses key_type = socket_service for service lookup.
283 */
284 class epoll_socket_service : public socket_service
285 {
286 public:
287 explicit epoll_socket_service(capy::execution_context& ctx);
288 ~epoll_socket_service();
289
290 epoll_socket_service(epoll_socket_service const&) = delete;
291 epoll_socket_service& operator=(epoll_socket_service const&) = delete;
292
293 void shutdown() override;
294
295 tcp_socket::socket_impl& create_impl() override;
296 void destroy_impl(tcp_socket::socket_impl& impl) override;
297 std::error_code open_socket(tcp_socket::socket_impl& impl) override;
298
299 10592 epoll_scheduler& scheduler() const noexcept { return state_->sched_; }
300 void post(epoll_op* op);
301 void work_started() noexcept;
302 void work_finished() noexcept;
303
304 private:
305 std::unique_ptr<epoll_socket_state> state_;
306 };
307
308 // Backward compatibility alias
309 using epoll_sockets = epoll_socket_service;
310
311 } // namespace boost::corosio::detail
312
313 #endif // BOOST_COROSIO_HAS_EPOLL
314
315 #endif // BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
316