1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/io_result.hpp>
21  

21  

22  
#include <concepts>
22  
#include <concepts>
23  
#include <coroutine>
23  
#include <coroutine>
24  
#include <cstddef>
24  
#include <cstddef>
 
25 +
#include <exception>
25  
#include <new>
26  
#include <new>
26  
#include <span>
27  
#include <span>
27  
#include <stop_token>
28  
#include <stop_token>
28  
#include <system_error>
29  
#include <system_error>
29  
#include <utility>
30  
#include <utility>
30  

31  

31  
namespace boost {
32  
namespace boost {
32  
namespace capy {
33  
namespace capy {
33  

34  

34  
/** Type-erased wrapper for any ReadStream.
35  
/** Type-erased wrapper for any ReadStream.
35  

36  

36  
    This class provides type erasure for any type satisfying the
37  
    This class provides type erasure for any type satisfying the
37  
    @ref ReadStream concept, enabling runtime polymorphism for
38  
    @ref ReadStream concept, enabling runtime polymorphism for
38  
    read operations. It uses cached awaitable storage to achieve
39  
    read operations. It uses cached awaitable storage to achieve
39  
    zero steady-state allocation after construction.
40  
    zero steady-state allocation after construction.
40  

41  

41  
    The wrapper supports two construction modes:
42  
    The wrapper supports two construction modes:
42  
    - **Owning**: Pass by value to transfer ownership. The wrapper
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
43  
      allocates storage and owns the stream.
44  
      allocates storage and owns the stream.
44  
    - **Reference**: Pass a pointer to wrap without ownership. The
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
45  
      pointed-to stream must outlive this wrapper.
46  
      pointed-to stream must outlive this wrapper.
46  

47  

47  
    @par Awaitable Preallocation
48  
    @par Awaitable Preallocation
48  
    The constructor preallocates storage for the type-erased awaitable.
49  
    The constructor preallocates storage for the type-erased awaitable.
49  
    This reserves all virtual address space at server startup
50  
    This reserves all virtual address space at server startup
50  
    so memory usage can be measured up front, rather than
51  
    so memory usage can be measured up front, rather than
51  
    allocating piecemeal as traffic arrives.
52  
    allocating piecemeal as traffic arrives.
52  

53  

53  
    @par Immediate Completion
54  
    @par Immediate Completion
54  
    When the underlying stream's awaitable reports ready immediately
55  
    When the underlying stream's awaitable reports ready immediately
55  
    (e.g. buffered data already available), the wrapper skips
56  
    (e.g. buffered data already available), the wrapper skips
56  
    coroutine suspension entirely and returns the result inline.
57  
    coroutine suspension entirely and returns the result inline.
57  

58  

58  
    @par Thread Safety
59  
    @par Thread Safety
59  
    Not thread-safe. Concurrent operations on the same wrapper
60  
    Not thread-safe. Concurrent operations on the same wrapper
60  
    are undefined behavior.
61  
    are undefined behavior.
61  

62  

62  
    @par Example
63  
    @par Example
63  
    @code
64  
    @code
64  
    // Owning - takes ownership of the stream
65  
    // Owning - takes ownership of the stream
65  
    any_read_stream stream(socket{ioc});
66  
    any_read_stream stream(socket{ioc});
66  

67  

67  
    // Reference - wraps without ownership
68  
    // Reference - wraps without ownership
68  
    socket sock(ioc);
69  
    socket sock(ioc);
69  
    any_read_stream stream(&sock);
70  
    any_read_stream stream(&sock);
70  

71  

71  
    mutable_buffer buf(data, size);
72  
    mutable_buffer buf(data, size);
72  
    auto [ec, n] = co_await stream.read_some(buf);
73  
    auto [ec, n] = co_await stream.read_some(buf);
73  
    @endcode
74  
    @endcode
74  

75  

75  
    @see any_write_stream, any_stream, ReadStream
76  
    @see any_write_stream, any_stream, ReadStream
76  
*/
77  
*/
77  
class any_read_stream
78  
class any_read_stream
78  
{
79  
{
79  
    struct vtable;
80  
    struct vtable;
80  

81  

81  
    template<ReadStream S>
82  
    template<ReadStream S>
82  
    struct vtable_for_impl;
83  
    struct vtable_for_impl;
83  

84  

84  
    // ordered for cache line coherence
85  
    // ordered for cache line coherence
85  
    void* stream_ = nullptr;
86  
    void* stream_ = nullptr;
86  
    vtable const* vt_ = nullptr;
87  
    vtable const* vt_ = nullptr;
87  
    void* cached_awaitable_ = nullptr;
88  
    void* cached_awaitable_ = nullptr;
88  
    void* storage_ = nullptr;
89  
    void* storage_ = nullptr;
89  
    bool awaitable_active_ = false;
90  
    bool awaitable_active_ = false;
90  

91  

91  
public:
92  
public:
92  
    /** Destructor.
93  
    /** Destructor.
93  

94  

94  
        Destroys the owned stream (if any) and releases the cached
95  
        Destroys the owned stream (if any) and releases the cached
95  
        awaitable storage.
96  
        awaitable storage.
96  
    */
97  
    */
97  
    ~any_read_stream();
98  
    ~any_read_stream();
98  

99  

99  
    /** Default constructor.
100  
    /** Default constructor.
100  

101  

101  
        Constructs an empty wrapper. Operations on a default-constructed
102  
        Constructs an empty wrapper. Operations on a default-constructed
102  
        wrapper result in undefined behavior.
103  
        wrapper result in undefined behavior.
103  
    */
104  
    */
104  
    any_read_stream() = default;
105  
    any_read_stream() = default;
105  

106  

106  
    /** Non-copyable.
107  
    /** Non-copyable.
107  

108  

108  
        The awaitable cache is per-instance and cannot be shared.
109  
        The awaitable cache is per-instance and cannot be shared.
109  
    */
110  
    */
110  
    any_read_stream(any_read_stream const&) = delete;
111  
    any_read_stream(any_read_stream const&) = delete;
111  
    any_read_stream& operator=(any_read_stream const&) = delete;
112  
    any_read_stream& operator=(any_read_stream const&) = delete;
112  

113  

113  
    /** Move constructor.
114  
    /** Move constructor.
114  

115  

115  
        Transfers ownership of the wrapped stream (if owned) and
116  
        Transfers ownership of the wrapped stream (if owned) and
116  
        cached awaitable storage from `other`. After the move, `other` is
117  
        cached awaitable storage from `other`. After the move, `other` is
117  
        in a default-constructed state.
118  
        in a default-constructed state.
118  

119  

119  
        @param other The wrapper to move from.
120  
        @param other The wrapper to move from.
120  
    */
121  
    */
121  
    any_read_stream(any_read_stream&& other) noexcept
122  
    any_read_stream(any_read_stream&& other) noexcept
122  
        : stream_(std::exchange(other.stream_, nullptr))
123  
        : stream_(std::exchange(other.stream_, nullptr))
123  
        , vt_(std::exchange(other.vt_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
124  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125  
        , storage_(std::exchange(other.storage_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
126  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
127  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
127  
    {
128  
    {
128  
    }
129  
    }
129  

130  

130  
    /** Move assignment operator.
131  
    /** Move assignment operator.
131  

132  

132  
        Destroys any owned stream and releases existing resources,
133  
        Destroys any owned stream and releases existing resources,
133  
        then transfers ownership from `other`.
134  
        then transfers ownership from `other`.
134  

135  

135  
        @param other The wrapper to move from.
136  
        @param other The wrapper to move from.
136  
        @return Reference to this wrapper.
137  
        @return Reference to this wrapper.
137  
    */
138  
    */
138  
    any_read_stream&
139  
    any_read_stream&
139  
    operator=(any_read_stream&& other) noexcept;
140  
    operator=(any_read_stream&& other) noexcept;
140  

141  

141  
    /** Construct by taking ownership of a ReadStream.
142  
    /** Construct by taking ownership of a ReadStream.
142  

143  

143  
        Allocates storage and moves the stream into this wrapper.
144  
        Allocates storage and moves the stream into this wrapper.
144  
        The wrapper owns the stream and will destroy it.
145  
        The wrapper owns the stream and will destroy it.
145  

146  

146  
        @param s The stream to take ownership of.
147  
        @param s The stream to take ownership of.
147  
    */
148  
    */
148  
    template<ReadStream S>
149  
    template<ReadStream S>
149  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
150  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
150  
    any_read_stream(S s);
151  
    any_read_stream(S s);
151  

152  

152  
    /** Construct by wrapping a ReadStream without ownership.
153  
    /** Construct by wrapping a ReadStream without ownership.
153  

154  

154  
        Wraps the given stream by pointer. The stream must remain
155  
        Wraps the given stream by pointer. The stream must remain
155  
        valid for the lifetime of this wrapper.
156  
        valid for the lifetime of this wrapper.
156  

157  

157  
        @param s Pointer to the stream to wrap.
158  
        @param s Pointer to the stream to wrap.
158  
    */
159  
    */
159  
    template<ReadStream S>
160  
    template<ReadStream S>
160  
    any_read_stream(S* s);
161  
    any_read_stream(S* s);
161  

162  

162  
    /** Check if the wrapper contains a valid stream.
163  
    /** Check if the wrapper contains a valid stream.
163  

164  

164  
        @return `true` if wrapping a stream, `false` if default-constructed
165  
        @return `true` if wrapping a stream, `false` if default-constructed
165  
            or moved-from.
166  
            or moved-from.
166  
    */
167  
    */
167  
    bool
168  
    bool
168  
    has_value() const noexcept
169  
    has_value() const noexcept
169  
    {
170  
    {
170  
        return stream_ != nullptr;
171  
        return stream_ != nullptr;
171  
    }
172  
    }
172  

173  

173  
    /** Check if the wrapper contains a valid stream.
174  
    /** Check if the wrapper contains a valid stream.
174  

175  

175  
        @return `true` if wrapping a stream, `false` if default-constructed
176  
        @return `true` if wrapping a stream, `false` if default-constructed
176  
            or moved-from.
177  
            or moved-from.
177  
    */
178  
    */
178  
    explicit
179  
    explicit
179  
    operator bool() const noexcept
180  
    operator bool() const noexcept
180  
    {
181  
    {
181  
        return has_value();
182  
        return has_value();
182  
    }
183  
    }
183  

184  

184  
    /** Initiate an asynchronous read operation.
185  
    /** Initiate an asynchronous read operation.
185  

186  

186  
        Reads data into the provided buffer sequence. The operation
187  
        Reads data into the provided buffer sequence. The operation
187  
        completes when at least one byte has been read, or an error
188  
        completes when at least one byte has been read, or an error
188  
        occurs.
189  
        occurs.
189  

190  

190  
        @param buffers The buffer sequence to read into. Passed by
191  
        @param buffers The buffer sequence to read into. Passed by
191  
            value to ensure the sequence lives in the coroutine frame
192  
            value to ensure the sequence lives in the coroutine frame
192  
            across suspension points.
193  
            across suspension points.
193  

194  

194  
        @return An awaitable yielding `(error_code,std::size_t)`.
195  
        @return An awaitable yielding `(error_code,std::size_t)`.
195  

196  

196  
        @par Immediate Completion
197  
        @par Immediate Completion
197  
        The operation completes immediately without suspending
198  
        The operation completes immediately without suspending
198  
        the calling coroutine when the underlying stream's
199  
        the calling coroutine when the underlying stream's
199  
        awaitable reports immediate readiness via `await_ready`.
200  
        awaitable reports immediate readiness via `await_ready`.
200  

201  

201  
        @note This is a partial operation and may not process the
202  
        @note This is a partial operation and may not process the
202  
        entire buffer sequence. Use the composed @ref read algorithm
203  
        entire buffer sequence. Use the composed @ref read algorithm
203  
        for guaranteed complete transfer.
204  
        for guaranteed complete transfer.
204  

205  

205  
        @par Preconditions
206  
        @par Preconditions
206  
        The wrapper must contain a valid stream (`has_value() == true`).
207  
        The wrapper must contain a valid stream (`has_value() == true`).
207  
        The caller must not call this function again after a prior
208  
        The caller must not call this function again after a prior
208  
        call returned an error (including EOF).
209  
        call returned an error (including EOF).
209  
    */
210  
    */
210  
    template<MutableBufferSequence MB>
211  
    template<MutableBufferSequence MB>
211  
    auto
212  
    auto
212  
    read_some(MB buffers);
213  
    read_some(MB buffers);
213  

214  

214  
protected:
215  
protected:
215  
    /** Rebind to a new stream after move.
216  
    /** Rebind to a new stream after move.
216  

217  

217  
        Updates the internal pointer to reference a new stream object.
218  
        Updates the internal pointer to reference a new stream object.
218  
        Used by owning wrappers after move assignment when the owned
219  
        Used by owning wrappers after move assignment when the owned
219  
        object has moved to a new location.
220  
        object has moved to a new location.
220  

221  

221  
        @param new_stream The new stream to bind to. Must be the same
222  
        @param new_stream The new stream to bind to. Must be the same
222  
            type as the original stream.
223  
            type as the original stream.
223  

224  

224  
        @note Terminates if called with a stream of different type
225  
        @note Terminates if called with a stream of different type
225  
            than the original.
226  
            than the original.
226  
    */
227  
    */
227  
    template<ReadStream S>
228  
    template<ReadStream S>
228  
    void
229  
    void
229  
    rebind(S& new_stream) noexcept
230  
    rebind(S& new_stream) noexcept
230  
    {
231  
    {
231  
        if(vt_ != &vtable_for_impl<S>::value)
232  
        if(vt_ != &vtable_for_impl<S>::value)
232  
            std::terminate();
233  
            std::terminate();
233  
        stream_ = &new_stream;
234  
        stream_ = &new_stream;
234  
    }
235  
    }
235  
};
236  
};
236  

237  

237  
//----------------------------------------------------------
238  
//----------------------------------------------------------
238  

239  

239  
struct any_read_stream::vtable
240  
struct any_read_stream::vtable
240  
{
241  
{
241  
    // ordered by call frequency for cache line coherence
242  
    // ordered by call frequency for cache line coherence
242  
    void (*construct_awaitable)(
243  
    void (*construct_awaitable)(
243  
        void* stream,
244  
        void* stream,
244  
        void* storage,
245  
        void* storage,
245  
        std::span<mutable_buffer const> buffers);
246  
        std::span<mutable_buffer const> buffers);
246  
    bool (*await_ready)(void*);
247  
    bool (*await_ready)(void*);
247  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248  
    io_result<std::size_t> (*await_resume)(void*);
249  
    io_result<std::size_t> (*await_resume)(void*);
249  
    void (*destroy_awaitable)(void*) noexcept;
250  
    void (*destroy_awaitable)(void*) noexcept;
250  
    std::size_t awaitable_size;
251  
    std::size_t awaitable_size;
251  
    std::size_t awaitable_align;
252  
    std::size_t awaitable_align;
252  
    void (*destroy)(void*) noexcept;
253  
    void (*destroy)(void*) noexcept;
253  
};
254  
};
254  

255  

255  
template<ReadStream S>
256  
template<ReadStream S>
256  
struct any_read_stream::vtable_for_impl
257  
struct any_read_stream::vtable_for_impl
257  
{
258  
{
258  
    using Awaitable = decltype(std::declval<S&>().read_some(
259  
    using Awaitable = decltype(std::declval<S&>().read_some(
259  
        std::span<mutable_buffer const>{}));
260  
        std::span<mutable_buffer const>{}));
260  

261  

261  
    static void
262  
    static void
262  
    do_destroy_impl(void* stream) noexcept
263  
    do_destroy_impl(void* stream) noexcept
263  
    {
264  
    {
264  
        static_cast<S*>(stream)->~S();
265  
        static_cast<S*>(stream)->~S();
265  
    }
266  
    }
266  

267  

267  
    static void
268  
    static void
268  
    construct_awaitable_impl(
269  
    construct_awaitable_impl(
269  
        void* stream,
270  
        void* stream,
270  
        void* storage,
271  
        void* storage,
271  
        std::span<mutable_buffer const> buffers)
272  
        std::span<mutable_buffer const> buffers)
272  
    {
273  
    {
273  
        auto& s = *static_cast<S*>(stream);
274  
        auto& s = *static_cast<S*>(stream);
274  
        ::new(storage) Awaitable(s.read_some(buffers));
275  
        ::new(storage) Awaitable(s.read_some(buffers));
275  
    }
276  
    }
276  

277  

277  
    static constexpr vtable value = {
278  
    static constexpr vtable value = {
278  
        &construct_awaitable_impl,
279  
        &construct_awaitable_impl,
279  
        +[](void* p) {
280  
        +[](void* p) {
280  
            return static_cast<Awaitable*>(p)->await_ready();
281  
            return static_cast<Awaitable*>(p)->await_ready();
281  
        },
282  
        },
282  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283  
            return detail::call_await_suspend(
284  
            return detail::call_await_suspend(
284  
                static_cast<Awaitable*>(p), h, env);
285  
                static_cast<Awaitable*>(p), h, env);
285  
        },
286  
        },
286  
        +[](void* p) {
287  
        +[](void* p) {
287  
            return static_cast<Awaitable*>(p)->await_resume();
288  
            return static_cast<Awaitable*>(p)->await_resume();
288  
        },
289  
        },
289  
        +[](void* p) noexcept {
290  
        +[](void* p) noexcept {
290  
            static_cast<Awaitable*>(p)->~Awaitable();
291  
            static_cast<Awaitable*>(p)->~Awaitable();
291  
        },
292  
        },
292  
        sizeof(Awaitable),
293  
        sizeof(Awaitable),
293  
        alignof(Awaitable),
294  
        alignof(Awaitable),
294  
        &do_destroy_impl
295  
        &do_destroy_impl
295  
    };
296  
    };
296  
};
297  
};
297  

298  

298  
//----------------------------------------------------------
299  
//----------------------------------------------------------
299  

300  

300  
inline
301  
inline
301  
any_read_stream::~any_read_stream()
302  
any_read_stream::~any_read_stream()
302  
{
303  
{
303  
    if(storage_)
304  
    if(storage_)
304  
    {
305  
    {
305  
        vt_->destroy(stream_);
306  
        vt_->destroy(stream_);
306  
        ::operator delete(storage_);
307  
        ::operator delete(storage_);
307  
    }
308  
    }
308  
    if(cached_awaitable_)
309  
    if(cached_awaitable_)
309  
    {
310  
    {
310  
        if(awaitable_active_)
311  
        if(awaitable_active_)
311  
            vt_->destroy_awaitable(cached_awaitable_);
312  
            vt_->destroy_awaitable(cached_awaitable_);
312  
        ::operator delete(cached_awaitable_);
313  
        ::operator delete(cached_awaitable_);
313  
    }
314  
    }
314  
}
315  
}
315  

316  

316  
/*  Move assignment.

    Releases the resources currently held, then takes over the
    state of `other`, leaving `other` empty. Self-assignment is a
    no-op.

    The cached awaitable is released before the owned stream: an
    awaitable that is still active was produced by the stream's
    read_some and may refer back to it, so it has to be destroyed
    while the stream is still alive.
*/
inline any_read_stream&
any_read_stream::operator=(any_read_stream&& other) noexcept
{
    if(this != &other)
    {
        if(cached_awaitable_)
        {
            if(awaitable_active_)
                vt_->destroy_awaitable(cached_awaitable_);
            ::operator delete(cached_awaitable_);
        }
        // storage_ is non-null only in owning mode.
        if(storage_)
        {
            vt_->destroy(stream_);
            ::operator delete(storage_);
        }
        // Adopt the source's state and reset the source to the
        // default-constructed state.
        stream_ = std::exchange(other.stream_, nullptr);
        vt_ = std::exchange(other.vt_, nullptr);
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
        storage_ = std::exchange(other.storage_, nullptr);
        awaitable_active_ = std::exchange(other.awaitable_active_, false);
    }
    return *this;
}
340  

341  

341  
template<ReadStream S>
342  
template<ReadStream S>
342  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
343  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
343  
any_read_stream::any_read_stream(S s)
344  
any_read_stream::any_read_stream(S s)
344  
    : vt_(&vtable_for_impl<S>::value)
345  
    : vt_(&vtable_for_impl<S>::value)
345  
{
346  
{
346  
    struct guard {
347  
    struct guard {
347  
        any_read_stream* self;
348  
        any_read_stream* self;
348  
        bool committed = false;
349  
        bool committed = false;
349  
        ~guard() {
350  
        ~guard() {
350  
            if(!committed && self->storage_) {
351  
            if(!committed && self->storage_) {
351  
                self->vt_->destroy(self->stream_);
352  
                self->vt_->destroy(self->stream_);
352  
                ::operator delete(self->storage_);
353  
                ::operator delete(self->storage_);
353  
                self->storage_ = nullptr;
354  
                self->storage_ = nullptr;
354  
                self->stream_ = nullptr;
355  
                self->stream_ = nullptr;
355  
            }
356  
            }
356  
        }
357  
        }
357  
    } g{this};
358  
    } g{this};
358  

359  

359  
    storage_ = ::operator new(sizeof(S));
360  
    storage_ = ::operator new(sizeof(S));
360  
    stream_ = ::new(storage_) S(std::move(s));
361  
    stream_ = ::new(storage_) S(std::move(s));
361  

362  

362  
    // Preallocate the awaitable storage
363  
    // Preallocate the awaitable storage
363  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
364  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
364  

365  

365  
    g.committed = true;
366  
    g.committed = true;
366  
}
367  
}
367  

368  

368  
template<ReadStream S>
369  
template<ReadStream S>
369  
any_read_stream::any_read_stream(S* s)
370  
any_read_stream::any_read_stream(S* s)
370  
    : stream_(s)
371  
    : stream_(s)
371  
    , vt_(&vtable_for_impl<S>::value)
372  
    , vt_(&vtable_for_impl<S>::value)
372  
{
373  
{
373  
    // Preallocate the awaitable storage
374  
    // Preallocate the awaitable storage
374  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
375  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
375  
}
376  
}
376  

377  

377  
//----------------------------------------------------------
378  
//----------------------------------------------------------
378  

379  

379  
template<MutableBufferSequence MB>
380  
template<MutableBufferSequence MB>
380  
auto
381  
auto
381  
any_read_stream::read_some(MB buffers)
382  
any_read_stream::read_some(MB buffers)
382  
{
383  
{
383  
    // VFALCO in theory, we could use if constexpr to detect a
384  
    // VFALCO in theory, we could use if constexpr to detect a
384  
    // span and then pass that through to read_some without the array
385  
    // span and then pass that through to read_some without the array
385  
    struct awaitable
386  
    struct awaitable
386  
    {
387  
    {
387  
        any_read_stream* self_;
388  
        any_read_stream* self_;
388  
        mutable_buffer_array<detail::max_iovec_> ba_;
389  
        mutable_buffer_array<detail::max_iovec_> ba_;
389  

390  

390  
        bool
391  
        bool
391  
        await_ready()
392  
        await_ready()
392  
        {
393  
        {
393  
            self_->vt_->construct_awaitable(
394  
            self_->vt_->construct_awaitable(
394  
                self_->stream_,
395  
                self_->stream_,
395  
                self_->cached_awaitable_,
396  
                self_->cached_awaitable_,
396  
                ba_.to_span());
397  
                ba_.to_span());
397  
            self_->awaitable_active_ = true;
398  
            self_->awaitable_active_ = true;
398  

399  

399  
            return self_->vt_->await_ready(
400  
            return self_->vt_->await_ready(
400  
                self_->cached_awaitable_);
401  
                self_->cached_awaitable_);
401  
        }
402  
        }
402  

403  

403  
        std::coroutine_handle<>
404  
        std::coroutine_handle<>
404  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
405  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
405  
        {
406  
        {
406  
            return self_->vt_->await_suspend(
407  
            return self_->vt_->await_suspend(
407  
                self_->cached_awaitable_, h, env);
408  
                self_->cached_awaitable_, h, env);
408  
        }
409  
        }
409  

410  

410  
        io_result<std::size_t>
411  
        io_result<std::size_t>
411  
        await_resume()
412  
        await_resume()
412  
        {
413  
        {
413  
            struct guard {
414  
            struct guard {
414  
                any_read_stream* self;
415  
                any_read_stream* self;
415  
                ~guard() {
416  
                ~guard() {
416  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
417  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
417  
                    self->awaitable_active_ = false;
418  
                    self->awaitable_active_ = false;
418  
                }
419  
                }
419  
            } g{self_};
420  
            } g{self_};
420  
            return self_->vt_->await_resume(
421  
            return self_->vt_->await_resume(
421  
                self_->cached_awaitable_);
422  
                self_->cached_awaitable_);
422  
        }
423  
        }
423  
    };
424  
    };
424  
    return awaitable{this,
425  
    return awaitable{this,
425  
        mutable_buffer_array<detail::max_iovec_>(buffers)};
426  
        mutable_buffer_array<detail::max_iovec_>(buffers)};
426  
}
427  
}
427  

428  

428  
} // namespace capy
429  
} // namespace capy
429  
} // namespace boost
430  
} // namespace boost
430  

431  

431  
#endif
432  
#endif