libs/corosio/src/corosio/src/detail/epoll/scheduler.hpp

0.0% Lines (0/2) 0.0% Functions (0/2) -% Branches (0/0)
libs/corosio/src/corosio/src/detail/epoll/scheduler.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/scheduler.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include "src/detail/scheduler_op.hpp"
22 #include "src/detail/timer_service.hpp"
23
24 #include <atomic>
25 #include <condition_variable>
26 #include <cstddef>
27 #include <cstdint>
28 #include <mutex>
29
30 namespace boost::corosio::detail {
31
32 struct epoll_op;
33 struct descriptor_data;
34
35 /** Linux scheduler using epoll for I/O multiplexing.
36
37 This scheduler implements the scheduler interface using Linux epoll
38 for efficient I/O event notification. It uses a single reactor model
39 where one thread runs epoll_wait while other threads
40 wait on a condition variable for handler work. This design provides:
41
42 - Handler parallelism: N posted handlers can execute on N threads
43 - No thundering herd: condition_variable wakes exactly one thread
44 - IOCP parity: Behavior matches Windows I/O completion port semantics
45
46 When threads call run(), they first try to execute queued handlers.
47 If the queue is empty and no reactor is running, one thread becomes
48 the reactor and runs epoll_wait. Other threads wait on a condition
49 variable until handlers are available.
50
51 @par Thread Safety
52 All public member functions are thread-safe.
53 */
54 class epoll_scheduler
55 : public scheduler
56 , public capy::execution_context::service
57 {
58 public:
59 using key_type = scheduler;
60
61 /** Construct the scheduler.
62
63 Creates an epoll instance, eventfd for reactor interruption,
64 and timerfd for kernel-managed timer expiry.
65
66 @param ctx Reference to the owning execution_context.
67 @param concurrency_hint Hint for expected thread count (unused).
68 */
69 epoll_scheduler(
70 capy::execution_context& ctx,
71 int concurrency_hint = -1);
72
73 ~epoll_scheduler();
74
75 epoll_scheduler(epoll_scheduler const&) = delete;
76 epoll_scheduler& operator=(epoll_scheduler const&) = delete;
77
78 void shutdown() override;
79 void post(capy::coro h) const override;
80 void post(scheduler_op* h) const override;
81 void on_work_started() noexcept override;
82 void on_work_finished() noexcept override;
83 bool running_in_this_thread() const noexcept override;
84 void stop() override;
85 bool stopped() const noexcept override;
86 void restart() override;
87 std::size_t run() override;
88 std::size_t run_one() override;
89 std::size_t wait_one(long usec) override;
90 std::size_t poll() override;
91 std::size_t poll_one() override;
92
93 /** Return the epoll file descriptor.
94
95 Used by socket services to register file descriptors
96 for I/O event notification.
97
98 @return The epoll file descriptor.
99 */
100 int epoll_fd() const noexcept { return epoll_fd_; }
101
102 /** Register a descriptor for persistent monitoring.
103
104 The fd is registered once and stays registered until explicitly
105 deregistered. Events are dispatched via descriptor_data which
106 tracks pending read/write/connect operations.
107
108 @param fd The file descriptor to register.
109 @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
110 */
111 void register_descriptor(int fd, descriptor_data* desc) const;
112
113 /** Update events for a persistently registered descriptor.
114
115 @param fd The file descriptor.
116 @param desc Pointer to descriptor data.
117 @param events The new events to monitor.
118 */
119 void update_descriptor_events(int fd, descriptor_data* desc, std::uint32_t events) const;
120
121 /** Deregister a persistently registered descriptor.
122
123 @param fd The file descriptor to deregister.
124 */
125 void deregister_descriptor(int fd) const;
126
127 /** For use by I/O operations to track pending work. */
128 void work_started() const noexcept override;
129
130 /** For use by I/O operations to track completed work. */
131 void work_finished() const noexcept override;
132
133 private:
134 std::size_t do_one(long timeout_us);
135 void run_reactor(std::unique_lock<std::mutex>& lock);
136 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
137 void interrupt_reactor() const;
138 void update_timerfd() const;
139
140 int epoll_fd_;
141 int event_fd_; // for interrupting reactor
142 int timer_fd_; // timerfd for kernel-managed timer expiry
143 mutable std::mutex mutex_;
144 mutable std::condition_variable wakeup_event_;
145 mutable op_queue completed_ops_;
146 mutable std::atomic<long> outstanding_work_;
147 std::atomic<bool> stopped_;
148 bool shutdown_;
149 timer_service* timer_svc_ = nullptr;
150
151 // Single reactor thread coordination
152 mutable bool reactor_running_ = false;
153 mutable bool reactor_interrupted_ = false;
154 mutable int idle_thread_count_ = 0;
155
156 // Edge-triggered eventfd state
157 mutable std::atomic<bool> eventfd_armed_{false};
158
159 // Sentinel operation for interleaving reactor runs with handler execution.
160 // Ensures the reactor runs periodically even when handlers are continuously
161 // posted, preventing timer starvation.
162 struct task_op final : scheduler_op
163 {
164 void operator()() override {}
165 void destroy() override {}
166 };
167 task_op task_op_;
168 };
169
170 } // namespace boost::corosio::detail
171
172 #endif // BOOST_COROSIO_HAS_EPOLL
173
174 #endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
175