libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

77.6% Lines (294/379) 91.2% Functions (31/34) 66.8% Branches (153/229)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler uses a thread coordination strategy to provide handler
40 parallelism and avoid the thundering herd problem.
41 Instead of all threads blocking on epoll_wait(), one thread becomes the
42 "reactor" while others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The reactor_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
62
63 Wake Coordination (wake_one_thread_and_unlock)
64 ----------------------------------------------
65 When posting work:
66 - If idle threads exist: notify_one() wakes exactly one worker
67 - Else if reactor running: interrupt via eventfd write
68 - Else: no-op (thread will find work when it checks queue)
69
70 This is critical for matching IOCP behavior. With the old model, posting
71 N handlers would wake all threads (thundering herd). Now each post()
72 wakes at most one thread, and that thread handles exactly one item.
73
74 Work Counting
75 -------------
76 outstanding_work_ tracks pending operations. When it hits zero, run()
77 returns. Each operation increments on start, decrements on completion.
78
79 Timer Integration
80 -----------------
81 Timers are handled by timer_service. A timerfd registered with epoll
82 wakes the reactor at the nearest timer expiry. When the earliest
83 expiry changes, timer_service invokes update_timerfd() to re-arm the
84 timerfd for the new expiry.
85 */
86
87 namespace boost::corosio::detail {
88
89 namespace {
90
91 struct scheduler_context
92 {
93 epoll_scheduler const* key;
94 scheduler_context* next;
95 };
96
97 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
98
99 struct thread_context_guard
100 {
101 scheduler_context frame_;
102
103 154 explicit thread_context_guard(
104 epoll_scheduler const* ctx) noexcept
105 154 : frame_{ctx, context_stack.get()}
106 {
107 154 context_stack.set(&frame_);
108 154 }
109
110 154 ~thread_context_guard() noexcept
111 {
112 154 context_stack.set(frame_.next);
113 154 }
114 };
115
116 } // namespace
117
118 184 epoll_scheduler::
119 epoll_scheduler(
120 capy::execution_context& ctx,
121 184 int)
122 184 : epoll_fd_(-1)
123 184 , event_fd_(-1)
124 184 , timer_fd_(-1)
125 184 , outstanding_work_(0)
126 184 , stopped_(false)
127 184 , shutdown_(false)
128 184 , reactor_running_(false)
129 184 , reactor_interrupted_(false)
130 368 , idle_thread_count_(0)
131 {
132 184 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
133
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 184 times.
184 if (epoll_fd_ < 0)
134 detail::throw_system_error(make_err(errno), "epoll_create1");
135
136 184 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
137
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 184 times.
184 if (event_fd_ < 0)
138 {
139 int errn = errno;
140 ::close(epoll_fd_);
141 detail::throw_system_error(make_err(errn), "eventfd");
142 }
143
144 184 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
145
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 184 times.
184 if (timer_fd_ < 0)
146 {
147 int errn = errno;
148 ::close(event_fd_);
149 ::close(epoll_fd_);
150 detail::throw_system_error(make_err(errn), "timerfd_create");
151 }
152
153 184 epoll_event ev{};
154 184 ev.events = EPOLLIN | EPOLLET;
155 184 ev.data.ptr = nullptr;
156
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 184 times.
184 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
157 {
158 int errn = errno;
159 ::close(timer_fd_);
160 ::close(event_fd_);
161 ::close(epoll_fd_);
162 detail::throw_system_error(make_err(errn), "epoll_ctl");
163 }
164
165 184 epoll_event timer_ev{};
166 184 timer_ev.events = EPOLLIN | EPOLLERR;
167 184 timer_ev.data.ptr = &timer_fd_;
168
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 184 times.
184 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
169 {
170 int errn = errno;
171 ::close(timer_fd_);
172 ::close(event_fd_);
173 ::close(epoll_fd_);
174 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
175 }
176
177
1/1
✓ Branch 1 taken 184 times.
184 timer_svc_ = &get_timer_service(ctx, *this);
178
1/1
✓ Branch 3 taken 184 times.
184 timer_svc_->set_on_earliest_changed(
179 timer_service::callback(
180 this,
181 2881 [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
182
183 // Initialize resolver service
184
1/1
✓ Branch 1 taken 184 times.
184 get_resolver_service(ctx, *this);
185
186 // Initialize signal service
187
1/1
✓ Branch 1 taken 184 times.
184 get_signal_service(ctx, *this);
188
189 // Push task sentinel to interleave reactor runs with handler execution
190 184 completed_ops_.push(&task_op_);
191 184 }
192
193 368 epoll_scheduler::
194 184 ~epoll_scheduler()
195 {
196
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (timer_fd_ >= 0)
197 184 ::close(timer_fd_);
198
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (event_fd_ >= 0)
199 184 ::close(event_fd_);
200
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (epoll_fd_ >= 0)
201 184 ::close(epoll_fd_);
202 368 }
203
204 void
205 184 epoll_scheduler::
206 shutdown()
207 {
208 {
209
1/1
✓ Branch 1 taken 184 times.
184 std::unique_lock lock(mutex_);
210 184 shutdown_ = true;
211
212
2/2
✓ Branch 1 taken 184 times.
✓ Branch 2 taken 184 times.
368 while (auto* h = completed_ops_.pop())
213 {
214
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (h == &task_op_)
215 184 continue;
216 lock.unlock();
217 h->destroy();
218 lock.lock();
219 184 }
220 184 }
221
222 184 outstanding_work_.store(0, std::memory_order_release);
223
224
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (event_fd_ >= 0)
225 184 interrupt_reactor();
226
227 184 wakeup_event_.notify_all();
228 184 }
229
230 void
231 1639 epoll_scheduler::
232 post(capy::coro h) const
233 {
234 struct post_handler final
235 : scheduler_op
236 {
237 capy::coro h_;
238
239 explicit
240 1639 post_handler(capy::coro h)
241 1639 : h_(h)
242 {
243 1639 }
244
245 3278 ~post_handler() = default;
246
247 1639 void operator()() override
248 {
249 1639 auto h = h_;
250
1/2
✓ Branch 0 taken 1639 times.
✗ Branch 1 not taken.
1639 delete this;
251 std::atomic_thread_fence(std::memory_order_acquire);
252
1/1
✓ Branch 1 taken 1639 times.
1639 h.resume();
253 1639 }
254
255 void destroy() override
256 {
257 delete this;
258 }
259 };
260
261
1/1
✓ Branch 1 taken 1639 times.
1639 auto ph = std::make_unique<post_handler>(h);
262 1639 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
263
264
1/1
✓ Branch 1 taken 1639 times.
1639 std::unique_lock lock(mutex_);
265 1639 completed_ops_.push(ph.release());
266
1/1
✓ Branch 1 taken 1639 times.
1639 wake_one_thread_and_unlock(lock);
267 1639 }
268
269 void
270 112592 epoll_scheduler::
271 post(scheduler_op* h) const
272 {
273 112592 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
274
275
1/1
✓ Branch 1 taken 112592 times.
112592 std::unique_lock lock(mutex_);
276 112592 completed_ops_.push(h);
277
1/1
✓ Branch 1 taken 112592 times.
112592 wake_one_thread_and_unlock(lock);
278 112592 }
279
280 void
281 2905 epoll_scheduler::
282 on_work_started() noexcept
283 {
284 2905 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
285 2905 }
286
287 void
288 2873 epoll_scheduler::
289 on_work_finished() noexcept
290 {
291
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 2855 times.
5746 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
292 18 stop();
293 2873 }
294
295 bool
296 3120 epoll_scheduler::
297 running_in_this_thread() const noexcept
298 {
299
2/2
✓ Branch 1 taken 2910 times.
✓ Branch 2 taken 210 times.
3120 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
300
1/2
✓ Branch 0 taken 2910 times.
✗ Branch 1 not taken.
2910 if (c->key == this)
301 2910 return true;
302 210 return false;
303 }
304
305 void
306 37 epoll_scheduler::
307 stop()
308 {
309 37 bool expected = false;
310
1/2
✓ Branch 1 taken 37 times.
✗ Branch 2 not taken.
37 if (stopped_.compare_exchange_strong(expected, true,
311 std::memory_order_release, std::memory_order_relaxed))
312 {
313 // Wake all threads so they notice stopped_ and exit
314 {
315
1/1
✓ Branch 1 taken 37 times.
37 std::lock_guard lock(mutex_);
316 37 wakeup_event_.notify_all();
317 37 }
318
1/1
✓ Branch 1 taken 37 times.
37 interrupt_reactor();
319 }
320 37 }
321
322 bool
323 16 epoll_scheduler::
324 stopped() const noexcept
325 {
326 16 return stopped_.load(std::memory_order_acquire);
327 }
328
329 void
330 49 epoll_scheduler::
331 restart()
332 {
333 49 stopped_.store(false, std::memory_order_release);
334 49 }
335
336 std::size_t
337 175 epoll_scheduler::
338 run()
339 {
340
2/2
✓ Branch 1 taken 21 times.
✓ Branch 2 taken 154 times.
175 if (stopped_.load(std::memory_order_acquire))
341 21 return 0;
342
343
2/2
✓ Branch 1 taken 11 times.
✓ Branch 2 taken 143 times.
308 if (outstanding_work_.load(std::memory_order_acquire) == 0)
344 {
345
1/1
✓ Branch 1 taken 11 times.
11 stop();
346 11 return 0;
347 }
348
349 143 thread_context_guard ctx(this);
350
351 143 std::size_t n = 0;
352
3/3
✓ Branch 1 taken 119654 times.
✓ Branch 3 taken 119511 times.
✓ Branch 4 taken 143 times.
119654 while (do_one(-1))
353
1/2
✓ Branch 1 taken 119511 times.
✗ Branch 2 not taken.
119511 if (n != (std::numeric_limits<std::size_t>::max)())
354 119511 ++n;
355 143 return n;
356 143 }
357
358 std::size_t
359 2 epoll_scheduler::
360 run_one()
361 {
362
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (stopped_.load(std::memory_order_acquire))
363 return 0;
364
365
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
366 {
367 stop();
368 return 0;
369 }
370
371 2 thread_context_guard ctx(this);
372
1/1
✓ Branch 1 taken 2 times.
2 return do_one(-1);
373 2 }
374
375 std::size_t
376 10 epoll_scheduler::
377 wait_one(long usec)
378 {
379
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
10 if (stopped_.load(std::memory_order_acquire))
380 return 0;
381
382
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 6 times.
20 if (outstanding_work_.load(std::memory_order_acquire) == 0)
383 {
384
1/1
✓ Branch 1 taken 4 times.
4 stop();
385 4 return 0;
386 }
387
388 6 thread_context_guard ctx(this);
389
1/1
✓ Branch 1 taken 6 times.
6 return do_one(usec);
390 6 }
391
392 std::size_t
393 2 epoll_scheduler::
394 poll()
395 {
396
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (stopped_.load(std::memory_order_acquire))
397 return 0;
398
399
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
400 {
401
1/1
✓ Branch 1 taken 1 time.
1 stop();
402 1 return 0;
403 }
404
405 1 thread_context_guard ctx(this);
406
407 1 std::size_t n = 0;
408
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 1 time.
3 while (do_one(0))
409
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
410 2 ++n;
411 1 return n;
412 1 }
413
414 std::size_t
415 4 epoll_scheduler::
416 poll_one()
417 {
418
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (stopped_.load(std::memory_order_acquire))
419 return 0;
420
421
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
422 {
423
1/1
✓ Branch 1 taken 2 times.
2 stop();
424 2 return 0;
425 }
426
427 2 thread_context_guard ctx(this);
428
1/1
✓ Branch 1 taken 2 times.
2 return do_one(0);
429 2 }
430
431 void
432 5353 epoll_scheduler::
433 register_descriptor(int fd, descriptor_data* desc) const
434 {
435 5353 epoll_event ev{};
436 5353 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
437 5353 ev.data.ptr = desc;
438
439
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5353 times.
5353 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
440 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
441
442 5353 desc->registered_events = ev.events;
443 5353 desc->is_registered = true;
444 5353 desc->fd = fd;
445 5353 desc->read_ready.store(false, std::memory_order_relaxed);
446 5353 desc->write_ready.store(false, std::memory_order_relaxed);
447 5353 }
448
449 void
450 epoll_scheduler::
451 update_descriptor_events(int, descriptor_data*, std::uint32_t) const
452 {
453 // Provides memory fence for operation pointer visibility across threads
454 std::atomic_thread_fence(std::memory_order_seq_cst);
455 }
456
457 void
458 5353 epoll_scheduler::
459 deregister_descriptor(int fd) const
460 {
461 5353 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
462 5353 }
463
464 void
465 5460 epoll_scheduler::
466 work_started() const noexcept
467 {
468 5460 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
469 5460 }
470
471 void
472 119723 epoll_scheduler::
473 work_finished() const noexcept
474 {
475
2/2
✓ Branch 0 taken 130 times.
✓ Branch 1 taken 119593 times.
239446 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
476 {
477 // Last work item completed - wake all threads so they can exit.
478 // notify_all() wakes threads waiting on the condvar.
479 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
480 // Both are needed because they target different blocking mechanisms.
481 130 std::unique_lock lock(mutex_);
482 130 wakeup_event_.notify_all();
483
3/4
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 128 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
130 if (reactor_running_ && !reactor_interrupted_)
484 {
485 2 reactor_interrupted_ = true;
486 2 lock.unlock();
487 2 interrupt_reactor();
488 }
489 130 }
490 119723 }
491
492 void
493 462 epoll_scheduler::
494 interrupt_reactor() const
495 {
496 // Only write if not already armed to avoid redundant writes
497 462 bool expected = false;
498
2/2
✓ Branch 1 taken 425 times.
✓ Branch 2 taken 37 times.
462 if (eventfd_armed_.compare_exchange_strong(expected, true,
499 std::memory_order_release, std::memory_order_relaxed))
500 {
501 425 std::uint64_t val = 1;
502
1/1
✓ Branch 1 taken 425 times.
425 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
503 }
504 462 }
505
506 void
507 114231 epoll_scheduler::
508 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
509 {
510
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 114231 times.
114231 if (idle_thread_count_ > 0)
511 {
512 wakeup_event_.notify_one();
513 lock.unlock();
514 }
515
4/4
✓ Branch 0 taken 286 times.
✓ Branch 1 taken 113945 times.
✓ Branch 2 taken 239 times.
✓ Branch 3 taken 47 times.
114231 else if (reactor_running_ && !reactor_interrupted_)
516 {
517 239 reactor_interrupted_ = true;
518 239 lock.unlock();
519 239 interrupt_reactor();
520 }
521 else
522 {
523 113992 lock.unlock();
524 }
525 114231 }
526
527 struct work_guard
528 {
529 epoll_scheduler const* self;
530 ~work_guard() { self->work_finished(); }
531 };
532
533 void
534 51235 epoll_scheduler::
535 update_timerfd() const
536 {
537 51235 auto nearest = timer_svc_->nearest_expiry();
538
539 51235 itimerspec ts{};
540 51235 int flags = 0;
541
542
3/3
✓ Branch 2 taken 51235 times.
✓ Branch 4 taken 50904 times.
✓ Branch 5 taken 331 times.
51235 if (nearest == timer_service::time_point::max())
543 {
544 // No timers - disarm by setting to 0 (relative)
545 // ts is already zeroed
546 }
547 else
548 {
549 50904 auto now = std::chrono::steady_clock::now();
550
3/3
✓ Branch 1 taken 50904 times.
✓ Branch 4 taken 69 times.
✓ Branch 5 taken 50835 times.
50904 if (nearest <= now)
551 {
552 // Use 1ns instead of 0 - zero disarms the timerfd
553 69 ts.it_value.tv_nsec = 1;
554 }
555 else
556 {
557 50835 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
558
1/1
✓ Branch 1 taken 50835 times.
101670 nearest - now).count();
559 50835 ts.it_value.tv_sec = nsec / 1000000000;
560 50835 ts.it_value.tv_nsec = nsec % 1000000000;
561 // Ensure non-zero to avoid disarming if duration rounds to 0
562
3/4
✓ Branch 0 taken 50825 times.
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 50825 times.
50835 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
563 ts.it_value.tv_nsec = 1;
564 }
565 }
566
567
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 51235 times.
51235 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
568 detail::throw_system_error(make_err(errno), "timerfd_settime");
569 51235 }
570
571 void
572 48354 epoll_scheduler::
573 run_reactor(std::unique_lock<std::mutex>& lock)
574 {
575
2/2
✓ Branch 0 taken 42743 times.
✓ Branch 1 taken 5611 times.
48354 int timeout_ms = reactor_interrupted_ ? 0 : -1;
576
577
1/1
✓ Branch 1 taken 48354 times.
48354 lock.unlock();
578
579 epoll_event events[128];
580
1/1
✓ Branch 1 taken 48354 times.
48354 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
581 48354 int saved_errno = errno;
582
583
1/1
✓ Branch 1 taken 48354 times.
48354 timer_svc_->process_expired();
584
1/1
✓ Branch 1 taken 48354 times.
48354 update_timerfd();
585
586
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 48354 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
48354 if (nfds < 0 && saved_errno != EINTR)
587 detail::throw_system_error(make_err(saved_errno), "epoll_wait");
588
589
1/1
✓ Branch 1 taken 48354 times.
48354 lock.lock();
590
591 48354 int completions_queued = 0;
592
2/2
✓ Branch 0 taken 64694 times.
✓ Branch 1 taken 48354 times.
113048 for (int i = 0; i < nfds; ++i)
593 {
594
2/2
✓ Branch 0 taken 241 times.
✓ Branch 1 taken 64453 times.
64694 if (events[i].data.ptr == nullptr)
595 {
596 std::uint64_t val;
597
1/1
✓ Branch 1 taken 241 times.
241 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
598 241 eventfd_armed_.store(false, std::memory_order_relaxed);
599 241 continue;
600 241 }
601
602 // timerfd_settime() in update_timerfd() resets the readable state
603
2/2
✓ Branch 0 taken 2870 times.
✓ Branch 1 taken 61583 times.
64453 if (events[i].data.ptr == &timer_fd_)
604 2870 continue;
605
606 61583 auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
607 61583 std::uint32_t ev = events[i].events;
608 61583 int err = 0;
609
610
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 61537 times.
61583 if (ev & (EPOLLERR | EPOLLHUP))
611 {
612 46 socklen_t len = sizeof(err);
613
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 46 times.
46 if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
614 err = errno;
615
1/2
✓ Branch 0 taken 46 times.
✗ Branch 1 not taken.
46 if (err == 0)
616 46 err = EIO;
617 }
618
619
2/2
✓ Branch 0 taken 18978 times.
✓ Branch 1 taken 42605 times.
61583 if (ev & EPOLLIN)
620 {
621 18978 auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
622
2/2
✓ Branch 0 taken 2691 times.
✓ Branch 1 taken 16287 times.
18978 if (op)
623 {
624
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2691 times.
2691 if (err)
625 {
626 op->complete(err, 0);
627 completed_ops_.push(op);
628 ++completions_queued;
629 }
630 else
631 {
632 2691 op->perform_io();
633
2/4
✓ Branch 0 taken 2691 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2691 times.
2691 if (op->errn == EAGAIN || op->errn == EWOULDBLOCK)
634 {
635 op->errn = 0;
636 desc->read_op.store(op, std::memory_order_release);
637 }
638 else
639 {
640 2691 completed_ops_.push(op);
641 2691 ++completions_queued;
642 }
643 }
644 }
645 else
646 {
647 16287 desc->read_ready.store(true, std::memory_order_release);
648 }
649 }
650
651
2/2
✓ Branch 0 taken 58943 times.
✓ Branch 1 taken 2640 times.
61583 if (ev & EPOLLOUT)
652 {
653 // Connect uses write readiness - try it first
654 58943 auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
655
2/2
✓ Branch 0 taken 2600 times.
✓ Branch 1 taken 56343 times.
58943 if (conn_op)
656 {
657
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2600 times.
2600 if (err)
658 {
659 conn_op->complete(err, 0);
660 completed_ops_.push(conn_op);
661 ++completions_queued;
662 }
663 else
664 {
665 2600 conn_op->perform_io();
666
2/4
✓ Branch 0 taken 2600 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2600 times.
2600 if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK)
667 {
668 conn_op->errn = 0;
669 desc->connect_op.store(conn_op, std::memory_order_release);
670 }
671 else
672 {
673 2600 completed_ops_.push(conn_op);
674 2600 ++completions_queued;
675 }
676 }
677 }
678
679 58943 auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
680
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 58943 times.
58943 if (write_op)
681 {
682 if (err)
683 {
684 write_op->complete(err, 0);
685 completed_ops_.push(write_op);
686 ++completions_queued;
687 }
688 else
689 {
690 write_op->perform_io();
691 if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK)
692 {
693 write_op->errn = 0;
694 desc->write_op.store(write_op, std::memory_order_release);
695 }
696 else
697 {
698 completed_ops_.push(write_op);
699 ++completions_queued;
700 }
701 }
702 }
703
704
3/4
✓ Branch 0 taken 56343 times.
✓ Branch 1 taken 2600 times.
✓ Branch 2 taken 56343 times.
✗ Branch 3 not taken.
58943 if (!conn_op && !write_op)
705 56343 desc->write_ready.store(true, std::memory_order_release);
706 }
707
708 // Handle error for ops not processed above (no EPOLLIN/EPOLLOUT)
709
3/4
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 61537 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 46 times.
61583 if (err && !(ev & (EPOLLIN | EPOLLOUT)))
710 {
711 auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
712 if (read_op)
713 {
714 read_op->complete(err, 0);
715 completed_ops_.push(read_op);
716 ++completions_queued;
717 }
718
719 auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
720 if (write_op)
721 {
722 write_op->complete(err, 0);
723 completed_ops_.push(write_op);
724 ++completions_queued;
725 }
726
727 auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
728 if (conn_op)
729 {
730 conn_op->complete(err, 0);
731 completed_ops_.push(conn_op);
732 ++completions_queued;
733 }
734 }
735 }
736
737
2/2
✓ Branch 0 taken 2691 times.
✓ Branch 1 taken 45663 times.
48354 if (completions_queued > 0)
738 {
739
2/2
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 2600 times.
2691 if (completions_queued == 1)
740 91 wakeup_event_.notify_one();
741 else
742 2600 wakeup_event_.notify_all();
743 }
744 48354 }
745
746 std::size_t
747 119667 epoll_scheduler::
748 do_one(long timeout_us)
749 {
750
1/1
✓ Branch 1 taken 119667 times.
119667 std::unique_lock lock(mutex_);
751
752 for (;;)
753 {
754
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 168001 times.
168021 if (stopped_.load(std::memory_order_acquire))
755 20 return 0;
756
757 168001 scheduler_op* op = completed_ops_.pop();
758
759
2/2
✓ Branch 0 taken 48477 times.
✓ Branch 1 taken 119524 times.
168001 if (op == &task_op_)
760 {
761 48477 bool more_handlers = !completed_ops_.empty();
762
763
2/2
✓ Branch 0 taken 5734 times.
✓ Branch 1 taken 42743 times.
48477 if (!more_handlers)
764 {
765
2/2
✓ Branch 1 taken 123 times.
✓ Branch 2 taken 5611 times.
11468 if (outstanding_work_.load(std::memory_order_acquire) == 0)
766 {
767 123 completed_ops_.push(&task_op_);
768 123 return 0;
769 }
770
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5611 times.
5611 if (timeout_us == 0)
771 {
772 completed_ops_.push(&task_op_);
773 return 0;
774 }
775 }
776
777
3/4
✓ Branch 0 taken 5611 times.
✓ Branch 1 taken 42743 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5611 times.
48354 reactor_interrupted_ = more_handlers || timeout_us == 0;
778 48354 reactor_running_ = true;
779
780
3/4
✓ Branch 0 taken 42743 times.
✓ Branch 1 taken 5611 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 42743 times.
48354 if (more_handlers && idle_thread_count_ > 0)
781 wakeup_event_.notify_one();
782
783
1/1
✓ Branch 1 taken 48354 times.
48354 run_reactor(lock);
784
785 48354 reactor_running_ = false;
786 48354 completed_ops_.push(&task_op_);
787 48354 continue;
788 48354 }
789
790
2/2
✓ Branch 0 taken 119522 times.
✓ Branch 1 taken 2 times.
119524 if (op != nullptr)
791 {
792
1/1
✓ Branch 1 taken 119522 times.
119522 lock.unlock();
793 119522 work_guard g{this};
794
1/1
✓ Branch 1 taken 119522 times.
119522 (*op)();
795 119522 return 1;
796 119522 }
797
798
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
799 2 return 0;
800
801 if (timeout_us == 0)
802 return 0;
803
804 ++idle_thread_count_;
805 if (timeout_us < 0)
806 wakeup_event_.wait(lock);
807 else
808 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
809 --idle_thread_count_;
810 48354 }
811 119667 }
812
813 } // namespace boost::corosio::detail
814
815 #endif
816