libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp

79.9% Lines (175/219) 94.7% Functions (18/19) 50.0% Branches (57/114)
libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <errno.h>
20 #include <netinet/in.h>
21 #include <sys/epoll.h>
22 #include <sys/socket.h>
23 #include <unistd.h>
24
25 namespace boost::corosio::detail {
26
27 void
28 6 epoll_accept_op::
29 cancel() noexcept
30 {
31
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
32 6 acceptor_impl_->cancel_single_op(*this);
33 else
34 request_cancel();
35 6 }
36
37 void
38 2651 epoll_accept_op::
39 operator()()
40 {
41 2651 stop_cb.reset();
42
43
3/4
✓ Branch 0 taken 2651 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2642 times.
✓ Branch 4 taken 9 times.
2651 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
44
45
1/2
✓ Branch 0 taken 2651 times.
✗ Branch 1 not taken.
2651 if (ec_out)
46 {
47
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2642 times.
2651 if (cancelled.load(std::memory_order_acquire))
48 9 *ec_out = capy::error::canceled;
49
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2642 times.
2642 else if (errn != 0)
50 *ec_out = make_err(errn);
51 else
52 2642 *ec_out = {};
53 }
54
55
3/4
✓ Branch 0 taken 2642 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2642 times.
✗ Branch 3 not taken.
2651 if (success && accepted_fd >= 0)
56 {
57
1/2
✓ Branch 0 taken 2642 times.
✗ Branch 1 not taken.
2642 if (acceptor_impl_)
58 {
59 2642 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
60 2642 ->service().socket_service();
61
1/2
✓ Branch 0 taken 2642 times.
✗ Branch 1 not taken.
2642 if (socket_svc)
62 {
63
1/1
✓ Branch 1 taken 2642 times.
2642 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
64 2642 impl.set_socket(accepted_fd);
65
66 // Register accepted socket with epoll (edge-triggered mode)
67 2642 impl.desc_data_.fd = accepted_fd;
68 2642 impl.desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
69 2642 impl.desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
70 2642 impl.desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
71
1/1
✓ Branch 2 taken 2642 times.
2642 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_data_);
72
73 2642 sockaddr_in local_addr{};
74 2642 socklen_t local_len = sizeof(local_addr);
75 2642 sockaddr_in remote_addr{};
76 2642 socklen_t remote_len = sizeof(remote_addr);
77
78 2642 endpoint local_ep, remote_ep;
79
1/2
✓ Branch 1 taken 2642 times.
✗ Branch 2 not taken.
2642 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
80 2642 local_ep = from_sockaddr_in(local_addr);
81
1/2
✓ Branch 1 taken 2642 times.
✗ Branch 2 not taken.
2642 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
82 2642 remote_ep = from_sockaddr_in(remote_addr);
83
84 2642 impl.set_endpoints(local_ep, remote_ep);
85
86
1/2
✓ Branch 0 taken 2642 times.
✗ Branch 1 not taken.
2642 if (impl_out)
87 2642 *impl_out = &impl;
88
89 2642 accepted_fd = -1;
90 }
91 else
92 {
93 if (ec_out && !*ec_out)
94 *ec_out = make_err(ENOENT);
95 ::close(accepted_fd);
96 accepted_fd = -1;
97 if (impl_out)
98 *impl_out = nullptr;
99 }
100 }
101 else
102 {
103 ::close(accepted_fd);
104 accepted_fd = -1;
105 if (impl_out)
106 *impl_out = nullptr;
107 }
108 2642 }
109 else
110 {
111
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
112 {
113 ::close(accepted_fd);
114 accepted_fd = -1;
115 }
116
117
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (peer_impl)
118 {
119 peer_impl->release();
120 peer_impl = nullptr;
121 }
122
123
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
124 9 *impl_out = nullptr;
125 }
126
127 // Move to stack before resuming. See epoll_op::operator()() for rationale.
128 2651 capy::executor_ref saved_ex( std::move( ex ) );
129 2651 capy::coro saved_h( std::move( h ) );
130 2651 auto prevent_premature_destruction = std::move(impl_ptr);
131
1/1
✓ Branch 1 taken 2651 times.
2651 saved_ex.dispatch( saved_h );
132 2651 }
133
134 66 epoll_acceptor_impl::
135 66 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
136 66 : svc_(svc)
137 {
138 66 }
139
140 void
141 epoll_acceptor_impl::
142 update_epoll_events() noexcept
143 {
144 svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
145 }
146
147 void
148 66 epoll_acceptor_impl::
149 release()
150 {
151 66 close_socket();
152 66 svc_.destroy_acceptor_impl(*this);
153 66 }
154
155 void
156 2651 epoll_acceptor_impl::
157 accept(
158 std::coroutine_handle<> h,
159 capy::executor_ref ex,
160 std::stop_token token,
161 std::error_code* ec,
162 io_object::io_object_impl** impl_out)
163 {
164 2651 auto& op = acc_;
165 2651 op.reset();
166 2651 op.h = h;
167 2651 op.ex = ex;
168 2651 op.ec_out = ec;
169 2651 op.impl_out = impl_out;
170 2651 op.fd = fd_;
171 2651 op.start(token, this);
172
173 2651 sockaddr_in addr{};
174 2651 socklen_t addrlen = sizeof(addr);
175
1/1
✓ Branch 1 taken 2651 times.
2651 int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
176 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
177
178
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2649 times.
2651 if (accepted >= 0)
179 {
180 2 desc_data_.read_ready.store(false, std::memory_order_relaxed);
181 2 op.accepted_fd = accepted;
182 2 op.complete(0, 0);
183
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
184
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
185 2651 return;
186 }
187
188
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2649 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2649 if (errno == EAGAIN || errno == EWOULDBLOCK)
189 {
190 2649 svc_.work_started();
191
1/1
✓ Branch 1 taken 2649 times.
2649 op.impl_ptr = shared_from_this();
192
193 2649 desc_data_.read_op.store(&op, std::memory_order_release);
194 std::atomic_thread_fence(std::memory_order_seq_cst);
195
196
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2649 times.
2649 if (desc_data_.read_ready.exchange(false, std::memory_order_acquire))
197 {
198 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
199 if (claimed)
200 {
201 claimed->perform_io();
202 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
203 {
204 claimed->errn = 0;
205 desc_data_.read_op.store(claimed, std::memory_order_release);
206 }
207 else
208 {
209 svc_.post(claimed);
210 svc_.work_finished();
211 }
212 return;
213 }
214 }
215
216
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2649 times.
2649 if (op.cancelled.load(std::memory_order_acquire))
217 {
218 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
219 if (claimed)
220 {
221 svc_.post(claimed);
222 svc_.work_finished();
223 }
224 }
225 2649 return;
226 }
227
228 op.complete(errno, 0);
229 op.impl_ptr = shared_from_this();
230 svc_.post(&op);
231 }
232
233 void
234 133 epoll_acceptor_impl::
235 cancel() noexcept
236 {
237 133 std::shared_ptr<epoll_acceptor_impl> self;
238 try {
239
1/1
✓ Branch 1 taken 133 times.
133 self = shared_from_this();
240 } catch (const std::bad_weak_ptr&) {
241 return;
242 }
243
244 133 acc_.request_cancel();
245 // Use atomic exchange - only one of cancellation or reactor will succeed
246 133 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
247
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 130 times.
133 if (claimed == &acc_)
248 {
249 3 acc_.impl_ptr = self;
250 3 svc_.post(&acc_);
251 3 svc_.work_finished();
252 }
253 133 }
254
255 void
256 6 epoll_acceptor_impl::
257 cancel_single_op(epoll_op& op) noexcept
258 {
259 6 op.request_cancel();
260
261 // Use atomic exchange - only one of cancellation or reactor will succeed
262 6 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
263
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (claimed == &op)
264 {
265 try {
266
1/1
✓ Branch 1 taken 6 times.
6 op.impl_ptr = shared_from_this();
267 } catch (const std::bad_weak_ptr&) {}
268 6 svc_.post(&op);
269 6 svc_.work_finished();
270 }
271 6 }
272
273 void
274 132 epoll_acceptor_impl::
275 close_socket() noexcept
276 {
277 132 cancel();
278
279
2/2
✓ Branch 0 taken 57 times.
✓ Branch 1 taken 75 times.
132 if (fd_ >= 0)
280 {
281
1/2
✓ Branch 0 taken 57 times.
✗ Branch 1 not taken.
57 if (desc_data_.registered_events != 0)
282 57 svc_.scheduler().deregister_descriptor(fd_);
283 57 ::close(fd_);
284 57 fd_ = -1;
285 }
286
287 132 desc_data_.fd = -1;
288 132 desc_data_.is_registered = false;
289 132 desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
290 132 desc_data_.read_ready.store(false, std::memory_order_relaxed);
291 132 desc_data_.write_ready.store(false, std::memory_order_relaxed);
292 132 desc_data_.registered_events = 0;
293
294 // Clear cached endpoint
295 132 local_endpoint_ = endpoint{};
296 132 }
297
298 184 epoll_acceptor_service::
299 184 epoll_acceptor_service(capy::execution_context& ctx)
300 184 : ctx_(ctx)
301
2/2
✓ Branch 2 taken 184 times.
✓ Branch 5 taken 184 times.
184 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
302 {
303 184 }
304
305 368 epoll_acceptor_service::
306 184 ~epoll_acceptor_service()
307 {
308 368 }
309
310 void
311 184 epoll_acceptor_service::
312 shutdown()
313 {
314
1/1
✓ Branch 2 taken 184 times.
184 std::lock_guard lock(state_->mutex_);
315
316
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 184 times.
184 while (auto* impl = state_->acceptor_list_.pop_front())
317 impl->close_socket();
318
319 184 state_->acceptor_ptrs_.clear();
320 184 }
321
322 tcp_acceptor::acceptor_impl&
323 66 epoll_acceptor_service::
324 create_acceptor_impl()
325 {
326
1/1
✓ Branch 1 taken 66 times.
66 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
327 66 auto* raw = impl.get();
328
329
1/1
✓ Branch 2 taken 66 times.
66 std::lock_guard lock(state_->mutex_);
330 66 state_->acceptor_list_.push_back(raw);
331
1/1
✓ Branch 3 taken 66 times.
66 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
332
333 66 return *raw;
334 66 }
335
336 void
337 66 epoll_acceptor_service::
338 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
339 {
340 66 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
341
1/1
✓ Branch 2 taken 66 times.
66 std::lock_guard lock(state_->mutex_);
342 66 state_->acceptor_list_.remove(epoll_impl);
343
1/1
✓ Branch 2 taken 66 times.
66 state_->acceptor_ptrs_.erase(epoll_impl);
344 66 }
345
346 std::error_code
347 66 epoll_acceptor_service::
348 open_acceptor(
349 tcp_acceptor::acceptor_impl& impl,
350 endpoint ep,
351 int backlog)
352 {
353 66 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
354 66 epoll_impl->close_socket();
355
356 66 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
357
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 66 times.
66 if (fd < 0)
358 return make_err(errno);
359
360 66 int reuse = 1;
361 66 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
362
363 66 sockaddr_in addr = detail::to_sockaddr_in(ep);
364
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 57 times.
66 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
365 {
366 9 int errn = errno;
367
1/1
✓ Branch 1 taken 9 times.
9 ::close(fd);
368 9 return make_err(errn);
369 }
370
371
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 57 times.
57 if (::listen(fd, backlog) < 0)
372 {
373 int errn = errno;
374 ::close(fd);
375 return make_err(errn);
376 }
377
378 57 epoll_impl->fd_ = fd;
379
380 // Register fd with epoll (edge-triggered mode)
381 57 epoll_impl->desc_data_.fd = fd;
382 57 epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
383
1/1
✓ Branch 2 taken 57 times.
57 scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
384
385 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
386 57 sockaddr_in local_addr{};
387 57 socklen_t local_len = sizeof(local_addr);
388
1/2
✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
57 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
389 57 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
390
391 57 return {};
392 }
393
394 void
395 11 epoll_acceptor_service::
396 post(epoll_op* op)
397 {
398 11 state_->sched_.post(op);
399 11 }
400
401 void
402 2649 epoll_acceptor_service::
403 work_started() noexcept
404 {
405 2649 state_->sched_.work_started();
406 2649 }
407
408 void
409 9 epoll_acceptor_service::
410 work_finished() noexcept
411 {
412 9 state_->sched_.work_finished();
413 9 }
414
415 epoll_socket_service*
416 2642 epoll_acceptor_service::
417 socket_service() const noexcept
418 {
419 2642 auto* svc = ctx_.find_service<detail::socket_service>();
420
2/4
✓ Branch 0 taken 2642 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2642 times.
✗ Branch 3 not taken.
2642 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
421 }
422
423 } // namespace boost::corosio::detail
424
425 #endif // BOOST_COROSIO_HAS_EPOLL
426