nautechsystems / nautilus_trader

A high-performance algorithmic trading platform and event-driven backtester
https://nautilustrader.io
GNU Lesser General Public License v3.0
2.14k stars 481 forks source link

Porting network layer to Rust #1098

Closed twitu closed 1 year ago

twitu commented 1 year ago

This issue tracks the effort and discussion for porting the network layer to Rust.

The network module is relevant to the live trading functionality. Exchange adapters use network module clients to make requests to exchanges and pass received data to the engine. It has clients for 3 protocols - HTTP, websockets and raw socket.

The end goal is to port all three client implementations to rust and expose them as pyo3 modules to be directly consumed by the adapters. Further along it is expected that the adapters will also be ported and the intermediate pyo3 layer will no longer be necessary. Since the network module is IO heavy it is currently implemented as async and will be async in the rust version as well.

twitu commented 1 year ago

The first implementation aims to port the HTTP client. The client is backed by a hyper client that is configured for HTTPS. The custom response returned by the client has the body, status and headers that have keys in header_keys field of the client. Both client and response can directly be accessed as python objects through pyo3.

#[pyclass]
pub struct HttpClient {
    client: Client<HttpsConnector<hyper::client::HttpConnector>>,
    header_keys: Vec<String>,
}

#[pyclass]
pub struct HttpResponse {
    #[pyo3(get)]
    pub status: u16,
    #[pyo3(get)]
    headers: HashMap<String, String>,
    #[pyo3(get)]
    body: Vec<u8>,
}

The live trading setup has many async tasks running. Many of them will continue to be implemented in Python for the time being. So the rust and python async runtimes will have to co-exist. Here we use pyo3-asyncio library to help interface these two runtimes. At a high level it wraps a rust async task in python async task, and the result returned by the rust task is converted into a python object.

image

Note: this means for each request two async tasks are created and executed. The underlying rust task and the wrapping python task.

Currently this test passes.

import asyncio
from nautilus_network import HttpClient

async def make_request(client: HttpClient):
    return await client.request("get", "https://github.com", {})

if __name__ == "__main__":
    client = HttpClient()
    res = asyncio.run(make_request(client))
    assert res.status == 200
    assert len(res.body) != 0
rsmb7z commented 1 year ago

Further along it is expected that the adapters will also be ported and the intermediate pyo3 layer will no longer be necessary.

Hi @twitu @cjdsellers Does this mean implementing adapter in Python will no longer be supported or support will remain? I am particularly concerned about IB because that only supports Cpp, CSharp, Java, and Python.

cjdsellers commented 1 year ago

@rsmb7z This only concerns the layer which is currently implemented in Cython, so the intent is that we continue to build adapters in Python on top of the above.

twitu commented 1 year ago

With #1100 and 250e199, a hyper based HttpClient has been added and tested.

The core Http client and response are same as previously described:

#[pyclass]
pub struct HttpClient {
    client: Client<HttpsConnector<hyper::client::HttpConnector>>,
    header_keys: Vec<String>,
}

#[pyclass]
pub struct HttpResponse {
    #[pyo3(get)]
    pub status: u16,
    #[pyo3(get)]
    headers: HashMap<String, String>,
    #[pyo3(get)]
    body: Vec<u8>,
}

They are also exposed as Python classes by pyo3. And the Python class exposes the request method, along with specializations for get, post, delete and patch.

#[pymethods]
impl HttpClient {
    #[new]
    #[pyo3(signature=(header_keys=[].to_vec()))]
    #[must_use]
    pub fn new(header_keys: Vec<String>) -> Self

    pub fn request<'py>(
        slf: PyRef<'_, Self>,
        method_str: String,
        url: String,
        headers: HashMap<String, String>,
        body: Option<&'py PyBytes>,
        py: Python<'py>,
    ) -> PyResult<&'py PyAny>
}

Internally all of them have this logic, which calls the async function to send a request and returns a Rust future. The pyo3_asyncio function schedules the Rust future on the event loop and awaits it with a Python async task. The Rust future is scheduled as a task on the tokio event loop while the Python future is scheduled on the asyncio event loop.

        pyo3_asyncio::tokio::future_into_py(py, async move {
            match client.send_request(method, url, headers, body_vec).await {
                Ok(res) => Ok(res),
                Err(e) => Err(PyErr::new::<PyException, _>(format!(
                    "Error handling repsonse: {e}"
                ))),
            }
        })

A key point to note here is that there is in total three event loops in the program, and two of them are managed by the pyo3_asyncio crate. It creates and manages it's own tokio and asyncio event loops, which the main logic can't access (hence indirect). The indirect Rust event loop for all Rust related tasks, and the indirect Python event loop for all Python tasks that await Rust tasks.

The direct Python event loop is created by the main control flow logic. It is used for scheduling all other Python tasks i.e. anything that does not directly await a Rust task. Like tasks related to other clients, engines, cache etc.

