libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp

79.8% Lines (178/223) 100.0% Functions (18/18) 55.7% Branches (68/122)
libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <utility>
21
22 #include <errno.h>
23 #include <netinet/in.h>
24 #include <sys/epoll.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27
28 namespace boost::corosio::detail {
29
30 void
31 6 epoll_accept_op::
32 cancel() noexcept
33 {
34
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
35 6 acceptor_impl_->cancel_single_op(*this);
36 else
37 request_cancel();
38 6 }
39
40 void
41 2685 epoll_accept_op::
42 operator()()
43 {
44 2685 stop_cb.reset();
45
46 2685 static_cast<epoll_acceptor_impl*>(acceptor_impl_)
47 2685 ->service().scheduler().reset_inline_budget();
48
49
3/4
✓ Branch 0 taken 2685 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2676 times.
✓ Branch 4 taken 9 times.
2685 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
50
51
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2676 times.
2685 if (cancelled.load(std::memory_order_acquire))
52 9 *ec_out = capy::error::canceled;
53
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2676 times.
2676 else if (errn != 0)
54 *ec_out = make_err(errn);
55 else
56 2676 *ec_out = {};
57
58 // Set up the peer socket on success
59
4/6
✓ Branch 0 taken 2676 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2676 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2676 times.
✗ Branch 5 not taken.
2685 if (success && accepted_fd >= 0 && acceptor_impl_)
60 {
61 2676 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 2676 ->service().socket_service();
63
1/2
✓ Branch 0 taken 2676 times.
✗ Branch 1 not taken.
2676 if (socket_svc)
64 {
65
1/1
✓ Branch 1 taken 2676 times.
2676 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
66 2676 impl.set_socket(accepted_fd);
67
68 2676 impl.desc_state_.fd = accepted_fd;
69 {
70
1/1
✓ Branch 1 taken 2676 times.
2676 std::lock_guard lock(impl.desc_state_.mutex);
71 2676 impl.desc_state_.read_op = nullptr;
72 2676 impl.desc_state_.write_op = nullptr;
73 2676 impl.desc_state_.connect_op = nullptr;
74 2676 }
75
1/1
✓ Branch 2 taken 2676 times.
2676 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
76
77 2676 impl.set_endpoints(
78 2676 static_cast<epoll_acceptor_impl*>(acceptor_impl_)->local_endpoint(),
79 2676 from_sockaddr_in(peer_addr));
80
81
1/2
✓ Branch 0 taken 2676 times.
✗ Branch 1 not taken.
2676 if (impl_out)
82 2676 *impl_out = &impl;
83 2676 accepted_fd = -1;
84 }
85 else
86 {
87 // No socket service — treat as error
88 *ec_out = make_err(ENOENT);
89 success = false;
90 }
91 }
92
93
3/4
✓ Branch 0 taken 2676 times.
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2676 times.
2685 if (!success || !acceptor_impl_)
94 {
95
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
96 {
97 ::close(accepted_fd);
98 accepted_fd = -1;
99 }
100
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
101 9 *impl_out = nullptr;
102 }
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 2685 capy::executor_ref saved_ex( std::move( ex ) );
106 2685 std::coroutine_handle<> saved_h( std::move( h ) );
107 2685 auto prevent_premature_destruction = std::move(impl_ptr);
108
2/2
✓ Branch 1 taken 2685 times.
✓ Branch 4 taken 2685 times.
2685 dispatch_coro(saved_ex, saved_h).resume();
109 2685 }
110
111 64 epoll_acceptor_impl::
112 64 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
113 64 : svc_(svc)
114 {
115 64 }
116
117 void
118 64 epoll_acceptor_impl::
119 release()
120 {
121 64 close_socket();
122 64 svc_.destroy_acceptor_impl(*this);
123 64 }
124
125 std::coroutine_handle<>
126 2685 epoll_acceptor_impl::
127 accept(
128 std::coroutine_handle<> h,
129 capy::executor_ref ex,
130 std::stop_token token,
131 std::error_code* ec,
132 io_object::io_object_impl** impl_out)
133 {
134 2685 auto& op = acc_;
135 2685 op.reset();
136 2685 op.h = h;
137 2685 op.ex = ex;
138 2685 op.ec_out = ec;
139 2685 op.impl_out = impl_out;
140 2685 op.fd = fd_;
141 2685 op.start(token, this);
142
143 2685 sockaddr_in addr{};
144 2685 socklen_t addrlen = sizeof(addr);
145 int accepted;
146 do {
147
1/1
✓ Branch 1 taken 2685 times.
2685 accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
148 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
149
3/4
✓ Branch 0 taken 2683 times.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2683 times.
2685 } while (accepted < 0 && errno == EINTR);
150
151
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2683 times.
2685 if (accepted >= 0)
152 {
153 {
154
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(desc_state_.mutex);
155 2 desc_state_.read_ready = false;
156 2 }
157
158
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
2 if (svc_.scheduler().try_consume_inline_budget())
159 {
160 auto* socket_svc = svc_.socket_service();
161 if (socket_svc)
162 {
163 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
164 impl.set_socket(accepted);
165
166 impl.desc_state_.fd = accepted;
167 {
168 std::lock_guard lock(impl.desc_state_.mutex);
169 impl.desc_state_.read_op = nullptr;
170 impl.desc_state_.write_op = nullptr;
171 impl.desc_state_.connect_op = nullptr;
172 }
173 socket_svc->scheduler().register_descriptor(accepted, &impl.desc_state_);
174
175 impl.set_endpoints(local_endpoint_, from_sockaddr_in(addr));
176
177 *ec = {};
178 if (impl_out)
179 *impl_out = &impl;
180 }
181 else
182 {
183 ::close(accepted);
184 *ec = make_err(ENOENT);
185 if (impl_out)
186 *impl_out = nullptr;
187 }
188 return ex.dispatch(h);
189 }
190
191 2 op.accepted_fd = accepted;
192 2 op.peer_addr = addr;
193 2 op.complete(0, 0);
194
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
195
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
196 2 return std::noop_coroutine();
197 }
198
199
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2683 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2683 if (errno == EAGAIN || errno == EWOULDBLOCK)
200 {
201
1/1
✓ Branch 1 taken 2683 times.
2683 op.impl_ptr = shared_from_this();
202 2683 svc_.work_started();
203
204
1/1
✓ Branch 1 taken 2683 times.
2683 std::lock_guard lock(desc_state_.mutex);
205 2683 bool io_done = false;
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2683 times.
2683 if (desc_state_.read_ready)
207 {
208 desc_state_.read_ready = false;
209 op.perform_io();
210 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
211 if (!io_done)
212 op.errn = 0;
213 }
214
215
3/6
✓ Branch 0 taken 2683 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2683 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2683 times.
2683 if (io_done || op.cancelled.load(std::memory_order_acquire))
216 {
217 svc_.post(&op);
218 svc_.work_finished();
219 }
220 else
221 {
222 2683 desc_state_.read_op = &op;
223 }
224 2683 return std::noop_coroutine();
225 2683 }
226
227 op.complete(errno, 0);
228 op.impl_ptr = shared_from_this();
229 svc_.post(&op);
230 // completion is always posted to scheduler queue, never inline.
231 return std::noop_coroutine();
232 }
233
234 void
235 129 epoll_acceptor_impl::
236 cancel() noexcept
237 {
238 129 cancel_single_op(acc_);
239 129 }
240
241 void
242 135 epoll_acceptor_impl::
243 cancel_single_op(epoll_op& op) noexcept
244 {
245 135 op.request_cancel();
246
247 135 epoll_op* claimed = nullptr;
248 {
249 135 std::lock_guard lock(desc_state_.mutex);
250
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 126 times.
135 if (desc_state_.read_op == &op)
251 9 claimed = std::exchange(desc_state_.read_op, nullptr);
252 135 }
253
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 126 times.
135 if (claimed)
254 {
255 try {
256
1/1
✓ Branch 1 taken 9 times.
9 op.impl_ptr = shared_from_this();
257 } catch (const std::bad_weak_ptr&) {}
258 9 svc_.post(&op);
259 9 svc_.work_finished();
260 }
261 135 }
262
263 void
264 128 epoll_acceptor_impl::
265 close_socket() noexcept
266 {
267 128 cancel();
268
269
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 128 times.
128 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
270 {
271 try {
272 desc_state_.impl_ref_ = shared_from_this();
273 } catch (std::bad_weak_ptr const&) {}
274 }
275
276
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 66 times.
128 if (fd_ >= 0)
277 {
278
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if (desc_state_.registered_events != 0)
279 62 svc_.scheduler().deregister_descriptor(fd_);
280 62 ::close(fd_);
281 62 fd_ = -1;
282 }
283
284 128 desc_state_.fd = -1;
285 {
286 128 std::lock_guard lock(desc_state_.mutex);
287 128 desc_state_.read_op = nullptr;
288 128 desc_state_.read_ready = false;
289 128 desc_state_.write_ready = false;
290 128 }
291 128 desc_state_.registered_events = 0;
292
293 // Clear cached endpoint
294 128 local_endpoint_ = endpoint{};
295 128 }
296
297 189 epoll_acceptor_service::
298 189 epoll_acceptor_service(capy::execution_context& ctx)
299 189 : ctx_(ctx)
300
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
301 {
302 189 }
303
304 378 epoll_acceptor_service::
305 189 ~epoll_acceptor_service()
306 {
307 378 }
308
309 void
310 189 epoll_acceptor_service::
311 shutdown()
312 {
313
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
314
315
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->acceptor_list_.pop_front())
316 impl->close_socket();
317
318 189 state_->acceptor_ptrs_.clear();
319 189 }
320
321 tcp_acceptor::acceptor_impl&
322 64 epoll_acceptor_service::
323 create_acceptor_impl()
324 {
325
1/1
✓ Branch 1 taken 64 times.
64 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
326 64 auto* raw = impl.get();
327
328
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
329 64 state_->acceptor_list_.push_back(raw);
330
1/1
✓ Branch 3 taken 64 times.
64 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
331
332 64 return *raw;
333 64 }
334
335 void
336 64 epoll_acceptor_service::
337 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
338 {
339 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
340
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
341 64 state_->acceptor_list_.remove(epoll_impl);
342
1/1
✓ Branch 2 taken 64 times.
64 state_->acceptor_ptrs_.erase(epoll_impl);
343 64 }
344
345 std::error_code
346 64 epoll_acceptor_service::
347 open_acceptor(
348 tcp_acceptor::acceptor_impl& impl,
349 endpoint ep,
350 int backlog)
351 {
352 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
353 64 epoll_impl->close_socket();
354
355 64 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64 if (fd < 0)
357 return make_err(errno);
358
359 64 int reuse = 1;
360 64 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
361
362 64 sockaddr_in addr = detail::to_sockaddr_in(ep);
363
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 62 times.
64 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
364 {
365 2 int errn = errno;
366
1/1
✓ Branch 1 taken 2 times.
2 ::close(fd);
367 2 return make_err(errn);
368 }
369
370
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (::listen(fd, backlog) < 0)
371 {
372 int errn = errno;
373 ::close(fd);
374 return make_err(errn);
375 }
376
377 62 epoll_impl->fd_ = fd;
378
379 // Register fd with epoll (edge-triggered mode)
380 62 epoll_impl->desc_state_.fd = fd;
381 {
382
1/1
✓ Branch 1 taken 62 times.
62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
383 62 epoll_impl->desc_state_.read_op = nullptr;
384 62 }
385
1/1
✓ Branch 2 taken 62 times.
62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
386
387 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
388 62 sockaddr_in local_addr{};
389 62 socklen_t local_len = sizeof(local_addr);
390
1/2
✓ Branch 1 taken 62 times.
✗ Branch 2 not taken.
62 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
391 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
392
393 62 return {};
394 }
395
396 void
397 11 epoll_acceptor_service::
398 post(epoll_op* op)
399 {
400 11 state_->sched_.post(op);
401 11 }
402
403 void
404 2683 epoll_acceptor_service::
405 work_started() noexcept
406 {
407 2683 state_->sched_.work_started();
408 2683 }
409
410 void
411 9 epoll_acceptor_service::
412 work_finished() noexcept
413 {
414 9 state_->sched_.work_finished();
415 9 }
416
417 epoll_socket_service*
418 2676 epoll_acceptor_service::
419 socket_service() const noexcept
420 {
421 2676 auto* svc = ctx_.find_service<detail::socket_service>();
422
2/4
✓ Branch 0 taken 2676 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2676 times.
✗ Branch 3 not taken.
2676 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
423 }
424
425 } // namespace boost::corosio::detail
426
427 #endif // BOOST_COROSIO_HAS_EPOLL
428