libs/corosio/src/corosio/src/detail/select/acceptors.cpp

64.9% Lines (161/248) 88.9% Functions (16/18) 45.5% Branches (66/145)
libs/corosio/src/corosio/src/detail/select/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/acceptors.hpp"
15 #include "src/detail/select/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <netinet/in.h>
22 #include <sys/socket.h>
23 #include <unistd.h>
24
25 namespace boost::corosio::detail {
26
27 void
28 select_accept_op::
29 cancel() noexcept
30 {
31 if (acceptor_impl_)
32 acceptor_impl_->cancel_single_op(*this);
33 else
34 request_cancel();
35 }
36
37 void
38 2074 select_accept_op::
39 operator()()
40 {
41 2074 stop_cb.reset();
42
43
3/4
✓ Branch 0 taken 2074 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2071 times.
✓ Branch 4 taken 3 times.
2074 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
44
45
1/2
✓ Branch 0 taken 2074 times.
✗ Branch 1 not taken.
2074 if (ec_out)
46 {
47
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 2071 times.
2074 if (cancelled.load(std::memory_order_acquire))
48 3 *ec_out = capy::error::canceled;
49
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2071 times.
2071 else if (errn != 0)
50 *ec_out = make_err(errn);
51 else
52 2071 *ec_out = {};
53 }
54
55
3/4
✓ Branch 0 taken 2071 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 2071 times.
✗ Branch 3 not taken.
2074 if (success && accepted_fd >= 0)
56 {
57
1/2
✓ Branch 0 taken 2071 times.
✗ Branch 1 not taken.
2071 if (acceptor_impl_)
58 {
59 2071 auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
60 2071 ->service().socket_service();
61
1/2
✓ Branch 0 taken 2071 times.
✗ Branch 1 not taken.
2071 if (socket_svc)
62 {
63
1/1
✓ Branch 1 taken 2071 times.
2071 auto& impl = static_cast<select_socket_impl&>(socket_svc->create_impl());
64 2071 impl.set_socket(accepted_fd);
65
66 2071 sockaddr_in local_addr{};
67 2071 socklen_t local_len = sizeof(local_addr);
68 2071 sockaddr_in remote_addr{};
69 2071 socklen_t remote_len = sizeof(remote_addr);
70
71 2071 endpoint local_ep, remote_ep;
72
1/2
✓ Branch 1 taken 2071 times.
✗ Branch 2 not taken.
2071 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
73 2071 local_ep = from_sockaddr_in(local_addr);
74
1/2
✓ Branch 1 taken 2071 times.
✗ Branch 2 not taken.
2071 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
75 2071 remote_ep = from_sockaddr_in(remote_addr);
76
77 2071 impl.set_endpoints(local_ep, remote_ep);
78
79
1/2
✓ Branch 0 taken 2071 times.
✗ Branch 1 not taken.
2071 if (impl_out)
80 2071 *impl_out = &impl;
81
82 2071 accepted_fd = -1;
83 }
84 else
85 {
86 if (ec_out && !*ec_out)
87 *ec_out = make_err(ENOENT);
88 ::close(accepted_fd);
89 accepted_fd = -1;
90 if (impl_out)
91 *impl_out = nullptr;
92 }
93 }
94 else
95 {
96 ::close(accepted_fd);
97 accepted_fd = -1;
98 if (impl_out)
99 *impl_out = nullptr;
100 }
101 2071 }
102 else
103 {
104
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (accepted_fd >= 0)
105 {
106 ::close(accepted_fd);
107 accepted_fd = -1;
108 }
109
110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (peer_impl)
111 {
112 peer_impl->release();
113 peer_impl = nullptr;
114 }
115
116
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (impl_out)
117 3 *impl_out = nullptr;
118 }
119
120 // Move to stack before destroying the frame
121 2074 capy::executor_ref saved_ex( std::move( ex ) );
122 2074 capy::coro saved_h( std::move( h ) );
123 2074 impl_ptr.reset();
124
1/1
✓ Branch 1 taken 2074 times.
2074 saved_ex.dispatch( saved_h );
125 2074 }
126
127 47 select_acceptor_impl::
128 47 select_acceptor_impl(select_acceptor_service& svc) noexcept
129 47 : svc_(svc)
130 {
131 47 }
132
133 void
134 47 select_acceptor_impl::
135 release()
136 {
137 47 close_socket();
138 47 svc_.destroy_acceptor_impl(*this);
139 47 }
140
141 void
142 2074 select_acceptor_impl::
143 accept(
144 std::coroutine_handle<> h,
145 capy::executor_ref ex,
146 std::stop_token token,
147 std::error_code* ec,
148 io_object::io_object_impl** impl_out)
149 {
150 2074 auto& op = acc_;
151 2074 op.reset();
152 2074 op.h = h;
153 2074 op.ex = ex;
154 2074 op.ec_out = ec;
155 2074 op.impl_out = impl_out;
156 2074 op.fd = fd_;
157 2074 op.start(token, this);
158
159 2074 sockaddr_in addr{};
160 2074 socklen_t addrlen = sizeof(addr);
161
1/1
✓ Branch 1 taken 2074 times.
2074 int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
162
163
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2072 times.
2074 if (accepted >= 0)
164 {
165 // Reject fds that exceed select()'s FD_SETSIZE limit.
166 // Better to fail now than during later async operations.
167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (accepted >= FD_SETSIZE)
168 {
169 ::close(accepted);
170 op.accepted_fd = -1;
171 op.complete(EINVAL, 0);
172 op.impl_ptr = shared_from_this();
173 svc_.post(&op);
174 2074 return;
175 }
176
177 // Set non-blocking and close-on-exec flags.
178 // A non-blocking socket is essential for the async reactor;
179 // if we can't configure it, fail rather than risk blocking.
180
1/1
✓ Branch 1 taken 2 times.
2 int flags = ::fcntl(accepted, F_GETFL, 0);
181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (flags == -1)
182 {
183 int err = errno;
184 ::close(accepted);
185 op.accepted_fd = -1;
186 op.complete(err, 0);
187 op.impl_ptr = shared_from_this();
188 svc_.post(&op);
189 return;
190 }
191
192
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
193 {
194 int err = errno;
195 ::close(accepted);
196 op.accepted_fd = -1;
197 op.complete(err, 0);
198 op.impl_ptr = shared_from_this();
199 svc_.post(&op);
200 return;
201 }
202
203
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
204 {
205 int err = errno;
206 ::close(accepted);
207 op.accepted_fd = -1;
208 op.complete(err, 0);
209 op.impl_ptr = shared_from_this();
210 svc_.post(&op);
211 return;
212 }
213
214 2 op.accepted_fd = accepted;
215 2 op.complete(0, 0);
216
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
217
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
218 2 return;
219 }
220
221
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2072 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2072 if (errno == EAGAIN || errno == EWOULDBLOCK)
222 {
223 2072 svc_.work_started();
224
1/1
✓ Branch 1 taken 2072 times.
2072 op.impl_ptr = shared_from_this();
225
226 // Set registering BEFORE register_fd to close the race window where
227 // reactor sees an event before we set registered.
228 2072 op.registered.store(select_registration_state::registering, std::memory_order_release);
229
1/1
✓ Branch 2 taken 2072 times.
2072 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
230
231 // Transition to registered. If this fails, reactor or cancel already
232 // claimed the op (state is now unregistered), so we're done. However,
233 // we must still deregister the fd because cancel's deregister_fd may
234 // have run before our register_fd, leaving the fd orphaned.
235 2072 auto expected = select_registration_state::registering;
236
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2072 times.
2072 if (!op.registered.compare_exchange_strong(
237 expected, select_registration_state::registered, std::memory_order_acq_rel))
238 {
239 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
240 return;
241 }
242
243 // If cancelled was set before we registered, handle it now.
244
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2072 times.
2072 if (op.cancelled.load(std::memory_order_acquire))
245 {
246 auto prev = op.registered.exchange(
247 select_registration_state::unregistered, std::memory_order_acq_rel);
248 if (prev != select_registration_state::unregistered)
249 {
250 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
251 op.impl_ptr = shared_from_this();
252 svc_.post(&op);
253 svc_.work_finished();
254 }
255 }
256 2072 return;
257 }
258
259 op.complete(errno, 0);
260 op.impl_ptr = shared_from_this();
261 svc_.post(&op);
262 }
263
264 void
265 95 select_acceptor_impl::
266 cancel() noexcept
267 {
268 95 std::shared_ptr<select_acceptor_impl> self;
269 try {
270
1/1
✓ Branch 1 taken 95 times.
95 self = shared_from_this();
271 } catch (const std::bad_weak_ptr&) {
272 return;
273 }
274
275 95 auto prev = acc_.registered.exchange(
276 select_registration_state::unregistered, std::memory_order_acq_rel);
277 95 acc_.request_cancel();
278
279
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 92 times.
95 if (prev != select_registration_state::unregistered)
280 {
281 3 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
282 3 acc_.impl_ptr = self;
283 3 svc_.post(&acc_);
284 3 svc_.work_finished();
285 }
286 95 }
287
288 void
289 select_acceptor_impl::
290 cancel_single_op(select_op& op) noexcept
291 {
292 // Called from stop_token callback to cancel a specific pending operation.
293 auto prev = op.registered.exchange(
294 select_registration_state::unregistered, std::memory_order_acq_rel);
295 op.request_cancel();
296
297 if (prev != select_registration_state::unregistered)
298 {
299 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
300
301 // Keep impl alive until op completes
302 try {
303 op.impl_ptr = shared_from_this();
304 } catch (const std::bad_weak_ptr&) {
305 // Impl is being destroyed, op will be orphaned but that's ok
306 }
307
308 svc_.post(&op);
309 svc_.work_finished();
310 }
311 }
312
313 void
314 94 select_acceptor_impl::
315 close_socket() noexcept
316 {
317 94 cancel();
318
319
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 52 times.
94 if (fd_ >= 0)
320 {
321 // Unconditionally remove from registered_fds_ to handle edge cases
322 42 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
323 42 ::close(fd_);
324 42 fd_ = -1;
325 }
326
327 // Clear cached endpoint
328 94 local_endpoint_ = endpoint{};
329 94 }
330
331 120 select_acceptor_service::
332 120 select_acceptor_service(capy::execution_context& ctx)
333 120 : ctx_(ctx)
334
2/2
✓ Branch 2 taken 120 times.
✓ Branch 5 taken 120 times.
120 , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
335 {
336 120 }
337
338 240 select_acceptor_service::
339 120 ~select_acceptor_service()
340 {
341 240 }
342
343 void
344 120 select_acceptor_service::
345 shutdown()
346 {
347
1/1
✓ Branch 2 taken 120 times.
120 std::lock_guard lock(state_->mutex_);
348
349
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 120 times.
120 while (auto* impl = state_->acceptor_list_.pop_front())
350 impl->close_socket();
351
352 120 state_->acceptor_ptrs_.clear();
353 120 }
354
355 tcp_acceptor::acceptor_impl&
356 47 select_acceptor_service::
357 create_acceptor_impl()
358 {
359
1/1
✓ Branch 1 taken 47 times.
47 auto impl = std::make_shared<select_acceptor_impl>(*this);
360 47 auto* raw = impl.get();
361
362
1/1
✓ Branch 2 taken 47 times.
47 std::lock_guard lock(state_->mutex_);
363 47 state_->acceptor_list_.push_back(raw);
364
1/1
✓ Branch 3 taken 47 times.
47 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
365
366 47 return *raw;
367 47 }
368
369 void
370 47 select_acceptor_service::
371 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
372 {
373 47 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
374
1/1
✓ Branch 2 taken 47 times.
47 std::lock_guard lock(state_->mutex_);
375 47 state_->acceptor_list_.remove(select_impl);
376
1/1
✓ Branch 2 taken 47 times.
47 state_->acceptor_ptrs_.erase(select_impl);
377 47 }
378
379 std::error_code
380 47 select_acceptor_service::
381 open_acceptor(
382 tcp_acceptor::acceptor_impl& impl,
383 endpoint ep,
384 int backlog)
385 {
386 47 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
387 47 select_impl->close_socket();
388
389 47 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
390
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 47 times.
47 if (fd < 0)
391 return make_err(errno);
392
393 // Set non-blocking and close-on-exec
394
1/1
✓ Branch 1 taken 47 times.
47 int flags = ::fcntl(fd, F_GETFL, 0);
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 47 times.
47 if (flags == -1)
396 {
397 int errn = errno;
398 ::close(fd);
399 return make_err(errn);
400 }
401
2/3
✓ Branch 1 taken 47 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 47 times.
47 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
402 {
403 int errn = errno;
404 ::close(fd);
405 return make_err(errn);
406 }
407
2/3
✓ Branch 1 taken 47 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 47 times.
47 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
408 {
409 int errn = errno;
410 ::close(fd);
411 return make_err(errn);
412 }
413
414 // Check fd is within select() limits
415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 47 times.
47 if (fd >= FD_SETSIZE)
416 {
417 ::close(fd);
418 return make_err(EMFILE);
419 }
420
421 47 int reuse = 1;
422 47 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
423
424 47 sockaddr_in addr = detail::to_sockaddr_in(ep);
425
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 42 times.
47 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
426 {
427 5 int errn = errno;
428
1/1
✓ Branch 1 taken 5 times.
5 ::close(fd);
429 5 return make_err(errn);
430 }
431
432
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (::listen(fd, backlog) < 0)
433 {
434 int errn = errno;
435 ::close(fd);
436 return make_err(errn);
437 }
438
439 42 select_impl->fd_ = fd;
440
441 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
442 42 sockaddr_in local_addr{};
443 42 socklen_t local_len = sizeof(local_addr);
444
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
445 42 select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
446
447 42 return {};
448 }
449
450 void
451 5 select_acceptor_service::
452 post(select_op* op)
453 {
454 5 state_->sched_.post(op);
455 5 }
456
457 void
458 2072 select_acceptor_service::
459 work_started() noexcept
460 {
461 2072 state_->sched_.work_started();
462 2072 }
463
464 void
465 3 select_acceptor_service::
466 work_finished() noexcept
467 {
468 3 state_->sched_.work_finished();
469 3 }
470
471 select_socket_service*
472 2071 select_acceptor_service::
473 socket_service() const noexcept
474 {
475 2071 auto* svc = ctx_.find_service<detail::socket_service>();
476
2/4
✓ Branch 0 taken 2071 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2071 times.
✗ Branch 3 not taken.
2071 return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
477 }
478
479 } // namespace boost::corosio::detail
480
481 #endif // BOOST_COROSIO_HAS_SELECT
482