libs/corosio/src/corosio/src/detail/epoll/op.hpp

84.7% Lines (94/111) 84.2% Functions (16/19) 74.1% Branches (20/27)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/resume_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/socket.h>
41 #include <sys/uio.h>
42
43 /*
44 epoll Operation State
45 =====================
46
47 Each async I/O operation has a corresponding epoll_op-derived struct that
48 holds the operation's state while it's in flight. The socket impl owns
49 fixed slots for each operation type (conn_, rd_, wr_), so only one
50 operation of each type can be pending per socket at a time.
51
52 Persistent Registration
53 -----------------------
54 File descriptors are registered with epoll once (via descriptor_data) and
55 stay registered until closed. The descriptor_data tracks which operations
56 are pending (read_op, write_op, connect_op). When an event arrives, the
57 reactor dispatches to the appropriate pending operation.
58
59 Impl Lifetime Management
60 ------------------------
61 When cancel() posts an op to the scheduler's ready queue, the socket impl
62 might be destroyed before the scheduler processes the op. The `impl_ptr`
63 member holds a shared_ptr to the impl, keeping it alive until the op
64 completes. This is set by cancel() and cleared in operator() after the
65 coroutine is resumed.
66
67 EOF Detection
68 -------------
69 For reads, 0 bytes with no error means EOF. But an empty user buffer also
70 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
71
72 SIGPIPE Prevention
73 ------------------
74 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
75 SIGPIPE when the peer has closed.
76 */
77
78 namespace boost::corosio::detail {
79
80 // Forward declarations
81 class epoll_socket_impl;
82 class epoll_acceptor_impl;
83 struct epoll_op;
84
85 /** Per-descriptor state for persistent epoll registration.
86
87 Tracks pending operations for a file descriptor. The fd is registered
88 once with epoll and stays registered until closed. Events are dispatched
89 to the appropriate pending operation (EPOLLIN -> read_op, etc.).
90
91 With edge-triggered epoll (EPOLLET), atomic operations are required to
92 synchronize between operation registration and reactor event delivery.
93 The read_ready/write_ready flags cache edge events that arrived before
94 an operation was registered.
95 */
96 struct descriptor_data
97 {
98 /// Currently registered events (EPOLLIN, EPOLLOUT, etc.)
99 std::uint32_t registered_events = 0;
100
101 /// Pending read operation (nullptr if none)
102 std::atomic<epoll_op*> read_op{nullptr};
103
104 /// Pending write operation (nullptr if none)
105 std::atomic<epoll_op*> write_op{nullptr};
106
107 /// Pending connect operation (nullptr if none)
108 std::atomic<epoll_op*> connect_op{nullptr};
109
110 /// Cached read readiness (edge event arrived before op registered)
111 std::atomic<bool> read_ready{false};
112
113 /// Cached write readiness (edge event arrived before op registered)
114 std::atomic<bool> write_ready{false};
115
116 /// The file descriptor
117 int fd = -1;
118
119 /// Whether this descriptor is managed by persistent registration
120 bool is_registered = false;
121 };
122
123 struct epoll_op : scheduler_op
124 {
125 struct canceller
126 {
127 epoll_op* op;
128 void operator()() const noexcept;
129 };
130
131 capy::coro h;
132 capy::executor_ref ex;
133 std::error_code* ec_out = nullptr;
134 std::size_t* bytes_out = nullptr;
135
136 int fd = -1;
137 int errn = 0;
138 std::size_t bytes_transferred = 0;
139
140 std::atomic<bool> cancelled{false};
141 std::optional<std::stop_callback<canceller>> stop_cb;
142
143 // Prevents use-after-free when socket is closed with pending ops.
144 // See "Impl Lifetime Management" in file header.
145 std::shared_ptr<void> impl_ptr;
146
147 // For stop_token cancellation - pointer to owning socket/acceptor impl.
148 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
149 epoll_socket_impl* socket_impl_ = nullptr;
150 epoll_acceptor_impl* acceptor_impl_ = nullptr;
151
152 15954 epoll_op()
153 15954 {
154 15954 data_ = this;
155 15954 }
156
157 117846 void reset() noexcept
158 {
159 117846 fd = -1;
160 117846 errn = 0;
161 117846 bytes_transferred = 0;
162 117846 cancelled.store(false, std::memory_order_relaxed);
163 117846 impl_ptr.reset();
164 117846 socket_impl_ = nullptr;
165 117846 acceptor_impl_ = nullptr;
166 117846 }
167
168 112552 void operator()() override
169 {
170 112552 stop_cb.reset();
171
172
1/2
✓ Branch 0 taken 112552 times.
✗ Branch 1 not taken.
112552 if (ec_out)
173 {
174
2/2
✓ Branch 1 taken 202 times.
✓ Branch 2 taken 112350 times.
112552 if (cancelled.load(std::memory_order_acquire))
175 202 *ec_out = capy::error::canceled;
176
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 112349 times.
112350 else if (errn != 0)
177 1 *ec_out = make_err(errn);
178
6/6
✓ Branch 1 taken 56137 times.
✓ Branch 2 taken 56212 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 56132 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 112344 times.
112349 else if (is_read_operation() && bytes_transferred == 0)
179 5 *ec_out = capy::error::eof;
180 else
181 112344 *ec_out = {};
182 }
183
184
1/2
✓ Branch 0 taken 112552 times.
✗ Branch 1 not taken.
112552 if (bytes_out)
185 112552 *bytes_out = bytes_transferred;
186
187 // Move to stack before resuming coroutine. The coroutine might close
188 // the socket, releasing the last wrapper ref. If impl_ptr were the
189 // last ref and we destroyed it while still in operator(), we'd have
190 // use-after-free. Moving to local ensures destruction happens at
191 // function exit, after all member accesses are complete.
192 112552 capy::executor_ref saved_ex( std::move( ex ) );
193 112552 capy::coro saved_h( std::move( h ) );
194 112552 auto prevent_premature_destruction = std::move(impl_ptr);
195
1/1
✓ Branch 1 taken 112552 times.
112552 resume_coro(saved_ex, saved_h);
196 112552 }
197
198 56211 virtual bool is_read_operation() const noexcept { return false; }
199 virtual void cancel() noexcept = 0;
200
201 void destroy() override
202 {
203 stop_cb.reset();
204 impl_ptr.reset();
205 }
206
207 24374 void request_cancel() noexcept
208 {
209 24374 cancelled.store(true, std::memory_order_release);
210 24374 }
211
212 void start(std::stop_token token)
213 {
214 cancelled.store(false, std::memory_order_release);
215 stop_cb.reset();
216 socket_impl_ = nullptr;
217 acceptor_impl_ = nullptr;
218
219 if (token.stop_possible())
220 stop_cb.emplace(token, canceller{this});
221 }
222
223 115195 void start(std::stop_token token, epoll_socket_impl* impl)
224 {
225 115195 cancelled.store(false, std::memory_order_release);
226 115195 stop_cb.reset();
227 115195 socket_impl_ = impl;
228 115195 acceptor_impl_ = nullptr;
229
230
2/2
✓ Branch 1 taken 104 times.
✓ Branch 2 taken 115091 times.
115195 if (token.stop_possible())
231 104 stop_cb.emplace(token, canceller{this});
232 115195 }
233
234 2651 void start(std::stop_token token, epoll_acceptor_impl* impl)
235 {
236 2651 cancelled.store(false, std::memory_order_release);
237 2651 stop_cb.reset();
238 2651 socket_impl_ = nullptr;
239 2651 acceptor_impl_ = impl;
240
241
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2642 times.
2651 if (token.stop_possible())
242 9 stop_cb.emplace(token, canceller{this});
243 2651 }
244
245 117720 void complete(int err, std::size_t bytes) noexcept
246 {
247 117720 errn = err;
248 117720 bytes_transferred = bytes;
249 117720 }
250
251 virtual void perform_io() noexcept {}
252 };
253
254
255 struct epoll_connect_op : epoll_op
256 {
257 endpoint target_endpoint;
258
259 2643 void reset() noexcept
260 {
261 2643 epoll_op::reset();
262 2643 target_endpoint = endpoint{};
263 2643 }
264
265 2643 void perform_io() noexcept override
266 {
267 // connect() completion status is retrieved via SO_ERROR, not return value
268 2643 int err = 0;
269 2643 socklen_t len = sizeof(err);
270
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2643 times.
2643 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
271 err = errno;
272 2643 complete(err, 0);
273 2643 }
274
275 // Defined in sockets.cpp where epoll_socket_impl is complete
276 void operator()() override;
277 void cancel() noexcept override;
278 };
279
280
281 struct epoll_read_op : epoll_op
282 {
283 static constexpr std::size_t max_buffers = 16;
284 iovec iovecs[max_buffers];
285 int iovec_count = 0;
286 bool empty_buffer_read = false;
287
288 56138 bool is_read_operation() const noexcept override
289 {
290 56138 return !empty_buffer_read;
291 }
292
293 56336 void reset() noexcept
294 {
295 56336 epoll_op::reset();
296 56336 iovec_count = 0;
297 56336 empty_buffer_read = false;
298 56336 }
299
300 51 void perform_io() noexcept override
301 {
302 51 ssize_t n = ::readv(fd, iovecs, iovec_count);
303
1/2
✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
51 if (n >= 0)
304 51 complete(0, static_cast<std::size_t>(n));
305 else
306 complete(errno, 0);
307 51 }
308
309 void cancel() noexcept override;
310 };
311
312
313 struct epoll_write_op : epoll_op
314 {
315 static constexpr std::size_t max_buffers = 16;
316 iovec iovecs[max_buffers];
317 int iovec_count = 0;
318
319 56216 void reset() noexcept
320 {
321 56216 epoll_op::reset();
322 56216 iovec_count = 0;
323 56216 }
324
325 void perform_io() noexcept override
326 {
327 msghdr msg{};
328 msg.msg_iov = iovecs;
329 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
330
331 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
332 if (n >= 0)
333 complete(0, static_cast<std::size_t>(n));
334 else
335 complete(errno, 0);
336 }
337
338 void cancel() noexcept override;
339 };
340
341
342 struct epoll_accept_op : epoll_op
343 {
344 int accepted_fd = -1;
345 io_object::io_object_impl* peer_impl = nullptr;
346 io_object::io_object_impl** impl_out = nullptr;
347
348 2651 void reset() noexcept
349 {
350 2651 epoll_op::reset();
351 2651 accepted_fd = -1;
352 2651 peer_impl = nullptr;
353 2651 impl_out = nullptr;
354 2651 }
355
356 2640 void perform_io() noexcept override
357 {
358 2640 sockaddr_in addr{};
359 2640 socklen_t addrlen = sizeof(addr);
360 2640 int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
361 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
362
363
1/2
✓ Branch 0 taken 2640 times.
✗ Branch 1 not taken.
2640 if (new_fd >= 0)
364 {
365 2640 accepted_fd = new_fd;
366 2640 complete(0, 0);
367 }
368 else
369 {
370 complete(errno, 0);
371 }
372 2640 }
373
374 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
375 void operator()() override;
376 void cancel() noexcept override;
377 };
378
379 } // namespace boost::corosio::detail
380
381 #endif // BOOST_COROSIO_HAS_EPOLL
382
383 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
384