bytemunch / realtime-rs

Apache License 2.0
7 stars 0 forks source link

Async handlers and error handling #1

Open richchurcher opened 2 months ago

richchurcher commented 2 months ago

Thanks so much for this very useful library. I think part of my struggle below may be to Rust ineptitude, but I wondered if you had also encountered difficulties with this kind of code and had any suggestions?

Essentially, if I'm already in a Tokio async task and set up handlers, I notice that I can't use an async handler so need to spawn an additional task to use any kind of async code in the handler:

        .on_postgres_change(
            PostgresChangesEvent::Insert,
            PostgresChangeFilter {
                schema: "public".into(),
                table: Some("follows".into()),
                ..Default::default()
            },
            move |msg| {
                if let Some(record) = msg.data.record.clone() {
                    let sender = follows_sender.clone();
                    tokio::spawn(async move {
                        let Some(display_name) = record.get("display_name") else {
                            println!("missing display_name");
                            return;
                        };
                        let Some(name) = display_name.as_str() else {
                            println!("display_name was unexpectedly empty");
                            return;
                        };
                        let _ = sender
                            .send(OutgoingTraffic {
                                display_name: Some(name.into()),
                                event_type: crate::outgoing::OutgoingEventType::Follow,
                            })
                            .await;
                    });
                }
            },
        )

(where sender is an mpsc::Sender).

This demonstrates two things:

richchurcher commented 2 months ago

I suppose the broader question of "how would the handler report an error" to the wider context of the program is an open one, but I'd be curious if you have ideas around error/failure management when responding to Supabase changes.

bytemunch commented 2 months ago

Yeah this was a pain point for myself too, both returning values from handlers and doing async work in one.

I've used channels to extract values from the callback code, e.g.

let (tx, rx) = mpsc::channel(255);

channel.on_presence(event.clone(), move |key, old, new| {
    let ev = (key, old, new);
    tx.try_send(ev).unwrap();
});

which could be a result, like

let (tx, rx) = mpsc::channel(255);

channel.on_presence(event.clone(), move |key, old, new| {
    if some_condition {
        tx.try_send(Err(ErrorType);
        return;
    }
    tx.try_send(Ok((key, old, new))).unwrap();
});

and then move the rx somewhere it can be polled. That's what I ended up doing so far for the bevy plugin I'm writing on top of this crate but I'm not sure it's a scalable or ergonomic approach. Also, bevy prefers sync code.

I'm afraid I only know enough tokio to be dangerous, not quite sure if the JS-like callback pattern is compatible with the way rust async works. I may experiment with adding an async version of each of the on_* functions to allow async move |msg| { call_async(msg).await } as the callback argument. nope, async closures are unstable to take a tokio async block? Unsure, need more tokio knowledge.

The callback closure is a sync function that gets moved into a Vec when the on_* function runs, so I think that takes it outside of the tokio runtime, which would explain the need to spawn when inside the handler; it's not in an async context anymore. This is also the reason they can't return to the definition site, it's not the closure's call site. The call site (for on_broadcast) is buried in https://github.com/bytemunch/realtime-rs/blob/6d25b1eb3e6cad5485254b1e358cb1b53261b6f0/src/realtime_channel.rs#L361

Sorry if I'm rambling, it's been a little while since I've touched this code so need to familiarise myself as I go.

Let me know if this makes sense, and if you have any ideas for a more ergonomic API. It's very possible I got too far into the weeds of trying to mirror the JS API that I've footgunned in a way I don't yet understand 😅

richchurcher commented 2 months ago

Interesting! I'm very new to Rust async myself, but I will ponder the idea. Right now I have realtime-rs happily listening to postgres changes and firing off messages in channels to other processes. I don't suppose it would be terribly egregious to also send errors in the same way... I'll have a think about it. Thanks so much for taking the time to sketch this out for me.

richchurcher commented 2 months ago

Oh I just re-read this:

That's what I ended up doing so far for the bevy plugin I'm writing

and realised you might have partially answered the other questions I have, because I'm using all this for a Twitch overlay written in Bevy. Doing the dance around not being able to use async is proving tricky for the likes of me though!

richchurcher commented 2 months ago

Oh for goodness' sake, you've got a whole bevy-supabase over there! I will take a look and hopefully be able to help out or at least kick the tyres for you :laughing: