libs/corosio/src/corosio/src/detail/select/op.hpp

74.8% Lines (98/131) 84.2% Functions (16/19) 65.7% Branches (23/35)
libs/corosio/src/corosio/src/detail/select/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/scheduler_op.hpp"
27 #include "src/detail/endpoint_convert.hpp"
28
29 #include <unistd.h>
30 #include <errno.h>
31 #include <fcntl.h>
32
33 #include <atomic>
34 #include <cstddef>
#include <cstdint>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/select.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 select Operation State
46 ======================
47
48 Each async I/O operation has a corresponding select_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 This mirrors the epoll_op design for consistency across backends.
54
55 Completion vs Cancellation Race
56 -------------------------------
57 The `registered` atomic uses a tri-state (unregistered, registering,
58 registered) to handle two races: (1) between register_fd() and the
59 reactor seeing an event, and (2) between reactor completion and cancel().
60
61 The registering state closes the window where an event could arrive
62 after register_fd() but before the registration state was updated. The
63 reactor and cancel() both treat registering the same as registered when claiming.
64
65 Whoever atomically exchanges to unregistered "claims" the operation
66 and is responsible for completing it. The loser sees unregistered and
67 does nothing. The initiating thread uses compare_exchange to transition
68 from registering to registered; if this fails, the reactor or cancel
69 already claimed the op.
70
71 Impl Lifetime Management
72 ------------------------
73 When cancel() posts an op to the scheduler's ready queue, the socket impl
74 might be destroyed before the scheduler processes the op. The `impl_ptr`
75 member holds a shared_ptr to the impl, keeping it alive until the op
76 completes.
77
78 EOF Detection
79 -------------
80 For reads, 0 bytes with no error means EOF. But an empty user buffer also
81 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
82
83 SIGPIPE Prevention
84 ------------------
85 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
86 SIGPIPE when the peer has closed.
87 */
88
89 namespace boost::corosio::detail {
90
91 // Forward declarations for cancellation support
92 class select_socket_impl;
93 class select_acceptor_impl;
94
/** Registration state for async operations.

    Tri-state enum that handles the race between register_fd() and
    run_reactor() seeing an event. An operation is moved to
    `registering` before register_fd() is called so that events
    delivered during the registration window are not dropped.

    The underlying type is a single byte (requires <cstdint>).
*/
enum class select_registration_state : std::uint8_t
{
    unregistered, ///< Not registered with reactor
    registering,  ///< register_fd() called, not yet confirmed
    registered    ///< Fully registered, ready for events
};
108
109 struct select_op : scheduler_op
110 {
111 struct canceller
112 {
113 select_op* op;
114 void operator()() const noexcept;
115 };
116
117 capy::coro h;
118 capy::executor_ref ex;
119 std::error_code* ec_out = nullptr;
120 std::size_t* bytes_out = nullptr;
121
122 int fd = -1;
123 int errn = 0;
124 std::size_t bytes_transferred = 0;
125
126 std::atomic<bool> cancelled{false};
127 std::atomic<select_registration_state> registered{select_registration_state::unregistered};
128 std::optional<std::stop_callback<canceller>> stop_cb;
129
130 // Prevents use-after-free when socket is closed with pending ops.
131 std::shared_ptr<void> impl_ptr;
132
133 // For stop_token cancellation - pointer to owning socket/acceptor impl.
134 select_socket_impl* socket_impl_ = nullptr;
135 select_acceptor_impl* acceptor_impl_ = nullptr;
136
137 12512 select_op()
138 12512 {
139 12512 data_ = this;
140 12512 }
141
142 167731 void reset() noexcept
143 {
144 167731 fd = -1;
145 167731 errn = 0;
146 167731 bytes_transferred = 0;
147 167731 cancelled.store(false, std::memory_order_relaxed);
148 167731 registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
149 167731 impl_ptr.reset();
150 167731 socket_impl_ = nullptr;
151 167731 acceptor_impl_ = nullptr;
152 167731 }
153
154 163584 void operator()() override
155 {
156 163584 stop_cb.reset();
157
158
1/2
✓ Branch 0 taken 163584 times.
✗ Branch 1 not taken.
163584 if (ec_out)
159 {
160
2/2
✓ Branch 1 taken 183 times.
✓ Branch 2 taken 163401 times.
163584 if (cancelled.load(std::memory_order_acquire))
161 183 *ec_out = capy::error::canceled;
162
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 163400 times.
163401 else if (errn != 0)
163 1 *ec_out = make_err(errn);
164
6/6
✓ Branch 1 taken 81666 times.
✓ Branch 2 taken 81734 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 81661 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 163395 times.
163400 else if (is_read_operation() && bytes_transferred == 0)
165 5 *ec_out = capy::error::eof;
166 else
167 163395 *ec_out = {};
168 }
169
170
1/2
✓ Branch 0 taken 163584 times.
✗ Branch 1 not taken.
163584 if (bytes_out)
171 163584 *bytes_out = bytes_transferred;
172
173 // Move to stack before destroying the frame
174 163584 capy::executor_ref saved_ex( std::move( ex ) );
175 163584 capy::coro saved_h( std::move( h ) );
176 163584 impl_ptr.reset();
177
1/1
✓ Branch 1 taken 163584 times.
163584 saved_ex.dispatch( saved_h );
178 163584 }
179
180 81733 virtual bool is_read_operation() const noexcept { return false; }
181 virtual void cancel() noexcept = 0;
182
183 void destroy() override
184 {
185 stop_cb.reset();
186 impl_ptr.reset();
187 }
188
189 19174 void request_cancel() noexcept
190 {
191 19174 cancelled.store(true, std::memory_order_release);
192 19174 }
193
194 void start(std::stop_token token)
195 {
196 cancelled.store(false, std::memory_order_release);
197 stop_cb.reset();
198 socket_impl_ = nullptr;
199 acceptor_impl_ = nullptr;
200
201 if (token.stop_possible())
202 stop_cb.emplace(token, canceller{this});
203 }
204
205 165657 void start(std::stop_token token, select_socket_impl* impl)
206 {
207 165657 cancelled.store(false, std::memory_order_release);
208 165657 stop_cb.reset();
209 165657 socket_impl_ = impl;
210 165657 acceptor_impl_ = nullptr;
211
212
2/2
✓ Branch 1 taken 81 times.
✓ Branch 2 taken 165576 times.
165657 if (token.stop_possible())
213 81 stop_cb.emplace(token, canceller{this});
214 165657 }
215
216 2074 void start(std::stop_token token, select_acceptor_impl* impl)
217 {
218 2074 cancelled.store(false, std::memory_order_release);
219 2074 stop_cb.reset();
220 2074 socket_impl_ = nullptr;
221 2074 acceptor_impl_ = impl;
222
223
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2074 times.
2074 if (token.stop_possible())
224 stop_cb.emplace(token, canceller{this});
225 2074 }
226
227 167624 void complete(int err, std::size_t bytes) noexcept
228 {
229 167624 errn = err;
230 167624 bytes_transferred = bytes;
231 167624 }
232
233 virtual void perform_io() noexcept {}
234 };
235
236
237 struct select_connect_op : select_op
238 {
239 endpoint target_endpoint;
240
241 2073 void reset() noexcept
242 {
243 2073 select_op::reset();
244 2073 target_endpoint = endpoint{};
245 2073 }
246
247 2073 void perform_io() noexcept override
248 {
249 // connect() completion status is retrieved via SO_ERROR, not return value
250 2073 int err = 0;
251 2073 socklen_t len = sizeof(err);
252
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2073 times.
2073 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
253 err = errno;
254 2073 complete(err, 0);
255 2073 }
256
257 // Defined in sockets.cpp where select_socket_impl is complete
258 void operator()() override;
259 void cancel() noexcept override;
260 };
261
262
263 struct select_read_op : select_op
264 {
265 static constexpr std::size_t max_buffers = 16;
266 iovec iovecs[max_buffers];
267 int iovec_count = 0;
268 bool empty_buffer_read = false;
269
270 81667 bool is_read_operation() const noexcept override
271 {
272 81667 return !empty_buffer_read;
273 }
274
275 81846 void reset() noexcept
276 {
277 81846 select_op::reset();
278 81846 iovec_count = 0;
279 81846 empty_buffer_read = false;
280 81846 }
281
282 47 void perform_io() noexcept override
283 {
284 47 ssize_t n = ::readv(fd, iovecs, iovec_count);
285
1/2
✓ Branch 0 taken 47 times.
✗ Branch 1 not taken.
47 if (n >= 0)
286 47 complete(0, static_cast<std::size_t>(n));
287 else
288 complete(errno, 0);
289 47 }
290
291 void cancel() noexcept override;
292 };
293
294
295 struct select_write_op : select_op
296 {
297 static constexpr std::size_t max_buffers = 16;
298 iovec iovecs[max_buffers];
299 int iovec_count = 0;
300
301 81738 void reset() noexcept
302 {
303 81738 select_op::reset();
304 81738 iovec_count = 0;
305 81738 }
306
307 void perform_io() noexcept override
308 {
309 msghdr msg{};
310 msg.msg_iov = iovecs;
311 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312
313 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 if (n >= 0)
315 complete(0, static_cast<std::size_t>(n));
316 else
317 complete(errno, 0);
318 }
319
320 void cancel() noexcept override;
321 };
322
323
324 struct select_accept_op : select_op
325 {
326 int accepted_fd = -1;
327 io_object::io_object_impl* peer_impl = nullptr;
328 io_object::io_object_impl** impl_out = nullptr;
329
330 2074 void reset() noexcept
331 {
332 2074 select_op::reset();
333 2074 accepted_fd = -1;
334 2074 peer_impl = nullptr;
335 2074 impl_out = nullptr;
336 2074 }
337
338 2069 void perform_io() noexcept override
339 {
340 2069 sockaddr_in addr{};
341 2069 socklen_t addrlen = sizeof(addr);
342
343 // Note: select backend uses accept() + fcntl instead of accept4()
344 // for broader POSIX compatibility
345 2069 int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
346
347
1/2
✓ Branch 0 taken 2069 times.
✗ Branch 1 not taken.
2069 if (new_fd >= 0)
348 {
349 // Reject fds that exceed select()'s FD_SETSIZE limit.
350 // Better to fail now than during later async operations.
351
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2069 times.
2069 if (new_fd >= FD_SETSIZE)
352 {
353 ::close(new_fd);
354 complete(EINVAL, 0);
355 return;
356 }
357
358 // Set non-blocking and close-on-exec flags.
359 // A non-blocking socket is essential for the async reactor;
360 // if we can't configure it, fail rather than risk blocking.
361 2069 int flags = ::fcntl(new_fd, F_GETFL, 0);
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2069 times.
2069 if (flags == -1)
363 {
364 int err = errno;
365 ::close(new_fd);
366 complete(err, 0);
367 return;
368 }
369
370
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2069 times.
2069 if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 {
372 int err = errno;
373 ::close(new_fd);
374 complete(err, 0);
375 return;
376 }
377
378
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2069 times.
2069 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 {
380 int err = errno;
381 ::close(new_fd);
382 complete(err, 0);
383 return;
384 }
385
386 2069 accepted_fd = new_fd;
387 2069 complete(0, 0);
388 }
389 else
390 {
391 complete(errno, 0);
392 }
393 }
394
395 // Defined in acceptors.cpp where select_acceptor_impl is complete
396 void operator()() override;
397 void cancel() noexcept override;
398 };
399
400 } // namespace boost::corosio::detail
401
402 #endif // BOOST_COROSIO_HAS_SELECT
403
404 #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
405