bytemunch / bevy-realtime

Channel disconnections #3

Closed richchurcher closed 5 months ago

richchurcher commented 6 months ago

Not sure if you want separate issues for the nokio branch! Just organising my thoughts around this, but I quite frequently see Disconnected(..) at https://github.com/bytemunch/bevy-supabase/blob/main/crates/bevy-realtime/src/postgres_changes.rs#L61. Obviously it'll panic in its current state due to the unwrap, but it's less clear to me why the channel is disconnecting.

One theory I had is that we can't store a channel as a component? Possibly it gets cloned or duplicated in some way that's not terribly friendly to the rx from crossbeam? I wondered if perhaps we might need to look at storing it as a Resource instead, though that may feel less ergonomic. I'll have a wee play locally and see if anything helps.
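
For illustration, here's a rough sketch of what I mean by keeping the receiver in a Resource and draining it each frame. All the names here (ChannelMessage, ChannelReceiver, drain_channel) are made up for the example, and it uses crossbeam_channel directly rather than this crate's types:

use bevy::prelude::*;
use crossbeam_channel::{Receiver, TryRecvError};

// Hypothetical payload type for the example; the real crate has its own types.
#[derive(Event, Clone)]
pub struct ChannelMessage(pub String);

// Keep the crossbeam Receiver in a Resource rather than on an entity,
// so the ECS never clones or shuffles it around.
#[derive(Resource)]
pub struct ChannelReceiver(pub Receiver<ChannelMessage>);

// Drain the receiver every frame and forward into ordinary Bevy events.
pub fn drain_channel(rx: Res<ChannelReceiver>, mut writer: EventWriter<ChannelMessage>) {
    loop {
        match rx.0.try_recv() {
            Ok(msg) => {
                writer.send(msg);
            }
            Err(TryRecvError::Empty) => break,
            Err(TryRecvError::Disconnected) => {
                // Surface the disconnect instead of panicking on an unwrap().
                warn!("realtime channel disconnected");
                break;
            }
        }
    }
}

If the channel really is getting duplicated by living on an entity, this shape sidesteps the question entirely: the Receiver is created once and only ever borrowed.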

richchurcher commented 6 months ago

I wonder if this might be of use... https://github.com/johanhelsing/bevy_crossbeam_event :thinking:

richchurcher commented 6 months ago

Given that they use a resource, it might be an interesting option to explore. It's actually a tiny library, so not much of a dependency!

On the original problem: I noticed that there were timing issues, and when I created RealtimePlugin much earlier in the startup my Disconnects went away (or at least they seem to have for now!). I might see if I can get the branch working with bevy_crossbeam_event so that we can evaluate it.

bytemunch commented 6 months ago

Ooh, that crate might be exactly what we need! Crossbeam rxs are thread friendly I think, so passing them around shouldn't be an issue. I'll look into it all when I have time. If we can restructure in a more Bevy-like way, future development will get exponentially easier.
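
For what it's worth, a tiny standalone illustration (plain crossbeam_channel, nothing from this crate) of senders being Clone + Send, so handles can be handed to other threads while the receiver stays put:

use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (tx, rx) = unbounded::<String>();

    // Senders are Clone + Send, so each worker thread can own its own handle.
    for i in 0..3 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(format!("hello from thread {i}")).unwrap();
        });
    }
    // Drop the original sender so the channel closes once the workers finish.
    drop(tx);

    // The receiver can stay on the main/game thread and be polled there.
    for msg in rx.iter() {
        println!("{msg}");
    }
}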

richchurcher commented 6 months ago

Hrm. Sad to say, it wasn't an instant fix. It's quite an ergonomic approach, e.g.

impl AppExtend for App {
    fn add_postgres_event<E: Event + PostgresPayloadEvent + Clone, F: Component>(
        &mut self,
    ) -> &mut Self {
        self.add_crossbeam_event::<E>()
            .add_systems(Update, postgres_forward::<E, F>)
        // self.add_event::<E>().add_systems(
        //     Update,
        //     (postgres_forward::<E, F>, forwarder_recv::<E>).chain(),
        // )
    }
}

and therefore:

pub fn postgres_forward<E: Event + PostgresPayloadEvent + Clone, T: Component>(
    mut commands: Commands,
    mut q: Query<
        (Entity, &mut BevyChannelBuilder, &PostgresForwarder<E>),
        (Added<PostgresForwarder<E>>, With<T>),
    >,
    sender: Res<CrossbeamEventSender<E>>,
) {
    for (e, mut cb, event) in q.iter_mut() {
        // Clone the sender so the callback closure can own its own handle.
        let s = sender.clone();
        cb.0.on_postgres_change(event.event.clone(), event.filter.clone(), move |payload| {
            let ev = E::new(payload.clone());
            s.send(ev);
        });

        // Remove the forwarder marker so this entity is only wired up once.
        commands.entity(e).remove::<PostgresForwarder<E>>();
    }
}

However, I still got disconnection events on the channel at runtime. They appear on some runs but not others, so it's clearly some kind of timing issue, though what kind I have no idea yet! I'll try to come up with a small-ish reproduction for further study.

richchurcher commented 6 months ago

It does seem to be only in my project that I can replicate these disconnects! Annoying. I'll keep trying to find the secret ingredient...

richchurcher commented 6 months ago

Ah. It seems to become more prevalent the more tables you listen to. I can make it happen most reliably with about five, using the following sample code:

use bevy::prelude::*;
use bevy_http_client::HttpClientPlugin;
use bevy_supabase::prelude::*;

#[allow(dead_code)]
#[derive(Clone, Event, Debug)]
pub struct ChatMessageEvent {
    payload: PostgresChangesPayload,
}

impl PostgresPayloadEvent for ChatMessageEvent {
    fn new(payload: PostgresChangesPayload) -> Self {
        Self { payload }
    }
}

#[allow(dead_code)]
#[derive(Clone, Event, Debug)]
pub struct FollowEvent {
    payload: PostgresChangesPayload,
}

impl PostgresPayloadEvent for FollowEvent {
    fn new(payload: PostgresChangesPayload) -> Self {
        Self { payload }
    }
}

#[allow(dead_code)]
#[derive(Clone, Event, Debug)]
pub struct AdsEvent {
    payload: PostgresChangesPayload,
}

impl PostgresPayloadEvent for AdsEvent {
    fn new(payload: PostgresChangesPayload) -> Self {
        Self { payload }
    }
}

#[allow(dead_code)]
#[derive(Clone, Event, Debug)]
pub struct SubscriptionEvent {
    payload: PostgresChangesPayload,
}

impl PostgresPayloadEvent for SubscriptionEvent {
    fn new(payload: PostgresChangesPayload) -> Self {
        Self { payload }
    }
}

#[allow(dead_code)]
#[derive(Clone, Event, Debug)]
pub struct FirstMessageEvent {
    payload: PostgresChangesPayload,
}

impl PostgresPayloadEvent for FirstMessageEvent {
    fn new(payload: PostgresChangesPayload) -> Self {
        Self { payload }
    }
}

fn main() {
    let Ok(apikey) = std::env::var("SUPABASE_LOCAL_ANON_KEY") else {
        panic!("SUPABASE_LOCAL_ANON_KEY must be set.");
    };

    let mut app = App::new();

    app.add_plugins((
        DefaultPlugins,
        HttpClientPlugin,
        SupabasePlugin {
            apikey,
            endpoint: "http://127.0.0.1:54321".into(),
            ..default()
        },
    ))
    .add_systems(Startup, init)
    .add_systems(
        Update,
        (
            read_chat_messages,
            read_follows,
            read_ads,
            read_subscriptions,
            read_first_messages,
        )
            .chain(),
    )
    .add_postgres_event::<AdsEvent, BevyChannelBuilder>()
    .add_postgres_event::<ChatMessageEvent, BevyChannelBuilder>()
    .add_postgres_event::<FirstMessageEvent, BevyChannelBuilder>()
    .add_postgres_event::<SubscriptionEvent, BevyChannelBuilder>()
    .add_postgres_event::<FollowEvent, BevyChannelBuilder>();

    app.run()
}

pub fn init(client: ResMut<Client>, mut commands: Commands) {
    let channel = client.channel("table_changes".into());

    let mut c = commands.spawn(BevyChannelBuilder(channel));

    c.insert(PostgresForwarder::<ChatMessageEvent>::new(
        PostgresChangesEvent::Insert,
        PostgresChangeFilter {
            schema: "public".into(),
            table: Some("messages".into()),
            filter: None,
        },
    ));
    c.insert(PostgresForwarder::<FollowEvent>::new(
        PostgresChangesEvent::Insert,
        PostgresChangeFilter {
            schema: "public".into(),
            table: Some("follows".into()),
            filter: None,
        },
    ));
    c.insert(PostgresForwarder::<AdsEvent>::new(
        PostgresChangesEvent::Insert,
        PostgresChangeFilter {
            schema: "public".into(),
            table: Some("ads".into()),
            filter: None,
        },
    ));
    c.insert(PostgresForwarder::<SubscriptionEvent>::new(
        PostgresChangesEvent::Insert,
        PostgresChangeFilter {
            schema: "public".into(),
            table: Some("subscriptions".into()),
            filter: None,
        },
    ));
    c.insert(PostgresForwarder::<FirstMessageEvent>::new(
        PostgresChangesEvent::Insert,
        PostgresChangeFilter {
            schema: "public".into(),
            table: Some("first_messages".into()),
            filter: None,
        },
    ));
    c.insert(BuildChannel);
}

fn read_chat_messages(mut evr: EventReader<ChatMessageEvent>) {
    for _ev in evr.read() {
        println!("::: chat message received :::");
    }
}
fn read_first_messages(mut evr: EventReader<FirstMessageEvent>) {
    for _ev in evr.read() {
        println!("::: first message received :::");
    }
}

fn read_ads(mut evr: EventReader<AdsEvent>) {
    for _ev in evr.read() {
        println!("::: ads starting :::");
    }
}

fn read_subscriptions(mut evr: EventReader<SubscriptionEvent>) {
    for _ev in evr.read() {
        println!("::: subscription received :::");
    }
}

fn read_follows(mut evr: EventReader<FollowEvent>) {
    for _ev in evr.read() {
        println!("::: channel followed :::");
    }
}

bytemunch commented 5 months ago

I think this is a race condition: the plugin is trying to create a channel before the client is connected, so the channels end up disconnected when they try to instantiate and fail.

I'm working on a client_ready run condition so Bevy systems can run at the correct time, but run conditions on Update systems are evaluated every frame, and I'm struggling to work in an async way where I don't overflow the channel but also don't hang the program while connecting. I've got a few ideas I'm going to try implementing; surely one of them will work.
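
Roughly the shape I have in mind, very much a sketch rather than the real API (ConnectStatusRx, ClientReady, build_channels and status_rx are all placeholders): flip a flag once the client's worker thread reports connected, then gate the channel-building systems on a cheap run condition.

use bevy::prelude::*;
use crossbeam_channel::Receiver;

// Stand-in for whatever "connected" signal the client's worker thread emits;
// the worker side could use a bounded(1) channel with try_send so repeated
// pings never pile up or block.
#[derive(Resource)]
pub struct ConnectStatusRx(pub Receiver<()>);

// Flipped once, so downstream systems can gate on a cheap bool check.
#[derive(Resource, Default)]
pub struct ClientReady(pub bool);

// Runs every frame, but only does work until the flag is set.
pub fn poll_connect_status(rx: Res<ConnectStatusRx>, mut ready: ResMut<ClientReady>) {
    if !ready.0 && rx.0.try_recv().is_ok() {
        ready.0 = true;
    }
}

// Run condition: no blocking, no channel traffic once connected.
pub fn client_ready(ready: Res<ClientReady>) -> bool {
    ready.0
}

// Wiring (build_channels and status_rx are placeholders):
// app.init_resource::<ClientReady>()
//     .insert_resource(ConnectStatusRx(status_rx))
//     .add_systems(Update, poll_connect_status)
//     .add_systems(Update, build_channels.run_if(client_ready));

The non-blocking try_recv keeps the Update system from hanging while connecting, and the bounded channel on the sending side is what keeps it from overflowing.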

richchurcher commented 5 months ago

Yeah, the rate at which Update systems run is a bit of a confounding factor for sure when dealing with anything outside the game engine.

surely one of them will work

Surely!

richchurcher commented 5 months ago

Data point: I tried with the new client_ready but sadly, no improvement. (bevy_crossbeam_event swallows Disconnected(), but it's still clear the events are not being received by the Receiver.)

richchurcher commented 5 months ago

Just checked with the latest example (world callback, client in world resource). Still semi-inconsistent results; still playing around to see whether it's PEBCAK or a race condition still lurking somewhere I haven't looked yet :laughing: This is with a sample set of five table subscriptions. I'm starting to wonder if I should come up with a docker-based integration test for this case, as I eventually plan to subscribe to double-digit numbers of tables and may as well ensure it's working consistently!

bytemunch commented 5 months ago

Yeah, I've been testing with one channel per operation so far. I plan to write a multi-table example, as well as include the required SQL to create example tables on the backend to make testing easier.

I'm planning to rewrite the ChannelForwarder logic to leverage bevy_crossbeam_event and one-shot systems (#6) soon, so hopefully that will solve these issues.
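
Not the actual rewrite, just a sketch of the one-shot mechanics as I understand them (every name below is a placeholder): register the setup system once, stash its SystemId in a resource, and fire it via Commands when the "client connected" signal arrives.

use bevy::{ecs::system::SystemId, prelude::*};

// Placeholder resource holding the id of the registered one-shot system.
#[derive(Resource)]
pub struct BuildChannelsSystem(pub SystemId);

// Exclusive Startup system: register the one-shot system once and stash its id.
pub fn register_build_channels(world: &mut World) {
    let id = world.register_system(build_channels);
    world.insert_resource(BuildChannelsSystem(id));
}

// The actual channel/forwarder setup, run exactly once when triggered.
// (Parameters elided; the real version would take the client, Commands, etc.)
pub fn build_channels() {
    info!("client ready, building channels");
}

// Fire it once from whatever system observes the "client connected" signal.
pub fn trigger_build(mut commands: Commands, id: Res<BuildChannelsSystem>, mut done: Local<bool>) {
    if !*done {
        commands.run_system(id.0);
        *done = true;
    }
}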

richchurcher commented 5 months ago

Oh awesome, I wasn't sure where you were up to with the oneshot stuff. I'm tempted to ask the author of bevy_crossbeam_event if they'd accept a PR to stop swallowing the Disconnected error, since it feels like something consuming code should be able to respond to.