braverhealth / phoenix-socket-dart

Cross-platform and stream-based implementation of Phoenix Sockets
https://pub.dev/packages/phoenix_socket
BSD 3-Clause "New" or "Revised" License

Can't add a new channel on socket after closing one. #20

Status: Closed (hworld closed this issue 3 years ago)

hworld commented 3 years ago

Hey, another one related to closing channels. Once you close a channel, you can't add a new one.

I changed the 'closes successfully' test to this:

test('closes successfully', () async {
  final socket1 = PhoenixSocket(addr);
  await socket1.connect();
  final channel1 = socket1.addChannel(topic: 'channel3');
  await channel1.join().future;

  final socket2 = PhoenixSocket(addr);
  await socket2.connect();
  final channel2 = socket2.addChannel(topic: 'channel3');
  await channel2.join().future;

  channel1.push('ping', {'from': 'socket1'});

  expect(
    channel2.messages,
    emits(
      predicate(
        (msg) => msg.payload['from'] == 'socket1',
        'was from socket1',
      ),
    ),
  );

  await channel1.leave().future;

  expect(channel1.state, equals(PhoenixChannelState.closed));
  expect(socket1.channels.length, equals(0));

  final channel3 = socket1.addChannel(topic: 'channel3');
  await channel3.join().future;

  channel3.push('ping', {'from': 'socket1'});

  expect(
    channel2.messages,
    emits(
      predicate(
        (msg) => msg.payload['from'] == 'socket1',
        'was from socket1',
      ),
    ),
  );
});

The test runs fine until it reaches the channel3.join().future call, which eventually times out.

I'll continue to try digging into the code to see why this may be happening, but I haven't figured it out yet.

hworld commented 3 years ago

Current thought is that it might be related to the StreamRouter that is set up in _topicStreams in PhoenixSocket. In removeChannel() it's doing this:

  void removeChannel(PhoenixChannel channel) {
    _logger.finer(() => 'Removing channel ${channel.topic}');
    if (channels.remove(channel.reference) is PhoenixChannel) {
      _topicStreams.remove(channel.topic);
    }
  }

This only removes the entry from the map; it does not cancel the routing of the topic on the StreamRouter. I don't actually know how a stream router works, so I'll look into that now to see whether it could be the cause.

hworld commented 3 years ago

I hacked in some code to verify my theory and it does seem correct. Since the route can't be removed from the StreamRouter, it keeps splitting that topic's events away from the main stream, with no way to subscribe to the same topic again. I don't know enough about streams in Dart to know the best way to solve this. It seems to me that this should be handled by an internal routing function on the socket when new messages arrive, but I'm not sure.
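To make the idea of an internal routing function concrete, here is a minimal sketch in Dart. It is not the library's code and not necessarily what the eventual fix looks like: Message, TopicRouter, route, unroute and dispatch are all hypothetical names made up for illustration. The point it tries to show is that if the topic is looked up for each incoming message, a removed topic can simply be registered again later.

```dart
import 'dart:async';

// Hypothetical message type; the library's real message class differs.
class Message {
  final String topic;
  final Map<String, dynamic> payload;
  Message(this.topic, this.payload);
}

// Hypothetical router (an assumption, not PhoenixSocket's implementation).
// Each incoming message is looked up by topic at dispatch time, so removing
// a topic leaves nothing behind that could block a later re-registration.
class TopicRouter {
  final Map<String, StreamController<Message>> _controllers = {};

  // Returns the stream for [topic], creating its controller on first use.
  Stream<Message> route(String topic) {
    final controller = _controllers.putIfAbsent(
      topic,
      () => StreamController<Message>.broadcast(sync: true),
    );
    return controller.stream;
  }

  // Stops routing [topic] and closes its stream, if one exists.
  void unroute(String topic) {
    _controllers.remove(topic)?.close();
  }

  // Delivers [message] to its topic's stream.
  // Returns false when no route is registered for that topic.
  bool dispatch(Message message) {
    final controller = _controllers[message.topic];
    if (controller == null) return false;
    controller.add(message);
    return true;
  }
}

void main() {
  final router = TopicRouter();
  final received = <String>[];

  // First channel on the topic.
  router
      .route('channel3')
      .listen((m) => received.add('first: ${m.payload['from']}'));
  router.dispatch(Message('channel3', {'from': 'socket1'}));

  // Leaving the channel removes the route; messages are no longer routed.
  router.unroute('channel3');
  final routed = router.dispatch(Message('channel3', {'from': 'nobody'}));
  print('routed after unroute: $routed');

  // The same topic can be registered again with a fresh stream.
  router
      .route('channel3')
      .listen((m) => received.add('second: ${m.payload['from']}'));
  router.dispatch(Message('channel3', {'from': 'socket1'}));

  print(received);
}
```

In terms of the removeChannel() snippet above, unroute plays the role that _topicStreams.remove(channel.topic) was presumably meant to play, except that here removing the map entry really does stop the routing.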

matehat commented 3 years ago

Good catch! I'll see if I can fix this quickly.

matehat commented 3 years ago

The fix is part of the new 0.4.11 release! Thanks again for your investigation, @hworld!

matehat commented 3 years ago

I also filed a PR upstream: google/quiver-dart#673

hworld commented 3 years ago

Works perfectly for me now. Thanks so much for tackling these so fast!