libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

80.9% Lines (402/497) 89.6% Functions (43/48) 68.6% Branches (208/303)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <chrono>
24 #include <limits>
25 #include <utility>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler uses a thread coordination strategy to provide handler
40 parallelism and avoid the thundering herd problem.
41 Instead of all threads blocking on epoll_wait(), one thread becomes the
42 "reactor" while others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on cond_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The task_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
62
63 Signaling State (state_)
64 ------------------------
65 The state_ variable encodes two pieces of information:
66 - Bit 0: signaled flag (1 = signaled, persists until cleared)
67 - Upper bits: waiter count (each waiter adds 2 before blocking)
68
69 This allows efficient coordination:
70 - Signalers only call notify when waiters exist (state_ > 1)
71 - Waiters check if already signaled before blocking (fast-path)
72
73 Wake Coordination (wake_one_thread_and_unlock)
74 ----------------------------------------------
75 When posting work:
76 - If waiters exist (state_ > 1): signal and notify_one()
77 - Else if reactor running: interrupt via eventfd write
78 - Else: no-op (thread will find work when it checks queue)
79
80 This avoids waking threads unnecessarily. With cascading wakes,
81 each handler execution wakes at most one additional thread if
82 more work exists in the queue.
83
84 Work Counting
85 -------------
86 outstanding_work_ tracks pending operations. When it hits zero, run()
87 returns. Each operation increments on start, decrements on completion.
88
89 Timer Integration
90 -----------------
91 Timers are handled by timer_service. The reactor arms a timerfd,
92 registered with epoll, for the nearest timer expiry. When the earliest
93 expiry changes, the timerfd is marked stale and, if the reactor is
94 running, interrupt_reactor() wakes it so the timerfd is reprogrammed.
95 */
96
97 namespace boost::corosio::detail {
98
99 struct scheduler_context
100 {
101 epoll_scheduler const* key;
102 scheduler_context* next;
103 op_queue private_queue;
104 long private_outstanding_work;
105 int inline_budget;
106
107 158 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
108 158 : key(k)
109 158 , next(n)
110 158 , private_outstanding_work(0)
111 158 , inline_budget(0)
112 {
113 158 }
114 };
115
116 namespace {
117
118 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
119
120 struct thread_context_guard
121 {
122 scheduler_context frame_;
123
124 158 explicit thread_context_guard(
125 epoll_scheduler const* ctx) noexcept
126 158 : frame_(ctx, context_stack.get())
127 {
128 158 context_stack.set(&frame_);
129 158 }
130
131 158 ~thread_context_guard() noexcept
132 {
133
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 158 times.
158 if (!frame_.private_queue.empty())
134 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
135 158 context_stack.set(frame_.next);
136 158 }
137 };
138
139 scheduler_context*
140 468598 find_context(epoll_scheduler const* self) noexcept
141 {
142
2/2
✓ Branch 1 taken 466949 times.
✓ Branch 2 taken 1649 times.
468598 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
143
1/2
✓ Branch 0 taken 466949 times.
✗ Branch 1 not taken.
466949 if (c->key == self)
144 466949 return c;
145 1649 return nullptr;
146 }
147
148 } // namespace
149
150 void
151 85616 epoll_scheduler::
152 reset_inline_budget() const noexcept
153 {
154
1/2
✓ Branch 1 taken 85616 times.
✗ Branch 2 not taken.
85616 if (auto* ctx = find_context(this))
155 85616 ctx->inline_budget = max_inline_budget_;
156 85616 }
157
158 bool
159 240366 epoll_scheduler::
160 try_consume_inline_budget() const noexcept
161 {
162
1/2
✓ Branch 1 taken 240366 times.
✗ Branch 2 not taken.
240366 if (auto* ctx = find_context(this))
163 {
164
2/2
✓ Branch 0 taken 160312 times.
✓ Branch 1 taken 80054 times.
240366 if (ctx->inline_budget > 0)
165 {
166 160312 --ctx->inline_budget;
167 160312 return true;
168 }
169 }
170 80054 return false;
171 }
172
173 void
174 60280 descriptor_state::
175 operator()()
176 {
177 60280 is_enqueued_.store(false, std::memory_order_relaxed);
178
179 // Take ownership of impl ref set by close_socket() to prevent
180 // the owning impl from being freed while we're executing
181 60280 auto prevent_impl_destruction = std::move(impl_ref_);
182
183 60280 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60280 times.
60280 if (ev == 0)
185 {
186 scheduler_->compensating_work_started();
187 return;
188 }
189
190 60280 op_queue local_ops;
191
192 60280 int err = 0;
193
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 60279 times.
60280 if (ev & EPOLLERR)
194 {
195 1 socklen_t len = sizeof(err);
196
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
197 err = errno;
198
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
199 1 err = EIO;
200 }
201
202 {
203
1/1
✓ Branch 1 taken 60280 times.
60280 std::lock_guard lock(mutex);
204
2/2
✓ Branch 0 taken 17514 times.
✓ Branch 1 taken 42766 times.
60280 if (ev & EPOLLIN)
205 {
206
2/2
✓ Branch 0 taken 2725 times.
✓ Branch 1 taken 14789 times.
17514 if (read_op)
207 {
208 2725 auto* rd = read_op;
209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2725 times.
2725 if (err)
210 rd->complete(err, 0);
211 else
212 2725 rd->perform_io();
213
214
2/4
✓ Branch 0 taken 2725 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2725 times.
2725 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
215 {
216 rd->errn = 0;
217 }
218 else
219 {
220 2725 read_op = nullptr;
221 2725 local_ops.push(rd);
222 }
223 }
224 else
225 {
226 14789 read_ready = true;
227 }
228 }
229
2/2
✓ Branch 0 taken 57606 times.
✓ Branch 1 taken 2674 times.
60280 if (ev & EPOLLOUT)
230 {
231
3/4
✓ Branch 0 taken 54929 times.
✓ Branch 1 taken 2677 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 54929 times.
57606 bool had_write_op = (connect_op || write_op);
232
2/2
✓ Branch 0 taken 2677 times.
✓ Branch 1 taken 54929 times.
57606 if (connect_op)
233 {
234 2677 auto* cn = connect_op;
235
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2677 times.
2677 if (err)
236 cn->complete(err, 0);
237 else
238 2677 cn->perform_io();
239 2677 connect_op = nullptr;
240 2677 local_ops.push(cn);
241 }
242
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 57606 times.
57606 if (write_op)
243 {
244 auto* wr = write_op;
245 if (err)
246 wr->complete(err, 0);
247 else
248 wr->perform_io();
249
250 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
251 {
252 wr->errn = 0;
253 }
254 else
255 {
256 write_op = nullptr;
257 local_ops.push(wr);
258 }
259 }
260
2/2
✓ Branch 0 taken 54929 times.
✓ Branch 1 taken 2677 times.
57606 if (!had_write_op)
261 54929 write_ready = true;
262 }
263
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 60279 times.
60280 if (err)
264 {
265
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (read_op)
266 {
267 read_op->complete(err, 0);
268 local_ops.push(std::exchange(read_op, nullptr));
269 }
270
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (write_op)
271 {
272 write_op->complete(err, 0);
273 local_ops.push(std::exchange(write_op, nullptr));
274 }
275
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (connect_op)
276 {
277 connect_op->complete(err, 0);
278 local_ops.push(std::exchange(connect_op, nullptr));
279 }
280 }
281 60280 }
282
283 // Execute first handler inline — the scheduler's work_cleanup
284 // accounts for this as the "consumed" work item
285 60280 scheduler_op* first = local_ops.pop();
286
2/2
✓ Branch 0 taken 5402 times.
✓ Branch 1 taken 54878 times.
60280 if (first)
287 {
288
1/1
✓ Branch 1 taken 5402 times.
5402 scheduler_->post_deferred_completions(local_ops);
289
1/1
✓ Branch 1 taken 5402 times.
5402 (*first)();
290 }
291 else
292 {
293 54878 scheduler_->compensating_work_started();
294 }
295 60280 }
296
297 189 epoll_scheduler::
298 epoll_scheduler(
299 capy::execution_context& ctx,
300 189 int)
301 189 : epoll_fd_(-1)
302 189 , event_fd_(-1)
303 189 , timer_fd_(-1)
304 189 , outstanding_work_(0)
305 189 , stopped_(false)
306 189 , shutdown_(false)
307 189 , task_running_{false}
308 189 , task_interrupted_(false)
309 378 , state_(0)
310 {
311 189 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
312
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (epoll_fd_ < 0)
313 detail::throw_system_error(make_err(errno), "epoll_create1");
314
315 189 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
316
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (event_fd_ < 0)
317 {
318 int errn = errno;
319 ::close(epoll_fd_);
320 detail::throw_system_error(make_err(errn), "eventfd");
321 }
322
323 189 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
324
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (timer_fd_ < 0)
325 {
326 int errn = errno;
327 ::close(event_fd_);
328 ::close(epoll_fd_);
329 detail::throw_system_error(make_err(errn), "timerfd_create");
330 }
331
332 189 epoll_event ev{};
333 189 ev.events = EPOLLIN | EPOLLET;
334 189 ev.data.ptr = nullptr;
335
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
336 {
337 int errn = errno;
338 ::close(timer_fd_);
339 ::close(event_fd_);
340 ::close(epoll_fd_);
341 detail::throw_system_error(make_err(errn), "epoll_ctl");
342 }
343
344 189 epoll_event timer_ev{};
345 189 timer_ev.events = EPOLLIN | EPOLLERR;
346 189 timer_ev.data.ptr = &timer_fd_;
347
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
348 {
349 int errn = errno;
350 ::close(timer_fd_);
351 ::close(event_fd_);
352 ::close(epoll_fd_);
353 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
354 }
355
356
1/1
✓ Branch 1 taken 189 times.
189 timer_svc_ = &get_timer_service(ctx, *this);
357
1/1
✓ Branch 3 taken 189 times.
189 timer_svc_->set_on_earliest_changed(
358 timer_service::callback(
359 this,
360 [](void* p) {
361 2869 auto* self = static_cast<epoll_scheduler*>(p);
362 2869 self->timerfd_stale_.store(true, std::memory_order_release);
363
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2869 times.
2869 if (self->task_running_.load(std::memory_order_acquire))
364 self->interrupt_reactor();
365 2869 }));
366
367 // Initialize resolver service
368
1/1
✓ Branch 1 taken 189 times.
189 get_resolver_service(ctx, *this);
369
370 // Initialize signal service
371
1/1
✓ Branch 1 taken 189 times.
189 get_signal_service(ctx, *this);
372
373 // Push task sentinel to interleave reactor runs with handler execution
374 189 completed_ops_.push(&task_op_);
375 189 }
376
377 378 epoll_scheduler::
378 189 ~epoll_scheduler()
379 {
380
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
381 189 ::close(timer_fd_);
382
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
383 189 ::close(event_fd_);
384
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
385 189 ::close(epoll_fd_);
386 378 }
387
388 void
389 189 epoll_scheduler::
390 shutdown()
391 {
392 {
393
1/1
✓ Branch 1 taken 189 times.
189 std::unique_lock lock(mutex_);
394 189 shutdown_ = true;
395
396
2/2
✓ Branch 1 taken 189 times.
✓ Branch 2 taken 189 times.
378 while (auto* h = completed_ops_.pop())
397 {
398
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (h == &task_op_)
399 189 continue;
400 lock.unlock();
401 h->destroy();
402 lock.lock();
403 189 }
404
405 189 signal_all(lock);
406 189 }
407
408 189 outstanding_work_.store(0, std::memory_order_release);
409
410
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
411 189 interrupt_reactor();
412 189 }
413
414 void
415 4612 epoll_scheduler::
416 post(std::coroutine_handle<> h) const
417 {
418 struct post_handler final
419 : scheduler_op
420 {
421 std::coroutine_handle<> h_;
422
423 explicit
424 4612 post_handler(std::coroutine_handle<> h)
425 4612 : h_(h)
426 {
427 4612 }
428
429 9224 ~post_handler() = default;
430
431 4612 void operator()() override
432 {
433 4612 auto h = h_;
434
1/2
✓ Branch 0 taken 4612 times.
✗ Branch 1 not taken.
4612 delete this;
435
1/1
✓ Branch 1 taken 4612 times.
4612 h.resume();
436 4612 }
437
438 void destroy() override
439 {
440 delete this;
441 }
442 };
443
444
1/1
✓ Branch 1 taken 4612 times.
4612 auto ph = std::make_unique<post_handler>(h);
445
446 // Fast path: same thread posts to private queue
447 // Only count locally; work_cleanup batches to global counter
448
2/2
✓ Branch 1 taken 2989 times.
✓ Branch 2 taken 1623 times.
4612 if (auto* ctx = find_context(this))
449 {
450 2989 ++ctx->private_outstanding_work;
451 2989 ctx->private_queue.push(ph.release());
452 2989 return;
453 }
454
455 // Slow path: cross-thread post requires mutex
456 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
457
458
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
459 1623 completed_ops_.push(ph.release());
460
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
461 4612 }
462
463 void
464 83126 epoll_scheduler::
465 post(scheduler_op* h) const
466 {
467 // Fast path: same thread posts to private queue
468 // Only count locally; work_cleanup batches to global counter
469
2/2
✓ Branch 1 taken 83100 times.
✓ Branch 2 taken 26 times.
83126 if (auto* ctx = find_context(this))
470 {
471 83100 ++ctx->private_outstanding_work;
472 83100 ctx->private_queue.push(h);
473 83100 return;
474 }
475
476 // Slow path: cross-thread post requires mutex
477 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
478
479
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
480 26 completed_ops_.push(h);
481
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
482 26 }
483
484 void
485 3414 epoll_scheduler::
486 on_work_started() noexcept
487 {
488 3414 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
489 3414 }
490
491 void
492 3382 epoll_scheduler::
493 on_work_finished() noexcept
494 {
495
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3382 times.
6764 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
496 stop();
497 3382 }
498
499 bool
500 160837 epoll_scheduler::
501 running_in_this_thread() const noexcept
502 {
503
2/2
✓ Branch 1 taken 160627 times.
✓ Branch 2 taken 210 times.
160837 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
504
1/2
✓ Branch 0 taken 160627 times.
✗ Branch 1 not taken.
160627 if (c->key == this)
505 160627 return true;
506 210 return false;
507 }
508
509 void
510 40 epoll_scheduler::
511 stop()
512 {
513
1/1
✓ Branch 1 taken 40 times.
40 std::unique_lock lock(mutex_);
514
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 20 times.
40 if (!stopped_)
515 {
516 20 stopped_ = true;
517 20 signal_all(lock);
518
1/1
✓ Branch 1 taken 20 times.
20 interrupt_reactor();
519 }
520 40 }
521
522 bool
523 16 epoll_scheduler::
524 stopped() const noexcept
525 {
526 16 std::unique_lock lock(mutex_);
527 32 return stopped_;
528 16 }
529
530 void
531 49 epoll_scheduler::
532 restart()
533 {
534
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
535 49 stopped_ = false;
536 49 }
537
538 std::size_t
539 175 epoll_scheduler::
540 run()
541 {
542
2/2
✓ Branch 1 taken 31 times.
✓ Branch 2 taken 144 times.
350 if (outstanding_work_.load(std::memory_order_acquire) == 0)
543 {
544
1/1
✓ Branch 1 taken 31 times.
31 stop();
545 31 return 0;
546 }
547
548 144 thread_context_guard ctx(this);
549
1/1
✓ Branch 1 taken 144 times.
144 std::unique_lock lock(mutex_);
550
551 144 std::size_t n = 0;
552 for (;;)
553 {
554
3/3
✓ Branch 1 taken 148147 times.
✓ Branch 3 taken 144 times.
✓ Branch 4 taken 148003 times.
148147 if (!do_one(lock, -1, &ctx.frame_))
555 144 break;
556
1/2
✓ Branch 1 taken 148003 times.
✗ Branch 2 not taken.
148003 if (n != (std::numeric_limits<std::size_t>::max)())
557 148003 ++n;
558
2/2
✓ Branch 1 taken 64790 times.
✓ Branch 2 taken 83213 times.
148003 if (!lock.owns_lock())
559
1/1
✓ Branch 1 taken 64790 times.
64790 lock.lock();
560 }
561 144 return n;
562 144 }
563
564 std::size_t
565 2 epoll_scheduler::
566 run_one()
567 {
568
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
569 {
570 stop();
571 return 0;
572 }
573
574 2 thread_context_guard ctx(this);
575
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
576
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
577 2 }
578
579 std::size_t
580 14 epoll_scheduler::
581 wait_one(long usec)
582 {
583
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 9 times.
28 if (outstanding_work_.load(std::memory_order_acquire) == 0)
584 {
585
1/1
✓ Branch 1 taken 5 times.
5 stop();
586 5 return 0;
587 }
588
589 9 thread_context_guard ctx(this);
590
1/1
✓ Branch 1 taken 9 times.
9 std::unique_lock lock(mutex_);
591
1/1
✓ Branch 1 taken 9 times.
9 return do_one(lock, usec, &ctx.frame_);
592 9 }
593
594 std::size_t
595 2 epoll_scheduler::
596 poll()
597 {
598
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
599 {
600
1/1
✓ Branch 1 taken 1 time.
1 stop();
601 1 return 0;
602 }
603
604 1 thread_context_guard ctx(this);
605
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
606
607 1 std::size_t n = 0;
608 for (;;)
609 {
610
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
611 1 break;
612
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
613 2 ++n;
614
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
615
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
616 }
617 1 return n;
618 1 }
619
620 std::size_t
621 4 epoll_scheduler::
622 poll_one()
623 {
624
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
625 {
626
1/1
✓ Branch 1 taken 2 times.
2 stop();
627 2 return 0;
628 }
629
630 2 thread_context_guard ctx(this);
631
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
632
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
633 2 }
634
635 void
636 5426 epoll_scheduler::
637 register_descriptor(int fd, descriptor_state* desc) const
638 {
639 5426 epoll_event ev{};
640 5426 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
641 5426 ev.data.ptr = desc;
642
643
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5426 times.
5426 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
644 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
645
646 5426 desc->registered_events = ev.events;
647 5426 desc->fd = fd;
648 5426 desc->scheduler_ = this;
649
650
1/1
✓ Branch 1 taken 5426 times.
5426 std::lock_guard lock(desc->mutex);
651 5426 desc->read_ready = false;
652 5426 desc->write_ready = false;
653 5426 }
654
655 void
656 5426 epoll_scheduler::
657 deregister_descriptor(int fd) const
658 {
659 5426 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
660 5426 }
661
662 void
663 5560 epoll_scheduler::
664 work_started() const noexcept
665 {
666 5560 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
667 5560 }
668
669 void
670 10114 epoll_scheduler::
671 work_finished() const noexcept
672 {
673
2/2
✓ Branch 0 taken 148 times.
✓ Branch 1 taken 9966 times.
20228 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
674 {
675 // Last work item completed - wake all threads so they can exit.
676 // signal_all() wakes threads waiting on the condvar.
677 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
678 // Both are needed because they target different blocking mechanisms.
679 148 std::unique_lock lock(mutex_);
680 148 signal_all(lock);
681
5/6
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 146 times.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 146 times.
148 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
682 {
683 2 task_interrupted_ = true;
684 2 lock.unlock();
685 2 interrupt_reactor();
686 }
687 148 }
688 10114 }
689
690 void
691 54878 epoll_scheduler::
692 compensating_work_started() const noexcept
693 {
694 54878 auto* ctx = find_context(this);
695
1/2
✓ Branch 0 taken 54878 times.
✗ Branch 1 not taken.
54878 if (ctx)
696 54878 ++ctx->private_outstanding_work;
697 54878 }
698
699 void
700 epoll_scheduler::
701 drain_thread_queue(op_queue& queue, long count) const
702 {
703 // Note: outstanding_work_ was already incremented when posting
704 std::unique_lock lock(mutex_);
705 completed_ops_.splice(queue);
706 if (count > 0)
707 maybe_unlock_and_signal_one(lock);
708 }
709
710 void
711 5402 epoll_scheduler::
712 post_deferred_completions(op_queue& ops) const
713 {
714
1/2
✓ Branch 1 taken 5402 times.
✗ Branch 2 not taken.
5402 if (ops.empty())
715 5402 return;
716
717 // Fast path: if on scheduler thread, use private queue
718 if (auto* ctx = find_context(this))
719 {
720 ctx->private_queue.splice(ops);
721 return;
722 }
723
724 // Slow path: add to global queue and wake a thread
725 std::unique_lock lock(mutex_);
726 completed_ops_.splice(ops);
727 wake_one_thread_and_unlock(lock);
728 }
729
730 void
731 237 epoll_scheduler::
732 interrupt_reactor() const
733 {
734 // Only write if not already armed to avoid redundant writes
735 237 bool expected = false;
736
2/2
✓ Branch 1 taken 223 times.
✓ Branch 2 taken 14 times.
237 if (eventfd_armed_.compare_exchange_strong(expected, true,
737 std::memory_order_release, std::memory_order_relaxed))
738 {
739 223 std::uint64_t val = 1;
740
1/1
✓ Branch 1 taken 223 times.
223 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
741 }
742 237 }
743
744 void
745 357 epoll_scheduler::
746 signal_all(std::unique_lock<std::mutex>&) const
747 {
748 357 state_ |= 1;
749 357 cond_.notify_all();
750 357 }
751
752 bool
753 1649 epoll_scheduler::
754 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
755 {
756 1649 state_ |= 1;
757
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1649 times.
1649 if (state_ > 1)
758 {
759 lock.unlock();
760 cond_.notify_one();
761 return true;
762 }
763 1649 return false;
764 }
765
766 void
767 187025 epoll_scheduler::
768 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
769 {
770 187025 state_ |= 1;
771 187025 bool have_waiters = state_ > 1;
772 187025 lock.unlock();
773
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 187025 times.
187025 if (have_waiters)
774 cond_.notify_one();
775 187025 }
776
777 void
778 epoll_scheduler::
779 clear_signal() const
780 {
781 state_ &= ~std::size_t(1);
782 }
783
784 void
785 epoll_scheduler::
786 wait_for_signal(std::unique_lock<std::mutex>& lock) const
787 {
788 while ((state_ & 1) == 0)
789 {
790 state_ += 2;
791 cond_.wait(lock);
792 state_ -= 2;
793 }
794 }
795
796 void
797 epoll_scheduler::
798 wait_for_signal_for(
799 std::unique_lock<std::mutex>& lock,
800 long timeout_us) const
801 {
802 if ((state_ & 1) == 0)
803 {
804 state_ += 2;
805 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
806 state_ -= 2;
807 }
808 }
809
810 void
811 1649 epoll_scheduler::
812 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
813 {
814
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1649 times.
1649 if (maybe_unlock_and_signal_one(lock))
815 return;
816
817
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1623 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1623 times.
1649 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
818 {
819 26 task_interrupted_ = true;
820 26 lock.unlock();
821 26 interrupt_reactor();
822 }
823 else
824 {
825 1623 lock.unlock();
826 }
827 }
828
829 /** RAII guard for handler execution work accounting.
830
831 Handler consumes 1 work item, may produce N new items via fast-path posts.
832 Net change = N - 1:
833 - If N > 1: add (N-1) to global (more work produced than consumed)
834 - If N == 1: net zero, do nothing
835 - If N < 1: call work_finished() (work consumed, may trigger stop)
836
837 Also drains private queue to global for other threads to process.
838 */
839 struct work_cleanup
840 {
841 epoll_scheduler const* scheduler;
842 std::unique_lock<std::mutex>* lock;
843 scheduler_context* ctx;
844
845 148018 ~work_cleanup()
846 {
847
1/2
✓ Branch 0 taken 148018 times.
✗ Branch 1 not taken.
148018 if (ctx)
848 {
849 148018 long produced = ctx->private_outstanding_work;
850
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 148017 times.
148018 if (produced > 1)
851 1 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
852
2/2
✓ Branch 0 taken 9924 times.
✓ Branch 1 taken 138093 times.
148017 else if (produced < 1)
853 9924 scheduler->work_finished();
854 // produced == 1: net zero, handler consumed what it produced
855 148018 ctx->private_outstanding_work = 0;
856
857
2/2
✓ Branch 1 taken 83216 times.
✓ Branch 2 taken 64802 times.
148018 if (!ctx->private_queue.empty())
858 {
859 83216 lock->lock();
860 83216 scheduler->completed_ops_.splice(ctx->private_queue);
861 }
862 }
863 else
864 {
865 // No thread context - slow-path op was already counted globally
866 scheduler->work_finished();
867 }
868 148018 }
869 };
870
871 /** RAII guard for reactor work accounting.
872
873 Reactor only produces work via timer/signal callbacks posting handlers.
874 Unlike handler execution which consumes 1, the reactor consumes nothing.
875 All produced work must be flushed to global counter.
876 */
877 struct task_cleanup
878 {
879 epoll_scheduler const* scheduler;
880 std::unique_lock<std::mutex>* lock;
881 scheduler_context* ctx;
882
883 44629 ~task_cleanup()
884 44629 {
885
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 44629 times.
44629 if (!ctx)
886 return;
887
888
2/2
✓ Branch 0 taken 2871 times.
✓ Branch 1 taken 41758 times.
44629 if (ctx->private_outstanding_work > 0)
889 {
890 2871 scheduler->outstanding_work_.fetch_add(
891 2871 ctx->private_outstanding_work, std::memory_order_relaxed);
892 2871 ctx->private_outstanding_work = 0;
893 }
894
895
2/2
✓ Branch 1 taken 2871 times.
✓ Branch 2 taken 41758 times.
44629 if (!ctx->private_queue.empty())
896 {
897
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2871 times.
2871 if (!lock->owns_lock())
898 lock->lock();
899 2871 scheduler->completed_ops_.splice(ctx->private_queue);
900 }
901 44629 }
902 };
903
904 void
905 5736 epoll_scheduler::
906 update_timerfd() const
907 {
908 5736 auto nearest = timer_svc_->nearest_expiry();
909
910 5736 itimerspec ts{};
911 5736 int flags = 0;
912
913
3/3
✓ Branch 2 taken 5736 times.
✓ Branch 4 taken 5696 times.
✓ Branch 5 taken 40 times.
5736 if (nearest == timer_service::time_point::max())
914 {
915 // No timers - disarm by setting to 0 (relative)
916 }
917 else
918 {
919 5696 auto now = std::chrono::steady_clock::now();
920
3/3
✓ Branch 1 taken 5696 times.
✓ Branch 4 taken 30 times.
✓ Branch 5 taken 5666 times.
5696 if (nearest <= now)
921 {
922 // Use 1ns instead of 0 - zero disarms the timerfd
923 30 ts.it_value.tv_nsec = 1;
924 }
925 else
926 {
927 5666 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
928
1/1
✓ Branch 1 taken 5666 times.
11332 nearest - now).count();
929 5666 ts.it_value.tv_sec = nsec / 1000000000;
930 5666 ts.it_value.tv_nsec = nsec % 1000000000;
931 // Ensure non-zero to avoid disarming if duration rounds to 0
932
3/4
✓ Branch 0 taken 5662 times.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5662 times.
5666 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
933 ts.it_value.tv_nsec = 1;
934 }
935 }
936
937
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5736 times.
5736 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
938 detail::throw_system_error(make_err(errno), "timerfd_settime");
939 5736 }
940
941 void
942 44629 epoll_scheduler::
943 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
944 {
945
2/2
✓ Branch 0 taken 39007 times.
✓ Branch 1 taken 5622 times.
44629 int timeout_ms = task_interrupted_ ? 0 : -1;
946
947
2/2
✓ Branch 1 taken 5622 times.
✓ Branch 2 taken 39007 times.
44629 if (lock.owns_lock())
948
1/1
✓ Branch 1 taken 5622 times.
5622 lock.unlock();
949
950 44629 task_cleanup on_exit{this, &lock, ctx};
951
952 // Flush deferred timerfd programming before blocking
953
2/2
✓ Branch 1 taken 2865 times.
✓ Branch 2 taken 41764 times.
44629 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
954
1/1
✓ Branch 1 taken 2865 times.
2865 update_timerfd();
955
956 // Event loop runs without mutex held
957 epoll_event events[128];
958
1/1
✓ Branch 1 taken 44629 times.
44629 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
959
960
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 44629 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
44629 if (nfds < 0 && errno != EINTR)
961 detail::throw_system_error(make_err(errno), "epoll_wait");
962
963 44629 bool check_timers = false;
964 44629 op_queue local_ops;
965
966 // Process events without holding the mutex
967
2/2
✓ Branch 0 taken 63185 times.
✓ Branch 1 taken 44629 times.
107814 for (int i = 0; i < nfds; ++i)
968 {
969
2/2
✓ Branch 0 taken 34 times.
✓ Branch 1 taken 63151 times.
63185 if (events[i].data.ptr == nullptr)
970 {
971 std::uint64_t val;
972
1/1
✓ Branch 1 taken 34 times.
34 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
973 34 eventfd_armed_.store(false, std::memory_order_relaxed);
974 34 continue;
975 34 }
976
977
2/2
✓ Branch 0 taken 2871 times.
✓ Branch 1 taken 60280 times.
63151 if (events[i].data.ptr == &timer_fd_)
978 {
979 std::uint64_t expirations;
980
1/1
✓ Branch 1 taken 2871 times.
2871 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
981 2871 check_timers = true;
982 2871 continue;
983 2871 }
984
985 // Deferred I/O: just set ready events and enqueue descriptor
986 // No per-descriptor mutex locking in reactor hot path!
987 60280 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
988 60280 desc->add_ready_events(events[i].events);
989
990 // Only enqueue if not already enqueued
991 60280 bool expected = false;
992
1/2
✓ Branch 1 taken 60280 times.
✗ Branch 2 not taken.
60280 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
993 std::memory_order_release, std::memory_order_relaxed))
994 {
995 60280 local_ops.push(desc);
996 }
997 }
998
999 // Process timers only when timerfd fires
1000
2/2
✓ Branch 0 taken 2871 times.
✓ Branch 1 taken 41758 times.
44629 if (check_timers)
1001 {
1002
1/1
✓ Branch 1 taken 2871 times.
2871 timer_svc_->process_expired();
1003
1/1
✓ Branch 1 taken 2871 times.
2871 update_timerfd();
1004 }
1005
1006
1/1
✓ Branch 1 taken 44629 times.
44629 lock.lock();
1007
1008
2/2
✓ Branch 1 taken 38596 times.
✓ Branch 2 taken 6033 times.
44629 if (!local_ops.empty())
1009 38596 completed_ops_.splice(local_ops);
1010 44629 }
1011
1012 std::size_t
1013 148163 epoll_scheduler::
1014 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
1015 {
1016 for (;;)
1017 {
1018
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 192788 times.
192792 if (stopped_)
1019 4 return 0;
1020
1021 192788 scheduler_op* op = completed_ops_.pop();
1022
1023 // Handle reactor sentinel - time to poll for I/O
1024
2/2
✓ Branch 0 taken 44769 times.
✓ Branch 1 taken 148019 times.
192788 if (op == &task_op_)
1025 {
1026 44769 bool more_handlers = !completed_ops_.empty();
1027
1028 // Nothing to run the reactor for: no pending work to wait on,
1029 // or caller requested a non-blocking poll
1030
4/4
✓ Branch 0 taken 5762 times.
✓ Branch 1 taken 39007 times.
✓ Branch 2 taken 140 times.
✓ Branch 3 taken 44629 times.
50531 if (!more_handlers &&
1031
3/4
✓ Branch 1 taken 5622 times.
✓ Branch 2 taken 140 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 5622 times.
11524 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1032 timeout_us == 0))
1033 {
1034 140 completed_ops_.push(&task_op_);
1035 140 return 0;
1036 }
1037
1038
3/4
✓ Branch 0 taken 5622 times.
✓ Branch 1 taken 39007 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5622 times.
44629 task_interrupted_ = more_handlers || timeout_us == 0;
1039 44629 task_running_.store(true, std::memory_order_release);
1040
1041
2/2
✓ Branch 0 taken 39007 times.
✓ Branch 1 taken 5622 times.
44629 if (more_handlers)
1042 39007 unlock_and_signal_one(lock);
1043
1044 44629 run_task(lock, ctx);
1045
1046 44629 task_running_.store(false, std::memory_order_relaxed);
1047 44629 completed_ops_.push(&task_op_);
1048 44629 continue;
1049 44629 }
1050
1051 // Handle operation
1052
2/2
✓ Branch 0 taken 148018 times.
✓ Branch 1 taken 1 time.
148019 if (op != nullptr)
1053 {
1054
1/2
✓ Branch 1 taken 148018 times.
✗ Branch 2 not taken.
148018 if (!completed_ops_.empty())
1055
1/1
✓ Branch 1 taken 148018 times.
148018 unlock_and_signal_one(lock);
1056 else
1057 lock.unlock();
1058
1059 148018 work_cleanup on_exit{this, &lock, ctx};
1060
1061
1/1
✓ Branch 1 taken 148018 times.
148018 (*op)();
1062 148018 return 1;
1063 148018 }
1064
1065 // No pending work to wait on, or caller requested non-blocking poll
1066
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 time.
✗ Branch 6 not taken.
2 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1067 timeout_us == 0)
1068 1 return 0;
1069
1070 clear_signal();
1071 if (timeout_us < 0)
1072 wait_for_signal(lock);
1073 else
1074 wait_for_signal_for(lock, timeout_us);
1075 44629 }
1076 }
1077
1078 } // namespace boost::corosio::detail
1079
1080 #endif
1081