libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

77.1% Lines (310/402) 92.5% Functions (37/40) 59.2% Branches (122/206)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <errno.h>
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <sys/epoll.h>
26 #include <sys/socket.h>
27 #include <unistd.h>
28
29 namespace boost::corosio::detail {
30
31 void
32 103 epoll_op::canceller::
33 operator()() const noexcept
34 {
35 103 op->cancel();
36 103 }
37
38 void
39 epoll_connect_op::
40 cancel() noexcept
41 {
42 if (socket_impl_)
43 socket_impl_->cancel_single_op(*this);
44 else
45 request_cancel();
46 }
47
48 void
49 97 epoll_read_op::
50 cancel() noexcept
51 {
52
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (socket_impl_)
53 97 socket_impl_->cancel_single_op(*this);
54 else
55 request_cancel();
56 97 }
57
58 void
59 epoll_write_op::
60 cancel() noexcept
61 {
62 if (socket_impl_)
63 socket_impl_->cancel_single_op(*this);
64 else
65 request_cancel();
66 }
67
68 void
69 2643 epoll_connect_op::
70 operator()()
71 {
72 2643 stop_cb.reset();
73
74
3/4
✓ Branch 0 taken 2642 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 2642 times.
✗ Branch 4 not taken.
2643 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
75
76 // Cache endpoints on successful connect
77
3/4
✓ Branch 0 taken 2642 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 2642 times.
✗ Branch 3 not taken.
2643 if (success && socket_impl_)
78 {
79 // Query local endpoint via getsockname (may fail, but remote is always known)
80 2642 endpoint local_ep;
81 2642 sockaddr_in local_addr{};
82 2642 socklen_t local_len = sizeof(local_addr);
83
1/2
✓ Branch 1 taken 2642 times.
✗ Branch 2 not taken.
2642 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
84 2642 local_ep = from_sockaddr_in(local_addr);
85 // Always cache remote endpoint; local may be default if getsockname failed
86 2642 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
87 }
88
89
1/2
✓ Branch 0 taken 2643 times.
✗ Branch 1 not taken.
2643 if (ec_out)
90 {
91
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2643 times.
2643 if (cancelled.load(std::memory_order_acquire))
92 *ec_out = capy::error::canceled;
93
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2642 times.
2643 else if (errn != 0)
94 1 *ec_out = make_err(errn);
95 else
96 2642 *ec_out = {};
97 }
98
99
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2643 times.
2643 if (bytes_out)
100 *bytes_out = bytes_transferred;
101
102 // Move to stack before resuming. See epoll_op::operator()() for rationale.
103 2643 capy::executor_ref saved_ex( std::move( ex ) );
104 2643 capy::coro saved_h( std::move( h ) );
105 2643 auto prevent_premature_destruction = std::move(impl_ptr);
106
1/1
✓ Branch 1 taken 2643 times.
2643 resume_coro(saved_ex, saved_h);
107 2643 }
108
109 5296 epoll_socket_impl::
110 5296 epoll_socket_impl(epoll_socket_service& svc) noexcept
111 5296 : svc_(svc)
112 {
113 5296 }
114
115 5296 epoll_socket_impl::
116 5296 ~epoll_socket_impl()
117 {
118
2/2
✓ Branch 1 taken 40 times.
✓ Branch 2 taken 5256 times.
5296 if (read_initiator_handle_)
119 40 read_initiator_handle_.destroy();
120
2/2
✓ Branch 1 taken 38 times.
✓ Branch 2 taken 5258 times.
5296 if (write_initiator_handle_)
121 38 write_initiator_handle_.destroy();
122
123 // promise_type::operator delete is no-op, so free here
124
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 5256 times.
5296 if (read_initiator_frame_)
125 40 ::operator delete(read_initiator_frame_);
126
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 5258 times.
5296 if (write_initiator_frame_)
127 38 ::operator delete(write_initiator_frame_);
128 5296 }
129
130 void
131 epoll_socket_impl::
132 update_epoll_events() noexcept
133 {
134 // With EPOLLET, update_descriptor_events just provides a memory fence
135 svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
136 }
137
138 void
139 5296 epoll_socket_impl::
140 release()
141 {
142 5296 close_socket();
143 5296 svc_.destroy_impl(*this);
144 5296 }
145
146 void
147 2643 epoll_socket_impl::
148 connect(
149 std::coroutine_handle<> h,
150 capy::executor_ref ex,
151 endpoint ep,
152 std::stop_token token,
153 std::error_code* ec)
154 {
155 2643 auto& op = conn_;
156 2643 op.reset();
157 2643 op.h = h;
158 2643 op.ex = ex;
159 2643 op.ec_out = ec;
160 2643 op.fd = fd_;
161 2643 op.target_endpoint = ep; // Store target for endpoint caching
162 2643 op.start(token, this);
163
164 2643 sockaddr_in addr = detail::to_sockaddr_in(ep);
165
1/1
✓ Branch 1 taken 2643 times.
2643 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
166
167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2643 times.
2643 if (result == 0)
168 {
169 // Sync success - cache endpoints immediately
170 // Remote is always known; local may fail but we still cache remote
171 sockaddr_in local_addr{};
172 socklen_t local_len = sizeof(local_addr);
173 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
174 local_endpoint_ = detail::from_sockaddr_in(local_addr);
175 remote_endpoint_ = ep;
176
177 op.complete(0, 0);
178 op.impl_ptr = shared_from_this();
179 svc_.post(&op);
180 return;
181 }
182
183
1/2
✓ Branch 0 taken 2643 times.
✗ Branch 1 not taken.
2643 if (errno == EINPROGRESS)
184 {
185 2643 svc_.work_started();
186
1/1
✓ Branch 1 taken 2643 times.
2643 op.impl_ptr = shared_from_this();
187
188 2643 desc_data_.connect_op.store(&op, std::memory_order_seq_cst);
189
190
2/2
✓ Branch 1 taken 43 times.
✓ Branch 2 taken 2600 times.
2643 if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
191 {
192 43 auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
193
1/2
✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
43 if (claimed)
194 {
195 43 claimed->perform_io();
196
2/4
✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
43 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
197 {
198 claimed->errn = 0;
199 desc_data_.connect_op.store(claimed, std::memory_order_release);
200 }
201 else
202 {
203
1/1
✓ Branch 1 taken 43 times.
43 svc_.post(claimed);
204 43 svc_.work_finished();
205 }
206 43 return;
207 }
208 }
209
210
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2600 times.
2600 if (op.cancelled.load(std::memory_order_acquire))
211 {
212 auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
213 if (claimed)
214 {
215 svc_.post(claimed);
216 svc_.work_finished();
217 }
218 }
219 2600 return;
220 }
221
222 op.complete(errno, 0);
223 op.impl_ptr = shared_from_this();
224 svc_.post(&op);
225 }
226
227 read_initiator
228
1/1
✓ Branch 1 taken 56335 times.
56335 make_read_initiator(void*& cached, epoll_socket_impl* impl)
229 {
230 impl->do_read_io();
231 co_return;
232 112670 }
233
234 write_initiator
235
1/1
✓ Branch 1 taken 56215 times.
56215 make_write_initiator(void*& cached, epoll_socket_impl* impl)
236 {
237 impl->do_write_io();
238 co_return;
239 112430 }
240
241 void
242 56335 epoll_socket_impl::
243 do_read_io()
244 {
245 56335 auto& op = rd_;
246
247 56335 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
248
249
2/2
✓ Branch 0 taken 56162 times.
✓ Branch 1 taken 173 times.
56335 if (n > 0)
250 {
251 56162 desc_data_.read_ready.store(false, std::memory_order_relaxed);
252 56162 op.complete(0, static_cast<std::size_t>(n));
253 56162 svc_.post(&op);
254 56162 return;
255 }
256
257
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 168 times.
173 if (n == 0)
258 {
259 5 desc_data_.read_ready.store(false, std::memory_order_relaxed);
260 5 op.complete(0, 0);
261 5 svc_.post(&op);
262 5 return;
263 }
264
265
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
168 if (errno == EAGAIN || errno == EWOULDBLOCK)
266 {
267 168 svc_.work_started();
268
269 168 desc_data_.read_op.store(&op, std::memory_order_seq_cst);
270
271
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 168 times.
168 if (desc_data_.read_ready.exchange(false, std::memory_order_seq_cst))
272 {
273 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
274 if (claimed)
275 {
276 claimed->perform_io();
277 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
278 {
279 claimed->errn = 0;
280 desc_data_.read_op.store(claimed, std::memory_order_release);
281 }
282 else
283 {
284 svc_.post(claimed);
285 svc_.work_finished();
286 }
287 return;
288 }
289 }
290
291
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 168 times.
168 if (op.cancelled.load(std::memory_order_acquire))
292 {
293 auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
294 if (claimed)
295 {
296 svc_.post(claimed);
297 svc_.work_finished();
298 }
299 }
300 168 return;
301 }
302
303 op.complete(errno, 0);
304 svc_.post(&op);
305 }
306
307 void
308 56215 epoll_socket_impl::
309 do_write_io()
310 {
311 56215 auto& op = wr_;
312
313 56215 msghdr msg{};
314 56215 msg.msg_iov = op.iovecs;
315 56215 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
316
317
1/1
✓ Branch 1 taken 56215 times.
56215 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
318
319
2/2
✓ Branch 0 taken 56214 times.
✓ Branch 1 taken 1 time.
56215 if (n > 0)
320 {
321 56214 desc_data_.write_ready.store(false, std::memory_order_relaxed);
322 56214 op.complete(0, static_cast<std::size_t>(n));
323
1/1
✓ Branch 1 taken 56214 times.
56214 svc_.post(&op);
324 56214 return;
325 }
326
327
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (errno == EAGAIN || errno == EWOULDBLOCK)
328 {
329 svc_.work_started();
330
331 desc_data_.write_op.store(&op, std::memory_order_seq_cst);
332
333 if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
334 {
335 auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
336 if (claimed)
337 {
338 claimed->perform_io();
339 if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
340 {
341 claimed->errn = 0;
342 desc_data_.write_op.store(claimed, std::memory_order_release);
343 }
344 else
345 {
346 svc_.post(claimed);
347 svc_.work_finished();
348 }
349 return;
350 }
351 }
352
353 if (op.cancelled.load(std::memory_order_acquire))
354 {
355 auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
356 if (claimed)
357 {
358 svc_.post(claimed);
359 svc_.work_finished();
360 }
361 }
362 return;
363 }
364
365
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 op.complete(errno ? errno : EIO, 0);
366
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
367 }
368
369 std::coroutine_handle<>
370 56336 epoll_socket_impl::
371 read_some(
372 std::coroutine_handle<> h,
373 capy::executor_ref ex,
374 io_buffer_param param,
375 std::stop_token token,
376 std::error_code* ec,
377 std::size_t* bytes_out)
378 {
379 56336 auto& op = rd_;
380 56336 op.reset();
381 56336 op.h = h;
382 56336 op.ex = ex;
383 56336 op.ec_out = ec;
384 56336 op.bytes_out = bytes_out;
385 56336 op.fd = fd_;
386 56336 op.start(token, this);
387
1/1
✓ Branch 1 taken 56336 times.
56336 op.impl_ptr = shared_from_this();
388
389 // Must prepare buffers before initiator runs
390 56336 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
391 56336 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
392
393
6/8
✓ Branch 0 taken 56335 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 56335 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 56335 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 56335 times.
56336 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
394 {
395 1 op.empty_buffer_read = true;
396 1 op.complete(0, 0);
397
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
398 1 return std::noop_coroutine();
399 }
400
401
2/2
✓ Branch 0 taken 56335 times.
✓ Branch 1 taken 56335 times.
112670 for (int i = 0; i < op.iovec_count; ++i)
402 {
403 56335 op.iovecs[i].iov_base = bufs[i].data();
404 56335 op.iovecs[i].iov_len = bufs[i].size();
405 }
406
407
2/2
✓ Branch 1 taken 56295 times.
✓ Branch 2 taken 40 times.
56335 if (read_initiator_handle_)
408
1/1
✓ Branch 1 taken 56295 times.
56295 read_initiator_handle_.destroy();
409
410
1/1
✓ Branch 1 taken 56335 times.
56335 auto initiator = make_read_initiator(read_initiator_frame_, this);
411 56335 read_initiator_handle_ = initiator.h;
412
413 // Symmetric transfer ensures caller is suspended before I/O starts
414 56335 return initiator.h;
415 }
416
417 std::coroutine_handle<>
418 56216 epoll_socket_impl::
419 write_some(
420 std::coroutine_handle<> h,
421 capy::executor_ref ex,
422 io_buffer_param param,
423 std::stop_token token,
424 std::error_code* ec,
425 std::size_t* bytes_out)
426 {
427 56216 auto& op = wr_;
428 56216 op.reset();
429 56216 op.h = h;
430 56216 op.ex = ex;
431 56216 op.ec_out = ec;
432 56216 op.bytes_out = bytes_out;
433 56216 op.fd = fd_;
434 56216 op.start(token, this);
435
1/1
✓ Branch 1 taken 56216 times.
56216 op.impl_ptr = shared_from_this();
436
437 // Must prepare buffers before initiator runs
438 56216 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
439 56216 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
440
441
6/8
✓ Branch 0 taken 56215 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 56215 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 56215 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 56215 times.
56216 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
442 {
443 1 op.complete(0, 0);
444
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
445 1 return std::noop_coroutine();
446 }
447
448
2/2
✓ Branch 0 taken 56215 times.
✓ Branch 1 taken 56215 times.
112430 for (int i = 0; i < op.iovec_count; ++i)
449 {
450 56215 op.iovecs[i].iov_base = bufs[i].data();
451 56215 op.iovecs[i].iov_len = bufs[i].size();
452 }
453
454
2/2
✓ Branch 1 taken 56177 times.
✓ Branch 2 taken 38 times.
56215 if (write_initiator_handle_)
455
1/1
✓ Branch 1 taken 56177 times.
56177 write_initiator_handle_.destroy();
456
457
1/1
✓ Branch 1 taken 56215 times.
56215 auto initiator = make_write_initiator(write_initiator_frame_, this);
458 56215 write_initiator_handle_ = initiator.h;
459
460 // Symmetric transfer ensures caller is suspended before I/O starts
461 56215 return initiator.h;
462 }
463
464 std::error_code
465 3 epoll_socket_impl::
466 shutdown(tcp_socket::shutdown_type what) noexcept
467 {
468 int how;
469
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
470 {
471 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
472 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
473 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
474 default:
475 return make_err(EINVAL);
476 }
477
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
478 return make_err(errno);
479 3 return {};
480 }
481
482 std::error_code
483 5 epoll_socket_impl::
484 set_no_delay(bool value) noexcept
485 {
486
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
487
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
488 return make_err(errno);
489 5 return {};
490 }
491
492 bool
493 5 epoll_socket_impl::
494 no_delay(std::error_code& ec) const noexcept
495 {
496 5 int flag = 0;
497 5 socklen_t len = sizeof(flag);
498
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
499 {
500 ec = make_err(errno);
501 return false;
502 }
503 5 ec = {};
504 5 return flag != 0;
505 }
506
507 std::error_code
508 4 epoll_socket_impl::
509 set_keep_alive(bool value) noexcept
510 {
511
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
512
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
513 return make_err(errno);
514 4 return {};
515 }
516
517 bool
518 4 epoll_socket_impl::
519 keep_alive(std::error_code& ec) const noexcept
520 {
521 4 int flag = 0;
522 4 socklen_t len = sizeof(flag);
523
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
524 {
525 ec = make_err(errno);
526 return false;
527 }
528 4 ec = {};
529 4 return flag != 0;
530 }
531
532 std::error_code
533 1 epoll_socket_impl::
534 set_receive_buffer_size(int size) noexcept
535 {
536
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
537 return make_err(errno);
538 1 return {};
539 }
540
541 int
542 3 epoll_socket_impl::
543 receive_buffer_size(std::error_code& ec) const noexcept
544 {
545 3 int size = 0;
546 3 socklen_t len = sizeof(size);
547
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
548 {
549 ec = make_err(errno);
550 return 0;
551 }
552 3 ec = {};
553 3 return size;
554 }
555
556 std::error_code
557 1 epoll_socket_impl::
558 set_send_buffer_size(int size) noexcept
559 {
560
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
561 return make_err(errno);
562 1 return {};
563 }
564
565 int
566 3 epoll_socket_impl::
567 send_buffer_size(std::error_code& ec) const noexcept
568 {
569 3 int size = 0;
570 3 socklen_t len = sizeof(size);
571
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
572 {
573 ec = make_err(errno);
574 return 0;
575 }
576 3 ec = {};
577 3 return size;
578 }
579
580 std::error_code
581 4 epoll_socket_impl::
582 set_linger(bool enabled, int timeout) noexcept
583 {
584
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
585 1 return make_err(EINVAL);
586 struct ::linger lg;
587
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
588 3 lg.l_linger = timeout;
589
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
590 return make_err(errno);
591 3 return {};
592 }
593
594 tcp_socket::linger_options
595 3 epoll_socket_impl::
596 linger(std::error_code& ec) const noexcept
597 {
598 3 struct ::linger lg{};
599 3 socklen_t len = sizeof(lg);
600
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
601 {
602 ec = make_err(errno);
603 return {};
604 }
605 3 ec = {};
606 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
607 }
608
609 void
610 8046 epoll_socket_impl::
611 cancel() noexcept
612 {
613 8046 std::shared_ptr<epoll_socket_impl> self;
614 try {
615
1/1
✓ Branch 1 taken 8046 times.
8046 self = shared_from_this();
616 } catch (const std::bad_weak_ptr&) {
617 return;
618 }
619
620 // Use atomic exchange to claim operations - only one of cancellation
621 // or reactor will succeed
622 24138 auto cancel_atomic_op = [this, &self](epoll_op& op, std::atomic<epoll_op*>& desc_op_ptr) {
623 24138 op.request_cancel();
624 24138 auto* claimed = desc_op_ptr.exchange(nullptr, std::memory_order_acq_rel);
625
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 24087 times.
24138 if (claimed == &op)
626 {
627 51 op.impl_ptr = self;
628 51 svc_.post(&op);
629 51 svc_.work_finished();
630 }
631 32184 };
632
633 8046 cancel_atomic_op(conn_, desc_data_.connect_op);
634 8046 cancel_atomic_op(rd_, desc_data_.read_op);
635 8046 cancel_atomic_op(wr_, desc_data_.write_op);
636 8046 }
637
638 void
639 97 epoll_socket_impl::
640 cancel_single_op(epoll_op& op) noexcept
641 {
642 97 op.request_cancel();
643
644 97 std::atomic<epoll_op*>* desc_op_ptr = nullptr;
645
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 97 times.
97 if (&op == &conn_) desc_op_ptr = &desc_data_.connect_op;
646
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 else if (&op == &rd_) desc_op_ptr = &desc_data_.read_op;
647 else if (&op == &wr_) desc_op_ptr = &desc_data_.write_op;
648
649
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (desc_op_ptr)
650 {
651 // Use atomic exchange - only one of cancellation or reactor will succeed
652 97 auto* claimed = desc_op_ptr->exchange(nullptr, std::memory_order_acq_rel);
653
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 31 times.
97 if (claimed == &op)
654 {
655 try {
656
1/1
✓ Branch 1 taken 66 times.
66 op.impl_ptr = shared_from_this();
657 } catch (const std::bad_weak_ptr&) {}
658 66 svc_.post(&op);
659 66 svc_.work_finished();
660 }
661 }
662 97 }
663
664 void
665 7950 epoll_socket_impl::
666 close_socket() noexcept
667 {
668 7950 cancel();
669
670
2/2
✓ Branch 0 taken 5296 times.
✓ Branch 1 taken 2654 times.
7950 if (fd_ >= 0)
671 {
672
1/2
✓ Branch 0 taken 5296 times.
✗ Branch 1 not taken.
5296 if (desc_data_.registered_events != 0)
673 5296 svc_.scheduler().deregister_descriptor(fd_);
674 5296 ::close(fd_);
675 5296 fd_ = -1;
676 }
677
678 7950 desc_data_.fd = -1;
679 7950 desc_data_.is_registered = false;
680 7950 desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
681 7950 desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
682 7950 desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
683 7950 desc_data_.read_ready.store(false, std::memory_order_relaxed);
684 7950 desc_data_.write_ready.store(false, std::memory_order_relaxed);
685 7950 desc_data_.registered_events = 0;
686
687 7950 local_endpoint_ = endpoint{};
688 7950 remote_endpoint_ = endpoint{};
689 7950 }
690
691 184 epoll_socket_service::
692 184 epoll_socket_service(capy::execution_context& ctx)
693
2/2
✓ Branch 2 taken 184 times.
✓ Branch 5 taken 184 times.
184 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
694 {
695 184 }
696
697 368 epoll_socket_service::
698 184 ~epoll_socket_service()
699 {
700 368 }
701
702 void
703 184 epoll_socket_service::
704 shutdown()
705 {
706
1/1
✓ Branch 2 taken 184 times.
184 std::lock_guard lock(state_->mutex_);
707
708
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 184 times.
184 while (auto* impl = state_->socket_list_.pop_front())
709 impl->close_socket();
710
711 184 state_->socket_ptrs_.clear();
712 184 }
713
714 tcp_socket::socket_impl&
715 5296 epoll_socket_service::
716 create_impl()
717 {
718
1/1
✓ Branch 1 taken 5296 times.
5296 auto impl = std::make_shared<epoll_socket_impl>(*this);
719 5296 auto* raw = impl.get();
720
721 {
722
1/1
✓ Branch 2 taken 5296 times.
5296 std::lock_guard lock(state_->mutex_);
723 5296 state_->socket_list_.push_back(raw);
724
1/1
✓ Branch 3 taken 5296 times.
5296 state_->socket_ptrs_.emplace(raw, std::move(impl));
725 5296 }
726
727 5296 return *raw;
728 5296 }
729
730 void
731 5296 epoll_socket_service::
732 destroy_impl(tcp_socket::socket_impl& impl)
733 {
734 5296 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
735
1/1
✓ Branch 2 taken 5296 times.
5296 std::lock_guard lock(state_->mutex_);
736 5296 state_->socket_list_.remove(epoll_impl);
737
1/1
✓ Branch 2 taken 5296 times.
5296 state_->socket_ptrs_.erase(epoll_impl);
738 5296 }
739
740 std::error_code
741 2654 epoll_socket_service::
742 open_socket(tcp_socket::socket_impl& impl)
743 {
744 2654 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
745 2654 epoll_impl->close_socket();
746
747 2654 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2654 times.
2654 if (fd < 0)
749 return make_err(errno);
750
751 2654 epoll_impl->fd_ = fd;
752
753 // Register fd with epoll (edge-triggered mode)
754 2654 epoll_impl->desc_data_.fd = fd;
755 2654 epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
756 2654 epoll_impl->desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
757 2654 epoll_impl->desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
758 2654 scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
759
760 2654 return {};
761 }
762
763 void
764 112544 epoll_socket_service::
765 post(epoll_op* op)
766 {
767 112544 state_->sched_.post(op);
768 112544 }
769
770 void
771 2811 epoll_socket_service::
772 work_started() noexcept
773 {
774 2811 state_->sched_.work_started();
775 2811 }
776
777 void
778 160 epoll_socket_service::
779 work_finished() noexcept
780 {
781 160 state_->sched_.work_finished();
782 160 }
783
784 } // namespace boost::corosio::detail
785
786 #endif // BOOST_COROSIO_HAS_EPOLL
787