LCOV - code coverage report
Current view: top level - src/detail/epoll - scheduler.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 77.6 % 379 294
Test Date: 2026-02-04 14:16:13 Functions: 91.7 % 36 33

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/scheduler.hpp"
      15              : #include "src/detail/epoll/op.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : #include "src/detail/posix/resolver_service.hpp"
      18              : #include "src/detail/posix/signals.hpp"
      19              : 
      20              : #include <boost/corosio/detail/except.hpp>
      21              : #include <boost/corosio/detail/thread_local_ptr.hpp>
      22              : 
      23              : #include <atomic>
      24              : #include <chrono>
      25              : #include <limits>
      26              : 
      27              : #include <errno.h>
      28              : #include <fcntl.h>
      29              : #include <sys/epoll.h>
      30              : #include <sys/eventfd.h>
      31              : #include <sys/socket.h>
      32              : #include <sys/timerfd.h>
      33              : #include <unistd.h>
      34              : 
      35              : /*
      36              :     epoll Scheduler - Single Reactor Model
      37              :     ======================================
      38              : 
      39              :     This scheduler uses a thread coordination strategy to provide handler
      40              :     parallelism and avoid the thundering herd problem.
      41              :     Instead of all threads blocking on epoll_wait(), one thread becomes the
      42              :     "reactor" while others wait on a condition variable for handler work.
      43              : 
      44              :     Thread Model
      45              :     ------------
      46              :     - ONE thread runs epoll_wait() at a time (the reactor thread)
      47              :     - OTHER threads wait on wakeup_event_ (condition variable) for handlers
      48              :     - When work is posted, exactly one waiting thread wakes via notify_one()
      49              :     - This matches Windows IOCP semantics where N posted items wake N threads
      50              : 
      51              :     Event Loop Structure (do_one)
      52              :     -----------------------------
      53              :     1. Lock mutex, try to pop handler from queue
      54              :     2. If got handler: execute it (unlocked), return
      55              :     3. If queue empty and no reactor running: become reactor
      56              :        - Run epoll_wait (unlocked), queue I/O completions, loop back
      57              :     4. If queue empty and reactor running: wait on condvar for work
      58              : 
      59              :     The reactor_running_ flag ensures only one thread owns epoll_wait().
      60              :     After the reactor queues I/O completions, it loops back to try getting
      61              :     a handler, giving priority to handler execution over more I/O polling.
      62              : 
      63              :     Wake Coordination (wake_one_thread_and_unlock)
      64              :     ----------------------------------------------
      65              :     When posting work:
      66              :     - If idle threads exist: notify_one() wakes exactly one worker
      67              :     - Else if reactor running: interrupt via eventfd write
      68              :     - Else: no-op (thread will find work when it checks queue)
      69              : 
      70              :     This is critical for matching IOCP behavior. With the old model, posting
      71              :     N handlers would wake all threads (thundering herd). Now each post()
      72              :     wakes at most one thread, and that thread handles exactly one item.
      73              : 
      74              :     Work Counting
      75              :     -------------
      76              :     outstanding_work_ tracks pending operations. When it hits zero, run()
      77              :     returns. Each operation increments on start, decrements on completion.
      78              : 
      79              :     Timer Integration
      80              :     -----------------
      81              :     Timers are handled by timer_service. The reactor adjusts epoll_wait
      82              :     timeout to wake for the nearest timer expiry. When a new timer is
      83              :     scheduled earlier than current, timer_service calls interrupt_reactor()
      84              :     to re-evaluate the timeout.
      85              : */
      86              : 
      87              : namespace boost::corosio::detail {
      88              : 
namespace {

// One frame of the calling thread's stack of schedulers currently
// running on it. Consulted by running_in_this_thread().
struct scheduler_context
{
    epoll_scheduler const* key;  // scheduler this frame represents
    scheduler_context* next;     // frame pushed before this one (or nullptr)
};

// Head of the per-thread stack of active scheduler frames.
corosio::detail::thread_local_ptr<scheduler_context> context_stack;

// RAII guard: pushes a frame for `ctx` onto the thread-local stack on
// construction and pops it on destruction. Frames nest, so re-entrant
// run()/poll() calls on the same thread unwind correctly.
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_{ctx, context_stack.get()}
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        // Restore the previous top-of-stack saved at construction.
        context_stack.set(frame_.next);
    }
};

} // namespace
     117              : 
epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , reactor_running_(false)
    , reactor_interrupted_(false)
    , idle_thread_count_(0)
{
    // Acquire the three kernel descriptors in order; each failure path
    // below closes everything acquired so far before throwing, so a
    // failed constructor leaks no descriptors.
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    // eventfd is written by interrupt_reactor() to knock the reactor
    // thread out of a blocking epoll_wait().
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    // timerfd delivers timer expirations through the same epoll loop.
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // Register the eventfd. data.ptr == nullptr is the sentinel the
    // reactor loop uses to recognize a wakeup event; edge-triggered so
    // a single drain of the counter clears it.
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // Register the timerfd. data.ptr == &timer_fd_ is the sentinel the
    // reactor uses to recognize timer wakeups.
    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // Re-arm the timerfd whenever the nearest timer expiry changes.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
     192              : 
     193          368 : epoll_scheduler::
     194          184 : ~epoll_scheduler()
     195              : {
     196          184 :     if (timer_fd_ >= 0)
     197          184 :         ::close(timer_fd_);
     198          184 :     if (event_fd_ >= 0)
     199          184 :         ::close(event_fd_);
     200          184 :     if (epoll_fd_ >= 0)
     201          184 :         ::close(epoll_fd_);
     202          368 : }
     203              : 
void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        // Destroy (without running) every queued handler. The task
        // sentinel is not a real handler, so it is skipped. The lock is
        // released around destroy() because handler teardown may
        // re-enter the scheduler.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }
    }

    // Force the work count to zero so any thread still inside run()
    // observes "no work" and exits.
    outstanding_work_.store(0, std::memory_order_release);

    // Wake both blocking mechanisms: the reactor parked in epoll_wait()
    // (via the eventfd) and workers parked on the condvar.
    if (event_fd_ >= 0)
        interrupt_reactor();

    wakeup_event_.notify_all();
}
     229              : 
