libs/corosio/src/corosio/src/detail/select/sockets.cpp

73.5% Lines (274/373) 94.1% Functions (32/34) 57.4% Branches (113/197)
libs/corosio/src/corosio/src/detail/select/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/dispatch_coro.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <boost/capy/buffers.hpp>
20
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27
28 namespace boost::corosio::detail {
29
30 void
31 97 select_op::canceller::
32 operator()() const noexcept
33 {
34 97 op->cancel();
35 97 }
36
37 void
38 select_connect_op::
39 cancel() noexcept
40 {
41 if (socket_impl_)
42 socket_impl_->cancel_single_op(*this);
43 else
44 request_cancel();
45 }
46
47 void
48 97 select_read_op::
49 cancel() noexcept
50 {
51
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (socket_impl_)
52 97 socket_impl_->cancel_single_op(*this);
53 else
54 request_cancel();
55 97 }
56
57 void
58 select_write_op::
59 cancel() noexcept
60 {
61 if (socket_impl_)
62 socket_impl_->cancel_single_op(*this);
63 else
64 request_cancel();
65 }
66
67 void
68 2043 select_connect_op::
69 operator()()
70 {
71 2043 stop_cb.reset();
72
73
3/4
✓ Branch 0 taken 2042 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 2042 times.
✗ Branch 4 not taken.
2043 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
74
75 // Cache endpoints on successful connect
76
3/4
✓ Branch 0 taken 2042 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 2042 times.
✗ Branch 3 not taken.
2043 if (success && socket_impl_)
77 {
78 // Query local endpoint via getsockname (may fail, but remote is always known)
79 2042 endpoint local_ep;
80 2042 sockaddr_in local_addr{};
81 2042 socklen_t local_len = sizeof(local_addr);
82
1/2
✓ Branch 1 taken 2042 times.
✗ Branch 2 not taken.
2042 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
83 2042 local_ep = from_sockaddr_in(local_addr);
84 // Always cache remote endpoint; local may be default if getsockname failed
85 2042 static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
86 }
87
88
1/2
✓ Branch 0 taken 2043 times.
✗ Branch 1 not taken.
2043 if (ec_out)
89 {
90
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2043 times.
2043 if (cancelled.load(std::memory_order_acquire))
91 *ec_out = capy::error::canceled;
92
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2042 times.
2043 else if (errn != 0)
93 1 *ec_out = make_err(errn);
94 else
95 2042 *ec_out = {};
96 }
97
98
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2043 times.
2043 if (bytes_out)
99 *bytes_out = bytes_transferred;
100
101 // Move to stack before destroying the frame
102 2043 capy::executor_ref saved_ex( std::move( ex ) );
103 2043 std::coroutine_handle<> saved_h( std::move( h ) );
104 2043 impl_ptr.reset();
105
2/2
✓ Branch 1 taken 2043 times.
✓ Branch 4 taken 2043 times.
2043 dispatch_coro(saved_ex, saved_h).resume();
106 2043 }
107
108 4096 select_socket_impl::
109 4096 select_socket_impl(select_socket_service& svc) noexcept
110 4096 : svc_(svc)
111 {
112 4096 }
113
114 void
115 4096 select_socket_impl::
116 release()
117 {
118 4096 close_socket();
119 4096 svc_.destroy_impl(*this);
120 4096 }
121
122 std::coroutine_handle<>
123 2043 select_socket_impl::
124 connect(
125 std::coroutine_handle<> h,
126 capy::executor_ref ex,
127 endpoint ep,
128 std::stop_token token,
129 std::error_code* ec)
130 {
131 2043 auto& op = conn_;
132 2043 op.reset();
133 2043 op.h = h;
134 2043 op.ex = ex;
135 2043 op.ec_out = ec;
136 2043 op.fd = fd_;
137 2043 op.target_endpoint = ep; // Store target for endpoint caching
138 2043 op.start(token, this);
139
140 2043 sockaddr_in addr = detail::to_sockaddr_in(ep);
141
1/1
✓ Branch 1 taken 2043 times.
2043 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
142
143
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2043 times.
2043 if (result == 0)
144 {
145 // Sync success - cache endpoints immediately
146 sockaddr_in local_addr{};
147 socklen_t local_len = sizeof(local_addr);
148 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
149 local_endpoint_ = detail::from_sockaddr_in(local_addr);
150 remote_endpoint_ = ep;
151
152 op.complete(0, 0);
153 op.impl_ptr = shared_from_this();
154 svc_.post(&op);
155 // completion is always posted to scheduler queue, never inline.
156 return std::noop_coroutine();
157 }
158
159
1/2
✓ Branch 0 taken 2043 times.
✗ Branch 1 not taken.
2043 if (errno == EINPROGRESS)
160 {
161 2043 svc_.work_started();
162
1/1
✓ Branch 1 taken 2043 times.
2043 op.impl_ptr = shared_from_this();
163
164 // Set registering BEFORE register_fd to close the race window where
165 // reactor sees an event before we set registered. The reactor treats
166 // registering the same as registered when claiming the op.
167 2043 op.registered.store(select_registration_state::registering, std::memory_order_release);
168
1/1
✓ Branch 2 taken 2043 times.
2043 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
169
170 // Transition to registered. If this fails, reactor or cancel already
171 // claimed the op (state is now unregistered), so we're done. However,
172 // we must still deregister the fd because cancel's deregister_fd may
173 // have run before our register_fd, leaving the fd orphaned.
174 2043 auto expected = select_registration_state::registering;
175
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2043 times.
2043 if (!op.registered.compare_exchange_strong(
176 expected, select_registration_state::registered, std::memory_order_acq_rel))
177 {
178 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
179 // completion is always posted to scheduler queue, never inline.
180 return std::noop_coroutine();
181 }
182
183 // If cancelled was set before we registered, handle it now.
184
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2043 times.
2043 if (op.cancelled.load(std::memory_order_acquire))
185 {
186 auto prev = op.registered.exchange(
187 select_registration_state::unregistered, std::memory_order_acq_rel);
188 if (prev != select_registration_state::unregistered)
189 {
190 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
191 op.impl_ptr = shared_from_this();
192 svc_.post(&op);
193 svc_.work_finished();
194 }
195 }
196 // completion is always posted to scheduler queue, never inline.
197 2043 return std::noop_coroutine();
198 }
199
200 op.complete(errno, 0);
201 op.impl_ptr = shared_from_this();
202 svc_.post(&op);
203 // completion is always posted to scheduler queue, never inline.
204 return std::noop_coroutine();
205 }
206
207 std::coroutine_handle<>
208 86042 select_socket_impl::
209 read_some(
210 std::coroutine_handle<> h,
211 capy::executor_ref ex,
212 io_buffer_param param,
213 std::stop_token token,
214 std::error_code* ec,
215 std::size_t* bytes_out)
216 {
217 86042 auto& op = rd_;
218 86042 op.reset();
219 86042 op.h = h;
220 86042 op.ex = ex;
221 86042 op.ec_out = ec;
222 86042 op.bytes_out = bytes_out;
223 86042 op.fd = fd_;
224 86042 op.start(token, this);
225
226 86042 capy::mutable_buffer bufs[select_read_op::max_buffers];
227 86042 op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
228
229
6/8
✓ Branch 0 taken 86041 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 86041 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 86041 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 86041 times.
86042 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
230 {
231 1 op.empty_buffer_read = true;
232 1 op.complete(0, 0);
233
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
234
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
235 1 return std::noop_coroutine();
236 }
237
238
2/2
✓ Branch 0 taken 86041 times.
✓ Branch 1 taken 86041 times.
172082 for (int i = 0; i < op.iovec_count; ++i)
239 {
240 86041 op.iovecs[i].iov_base = bufs[i].data();
241 86041 op.iovecs[i].iov_len = bufs[i].size();
242 }
243
244
1/1
✓ Branch 1 taken 86041 times.
86041 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
245
246
2/2
✓ Branch 0 taken 85761 times.
✓ Branch 1 taken 280 times.
86041 if (n > 0)
247 {
248 85761 op.complete(0, static_cast<std::size_t>(n));
249
1/1
✓ Branch 1 taken 85761 times.
85761 op.impl_ptr = shared_from_this();
250
1/1
✓ Branch 1 taken 85761 times.
85761 svc_.post(&op);
251 85761 return std::noop_coroutine();
252 }
253
254
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 275 times.
280 if (n == 0)
255 {
256 5 op.complete(0, 0);
257
1/1
✓ Branch 1 taken 5 times.
5 op.impl_ptr = shared_from_this();
258
1/1
✓ Branch 1 taken 5 times.
5 svc_.post(&op);
259 5 return std::noop_coroutine();
260 }
261
262
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 275 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
275 if (errno == EAGAIN || errno == EWOULDBLOCK)
263 {
264 275 svc_.work_started();
265
1/1
✓ Branch 1 taken 275 times.
275 op.impl_ptr = shared_from_this();
266
267 // Set registering BEFORE register_fd to close the race window where
268 // reactor sees an event before we set registered.
269 275 op.registered.store(select_registration_state::registering, std::memory_order_release);
270
1/1
✓ Branch 2 taken 275 times.
275 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
271
272 // Transition to registered. If this fails, reactor or cancel already
273 // claimed the op (state is now unregistered), so we're done. However,
274 // we must still deregister the fd because cancel's deregister_fd may
275 // have run before our register_fd, leaving the fd orphaned.
276 275 auto expected = select_registration_state::registering;
277
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 275 times.
275 if (!op.registered.compare_exchange_strong(
278 expected, select_registration_state::registered, std::memory_order_acq_rel))
279 {
280 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
281 return std::noop_coroutine();
282 }
283
284 // If cancelled was set before we registered, handle it now.
285
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 275 times.
275 if (op.cancelled.load(std::memory_order_acquire))
286 {
287 auto prev = op.registered.exchange(
288 select_registration_state::unregistered, std::memory_order_acq_rel);
289 if (prev != select_registration_state::unregistered)
290 {
291 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
292 op.impl_ptr = shared_from_this();
293 svc_.post(&op);
294 svc_.work_finished();
295 }
296 }
297 275 return std::noop_coroutine();
298 }
299
300 op.complete(errno, 0);
301 op.impl_ptr = shared_from_this();
302 svc_.post(&op);
303 return std::noop_coroutine();
304 }
305
306 std::coroutine_handle<>
307 85883 select_socket_impl::
308 write_some(
309 std::coroutine_handle<> h,
310 capy::executor_ref ex,
311 io_buffer_param param,
312 std::stop_token token,
313 std::error_code* ec,
314 std::size_t* bytes_out)
315 {
316 85883 auto& op = wr_;
317 85883 op.reset();
318 85883 op.h = h;
319 85883 op.ex = ex;
320 85883 op.ec_out = ec;
321 85883 op.bytes_out = bytes_out;
322 85883 op.fd = fd_;
323 85883 op.start(token, this);
324
325 85883 capy::mutable_buffer bufs[select_write_op::max_buffers];
326 85883 op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
327
328
6/8
✓ Branch 0 taken 85882 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 85882 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 85882 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 85882 times.
85883 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
329 {
330 1 op.complete(0, 0);
331
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
332
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
333 1 return std::noop_coroutine();
334 }
335
336
2/2
✓ Branch 0 taken 85882 times.
✓ Branch 1 taken 85882 times.
171764 for (int i = 0; i < op.iovec_count; ++i)
337 {
338 85882 op.iovecs[i].iov_base = bufs[i].data();
339 85882 op.iovecs[i].iov_len = bufs[i].size();
340 }
341
342 85882 msghdr msg{};
343 85882 msg.msg_iov = op.iovecs;
344 85882 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
345
346
1/1
✓ Branch 1 taken 85882 times.
85882 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
347
348
2/2
✓ Branch 0 taken 85881 times.
✓ Branch 1 taken 1 time.
85882 if (n > 0)
349 {
350 85881 op.complete(0, static_cast<std::size_t>(n));
351
1/1
✓ Branch 1 taken 85881 times.
85881 op.impl_ptr = shared_from_this();
352
1/1
✓ Branch 1 taken 85881 times.
85881 svc_.post(&op);
353 85881 return std::noop_coroutine();
354 }
355
356
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (errno == EAGAIN || errno == EWOULDBLOCK)
357 {
358 svc_.work_started();
359 op.impl_ptr = shared_from_this();
360
361 // Set registering BEFORE register_fd to close the race window where
362 // reactor sees an event before we set registered.
363 op.registered.store(select_registration_state::registering, std::memory_order_release);
364 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
365
366 // Transition to registered. If this fails, reactor or cancel already
367 // claimed the op (state is now unregistered), so we're done. However,
368 // we must still deregister the fd because cancel's deregister_fd may
369 // have run before our register_fd, leaving the fd orphaned.
370 auto expected = select_registration_state::registering;
371 if (!op.registered.compare_exchange_strong(
372 expected, select_registration_state::registered, std::memory_order_acq_rel))
373 {
374 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
375 return std::noop_coroutine();
376 }
377
378 // If cancelled was set before we registered, handle it now.
379 if (op.cancelled.load(std::memory_order_acquire))
380 {
381 auto prev = op.registered.exchange(
382 select_registration_state::unregistered, std::memory_order_acq_rel);
383 if (prev != select_registration_state::unregistered)
384 {
385 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
386 op.impl_ptr = shared_from_this();
387 svc_.post(&op);
388 svc_.work_finished();
389 }
390 }
391 return std::noop_coroutine();
392 }
393
394
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 op.complete(errno ? errno : EIO, 0);
395
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
396
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
397 1 return std::noop_coroutine();
398 }
399
400 std::error_code
401 3 select_socket_impl::
402 shutdown(tcp_socket::shutdown_type what) noexcept
403 {
404 int how;
405
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
406 {
407 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
408 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
409 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
410 default:
411 return make_err(EINVAL);
412 }
413
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
414 return make_err(errno);
415 3 return {};
416 }
417
418 std::error_code
419 5 select_socket_impl::
420 set_no_delay(bool value) noexcept
421 {
422
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
423
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
424 return make_err(errno);
425 5 return {};
426 }
427
428 bool
429 5 select_socket_impl::
430 no_delay(std::error_code& ec) const noexcept
431 {
432 5 int flag = 0;
433 5 socklen_t len = sizeof(flag);
434
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
435 {
436 ec = make_err(errno);
437 return false;
438 }
439 5 ec = {};
440 5 return flag != 0;
441 }
442
443 std::error_code
444 4 select_socket_impl::
445 set_keep_alive(bool value) noexcept
446 {
447
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
448
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
449 return make_err(errno);
450 4 return {};
451 }
452
453 bool
454 4 select_socket_impl::
455 keep_alive(std::error_code& ec) const noexcept
456 {
457 4 int flag = 0;
458 4 socklen_t len = sizeof(flag);
459
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
460 {
461 ec = make_err(errno);
462 return false;
463 }
464 4 ec = {};
465 4 return flag != 0;
466 }
467
468 std::error_code
469 1 select_socket_impl::
470 set_receive_buffer_size(int size) noexcept
471 {
472
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
473 return make_err(errno);
474 1 return {};
475 }
476
477 int
478 3 select_socket_impl::
479 receive_buffer_size(std::error_code& ec) const noexcept
480 {
481 3 int size = 0;
482 3 socklen_t len = sizeof(size);
483
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
484 {
485 ec = make_err(errno);
486 return 0;
487 }
488 3 ec = {};
489 3 return size;
490 }
491
492 std::error_code
493 1 select_socket_impl::
494 set_send_buffer_size(int size) noexcept
495 {
496
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
497 return make_err(errno);
498 1 return {};
499 }
500
501 int
502 3 select_socket_impl::
503 send_buffer_size(std::error_code& ec) const noexcept
504 {
505 3 int size = 0;
506 3 socklen_t len = sizeof(size);
507
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
508 {
509 ec = make_err(errno);
510 return 0;
511 }
512 3 ec = {};
513 3 return size;
514 }
515
516 std::error_code
517 4 select_socket_impl::
518 set_linger(bool enabled, int timeout) noexcept
519 {
520
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
521 1 return make_err(EINVAL);
522 struct ::linger lg;
523
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
524 3 lg.l_linger = timeout;
525
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
526 return make_err(errno);
527 3 return {};
528 }
529
530 tcp_socket::linger_options
531 3 select_socket_impl::
532 linger(std::error_code& ec) const noexcept
533 {
534 3 struct ::linger lg{};
535 3 socklen_t len = sizeof(lg);
536
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
537 {
538 ec = make_err(errno);
539 return {};
540 }
541 3 ec = {};
542 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
543 }
544
545 void
546 6321 select_socket_impl::
547 cancel() noexcept
548 {
549 6321 std::shared_ptr<select_socket_impl> self;
550 try {
551
1/1
✓ Branch 1 taken 6321 times.
6321 self = shared_from_this();
552 } catch (const std::bad_weak_ptr&) {
553 return;
554 }
555
556 18963 auto cancel_op = [this, &self](select_op& op, int events) {
557 18963 auto prev = op.registered.exchange(
558 select_registration_state::unregistered, std::memory_order_acq_rel);
559 18963 op.request_cancel();
560
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 18873 times.
18963 if (prev != select_registration_state::unregistered)
561 {
562 90 svc_.scheduler().deregister_fd(fd_, events);
563 90 op.impl_ptr = self;
564 90 svc_.post(&op);
565 90 svc_.work_finished();
566 }
567 25284 };
568
569 6321 cancel_op(conn_, select_scheduler::event_write);
570 6321 cancel_op(rd_, select_scheduler::event_read);
571 6321 cancel_op(wr_, select_scheduler::event_write);
572 6321 }
573
574 void
575 97 select_socket_impl::
576 cancel_single_op(select_op& op) noexcept
577 {
578 // Called from stop_token callback to cancel a specific pending operation.
579 97 auto prev = op.registered.exchange(
580 select_registration_state::unregistered, std::memory_order_acq_rel);
581 97 op.request_cancel();
582
583
2/2
✓ Branch 0 taken 65 times.
✓ Branch 1 taken 32 times.
97 if (prev != select_registration_state::unregistered)
584 {
585 // Determine which event type to deregister
586 65 int events = 0;
587
2/4
✓ Branch 0 taken 65 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 65 times.
65 if (&op == &conn_ || &op == &wr_)
588 events = select_scheduler::event_write;
589
1/2
✓ Branch 0 taken 65 times.
✗ Branch 1 not taken.
65 else if (&op == &rd_)
590 65 events = select_scheduler::event_read;
591
592 65 svc_.scheduler().deregister_fd(fd_, events);
593
594 // Keep impl alive until op completes
595 try {
596
1/1
✓ Branch 1 taken 65 times.
65 op.impl_ptr = shared_from_this();
597 } catch (const std::bad_weak_ptr&) {
598 // Impl is being destroyed, op will be orphaned but that's ok
599 }
600
601 65 svc_.post(&op);
602 65 svc_.work_finished();
603 }
604 97 }
605
606 void
607 6150 select_socket_impl::
608 close_socket() noexcept
609 {
610 6150 cancel();
611
612
2/2
✓ Branch 0 taken 4096 times.
✓ Branch 1 taken 2054 times.
6150 if (fd_ >= 0)
613 {
614 // Unconditionally remove from registered_fds_ to handle edge cases
615 // where the fd might be registered but cancel() didn't clean it up
616 // due to race conditions.
617 4096 svc_.scheduler().deregister_fd(fd_,
618 select_scheduler::event_read | select_scheduler::event_write);
619 4096 ::close(fd_);
620 4096 fd_ = -1;
621 }
622
623 // Clear cached endpoints
624 6150 local_endpoint_ = endpoint{};
625 6150 remote_endpoint_ = endpoint{};
626 6150 }
627
628 120 select_socket_service::
629 120 select_socket_service(capy::execution_context& ctx)
630
2/2
✓ Branch 2 taken 120 times.
✓ Branch 5 taken 120 times.
120 : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
631 {
632 120 }
633
634 240 select_socket_service::
635 120 ~select_socket_service()
636 {
637 240 }
638
639 void
640 120 select_socket_service::
641 shutdown()
642 {
643
1/1
✓ Branch 2 taken 120 times.
120 std::lock_guard lock(state_->mutex_);
644
645
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 120 times.
120 while (auto* impl = state_->socket_list_.pop_front())
646 impl->close_socket();
647
648 120 state_->socket_ptrs_.clear();
649 120 }
650
651 tcp_socket::socket_impl&
652 4096 select_socket_service::
653 create_impl()
654 {
655
1/1
✓ Branch 1 taken 4096 times.
4096 auto impl = std::make_shared<select_socket_impl>(*this);
656 4096 auto* raw = impl.get();
657
658 {
659
1/1
✓ Branch 2 taken 4096 times.
4096 std::lock_guard lock(state_->mutex_);
660 4096 state_->socket_list_.push_back(raw);
661
1/1
✓ Branch 3 taken 4096 times.
4096 state_->socket_ptrs_.emplace(raw, std::move(impl));
662 4096 }
663
664 4096 return *raw;
665 4096 }
666
667 void
668 4096 select_socket_service::
669 destroy_impl(tcp_socket::socket_impl& impl)
670 {
671 4096 auto* select_impl = static_cast<select_socket_impl*>(&impl);
672
1/1
✓ Branch 2 taken 4096 times.
4096 std::lock_guard lock(state_->mutex_);
673 4096 state_->socket_list_.remove(select_impl);
674
1/1
✓ Branch 2 taken 4096 times.
4096 state_->socket_ptrs_.erase(select_impl);
675 4096 }
676
677 std::error_code
678 2054 select_socket_service::
679 open_socket(tcp_socket::socket_impl& impl)
680 {
681 2054 auto* select_impl = static_cast<select_socket_impl*>(&impl);
682 2054 select_impl->close_socket();
683
684 2054 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
685
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2054 times.
2054 if (fd < 0)
686 return make_err(errno);
687
688 // Set non-blocking and close-on-exec
689 2054 int flags = ::fcntl(fd, F_GETFL, 0);
690
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2054 times.
2054 if (flags == -1)
691 {
692 int errn = errno;
693 ::close(fd);
694 return make_err(errn);
695 }
696
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2054 times.
2054 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
697 {
698 int errn = errno;
699 ::close(fd);
700 return make_err(errn);
701 }
702
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2054 times.
2054 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
703 {
704 int errn = errno;
705 ::close(fd);
706 return make_err(errn);
707 }
708
709 // Check fd is within select() limits
710
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2054 times.
2054 if (fd >= FD_SETSIZE)
711 {
712 ::close(fd);
713 return make_err(EMFILE); // Too many open files
714 }
715
716 2054 select_impl->fd_ = fd;
717 2054 return {};
718 }
719
720 void
721 171805 select_socket_service::
722 post(select_op* op)
723 {
724 171805 state_->sched_.post(op);
725 171805 }
726
727 void
728 2318 select_socket_service::
729 work_started() noexcept
730 {
731 2318 state_->sched_.work_started();
732 2318 }
733
734 void
735 155 select_socket_service::
736 work_finished() noexcept
737 {
738 155 state_->sched_.work_finished();
739 155 }
740
741 } // namespace boost::corosio::detail
742
743 #endif // BOOST_COROSIO_HAS_SELECT
744