shapes

This diagram shows how an http send request is scheduled and then awaited by the main control flow in Python.

shapes(1)
twitu commented 1 year ago

The library creates and maintains a single event loop throughout the lifetime of the program. It does not create a new event loop for each request, so having three event loops (2 python and 1 rust) does not impose significant overhead based on initial benchmarks.

static GET_RUNNING_LOOP: OnceCell<PyObject> = OnceCell::new();

/// Get a reference to the Python Event Loop from Rust
///
/// Equivalent to `asyncio.get_running_loop()` in Python 3.7+.
pub fn get_running_loop(py: Python) -> PyResult<&PyAny> {
    // Ideally should call get_running_loop, but calls get_event_loop for compatibility when
    // get_running_loop is not available.
    GET_RUNNING_LOOP
        .get_or_try_init(|| -> PyResult<PyObject> {
            let asyncio = asyncio(py)?;

            Ok(asyncio.getattr("get_running_loop")?.into())
        })?
        .as_ref(py)
        .call0()
}

The benchmarks introduced in e8d249e937f706a88289729c090464d96a5af1c3 roughly measures the client throughput in terms of reqs/s. The requests are sent to server running on the same machine.

name reqs conc time (s)
rust 1 M 128 10
rust 1 M 256 10
pyo3 1M 1 50
pyo3 1M 128 23
pyo3 1M 256 22
cython 1000 256* 0.5
cython 10,000 256* 0.7
cython 100,000 256* runtime error

*cython makes a connection with each request. So the connections are disconnected after each concurrent batch

The general conclusion is that the rust/pyo3 and cython client have similar throughput (< order of magnitude in difference) - indicating that IO/system resources are the main constraint.

However logs from full functional tests, where the server is a real world URL shows about 3x improvement in latency from 2-4 ms (cython) to 0.7 ms (pyo3).

twitu commented 1 year ago

Here's context around the design of the websocket client. The websocket client is async and can do the following -

The client will primarily be used to receive data from the server and occasionally to send messages and heartbeats to the server. Additionally, it might need hooks to check the status of the connection (disconnected, closed etc).

This is the expected client usage. It will keep receiving messages in a loop and the message will be passed to a handler. But message can be sent to from one or more scopes/tasks.

async run(client):
  while client.can_read():
    message = await client.recv()
    client.handler(message)

async do_something_else(client):
  client.send(data)

Using Arc Mutex

While this works well in Python, implementing this in Rust is challenging. Since a websocket has a single underlying TCP stream. Receive from it or sending to it requires a mutable reference [^1][^2]. Since receiving and sending can be done by multiple callers in different tasks. The client needs to be wrapped with an Arc Mutex [^3]. The arc is so that the client can be cloned and passed to different async tasks - for receiving and sending. The mutex locks the client and only allows one task to get mutable access to it.

#[pyclass]
struct WebSocketClient {
  stream: Arc<Mutex<WebSocketStream>>,
}

This is the most straight forward implementation. The assumption here is that messages are received far more than messages are sent. So execution will rarely block on acquiring the lock.

Using channels

We can avoid Arc Mutex by separating the python control flow and the rust client using channels. Here we will utilize our previous assumption that the client will only receive messages in one scope/task - the one running the loop.

Here we have a rust native Client and a Python facing class ClientPy. These are connected by two channels to enable bi-directional communication between them. Client passes messages received from the server back to ClientPy and ClientPy sends the occasional control messages and heart beats to Client which sends them to the server.

The ClientPy is a thin wrapper over the channels. We can assume only bytes will be returned by the client as expected by existing python client adaptors. But we'll need a small custom message struct to differentiate between various control messages sent from Python to the client.

use std::sync::mpsc::{Sender, Receiver};

use pyo3::prelude::*;

struct WebSocketClient { /* stuff */ }

#[pyclass]
struct WebSocketClientPy {
    sender: Sender<CustomMessage>,
    recv: Receiver<Vec<u8>>,
}

#[pymethods]
impl WebSocketClientPy {
    pub fn send(slf: PyRef<'_, Self>, data: u8) {
        slf.sender.send(data);
    }

    pub fn recv(slf: PyRef<'_, Self>) -> u8 {
        slf.recv.recv().unwrap()
    }
}

This implementation can be a simplified further by strictly enforcing the assumption there is only one reader (the python loop) and sender (rust client) for received messages. Then we need only one channel to pass control messages from ClientPy to Client.

[^1]: async implementation https://docs.rs/tokio-tungstenite/latest/tokio_tungstenite/struct.WebSocketStream.html#method.poll_ready [^2]: sync implementation https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocket.html#method.read_message [^3]: An RWLock is not useful here because both receiving and sending require a mutable reference which behaves like multiple writer

twitu commented 1 year ago

