include/boost/capy/io/any_write_stream.hpp

88.0% Lines (147/167) 85.7% List of functions (30/35) 73.1% Branches (19/26)
f(x) Functions (35)
Function Calls Lines Branches Blocks
boost::capy::any_write_stream::any_write_stream() :105 1x 100.0% 100.0% boost::capy::any_write_stream::any_write_stream(boost::capy::any_write_stream&&) :122 2x 100.0% 100.0% boost::capy::any_write_stream::has_value() const :169 21x 100.0% 100.0% boost::capy::any_write_stream::operator bool() const :180 3x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::do_destroy_impl(void*) :263 0 0.0% 0.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_write_stream>::do_destroy_impl(void*) :263 0 0.0% 0.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::test::write_stream>::do_destroy_impl(void*) :263 1x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::construct_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :269 10x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_write_stream>::construct_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :269 2x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::test::write_stream>::construct_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :269 63x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*)#10}::operator()(void*) const :280 10x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_write_stream>::{lambda(void*)#7}::operator()(void*) const :280 2x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::test::write_stream>::{lambda(void*)#4}::operator()(void*) const :280 63x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*)#4}::operator()(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) const :283 0 0.0% 0.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_write_stream>::{lambda(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*)#3}::operator()(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) const :283 2x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::test::write_stream>::{lambda(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*)#2}::operator()(void*, std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) const :283 0 0.0% 0.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*)#11}::operator()(void*) const :287 10x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_write_stream>::{lambda(void*)#8}::operator()(void*) const :287 0 0.0% 0.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::test::write_stream>::{lambda(void*)#5}::operator()(void*) const :287 63x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::mock_stream>::{lambda(void*)#12}::operator()(void*) const :290 10x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::(anonymous namespace)::pending_write_stream>::{lambda(void*)#9}::operator()(void*) const :290 2x 100.0% 100.0% boost::capy::any_write_stream::vtable_for_impl<boost::capy::test::write_stream>::{lambda(void*)#6}::operator()(void*) const :290 63x 100.0% 100.0% boost::capy::any_write_stream::~any_write_stream() :302 95x 100.0% 100.0% 100.0% boost::capy::any_write_stream::operator=(boost::capy::any_write_stream&&) :318 5x 86.7% 75.0% 88.0% boost::capy::any_write_stream::any_write_stream<boost::capy::test::write_stream>(boost::capy::test::write_stream) :344 1x 100.0% 80.0% boost::capy::any_write_stream::any_write_stream<boost::capy::test::write_stream>(boost::capy::test::write_stream)::guard::~guard() :350 1x 90.5% 50.0% 33.0% boost::capy::any_write_stream::any_write_stream<boost::capy::(anonymous namespace)::mock_stream>(boost::capy::(anonymous namespace)::mock_stream*) :370 23x 100.0% 100.0% boost::capy::any_write_stream::any_write_stream<boost::capy::(anonymous namespace)::pending_write_stream>(boost::capy::(anonymous namespace)::pending_write_stream*) :370 2x 100.0% 100.0% boost::capy::any_write_stream::any_write_stream<boost::capy::test::write_stream>(boost::capy::test::write_stream*) :370 61x 100.0% 100.0% auto boost::capy::any_write_stream::write_some<std::span<boost::capy::const_buffer, 18446744073709551615ul> >(std::span<boost::capy::const_buffer, 18446744073709551615ul>) :382 10x 100.0% 100.0% boost::capy::any_write_stream::write_some<std::span<boost::capy::const_buffer, 18446744073709551615ul> >(std::span<boost::capy::const_buffer, 18446744073709551615ul>)::awaitable::awaitable(boost::capy::any_write_stream*, std::span<boost::capy::const_buffer, 18446744073709551615ul> const&) :389 10x 100.0% 100.0% boost::capy::any_write_stream::write_some<std::span<boost::capy::const_buffer, 18446744073709551615ul> >(std::span<boost::capy::const_buffer, 18446744073709551615ul>)::awaitable::await_ready() const :398 10x 100.0% 100.0% boost::capy::any_write_stream::write_some<std::span<boost::capy::const_buffer, 18446744073709551615ul> >(std::span<boost::capy::const_buffer, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :404 10x 80.0% 66.7% 73.0% boost::capy::any_write_stream::write_some<std::span<boost::capy::const_buffer, 18446744073709551615ul> >(std::span<boost::capy::const_buffer, 18446744073709551615ul>)::awaitable::await_resume() :420 10x 66.7% 50.0% 82.0% boost::capy::any_write_stream::write_some<std::span<boost::capy::const_buffer, 18446744073709551615ul> >(std::span<boost::capy::const_buffer, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :426 10x 100.0% 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <coroutine>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any WriteStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref WriteStream concept, enabling runtime polymorphism for
39 write operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Immediate Completion
55 Operations complete immediately without suspending when the
56 buffer sequence is empty, or when the underlying stream's
57 awaitable reports readiness via `await_ready`.
58
59 @par Thread Safety
60 Not thread-safe. Concurrent operations on the same wrapper
61 are undefined behavior.
62
63 @par Example
64 @code
65 // Owning - takes ownership of the stream
66 any_write_stream stream(socket{ioc});
67
68 // Reference - wraps without ownership
69 socket sock(ioc);
70 any_write_stream stream(&sock);
71
72 const_buffer buf(data, size);
73 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
74 @endcode
75
76 @see any_read_stream, any_stream, WriteStream
77 */
78 class any_write_stream
79 {
80 struct vtable;
81
82 template<WriteStream S>
83 struct vtable_for_impl;
84
85 // ordered for cache line coherence
86 void* stream_ = nullptr;
87 vtable const* vt_ = nullptr;
88 void* cached_awaitable_ = nullptr;
89 void* storage_ = nullptr;
90 bool awaitable_active_ = false;
91
92 public:
93 /** Destructor.
94
95 Destroys the owned stream (if any) and releases the cached
96 awaitable storage.
97 */
98 ~any_write_stream();
99
100 /** Default constructor.
101
102 Constructs an empty wrapper. Operations on a default-constructed
103 wrapper result in undefined behavior.
104 */
105 1x any_write_stream() = default;
106
107 /** Non-copyable.
108
109 The awaitable cache is per-instance and cannot be shared.
110 */
111 any_write_stream(any_write_stream const&) = delete;
112 any_write_stream& operator=(any_write_stream const&) = delete;
113
114 /** Move constructor.
115
116 Transfers ownership of the wrapped stream (if owned) and
117 cached awaitable storage from `other`. After the move, `other` is
118 in a default-constructed state.
119
120 @param other The wrapper to move from.
121 */
122 2x any_write_stream(any_write_stream&& other) noexcept
123 2x : stream_(std::exchange(other.stream_, nullptr))
124 2x , vt_(std::exchange(other.vt_, nullptr))
125 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 2x , storage_(std::exchange(other.storage_, nullptr))
127 2x , awaitable_active_(std::exchange(other.awaitable_active_, false))
128 {
129 2x }
130
131 /** Move assignment operator.
132
133 Destroys any owned stream and releases existing resources,
134 then transfers ownership from `other`.
135
136 @param other The wrapper to move from.
137 @return Reference to this wrapper.
138 */
139 any_write_stream&
140 operator=(any_write_stream&& other) noexcept;
141
142 /** Construct by taking ownership of a WriteStream.
143
144 Allocates storage and moves the stream into this wrapper.
145 The wrapper owns the stream and will destroy it.
146
147 @param s The stream to take ownership of.
148 */
149 template<WriteStream S>
150 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
151 any_write_stream(S s);
152
153 /** Construct by wrapping a WriteStream without ownership.
154
155 Wraps the given stream by pointer. The stream must remain
156 valid for the lifetime of this wrapper.
157
158 @param s Pointer to the stream to wrap.
159 */
160 template<WriteStream S>
161 any_write_stream(S* s);
162
163 /** Check if the wrapper contains a valid stream.
164
165 @return `true` if wrapping a stream, `false` if default-constructed
166 or moved-from.
167 */
168 bool
169 21x has_value() const noexcept
170 {
171 21x return stream_ != nullptr;
172 }
173
174 /** Check if the wrapper contains a valid stream.
175
176 @return `true` if wrapping a stream, `false` if default-constructed
177 or moved-from.
178 */
179 explicit
180 3x operator bool() const noexcept
181 {
182 3x return has_value();
183 }
184
185 /** Initiate an asynchronous write operation.
186
187 Writes data from the provided buffer sequence. The operation
188 completes when at least one byte has been written, or an error
189 occurs.
190
191 @param buffers The buffer sequence containing data to write.
192 Passed by value to ensure the sequence lives in the
193 coroutine frame across suspension points.
194
195 @return An awaitable yielding `(error_code,std::size_t)`.
196
197 @par Immediate Completion
198 The operation completes immediately without suspending
199 the calling coroutine when:
200 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201 @li The underlying stream's awaitable reports immediate
202 readiness via `await_ready`.
203
204 @note This is a partial operation and may not process the
205 entire buffer sequence. Use the composed @ref write algorithm
206 for guaranteed complete transfer.
207
208 @par Preconditions
209 The wrapper must contain a valid stream (`has_value() == true`).
210 */
211 template<ConstBufferSequence CB>
212 auto
213 write_some(CB buffers);
214
215 protected:
216 /** Rebind to a new stream after move.
217
218 Updates the internal pointer to reference a new stream object.
219 Used by owning wrappers after move assignment when the owned
220 object has moved to a new location.
221
222 @param new_stream The new stream to bind to. Must be the same
223 type as the original stream.
224
225 @note Terminates if called with a stream of different type
226 than the original.
227 */
228 template<WriteStream S>
229 void
230 rebind(S& new_stream) noexcept
231 {
232 if(vt_ != &vtable_for_impl<S>::value)
233 std::terminate();
234 stream_ = &new_stream;
235 }
236 };
237
238 //----------------------------------------------------------
239
240 struct any_write_stream::vtable
241 {
242 // ordered by call frequency for cache line coherence
243 void (*construct_awaitable)(
244 void* stream,
245 void* storage,
246 std::span<const_buffer const> buffers);
247 bool (*await_ready)(void*);
248 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
249 io_result<std::size_t> (*await_resume)(void*);
250 void (*destroy_awaitable)(void*) noexcept;
251 std::size_t awaitable_size;
252 std::size_t awaitable_align;
253 void (*destroy)(void*) noexcept;
254 };
255
256 template<WriteStream S>
257 struct any_write_stream::vtable_for_impl
258 {
259 using Awaitable = decltype(std::declval<S&>().write_some(
260 std::span<const_buffer const>{}));
261
262 static void
263 1x do_destroy_impl(void* stream) noexcept
264 {
265 1x static_cast<S*>(stream)->~S();
266 1x }
267
268 static void
269 75x construct_awaitable_impl(
270 void* stream,
271 void* storage,
272 std::span<const_buffer const> buffers)
273 {
274 75x auto& s = *static_cast<S*>(stream);
275 75x ::new(storage) Awaitable(s.write_some(buffers));
276 75x }
277
278 static constexpr vtable value = {
279 &construct_awaitable_impl,
280 75x +[](void* p) {
281 75x return static_cast<Awaitable*>(p)->await_ready();
282 },
283 2x +[](void* p, std::coroutine_handle<> h, io_env const* env) {
284 2x return detail::call_await_suspend(
285 2x static_cast<Awaitable*>(p), h, env);
286 },
287 73x +[](void* p) {
288 73x return static_cast<Awaitable*>(p)->await_resume();
289 },
290 77x +[](void* p) noexcept {
291 12x static_cast<Awaitable*>(p)->~Awaitable();
292 },
293 sizeof(Awaitable),
294 alignof(Awaitable),
295 &do_destroy_impl
296 };
297 };
298
299 //----------------------------------------------------------
300
301 inline
302 95x any_write_stream::~any_write_stream()
303 {
304
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 94 times.
95x if(storage_)
305 {
306 1x vt_->destroy(stream_);
307 1x ::operator delete(storage_);
308 }
309
2/2
✓ Branch 0 taken 85 times.
✓ Branch 1 taken 10 times.
95x if(cached_awaitable_)
310 {
311
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 84 times.
85x if(awaitable_active_)
312 1x vt_->destroy_awaitable(cached_awaitable_);
313 85x ::operator delete(cached_awaitable_);
314 }
315 95x }
316
317 inline any_write_stream&
318 5x any_write_stream::operator=(any_write_stream&& other) noexcept
319 {
320
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5x if(this != &other)
321 {
322
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5x if(storage_)
323 {
324 vt_->destroy(stream_);
325 ::operator delete(storage_);
326 }
327
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5x if(cached_awaitable_)
328 {
329
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2x if(awaitable_active_)
330 1x vt_->destroy_awaitable(cached_awaitable_);
331 2x ::operator delete(cached_awaitable_);
332 }
333 5x stream_ = std::exchange(other.stream_, nullptr);
334 5x vt_ = std::exchange(other.vt_, nullptr);
335 5x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
336 5x storage_ = std::exchange(other.storage_, nullptr);
337 5x awaitable_active_ = std::exchange(other.awaitable_active_, false);
338 }
339 5x return *this;
340 }
341
342 template<WriteStream S>
343 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
344 1x any_write_stream::any_write_stream(S s)
345 1x : vt_(&vtable_for_impl<S>::value)
346 {
347 struct guard {
348 any_write_stream* self;
349 bool committed = false;
350 1x ~guard() {
351
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1x if(!committed && self->storage_) {
352 self->vt_->destroy(self->stream_);
353 ::operator delete(self->storage_);
354 self->storage_ = nullptr;
355 self->stream_ = nullptr;
356 }
357 1x }
358 1x } g{this};
359
360
1/1
✓ Branch 1 taken 1 time.
1x storage_ = ::operator new(sizeof(S));
361 1x stream_ = ::new(storage_) S(std::move(s));
362
363 // Preallocate the awaitable storage
364
1/1
✓ Branch 1 taken 1 time.
1x cached_awaitable_ = ::operator new(vt_->awaitable_size);
365
366 1x g.committed = true;
367 1x }
368
369 template<WriteStream S>
370 86x any_write_stream::any_write_stream(S* s)
371 86x : stream_(s)
372 86x , vt_(&vtable_for_impl<S>::value)
373 {
374 // Preallocate the awaitable storage
375 86x cached_awaitable_ = ::operator new(vt_->awaitable_size);
376 86x }
377
378 //----------------------------------------------------------
379
380 template<ConstBufferSequence CB>
381 auto
382 79x any_write_stream::write_some(CB buffers)
383 {
384 struct awaitable
385 {
386 any_write_stream* self_;
387 const_buffer_array<detail::max_iovec_> ba_;
388
389 79x awaitable(
390 any_write_stream* self,
391 CB const& buffers) noexcept
392 79x : self_(self)
393 79x , ba_(buffers)
394 {
395 79x }
396
397 bool
398 79x await_ready() const noexcept
399 {
400 79x return ba_.to_span().empty();
401 }
402
403 std::coroutine_handle<>
404 75x await_suspend(std::coroutine_handle<> h, io_env const* env)
405 {
406 75x self_->vt_->construct_awaitable(
407 75x self_->stream_,
408
1/1
✓ Branch 1 taken 10 times.
75x self_->cached_awaitable_,
409 75x ba_.to_span());
410 75x self_->awaitable_active_ = true;
411
412
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
75x if(self_->vt_->await_ready(self_->cached_awaitable_))
413 73x return h;
414
415 2x return self_->vt_->await_suspend(
416 2x self_->cached_awaitable_, h, env);
417 }
418
419 io_result<std::size_t>
420 77x await_resume()
421 {
422
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
77x if(!self_->awaitable_active_)
423 4x return {{}, 0};
424 struct guard {
425 any_write_stream* self;
426 73x ~guard() {
427 73x self->vt_->destroy_awaitable(self->cached_awaitable_);
428 73x self->awaitable_active_ = false;
429 73x }
430 73x } g{self_};
431 73x return self_->vt_->await_resume(
432
1/1
✓ Branch 1 taken 7 times.
73x self_->cached_awaitable_);
433 73x }
434 };
435 79x return awaitable{this, buffers};
436 }
437
438 } // namespace capy
439 } // namespace boost
440
441 #endif
442