nautechsystems / nautilus_trader

A high-performance algorithmic trading platform and event-driven backtester
https://nautilustrader.io
GNU Lesser General Public License v3.0
1.7k stars 398 forks source link

Message Bus v2 #1712

Open twitu opened 2 weeks ago

twitu commented 2 weeks ago

Message bus is a core component of the nautilus engine that allows other components to talk to each by passing messages to handlers #1711 that can consume the message. It allows components to be loosely and dynamically coupled. A unique characteristic of the nautilus message bus is that it is meant to connect components running on the same thread which means it is a synchronous and deterministic message bus.

It supports 3 communication patterns -

Send

Request/Response

Publish

The current message bus implementation has most of the data structures to store the data to enable the above patterns. However, it needs a separate component to queue messages and execute the handlers with the messages. In this sense, the below structure is the context and the runner actually executes the tasks. The runner can be a custom implementation, however there might be a way to get the tokio runtime to do this for us by restricting it to run on one thread essentially making in synchronous. Moreover, since the message bus context needs to be accessible by many different components it might be ergonomic to make it a global variable.

pub struct MessageBus {
    ...
    /// mapping from topic to the corresponding handler
    /// a topic can be a string with wildcards
    /// * '?' - any character
    /// * '*' - any number of any characters
    subscriptions: IndexMap<Subscription, Vec<Ustr>>,
    /// maps a pattern to all the handlers registered for it
    /// this is updated whenever a new subscription is created.
    patterns: IndexMap<Ustr, Vec<Subscription>>,
    /// handles a message or a request destined for a specific endpoint.
    endpoints: IndexMap<Ustr, MessageHandler>,
    /// Relates a request with a response
    /// a request maps it's id to a handler so that a response
    /// with the same id can later be handled.
    correlation_index: IndexMap<UUID4, MessageHandler>,
}

There still remains some challenge around sharing access to the message bus fields because different components might need shared read and write access.

twitu commented 2 weeks ago

The message bus needs to accept many different kind of messages. There are a few pre-defined messages that are used by internal components, however users can also derive their custom classes and even transfer arbitrary Python Objects through the message bus.

The messages can have very different fields so a single trait cannot encompass all the functionality. However, we can use traits to hide the details of the message and recover the concrete type later (without any serde) by using the dynamic typing functionality provided by the std::any modules.

A message handler needs to advertise the concrete type it intends to consume, and the message can then be downcasted to the appropriate concrete value.

pub trait MessageHandler {
   fn arg_type_id() -> TypeId
   fn call(&mut self, arg: Box<dyn Any>)
}

Pseudo examples

For example, a user defined custom Python handler that takes a Python object can be wrapped into a message handler like this pseudocode:

struct MessageBusPythonHandler {
   callback: PyObject
}

impl MessageHandler for MessageBusPythonHandler {
    fn arg_type_id() -> TypeId {
        TypeId::of::<PyObject>()
    }

    fn call(&mut self, arg: Box<dyn Any>) {
            let arg = arg as &dyn Any;

            let arg = arg.downcast::<PyObject>::().unwrap();
            Python::with_gil(...
                self.callback.call(arg)
             );
    }
}

On the other hand, internal components can define their handlers in Rust. For example the DataEngine registers itself as an endpoint in the message bus.

        self._msgbus.register(endpoint="DataEngine.execute", handler=self.execute)

This can be ported to message bus v2 as:

struct DataEngineExecuteHandler {
    engine: DataEngine
}

impl MessageHandler for DateEngineExecuteHandler {
    fn arg_type_id() -> TypeId {
        TypeId::of::<PyObject>()
    }

    fn call(&mut self, arg: Box<dyn Any>) {
            let arg = arg as &dyn Any;
            let command = arg.downcast::<Command>::().unwrap();
            self.engine.execute(command);
    }
}

Summary

The downside to this approach is that dynamic typing will be costlier than directly calling the functions and will reduce the optimizations the compiler can perform. The exact impact can be found only after some perf tests but intuitively it should still be better than a serde based approach. It also adds some complexity to how a component can receives events because the component will have to create a unique struct for downcasting the specific "type" of event.

The upside is that this approach keeps the decoupled, modular and user extensible properties of the current message bus.

Troubladore commented 1 week ago

RE: "A unique characteristic of the nautilus message bus is that it is meant to connect components running on the same thread which means it is a synchronous and deterministic message bus."

Any thoughts on how the recent emergence of subinterpreters and the forthcoming GIL-less Python could impact some of Nautilus' architectural choices?

btw - I really appreciate the deep thought you are giving to these core elements - having an engine that is as robust and performant as possible is a huge selling point for Nautilus in my mind, and these are the tickets that make it happen.

twitu commented 1 week ago

Thanks for sharing this. I'm excited about GIL-less Python, although after reading it appears to be tricky to use properly and bug-freely leverage.

In the case of Nautilus's core architecture, I don't see it being affected much because nautilus core is CPU bound and each event is data dependent on previous event leaving little scope for parallelism. Concurrency is leveraged at the fringes where IO bound tasks read from files or networks. Perhaps after careful analysis of data dependence some components in the pub/sub communication model can benefit from this. But as of now, the message bus is meant to be used in a single threaded context.

cjdsellers commented 1 week ago

Just in addition to @twitu s thoughts - performance considerations aside, determinism is a very desirable feature for a backtest engine (and for the core platform), which is what we get with the current (mostly) single threaded design. As twitu points out, we're already leveraging threads (via Rust) for logging, cache and message bus (external publishing) and in Python for adapter network peripherals (live) which seems to be working well - as all of those are offloading I/O overhead from the main thread in some way.

The approach we're settling on right now is to port the basic design of the message bus we have already, but adapted slightly based on the guarantees Rust provides. We'll hopefully be able to keep the full range of features currently available, but the Python/pyo3 interaction does make it a challenge to achieve.