libs/corosio/src/corosio/src/detail/epoll/op.hpp

81.4% Lines (79/97) 80.0% Functions (16/20) 50.0% Branches (12/24)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/dispatch_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 Persistent Registration
54 -----------------------
File descriptors are registered with epoll once (via descriptor_state) and
stay registered until closed. The descriptor_state tracks which operations
are pending (read_op, write_op, connect_op). When an event arrives, the
reactor records it on the descriptor_state and queues that descriptor on
the scheduler. The I/O for the pending operations is performed later, when
the scheduler runs the descriptor (see descriptor_state below).
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
// Forward declarations: in this header these types are only referred to
// through pointers, so their complete definitions are not needed here.
class epoll_scheduler;
class epoll_socket_impl;
class epoll_acceptor_impl;
struct epoll_op;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
111 struct descriptor_state : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120 // Caches edge events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Set during registration only (no mutex needed)
125 std::uint32_t registered_events = 0;
126 int fd = -1;
127
128 // For deferred I/O - set by reactor, read by scheduler
129 std::atomic<std::uint32_t> ready_events_{0};
130 std::atomic<bool> is_enqueued_{false};
131 epoll_scheduler const* scheduler_ = nullptr;
132
133 // Prevents impl destruction while this descriptor_state is queued.
134 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
135 std::shared_ptr<void> impl_ref_;
136
137 /// Add ready events atomically.
138 60280 void add_ready_events(std::uint32_t ev) noexcept
139 {
140 60280 ready_events_.fetch_or(ev, std::memory_order_relaxed);
141 60280 }
142
143 /// Perform deferred I/O and queue completions.
144 void operator()() override;
145
146 /// Destroy without invoking.
147 void destroy() override {}
148 };
149
150 struct epoll_op : scheduler_op
151 {
152 struct canceller
153 {
154 epoll_op* op;
155 void operator()() const noexcept;
156 };
157
158 std::coroutine_handle<> h;
159 capy::executor_ref ex;
160 std::error_code* ec_out = nullptr;
161 std::size_t* bytes_out = nullptr;
162
163 int fd = -1;
164 int errn = 0;
165 std::size_t bytes_transferred = 0;
166
167 std::atomic<bool> cancelled{false};
168 std::optional<std::stop_callback<canceller>> stop_cb;
169
170 // Prevents use-after-free when socket is closed with pending ops.
171 // See "Impl Lifetime Management" in file header.
172 std::shared_ptr<void> impl_ptr;
173
174 // For stop_token cancellation - pointer to owning socket/acceptor impl.
175 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
176 epoll_socket_impl* socket_impl_ = nullptr;
177 epoll_acceptor_impl* acceptor_impl_ = nullptr;
178
179 16156 epoll_op() = default;
180
181 245928 void reset() noexcept
182 {
183 245928 fd = -1;
184 245928 errn = 0;
185 245928 bytes_transferred = 0;
186 245928 cancelled.store(false, std::memory_order_relaxed);
187 245928 impl_ptr.reset();
188 245928 socket_impl_ = nullptr;
189 245928 acceptor_impl_ = nullptr;
190 245928 }
191
192 // Defined in sockets.cpp where epoll_socket_impl is complete
193 void operator()() override;
194
195 40030 virtual bool is_read_operation() const noexcept { return false; }
196 virtual void cancel() noexcept = 0;
197
198 void destroy() override
199 {
200 stop_cb.reset();
201 impl_ptr.reset();
202 }
203
204 24674 void request_cancel() noexcept
205 {
206 24674 cancelled.store(true, std::memory_order_release);
207 24674 }
208
209 82931 void start(std::stop_token token, epoll_socket_impl* impl)
210 {
211 82931 cancelled.store(false, std::memory_order_release);
212 82931 stop_cb.reset();
213 82931 socket_impl_ = impl;
214 82931 acceptor_impl_ = nullptr;
215
216
2/2
✓ Branch 1 taken 99 times.
✓ Branch 2 taken 82832 times.
82931 if (token.stop_possible())
217 99 stop_cb.emplace(token, canceller{this});
218 82931 }
219
220 2685 void start(std::stop_token token, epoll_acceptor_impl* impl)
221 {
222 2685 cancelled.store(false, std::memory_order_release);
223 2685 stop_cb.reset();
224 2685 socket_impl_ = nullptr;
225 2685 acceptor_impl_ = impl;
226
227
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2676 times.
2685 if (token.stop_possible())
228 9 stop_cb.emplace(token, canceller{this});
229 2685 }
230
231 85550 void complete(int err, std::size_t bytes) noexcept
232 {
233 85550 errn = err;
234 85550 bytes_transferred = bytes;
235 85550 }
236
237 virtual void perform_io() noexcept {}
238 };
239
240
241 struct epoll_connect_op : epoll_op
242 {
243 endpoint target_endpoint;
244
245 2677 void reset() noexcept
246 {
247 2677 epoll_op::reset();
248 2677 target_endpoint = endpoint{};
249 2677 }
250
251 2677 void perform_io() noexcept override
252 {
253 // connect() completion status is retrieved via SO_ERROR, not return value
254 2677 int err = 0;
255 2677 socklen_t len = sizeof(err);
256
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2677 times.
2677 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
257 err = errno;
258 2677 complete(err, 0);
259 2677 }
260
261 // Defined in sockets.cpp where epoll_socket_impl is complete
262 void operator()() override;
263 void cancel() noexcept override;
264 };
265
266
267 struct epoll_read_op : epoll_op
268 {
269 static constexpr std::size_t max_buffers = 16;
270 iovec iovecs[max_buffers];
271 int iovec_count = 0;
272 bool empty_buffer_read = false;
273
274 40067 bool is_read_operation() const noexcept override
275 {
276 40067 return !empty_buffer_read;
277 }
278
279 120359 void reset() noexcept
280 {
281 120359 epoll_op::reset();
282 120359 iovec_count = 0;
283 120359 empty_buffer_read = false;
284 120359 }
285
286 143 void perform_io() noexcept override
287 {
288 ssize_t n;
289 do {
290 143 n = ::readv(fd, iovecs, iovec_count);
291
3/4
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 51 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 92 times.
143 } while (n < 0 && errno == EINTR);
292
293
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 92 times.
143 if (n >= 0)
294 51 complete(0, static_cast<std::size_t>(n));
295 else
296 92 complete(errno, 0);
297 143 }
298
299 void cancel() noexcept override;
300 };
301
302
303 struct epoll_write_op : epoll_op
304 {
305 static constexpr std::size_t max_buffers = 16;
306 iovec iovecs[max_buffers];
307 int iovec_count = 0;
308
309 120207 void reset() noexcept
310 {
311 120207 epoll_op::reset();
312 120207 iovec_count = 0;
313 120207 }
314
315 void perform_io() noexcept override
316 {
317 msghdr msg{};
318 msg.msg_iov = iovecs;
319 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
320
321 ssize_t n;
322 do {
323 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
324 } while (n < 0 && errno == EINTR);
325
326 if (n >= 0)
327 complete(0, static_cast<std::size_t>(n));
328 else
329 complete(errno, 0);
330 }
331
332 void cancel() noexcept override;
333 };
334
335
336 struct epoll_accept_op : epoll_op
337 {
338 int accepted_fd = -1;
339 io_object::io_object_impl** impl_out = nullptr;
340 sockaddr_in peer_addr{};
341
342 2685 void reset() noexcept
343 {
344 2685 epoll_op::reset();
345 2685 accepted_fd = -1;
346 2685 impl_out = nullptr;
347 2685 peer_addr = {};
348 2685 }
349
350 2674 void perform_io() noexcept override
351 {
352 2674 socklen_t addrlen = sizeof(peer_addr);
353 int new_fd;
354 do {
355 2674 new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&peer_addr),
356 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
357
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2674 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2674 } while (new_fd < 0 && errno == EINTR);
358
359
1/2
✓ Branch 0 taken 2674 times.
✗ Branch 1 not taken.
2674 if (new_fd >= 0)
360 {
361 2674 accepted_fd = new_fd;
362 2674 complete(0, 0);
363 }
364 else
365 {
366 complete(errno, 0);
367 }
368 2674 }
369
370 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
371 void operator()() override;
372 void cancel() noexcept override;
373 };
374
375 } // namespace boost::corosio::detail
376
377 #endif // BOOST_COROSIO_HAS_EPOLL
378
379 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
380