lefticus / cpp_weekly

The official C++ Weekly Repository. Code samples and notes of future / past episodes will land here at various times. PR's will be accepted in some cases.
The Unlicense

Episode about coroutines: Application in Real world code #136

Open schorsch1976 opened 2 years ago

schorsch1976 commented 2 years ago

Channel

C++Weekly

Topics

Coroutines: Application in real world code and what you can gain from them.

Length

I guess that, as this topic is complicated, it's a longer episode. 20+ mins.

CppCon 2016: Gor Nishanov "C++ Coroutines: Under the covers" [3] shows some of the issues, but none of the talks I searched for and watched on YT seem to cover the state machine as shown in the Kolb report [1], page 10. Bjarne Stroustrup wrote or said somewhere that he used them back in the day for simulations. (I can't find the reference.) Others wrote that you can use them for e.g. AI in games. Gor Nishanov gave a talk [2][4] at CppCon and called them negative-overhead abstractions, as he used the prefetch assembly instruction to fetch data into the cache and then resumed the function. So these people all suggest that they have great potential.

They are a C++20 feature, and C++23 will be out soon or is already feature frozen.

When I think about them and how to apply them to my code (state machines in real-time applications), I always tend to think about actors with a queue/ring buffer holding a variant and an attached state machine. When the state machines get more complicated, it leads to something where you just feed a Boost.MSM or Boost-ext.SML state machine. So: What can I gain from them in such environments? How can you write complex state machines with them, without the need for these libraries? By complex state machines I mean ~30-50 states.

What are the situations where they really shine, and in which situations should you avoid them?

I also got them working on an embedded target (STM32F103RBT6 / Olimexino-STM32 board). There you need a custom operator new/delete in the promise type that allocates the memory from a static pool. This is somewhat related to the memory pool that you "uncovered" in the analysis of the C&C code with the units [5].

So maybe this could be an interesting Episode :)

[1] https://wiki.tum.de/download/attachments/93291100/Kolb%20report%20-%20Coroutines%20in%20C%2B%2B20.pdf?version=1&modificationDate=1532345204090&api=v2
[2] CppCon 2015: Gor Nishanov "C++ Coroutines - a negative overhead abstraction"
[3] https://www.youtube.com/watch?v=8C8NnE1Dg4A&t=278s
[4] https://www.youtube.com/watch?v=j9tlJAqMV7U&t=2431s
[5] https://youtu.be/Oee7gje-XRc?t=3083

MartyMcFlyInTheSky commented 2 years ago

I would like to hear that as well! Idk how state machines can be simpler than double-dispatched std::visit with the overload pattern, but maybe they can.

lefticus commented 2 years ago

double dispatched std::visit with overloads is amazing, but if you get beyond 5 different types in each variant you get some serious compile time explosion to worry about
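For readers who have not seen the pattern, here is a small sketch of a double-dispatched `std::visit` state machine. The states, events and the `transition` function are hypothetical and only illustrate the technique. `std::visit` has to instantiate the visitor for every (state, event) combination, which is where the compile-time growth comes from as the variants get larger.

```cpp
#include <variant>

// Hypothetical states and events, chosen for illustration only.
struct Idle {};
struct Running { int ticks = 0; };
struct Start {};
struct Tick {};
struct Stop {};

using State = std::variant<Idle, Running>;
using Event = std::variant<Start, Tick, Stop>;

// The usual "overload pattern": inherit operator() from every lambda.
template<typename... Fs>
struct overloaded : Fs... { using Fs::operator()...; };
template<typename... Fs>
overloaded(Fs...) -> overloaded<Fs...>;

// Dispatches on the current state and the incoming event at the same time.
State transition(const State &state, const Event &event) {
    return std::visit(
        overloaded{
            [](const Idle &, const Start &) -> State { return Running{}; },
            [](const Running &r, const Tick &) -> State { return Running{r.ticks + 1}; },
            [](const Running &, const Stop &) -> State { return Idle{}; },
            // Fallback: every other (state, event) pair leaves the state unchanged.
            [](const auto &s, const auto &) -> State { return s; },
        },
        state, event);
}
```

Usage would be along the lines of `state = transition(state, Tick{});` for each event taken from the queue.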

MartyMcFlyInTheSky commented 2 years ago

@lefticus I'm lucky I'm just at about 5 types now haha. But I'll keep that in mind. It is most certainly not the most optimal state machine, because the reentry of std::visit costs one unnecessary runtime branch. I was reflecting on using label pointers for this, which I have come to like more and more. Maybe you could also make a video on them sometime :>

lefticus commented 2 years ago

I feel like this would be best as a [fill in the blank] episode, so we can talk about the generic application of coroutines. Hopefully with a language that has better support for them

schorsch1976 commented 2 years ago

What makes C++ Coroutines so challenging at the moment are the 20+ customization points. They make it very expert friendly.

schorsch1976 commented 2 years ago

As I can't stop thinking about coroutines, I went into their history. M.E. Conway described them here: https://www.melconway.com/Home/pdf/compiler.pdf

He wrote in this paper:

A program organization is separable if it is broken up into processing modules which communicate with each other according to the following restrictions: (1) the only communication between modules is in the form of discrete items of information; (2) the flow of each of these items is along fixed, one-way paths; (3) the entire program can be laid out so that the input is at the left extreme, the output is at the right extreme, and everywhere in between all information items flowing between modules have a component of motion to the right.

Under these conditions each module may be made into a coroutine; that is, it may be coded as an autonomous program which communicates with adjacent modules as if they were input or output subroutines.

My interpretation is that they are generators which feed other generators until the output is produced.

On page 2 he shows how he connected two subroutines: write data to tape, rewind it, and feed it as input to B. This is the essence of a ring buffer/queue.

                          |
                   +---+  |  +---+
                   | A +--|->| B |
                   +---+  |  +---+
                          |

                       Tape

+---+   +----------+   +---+     +----------+    +---+
|   |   |Tape      |   |+-+|     |Tape      |    |   |
| A |+->|Writing   +-->|| ||+--->|Reading   |+-->| B |
|   |   |Subroutine|   |+-+++    |Subroutine|    |   |
+---+   +----------+   +---++    +----------+    +---+

When I apply this thinking to event-driven applications, the data are the events. The state machines are the coroutines that transform the data into another representation. The actions that a state machine performs are its outputs.

schorsch1976 commented 2 years ago

Here is a sample that, from my point of view, matches Conway's description:

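#include <array>
#include <cassert>
#include <coroutine>
#include <cstddef>     // std::size_t
#include <exception>   // std::terminate
#include <iostream>
#include <type_traits> // std::is_void_v, std::is_same_v, ...
#include <variant>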

template<std::size_t MAX_BUFFER_SIZE, typename... Events>
class EventQueue {
public:
    static_assert(MAX_BUFFER_SIZE >= 1);
    static_assert((not std::is_void_v<Events> && ...), "No void arguments allowed");
    static_assert((std::is_copy_assignable_v<Events> && ...), "All Event types must be copy assignable");

    using Variant = std::variant<Events...>;

    [[nodiscard]] bool full() const {
        return m_full;
    }
    [[nodiscard]] bool empty() const {
        return m_empty;
    }

    [[nodiscard]] std::size_t capacity() const {
        return MAX_BUFFER_SIZE;
    }

    [[nodiscard]] std::size_t size() const {
        if (m_full) {
            return m_buffer.size();
        }
        if (m_empty) {
            return 0;
        }

        if (m_tail >= m_head) {
            return m_tail - m_head;
        } else {
            return m_buffer.size() - m_head + m_tail;
        }
    }

    template<typename T>
    [[nodiscard]] auto push(const T value) {
        static_assert((std::is_same_v<T, Events> || ...), "Only exact types can be pushed into the EventQueue");

        struct Awaiter {
            explicit Awaiter(EventQueue &queue, const T value) : m_queue{queue}, m_value{value} {}

            bool await_ready() const {
                return not m_queue.full();
            }

            void await_suspend(std::coroutine_handle<> h) {
                assert(m_queue.m_push == nullptr);
                m_queue.m_push = h;
            }

            void await_resume() {
                m_queue.m_push = nullptr;
                m_queue.InternalPush(m_value);
            }

            EventQueue &m_queue;
            T m_value;
        };

        return Awaiter{*this, value};
    }

    [[nodiscard]] auto pull() {
        struct Awaiter {
            explicit Awaiter(EventQueue &queue) : m_queue{queue} {}

            bool await_ready() const {
                return not m_queue.empty();
            }

            void await_suspend(std::coroutine_handle<> h) {
                assert(m_queue.m_pull == nullptr);
                m_queue.m_pull = h;
            }

            Variant await_resume() {
                m_queue.m_pull = nullptr;
                return m_queue.InternalPull();
            }

            EventQueue &m_queue;
        };

        return Awaiter{*this};
    }

    void Execute() {
        if (not full() && m_push) {
            // resume the writer
            m_push.resume();
        }
        if (not empty() && m_pull) {
            // resume the reader
            m_pull.resume();
        }
    }

private:
    Variant InternalPull() {
        assert(not empty());

        Variant result{m_buffer[m_head]};
        m_buffer[m_head] = Variant{};
        m_head = (m_head + 1) % m_buffer.size();

        m_full = false;
        m_empty = (m_head == m_tail);

        return result;
    }
    void InternalPush(Variant value) {
        assert(not full());

        m_buffer[m_tail] = value;
        m_tail = (m_tail + 1) % m_buffer.size();

        m_empty = false;
        m_full = (m_head == m_tail);
    }

    std::array<Variant, MAX_BUFFER_SIZE> m_buffer{};
    std::size_t m_head = 0;
    std::size_t m_tail = 0;
    bool m_empty{true};
    bool m_full{false};

    std::coroutine_handle<> m_pull = nullptr;
    std::coroutine_handle<> m_push = nullptr;
};

using EQueue = EventQueue<2, int>;

struct Task {
    struct promise_type;
    using Handle = std::coroutine_handle<promise_type>;

    explicit Task(Handle handle) : m_handle{handle} {
    }
    ~Task() {
        if (m_handle)
        {
            m_handle.destroy();
        }
    }
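    Task(Task &&rhs) noexcept : m_handle{rhs.m_handle} {
        rhs.m_handle = nullptr; // the moved-from Task must not destroy the frame
    }
    Task &operator=(Task &&rhs) noexcept {
        if (this != &rhs) {
            if (m_handle) {
                m_handle.destroy(); // release the frame we currently own
            }
            m_handle = rhs.m_handle;
            rhs.m_handle = nullptr;
        }
        return *this;
    }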

    Task(const Task &) = delete;
    Task &operator=(const Task &) = delete;

    bool IsValid() const {
        // A moved-from Task holds a null handle; calling done() on it would be undefined.
        return m_handle && not m_handle.done();
    }

    struct promise_type {
        Task get_return_object() {
            return Task{Handle::from_promise(*this)};
        }

        std::suspend_never initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

private:
    Handle m_handle;
};

Task A(EQueue &queue) {
    for (int i = 0; i < 100; ++i) {
        co_await queue.push(i);
    }
}

Task B(EQueue &queue) {
    while (true) {
        EQueue::Variant v = co_await queue.pull();
        std::visit([](auto &&arg) {
            std::cout << arg << '\n';
        },
                   v);
    }
}

int main() {
    EQueue q;
    Task a = A(q);
    Task b = B(q);

    do {
        q.Execute();
    } while (a.IsValid() && b.IsValid());

    return 0;
}