supabase / realtime-py

A Python Client for Phoenix Channels
MIT License

Auto reconnect loses subscription to `postgres_changes` events #213

Open etzelc opened 1 month ago

etzelc commented 1 month ago

Bug report

Describe the bug

The auto_reconnect feature, which was fixed in PR #203, is not functioning as expected. When the connection to the realtime server is lost and then reestablished with auto_reconnect=True, the client successfully rejoins the channel but does not resubscribe to the postgres_changes events.

To Reproduce

An extracted version of the code setup:

    from realtime import AsyncRealtimeClient

    client = AsyncRealtimeClient(f"{url}/realtime/v1", key, auto_reconnect=True)
    await client.connect()
    channel = client.channel("new-test-db-channel")

    await channel.on_postgres_changes(
        "INSERT",
        schema="public",
        table="table_masked",
        filter="filter_masked",
        callback=my_callback_function,
    ).on_postgres_changes(
        "UPDATE",
        schema="public",
        table="table_masked",
        filter="filter_masked",
        callback=my_callback_function,
    ).subscribe()

    # Before listening, we do some processing
    process_something()

    await client.listen()
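The callback is masked in the report; any handler that accepts the change payload works. A minimal hypothetical stand-in, just to make the snippet self-contained:

    def my_callback_function(payload):
        # Inspect the postgres_changes payload delivered by the server.
        print("change received:", payload)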

Expected behavior

After the connection is automatically reestablished, all postgres_changes events should be resubscribed.

Log


2024-09-30T13:05:21.388061183Z INFO: Connection was successful
2024-09-30T13:05:21.388617410Z INFO: send: {"topic": "realtime:new-test-db-channel", "event": "phx_join", "payload": {"config": {"broadcast": {"ack": false, "self": false}, "presence": {"key": ""}, "private": false, "postgres_changes": [{"event": "INSERT", "schema": "public", "table": "table_masked", "filter": "filter_masked"}, {"event": "UPDATE", "schema": "public", "table": "table_masked", "filter": "filter_masked"}]}, "access_token": "access_token_masked"}, "ref": "1", "join_ref": "1"}
... OUR_APP_LOG: Some processing with logs for nearly 3 minutes
2024-09-30T13:08:13.161949439Z INFO: send: {"topic": "phoenix", "event": "heartbeat", "payload": {}, "ref": null}
2024-09-30T13:08:13.163777556Z INFO: receive: {"ref":"1","event":"phx_reply","payload":{"status":"ok","response":{"postgres_changes":[{"id":47427136,"event":"INSERT","filter":"filter_masked","schema":"public","table":"table_masked"},{"id":25027634,"event":"UPDATE","filter":"filter_masked","schema":"public","table":"table_masked"}]}},"topic":"realtime:new-test-db-channel"}
2024-09-30T13:08:13.164177247Z INFO: receive: {"ref":null,"event":"presence_state","payload":{},"topic":"realtime:new-test-db-channel"}
2024-09-30T13:08:13.164928848Z INFO: receive: {"ref":null,"event":"system","payload":{"message":"Subscribed to PostgreSQL","status":"ok","extension":"postgres_changes","channel":"new-test-db-channel"},"topic":"realtime:new-test-db-channel"}
2024-09-30T13:08:13.164953762Z INFO: receive: {"ref":null,"event":"postgres_changes","payload":{"data":{"table":"table_masked","type":"UPDATE","record":{"column_name_masked":"data_masked"},"columns":[{"name":"column_name_masked","type":"type_masked"}],"errors":null,"commit_timestamp":"2024-09-30T13:06:15.367Z","schema":"public","old_record":{"id":"72c747f7-d79d-4d5e-abac-b87a6e417003"}},"ids":[25027634,47427136]},"topic":"realtime:new-test-db-channel"}
2024-09-30T13:08:13.308654328Z INFO: OUR_APP_LOG: Callback invoked
2024-09-30T13:08:13.308726428Z INFO: receive: {"ref":null,"event":"postgres_changes","payload":{"data":{"table":"table_masked","type":"UPDATE","record":{"column_name_masked":"data_masked"},"columns":[{"name":"column_name_masked","type":"type_masked"}],"errors":null,"commit_timestamp":"2024-09-30T13:06:15.503Z","schema":"public","old_record":{"id":"00fcd532-48c6-44f9-8fa0-bbec3d17aece"}},"ids":[25027634,47427136]},"topic":"realtime:new-test-db-channel"}
2024-09-30T13:08:13.443820966Z INFO: OUR_APP_LOG: Callback invoked
2024-09-30T13:08:13.445383040Z INFO: Connection with server closed, trying to reconnect...
2024-09-30T13:08:13.710289333Z INFO: Connection was successful
2024-09-30T13:08:13.712427114Z INFO: send: {"topic": "realtime:new-test-db-channel", "event": "phx_join", "payload": {"config": {"config": {"broadcast": {"ack": false, "self": false}, "presence": {"key": ""}, "private": false}}}, "ref": null}
2024-09-30T13:08:13.782978712Z INFO: receive: {"ref":null,"event":"phx_reply","payload":{"status":"ok","response":{"postgres_changes":[]}},"topic":"realtime:new-test-db-channel"}
2024-09-30T13:08:13.783590005Z INFO: receive: {"ref":null,"event":"presence_state","payload":{},"topic":"realtime:new-test-db-channel"}
2024-09-30T13:08:43.172802339Z INFO: send: {"topic": "phoenix", "event": "heartbeat", "payload": {}, "ref": null}
2024-09-30T13:08:43.224117072Z INFO: receive: {"ref":null,"event":"phx_reply","payload":{"status":"ok","response":{}},"topic":"phoenix"}
2024-09-30T13:08:43.224161401Z INFO: Channel phoenix not found
2024-09-30T13:09:13.191831336Z INFO: send: {"topic": "phoenix", "event": "heartbeat", "payload": {}, "ref": null}
2024-09-30T13:09:13.243016173Z INFO: receive: {"ref":null,"event":"phx_reply","payload":{"status":"ok","response":{}},"topic":"phoenix"}
2024-09-30T13:09:13.244175515Z INFO: Channel phoenix not found
... 
No further "postgres_changes" events arrive. Note the rejoin after the reconnect (13:08:13.712): its payload nests config inside config and carries no postgres_changes bindings, unlike the original phx_join (13:05:21), and the server's reply accordingly returns "postgres_changes":[].

System information

jackyliang commented 1 week ago

I have the same issue, and it appears a lot of people do as well. Can the devs please provide an answer or a workaround? The library is unfortunately unusable because of this disconnection behavior.

etzelc commented 1 week ago

Here is pseudocode of the workaround I am using successfully, with auto_reconnect set to False:

    client = AsyncRealtimeClient(f"{url}/realtime/v1", key, auto_reconnect=False)
    connection_loss_counter = 0

    # Retry logic to manage reconnections within a set limit
    while connection_loss_counter <= MAX_RECONNECTS:
        try:
            await client.connect()
            channel = client.channel("the-channel")
            await channel.on_postgres_changes( ... ).subscribe(on_subscribe_callback)

            try:
                # Blocks until the connection drops, then raises
                await client.listen()
            finally:
                # Recalculate connection_loss_counter for the retry logic
                # (it uses a time-window approach)
                connection_loss_counter = record_connection_loss()
        except Exception:
            # Connection lost; loop around, reconnect, and resubscribe
            continue

If too many exceptions occur (e.g., due to connection loss), additional mechanisms are executed after the loop to handle the situation.
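For readers who want to reproduce the time-window counter mentioned above, here is a minimal sketch; the window length, limit, and helper name are illustrative assumptions, not taken from the original workaround:

    import time
    from collections import deque

    # Hypothetical values; tune to your tolerance for reconnect churn.
    WINDOW_SECONDS = 300
    MAX_RECONNECTS = 5

    _loss_timestamps = deque()

    def record_connection_loss():
        # Record this loss, drop losses that fell outside the sliding
        # window, and return how many losses remain inside it.
        now = time.monotonic()
        _loss_timestamps.append(now)
        cutoff = now - WINDOW_SECONDS
        while _loss_timestamps and _loss_timestamps[0] < cutoff:
            _loss_timestamps.popleft()
        return len(_loss_timestamps)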

jackyliang commented 1 week ago

How does setting auto_reconnect to false fix your issue, out of curiosity?

etzelc commented 6 days ago

With auto_reconnect set to False I don't use the broken reconnect logic, but handle it myself. listen() throws an exception if the connection is lost, and the loop re-establishes everything.