src/ex/detail/strand_service.cpp

96.7% Lines (88/91) 95.5% List of functions (21/22) 87.1% Branches (27/31)
f(x) Functions (22)
Function Calls Lines Branches Blocks
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, boost::capy::detail::strand_impl&) :49 7x 100.0% 66.7% 86.0% boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long) :65 7x 87.5% 50.0% 75.0% boost::capy::detail::strand_invoker::promise_type::get_return_object() :79 7x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::initial_suspend() :82 7x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::final_suspend() :83 7x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::return_void() :84 7x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::unhandled_exception() :85 0 0.0% 0.0% boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&) :107 19x 100.0% 100.0% boost::capy::detail::strand_service_impl::get_implementation() :112 23x 100.0% 100.0% 100.0% boost::capy::detail::strand_service_impl::shutdown() :122 19x 100.0% 100.0% 100.0% boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>) :139 322x 100.0% 100.0% 82.0% boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&) :152 13x 100.0% 100.0% 86.0% boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&) :163 13x 100.0% 100.0% 100.0% boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&) :175 13x 100.0% 100.0% boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&) :181 7x 100.0% 100.0% boost::capy::detail::strand_service_impl::make_invoker(boost::capy::detail::strand_impl&) :189 7x 100.0% 100.0% 55.0% boost::capy::detail::strand_service::strand_service() :209 19x 100.0% 100.0% boost::capy::detail::strand_service::~strand_service() :215 19x 100.0% 100.0% boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&) :219 2x 100.0% 100.0% boost::capy::detail::strand_service::dispatch(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :226 1x 83.3% 66.7% 93.0% boost::capy::detail::strand_service::post(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :238 321x 100.0% 100.0% 100.0% boost::capy::detail::get_strand_service(boost::capy::execution_context&) :246 23x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <atomic>
13 #include <coroutine>
14 #include <mutex>
15 #include <thread>
16 #include <utility>
17
18 namespace boost {
19 namespace capy {
20 namespace detail {
21
22 //----------------------------------------------------------
23
24 /** Implementation state for a strand.
25
26 Each strand_impl provides serialization for coroutines
27 dispatched through strands that share it.
28 */
29 struct strand_impl
30 {
31 std::mutex mutex_;
32 strand_queue pending_;
33 bool locked_ = false;
34 std::atomic<std::thread::id> dispatch_thread_{};
35 void* cached_frame_ = nullptr;
36 };
37
38 //----------------------------------------------------------
39
40 /** Invoker coroutine for strand dispatch.
41
42 Uses custom allocator to recycle frame - one allocation
43 per strand_impl lifetime, stored in trailer for recovery.
44 */
45 struct strand_invoker
46 {
47 struct promise_type
48 {
49 7x void* operator new(std::size_t n, strand_impl& impl)
50 {
51 7x constexpr auto A = alignof(strand_impl*);
52 7x std::size_t padded = (n + A - 1) & ~(A - 1);
53 7x std::size_t total = padded + sizeof(strand_impl*);
54
55 7x void* p = impl.cached_frame_
56
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7x ? std::exchange(impl.cached_frame_, nullptr)
57
1/1
✓ Branch 1 taken 7 times.
7x : ::operator new(total);
58
59 // Trailer lets delete recover impl
60 7x *reinterpret_cast<strand_impl**>(
61 7x static_cast<char*>(p) + padded) = &impl;
62 7x return p;
63 }
64
65 7x void operator delete(void* p, std::size_t n) noexcept
66 {
67 7x constexpr auto A = alignof(strand_impl*);
68 7x std::size_t padded = (n + A - 1) & ~(A - 1);
69
70 7x auto* impl = *reinterpret_cast<strand_impl**>(
71 static_cast<char*>(p) + padded);
72
73
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7x if (!impl->cached_frame_)
74 7x impl->cached_frame_ = p;
75 else
76 ::operator delete(p);
77 7x }
78
79 7x strand_invoker get_return_object() noexcept
80 7x { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
81
82 7x std::suspend_always initial_suspend() noexcept { return {}; }
83 7x std::suspend_never final_suspend() noexcept { return {}; }
84 7x void return_void() noexcept {}
85 void unhandled_exception() { std::terminate(); }
86 };
87
88 std::coroutine_handle<promise_type> h_;
89 };
90
91 //----------------------------------------------------------
92
93 /** Concrete implementation of strand_service.
94
95 Holds the fixed pool of strand_impl objects.
96 */
97 class strand_service_impl : public strand_service
98 {
99 static constexpr std::size_t num_impls = 211;
100
101 strand_impl impls_[num_impls];
102 std::size_t salt_ = 0;
103 std::mutex mutex_;
104
105 public:
106 explicit
107 19x strand_service_impl(execution_context&)
108 4028x {
109 19x }
110
111 strand_impl*
112 23x get_implementation() override
113 {
114
1/1
✓ Branch 1 taken 23 times.
23x std::lock_guard<std::mutex> lock(mutex_);
115 23x std::size_t index = salt_++;
116 23x index = index % num_impls;
117 23x return &impls_[index];
118 23x }
119
120 protected:
121 void
122 19x shutdown() override
123 {
124
2/2
✓ Branch 0 taken 4009 times.
✓ Branch 1 taken 19 times.
4028x for(std::size_t i = 0; i < num_impls; ++i)
125 {
126
1/1
✓ Branch 1 taken 4009 times.
4009x std::lock_guard<std::mutex> lock(impls_[i].mutex_);
127 4009x impls_[i].locked_ = true;
128
129
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 4002 times.
4009x if(impls_[i].cached_frame_)
130 {
131 7x ::operator delete(impls_[i].cached_frame_);
132 7x impls_[i].cached_frame_ = nullptr;
133 }
134 4009x }
135 19x }
136
137 private:
138 static bool
139 322x enqueue(strand_impl& impl, std::coroutine_handle<> h)
140 {
141
1/1
✓ Branch 1 taken 322 times.
322x std::lock_guard<std::mutex> lock(impl.mutex_);
142
1/1
✓ Branch 1 taken 322 times.
322x impl.pending_.push(h);
143
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 315 times.
322x if(!impl.locked_)
144 {
145 7x impl.locked_ = true;
146 7x return true;
147 }
148 315x return false;
149 322x }
150
151 static void
152 13x dispatch_pending(strand_impl& impl)
153 {
154 13x strand_queue::taken_batch batch;
155 {
156
1/1
✓ Branch 1 taken 13 times.
13x std::lock_guard<std::mutex> lock(impl.mutex_);
157 13x batch = impl.pending_.take_all();
158 13x }
159
1/1
✓ Branch 1 taken 13 times.
13x impl.pending_.dispatch_batch(batch);
160 13x }
161
162 static bool
163 13x try_unlock(strand_impl& impl)
164 {
165
1/1
✓ Branch 1 taken 13 times.
13x std::lock_guard<std::mutex> lock(impl.mutex_);
166
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 6 times.
13x if(impl.pending_.empty())
167 {
168 7x impl.locked_ = false;
169 7x return true;
170 }
171 6x return false;
172 13x }
173
174 static void
175 13x set_dispatch_thread(strand_impl& impl) noexcept
176 {
177 13x impl.dispatch_thread_.store(std::this_thread::get_id());
178 13x }
179
180 static void
181 7x clear_dispatch_thread(strand_impl& impl) noexcept
182 {
183 7x impl.dispatch_thread_.store(std::thread::id{});
184 7x }
185
186 // Loops until queue empty (aggressive). Alternative: per-batch fairness
187 // (repost after each batch to let other work run) - explore if starvation observed.
188 static strand_invoker
189
1/1
✓ Branch 1 taken 7 times.
7x make_invoker(strand_impl& impl)
190 {
191 strand_impl* p = &impl;
192 for(;;)
193 {
194 set_dispatch_thread(*p);
195 dispatch_pending(*p);
196 if(try_unlock(*p))
197 {
198 clear_dispatch_thread(*p);
199 co_return;
200 }
201 }
202 14x }
203
204 friend class strand_service;
205 };
206
207 //----------------------------------------------------------
208
209 19x strand_service::
210 19x strand_service()
211 19x : service()
212 {
213 19x }
214
215 19x strand_service::
216 ~strand_service() = default;
217
218 bool
219 2x strand_service::
220 running_in_this_thread(strand_impl& impl) noexcept
221 {
222 2x return impl.dispatch_thread_.load() == std::this_thread::get_id();
223 }
224
225 std::coroutine_handle<>
226 1x strand_service::
227 dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
228 {
229
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1x if(running_in_this_thread(impl))
230 return h;
231
232
1/2
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
1x if(strand_service_impl::enqueue(impl, h))
233
2/2
✓ Branch 1 taken 1 time.
✓ Branch 5 taken 1 time.
1x ex.post(strand_service_impl::make_invoker(impl).h_);
234 1x return std::noop_coroutine();
235 }
236
237 void
238 321x strand_service::
239 post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
240 {
241
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 315 times.
321x if(strand_service_impl::enqueue(impl, h))
242
2/2
✓ Branch 1 taken 6 times.
✓ Branch 5 taken 6 times.
6x ex.post(strand_service_impl::make_invoker(impl).h_);
243 321x }
244
245 strand_service&
246 23x get_strand_service(execution_context& ctx)
247 {
248 23x return ctx.use_service<strand_service_impl>();
249 }
250
251 } // namespace detail
252 } // namespace capy
253 } // namespace boost
254