OpenCyphal-Garage / libcyphal

Portable reference implementation of the Cyphal protocol stack in C++ for embedded systems and Linux.
http://opencyphal.org
MIT License

Refactor the execution logic #367

Closed: pavel-kirienko closed this 3 months ago

pavel-kirienko commented 4 months ago

The multiplexer as proposed has certain design issues that became apparent only now. The design needs to be slightly adjusted. The updated design will follow Scott's proposed execution model more closely.

The IMultiplexer will be modified to encapsulate not only IO polling but the scheduling of executable entities in response to 1. input availability; 2. output readiness; 3. delay expiration. As such, the interface will be renamed to IScheduler (alternative names: IExecutor, IExecutionPolicy). The implementation of the interface will be informed by the specifics of the platform (e.g., a POSIX platform may use poll, FreeRTOS may use xQueueCreateSet/FreeRTOS_select, etc.) and by the preferred threading model (a single-thread superloop, a dedicated thread for event handling, or a thread pool for larger systems). Similar to the original LibUAVCAN design, LibCyphal will probably come with a small set of predefined execution policies for common usage scenarios, while allowing the user to easily create their own (perhaps using one of the predefined policies as a starting point).

The IScheduler (aka the execution policy interface) will allow the library to register an IRunnable entity along with a condition for its execution (e.g., a socket became readable or a timeout expired). For now, to simplify the implementation of policies, we introduce the restriction that a single runnable entity can only be registered with either an IO event or a timeout (i.e., execution cannot be scheduled both on data availability and on a timeout; one has to be chosen); this restriction may be lifted later if needed.

The first approximation of the interface is as follows:

class IScheduler
{
public:
    /// Invoke the specified runnable when the pollable becomes readable/writable.
    /// The future must be kept alive until the event is no longer needed.
    virtual expected<Future, Error> source(IDataSource& io, IRunnable& handler) = 0;
    virtual expected<Future, Error> sink(IDataSink& io, IRunnable& handler) = 0;

    /// Invoke the specified runnable at the specified period or after the delay expiration.
    /// The scheduler shall avoid accumulation of the phase error.
    /// The future must be kept alive until the event is no longer needed.
    virtual expected<Future, Error> repeat(const Duration period, const IRunnable& handler) = 0;
    virtual expected<Future, Error> defer(const Duration delay, const IRunnable& handler) = 0;

    // rule of 5...
};

The IDataSource/IDataSink interfaces will be implemented by can::IMedia, udp::IRxSocket, and udp::ITxSocket; future transport implementations will follow suit. These interfaces will implement the CETL RTTI interface to allow the execution policy implementations to cast them to the platform-specific implementations (e.g., on POSIX this will be used to extract the file descriptor that is later used with poll or similar). A hedged sketch of that arrangement follows.
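
To ground the platform-specific part, here is a minimal sketch of the readable half of a single-threaded POSIX policy. Everything below is illustrative: IPosixReadable is a stand-in for the CETL RTTI cast described above, the IRunnable stand-in and its run() signature are assumptions, and the class is not the proposed IScheduler implementation.

#include <poll.h>
#include <cstddef>
#include <vector>

// Stand-in for the library's IRunnable; the real entry point signature may differ.
class IRunnable
{
public:
    virtual void run() = 0;
};

// Stand-in sub-interface exposing the file descriptor; the proposal would instead
// recover it from IDataSource via the CETL RTTI cast.
class IPosixReadable
{
public:
    virtual int fd() const noexcept = 0;
};

// Readable-event half of a hypothetical single-threaded POSIX policy.
class PosixSingleThreadPolicy
{
public:
    void registerSource(IPosixReadable& io, IRunnable& handler)
    {
        entries_.push_back(Entry{&io, &handler});
    }

    // One superloop iteration: block until any registered descriptor becomes readable,
    // then run the associated handlers.
    void spinOnce(const int timeout_ms)
    {
        std::vector<::pollfd> pfds;
        pfds.reserve(entries_.size());
        for (const Entry& e : entries_)
        {
            ::pollfd pfd{};
            pfd.fd     = e.io->fd();
            pfd.events = POLLIN;
            pfds.push_back(pfd);
        }
        if (::poll(pfds.data(), static_cast<nfds_t>(pfds.size()), timeout_ms) > 0)
        {
            for (std::size_t i = 0; i < pfds.size(); i++)
            {
                if ((pfds[i].revents & POLLIN) != 0)
                {
                    entries_[i].handler->run();
                }
            }
        }
    }

private:
    struct Entry
    {
        IPosixReadable* io;
        IRunnable*      handler;
    };
    std::vector<Entry> entries_;
};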

The usage assumes that the transport factories will now accept an IScheduler in place of the old IMultiplexer; the transports themselves will register their internal entities with the scheduler as they see fit. For example, the CAN transport will register its own run method to be invoked when the IMedia becomes readable or writable. The output event will be active only as long as there is pending data in the lizard transmission queue:

class CanTransport
{
    // ...
private:
    void runSingleMediaTransmit(/* ... */)
    {
        // ...
        if (!txQueueEmpty() && !output_event_)
        {
            // Arm the output event (error handling omitted; assumes an std::expected-style expected<>).
            auto result = scheduler_.sink(media_, *this);
            output_event_.emplace(std::move(*result));
        }
        if (txQueueEmpty())
        {
            output_event_.reset();  // nothing left to send; disarm the output event
        }
        // ...
    }
    cetl::optional<Future> output_event_;
};

The UDP transport would pass a reference to its IScheduler to the session instances it creates, allowing the sessions to organize the required events directly with the scheduler without the need to involve the main transport instance.
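
For instance, a hedged sketch of an RX session registering its own socket and keeping the returned future alive for its lifetime (the session shape, the run() signature, and the std::expected-style API of expected<> are all assumptions; error handling is omitted):

class UdpRxSession : private IRunnable  // IRunnable is an implementation detail here
{
public:
    UdpRxSession(IScheduler& scheduler, udp::IRxSocket& socket)
        : socket_{socket}
    {
        // Register directly with the scheduler; the transport instance is not involved.
        if (auto result = scheduler.source(socket_, *this))  // assumes an std::expected-style API
        {
            rx_event_.emplace(std::move(*result));
        }
    }

private:
    void run() override;  // reads datagrams when the socket becomes readable (signature assumed)

    udp::IRxSocket&        socket_;
    cetl::optional<Future> rx_event_;  // keeps the event armed for the session's lifetime
};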

One may notice that with this design, IRunnable need not be inherited publicly. Moreover, with the heapless cetl::function already in place, IRunnable should be retired altogether because its interface is overly generalized and hard to use efficiently. This becomes particularly relevant where the same entity needs to respond to more than one event: IRunnable does not provide a sensible way to pass additional arguments or capture context to tell the event handler which actions to take. The improved design without IRunnable is shown below, followed by a brief usage sketch:

class IScheduler
{
public:
    /// The result will be stored into the future instance returned to the client
    /// when the event is created.
    using Callback = cetl::function<MaybeFailure(const TimePoint), sizeof(void*) * 8>;

    /// Alternatively, we could allow the callbacks to return type-erased results,
    /// which could be useful in certain scenarios; the first non-empty result would
    /// cancel the event:
    ///
    ///     using Callback = cetl::function<
    ///         expected<cetl::unbounded_variant<sizeof(void*) * 8>, MaybeFailure>(const TimePoint),
    ///         sizeof(void*) * 8>;

    /// Invoke the specified callback when the pollable becomes readable/writable.
    /// The future must be kept alive until the event is no longer needed.
    /// The future may be destroyed from within the callback.
    /// The first failed callback will automatically deregister the event.
    virtual expected<Future, Error> source(IDataSource& io, const Callback cb) = 0;
    virtual expected<Future, Error> sink(IDataSink& io, const Callback cb) = 0;

    /// Invoke the specified callback at the specified period or after the delay expiration.
    /// The scheduler shall avoid accumulation of the phase error.
    /// The future must be kept alive until the event is no longer needed.
    /// The future may be destroyed from within the callback.
    /// The first failed callback will automatically deregister the event.
    virtual expected<Future, Error> repeat(const Duration period, const Callback cb) = 0;
    virtual expected<Future, Error> defer(const Duration delay, const Callback cb) = 0;

    /// Optionally, we could add this to replace the static MonotonicClock::now().
    virtual TimePoint now() const noexcept = 0;

    // rule of 5...
};
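
To illustrate why the capture-friendly callback matters in practice, here is a usage sketch. It is illustrative only: HeartbeatPublisher and publishOnce are not part of the proposal, Duration is assumed to be std::chrono-compatible, expected<> is assumed to have an std::expected-style API, and cetl::function is assumed constructible from a capturing lambda that fits its footprint.

class HeartbeatPublisher
{
public:
    explicit HeartbeatPublisher(IScheduler& scheduler)
    {
        // The lambda captures `this`, so the handler has all the context it needs;
        // IRunnable cannot express this without extra plumbing.
        auto result = scheduler.repeat(std::chrono::seconds{1},
                                       IScheduler::Callback{[this](const TimePoint now) -> MaybeFailure {
                                           return publishOnce(now);
                                       }});
        if (result)  // assumes an std::expected-style API
        {
            event_.emplace(std::move(*result));
        }
    }

private:
    MaybeFailure publishOnce(const TimePoint now);  // defined elsewhere

    cetl::optional<Future> event_;  // keeps the periodic event armed for the publisher's lifetime
};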

I have a fairly efficient and very compact implementation of an EDF scheduler with logarithmic complexity that can be used to bootstrap the implementation of concrete execution policies.
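
The logarithmic behaviour typically comes from keeping the pending deadlines in a binary heap. The following generic sketch (not the implementation referred to above; it uses std::function purely for brevity rather than a heapless functor) illustrates the idea:

#include <chrono>
#include <functional>
#include <queue>

// Generic earliest-deadline-first queue: schedule() and the per-entry pop are O(log n).
class EdfQueue
{
public:
    using TimePoint = std::chrono::steady_clock::time_point;

    void schedule(const TimePoint deadline, std::function<void()> action)
    {
        heap_.push(Entry{deadline, std::move(action)});
    }

    // Runs every entry whose deadline is not later than `now`, earliest first.
    void spinOnce(const TimePoint now)
    {
        while (!heap_.empty() && (heap_.top().deadline <= now))
        {
            Entry e = heap_.top();  // copy out; std::priority_queue::top() is const
            heap_.pop();
            e.action();
        }
    }

private:
    struct Entry
    {
        TimePoint             deadline;
        std::function<void()> action;

        // std::priority_queue is a max-heap, so invert the comparison to keep the earliest deadline on top.
        bool operator<(const Entry& other) const { return deadline > other.deadline; }
    };
    std::priority_queue<Entry> heap_;
};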

The recap of the changes is as follows:

pavel-kirienko commented 4 months ago

For now, we assume that callbacks and API methods are always invoked in the same thread. In the future we may want to introduce locks and probably other synchronization primitives to support true multi-threaded environments. The IScheduler may need to be extended with factory methods for such primitives.

pavel-kirienko commented 4 months ago

The Future could look roughly like this (implementation omitted):

class Future
{
public:
    /// True if the associated event is still armed.
    bool isAlive() const noexcept;

    /// The first failure to occur will cancel the event, so if this function returns a non-empty result,
    /// it means the event is no longer alive.
    MaybeFailure getFailure() const noexcept;
};
thirtytwobits commented 4 months ago

alternative names: IExecutor, IExecutionPolicy

Executor (abstract) is a manifestation of an Execution Policy (concrete). As such I prefer IExecutor.

thirtytwobits commented 4 months ago
    virtual expected<Future, Error> repeat(const Duration period, const IRunnable& handler) = 0;
    virtual expected<Future, Error> defer(const Duration delay, const IRunnable& handler) = 0;

I'm unsure these methods are something we should expose as an abstraction; this seems intrinsic to the policy itself. My assumption is that there is code in the library to the effect of:

ErrorHandlingType init(const ConfigType& runtime_config, IExecutionPolicy& execution_policy)
{
    ErrorHandlingType media_result = initMediaLayer(runtime_config, execution_policy);
    if (media_result.isFailure())
    {
        return media_result;
    }
    ErrorHandlingType transport_result = initTransportLayer(runtime_config, execution_policy);
    if (transport_result.isFailure())
    {
        return transport_result;
    }
    // ditto for the presentation and application layers
    return ErrorHandlingType{};  // success (placeholder)
}

Within each layer there is a central object that must be registered with a concrete execution policy through the IExecutionPolicy object. I don't expect the IExecutionPolicy initialization routine to support generic configuration of the policy (at first; perhaps it becomes a feature later, but it is likely a complex one). Rather, the simplest way to start with this design is to configure the policy at compile time, such that only parameters like network interfaces and data types are part of the runtime_config, and IExecutionPolicy simply has register/unregister methods. Considering this, there would be no generic use for defer or repeat, but there would be a need to simply execute, which suggests that IExecutable is itself "runnable" but is not an IRunnable, as it is only ever executed once in a well-formed program:

ThreadPoolExecutionPolicy policy{};  // namespace-scope instance so the signal handler can reach it

void signalHandler(int)  // registered for SIGINT (ctrl+c)
{
    policy.cancel();
}

int main()
{
    // ... setup here ...
    LinuxMedia media{};
    LibCyphal instance{media};  // you probably don't want this on the stack but I'm
                                // emphasizing the need to make the policy separable
                                // to allow for integration with the OS.
    ErrorHandlingType init_result = instance.init(runtime_config, policy);
    if (init_result.isFailure())
    {
        return -1;
    }
    instance.run();  // If using a thread pool then this exits immediately after spawning
                     // the required threads. It would block if using a single-threaded policy
                     // where a timed mainloop was used by the execution policy.

    policy.waitForExit();  // Because we know we are using a thread-pool policy,
                           // we have to block on the policy itself unless we are
                           // reusing the main thread for other work.
    return 0;
}

For the sake of simplicity we should only implement a single-threaded policy to start.

thirtytwobits commented 4 months ago

I have a fairly efficient and very compact implementation of an EDF scheduler with logarithmic complexity that can be used to bootstrap the implementation of concrete execution policies.

I'm unsure you need to schedule anything for the simplest case. Each phase simply does all the work available when called.

pavel-kirienko commented 4 months ago

I think what you wrote makes sense and we are, broadly speaking, on the same page. Let us please focus on the remaining questions:

  1. Elimination of IRunnable in favor of cetl::function.
  2. The case for deferred execution.

As I mentioned earlier, IRunnable is too restrictive in its inability to absorb context from the place of event registration, which is useful in practice; at the same time, as an interface, it is too generic, attempting to capture the broad notion of all things that can be executed. Of course, there are ways to work around this, but they quickly become cluttered. I would, therefore, like to abandon this interface and start using type-erased heapless functors from CETL.

As for the time-based events: neither the transport nor the presentation layers are going to require them (with one exception, see below), at least not per the current design, but the application layer will. Without time-based events there would be no well-defined way for application-layer entities to specify periodic activities. I foresee a valid objection that adding time-based events to IExecutor violates the interface segregation principle for the transport and presentation layers; at the same time, the application layer will not require any low-level IO primitives, since it is too abstract for that. The obvious solution is to extract the time-based events into a new interface that will only be used by the application layer; a rough sketch of the split follows. We can discuss its specifics later when we are ready to commence the work on that layer.
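
To make the segregation concrete, a hedged sketch of the split (the interface names are placeholders; Callback, Future, Error, Duration, IDataSource, and IDataSink are as defined earlier in the thread):

// Placeholder names; only the shape of the split matters.
class IIoExecutor  // consumed by the transport layer: IO events only
{
public:
    virtual expected<Future, Error> source(IDataSource& io, const Callback cb) = 0;
    virtual expected<Future, Error> sink(IDataSink& io, const Callback cb)     = 0;
    // ...
};

class ITimedExecutor  // consumed by the application layer: time-based events only
{
public:
    virtual expected<Future, Error> repeat(const Duration period, const Callback cb) = 0;
    virtual expected<Future, Error> defer(const Duration delay, const Callback cb)   = 0;
    // ...
};

// A concrete policy (e.g., a single-threaded POSIX one) would implement both interfaces,
// but each layer would only see the part it actually needs.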

The exception I mentioned earlier is the registration of postponed IO deinitialization activities in the transport layer. Remember that, per the design decision we made in the beginning, we do not perform complex activities at the time of object destruction; instead, we postpone this work to the next opportunity so that it runs from the normal execution context rather than the object destruction context. To do this, I propose that we add a method to IExecutor that allows the caller to execute a specified callable on the next occasion. The method would be defined as follows:

virtual cetl::optional<Error> postpone(cetl::function<void()> fun) = 0;
lydia-at-amazon commented 4 months ago
  1. Elimination of IRunnable in favor of cetl::function
    • Any concerns that using cetl::function would make the code more complex? I think the answer is no, based on the previous comments; it should actually help simplify the code.
  2. The case for deferred execution.
    • The postpone function would never have to be called directly by the user, correct? So I'm fine with adding this method to IExecutor. I also like the idea of creating a separate interface for the application layer that allows for periodic events.
thirtytwobits commented 4 months ago

Elimination of IRunnable in favor of cetl::function.

Agreed.

The case for deferred execution.

For the application layer, sure: if there is a valid, generic use then the API should exist. For object destruction, I was thinking of this as a specific phase of execution, not something simply deferred in time. As such, it probably has the same signature, but the semantics are different, which calls for a different name reflecting what point the function is postponed to. We could decide that garbage collection is performed when idle (a concept we haven't formalized yet, but which would naturally occur between invocations of the top-level executor), or we could simply name a queue to be processed at a time defined by the execution policy. So either this goes on the executor:

/// enqueue destruction work. Policy must provide time to process the unreachable queue.
/// This seems odd since executors aren't memory managers.
virtual cetl::optional<Error> pushUnreachable(cetl::function<void()> deleter) = 0;

or we define a way to install Idle handlers and always add one that does garbage collection to the front of the idle handler queue.
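
A hedged sketch of that variant (onIdle is a placeholder name; the Callback type is reused from above):

/// Invoked whenever the policy has no pending IO or timed events to process.
/// The library would install one such handler up front to drain the finalizer queue.
virtual expected<Future, Error> onIdle(const Callback cb) = 0;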

pavel-kirienko commented 4 months ago

This seems odd since executors aren't memory managers.

I think this is only odd as long as you call it garbage collection. I think a better term would be deferred finalization; as finalization requires control flow / code execution, its management is within the scope of responsibility of the IExecutor.

I'm not sure why the queue is called unreachable.

A revised design could look roughly as follows (the specific design of the Future is to be determined; it is not essential for this discussion):

class IExecutor
{
public:
    /// This is just an approximation.
    using Any = cetl::unbounded_variant<sizeof(void*) * 8>;

    /// This is just an approximation.
    /// The first non-empty result will cancel the event.
    using Callback = cetl::function<expected<Any, Any>(TimePoint), sizeof(void*) * 8>;

    /// Invoke the specified callback when the pollable becomes readable/writable.
    /// The future must be kept alive until the event is no longer needed.
    /// The future may be destroyed from within the callback.
    /// The first failed callback will automatically deregister the event.
    virtual expected<Future, Error> source(IDataSource& io, const Callback cb) = 0;
    virtual expected<Future, Error> sink(IDataSink& io, const Callback cb) = 0;

    /// Enqueue destruction work.
    /// Policy will invoke each finalizer eventually in the order of their addition.
    virtual cetl::optional<Error> dispose(cetl::function<void(), sizeof(void*) * 8> finalizer) = 0;
};
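
A usage sketch from a session's destructor could then be as small as this (UdpTxSession, executor_, and socket_ are placeholder names; error handling is omitted; the point is that the destructor only enqueues work):

UdpTxSession::~UdpTxSession()
{
    udp::ITxSocket* const socket = socket_;
    // No teardown happens here; the executor invokes the finalizer later from the
    // normal execution context, preserving the order in which finalizers were added.
    (void) executor_.dispose([socket] { /* close/release the socket here */ });
}
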
pavel-kirienko commented 4 months ago

Note from the discussion with Sergei: the new entities IDataSource and IDataSink may not be needed if we invert the IO entity registration logic such that we pass the IExecutor& into a method on IMedia/ITxSocket/etc.:

class ITxSocket
{
public:
    // <...>

    virtual Expected<Handle, Failure> onWritable(IExecutor& exe, Callback cb) = 0;
};
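
Under that inversion the RTTI cast changes direction: the socket, which knows its own platform, recovers the platform extension of the executor it is given. A hedged sketch follows; PosixTxSocket, IPosixExecutorExt, asPosixExecutor, and registerWritableFd are all hypothetical.

class PosixTxSocket final : public udp::ITxSocket
{
public:
    Expected<Handle, Failure> onWritable(IExecutor& exe, Callback cb) override
    {
        // Hypothetical helper standing in for the CETL RTTI cast mentioned earlier;
        // it yields nullptr if the executor cannot watch POSIX descriptors.
        IPosixExecutorExt* const posix_exe = asPosixExecutor(exe);
        if (posix_exe == nullptr)
        {
            return Failure{};  // placeholder error value; the real Failure type is TBD
        }
        // Hand our own file descriptor to the executor; no IDataSink indirection needed.
        return posix_exe->registerWritableFd(fd_, std::move(cb));
    }

private:
    int fd_{-1};
    // ... the rest of the ITxSocket implementation ...
};
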
serges147 commented 3 months ago

Closing, as this was refactored in PR #374.