Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/detail/scheduler.hpp>
19 : #include <boost/capy/ex/execution_context.hpp>
20 :
21 : #include "src/detail/scheduler_op.hpp"
22 : #include "src/detail/timer_service.hpp"
23 :
24 : #include <sys/select.h>
25 :
26 : #include <atomic>
27 : #include <condition_variable>
28 : #include <cstddef>
29 : #include <mutex>
30 : #include <unordered_map>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : struct select_op;
35 :
36 : /** POSIX scheduler using select() for I/O multiplexing.
37 :
38 : This scheduler implements the scheduler interface using the POSIX select()
39 : call for I/O event notification. It uses a single reactor model
40 : where one thread runs select() while other threads wait on a condition
41 : variable for handler work. This design provides:
42 :
43 : - Handler parallelism: N posted handlers can execute on N threads
44 : - No thundering herd: condition_variable wakes exactly one thread
45 : - Portability: Works on all POSIX systems
46 :
47 : The design mirrors epoll_scheduler for behavioral consistency:
48 : - Same single-reactor thread coordination model
49 : - Same work counting semantics
50 : - Same timer integration pattern
51 :
52 : Known Limitations:
53 : - FD_SETSIZE (~1024) limits maximum concurrent connections
54 : - O(n) scanning: rebuilds fd_sets each iteration
55 : - Level-triggered only (no edge-triggered mode)
56 :
57 : @par Thread Safety
58 : All public member functions are thread-safe.
59 : */
60 : class select_scheduler
61 : : public scheduler
62 : , public capy::execution_context::service
63 : {
64 : public:
65 : using key_type = scheduler;
66 :
67 : /** Construct the scheduler.
68 :
69 : Creates a self-pipe for reactor interruption.
70 :
71 : @param ctx Reference to the owning execution_context.
72 : @param concurrency_hint Hint for expected thread count (unused).
73 : */
74 : select_scheduler(
75 : capy::execution_context& ctx,
76 : int concurrency_hint = -1);
77 :
78 : ~select_scheduler();
79 :
80 : select_scheduler(select_scheduler const&) = delete;
81 : select_scheduler& operator=(select_scheduler const&) = delete;
82 :
83 : void shutdown() override;
84 : void post(capy::coro h) const override;
85 : void post(scheduler_op* h) const override;
86 : void on_work_started() noexcept override;
87 : void on_work_finished() noexcept override;
88 : bool running_in_this_thread() const noexcept override;
89 : void stop() override;
90 : bool stopped() const noexcept override;
91 : void restart() override;
92 : std::size_t run() override;
93 : std::size_t run_one() override;
94 : std::size_t wait_one(long usec) override;
95 : std::size_t poll() override;
96 : std::size_t poll_one() override;
97 :
98 : /** Return the maximum file descriptor value supported.
99 :
100 : Returns FD_SETSIZE - 1, the maximum fd value that can be
101 : monitored by select(). Operations with fd >= FD_SETSIZE
102 : will fail with EINVAL.
103 :
104 : @return The maximum supported file descriptor value.
105 : */
106 : static constexpr int max_fd() noexcept { return FD_SETSIZE - 1; }
107 :
108 : /** Register a file descriptor for monitoring.
109 :
110 : @param fd The file descriptor to register.
111 : @param op The operation associated with this fd.
112 : @param events Event mask: 1 = read, 2 = write, 3 = both.
113 : */
114 : void register_fd(int fd, select_op* op, int events) const;
115 :
116 : /** Unregister a file descriptor from monitoring.
117 :
118 : @param fd The file descriptor to unregister.
119 : @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
120 : */
121 : void deregister_fd(int fd, int events) const;
122 :
123 : /** For use by I/O operations to track pending work. */
124 : void work_started() const noexcept override;
125 :
126 : /** For use by I/O operations to track completed work. */
127 : void work_finished() const noexcept override;
128 :
129 : // Event flags for register_fd/deregister_fd
130 : static constexpr int event_read = 1;
131 : static constexpr int event_write = 2;
132 :
133 : private:
134 : std::size_t do_one(long timeout_us);
135 : void run_reactor(std::unique_lock<std::mutex>& lock);
136 : void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
137 : void interrupt_reactor() const;
138 : long calculate_timeout(long requested_timeout_us) const;
139 :
140 : // Self-pipe for interrupting select()
141 : int pipe_fds_[2]; // [0]=read, [1]=write
142 :
143 : mutable std::mutex mutex_;
144 : mutable std::condition_variable wakeup_event_;
145 : mutable op_queue completed_ops_;
146 : mutable std::atomic<long> outstanding_work_;
147 : std::atomic<bool> stopped_;
148 : bool shutdown_;
149 : timer_service* timer_svc_ = nullptr;
150 :
151 : // Per-fd state for tracking registered operations
152 : struct fd_state
153 : {
154 : select_op* read_op = nullptr;
155 : select_op* write_op = nullptr;
156 : };
157 : mutable std::unordered_map<int, fd_state> registered_fds_;
158 : mutable int max_fd_ = -1;
159 :
160 : // Single reactor thread coordination
161 : mutable bool reactor_running_ = false;
162 : mutable bool reactor_interrupted_ = false;
163 : mutable int idle_thread_count_ = 0;
164 :
165 : // Sentinel operation for interleaving reactor runs with handler execution.
166 : // Ensures the reactor runs periodically even when handlers are continuously
167 : // posted, preventing timer starvation.
168 : struct task_op final : scheduler_op
169 : {
170 0 : void operator()() override {}
171 0 : void destroy() override {}
172 : };
173 : task_op task_op_;
174 : };
175 :
176 : } // namespace boost::corosio::detail
177 :
178 : #endif // BOOST_COROSIO_HAS_SELECT
179 :
180 : #endif // BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
|