src/ex/thread_pool.cpp

89.7% Lines (70/78) 87.5% List of functions (14/16) 76.7% Branches (33/43)
f(x) Functions (16)
Function Calls Lines Branches Blocks
boost::capy::thread_pool::impl::work::work(std::__n4861::coroutine_handle<void>) :49 125x 100.0% 100.0% boost::capy::thread_pool::impl::work::run() :54 125x 100.0% 66.7% 100.0% boost::capy::thread_pool::impl::work::destroy() :61 0 0.0% 0.0% 0.0% boost::capy::thread_pool::impl::~impl() :77 61x 87.5% 66.7% 94.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :88 61x 88.9% 80.0% 76.0% boost::capy::thread_pool::impl::post(std::__n4861::coroutine_handle<void>) :102 125x 100.0% 100.0% 100.0% boost::capy::thread_pool::impl::stop() :114 61x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started() :122 125x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :124 22x 100.0% 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :127 38x 100.0% 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :132 38x 100.0% 100.0% 88.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :144 217x 100.0% 92.9% 100.0% boost::capy::thread_pool::~thread_pool() :160 61x 100.0% 50.0% 100.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :168 61x 100.0% 60.0% 55.0% boost::capy::thread_pool::stop() :176 0 0.0% 0.0% boost::capy::thread_pool::executor_type::post(std::__n4861::coroutine_handle<void>) const :185 125x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <atomic>
15 #include <condition_variable>
16 #include <cstdio>
17 #include <mutex>
18 #include <thread>
19 #include <vector>
20
21 /*
22 Thread pool implementation using a shared work queue.
23
24 Work items are coroutine handles wrapped in intrusive list nodes, stored
25 in a single queue protected by a mutex. Worker threads wait on a
26 condition_variable until work is available or stop is requested.
27
28 Threads are started lazily on first post() via std::call_once to avoid
29 spawning threads for pools that are constructed but never used. Each
30 thread is named with a configurable prefix plus index for debugger
31 visibility.
32
33 Shutdown sequence: stop() sets the stop flag and notifies all threads.
34 Workers finish the work already queued and then exit; the destructor joins
35 them and destroys anything still left in the queue without executing it.
36 */
37
38 namespace boost {
39 namespace capy {
40
41 //------------------------------------------------------------------------------
42
43 class thread_pool::impl
44 {
45 struct work : detail::intrusive_queue<work>::node
46 {
47 std::coroutine_handle<> h_;
48
49 125x explicit work(std::coroutine_handle<> h) noexcept
50 125x : h_(h)
51 {
52 125x }
53
54 125x void run()
55 {
56 125x auto h = h_;
57
1/2
✓ Branch 0 taken 125 times.
✗ Branch 1 not taken.
125x delete this;
58
1/1
✓ Branch 1 taken 125 times.
125x h.resume();
59 125x }
60
61 void destroy()
62 {
63 delete this;
64 }
65 };
66
67 std::mutex mutex_;
68 std::condition_variable cv_;
69 detail::intrusive_queue<work> q_;
70 std::vector<std::thread> threads_;
71 std::atomic<bool> stop_{false};
72 std::size_t num_threads_;
73 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
74 std::once_flag start_flag_;
75
76 public:
77 61x ~impl()
78 {
79 61x stop();
80
2/2
✓ Branch 5 taken 38 times.
✓ Branch 6 taken 61 times.
99x for(auto& t : threads_)
81
1/2
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
38x if(t.joinable())
82 38x t.join();
83
84
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 61 times.
61x while(auto* w = q_.pop())
85 w->destroy();
86 61x }
87
88 61x impl(std::size_t num_threads, std::string_view thread_name_prefix)
89 61x : num_threads_(num_threads)
90 {
91
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 59 times.
61x if(num_threads_ == 0)
92 2x num_threads_ = std::thread::hardware_concurrency();
93
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 61 times.
61x if(num_threads_ == 0)
94 num_threads_ = 1;
95
96 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
97
1/1
✓ Branch 1 taken 61 times.
61x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
98 61x thread_name_prefix_[n] = '\0';
99 61x }
100
101 void
102 125x post(std::coroutine_handle<> h)
103 {
104 125x ensure_started();
105 125x auto* w = new work(h);
106 {
107
1/1
✓ Branch 1 taken 125 times.
125x std::lock_guard<std::mutex> lock(mutex_);
108 125x q_.push(w);
109 125x }
110 125x cv_.notify_one();
111 125x }
112
113 void
114 61x stop() noexcept
115 {
116 61x stop_.store(true, std::memory_order_release);
117 61x cv_.notify_all();
118 61x }
119
120 private:
121 void
122 125x ensure_started()
123 {
124
1/1
✓ Branch 1 taken 125 times.
125x std::call_once(start_flag_, [this]{
125 22x threads_.reserve(num_threads_);
126
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 22 times.
60x for(std::size_t i = 0; i < num_threads_; ++i)
127
1/1
✓ Branch 2 taken 38 times.
76x threads_.emplace_back([this, i]{ run(i); });
128 22x });
129 125x }
130
131 void
132 38x run(std::size_t index)
133 {
134 // Build name; set_current_thread_name truncates to platform limits.
135 char name[16];
136 38x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
137 38x set_current_thread_name(name);
138
139 for(;;)
140 {
141 163x work* w = nullptr;
142 {
143
1/1
✓ Branch 1 taken 163 times.
163x std::unique_lock<std::mutex> lock(mutex_);
144
1/1
✓ Branch 1 taken 163 times.
163x cv_.wait(lock, [this]{
145
4/4
✓ Branch 1 taken 92 times.
✓ Branch 2 taken 125 times.
✓ Branch 3 taken 38 times.
✓ Branch 4 taken 54 times.
309x return !q_.empty() ||
146 309x stop_.load(std::memory_order_acquire);
147 });
148
6/6
✓ Branch 1 taken 40 times.
✓ Branch 2 taken 123 times.
✓ Branch 4 taken 38 times.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 38 times.
✓ Branch 7 taken 125 times.
163x if(stop_.load(std::memory_order_acquire) && q_.empty())
149 76x return;
150 125x w = q_.pop();
151 163x }
152
1/2
✓ Branch 0 taken 125 times.
✗ Branch 1 not taken.
125x if(w)
153
1/1
✓ Branch 1 taken 125 times.
125x w->run();
154 125x }
155 }
156 };
157
158 //------------------------------------------------------------------------------
159
160 61x thread_pool::
161 ~thread_pool()
162 {
163 61x shutdown();
164 61x destroy();
165
1/2
✓ Branch 0 taken 61 times.
✗ Branch 1 not taken.
61x delete impl_;
166 61x }
167
168 61x thread_pool::
169 61x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
170
2/4
✓ Branch 2 taken 61 times.
✓ Branch 5 taken 61 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
61x : impl_(new impl(num_threads, thread_name_prefix))
171 {
172
1/1
✓ Branch 1 taken 61 times.
61x this->set_frame_allocator(std::allocator<void>{});
173 61x }
174
175 void
176 thread_pool::
177 stop() noexcept
178 {
179 impl_->stop();
180 }
181
182 //------------------------------------------------------------------------------
183
184 void
185 125x thread_pool::executor_type::
186 post(std::coroutine_handle<> h) const
187 {
188 125x pool_->impl_->post(h);
189 125x }
190
191 } // capy
192 } // boost
193