The final design for sockets (websocket and raw) uses a different approach in #1118 . It assumes and enforces a single reader multiple writer model for the client. All methods require mutable references (exclusive access) and pyo3-asyncio requires ownership (Clone), which means the connection has to be wrapped in an Arc Mutex.

However because it is a stream we can cleverly split it into reading and writing ends. This way only the writer needs to be wrapped in an Arc Mutex the reader can remain lockless. This is a great advantage because it is expected that the connection will be read more than written to.

[#pyclass]
pub struct WebSocketClient {
    pub read_task: Option<task::JoinHandle<io::Result<()>>>,
    pub heartbeat_task: Option<task::JoinHandle<()>>,
    write_mutex: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
}

impl WebSocketClient {
    pub async fn connect(
        url: &str,
        handler: PyObject,
        heartbeat: Option<u64>,
    ) -> Result<Self, Error>
}

The read end of the connection is moved into an async task that continuously polls the connection for new messages and then passes it on to a handler. Another task is created to heartbeat the server at a fixed interval.

The websocket client is highly stateful and can have these transitions -

Action Result
Server closes connection Read task will receive close message and finish
Write/Heartbeat fails Program terminates
Client calls close heart beat and read task are aborted, writer is closed by sending close message
twitu commented 1 year ago

In #1129 we change the underlying crate used for websockets to fastwebsockets. It's faster and minimalist which suits the intended use case.

fastwebsockets (send):  3.1261ms
fastwebsockets (echo):  8.080872ms
fastwebsockets (recv):  5.525353ms
fastwebsockets:         16.732845ms

soketto (send):  7.998258ms
soketto (echo):  18.857447ms
soketto (recv):  10.228193ms
soketto:         37.08868ms

tokio_tungstenite (send):  7.722288ms
tokio_tungstenite (echo):  17.284609ms
tokio_tungstenite (recv):  10.407554ms
tokio_tungstenite:         35.427836ms

web-socket (send):  2.369528ms
web-socket (echo):  6.243006ms
web-socket (recv):  3.372092ms
web-socket:         11.985043ms

Taken from nurmohammed840/web-socket-benchmark

Since fastwebsockets does not implement the Stream trait we cannot split it's read and write ends - it is one single client that reads and writes. Since both reading and writing need a mutable reference we wrap the client in an Arc<Mutex> and acquire the lock when we need to read or write. Assuming that writes and hearbeats are infrequent there should be very little contention on the lock because there is only one reader.

pub struct WebSocketClient {
    pub read_task: Option<task::JoinHandle<io::Result<()>>>,
    pub heartbeat_task: Option<task::JoinHandle<()>>,
    inner: Arc<Mutex<FragmentCollector<TcpStream>>>,
}
twitu commented 1 year ago

The fastwebsockets design was scrapped because the library has rough edges around TLS and manipulating websocket frames. tokio-tungstenite has excellent TLS support. In fact it's so modular that with a few modifications we've used the TLS parts to add TLS support to raw sockets as well. However, those changes have not been included in the main repo yet (tracked here https://github.com/snapview/tokio-tungstenite/issues/285) so we've vendored relevant parts of the repo.

While fastwebsockets is ... faster, tokio-tungstenite is more suitable, easier to use and maintain.


The latest design of the websockets (#1138) and sockets (#1146) are very similar. Websockets design is illustrated here since it's more featureful.

To enforce single reader, multiple writer we split the underlying stream into reader and writer ends. The reader gets moved to a reader task while the writer end gets wrapped in an Arc<Mutex> so that it can be shared between threads and tasks.

Auto-reconnect is more involved and requires a two layered client. Auto-reconnect can only be done by a continuously running task that monitors the inner client. If the inner client has disconnected the task must reconnect and rebuild the state of the client. The state of the inner client consists of the following -

type MessageWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
type SharedMessageWriter =
    Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>;
type MessageReader = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;

struct WebSocketClientInner {
    read_task: task::JoinHandle<()>,
    heartbeat_task: Option<task::JoinHandle<()>>,
    writer: SharedMessageWriter,
    url: String,
    handler: PyObject,
    heartbeat: Option<u64>,
}

#[pyclass]
pub struct WebSocketClient {
    writer: SharedMessageWriter,
    controller_task: task::JoinHandle<()>,
    disconnect_mode: Arc<Mutex<bool>>,
}

To mutably change the state of the inner client the reconnect task needs to own the client. However, since we want to expose write/send as a Python interface, we need the a clone of the writer in the outer client as well. This avoids using channels/message passing complexity. However, this means the inner task, after it reconnects, needs to share the new writer with the outer task. The reconnect task locks the mutex and updates the writer shared with the. All subsequent send calls use the updated writer.

let (new_writer, reader) = WebSocketClientInner::connect_with_server(&self.url).await?;
let mut guard = self.writer.lock().await;
*guard = new_writer;
drop(guard);