// Queue a coroutine for resumption on a scheduler thread.
void
epoll_scheduler::
post(capy::coro h) const
{
    // Heap-allocated queue entry that resumes the coroutine when run.
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            // Copy the handle and free the wrapper *before* resuming:
            // the resumed coroutine may never return control here.
            auto h = h_;
            delete this;
            // NOTE(review): acquire fence before resuming — presumably
            // pairs with a release on the posting side so the coroutine
            // sees writes made before post(); confirm against the queue
            // implementation.
            std::atomic_thread_fence(std::memory_order_acquire);
            h.resume();
        }

        void destroy() override
        {
            // Invoked at shutdown for handlers that were never run.
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());  // queue takes ownership
    wake_one_thread_and_unlock(lock);
}
     268              : 
     269              : void
     270       112592 : epoll_scheduler::
     271              : post(scheduler_op* h) const
     272              : {
     273       112592 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     274              : 
     275       112592 :     std::unique_lock lock(mutex_);
     276       112592 :     completed_ops_.push(h);
     277       112592 :     wake_one_thread_and_unlock(lock);
     278       112592 : }
     279              : 
     280              : void
     281         2905 : epoll_scheduler::
     282              : on_work_started() noexcept
     283              : {
     284         2905 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     285         2905 : }
     286              : 
     287              : void
     288         2873 : epoll_scheduler::
     289              : on_work_finished() noexcept
     290              : {
     291         5746 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     292           18 :         stop();
     293         2873 : }
     294              : 
     295              : bool
     296         3120 : epoll_scheduler::
     297              : running_in_this_thread() const noexcept
     298              : {
     299         3120 :     for (auto* c = context_stack.get(); c != nullptr; c = c->next)
     300         2910 :         if (c->key == this)
     301         2910 :             return true;
     302          210 :     return false;
     303              : }
     304              : 
     305              : void
     306           37 : epoll_scheduler::
     307              : stop()
     308              : {
     309           37 :     bool expected = false;
     310           37 :     if (stopped_.compare_exchange_strong(expected, true,
     311              :             std::memory_order_release, std::memory_order_relaxed))
     312              :     {
     313              :         // Wake all threads so they notice stopped_ and exit
     314              :         {
     315           37 :             std::lock_guard lock(mutex_);
     316           37 :             wakeup_event_.notify_all();
     317           37 :         }
     318           37 :         interrupt_reactor();
     319              :     }
     320           37 : }
     321              : 
     322              : bool
     323           16 : epoll_scheduler::
     324              : stopped() const noexcept
     325              : {
     326           16 :     return stopped_.load(std::memory_order_acquire);
     327              : }
     328              : 
     329              : void
     330           49 : epoll_scheduler::
     331              : restart()
     332              : {
     333           49 :     stopped_.store(false, std::memory_order_release);
     334           49 : }
     335              : 
     336              : std::size_t
     337          175 : epoll_scheduler::
     338              : run()
     339              : {
     340          175 :     if (stopped_.load(std::memory_order_acquire))
     341           21 :         return 0;
     342              : 
     343          308 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     344              :     {
     345           11 :         stop();
     346           11 :         return 0;
     347              :     }
     348              : 
     349          143 :     thread_context_guard ctx(this);
     350              : 
     351          143 :     std::size_t n = 0;
     352       119654 :     while (do_one(-1))
     353       119511 :         if (n != (std::numeric_limits<std::size_t>::max)())
     354       119511 :             ++n;
     355          143 :     return n;
     356          143 : }
     357              : 
     358              : std::size_t
     359            2 : epoll_scheduler::
     360              : run_one()
     361              : {
     362            2 :     if (stopped_.load(std::memory_order_acquire))
     363            0 :         return 0;
     364              : 
     365            4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     366              :     {
     367            0 :         stop();
     368            0 :         return 0;
     369              :     }
     370              : 
     371            2 :     thread_context_guard ctx(this);
     372            2 :     return do_one(-1);
     373            2 : }
     374              : 
     375              : std::size_t
     376           10 : epoll_scheduler::
     377              : wait_one(long usec)
     378              : {
     379           10 :     if (stopped_.load(std::memory_order_acquire))
     380            0 :         return 0;
     381              : 
     382           20 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     383              :     {
     384            4 :         stop();
     385            4 :         return 0;
     386              :     }
     387              : 
     388            6 :     thread_context_guard ctx(this);
     389            6 :     return do_one(usec);
     390            6 : }
     391              : 
     392              : std::size_t
     393            2 : epoll_scheduler::
     394              : poll()
     395              : {
     396            2 :     if (stopped_.load(std::memory_order_acquire))
     397            0 :         return 0;
     398              : 
     399            4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     400              :     {
     401            1 :         stop();
     402            1 :         return 0;
     403              :     }
     404              : 
     405            1 :     thread_context_guard ctx(this);
     406              : 
     407            1 :     std::size_t n = 0;
     408            3 :     while (do_one(0))
     409            2 :         if (n != (std::numeric_limits<std::size_t>::max)())
     410            2 :             ++n;
     411            1 :     return n;
     412            1 : }
     413              : 
     414              : std::size_t
     415            4 : epoll_scheduler::
     416              : poll_one()
     417              : {
     418            4 :     if (stopped_.load(std::memory_order_acquire))
     419            0 :         return 0;
     420              : 
     421            8 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     422              :     {
     423            2 :         stop();
     424            2 :         return 0;
     425              :     }
     426              : 
     427            2 :     thread_context_guard ctx(this);
     428            2 :     return do_one(0);
     429            2 : }
     430              : 
     431              : void
     432         5353 : epoll_scheduler::
     433              : register_descriptor(int fd, descriptor_data* desc) const
     434              : {
     435         5353 :     epoll_event ev{};
     436         5353 :     ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
     437         5353 :     ev.data.ptr = desc;
     438              : 
     439         5353 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
     440            0 :         detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
     441              : 
     442         5353 :     desc->registered_events = ev.events;
     443         5353 :     desc->is_registered = true;
     444         5353 :     desc->fd = fd;
     445         5353 :     desc->read_ready.store(false, std::memory_order_relaxed);
     446         5353 :     desc->write_ready.store(false, std::memory_order_relaxed);
     447         5353 : }
     448              : 
     449              : void
     450            0 : epoll_scheduler::
     451              : update_descriptor_events(int, descriptor_data*, std::uint32_t) const
     452              : {
     453              :     // Provides memory fence for operation pointer visibility across threads
     454              :     std::atomic_thread_fence(std::memory_order_seq_cst);
     455            0 : }
     456              : 
     457              : void
     458         5353 : epoll_scheduler::
     459              : deregister_descriptor(int fd) const
     460              : {
     461         5353 :     ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
     462         5353 : }
     463              : 
     464              : void
     465         5460 : epoll_scheduler::
     466              : work_started() const noexcept
     467              : {
     468         5460 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     469         5460 : }
     470              : 
     471              : void
     472       119723 : epoll_scheduler::
     473              : work_finished() const noexcept
     474              : {
     475       239446 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     476              :     {
     477              :         // Last work item completed - wake all threads so they can exit.
     478              :         // notify_all() wakes threads waiting on the condvar.
     479              :         // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
     480              :         // Both are needed because they target different blocking mechanisms.
     481          130 :         std::unique_lock lock(mutex_);
     482          130 :         wakeup_event_.notify_all();
     483          130 :         if (reactor_running_ && !reactor_interrupted_)
     484              :         {
     485            2 :             reactor_interrupted_ = true;
     486            2 :             lock.unlock();
     487            2 :             interrupt_reactor();
     488              :         }
     489          130 :     }
     490       119723 : }
     491              : 
     492              : void
     493          462 : epoll_scheduler::
     494              : interrupt_reactor() const
     495              : {
     496              :     // Only write if not already armed to avoid redundant writes
     497          462 :     bool expected = false;
     498          462 :     if (eventfd_armed_.compare_exchange_strong(expected, true,
     499              :             std::memory_order_release, std::memory_order_relaxed))
     500              :     {
     501          425 :         std::uint64_t val = 1;
     502          425 :         [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
     503              :     }
     504          462 : }
     505              : 
     506              : void
     507       114231 : epoll_scheduler::
     508              : wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
     509              : {
     510       114231 :     if (idle_thread_count_ > 0)
     511              :     {
     512            0 :         wakeup_event_.notify_one();
     513            0 :         lock.unlock();
     514              :     }
     515       114231 :     else if (reactor_running_ && !reactor_interrupted_)
     516              :     {
     517          239 :         reactor_interrupted_ = true;
     518          239 :         lock.unlock();
     519          239 :         interrupt_reactor();
     520              :     }
     521              :     else
     522              :     {
     523       113992 :         lock.unlock();
     524              :     }
     525       114231 : }
     526              : 
// Scope guard that calls work_finished() on destruction, balancing a
// prior work-count increment even on exceptional or early exit.
struct work_guard
{
    epoll_scheduler const* self;
    ~work_guard() { self->work_finished(); }
};
     532              : 
     533              : void
     534        51235 : epoll_scheduler::
     535              : update_timerfd() const
     536              : {
     537        51235 :     auto nearest = timer_svc_->nearest_expiry();
     538              : 
     539        51235 :     itimerspec ts{};
     540        51235 :     int flags = 0;
     541              : 
     542        51235 :     if (nearest == timer_service::time_point::max())
     543              :     {
     544              :         // No timers - disarm by setting to 0 (relative)
     545              :         // ts is already zeroed
     546              :     }
     547              :     else
     548              :     {
     549        50904 :         auto now = std::chrono::steady_clock::now();
     550        50904 :         if (nearest <= now)
     551              :         {
     552              :             // Use 1ns instead of 0 - zero disarms the timerfd
     553           69 :             ts.it_value.tv_nsec = 1;
     554              :         }
     555              :         else
     556              :         {
     557        50835 :             auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
     558       101670 :                 nearest - now).count();
     559        50835 :             ts.it_value.tv_sec = nsec / 1000000000;
     560        50835 :             ts.it_value.tv_nsec = nsec % 1000000000;
     561              :             // Ensure non-zero to avoid disarming if duration rounds to 0
     562        50835 :             if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
     563            0 :                 ts.it_value.tv_nsec = 1;
     564              :         }
     565              :     }
     566              : 
     567        51235 :     if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
     568            0 :         detail::throw_system_error(make_err(errno), "timerfd_settime");
     569        51235 : }
     570              : 
     571              : void
     572        48354 : epoll_scheduler::
     573              : run_reactor(std::unique_lock<std::mutex>& lock)
     574              : {
     575        48354 :     int timeout_ms = reactor_interrupted_ ? 0 : -1;
     576              : 
     577        48354 :     lock.unlock();
     578              : 
     579              :     epoll_event events[128];
     580        48354 :     int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
     581        48354 :     int saved_errno = errno;
     582              : 
     583        48354 :     timer_svc_->process_expired();
     584        48354 :     update_timerfd();
     585              : 
     586        48354 :     if (nfds < 0 && saved_errno != EINTR)
     587            0 :         detail::throw_system_error(make_err(saved_errno), "epoll_wait");
     588              : 
     589        48354 :     lock.lock();
     590              : 
     591        48354 :     int completions_queued = 0;
     592       113048 :     for (int i = 0; i < nfds; ++i)
     593              :     {
     594        64694 :         if (events[i].data.ptr == nullptr)
     595              :         {
     596              :             std::uint64_t val;
     597          241 :             [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
     598          241 :             eventfd_armed_.store(false, std::memory_order_relaxed);
     599          241 :             continue;
     600          241 :         }
     601              : 
     602              :         // timerfd_settime() in update_timerfd() resets the readable state
     603        64453 :         if (events[i].data.ptr == &timer_fd_)
     604         2870 :             continue;
     605              : 
     606        61583 :         auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
     607        61583 :         std::uint32_t ev = events[i].events;
     608        61583 :         int err = 0;
     609              : 
     610        61583 :         if (ev & (EPOLLERR | EPOLLHUP))
     611              :         {
     612           46 :             socklen_t len = sizeof(err);
     613           46 :             if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     614            0 :                 err = errno;
     615           46 :             if (err == 0)
     616           46 :                 err = EIO;
     617              :         }
     618              : 
     619        61583 :         if (ev & EPOLLIN)
     620              :         {
     621        18978 :             auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
     622        18978 :             if (op)
     623              :             {
     624         2691 :                 if (err)
     625              :                 {
     626            0 :                     op->complete(err, 0);
     627            0 :                     completed_ops_.push(op);
     628            0 :                     ++completions_queued;
     629              :                 }
     630              :                 else
     631              :                 {
     632         2691 :                     op->perform_io();
     633         2691 :                     if (op->errn == EAGAIN || op->errn == EWOULDBLOCK)
     634              :                     {
     635            0 :                         op->errn = 0;
     636            0 :                         desc->read_op.store(op, std::memory_order_release);
     637              :                     }
     638              :                     else
     639              :                     {
     640         2691 :                         completed_ops_.push(op);
     641         2691 :                         ++completions_queued;
     642              :                     }
     643              :                 }
     644              :             }
     645              :             else
     646              :             {
     647        16287 :                 desc->read_ready.store(true, std::memory_order_release);
     648              :             }
     649              :         }
     650              : 
     651        61583 :         if (ev & EPOLLOUT)
     652              :         {
     653              :             // Connect uses write readiness - try it first
     654        58943 :             auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
     655        58943 :             if (conn_op)
     656              :             {
     657         2600 :                 if (err)
     658              :                 {
     659            0 :                     conn_op->complete(err, 0);
     660            0 :                     completed_ops_.push(conn_op);
     661            0 :                     ++completions_queued;
     662              :                 }
     663              :                 else
     664              :                 {
     665         2600 :                     conn_op->perform_io();
     666         2600 :                     if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK)
     667              :                     {
     668            0 :                         conn_op->errn = 0;
     669            0 :                         desc->connect_op.store(conn_op, std::memory_order_release);
     670              :                     }
     671              :                     else
     672              :                     {
     673         2600 :                         completed_ops_.push(conn_op);
     674         2600 :                         ++completions_queued;
     675              :                     }
     676              :                 }
     677              :             }
     678              : 
     679        58943 :             auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
     680        58943 :             if (write_op)
     681              :             {
     682            0 :                 if (err)
     683              :                 {
     684            0 :                     write_op->complete(err, 0);
     685            0 :                     completed_ops_.push(write_op);
     686            0 :                     ++completions_queued;
     687              :                 }
     688              :                 else
     689              :                 {
     690            0 :                     write_op->perform_io();
     691            0 :                     if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK)
     692              :                     {
     693            0 :                         write_op->errn = 0;
     694            0 :                         desc->write_op.store(write_op, std::memory_order_release);
     695              :                     }
     696              :                     else
     697              :                     {
     698            0 :                         completed_ops_.push(write_op);
     699            0 :                         ++completions_queued;
     700              :                     }
     701              :                 }
     702              :             }
     703              : 
     704        58943 :             if (!conn_op && !write_op)
     705        56343 :                 desc->write_ready.store(true, std::memory_order_release);
     706              :         }
     707              : 
     708              :         // Handle error for ops not processed above (no EPOLLIN/EPOLLOUT)
     709        61583 :         if (err && !(ev & (EPOLLIN | EPOLLOUT)))
     710              :         {
     711            0 :             auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
     712            0 :             if (read_op)
     713              :             {
     714            0 :                 read_op->complete(err, 0);
     715            0 :                 completed_ops_.push(read_op);
     716            0 :                 ++completions_queued;
     717              :             }
     718              : 
     719            0 :             auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
     720            0 :             if (write_op)
     721              :             {
     722            0 :                 write_op->complete(err, 0);
     723            0 :                 completed_ops_.push(write_op);
     724            0 :                 ++completions_queued;
     725              :             }
     726              : 
     727            0 :             auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
     728            0 :             if (conn_op)
     729              :             {
     730            0 :                 conn_op->complete(err, 0);
     731            0 :                 completed_ops_.push(conn_op);
     732            0 :                 ++completions_queued;
     733              :             }
     734              :         }
     735              :     }
     736              : 
     737        48354 :     if (completions_queued > 0)
     738              :     {
     739         2691 :         if (completions_queued == 1)
     740           91 :             wakeup_event_.notify_one();
     741              :         else
     742         2600 :             wakeup_event_.notify_all();
     743              :     }
     744        48354 : }
     745              : 
