libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

80.7% Lines (334/414) 94.4% Functions (34/36) 64.3% Branches (142/221)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 // Register an op with the reactor, handling cached edge events.
34 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
35 void
36 2877 epoll_socket_impl::
37 register_op(
38 epoll_op& op,
39 epoll_op*& desc_slot,
40 bool& ready_flag) noexcept
41 {
42 2877 svc_.work_started();
43
44 2877 std::lock_guard lock(desc_state_.mutex);
45 2877 bool io_done = false;
46
2/2
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 2785 times.
2877 if (ready_flag)
47 {
48 92 ready_flag = false;
49 92 op.perform_io();
50
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
92 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
51
1/2
✓ Branch 0 taken 92 times.
✗ Branch 1 not taken.
92 if (!io_done)
52 92 op.errn = 0;
53 }
54
55
3/6
✓ Branch 0 taken 2877 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2877 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2877 times.
2877 if (io_done || op.cancelled.load(std::memory_order_acquire))
56 {
57 svc_.post(&op);
58 svc_.work_finished();
59 }
60 else
61 {
62 2877 desc_slot = &op;
63 }
64 2877 }
65
66 void
67 104 epoll_op::canceller::
68 operator()() const noexcept
69 {
70 104 op->cancel();
71 104 }
72
73 void
74 epoll_connect_op::
75 cancel() noexcept
76 {
77 if (socket_impl_)
78 socket_impl_->cancel_single_op(*this);
79 else
80 request_cancel();
81 }
82
83 void
84 98 epoll_read_op::
85 cancel() noexcept
86 {
87
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
88 98 socket_impl_->cancel_single_op(*this);
89 else
90 request_cancel();
91 98 }
92
93 void
94 epoll_write_op::
95 cancel() noexcept
96 {
97 if (socket_impl_)
98 socket_impl_->cancel_single_op(*this);
99 else
100 request_cancel();
101 }
102
103 void
104 80254 epoll_op::
105 operator()()
106 {
107 80254 stop_cb.reset();
108
109 80254 socket_impl_->svc_.scheduler().reset_inline_budget();
110
111
2/2
✓ Branch 1 taken 157 times.
✓ Branch 2 taken 80097 times.
80254 if (cancelled.load(std::memory_order_acquire))
112 157 *ec_out = capy::error::canceled;
113
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 80097 times.
80097 else if (errn != 0)
114 *ec_out = make_err(errn);
115
4/6
✓ Branch 1 taken 40066 times.
✓ Branch 2 taken 40031 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 40066 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 80097 times.
80097 else if (is_read_operation() && bytes_transferred == 0)
116 *ec_out = capy::error::eof;
117 else
118 80097 *ec_out = {};
119
120 80254 *bytes_out = bytes_transferred;
121
122 // Move to stack before resuming coroutine. The coroutine might close
123 // the socket, releasing the last wrapper ref. If impl_ptr were the
124 // last ref and we destroyed it while still in operator(), we'd have
125 // use-after-free. Moving to local ensures destruction happens at
126 // function exit, after all member accesses are complete.
127 80254 capy::executor_ref saved_ex( std::move( ex ) );
128 80254 std::coroutine_handle<> saved_h( std::move( h ) );
129 80254 auto prevent_premature_destruction = std::move(impl_ptr);
130
2/2
✓ Branch 1 taken 80254 times.
✓ Branch 4 taken 80254 times.
80254 dispatch_coro(saved_ex, saved_h).resume();
131 80254 }
132
133 void
134 2677 epoll_connect_op::
135 operator()()
136 {
137 2677 stop_cb.reset();
138
139 2677 socket_impl_->svc_.scheduler().reset_inline_budget();
140
141
3/4
✓ Branch 0 taken 2676 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 2676 times.
✗ Branch 4 not taken.
2677 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
142
143 // Cache endpoints on successful connect
144
3/4
✓ Branch 0 taken 2676 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 2676 times.
✗ Branch 3 not taken.
2677 if (success && socket_impl_)
145 {
146 // Query local endpoint via getsockname (may fail, but remote is always known)
147 2676 endpoint local_ep;
148 2676 sockaddr_in local_addr{};
149 2676 socklen_t local_len = sizeof(local_addr);
150
1/2
✓ Branch 1 taken 2676 times.
✗ Branch 2 not taken.
2676 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
151 2676 local_ep = from_sockaddr_in(local_addr);
152 // Always cache remote endpoint; local may be default if getsockname failed
153 2676 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
154 }
155
156
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2677 times.
2677 if (cancelled.load(std::memory_order_acquire))
157 *ec_out = capy::error::canceled;
158
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2676 times.
2677 else if (errn != 0)
159 1 *ec_out = make_err(errn);
160 else
161 2676 *ec_out = {};
162
163 // Move to stack before resuming. See epoll_op::operator()() for rationale.
164 2677 capy::executor_ref saved_ex( std::move( ex ) );
165 2677 std::coroutine_handle<> saved_h( std::move( h ) );
166 2677 auto prevent_premature_destruction = std::move(impl_ptr);
167
2/2
✓ Branch 1 taken 2677 times.
✓ Branch 4 taken 2677 times.
2677 dispatch_coro(saved_ex, saved_h).resume();
168 2677 }
169
170 5364 epoll_socket_impl::
171 5364 epoll_socket_impl(epoll_socket_service& svc) noexcept
172 5364 : svc_(svc)
173 {
174 5364 }
175
176 5364 epoll_socket_impl::
177 ~epoll_socket_impl() = default;
178
179 void
180 5364 epoll_socket_impl::
181 release()
182 {
183 5364 close_socket();
184 5364 svc_.destroy_impl(*this);
185 5364 }
186
187 std::coroutine_handle<>
188 2677 epoll_socket_impl::
189 connect(
190 std::coroutine_handle<> h,
191 capy::executor_ref ex,
192 endpoint ep,
193 std::stop_token token,
194 std::error_code* ec)
195 {
196 2677 auto& op = conn_;
197
198 2677 sockaddr_in addr = detail::to_sockaddr_in(ep);
199
1/1
✓ Branch 1 taken 2677 times.
2677 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
200
201
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2677 times.
2677 if (result == 0)
202 {
203 sockaddr_in local_addr{};
204 socklen_t local_len = sizeof(local_addr);
205 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
206 local_endpoint_ = detail::from_sockaddr_in(local_addr);
207 remote_endpoint_ = ep;
208 }
209
210
2/4
✓ Branch 0 taken 2677 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2677 times.
2677 if (result == 0 || errno != EINPROGRESS)
211 {
212 int err = (result < 0) ? errno : 0;
213 if (svc_.scheduler().try_consume_inline_budget())
214 {
215 *ec = err ? make_err(err) : std::error_code{};
216 return ex.dispatch(h);
217 }
218 op.reset();
219 op.h = h;
220 op.ex = ex;
221 op.ec_out = ec;
222 op.fd = fd_;
223 op.target_endpoint = ep;
224 op.start(token, this);
225 op.impl_ptr = shared_from_this();
226 op.complete(err, 0);
227 svc_.post(&op);
228 return std::noop_coroutine();
229 }
230
231 // EINPROGRESS — register with reactor
232 2677 op.reset();
233 2677 op.h = h;
234 2677 op.ex = ex;
235 2677 op.ec_out = ec;
236 2677 op.fd = fd_;
237 2677 op.target_endpoint = ep;
238 2677 op.start(token, this);
239
1/1
✓ Branch 1 taken 2677 times.
2677 op.impl_ptr = shared_from_this();
240
241 2677 register_op(op, desc_state_.connect_op, desc_state_.write_ready);
242 2677 return std::noop_coroutine();
243 }
244
245 std::coroutine_handle<>
246 120359 epoll_socket_impl::
247 read_some(
248 std::coroutine_handle<> h,
249 capy::executor_ref ex,
250 io_buffer_param param,
251 std::stop_token token,
252 std::error_code* ec,
253 std::size_t* bytes_out)
254 {
255 120359 auto& op = rd_;
256 120359 op.reset();
257
258 120359 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
259 120359 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
260
261
6/8
✓ Branch 0 taken 120358 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 120358 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 120358 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 120358 times.
120359 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
262 {
263 1 op.empty_buffer_read = true;
264 1 op.h = h;
265 1 op.ex = ex;
266 1 op.ec_out = ec;
267 1 op.bytes_out = bytes_out;
268 1 op.start(token, this);
269
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
270 1 op.complete(0, 0);
271
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
272 1 return std::noop_coroutine();
273 }
274
275
2/2
✓ Branch 0 taken 120358 times.
✓ Branch 1 taken 120358 times.
240716 for (int i = 0; i < op.iovec_count; ++i)
276 {
277 120358 op.iovecs[i].iov_base = bufs[i].data();
278 120358 op.iovecs[i].iov_len = bufs[i].size();
279 }
280
281 // Speculative read
282 ssize_t n;
283 do {
284
1/1
✓ Branch 1 taken 120358 times.
120358 n = ::readv(fd_, op.iovecs, op.iovec_count);
285
3/4
✓ Branch 0 taken 200 times.
✓ Branch 1 taken 120158 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 200 times.
120358 } while (n < 0 && errno == EINTR);
286
287
3/6
✓ Branch 0 taken 200 times.
✓ Branch 1 taken 120158 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 200 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
120358 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
288 {
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 120158 times.
120158 int err = (n < 0) ? errno : 0;
290 120158 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
291
292
2/2
✓ Branch 2 taken 80139 times.
✓ Branch 3 taken 40019 times.
120158 if (svc_.scheduler().try_consume_inline_budget())
293 {
294
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 80139 times.
80139 if (err)
295 *ec = make_err(err);
296
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 80134 times.
80139 else if (n == 0)
297 5 *ec = capy::error::eof;
298 else
299 80134 *ec = {};
300 80139 *bytes_out = bytes;
301
1/1
✓ Branch 1 taken 80139 times.
80139 return ex.dispatch(h);
302 }
303 40019 op.h = h;
304 40019 op.ex = ex;
305 40019 op.ec_out = ec;
306 40019 op.bytes_out = bytes_out;
307 40019 op.start(token, this);
308
1/1
✓ Branch 1 taken 40019 times.
40019 op.impl_ptr = shared_from_this();
309 40019 op.complete(err, bytes);
310
1/1
✓ Branch 1 taken 40019 times.
40019 svc_.post(&op);
311 40019 return std::noop_coroutine();
312 }
313
314 // EAGAIN — register with reactor
315 200 op.h = h;
316 200 op.ex = ex;
317 200 op.ec_out = ec;
318 200 op.bytes_out = bytes_out;
319 200 op.fd = fd_;
320 200 op.start(token, this);
321
1/1
✓ Branch 1 taken 200 times.
200 op.impl_ptr = shared_from_this();
322
323 200 register_op(op, desc_state_.read_op, desc_state_.read_ready);
324 200 return std::noop_coroutine();
325 }
326
327 std::coroutine_handle<>
328 120207 epoll_socket_impl::
329 write_some(
330 std::coroutine_handle<> h,
331 capy::executor_ref ex,
332 io_buffer_param param,
333 std::stop_token token,
334 std::error_code* ec,
335 std::size_t* bytes_out)
336 {
337 120207 auto& op = wr_;
338 120207 op.reset();
339
340 120207 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
341 120207 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
342
343
6/8
✓ Branch 0 taken 120206 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 120206 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 120206 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 120206 times.
120207 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
344 {
345 1 op.h = h;
346 1 op.ex = ex;
347 1 op.ec_out = ec;
348 1 op.bytes_out = bytes_out;
349 1 op.start(token, this);
350
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
351 1 op.complete(0, 0);
352
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
353 1 return std::noop_coroutine();
354 }
355
356
2/2
✓ Branch 0 taken 120206 times.
✓ Branch 1 taken 120206 times.
240412 for (int i = 0; i < op.iovec_count; ++i)
357 {
358 120206 op.iovecs[i].iov_base = bufs[i].data();
359 120206 op.iovecs[i].iov_len = bufs[i].size();
360 }
361
362 // Speculative write
363 120206 msghdr msg{};
364 120206 msg.msg_iov = op.iovecs;
365 120206 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
366
367 ssize_t n;
368 do {
369
1/1
✓ Branch 1 taken 120206 times.
120206 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
370
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 120205 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
120206 } while (n < 0 && errno == EINTR);
371
372
4/6
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 120205 times.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 time.
✗ Branch 5 not taken.
120206 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
373 {
374
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 120205 times.
120206 int err = (n < 0) ? errno : 0;
375 120206 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
376
377
2/2
✓ Branch 2 taken 80173 times.
✓ Branch 3 taken 40033 times.
120206 if (svc_.scheduler().try_consume_inline_budget())
378 {
379
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 80172 times.
80173 *ec = err ? make_err(err) : std::error_code{};
380 80173 *bytes_out = bytes;
381
1/1
✓ Branch 1 taken 80173 times.
80173 return ex.dispatch(h);
382 }
383 40033 op.h = h;
384 40033 op.ex = ex;
385 40033 op.ec_out = ec;
386 40033 op.bytes_out = bytes_out;
387 40033 op.start(token, this);
388
1/1
✓ Branch 1 taken 40033 times.
40033 op.impl_ptr = shared_from_this();
389 40033 op.complete(err, bytes);
390
1/1
✓ Branch 1 taken 40033 times.
40033 svc_.post(&op);
391 40033 return std::noop_coroutine();
392 }
393
394 // EAGAIN — register with reactor
395 op.h = h;
396 op.ex = ex;
397 op.ec_out = ec;
398 op.bytes_out = bytes_out;
399 op.fd = fd_;
400 op.start(token, this);
401 op.impl_ptr = shared_from_this();
402
403 register_op(op, desc_state_.write_op, desc_state_.write_ready);
404 return std::noop_coroutine();
405 }
406
407 std::error_code
408 3 epoll_socket_impl::
409 shutdown(tcp_socket::shutdown_type what) noexcept
410 {
411 int how;
412
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
413 {
414 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
415 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
416 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
417 default:
418 return make_err(EINVAL);
419 }
420
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
421 return make_err(errno);
422 3 return {};
423 }
424
425 std::error_code
426 5 epoll_socket_impl::
427 set_no_delay(bool value) noexcept
428 {
429
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
430
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
431 return make_err(errno);
432 5 return {};
433 }
434
435 bool
436 5 epoll_socket_impl::
437 no_delay(std::error_code& ec) const noexcept
438 {
439 5 int flag = 0;
440 5 socklen_t len = sizeof(flag);
441
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
442 {
443 ec = make_err(errno);
444 return false;
445 }
446 5 ec = {};
447 5 return flag != 0;
448 }
449
450 std::error_code
451 4 epoll_socket_impl::
452 set_keep_alive(bool value) noexcept
453 {
454
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
455
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
456 return make_err(errno);
457 4 return {};
458 }
459
460 bool
461 4 epoll_socket_impl::
462 keep_alive(std::error_code& ec) const noexcept
463 {
464 4 int flag = 0;
465 4 socklen_t len = sizeof(flag);
466
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
467 {
468 ec = make_err(errno);
469 return false;
470 }
471 4 ec = {};
472 4 return flag != 0;
473 }
474
475 std::error_code
476 1 epoll_socket_impl::
477 set_receive_buffer_size(int size) noexcept
478 {
479
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
480 return make_err(errno);
481 1 return {};
482 }
483
484 int
485 3 epoll_socket_impl::
486 receive_buffer_size(std::error_code& ec) const noexcept
487 {
488 3 int size = 0;
489 3 socklen_t len = sizeof(size);
490
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
491 {
492 ec = make_err(errno);
493 return 0;
494 }
495 3 ec = {};
496 3 return size;
497 }
498
499 std::error_code
500 1 epoll_socket_impl::
501 set_send_buffer_size(int size) noexcept
502 {
503
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
504 return make_err(errno);
505 1 return {};
506 }
507
508 int
509 3 epoll_socket_impl::
510 send_buffer_size(std::error_code& ec) const noexcept
511 {
512 3 int size = 0;
513 3 socklen_t len = sizeof(size);
514
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
515 {
516 ec = make_err(errno);
517 return 0;
518 }
519 3 ec = {};
520 3 return size;
521 }
522
523 std::error_code
524 8 epoll_socket_impl::
525 set_linger(bool enabled, int timeout) noexcept
526 {
527
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 7 times.
8 if (timeout < 0)
528 1 return make_err(EINVAL);
529 struct ::linger lg;
530
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 time.
7 lg.l_onoff = enabled ? 1 : 0;
531 7 lg.l_linger = timeout;
532
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
533 return make_err(errno);
534 7 return {};
535 }
536
537 tcp_socket::linger_options
538 3 epoll_socket_impl::
539 linger(std::error_code& ec) const noexcept
540 {
541 3 struct ::linger lg{};
542 3 socklen_t len = sizeof(lg);
543
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
544 {
545 ec = make_err(errno);
546 return {};
547 }
548 3 ec = {};
549 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
550 }
551
552 void
553 8147 epoll_socket_impl::
554 cancel() noexcept
555 {
556 8147 std::shared_ptr<epoll_socket_impl> self;
557 try {
558
1/1
✓ Branch 1 taken 8147 times.
8147 self = shared_from_this();
559 } catch (const std::bad_weak_ptr&) {
560 return;
561 }
562
563 8147 conn_.request_cancel();
564 8147 rd_.request_cancel();
565 8147 wr_.request_cancel();
566
567 8147 epoll_op* conn_claimed = nullptr;
568 8147 epoll_op* rd_claimed = nullptr;
569 8147 epoll_op* wr_claimed = nullptr;
570 {
571 8147 std::lock_guard lock(desc_state_.mutex);
572
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8147 times.
8147 if (desc_state_.connect_op == &conn_)
573 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
574
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 8096 times.
8147 if (desc_state_.read_op == &rd_)
575 51 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
576
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8147 times.
8147 if (desc_state_.write_op == &wr_)
577 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
578 8147 }
579
580
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8147 times.
8147 if (conn_claimed)
581 {
582 conn_.impl_ptr = self;
583 svc_.post(&conn_);
584 svc_.work_finished();
585 }
586
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 8096 times.
8147 if (rd_claimed)
587 {
588 51 rd_.impl_ptr = self;
589 51 svc_.post(&rd_);
590 51 svc_.work_finished();
591 }
592
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8147 times.
8147 if (wr_claimed)
593 {
594 wr_.impl_ptr = self;
595 svc_.post(&wr_);
596 svc_.work_finished();
597 }
598 8147 }
599
600 void
601 98 epoll_socket_impl::
602 cancel_single_op(epoll_op& op) noexcept
603 {
604 98 op.request_cancel();
605
606 98 epoll_op** desc_op_ptr = nullptr;
607
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
608
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
609 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
610
611
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
612 {
613 98 epoll_op* claimed = nullptr;
614 {
615 98 std::lock_guard lock(desc_state_.mutex);
616
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (*desc_op_ptr == &op)
617 98 claimed = std::exchange(*desc_op_ptr, nullptr);
618 98 }
619
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (claimed)
620 {
621 try {
622
1/1
✓ Branch 1 taken 98 times.
98 op.impl_ptr = shared_from_this();
623 } catch (const std::bad_weak_ptr&) {}
624 98 svc_.post(&op);
625 98 svc_.work_finished();
626 }
627 }
628 98 }
629
630 void
631 8052 epoll_socket_impl::
632 close_socket() noexcept
633 {
634 8052 cancel();
635
636 // Keep impl alive if descriptor_state is queued in the scheduler.
637 // Without this, destroy_impl() drops the last shared_ptr while
638 // the queued descriptor_state node would become dangling.
639
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 8045 times.
8052 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
640 {
641 try {
642
1/1
✓ Branch 1 taken 7 times.
7 desc_state_.impl_ref_ = shared_from_this();
643 } catch (std::bad_weak_ptr const&) {}
644 }
645
646
2/2
✓ Branch 0 taken 5364 times.
✓ Branch 1 taken 2688 times.
8052 if (fd_ >= 0)
647 {
648
1/2
✓ Branch 0 taken 5364 times.
✗ Branch 1 not taken.
5364 if (desc_state_.registered_events != 0)
649 5364 svc_.scheduler().deregister_descriptor(fd_);
650 5364 ::close(fd_);
651 5364 fd_ = -1;
652 }
653
654 8052 desc_state_.fd = -1;
655 {
656 8052 std::lock_guard lock(desc_state_.mutex);
657 8052 desc_state_.read_op = nullptr;
658 8052 desc_state_.write_op = nullptr;
659 8052 desc_state_.connect_op = nullptr;
660 8052 desc_state_.read_ready = false;
661 8052 desc_state_.write_ready = false;
662 8052 }
663 8052 desc_state_.registered_events = 0;
664
665 8052 local_endpoint_ = endpoint{};
666 8052 remote_endpoint_ = endpoint{};
667 8052 }
668
669 189 epoll_socket_service::
670 189 epoll_socket_service(capy::execution_context& ctx)
671
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
672 {
673 189 }
674
675 378 epoll_socket_service::
676 189 ~epoll_socket_service()
677 {
678 378 }
679
680 void
681 189 epoll_socket_service::
682 shutdown()
683 {
684
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
685
686
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
687 impl->close_socket();
688
689 189 state_->socket_ptrs_.clear();
690 189 }
691
692 tcp_socket::socket_impl&
693 5364 epoll_socket_service::
694 create_impl()
695 {
696
1/1
✓ Branch 1 taken 5364 times.
5364 auto impl = std::make_shared<epoll_socket_impl>(*this);
697 5364 auto* raw = impl.get();
698
699 {
700
1/1
✓ Branch 2 taken 5364 times.
5364 std::lock_guard lock(state_->mutex_);
701 5364 state_->socket_list_.push_back(raw);
702
1/1
✓ Branch 3 taken 5364 times.
5364 state_->socket_ptrs_.emplace(raw, std::move(impl));
703 5364 }
704
705 5364 return *raw;
706 5364 }
707
708 void
709 5364 epoll_socket_service::
710 destroy_impl(tcp_socket::socket_impl& impl)
711 {
712 5364 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
713
1/1
✓ Branch 2 taken 5364 times.
5364 std::lock_guard lock(state_->mutex_);
714 5364 state_->socket_list_.remove(epoll_impl);
715
1/1
✓ Branch 2 taken 5364 times.
5364 state_->socket_ptrs_.erase(epoll_impl);
716 5364 }
717
718 std::error_code
719 2688 epoll_socket_service::
720 open_socket(tcp_socket::socket_impl& impl)
721 {
722 2688 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
723 2688 epoll_impl->close_socket();
724
725 2688 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
726
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2688 times.
2688 if (fd < 0)
727 return make_err(errno);
728
729 2688 epoll_impl->fd_ = fd;
730
731 // Register fd with epoll (edge-triggered mode)
732 2688 epoll_impl->desc_state_.fd = fd;
733 {
734
1/1
✓ Branch 1 taken 2688 times.
2688 std::lock_guard lock(epoll_impl->desc_state_.mutex);
735 2688 epoll_impl->desc_state_.read_op = nullptr;
736 2688 epoll_impl->desc_state_.write_op = nullptr;
737 2688 epoll_impl->desc_state_.connect_op = nullptr;
738 2688 }
739 2688 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
740
741 2688 return {};
742 }
743
744 void
745 80203 epoll_socket_service::
746 post(epoll_op* op)
747 {
748 80203 state_->sched_.post(op);
749 80203 }
750
751 void
752 2877 epoll_socket_service::
753 work_started() noexcept
754 {
755 2877 state_->sched_.work_started();
756 2877 }
757
758 void
759 149 epoll_socket_service::
760 work_finished() noexcept
761 {
762 149 state_->sched_.work_finished();
763 149 }
764
765 } // namespace boost::corosio::detail
766
767 #endif // BOOST_COROSIO_HAS_EPOLL
768