Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_SELECT
13 :
14 : #include "src/detail/select/acceptors.hpp"
15 : #include "src/detail/select/sockets.hpp"
16 : #include "src/detail/endpoint_convert.hpp"
17 : #include "src/detail/make_err.hpp"
18 :
19 : #include <errno.h>
20 : #include <fcntl.h>
21 : #include <netinet/in.h>
22 : #include <sys/socket.h>
23 : #include <unistd.h>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : void
28 0 : select_accept_op::
29 : cancel() noexcept
30 : {
31 0 : if (acceptor_impl_)
32 0 : acceptor_impl_->cancel_single_op(*this);
33 : else
34 0 : request_cancel();
35 0 : }
36 :
37 : void
38 2074 : select_accept_op::
39 : operator()()
40 : {
41 2074 : stop_cb.reset();
42 :
43 2074 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
44 :
45 2074 : if (ec_out)
46 : {
47 2074 : if (cancelled.load(std::memory_order_acquire))
48 3 : *ec_out = capy::error::canceled;
49 2071 : else if (errn != 0)
50 0 : *ec_out = make_err(errn);
51 : else
52 2071 : *ec_out = {};
53 : }
54 :
55 2074 : if (success && accepted_fd >= 0)
56 : {
57 2071 : if (acceptor_impl_)
58 : {
59 2071 : auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
60 2071 : ->service().socket_service();
61 2071 : if (socket_svc)
62 : {
63 2071 : auto& impl = static_cast<select_socket_impl&>(socket_svc->create_impl());
64 2071 : impl.set_socket(accepted_fd);
65 :
66 2071 : sockaddr_in local_addr{};
67 2071 : socklen_t local_len = sizeof(local_addr);
68 2071 : sockaddr_in remote_addr{};
69 2071 : socklen_t remote_len = sizeof(remote_addr);
70 :
71 2071 : endpoint local_ep, remote_ep;
72 2071 : if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
73 2071 : local_ep = from_sockaddr_in(local_addr);
74 2071 : if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
75 2071 : remote_ep = from_sockaddr_in(remote_addr);
76 :
77 2071 : impl.set_endpoints(local_ep, remote_ep);
78 :
79 2071 : if (impl_out)
80 2071 : *impl_out = &impl;
81 :
82 2071 : accepted_fd = -1;
83 : }
84 : else
85 : {
86 0 : if (ec_out && !*ec_out)
87 0 : *ec_out = make_err(ENOENT);
88 0 : ::close(accepted_fd);
89 0 : accepted_fd = -1;
90 0 : if (impl_out)
91 0 : *impl_out = nullptr;
92 : }
93 : }
94 : else
95 : {
96 0 : ::close(accepted_fd);
97 0 : accepted_fd = -1;
98 0 : if (impl_out)
99 0 : *impl_out = nullptr;
100 : }
101 2071 : }
102 : else
103 : {
104 3 : if (accepted_fd >= 0)
105 : {
106 0 : ::close(accepted_fd);
107 0 : accepted_fd = -1;
108 : }
109 :
110 3 : if (peer_impl)
111 : {
112 0 : peer_impl->release();
113 0 : peer_impl = nullptr;
114 : }
115 :
116 3 : if (impl_out)
117 3 : *impl_out = nullptr;
118 : }
119 :
120 : // Move to stack before destroying the frame
121 2074 : capy::executor_ref saved_ex( std::move( ex ) );
122 2074 : capy::coro saved_h( std::move( h ) );
123 2074 : impl_ptr.reset();
124 2074 : saved_ex.dispatch( saved_h );
125 2074 : }
126 :
127 47 : select_acceptor_impl::
128 47 : select_acceptor_impl(select_acceptor_service& svc) noexcept
129 47 : : svc_(svc)
130 : {
131 47 : }
132 :
133 : void
134 47 : select_acceptor_impl::
135 : release()
136 : {
137 47 : close_socket();
138 47 : svc_.destroy_acceptor_impl(*this);
139 47 : }
140 :
141 : void
142 2074 : select_acceptor_impl::
143 : accept(
144 : std::coroutine_handle<> h,
145 : capy::executor_ref ex,
146 : std::stop_token token,
147 : std::error_code* ec,
148 : io_object::io_object_impl** impl_out)
149 : {
150 2074 : auto& op = acc_;
151 2074 : op.reset();
152 2074 : op.h = h;
153 2074 : op.ex = ex;
154 2074 : op.ec_out = ec;
155 2074 : op.impl_out = impl_out;
156 2074 : op.fd = fd_;
157 2074 : op.start(token, this);
158 :
159 2074 : sockaddr_in addr{};
160 2074 : socklen_t addrlen = sizeof(addr);
161 2074 : int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
162 :
163 2074 : if (accepted >= 0)
164 : {
165 : // Reject fds that exceed select()'s FD_SETSIZE limit.
166 : // Better to fail now than during later async operations.
167 2 : if (accepted >= FD_SETSIZE)
168 : {
169 0 : ::close(accepted);
170 0 : op.accepted_fd = -1;
171 0 : op.complete(EINVAL, 0);
172 0 : op.impl_ptr = shared_from_this();
173 0 : svc_.post(&op);
174 2074 : return;
175 : }
176 :
177 : // Set non-blocking and close-on-exec flags.
178 : // A non-blocking socket is essential for the async reactor;
179 : // if we can't configure it, fail rather than risk blocking.
180 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
181 2 : if (flags == -1)
182 : {
183 0 : int err = errno;
184 0 : ::close(accepted);
185 0 : op.accepted_fd = -1;
186 0 : op.complete(err, 0);
187 0 : op.impl_ptr = shared_from_this();
188 0 : svc_.post(&op);
189 0 : return;
190 : }
191 :
192 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
193 : {
194 0 : int err = errno;
195 0 : ::close(accepted);
196 0 : op.accepted_fd = -1;
197 0 : op.complete(err, 0);
198 0 : op.impl_ptr = shared_from_this();
199 0 : svc_.post(&op);
200 0 : return;
201 : }
202 :
203 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
204 : {
205 0 : int err = errno;
206 0 : ::close(accepted);
207 0 : op.accepted_fd = -1;
208 0 : op.complete(err, 0);
209 0 : op.impl_ptr = shared_from_this();
210 0 : svc_.post(&op);
211 0 : return;
212 : }
213 :
214 2 : op.accepted_fd = accepted;
215 2 : op.complete(0, 0);
216 2 : op.impl_ptr = shared_from_this();
217 2 : svc_.post(&op);
218 2 : return;
219 : }
220 :
221 2072 : if (errno == EAGAIN || errno == EWOULDBLOCK)
222 : {
223 2072 : svc_.work_started();
224 2072 : op.impl_ptr = shared_from_this();
225 :
226 : // Set registering BEFORE register_fd to close the race window where
227 : // reactor sees an event before we set registered.
228 2072 : op.registered.store(select_registration_state::registering, std::memory_order_release);
229 2072 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
230 :
231 : // Transition to registered. If this fails, reactor or cancel already
232 : // claimed the op (state is now unregistered), so we're done. However,
233 : // we must still deregister the fd because cancel's deregister_fd may
234 : // have run before our register_fd, leaving the fd orphaned.
235 2072 : auto expected = select_registration_state::registering;
236 2072 : if (!op.registered.compare_exchange_strong(
237 : expected, select_registration_state::registered, std::memory_order_acq_rel))
238 : {
239 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
240 0 : return;
241 : }
242 :
243 : // If cancelled was set before we registered, handle it now.
244 2072 : if (op.cancelled.load(std::memory_order_acquire))
245 : {
246 0 : auto prev = op.registered.exchange(
247 : select_registration_state::unregistered, std::memory_order_acq_rel);
248 0 : if (prev != select_registration_state::unregistered)
249 : {
250 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
251 0 : op.impl_ptr = shared_from_this();
252 0 : svc_.post(&op);
253 0 : svc_.work_finished();
254 : }
255 : }
256 2072 : return;
257 : }
258 :
259 0 : op.complete(errno, 0);
260 0 : op.impl_ptr = shared_from_this();
261 0 : svc_.post(&op);
262 : }
263 :
264 : void
265 95 : select_acceptor_impl::
266 : cancel() noexcept
267 : {
268 95 : std::shared_ptr<select_acceptor_impl> self;
269 : try {
270 95 : self = shared_from_this();
271 0 : } catch (const std::bad_weak_ptr&) {
272 0 : return;
273 0 : }
274 :
275 95 : auto prev = acc_.registered.exchange(
276 : select_registration_state::unregistered, std::memory_order_acq_rel);
277 95 : acc_.request_cancel();
278 :
279 95 : if (prev != select_registration_state::unregistered)
280 : {
281 3 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
282 3 : acc_.impl_ptr = self;
283 3 : svc_.post(&acc_);
284 3 : svc_.work_finished();
285 : }
286 95 : }
287 :
288 : void
289 0 : select_acceptor_impl::
290 : cancel_single_op(select_op& op) noexcept
291 : {
292 : // Called from stop_token callback to cancel a specific pending operation.
293 0 : auto prev = op.registered.exchange(
294 : select_registration_state::unregistered, std::memory_order_acq_rel);
295 0 : op.request_cancel();
296 :
297 0 : if (prev != select_registration_state::unregistered)
298 : {
299 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
300 :
301 : // Keep impl alive until op completes
302 : try {
303 0 : op.impl_ptr = shared_from_this();
304 0 : } catch (const std::bad_weak_ptr&) {
305 : // Impl is being destroyed, op will be orphaned but that's ok
306 0 : }
307 :
308 0 : svc_.post(&op);
309 0 : svc_.work_finished();
310 : }
311 0 : }
312 :
313 : void
314 94 : select_acceptor_impl::
315 : close_socket() noexcept
316 : {
317 94 : cancel();
318 :
319 94 : if (fd_ >= 0)
320 : {
321 : // Unconditionally remove from registered_fds_ to handle edge cases
322 42 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
323 42 : ::close(fd_);
324 42 : fd_ = -1;
325 : }
326 :
327 : // Clear cached endpoint
328 94 : local_endpoint_ = endpoint{};
329 94 : }
330 :
331 120 : select_acceptor_service::
332 120 : select_acceptor_service(capy::execution_context& ctx)
333 120 : : ctx_(ctx)
334 120 : , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
335 : {
336 120 : }
337 :
338 240 : select_acceptor_service::
339 120 : ~select_acceptor_service()
340 : {
341 240 : }
342 :
343 : void
344 120 : select_acceptor_service::
345 : shutdown()
346 : {
347 120 : std::lock_guard lock(state_->mutex_);
348 :
349 120 : while (auto* impl = state_->acceptor_list_.pop_front())
350 0 : impl->close_socket();
351 :
352 120 : state_->acceptor_ptrs_.clear();
353 120 : }
354 :
355 : tcp_acceptor::acceptor_impl&
356 47 : select_acceptor_service::
357 : create_acceptor_impl()
358 : {
359 47 : auto impl = std::make_shared<select_acceptor_impl>(*this);
360 47 : auto* raw = impl.get();
361 :
362 47 : std::lock_guard lock(state_->mutex_);
363 47 : state_->acceptor_list_.push_back(raw);
364 47 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
365 :
366 47 : return *raw;
367 47 : }
368 :
369 : void
370 47 : select_acceptor_service::
371 : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
372 : {
373 47 : auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
374 47 : std::lock_guard lock(state_->mutex_);
375 47 : state_->acceptor_list_.remove(select_impl);
376 47 : state_->acceptor_ptrs_.erase(select_impl);
377 47 : }
378 :
379 : std::error_code
380 47 : select_acceptor_service::
381 : open_acceptor(
382 : tcp_acceptor::acceptor_impl& impl,
383 : endpoint ep,
384 : int backlog)
385 : {
386 47 : auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
387 47 : select_impl->close_socket();
388 :
389 47 : int fd = ::socket(AF_INET, SOCK_STREAM, 0);
390 47 : if (fd < 0)
391 0 : return make_err(errno);
392 :
393 : // Set non-blocking and close-on-exec
394 47 : int flags = ::fcntl(fd, F_GETFL, 0);
395 47 : if (flags == -1)
396 : {
397 0 : int errn = errno;
398 0 : ::close(fd);
399 0 : return make_err(errn);
400 : }
401 47 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
402 : {
403 0 : int errn = errno;
404 0 : ::close(fd);
405 0 : return make_err(errn);
406 : }
407 47 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
408 : {
409 0 : int errn = errno;
410 0 : ::close(fd);
411 0 : return make_err(errn);
412 : }
413 :
414 : // Check fd is within select() limits
415 47 : if (fd >= FD_SETSIZE)
416 : {
417 0 : ::close(fd);
418 0 : return make_err(EMFILE);
419 : }
420 :
421 47 : int reuse = 1;
422 47 : ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
423 :
424 47 : sockaddr_in addr = detail::to_sockaddr_in(ep);
425 47 : if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
426 : {
427 5 : int errn = errno;
428 5 : ::close(fd);
429 5 : return make_err(errn);
430 : }
431 :
432 42 : if (::listen(fd, backlog) < 0)
433 : {
434 0 : int errn = errno;
435 0 : ::close(fd);
436 0 : return make_err(errn);
437 : }
438 :
439 42 : select_impl->fd_ = fd;
440 :
441 : // Cache the local endpoint (queries OS for ephemeral port if port was 0)
442 42 : sockaddr_in local_addr{};
443 42 : socklen_t local_len = sizeof(local_addr);
444 42 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
445 42 : select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
446 :
447 42 : return {};
448 : }
449 :
450 : void
451 5 : select_acceptor_service::
452 : post(select_op* op)
453 : {
454 5 : state_->sched_.post(op);
455 5 : }
456 :
457 : void
458 2072 : select_acceptor_service::
459 : work_started() noexcept
460 : {
461 2072 : state_->sched_.work_started();
462 2072 : }
463 :
464 : void
465 3 : select_acceptor_service::
466 : work_finished() noexcept
467 : {
468 3 : state_->sched_.work_finished();
469 3 : }
470 :
471 : select_socket_service*
472 2071 : select_acceptor_service::
473 : socket_service() const noexcept
474 : {
475 2071 : auto* svc = ctx_.find_service<detail::socket_service>();
476 2071 : return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
477 : }
478 :
479 : } // namespace boost::corosio::detail
480 :
481 : #endif // BOOST_COROSIO_HAS_SELECT
|