std::size_t
epoll_scheduler::
do_one(long timeout_us)
{
    // Run at most one completed handler, possibly taking a turn as the
    // reactor thread along the way.
    //
    // timeout_us semantics as exercised below:
    //   == 0 -> poll: never block; return 0 if nothing is immediately runnable
    //   <  0 -> block indefinitely waiting for work
    //   >  0 -> bound each idle wait to that many microseconds
    //
    // Returns 1 if a handler was executed, 0 otherwise (scheduler stopped,
    // no outstanding work, or nothing became ready within the poll/wait).
    std::unique_lock lock(mutex_);

    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // task_op_ is a sentinel circulating on the completion queue:
        // the thread that pops it is elected to run the reactor (epoll)
        // instead of executing a user handler.
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            if (!more_handlers)
            {
                // Idle and no outstanding work: put the sentinel back so a
                // future caller can still become the reactor, then report
                // "nothing to do".
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
                // Pure poll with nothing ready: do not enter the reactor.
                if (timeout_us == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
            }

            // If handlers are already queued, or the caller is only
            // polling, the reactor pass must not block in epoll_wait.
            reactor_interrupted_ = more_handlers || timeout_us == 0;
            reactor_running_ = true;

            // Let a sleeping peer thread pick up the queued handlers while
            // this thread is busy inside the reactor.
            if (more_handlers && idle_thread_count_ > 0)
                wakeup_event_.notify_one();

            // NOTE(review): run_reactor() receives the lock by reference —
            // presumably it releases it around the blocking epoll_wait and
            // reacquires before returning; confirm in its definition.
            run_reactor(lock);

            // Return the sentinel and loop again: handlers produced by the
            // reactor pass are now on completed_ops_.
            reactor_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        if (op != nullptr)
        {
            // Invoke the handler outside the scheduler lock so it may
            // re-enter the scheduler. work_guard presumably keeps the
            // outstanding-work count pinned for the duration of the call —
            // TODO confirm against its definition.
            lock.unlock();
            work_guard g{this};
            (*op)();
            return 1;
        }

        // Queue was empty (another thread won the race for the last op).
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
            return 0;

        if (timeout_us == 0)
            return 0;

        // Sleep until a completion is queued (or the bounded wait lapses).
        // NOTE(review): a positive timeout_us is re-applied on every pass
        // through the loop, so across spurious wakeups the total blocking
        // time can exceed timeout_us — confirm this matches the intended
        // contract for do_one().
        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
        else
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
        --idle_thread_count_;
    }
}
     812              : 
     813              : } // namespace boost::corosio::detail
     814              : 
     815              : #endif
        

Generated by: LCOV version 2.3