Line data Source code
1 : //
2 : // Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/tcp_server.hpp>
11 : #include <boost/corosio/detail/except.hpp>
12 : #include <condition_variable>
13 : #include <mutex>
14 : #include <utility>
15 :
16 : namespace boost::corosio {
17 :
18 : struct tcp_server::impl
19 : {
20 : std::mutex join_mutex;
21 : std::condition_variable join_cv;
22 : capy::execution_context& ctx;
23 : std::vector<tcp_acceptor> ports;
24 : std::stop_source stop;
25 :
26 7 : explicit impl(capy::execution_context& c) noexcept
27 7 : : ctx(c)
28 : {
29 7 : }
30 : };
31 :
32 : tcp_server::impl*
33 7 : tcp_server::make_impl(capy::execution_context& ctx)
34 : {
35 7 : return new impl(ctx);
36 : }
37 :
38 7 : tcp_server::~tcp_server()
39 : {
40 7 : delete impl_;
41 7 : }
42 :
43 0 : tcp_server::tcp_server(
44 0 : tcp_server&& o) noexcept
45 0 : : impl_(std::exchange(o.impl_, nullptr))
46 0 : , ex_(std::move(o.ex_))
47 0 : , waiters_(std::exchange(o.waiters_, nullptr))
48 0 : , idle_head_(std::exchange(o.idle_head_, nullptr))
49 0 : , active_head_(std::exchange(o.active_head_, nullptr))
50 0 : , active_tail_(std::exchange(o.active_tail_, nullptr))
51 0 : , active_accepts_(std::exchange(o.active_accepts_, 0))
52 0 : , storage_(std::move(o.storage_))
53 0 : , running_(std::exchange(o.running_, false))
54 : {
55 0 : }
56 :
57 : tcp_server&
58 0 : tcp_server::operator=(tcp_server&& o) noexcept
59 : {
60 0 : delete impl_;
61 0 : impl_ = std::exchange(o.impl_, nullptr);
62 0 : ex_ = std::move(o.ex_);
63 0 : waiters_ = std::exchange(o.waiters_, nullptr);
64 0 : idle_head_ = std::exchange(o.idle_head_, nullptr);
65 0 : active_head_ = std::exchange(o.active_head_, nullptr);
66 0 : active_tail_ = std::exchange(o.active_tail_, nullptr);
67 0 : active_accepts_ = std::exchange(o.active_accepts_, 0);
68 0 : storage_ = std::move(o.storage_);
69 0 : running_ = std::exchange(o.running_, false);
70 0 : return *this;
71 : }
72 :
73 : // Accept loop: wait for idle worker, accept connection, dispatch
74 : capy::task<void>
75 8 : tcp_server::do_accept(tcp_acceptor& acc)
76 : {
77 : auto st = co_await capy::this_coro::stop_token;
78 : while(! st.stop_requested())
79 : {
80 : // Wait for an idle worker before blocking on accept
81 : auto& w = co_await pop();
82 : auto [ec] = co_await acc.accept(w.socket());
83 : if(ec)
84 : {
85 : co_await push(w);
86 : continue;
87 : }
88 : w.run(launcher{*this, w});
89 : }
90 16 : }
91 :
92 : std::error_code
93 7 : tcp_server::bind(endpoint ep)
94 : {
95 7 : impl_->ports.emplace_back(impl_->ctx);
96 : // VFALCO this should return error_code
97 7 : impl_->ports.back().listen(ep);
98 7 : return {};
99 : }
100 :
101 : void
102 10 : tcp_server::
103 : start()
104 : {
105 : // Idempotent - only start if not already running
106 10 : if(running_)
107 1 : return;
108 :
109 : // Previous session must be fully stopped before restart
110 9 : if(active_accepts_ != 0)
111 1 : detail::throw_logic_error(
112 : "tcp_server::start: previous session not joined");
113 :
114 8 : running_ = true;
115 :
116 8 : impl_->stop = {}; // Fresh stop source
117 8 : auto st = impl_->stop.get_token();
118 :
119 8 : active_accepts_ = impl_->ports.size();
120 :
121 : // Launch with completion handler that decrements counter
122 16 : for(auto& t : impl_->ports)
123 16 : capy::run_async(ex_, st, [this]() {
124 8 : std::lock_guard lock(impl_->join_mutex);
125 8 : if(--active_accepts_ == 0)
126 8 : impl_->join_cv.notify_all();
127 16 : })(do_accept(t));
128 8 : }
129 :
130 : void
131 10 : tcp_server::
132 : stop()
133 : {
134 : // Idempotent - only stop if running
135 10 : if(!running_)
136 2 : return;
137 8 : running_ = false;
138 :
139 : // Stop accept loops
140 8 : impl_->stop.request_stop();
141 :
142 : // Launch cancellation coroutine on server executor
143 8 : capy::run_async(ex_, std::stop_token{})(do_stop());
144 : }
145 :
146 : void
147 4 : tcp_server::
148 : join()
149 : {
150 4 : std::unique_lock lock(impl_->join_mutex);
151 8 : impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
152 4 : }
153 :
154 : capy::task<>
155 8 : tcp_server::do_stop()
156 : {
157 : // Running on server executor - safe to iterate active list
158 : // Just cancel, don't modify list - workers return themselves when done
159 : for(auto* w = active_head_; w; w = w->next_)
160 : w->stop_.request_stop();
161 : co_return;
162 16 : }
163 :
164 : } // namespace boost::corosio
|