include/boost/capy/io/any_read_stream.hpp

84.2% Lines (123/146) 81.6% List of functions (31/38) 77.3% Branches (17/22)
f(x) Functions (38)
Function Calls Lines Branches Blocks
boost::capy::any_read_stream::any_read_stream() :104 1x 100.0% 100.0% boost::capy::any_read_stream::any_read_stream(boost::capy::any_read_stream&&) :121 2x 100.0% 100.0% boost::capy::any_read_stream::has_value() const :168 25x 100.0% 100.0% boost::capy::any_read_stream::operator bool() const :179 3x 71.4% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::do_destroy_impl(void*) :262 0 0.0% 0.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_stream>::do_destroy_impl(void*) :262 0 0.0% 0.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::test::read_stream>::do_destroy_impl(void*) :262 1x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::construct_awaitable_impl(void*, void*, std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :268 14x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_stream>::construct_awaitable_impl(void*, void*, std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :268 2x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::test::read_stream>::construct_awaitable_impl(void*, void*, std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :268 75x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*)#7}::operator()(void*) const :279 14x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_stream>::{lambda(void*)#7}::operator()(void*) const :279 2x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::test::read_stream>::{lambda(void*)#4}::operator()(void*) const :279 75x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*)#3}::operator()(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) const :282 0 0.0% 0.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_stream>::{lambda(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*)#3}::operator()(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) const :282 0 0.0% 0.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::test::read_stream>::{lambda(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*)#2}::operator()(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) const :282 0 0.0% 0.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*)#8}::operator()(void*) const :286 14x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_stream>::{lambda(void*)#8}::operator()(void*) const :286 0 0.0% 0.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::test::read_stream>::{lambda(void*)#5}::operator()(void*) const :286 75x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*)#9}::operator()(void*) const :289 14x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_stream>::{lambda(void*)#9}::operator()(void*) const :289 2x 100.0% 100.0% boost::capy::any_read_stream::vtable_for_impl<boost::capy::test::read_stream>::{lambda(void*)#6}::operator()(void*) const :289 75x 100.0% 100.0% boost::capy::any_read_stream::~any_read_stream() :301 101x 100.0% 100.0% 100.0% boost::capy::any_read_stream::operator=(boost::capy::any_read_stream&&) :317 5x 86.7% 75.0% 88.0% boost::capy::any_read_stream::any_read_stream<boost::capy::test::read_stream>(boost::capy::test::read_stream) :343 1x 100.0% 80.0% boost::capy::any_read_stream::any_read_stream<boost::capy::test::read_stream>(boost::capy::test::read_stream)::guard::~guard() :349 1x 69.2% 50.0% 33.0% boost::capy::any_read_stream::any_read_stream<boost::capy::(anonymous namespace)::mock_stream>(boost::capy::(anonymous namespace)::mock_stream*) :369 23x 100.0% 100.0% boost::capy::any_read_stream::any_read_stream<boost::capy::(anonymous namespace)::pending_read_stream>(boost::capy::(anonymous namespace)::pending_read_stream*) :369 2x 100.0% 100.0% boost::capy::any_read_stream::any_read_stream<boost::capy::test::read_stream>(boost::capy::test::read_stream*) :369 67x 100.0% 100.0% auto boost::capy::any_read_stream::read_some<boost::capy::mutable_buffer>(boost::capy::mutable_buffer) :381 21x 100.0% 100.0% auto boost::capy::any_read_stream::read_some<std::array<boost::capy::mutable_buffer, 2ul> >(std::array<boost::capy::mutable_buffer, 2ul>) :381 6x 100.0% 100.0% auto boost::capy::any_read_stream::read_some<std::span<boost::capy::mutable_buffer const, 18446744073709551615ul> >(std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :381 6x 100.0% 100.0% auto boost::capy::any_read_stream::read_some<std::span<boost::capy::mutable_buffer, 18446744073709551615ul> >(std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :381 52x 100.0% 100.0% auto boost::capy::any_read_stream::read_some<std::vector<boost::capy::mutable_buffer, std::allocator<boost::capy::mutable_buffer> > >(std::vector<boost::capy::mutable_buffer, std::allocator<boost::capy::mutable_buffer> >) :381 6x 100.0% 100.0% boost::capy::any_read_stream::read_some<std::span<boost::capy::mutable_buffer, 18446744073709551615ul> >(std::span<boost::capy::mutable_buffer, 18446744073709551615ul>)::awaitable::await_ready() :391 14x 100.0% 100.0% 88.0% boost::capy::any_read_stream::read_some<std::span<boost::capy::mutable_buffer, 18446744073709551615ul> >(std::span<boost::capy::mutable_buffer, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :404 0 0.0% 0.0% boost::capy::any_read_stream::read_some<std::span<boost::capy::mutable_buffer, 18446744073709551615ul> >(std::span<boost::capy::mutable_buffer, 18446744073709551615ul>)::awaitable::await_resume() :411 14x 100.0% 100.0% boost::capy::any_read_stream::read_some<std::span<boost::capy::mutable_buffer, 18446744073709551615ul> >(std::span<boost::capy::mutable_buffer, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :415 14x 100.0% 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/io_result.hpp>
21
22 #include <concepts>
23 #include <coroutine>
24 #include <cstddef>
25 #include <new>
26 #include <span>
27 #include <stop_token>
28 #include <system_error>
29 #include <utility>
30
31 namespace boost {
32 namespace capy {
33
34 /** Type-erased wrapper for any ReadStream.
35
36 This class provides type erasure for any type satisfying the
37 @ref ReadStream concept, enabling runtime polymorphism for
38 read operations. It uses cached awaitable storage to achieve
39 zero steady-state allocation after construction.
40
41 The wrapper supports two construction modes:
42 - **Owning**: Pass by value to transfer ownership. The wrapper
43 allocates storage and owns the stream.
44 - **Reference**: Pass a pointer to wrap without ownership. The
45 pointed-to stream must outlive this wrapper.
46
47 @par Awaitable Preallocation
48 The constructor preallocates storage for the type-erased awaitable.
49 This reserves all virtual address space at server startup
50 so memory usage can be measured up front, rather than
51 allocating piecemeal as traffic arrives.
52
53 @par Immediate Completion
54 When the underlying stream's awaitable reports ready immediately
55 (e.g. buffered data already available), the wrapper skips
56 coroutine suspension entirely and returns the result inline.
57
58 @par Thread Safety
59 Not thread-safe. Concurrent operations on the same wrapper
60 are undefined behavior.
61
62 @par Example
63 @code
64 // Owning - takes ownership of the stream
65 any_read_stream stream(socket{ioc});
66
67 // Reference - wraps without ownership
68 socket sock(ioc);
69 any_read_stream stream(&sock);
70
71 mutable_buffer buf(data, size);
72 auto [ec, n] = co_await stream.read_some(buf);
73 @endcode
74
75 @see any_write_stream, any_stream, ReadStream
76 */
77 class any_read_stream
78 {
79 struct vtable;
80
81 template<ReadStream S>
82 struct vtable_for_impl;
83
84 // ordered for cache line coherence
85 void* stream_ = nullptr;
86 vtable const* vt_ = nullptr;
87 void* cached_awaitable_ = nullptr;
88 void* storage_ = nullptr;
89 bool awaitable_active_ = false;
90
91 public:
92 /** Destructor.
93
94 Destroys the owned stream (if any) and releases the cached
95 awaitable storage.
96 */
97 ~any_read_stream();
98
99 /** Default constructor.
100
101 Constructs an empty wrapper. Operations on a default-constructed
102 wrapper result in undefined behavior.
103 */
104 1x any_read_stream() = default;
105
106 /** Non-copyable.
107
108 The awaitable cache is per-instance and cannot be shared.
109 */
110 any_read_stream(any_read_stream const&) = delete;
111 any_read_stream& operator=(any_read_stream const&) = delete;
112
113 /** Move constructor.
114
115 Transfers ownership of the wrapped stream (if owned) and
116 cached awaitable storage from `other`. After the move, `other` is
117 in a default-constructed state.
118
119 @param other The wrapper to move from.
120 */
121 2x any_read_stream(any_read_stream&& other) noexcept
122 2x : stream_(std::exchange(other.stream_, nullptr))
123 2x , vt_(std::exchange(other.vt_, nullptr))
124 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125 2x , storage_(std::exchange(other.storage_, nullptr))
126 2x , awaitable_active_(std::exchange(other.awaitable_active_, false))
127 {
128 2x }
129
130 /** Move assignment operator.
131
132 Destroys any owned stream and releases existing resources,
133 then transfers ownership from `other`.
134
135 @param other The wrapper to move from.
136 @return Reference to this wrapper.
137 */
138 any_read_stream&
139 operator=(any_read_stream&& other) noexcept;
140
141 /** Construct by taking ownership of a ReadStream.
142
143 Allocates storage and moves the stream into this wrapper.
144 The wrapper owns the stream and will destroy it.
145
146 @param s The stream to take ownership of.
147 */
148 template<ReadStream S>
149 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
150 any_read_stream(S s);
151
152 /** Construct by wrapping a ReadStream without ownership.
153
154 Wraps the given stream by pointer. The stream must remain
155 valid for the lifetime of this wrapper.
156
157 @param s Pointer to the stream to wrap.
158 */
159 template<ReadStream S>
160 any_read_stream(S* s);
161
162 /** Check if the wrapper contains a valid stream.
163
164 @return `true` if wrapping a stream, `false` if default-constructed
165 or moved-from.
166 */
167 bool
168 25x has_value() const noexcept
169 {
170 25x return stream_ != nullptr;
171 }
172
173 /** Check if the wrapper contains a valid stream.
174
175 @return `true` if wrapping a stream, `false` if default-constructed
176 or moved-from.
177 */
178 explicit
179 3x operator bool() const noexcept
180 {
181 3x return has_value();
182 }
183
184 /** Initiate an asynchronous read operation.
185
186 Reads data into the provided buffer sequence. The operation
187 completes when at least one byte has been read, or an error
188 occurs.
189
190 @param buffers The buffer sequence to read into. Passed by
191 value to ensure the sequence lives in the coroutine frame
192 across suspension points.
193
194 @return An awaitable yielding `(error_code,std::size_t)`.
195
196 @par Immediate Completion
197 The operation completes immediately without suspending
198 the calling coroutine when the underlying stream's
199 awaitable reports immediate readiness via `await_ready`.
200
201 @note This is a partial operation and may not process the
202 entire buffer sequence. Use the composed @ref read algorithm
203 for guaranteed complete transfer.
204
205 @par Preconditions
206 The wrapper must contain a valid stream (`has_value() == true`).
207 The caller must not call this function again after a prior
208 call returned an error (including EOF).
209 */
210 template<MutableBufferSequence MB>
211 auto
212 read_some(MB buffers);
213
214 protected:
215 /** Rebind to a new stream after move.
216
217 Updates the internal pointer to reference a new stream object.
218 Used by owning wrappers after move assignment when the owned
219 object has moved to a new location.
220
221 @param new_stream The new stream to bind to. Must be the same
222 type as the original stream.
223
224 @note Terminates if called with a stream of different type
225 than the original.
226 */
227 template<ReadStream S>
228 void
229 rebind(S& new_stream) noexcept
230 {
231 if(vt_ != &vtable_for_impl<S>::value)
232 std::terminate();
233 stream_ = &new_stream;
234 }
235 };
236
237 //----------------------------------------------------------
238
239 struct any_read_stream::vtable
240 {
241 // ordered by call frequency for cache line coherence
242 void (*construct_awaitable)(
243 void* stream,
244 void* storage,
245 std::span<mutable_buffer const> buffers);
246 bool (*await_ready)(void*);
247 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248 io_result<std::size_t> (*await_resume)(void*);
249 void (*destroy_awaitable)(void*) noexcept;
250 std::size_t awaitable_size;
251 std::size_t awaitable_align;
252 void (*destroy)(void*) noexcept;
253 };
254
255 template<ReadStream S>
256 struct any_read_stream::vtable_for_impl
257 {
258 using Awaitable = decltype(std::declval<S&>().read_some(
259 std::span<mutable_buffer const>{}));
260
261 static void
262 1x do_destroy_impl(void* stream) noexcept
263 {
264 1x static_cast<S*>(stream)->~S();
265 1x }
266
267 static void
268 91x construct_awaitable_impl(
269 void* stream,
270 void* storage,
271 std::span<mutable_buffer const> buffers)
272 {
273 91x auto& s = *static_cast<S*>(stream);
274 91x ::new(storage) Awaitable(s.read_some(buffers));
275 91x }
276
277 static constexpr vtable value = {
278 &construct_awaitable_impl,
279 91x +[](void* p) {
280 91x return static_cast<Awaitable*>(p)->await_ready();
281 },
282 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283 return detail::call_await_suspend(
284 static_cast<Awaitable*>(p), h, env);
285 },
286 89x +[](void* p) {
287 89x return static_cast<Awaitable*>(p)->await_resume();
288 },
289 93x +[](void* p) noexcept {
290 16x static_cast<Awaitable*>(p)->~Awaitable();
291 },
292 sizeof(Awaitable),
293 alignof(Awaitable),
294 &do_destroy_impl
295 };
296 };
297
298 //----------------------------------------------------------
299
300 inline
301 101x any_read_stream::~any_read_stream()
302 {
303
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 100 times.
101x if(storage_)
304 {
305 1x vt_->destroy(stream_);
306 1x ::operator delete(storage_);
307 }
308
2/2
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 10 times.
101x if(cached_awaitable_)
309 {
310
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 90 times.
91x if(awaitable_active_)
311 1x vt_->destroy_awaitable(cached_awaitable_);
312 91x ::operator delete(cached_awaitable_);
313 }
314 101x }
315
316 inline any_read_stream&
317 5x any_read_stream::operator=(any_read_stream&& other) noexcept
318 {
319
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5x if(this != &other)
320 {
321
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5x if(storage_)
322 {
323 vt_->destroy(stream_);
324 ::operator delete(storage_);
325 }
326
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5x if(cached_awaitable_)
327 {
328
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2x if(awaitable_active_)
329 1x vt_->destroy_awaitable(cached_awaitable_);
330 2x ::operator delete(cached_awaitable_);
331 }
332 5x stream_ = std::exchange(other.stream_, nullptr);
333 5x vt_ = std::exchange(other.vt_, nullptr);
334 5x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
335 5x storage_ = std::exchange(other.storage_, nullptr);
336 5x awaitable_active_ = std::exchange(other.awaitable_active_, false);
337 }
338 5x return *this;
339 }
340
341 template<ReadStream S>
342 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
343 1x any_read_stream::any_read_stream(S s)
344 1x : vt_(&vtable_for_impl<S>::value)
345 {
346 struct guard {
347 any_read_stream* self;
348 bool committed = false;
349 1x ~guard() {
350
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1x if(!committed && self->storage_) {
351 self->vt_->destroy(self->stream_);
352 ::operator delete(self->storage_);
353 self->storage_ = nullptr;
354 self->stream_ = nullptr;
355 }
356 1x }
357 1x } g{this};
358
359
1/1
✓ Branch 1 taken 1 time.
1x storage_ = ::operator new(sizeof(S));
360 1x stream_ = ::new(storage_) S(std::move(s));
361
362 // Preallocate the awaitable storage
363
1/1
✓ Branch 1 taken 1 time.
1x cached_awaitable_ = ::operator new(vt_->awaitable_size);
364
365 1x g.committed = true;
366 1x }
367
368 template<ReadStream S>
369 92x any_read_stream::any_read_stream(S* s)
370 92x : stream_(s)
371 92x , vt_(&vtable_for_impl<S>::value)
372 {
373 // Preallocate the awaitable storage
374 92x cached_awaitable_ = ::operator new(vt_->awaitable_size);
375 92x }
376
377 //----------------------------------------------------------
378
379 template<MutableBufferSequence MB>
380 auto
381 91x any_read_stream::read_some(MB buffers)
382 {
383 // VFALCO in theory, we could use if constexpr to detect a
384 // span and then pass that through to read_some without the array
385 struct awaitable
386 {
387 any_read_stream* self_;
388 mutable_buffer_array<detail::max_iovec_> ba_;
389
390 bool
391 14x await_ready()
392 {
393 14x self_->vt_->construct_awaitable(
394 14x self_->stream_,
395
1/1
✓ Branch 1 taken 14 times.
14x self_->cached_awaitable_,
396 14x ba_.to_span());
397 14x self_->awaitable_active_ = true;
398
399 28x return self_->vt_->await_ready(
400 14x self_->cached_awaitable_);
401 }
402
403 std::coroutine_handle<>
404 await_suspend(std::coroutine_handle<> h, io_env const* env)
405 {
406 return self_->vt_->await_suspend(
407 self_->cached_awaitable_, h, env);
408 }
409
410 io_result<std::size_t>
411 14x await_resume()
412 {
413 struct guard {
414 any_read_stream* self;
415 14x ~guard() {
416 14x self->vt_->destroy_awaitable(self->cached_awaitable_);
417 14x self->awaitable_active_ = false;
418 14x }
419 14x } g{self_};
420 14x return self_->vt_->await_resume(
421
1/1
✓ Branch 1 taken 10 times.
24x self_->cached_awaitable_);
422 14x }
423 };
424 return awaitable{this,
425 91x mutable_buffer_array<detail::max_iovec_>(buffers)};
426 91x }
427
428 } // namespace capy
429 } // namespace boost
430
431 #endif
432