Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_EPOLL
13 :
14 : #include "src/detail/epoll/sockets.hpp"
15 : #include "src/detail/endpoint_convert.hpp"
16 : #include "src/detail/make_err.hpp"
17 : #include "src/detail/resume_coro.hpp"
18 :
19 : #include <boost/corosio/detail/except.hpp>
20 : #include <boost/capy/buffers.hpp>
21 :
22 : #include <errno.h>
23 : #include <netinet/in.h>
24 : #include <netinet/tcp.h>
25 : #include <sys/epoll.h>
26 : #include <sys/socket.h>
27 : #include <unistd.h>
28 :
29 : namespace boost::corosio::detail {
30 :
31 : void
32 103 : epoll_op::canceller::
33 : operator()() const noexcept
34 : {
35 103 : op->cancel();
36 103 : }
37 :
38 : void
39 0 : epoll_connect_op::
40 : cancel() noexcept
41 : {
42 0 : if (socket_impl_)
43 0 : socket_impl_->cancel_single_op(*this);
44 : else
45 0 : request_cancel();
46 0 : }
47 :
48 : void
49 97 : epoll_read_op::
50 : cancel() noexcept
51 : {
52 97 : if (socket_impl_)
53 97 : socket_impl_->cancel_single_op(*this);
54 : else
55 0 : request_cancel();
56 97 : }
57 :
58 : void
59 0 : epoll_write_op::
60 : cancel() noexcept
61 : {
62 0 : if (socket_impl_)
63 0 : socket_impl_->cancel_single_op(*this);
64 : else
65 0 : request_cancel();
66 0 : }
67 :
68 : void
69 2643 : epoll_connect_op::
70 : operator()()
71 : {
72 2643 : stop_cb.reset();
73 :
74 2643 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
75 :
76 : // Cache endpoints on successful connect
77 2643 : if (success && socket_impl_)
78 : {
79 : // Query local endpoint via getsockname (may fail, but remote is always known)
80 2642 : endpoint local_ep;
81 2642 : sockaddr_in local_addr{};
82 2642 : socklen_t local_len = sizeof(local_addr);
83 2642 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
84 2642 : local_ep = from_sockaddr_in(local_addr);
85 : // Always cache remote endpoint; local may be default if getsockname failed
86 2642 : static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
87 : }
88 :
89 2643 : if (ec_out)
90 : {
91 2643 : if (cancelled.load(std::memory_order_acquire))
92 0 : *ec_out = capy::error::canceled;
93 2643 : else if (errn != 0)
94 1 : *ec_out = make_err(errn);
95 : else
96 2642 : *ec_out = {};
97 : }
98 :
99 2643 : if (bytes_out)
100 0 : *bytes_out = bytes_transferred;
101 :
102 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
103 2643 : capy::executor_ref saved_ex( std::move( ex ) );
104 2643 : capy::coro saved_h( std::move( h ) );
105 2643 : auto prevent_premature_destruction = std::move(impl_ptr);
106 2643 : resume_coro(saved_ex, saved_h);
107 2643 : }
108 :
109 5296 : epoll_socket_impl::
110 5296 : epoll_socket_impl(epoll_socket_service& svc) noexcept
111 5296 : : svc_(svc)
112 : {
113 5296 : }
114 :
115 5296 : epoll_socket_impl::
116 5296 : ~epoll_socket_impl()
117 : {
118 5296 : if (read_initiator_handle_)
119 40 : read_initiator_handle_.destroy();
120 5296 : if (write_initiator_handle_)
121 38 : write_initiator_handle_.destroy();
122 :
123 : // promise_type::operator delete is no-op, so free here
124 5296 : if (read_initiator_frame_)
125 40 : ::operator delete(read_initiator_frame_);
126 5296 : if (write_initiator_frame_)
127 38 : ::operator delete(write_initiator_frame_);
128 5296 : }
129 :
130 : void
131 0 : epoll_socket_impl::
132 : update_epoll_events() noexcept
133 : {
134 : // With EPOLLET, update_descriptor_events just provides a memory fence
135 0 : svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
136 0 : }
137 :
138 : void
139 5296 : epoll_socket_impl::
140 : release()
141 : {
142 5296 : close_socket();
143 5296 : svc_.destroy_impl(*this);
144 5296 : }
145 :
146 : void
147 2643 : epoll_socket_impl::
148 : connect(
149 : std::coroutine_handle<> h,
150 : capy::executor_ref ex,
151 : endpoint ep,
152 : std::stop_token token,
153 : std::error_code* ec)
154 : {
155 2643 : auto& op = conn_;
156 2643 : op.reset();
157 2643 : op.h = h;
158 2643 : op.ex = ex;
159 2643 : op.ec_out = ec;
160 2643 : op.fd = fd_;
161 2643 : op.target_endpoint = ep; // Store target for endpoint caching
162 2643 : op.start(token, this);
163 :
164 2643 : sockaddr_in addr = detail::to_sockaddr_in(ep);
165 2643 : int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
166 :
167 2643 : if (result == 0)
168 : {
169 : // Sync success - cache endpoints immediately
170 : // Remote is always known; local may fail but we still cache remote
171 0 : sockaddr_in local_addr{};
172 0 : socklen_t local_len = sizeof(local_addr);
173 0 : if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
174 0 : local_endpoint_ = detail::from_sockaddr_in(local_addr);
175 0 : remote_endpoint_ = ep;
176 :
177 0 : op.complete(0, 0);
178 0 : op.impl_ptr = shared_from_this();
179 0 : svc_.post(&op);
180 0 : return;
181 : }
182 :
183 2643 : if (errno == EINPROGRESS)
184 : {
185 2643 : svc_.work_started();
186 2643 : op.impl_ptr = shared_from_this();
187 :
188 2643 : desc_data_.connect_op.store(&op, std::memory_order_seq_cst);
189 :
190 2643 : if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
191 : {
192 43 : auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
193 43 : if (claimed)
194 : {
195 43 : claimed->perform_io();
196 43 : if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
197 : {
198 0 : claimed->errn = 0;
199 0 : desc_data_.connect_op.store(claimed, std::memory_order_release);
200 : }
201 : else
202 : {
203 43 : svc_.post(claimed);
204 43 : svc_.work_finished();
205 : }
206 43 : return;
207 : }
208 : }
209 :
210 2600 : if (op.cancelled.load(std::memory_order_acquire))
211 : {
212 0 : auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel);
213 0 : if (claimed)
214 : {
215 0 : svc_.post(claimed);
216 0 : svc_.work_finished();
217 : }
218 : }
219 2600 : return;
220 : }
221 :
222 0 : op.complete(errno, 0);
223 0 : op.impl_ptr = shared_from_this();
224 0 : svc_.post(&op);
225 : }
226 :
227 : read_initiator
228 56335 : make_read_initiator(void*& cached, epoll_socket_impl* impl)
229 : {
230 : impl->do_read_io();
231 : co_return;
232 112670 : }
233 :
234 : write_initiator
235 56215 : make_write_initiator(void*& cached, epoll_socket_impl* impl)
236 : {
237 : impl->do_write_io();
238 : co_return;
239 112430 : }
240 :
241 : void
242 56335 : epoll_socket_impl::
243 : do_read_io()
244 : {
245 56335 : auto& op = rd_;
246 :
247 56335 : ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
248 :
249 56335 : if (n > 0)
250 : {
251 56162 : desc_data_.read_ready.store(false, std::memory_order_relaxed);
252 56162 : op.complete(0, static_cast<std::size_t>(n));
253 56162 : svc_.post(&op);
254 56162 : return;
255 : }
256 :
257 173 : if (n == 0)
258 : {
259 5 : desc_data_.read_ready.store(false, std::memory_order_relaxed);
260 5 : op.complete(0, 0);
261 5 : svc_.post(&op);
262 5 : return;
263 : }
264 :
265 168 : if (errno == EAGAIN || errno == EWOULDBLOCK)
266 : {
267 168 : svc_.work_started();
268 :
269 168 : desc_data_.read_op.store(&op, std::memory_order_seq_cst);
270 :
271 168 : if (desc_data_.read_ready.exchange(false, std::memory_order_seq_cst))
272 : {
273 0 : auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
274 0 : if (claimed)
275 : {
276 0 : claimed->perform_io();
277 0 : if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
278 : {
279 0 : claimed->errn = 0;
280 0 : desc_data_.read_op.store(claimed, std::memory_order_release);
281 : }
282 : else
283 : {
284 0 : svc_.post(claimed);
285 0 : svc_.work_finished();
286 : }
287 0 : return;
288 : }
289 : }
290 :
291 168 : if (op.cancelled.load(std::memory_order_acquire))
292 : {
293 0 : auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
294 0 : if (claimed)
295 : {
296 0 : svc_.post(claimed);
297 0 : svc_.work_finished();
298 : }
299 : }
300 168 : return;
301 : }
302 :
303 0 : op.complete(errno, 0);
304 0 : svc_.post(&op);
305 : }
306 :
307 : void
308 56215 : epoll_socket_impl::
309 : do_write_io()
310 : {
311 56215 : auto& op = wr_;
312 :
313 56215 : msghdr msg{};
314 56215 : msg.msg_iov = op.iovecs;
315 56215 : msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
316 :
317 56215 : ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
318 :
319 56215 : if (n > 0)
320 : {
321 56214 : desc_data_.write_ready.store(false, std::memory_order_relaxed);
322 56214 : op.complete(0, static_cast<std::size_t>(n));
323 56214 : svc_.post(&op);
324 56214 : return;
325 : }
326 :
327 1 : if (errno == EAGAIN || errno == EWOULDBLOCK)
328 : {
329 0 : svc_.work_started();
330 :
331 0 : desc_data_.write_op.store(&op, std::memory_order_seq_cst);
332 :
333 0 : if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst))
334 : {
335 0 : auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
336 0 : if (claimed)
337 : {
338 0 : claimed->perform_io();
339 0 : if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
340 : {
341 0 : claimed->errn = 0;
342 0 : desc_data_.write_op.store(claimed, std::memory_order_release);
343 : }
344 : else
345 : {
346 0 : svc_.post(claimed);
347 0 : svc_.work_finished();
348 : }
349 0 : return;
350 : }
351 : }
352 :
353 0 : if (op.cancelled.load(std::memory_order_acquire))
354 : {
355 0 : auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel);
356 0 : if (claimed)
357 : {
358 0 : svc_.post(claimed);
359 0 : svc_.work_finished();
360 : }
361 : }
362 0 : return;
363 : }
364 :
365 1 : op.complete(errno ? errno : EIO, 0);
366 1 : svc_.post(&op);
367 : }
368 :
369 : std::coroutine_handle<>
370 56336 : epoll_socket_impl::
371 : read_some(
372 : std::coroutine_handle<> h,
373 : capy::executor_ref ex,
374 : io_buffer_param param,
375 : std::stop_token token,
376 : std::error_code* ec,
377 : std::size_t* bytes_out)
378 : {
379 56336 : auto& op = rd_;
380 56336 : op.reset();
381 56336 : op.h = h;
382 56336 : op.ex = ex;
383 56336 : op.ec_out = ec;
384 56336 : op.bytes_out = bytes_out;
385 56336 : op.fd = fd_;
386 56336 : op.start(token, this);
387 56336 : op.impl_ptr = shared_from_this();
388 :
389 : // Must prepare buffers before initiator runs
390 56336 : capy::mutable_buffer bufs[epoll_read_op::max_buffers];
391 56336 : op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
392 :
393 56336 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
394 : {
395 1 : op.empty_buffer_read = true;
396 1 : op.complete(0, 0);
397 1 : svc_.post(&op);
398 1 : return std::noop_coroutine();
399 : }
400 :
401 112670 : for (int i = 0; i < op.iovec_count; ++i)
402 : {
403 56335 : op.iovecs[i].iov_base = bufs[i].data();
404 56335 : op.iovecs[i].iov_len = bufs[i].size();
405 : }
406 :
407 56335 : if (read_initiator_handle_)
408 56295 : read_initiator_handle_.destroy();
409 :
410 56335 : auto initiator = make_read_initiator(read_initiator_frame_, this);
411 56335 : read_initiator_handle_ = initiator.h;
412 :
413 : // Symmetric transfer ensures caller is suspended before I/O starts
414 56335 : return initiator.h;
415 : }
416 :
417 : std::coroutine_handle<>
418 56216 : epoll_socket_impl::
419 : write_some(
420 : std::coroutine_handle<> h,
421 : capy::executor_ref ex,
422 : io_buffer_param param,
423 : std::stop_token token,
424 : std::error_code* ec,
425 : std::size_t* bytes_out)
426 : {
427 56216 : auto& op = wr_;
428 56216 : op.reset();
429 56216 : op.h = h;
430 56216 : op.ex = ex;
431 56216 : op.ec_out = ec;
432 56216 : op.bytes_out = bytes_out;
433 56216 : op.fd = fd_;
434 56216 : op.start(token, this);
435 56216 : op.impl_ptr = shared_from_this();
436 :
437 : // Must prepare buffers before initiator runs
438 56216 : capy::mutable_buffer bufs[epoll_write_op::max_buffers];
439 56216 : op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
440 :
441 56216 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
442 : {
443 1 : op.complete(0, 0);
444 1 : svc_.post(&op);
445 1 : return std::noop_coroutine();
446 : }
447 :
448 112430 : for (int i = 0; i < op.iovec_count; ++i)
449 : {
450 56215 : op.iovecs[i].iov_base = bufs[i].data();
451 56215 : op.iovecs[i].iov_len = bufs[i].size();
452 : }
453 :
454 56215 : if (write_initiator_handle_)
455 56177 : write_initiator_handle_.destroy();
456 :
457 56215 : auto initiator = make_write_initiator(write_initiator_frame_, this);
458 56215 : write_initiator_handle_ = initiator.h;
459 :
460 : // Symmetric transfer ensures caller is suspended before I/O starts
461 56215 : return initiator.h;
462 : }
463 :
464 : std::error_code
465 3 : epoll_socket_impl::
466 : shutdown(tcp_socket::shutdown_type what) noexcept
467 : {
468 : int how;
469 3 : switch (what)
470 : {
471 1 : case tcp_socket::shutdown_receive: how = SHUT_RD; break;
472 1 : case tcp_socket::shutdown_send: how = SHUT_WR; break;
473 1 : case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
474 0 : default:
475 0 : return make_err(EINVAL);
476 : }
477 3 : if (::shutdown(fd_, how) != 0)
478 0 : return make_err(errno);
479 3 : return {};
480 : }
481 :
482 : std::error_code
483 5 : epoll_socket_impl::
484 : set_no_delay(bool value) noexcept
485 : {
486 5 : int flag = value ? 1 : 0;
487 5 : if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
488 0 : return make_err(errno);
489 5 : return {};
490 : }
491 :
492 : bool
493 5 : epoll_socket_impl::
494 : no_delay(std::error_code& ec) const noexcept
495 : {
496 5 : int flag = 0;
497 5 : socklen_t len = sizeof(flag);
498 5 : if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
499 : {
500 0 : ec = make_err(errno);
501 0 : return false;
502 : }
503 5 : ec = {};
504 5 : return flag != 0;
505 : }
506 :
507 : std::error_code
508 4 : epoll_socket_impl::
509 : set_keep_alive(bool value) noexcept
510 : {
511 4 : int flag = value ? 1 : 0;
512 4 : if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
513 0 : return make_err(errno);
514 4 : return {};
515 : }
516 :
517 : bool
518 4 : epoll_socket_impl::
519 : keep_alive(std::error_code& ec) const noexcept
520 : {
521 4 : int flag = 0;
522 4 : socklen_t len = sizeof(flag);
523 4 : if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
524 : {
525 0 : ec = make_err(errno);
526 0 : return false;
527 : }
528 4 : ec = {};
529 4 : return flag != 0;
530 : }
531 :
532 : std::error_code
533 1 : epoll_socket_impl::
534 : set_receive_buffer_size(int size) noexcept
535 : {
536 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
537 0 : return make_err(errno);
538 1 : return {};
539 : }
540 :
541 : int
542 3 : epoll_socket_impl::
543 : receive_buffer_size(std::error_code& ec) const noexcept
544 : {
545 3 : int size = 0;
546 3 : socklen_t len = sizeof(size);
547 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
548 : {
549 0 : ec = make_err(errno);
550 0 : return 0;
551 : }
552 3 : ec = {};
553 3 : return size;
554 : }
555 :
556 : std::error_code
557 1 : epoll_socket_impl::
558 : set_send_buffer_size(int size) noexcept
559 : {
560 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
561 0 : return make_err(errno);
562 1 : return {};
563 : }
564 :
565 : int
566 3 : epoll_socket_impl::
567 : send_buffer_size(std::error_code& ec) const noexcept
568 : {
569 3 : int size = 0;
570 3 : socklen_t len = sizeof(size);
571 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
572 : {
573 0 : ec = make_err(errno);
574 0 : return 0;
575 : }
576 3 : ec = {};
577 3 : return size;
578 : }
579 :
580 : std::error_code
581 4 : epoll_socket_impl::
582 : set_linger(bool enabled, int timeout) noexcept
583 : {
584 4 : if (timeout < 0)
585 1 : return make_err(EINVAL);
586 : struct ::linger lg;
587 3 : lg.l_onoff = enabled ? 1 : 0;
588 3 : lg.l_linger = timeout;
589 3 : if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
590 0 : return make_err(errno);
591 3 : return {};
592 : }
593 :
594 : tcp_socket::linger_options
595 3 : epoll_socket_impl::
596 : linger(std::error_code& ec) const noexcept
597 : {
598 3 : struct ::linger lg{};
599 3 : socklen_t len = sizeof(lg);
600 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
601 : {
602 0 : ec = make_err(errno);
603 0 : return {};
604 : }
605 3 : ec = {};
606 3 : return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
607 : }
608 :
609 : void
610 8046 : epoll_socket_impl::
611 : cancel() noexcept
612 : {
613 8046 : std::shared_ptr<epoll_socket_impl> self;
614 : try {
615 8046 : self = shared_from_this();
616 0 : } catch (const std::bad_weak_ptr&) {
617 0 : return;
618 0 : }
619 :
620 : // Use atomic exchange to claim operations - only one of cancellation
621 : // or reactor will succeed
622 24138 : auto cancel_atomic_op = [this, &self](epoll_op& op, std::atomic<epoll_op*>& desc_op_ptr) {
623 24138 : op.request_cancel();
624 24138 : auto* claimed = desc_op_ptr.exchange(nullptr, std::memory_order_acq_rel);
625 24138 : if (claimed == &op)
626 : {
627 51 : op.impl_ptr = self;
628 51 : svc_.post(&op);
629 51 : svc_.work_finished();
630 : }
631 32184 : };
632 :
633 8046 : cancel_atomic_op(conn_, desc_data_.connect_op);
634 8046 : cancel_atomic_op(rd_, desc_data_.read_op);
635 8046 : cancel_atomic_op(wr_, desc_data_.write_op);
636 8046 : }
637 :
638 : void
639 97 : epoll_socket_impl::
640 : cancel_single_op(epoll_op& op) noexcept
641 : {
642 97 : op.request_cancel();
643 :
644 97 : std::atomic<epoll_op*>* desc_op_ptr = nullptr;
645 97 : if (&op == &conn_) desc_op_ptr = &desc_data_.connect_op;
646 97 : else if (&op == &rd_) desc_op_ptr = &desc_data_.read_op;
647 0 : else if (&op == &wr_) desc_op_ptr = &desc_data_.write_op;
648 :
649 97 : if (desc_op_ptr)
650 : {
651 : // Use atomic exchange - only one of cancellation or reactor will succeed
652 97 : auto* claimed = desc_op_ptr->exchange(nullptr, std::memory_order_acq_rel);
653 97 : if (claimed == &op)
654 : {
655 : try {
656 66 : op.impl_ptr = shared_from_this();
657 0 : } catch (const std::bad_weak_ptr&) {}
658 66 : svc_.post(&op);
659 66 : svc_.work_finished();
660 : }
661 : }
662 97 : }
663 :
664 : void
665 7950 : epoll_socket_impl::
666 : close_socket() noexcept
667 : {
668 7950 : cancel();
669 :
670 7950 : if (fd_ >= 0)
671 : {
672 5296 : if (desc_data_.registered_events != 0)
673 5296 : svc_.scheduler().deregister_descriptor(fd_);
674 5296 : ::close(fd_);
675 5296 : fd_ = -1;
676 : }
677 :
678 7950 : desc_data_.fd = -1;
679 7950 : desc_data_.is_registered = false;
680 7950 : desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
681 7950 : desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
682 7950 : desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
683 7950 : desc_data_.read_ready.store(false, std::memory_order_relaxed);
684 7950 : desc_data_.write_ready.store(false, std::memory_order_relaxed);
685 7950 : desc_data_.registered_events = 0;
686 :
687 7950 : local_endpoint_ = endpoint{};
688 7950 : remote_endpoint_ = endpoint{};
689 7950 : }
690 :
691 184 : epoll_socket_service::
692 184 : epoll_socket_service(capy::execution_context& ctx)
693 184 : : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
694 : {
695 184 : }
696 :
697 368 : epoll_socket_service::
698 184 : ~epoll_socket_service()
699 : {
700 368 : }
701 :
702 : void
703 184 : epoll_socket_service::
704 : shutdown()
705 : {
706 184 : std::lock_guard lock(state_->mutex_);
707 :
708 184 : while (auto* impl = state_->socket_list_.pop_front())
709 0 : impl->close_socket();
710 :
711 184 : state_->socket_ptrs_.clear();
712 184 : }
713 :
714 : tcp_socket::socket_impl&
715 5296 : epoll_socket_service::
716 : create_impl()
717 : {
718 5296 : auto impl = std::make_shared<epoll_socket_impl>(*this);
719 5296 : auto* raw = impl.get();
720 :
721 : {
722 5296 : std::lock_guard lock(state_->mutex_);
723 5296 : state_->socket_list_.push_back(raw);
724 5296 : state_->socket_ptrs_.emplace(raw, std::move(impl));
725 5296 : }
726 :
727 5296 : return *raw;
728 5296 : }
729 :
730 : void
731 5296 : epoll_socket_service::
732 : destroy_impl(tcp_socket::socket_impl& impl)
733 : {
734 5296 : auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
735 5296 : std::lock_guard lock(state_->mutex_);
736 5296 : state_->socket_list_.remove(epoll_impl);
737 5296 : state_->socket_ptrs_.erase(epoll_impl);
738 5296 : }
739 :
740 : std::error_code
741 2654 : epoll_socket_service::
742 : open_socket(tcp_socket::socket_impl& impl)
743 : {
744 2654 : auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
745 2654 : epoll_impl->close_socket();
746 :
747 2654 : int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
748 2654 : if (fd < 0)
749 0 : return make_err(errno);
750 :
751 2654 : epoll_impl->fd_ = fd;
752 :
753 : // Register fd with epoll (edge-triggered mode)
754 2654 : epoll_impl->desc_data_.fd = fd;
755 2654 : epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
756 2654 : epoll_impl->desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
757 2654 : epoll_impl->desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
758 2654 : scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
759 :
760 2654 : return {};
761 : }
762 :
763 : void
764 112544 : epoll_socket_service::
765 : post(epoll_op* op)
766 : {
767 112544 : state_->sched_.post(op);
768 112544 : }
769 :
770 : void
771 2811 : epoll_socket_service::
772 : work_started() noexcept
773 : {
774 2811 : state_->sched_.work_started();
775 2811 : }
776 :
777 : void
778 160 : epoll_socket_service::
779 : work_finished() noexcept
780 : {
781 160 : state_->sched_.work_finished();
782 160 : }
783 :
784 : } // namespace boost::corosio::detail
785 :
786 : #endif // BOOST_COROSIO_HAS_EPOLL
|