Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_EPOLL
13 :
14 : #include "src/detail/epoll/scheduler.hpp"
15 : #include "src/detail/epoll/op.hpp"
16 : #include "src/detail/make_err.hpp"
17 : #include "src/detail/posix/resolver_service.hpp"
18 : #include "src/detail/posix/signals.hpp"
19 :
20 : #include <boost/corosio/detail/except.hpp>
21 : #include <boost/corosio/detail/thread_local_ptr.hpp>
22 :
23 : #include <atomic>
24 : #include <chrono>
25 : #include <limits>
26 :
27 : #include <errno.h>
28 : #include <fcntl.h>
29 : #include <sys/epoll.h>
30 : #include <sys/eventfd.h>
31 : #include <sys/socket.h>
32 : #include <sys/timerfd.h>
33 : #include <unistd.h>
34 :
35 : /*
36 : epoll Scheduler - Single Reactor Model
37 : ======================================
38 :
39 : This scheduler uses a thread coordination strategy to provide handler
40 : parallelism and avoid the thundering herd problem.
41 : Instead of all threads blocking on epoll_wait(), one thread becomes the
42 : "reactor" while others wait on a condition variable for handler work.
43 :
44 : Thread Model
45 : ------------
46 : - ONE thread runs epoll_wait() at a time (the reactor thread)
47 : - OTHER threads wait on wakeup_event_ (condition variable) for handlers
48 : - When work is posted, exactly one waiting thread wakes via notify_one()
49 : - This matches Windows IOCP semantics where N posted items wake N threads
50 :
51 : Event Loop Structure (do_one)
52 : -----------------------------
53 : 1. Lock mutex, try to pop handler from queue
54 : 2. If got handler: execute it (unlocked), return
55 : 3. If queue empty and no reactor running: become reactor
56 : - Run epoll_wait (unlocked), queue I/O completions, loop back
57 : 4. If queue empty and reactor running: wait on condvar for work
58 :
59 : The reactor_running_ flag ensures only one thread owns epoll_wait().
60 : After the reactor queues I/O completions, it loops back to try getting
61 : a handler, giving priority to handler execution over more I/O polling.
62 :
63 : Wake Coordination (wake_one_thread_and_unlock)
64 : ----------------------------------------------
65 : When posting work:
66 : - If idle threads exist: notify_one() wakes exactly one worker
67 : - Else if reactor running: interrupt via eventfd write
68 : - Else: no-op (thread will find work when it checks queue)
69 :
70 : This is critical for matching IOCP behavior. With the old model, posting
71 : N handlers would wake all threads (thundering herd). Now each post()
72 : wakes at most one thread, and that thread handles exactly one item.
73 :
74 : Work Counting
75 : -------------
76 : outstanding_work_ tracks pending operations. When it hits zero, run()
77 : returns. Each operation increments on start, decrements on completion.
78 :
79 : Timer Integration
80 : -----------------
81 : Timers are handled by timer_service. The reactor adjusts epoll_wait
82 : timeout to wake for the nearest timer expiry. When a new timer is
83 : scheduled earlier than current, timer_service calls interrupt_reactor()
84 : to re-evaluate the timeout.
85 : */
86 :
87 : namespace boost::corosio::detail {
88 :
89 : namespace {
90 :
91 : struct scheduler_context
92 : {
93 : epoll_scheduler const* key;
94 : scheduler_context* next;
95 : };
96 :
97 : corosio::detail::thread_local_ptr<scheduler_context> context_stack;
98 :
99 : struct thread_context_guard
100 : {
101 : scheduler_context frame_;
102 :
103 154 : explicit thread_context_guard(
104 : epoll_scheduler const* ctx) noexcept
105 154 : : frame_{ctx, context_stack.get()}
106 : {
107 154 : context_stack.set(&frame_);
108 154 : }
109 :
110 154 : ~thread_context_guard() noexcept
111 : {
112 154 : context_stack.set(frame_.next);
113 154 : }
114 : };
115 :
116 : } // namespace
117 :
118 184 : epoll_scheduler::
119 : epoll_scheduler(
120 : capy::execution_context& ctx,
121 184 : int)
122 184 : : epoll_fd_(-1)
123 184 : , event_fd_(-1)
124 184 : , timer_fd_(-1)
125 184 : , outstanding_work_(0)
126 184 : , stopped_(false)
127 184 : , shutdown_(false)
128 184 : , reactor_running_(false)
129 184 : , reactor_interrupted_(false)
130 368 : , idle_thread_count_(0)
131 : {
132 184 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
133 184 : if (epoll_fd_ < 0)
134 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
135 :
136 184 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
137 184 : if (event_fd_ < 0)
138 : {
139 0 : int errn = errno;
140 0 : ::close(epoll_fd_);
141 0 : detail::throw_system_error(make_err(errn), "eventfd");
142 : }
143 :
144 184 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
145 184 : if (timer_fd_ < 0)
146 : {
147 0 : int errn = errno;
148 0 : ::close(event_fd_);
149 0 : ::close(epoll_fd_);
150 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
151 : }
152 :
153 184 : epoll_event ev{};
154 184 : ev.events = EPOLLIN | EPOLLET;
155 184 : ev.data.ptr = nullptr;
156 184 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
157 : {
158 0 : int errn = errno;
159 0 : ::close(timer_fd_);
160 0 : ::close(event_fd_);
161 0 : ::close(epoll_fd_);
162 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
163 : }
164 :
165 184 : epoll_event timer_ev{};
166 184 : timer_ev.events = EPOLLIN | EPOLLERR;
167 184 : timer_ev.data.ptr = &timer_fd_;
168 184 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
169 : {
170 0 : int errn = errno;
171 0 : ::close(timer_fd_);
172 0 : ::close(event_fd_);
173 0 : ::close(epoll_fd_);
174 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
175 : }
176 :
177 184 : timer_svc_ = &get_timer_service(ctx, *this);
178 184 : timer_svc_->set_on_earliest_changed(
179 : timer_service::callback(
180 : this,
181 2881 : [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
182 :
183 : // Initialize resolver service
184 184 : get_resolver_service(ctx, *this);
185 :
186 : // Initialize signal service
187 184 : get_signal_service(ctx, *this);
188 :
189 : // Push task sentinel to interleave reactor runs with handler execution
190 184 : completed_ops_.push(&task_op_);
191 184 : }
192 :
193 368 : epoll_scheduler::
194 184 : ~epoll_scheduler()
195 : {
196 184 : if (timer_fd_ >= 0)
197 184 : ::close(timer_fd_);
198 184 : if (event_fd_ >= 0)
199 184 : ::close(event_fd_);
200 184 : if (epoll_fd_ >= 0)
201 184 : ::close(epoll_fd_);
202 368 : }
203 :
204 : void
205 184 : epoll_scheduler::
206 : shutdown()
207 : {
208 : {
209 184 : std::unique_lock lock(mutex_);
210 184 : shutdown_ = true;
211 :
212 368 : while (auto* h = completed_ops_.pop())
213 : {
214 184 : if (h == &task_op_)
215 184 : continue;
216 0 : lock.unlock();
217 0 : h->destroy();
218 0 : lock.lock();
219 184 : }
220 184 : }
221 :
222 184 : outstanding_work_.store(0, std::memory_order_release);
223 :
224 184 : if (event_fd_ >= 0)
225 184 : interrupt_reactor();
226 :
227 184 : wakeup_event_.notify_all();
228 184 : }
229 :
230 : void
231 1639 : epoll_scheduler::
232 : post(capy::coro h) const
233 : {
234 : struct post_handler final
235 : : scheduler_op
236 : {
237 : capy::coro h_;
238 :
239 : explicit
240 1639 : post_handler(capy::coro h)
241 1639 : : h_(h)
242 : {
243 1639 : }
244 :
245 3278 : ~post_handler() = default;
246 :
247 1639 : void operator()() override
248 : {
249 1639 : auto h = h_;
250 1639 : delete this;
251 : std::atomic_thread_fence(std::memory_order_acquire);
252 1639 : h.resume();
253 1639 : }
254 :
255 0 : void destroy() override
256 : {
257 0 : delete this;
258 0 : }
259 : };
260 :
261 1639 : auto ph = std::make_unique<post_handler>(h);
262 1639 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
263 :
264 1639 : std::unique_lock lock(mutex_);
265 1639 : completed_ops_.push(ph.release());
266 1639 : wake_one_thread_and_unlock(lock);
267 1639 : }
268 :
269 : void
270 112592 : epoll_scheduler::
271 : post(scheduler_op* h) const
272 : {
273 112592 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
274 :
275 112592 : std::unique_lock lock(mutex_);
276 112592 : completed_ops_.push(h);
277 112592 : wake_one_thread_and_unlock(lock);
278 112592 : }
279 :
280 : void
281 2905 : epoll_scheduler::
282 : on_work_started() noexcept
283 : {
284 2905 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
285 2905 : }
286 :
287 : void
288 2873 : epoll_scheduler::
289 : on_work_finished() noexcept
290 : {
291 5746 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
292 18 : stop();
293 2873 : }
294 :
295 : bool
296 3120 : epoll_scheduler::
297 : running_in_this_thread() const noexcept
298 : {
299 3120 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
300 2910 : if (c->key == this)
301 2910 : return true;
302 210 : return false;
303 : }
304 :
305 : void
306 37 : epoll_scheduler::
307 : stop()
308 : {
309 37 : bool expected = false;
310 37 : if (stopped_.compare_exchange_strong(expected, true,
311 : std::memory_order_release, std::memory_order_relaxed))
312 : {
313 : // Wake all threads so they notice stopped_ and exit
314 : {
315 37 : std::lock_guard lock(mutex_);
316 37 : wakeup_event_.notify_all();
317 37 : }
318 37 : interrupt_reactor();
319 : }
320 37 : }
321 :
322 : bool
323 16 : epoll_scheduler::
324 : stopped() const noexcept
325 : {
326 16 : return stopped_.load(std::memory_order_acquire);
327 : }
328 :
329 : void
330 49 : epoll_scheduler::
331 : restart()
332 : {
333 49 : stopped_.store(false, std::memory_order_release);
334 49 : }
335 :
336 : std::size_t
337 175 : epoll_scheduler::
338 : run()
339 : {
340 175 : if (stopped_.load(std::memory_order_acquire))
341 21 : return 0;
342 :
343 308 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
344 : {
345 11 : stop();
346 11 : return 0;
347 : }
348 :
349 143 : thread_context_guard ctx(this);
350 :
351 143 : std::size_t n = 0;
352 119654 : while (do_one(-1))
353 119511 : if (n != (std::numeric_limits<std::size_t>::max)())
354 119511 : ++n;
355 143 : return n;
356 143 : }
357 :
358 : std::size_t
359 2 : epoll_scheduler::
360 : run_one()
361 : {
362 2 : if (stopped_.load(std::memory_order_acquire))
363 0 : return 0;
364 :
365 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
366 : {
367 0 : stop();
368 0 : return 0;
369 : }
370 :
371 2 : thread_context_guard ctx(this);
372 2 : return do_one(-1);
373 2 : }
374 :
375 : std::size_t
376 10 : epoll_scheduler::
377 : wait_one(long usec)
378 : {
379 10 : if (stopped_.load(std::memory_order_acquire))
380 0 : return 0;
381 :
382 20 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
383 : {
384 4 : stop();
385 4 : return 0;
386 : }
387 :
388 6 : thread_context_guard ctx(this);
389 6 : return do_one(usec);
390 6 : }
391 :
392 : std::size_t
393 2 : epoll_scheduler::
394 : poll()
395 : {
396 2 : if (stopped_.load(std::memory_order_acquire))
397 0 : return 0;
398 :
399 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
400 : {
401 1 : stop();
402 1 : return 0;
403 : }
404 :
405 1 : thread_context_guard ctx(this);
406 :
407 1 : std::size_t n = 0;
408 3 : while (do_one(0))
409 2 : if (n != (std::numeric_limits<std::size_t>::max)())
410 2 : ++n;
411 1 : return n;
412 1 : }
413 :
414 : std::size_t
415 4 : epoll_scheduler::
416 : poll_one()
417 : {
418 4 : if (stopped_.load(std::memory_order_acquire))
419 0 : return 0;
420 :
421 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
422 : {
423 2 : stop();
424 2 : return 0;
425 : }
426 :
427 2 : thread_context_guard ctx(this);
428 2 : return do_one(0);
429 2 : }
430 :
431 : void
432 5353 : epoll_scheduler::
433 : register_descriptor(int fd, descriptor_data* desc) const
434 : {
435 5353 : epoll_event ev{};
436 5353 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
437 5353 : ev.data.ptr = desc;
438 :
439 5353 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
440 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
441 :
442 5353 : desc->registered_events = ev.events;
443 5353 : desc->is_registered = true;
444 5353 : desc->fd = fd;
445 5353 : desc->read_ready.store(false, std::memory_order_relaxed);
446 5353 : desc->write_ready.store(false, std::memory_order_relaxed);
447 5353 : }
448 :
449 : void
450 0 : epoll_scheduler::
451 : update_descriptor_events(int, descriptor_data*, std::uint32_t) const
452 : {
453 : // Provides memory fence for operation pointer visibility across threads
454 : std::atomic_thread_fence(std::memory_order_seq_cst);
455 0 : }
456 :
457 : void
458 5353 : epoll_scheduler::
459 : deregister_descriptor(int fd) const
460 : {
461 5353 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
462 5353 : }
463 :
464 : void
465 5460 : epoll_scheduler::
466 : work_started() const noexcept
467 : {
468 5460 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
469 5460 : }
470 :
471 : void
472 119723 : epoll_scheduler::
473 : work_finished() const noexcept
474 : {
475 239446 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
476 : {
477 : // Last work item completed - wake all threads so they can exit.
478 : // notify_all() wakes threads waiting on the condvar.
479 : // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
480 : // Both are needed because they target different blocking mechanisms.
481 130 : std::unique_lock lock(mutex_);
482 130 : wakeup_event_.notify_all();
483 130 : if (reactor_running_ && !reactor_interrupted_)
484 : {
485 2 : reactor_interrupted_ = true;
486 2 : lock.unlock();
487 2 : interrupt_reactor();
488 : }
489 130 : }
490 119723 : }
491 :
492 : void
493 462 : epoll_scheduler::
494 : interrupt_reactor() const
495 : {
496 : // Only write if not already armed to avoid redundant writes
497 462 : bool expected = false;
498 462 : if (eventfd_armed_.compare_exchange_strong(expected, true,
499 : std::memory_order_release, std::memory_order_relaxed))
500 : {
501 425 : std::uint64_t val = 1;
502 425 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
503 : }
504 462 : }
505 :
506 : void
507 114231 : epoll_scheduler::
508 : wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
509 : {
510 114231 : if (idle_thread_count_ > 0)
511 : {
512 0 : wakeup_event_.notify_one();
513 0 : lock.unlock();
514 : }
515 114231 : else if (reactor_running_ && !reactor_interrupted_)
516 : {
517 239 : reactor_interrupted_ = true;
518 239 : lock.unlock();
519 239 : interrupt_reactor();
520 : }
521 : else
522 : {
523 113992 : lock.unlock();
524 : }
525 114231 : }
526 :
527 : struct work_guard
528 : {
529 : epoll_scheduler const* self;
530 0 : ~work_guard() { self->work_finished(); }
531 : };
532 :
533 : void
534 51235 : epoll_scheduler::
535 : update_timerfd() const
536 : {
537 51235 : auto nearest = timer_svc_->nearest_expiry();
538 :
539 51235 : itimerspec ts{};
540 51235 : int flags = 0;
541 :
542 51235 : if (nearest == timer_service::time_point::max())
543 : {
544 : // No timers - disarm by setting to 0 (relative)
545 : // ts is already zeroed
546 : }
547 : else
548 : {
549 50904 : auto now = std::chrono::steady_clock::now();
550 50904 : if (nearest <= now)
551 : {
552 : // Use 1ns instead of 0 - zero disarms the timerfd
553 69 : ts.it_value.tv_nsec = 1;
554 : }
555 : else
556 : {
557 50835 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
558 101670 : nearest - now).count();
559 50835 : ts.it_value.tv_sec = nsec / 1000000000;
560 50835 : ts.it_value.tv_nsec = nsec % 1000000000;
561 : // Ensure non-zero to avoid disarming if duration rounds to 0
562 50835 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
563 0 : ts.it_value.tv_nsec = 1;
564 : }
565 : }
566 :
567 51235 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
568 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
569 51235 : }
570 :
571 : void
572 48354 : epoll_scheduler::
573 : run_reactor(std::unique_lock<std::mutex>& lock)
574 : {
575 48354 : int timeout_ms = reactor_interrupted_ ? 0 : -1;
576 :
577 48354 : lock.unlock();
578 :
579 : epoll_event events[128];
580 48354 : int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
581 48354 : int saved_errno = errno;
582 :
583 48354 : timer_svc_->process_expired();
584 48354 : update_timerfd();
585 :
586 48354 : if (nfds < 0 && saved_errno != EINTR)
587 0 : detail::throw_system_error(make_err(saved_errno), "epoll_wait");
588 :
589 48354 : lock.lock();
590 :
591 48354 : int completions_queued = 0;
592 113048 : for (int i = 0; i < nfds; ++i)
593 : {
594 64694 : if (events[i].data.ptr == nullptr)
595 : {
596 : std::uint64_t val;
597 241 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
598 241 : eventfd_armed_.store(false, std::memory_order_relaxed);
599 241 : continue;
600 241 : }
601 :
602 : // timerfd_settime() in update_timerfd() resets the readable state
603 64453 : if (events[i].data.ptr == &timer_fd_)
604 2870 : continue;
605 :
606 61583 : auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
607 61583 : std::uint32_t ev = events[i].events;
608 61583 : int err = 0;
609 :
610 61583 : if (ev & (EPOLLERR | EPOLLHUP))
611 : {
612 46 : socklen_t len = sizeof(err);
613 46 : if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
614 0 : err = errno;
615 46 : if (err == 0)
616 46 : err = EIO;
617 : }
618 :
619 61583 : if (ev & EPOLLIN)
620 : {
621 18978 : auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
622 18978 : if (op)
623 : {
624 2691 : if (err)
625 : {
626 0 : op->complete(err, 0);
627 0 : completed_ops_.push(op);
628 0 : ++completions_queued;
629 : }
630 : else
631 : {
632 2691 : op->perform_io();
633 2691 : if (op->errn == EAGAIN || op->errn == EWOULDBLOCK)
634 : {
635 0 : op->errn = 0;
636 0 : desc->read_op.store(op, std::memory_order_release);
637 : }
638 : else
639 : {
640 2691 : completed_ops_.push(op);
641 2691 : ++completions_queued;
642 : }
643 : }
644 : }
645 : else
646 : {
647 16287 : desc->read_ready.store(true, std::memory_order_release);
648 : }
649 : }
650 :
651 61583 : if (ev & EPOLLOUT)
652 : {
653 : // Connect uses write readiness - try it first
654 58943 : auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
655 58943 : if (conn_op)
656 : {
657 2600 : if (err)
658 : {
659 0 : conn_op->complete(err, 0);
660 0 : completed_ops_.push(conn_op);
661 0 : ++completions_queued;
662 : }
663 : else
664 : {
665 2600 : conn_op->perform_io();
666 2600 : if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK)
667 : {
668 0 : conn_op->errn = 0;
669 0 : desc->connect_op.store(conn_op, std::memory_order_release);
670 : }
671 : else
672 : {
673 2600 : completed_ops_.push(conn_op);
674 2600 : ++completions_queued;
675 : }
676 : }
677 : }
678 :
679 58943 : auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
680 58943 : if (write_op)
681 : {
682 0 : if (err)
683 : {
684 0 : write_op->complete(err, 0);
685 0 : completed_ops_.push(write_op);
686 0 : ++completions_queued;
687 : }
688 : else
689 : {
690 0 : write_op->perform_io();
691 0 : if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK)
692 : {
693 0 : write_op->errn = 0;
694 0 : desc->write_op.store(write_op, std::memory_order_release);
695 : }
696 : else
697 : {
698 0 : completed_ops_.push(write_op);
699 0 : ++completions_queued;
700 : }
701 : }
702 : }
703 :
704 58943 : if (!conn_op && !write_op)
705 56343 : desc->write_ready.store(true, std::memory_order_release);
706 : }
707 :
708 : // Handle error for ops not processed above (no EPOLLIN/EPOLLOUT)
709 61583 : if (err && !(ev & (EPOLLIN | EPOLLOUT)))
710 : {
711 0 : auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
712 0 : if (read_op)
713 : {
714 0 : read_op->complete(err, 0);
715 0 : completed_ops_.push(read_op);
716 0 : ++completions_queued;
717 : }
718 :
719 0 : auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
720 0 : if (write_op)
721 : {
722 0 : write_op->complete(err, 0);
723 0 : completed_ops_.push(write_op);
724 0 : ++completions_queued;
725 : }
726 :
727 0 : auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
728 0 : if (conn_op)
729 : {
730 0 : conn_op->complete(err, 0);
731 0 : completed_ops_.push(conn_op);
732 0 : ++completions_queued;
733 : }
734 : }
735 : }
736 :
737 48354 : if (completions_queued > 0)
738 : {
739 2691 : if (completions_queued == 1)
740 91 : wakeup_event_.notify_one();
741 : else
742 2600 : wakeup_event_.notify_all();
743 : }
744 48354 : }
745 :
746 : std::size_t
747 119667 : epoll_scheduler::
748 : do_one(long timeout_us)
749 : {
750 119667 : std::unique_lock lock(mutex_);
751 :
752 : for (;;)
753 : {
754 168021 : if (stopped_.load(std::memory_order_acquire))
755 20 : return 0;
756 :
757 168001 : scheduler_op* op = completed_ops_.pop();
758 :
759 168001 : if (op == &task_op_)
760 : {
761 48477 : bool more_handlers = !completed_ops_.empty();
762 :
763 48477 : if (!more_handlers)
764 : {
765 11468 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
766 : {
767 123 : completed_ops_.push(&task_op_);
768 123 : return 0;
769 : }
770 5611 : if (timeout_us == 0)
771 : {
772 0 : completed_ops_.push(&task_op_);
773 0 : return 0;
774 : }
775 : }
776 :
777 48354 : reactor_interrupted_ = more_handlers || timeout_us == 0;
778 48354 : reactor_running_ = true;
779 :
780 48354 : if (more_handlers && idle_thread_count_ > 0)
781 0 : wakeup_event_.notify_one();
782 :
783 48354 : run_reactor(lock);
784 :
785 48354 : reactor_running_ = false;
786 48354 : completed_ops_.push(&task_op_);
787 48354 : continue;
788 48354 : }
789 :
790 119524 : if (op != nullptr)
791 : {
792 119522 : lock.unlock();
793 119522 : work_guard g{this};
794 119522 : (*op)();
795 119522 : return 1;
796 119522 : }
797 :
798 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
799 2 : return 0;
800 :
801 0 : if (timeout_us == 0)
802 0 : return 0;
803 :
804 0 : ++idle_thread_count_;
805 0 : if (timeout_us < 0)
806 0 : wakeup_event_.wait(lock);
807 : else
808 0 : wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
809 0 : --idle_thread_count_;
810 48354 : }
811 119667 : }
812 :
813 : } // namespace boost::corosio::detail
814 :
815 : #endif
|