supabase-community / supabase-csharp

A C# Client library for Supabase
https://github.com/supabase-community/supabase-csharp/wiki
MIT License

Calling Unsubscribe from a channel is still receiving update messages #125

Open GrimLothar opened 9 months ago

GrimLothar commented 9 months ago

I have these 2 methods:

    public async void SubscribeToTournamentChanges(int tournament_id, TournamentChangedHandler handler)
    {
        var channelName = "tournaments:" + tournament_id;
        channels.TryGetValue(channelName, out RealtimeChannel channel);
        if (channel == null)
        {
            channel = _supabase.Realtime.Channel(channelName);
            channels.Add(channelName, channel);
            channel.Register(new PostgresChangesOptions("public", "aw-tournaments", ListenType.Updates, "id=eq." + tournament_id ));
            channel.AddPostgresChangeHandler(ListenType.Updates, (_, change) =>
            {
                if (change.Payload?.Data?.Table == "aw-tournaments")
                {
                    //Tournament data changed
                    var model = change.Model<Tournament>();
                    if (model != null)
                    {
                        _mainThreadContext.Post(_ => handler.Invoke(model), null);
                    }
                }
            });
        }
        await channel.Subscribe();
    }
    public void UnsubscribeToTournamentChanges(int tournament_id)
    {
        var channelName = "tournaments:" + tournament_id;
        channels.TryGetValue(channelName, out RealtimeChannel channel);
        if (channel != null)
        {
            channel.Unsubscribe();
        }
    }

When I call channel.Unsubscribe(), I see in the logs that a phx_leave message is pushed:

Socket Push [topic: realtime:tournaments:5, event: phx_leave, ref: 5505e55a-d2d9-484d-8d98-34fc9d3c3d9d]:
    null
UnityEngine.Debug:Log (object)
SupabaseManager/<>c:<Start>b__9_0 (object,string,System.Exception) (at Assets/TcgEngine/SupabaseManager.cs:60)
Supabase.Realtime.Debugger:Log (object,string,System.Exception)
Supabase.Realtime.RealtimeSocket:Push (Supabase.Realtime.Socket.SocketRequest)
Supabase.Realtime.Channel.Push:Send ()
Supabase.Realtime.RealtimeChannel:Unsubscribe ()
SupabaseManager:UnsubscribeToTournamentChanges (int) (at Assets/TcgEngine/SupabaseManager.cs:222)
TcgEngine.UI.TournamentDetailsPanel:OnBack () (at Assets/TcgEngine/Scripts/UI/TournamentDetailsPanel.cs:149)
UnityEngine.Events.UnityEvent:Invoke ()
TcgEngine.UI.AWButtonNew:OnClick () (at Assets/TcgEngine/Scripts/UI/AWButtonNew.cs:34)
UnityEngine.EventSystems.EventSystem:Update () (at ./Library/PackageCache/com.unity.ugui@1.0.0/Runtime/EventSystem/EventSystem.cs:530)

But when I change my tournaments table in the DB, I still get the message. I would expect Unsubscribe to stop these messages from arriving (thus saving bandwidth).

What is Unsubscribe meant to do if not this?

Thanks!

acupofjose commented 9 months ago

Thanks for the issue!

To clarify, is the websocket still receiving messages? Or is your callback continuing to be called?

GrimLothar commented 9 months ago

Both!

acupofjose commented 9 months ago

@GrimLothar I'm struggling to reproduce this. Are you sure you don't have multiple socket clients connected?

I wrote the following test in the RealtimeTests project (see the realtime-csharp repo) using a primary channel (demonstrating the Unsubscribe functionality) and a secondary channel (a different channel with the same postgres_changes parameters that remains subscribed).

It passes as expected:

[TestMethod("Channel: Unsubscribes")]
public async Task ChannelDisconnectsAppropriately()
{
    var callbackInvokedTCS = new TaskCompletionSource<bool>();
    var primaryClientInvocations = 0;
    var secondaryClientInvocations = 0;

    var channel1 = _socketClient!.Channel("test:unsubscribes");
    channel1.Register(new PostgresChangesOptions("public", "todos", ListenType.Inserts, "user_id=eq.1"));
    channel1.AddPostgresChangeHandler(ListenType.Inserts, (_, changes) =>
    {
        primaryClientInvocations++;
        callbackInvokedTCS.TrySetResult(true);
    });
    await channel1.Subscribe();

    var channel2 = _socketClient!.Channel("test:unsubscribes:2");
    channel2.Register(new PostgresChangesOptions("public", "todos", ListenType.Inserts, "user_id=eq.1"));
    channel2.AddPostgresChangeHandler(ListenType.Inserts, (_, changes) => secondaryClientInvocations++);
    await channel2.Subscribe(); 

    await _restClient!.Table<Todo>().Insert(new Todo { UserId = 1 });
    await callbackInvokedTCS.Task;

    channel1.Unsubscribe();

    await _restClient!.Table<Todo>().Insert(new Todo { UserId = 1 });
    await _restClient!.Table<Todo>().Insert(new Todo { UserId = 1 });

    await Task.Delay(3000);
    Assert.AreEqual(1, primaryClientInvocations);
    Assert.AreEqual(3, secondaryClientInvocations);
}

With the following console output:

Debug Trace:
Socket Reconnection: Initial
Socket Connected to: ws://realtime-dev.localhost:4000/socket/websocket?apikey=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiIiLCJpYXQiOjE2NzEyMzc4NzMsImV4cCI6MjAwMjc3Mzk5MywiYXVkIjoiIiwic3ViIjoiIiwicm9sZSI6ImF1dGhlbnRpY2F0ZWQifQ.qoYdljDZ9rjfs1DKj5_OqMweNtj7yk20LZKlGNLpUO8&vsn=1.0.0
Socket State Change: Open
Socket Push [topic: phoenix, event: heartbeat, ref: 91de2812-09d7-4fcc-8263-2834ae862423]:
    {}
Socket Push [topic: realtime:test:unsubscribes, event: phx_join, ref: 23775bd6-5117-405d-bdaa-99074b798f30]:
    {
  "config": {
    "broadcast": {
      "self": false,
      "ack": false
    },
    "presence": {
      "key": ""
    },
    "postgres_changes": [
      {
        "schema": "public",
        "table": "todos",
        "filter": "user_id=eq.1",
        "event": "INSERT"
      }
    ]
  }
}
Socket Message Received:
    {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"91de2812-09d7-4fcc-8263-2834ae862423","topic":"phoenix"}
Socket Message Received:
    {"event":"phx_reply","payload":{"response":{"postgres_changes":[{"id":48556260,"event":"INSERT","filter":"user_id=eq.1","schema":"public","table":"todos"}]},"status":"ok"},"ref":"23775bd6-5117-405d-bdaa-99074b798f30","topic":"realtime:test:unsubscribes"}
Socket Message Received:
    {"event":"presence_state","payload":{},"ref":null,"topic":"realtime:test:unsubscribes"}
Socket Message Received:
    {"event":"system","payload":{"channel":"test:unsubscribes","extension":"postgres_changes","message":"Subscribed to PostgreSQL","status":"ok"},"ref":null,"topic":"realtime:test:unsubscribes"}
Socket Push [topic: realtime:test:unsubscribes:2, event: phx_join, ref: 2fcb5d42-4470-4be3-8660-49b3c081dda3]:
    {
  "config": {
    "broadcast": {
      "self": false,
      "ack": false
    },
    "presence": {
      "key": ""
    },
    "postgres_changes": [
      {
        "schema": "public",
        "table": "todos",
        "filter": "user_id=eq.1",
        "event": "INSERT"
      }
    ]
  }
}
Socket Message Received:
    {"event":"phx_reply","payload":{"response":{"postgres_changes":[{"id":48556260,"event":"INSERT","filter":"user_id=eq.1","schema":"public","table":"todos"}]},"status":"ok"},"ref":"2fcb5d42-4470-4be3-8660-49b3c081dda3","topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
    {"event":"presence_state","payload":{},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
    {"event":"system","payload":{"channel":"test:unsubscribes:2","extension":"postgres_changes","message":"Subscribed to PostgreSQL","status":"ok"},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
    {"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.106Z","errors":null,"record":{"details":null,"done":false,"id":25,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
    {"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.106Z","errors":null,"record":{"details":null,"done":false,"id":25,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes"}
Socket Push [topic: realtime:test:unsubscribes, event: phx_leave, ref: 998db8ee-771d-43f8-be6a-11d82334af56]:
    null
Socket Message Received:
    {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"998db8ee-771d-43f8-be6a-11d82334af56","topic":"realtime:test:unsubscribes"}
Socket Message Received:
    {"event":"phx_close","payload":{},"ref":"23775bd6-5117-405d-bdaa-99074b798f30","topic":"realtime:test:unsubscribes"}
Socket Message Received:
    {"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.150Z","errors":null,"record":{"details":null,"done":false,"id":26,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
    {"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.155Z","errors":null,"record":{"details":null,"done":false,"id":27,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Closed at 11/28/2023 9:43:22 PM
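
To tell "the websocket is still receiving messages" apart from "the callback is still being called", it can help to tally the raw frames by topic. The following is only a rough sketch, not part of the library: RealtimeLogCheck and CountChangesByTopic are hypothetical names, and it assumes System.Text.Json is available and that each input string is one JSON frame as printed under "Socket Message Received:".

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper (not part of Supabase.Realtime): counts
// "postgres_changes" frames per topic from captured debug output.
public static class RealtimeLogCheck
{
    public static Dictionary<string, int> CountChangesByTopic(IEnumerable<string> frames)
    {
        var counts = new Dictionary<string, int>();
        foreach (var frame in frames)
        {
            using var doc = JsonDocument.Parse(frame);
            var root = doc.RootElement;

            // Skip anything that is not a postgres_changes event
            // (phx_reply, presence_state, system, phx_close, ...).
            if (!root.TryGetProperty("event", out var ev) ||
                ev.GetString() != "postgres_changes")
                continue;

            // Frames without a topic are grouped under an empty key.
            var topic = root.TryGetProperty("topic", out var t)
                ? t.GetString() ?? ""
                : "";

            counts[topic] = counts.TryGetValue(topic, out var n) ? n + 1 : 1;
        }
        return counts;
    }
}
```

Fed the frames from the trace above, this gives 1 for realtime:test:unsubscribes and 3 for realtime:test:unsubscribes:2, and both frames that arrive after the phx_leave are on the :2 topic. If frames for the unsubscribed topic keep arriving in your own log after phx_leave, that points at the socket rather than at the handler.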

GrimLothar commented 9 months ago

I don't think I have multiple clients connected... you can see my full implementation here: https://github.com/supabase-community/supabase-csharp/discussions/126