dart-lang / sdk

The Dart SDK, including the VM, dart2js, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

DOM streams drop events when the stream is paused, even for web sockets. #34656

Open DanTup opened 5 years ago

DanTup commented 5 years ago

I'm investigating an issue with dropped WebSocket events in the browser. It seems to happen when the stream is paused:

import 'dart:async';
import 'dart:html';

void main() {
  int i = 0;
  final WebSocket ws = WebSocket('ws://demos.kaazing.com/echo');
  ws.onOpen.listen((_) {
    // Send an incrementing counter to the echo server every 500 ms.
    Timer.periodic(const Duration(milliseconds: 500),
        (_) => ws.sendString((i++).toString()));
    final sub = ws.onMessage.listen((e) => print(e.data));
    // Pause at 5 s, resume at 10 s, cancel at 15 s. Messages echoed
    // while paused are the ones that go missing.
    Timer(const Duration(seconds: 5), () => sub.pause());
    Timer(const Duration(seconds: 10), () => sub.resume());
    Timer(const Duration(seconds: 15), () => sub.cancel());
  });
}

According to the docs, the events should be buffered. However, I (re-)found that the docs also say:

> Currently DOM streams silently drop events when the stream is paused. This is a bug and will be fixed.

I don't know if this is the same issue (I'm assuming so, but I don't know whether WebSocket events are "DOM streams"). That text was added 5 years ago, and I can't find any other information on it, nor can I find an open issue here.

So, I'm wondering what the status is. Is it still considered a bug that will be fixed, or should the docs be updated? (Also, is it likely the same issue affecting web sockets?)

mit-mit commented 5 years ago

cc @lrhn

lrhn commented 5 years ago

The DOM event streams are fairly special. Each time you read the onClick event getter, you get a new single-subscription stream. Internally that single-subscription stream adds and removes DOM event listeners when it's paused and resumed, so it doesn't provide the intermediate events in any way. At least it used to be that way; I don't actually know if this is still the case.
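As a rough illustration of that mechanism, here is a minimal pure-Dart sketch. It is not the dart:html implementation; FakeTarget and eventsOf are hypothetical names standing in for a DOM event target and an event getter, and the only assumption is the behavior described above: the stream detaches its listener on pause and re-attaches on resume.

```dart
import 'dart:async';

/// Hypothetical stand-in for a DOM event target: a plain list of callbacks.
class FakeTarget {
  final _listeners = <void Function(int)>[];
  void add(void Function(int) listener) => _listeners.add(listener);
  void remove(void Function(int) listener) => _listeners.remove(listener);
  void dispatch(int event) {
    for (final listener in List.of(_listeners)) {
      listener(event);
    }
  }
}

/// Single-subscription stream that detaches from [target] while paused, so
/// nothing dispatched in the meantime can reach the listener later.
Stream<int> eventsOf(FakeTarget target) {
  late final StreamController<int> controller;
  void forward(int event) => controller.add(event);
  controller = StreamController<int>(
    sync: true,
    onListen: () => target.add(forward),
    onPause: () => target.remove(forward),
    onResume: () => target.add(forward),
    onCancel: () => target.remove(forward),
  );
  return controller.stream;
}

/// Dispatches 1, 2 and 3, pausing the subscription around event 2.
List<int> demo() {
  final target = FakeTarget();
  final received = <int>[];
  final sub = eventsOf(target).listen(received.add);
  target.dispatch(1);
  sub.pause();
  target.dispatch(2); // No listener is attached, so this event is lost.
  sub.resume();
  target.dispatch(3);
  sub.cancel();
  return received;
}

void main() => print(demo()); // [1, 3]
```

A stream built on a plain StreamController without the onPause hook would instead queue event 2 and deliver it after resume.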

That behavior is nontraditional, possibly unexpected, but it is not wrong. A single-subscription stream is allowed to send whatever events it wants to. This is not a broadcast stream, so there is no requirement that all listeners get the same events.

This is actually much closer to the model for streams we had hoped to introduce in Dart 2, where the distinction between broadcast and single-subscription streams was turned into variants of a more general stream behavior.

So, if the streams in question document the behavior, I'll retract the "bug" verdict and consider it a valid design choice.

It's still dangerous because a simple await for (var event in o.onClick) { ... } could miss clicks if the body is asynchronous.
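The reason await for is affected is that the loop pauses its subscription while an asynchronous body is running. A small sketch to observe this, using an ordinary StreamController in place of a DOM stream (countPauses is a made-up helper name):

```dart
import 'dart:async';

/// Runs an `await for` loop with an asynchronous body and returns how many
/// times the source controller saw its subscription being paused.
Future<int> countPauses() async {
  var pauses = 0;
  final controller = StreamController<int>(onPause: () => pauses++);
  // Emit 0, 1, 2 at 10 ms intervals, then close the stream.
  var n = 0;
  Timer.periodic(const Duration(milliseconds: 10), (timer) {
    controller.add(n++);
    if (n == 3) {
      timer.cancel();
      controller.close();
    }
  });
  await for (final _ in controller.stream) {
    // The subscription stays paused until this await completes.
    await Future<void>.delayed(const Duration(milliseconds: 5));
  }
  return pauses;
}

Future<void> main() async {
  print('pauses seen by the source: ${await countPauses()}');
}
```

With a regular controller the pause is harmless because events are queued. With a stream that detaches its DOM listener on pause, every event fired during that await would be lost.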

For sockets, which are often streams of chunks of a larger entity, I think dropping events is a bug. A quick check suggests that web sockets are implemented using the same EventStreamProvider class, and they probably shouldn't be. I don't actually know anything about web sockets, so I could be completely wrong. I'll leave that choice to the Dart4Web developers.

DanTup commented 5 years ago

> Each time you read the onClick event getter, you get a new single-subscription stream. Internally that single-subscription stream adds and removes DOM event listeners when it's paused and resumed

Ah, I see why it behaves as it does then (though I'm not sure I understand why they're not just broadcast streams?).

> For sockets, which are often streams of chunks of a larger entity, I think dropping events is a bug.

I think so too, though I guess it's easy to argue that I shouldn't pause the stream if I don't want this. The original code that exposed this was actually calling asyncMap, which pauses the stream while each handler runs. When processing web socket messages where the handler is async, but we need it to complete before we process the next message, asyncMap seemed like a valid solution. However, this behaviour means we lose other messages instead of buffering them.

I worked around it by converting our web socket stream into a single-subscription stream like this:

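import 'dart:async';

/// Wraps [stream] in a single-subscription stream whose controller buffers
/// events while its listener is paused, instead of pausing the source.
Stream<T> convertBroadcastToSingleSubscriber<T>(Stream<T> stream) {
  final StreamController<T> controller = StreamController<T>();
  StreamSubscription<T>? subscription;
  // Subscribe to the source only once someone listens to the result.
  controller.onListen = () => subscription = stream.listen(controller.add,
      onError: controller.addError, onDone: controller.close);
  controller.onCancel = () => subscription?.cancel();
  return controller.stream;
}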

However it feels a bit hacky. If it is decided to be by design though, it should probably be added to the docs where the current note is, since it's not obvious that the existing text applies to WebSockets (I also wonder if it's consistent between the browser and IO implementations - I haven't checked this).
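To show the effect of the asyncMap problem and the workaround end to end without a browser, here is a hedged sketch. droppingWhilePaused is a hypothetical model of the DOM-style stream (it is not how dart:html is implemented), buffered is a condensed copy of the workaround, and the timings are arbitrary.

```dart
import 'dart:async';

/// Hypothetical model of a DOM-style stream: it unsubscribes from [source]
/// while paused, so events fired in the meantime are lost.
Stream<T> droppingWhilePaused<T>(Stream<T> source) {
  late final StreamController<T> controller;
  StreamSubscription<T>? inner;
  void attach() => inner = source.listen(controller.add);
  void detach() {
    inner?.cancel();
    inner = null;
  }

  controller = StreamController<T>(
      onListen: attach, onPause: detach, onResume: attach, onCancel: detach);
  return controller.stream;
}

/// Condensed version of the workaround: forward events into a controller,
/// which queues them while its own listener is paused.
Stream<T> buffered<T>(Stream<T> stream) {
  final controller = StreamController<T>();
  StreamSubscription<T>? sub;
  controller.onListen = () => sub = stream.listen(controller.add);
  controller.onCancel = () => sub?.cancel();
  return controller.stream;
}

/// Sends 0..5 at 10 ms intervals through [wrap] and handles each message
/// with a 25 ms asynchronous handler via asyncMap.
Future<List<int>> handle(Stream<int> Function(Stream<int>) wrap) async {
  final messages = StreamController<int>.broadcast();
  final handled = <int>[];
  final sub = wrap(messages.stream).asyncMap((m) async {
    await Future<void>.delayed(const Duration(milliseconds: 25));
    return m;
  }).listen(handled.add);
  for (var i = 0; i < 6; i++) {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    messages.add(i);
  }
  // Give the slow handler time to drain anything that was buffered.
  await Future<void>.delayed(const Duration(milliseconds: 300));
  await sub.cancel();
  await messages.close();
  return handled;
}

Future<void> main() async {
  // Some messages are missing: they arrived while asyncMap had paused.
  print(await handle(droppingWhilePaused));
  // All six messages arrive, in order.
  print(await handle((s) => buffered(droppingWhilePaused(s))));
}
```

The wrapper works because its inner subscription to the source is never paused. Only the wrapper's own controller is paused, and a plain controller queues events for a paused listener.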

lrhn commented 5 years ago

It is a little hacky, but probably necessary since doing controller.addStream(stream) would propagate the pauses.
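A small sketch of that difference, using only dart:async. The helper names (sourcePauses, viaAddStream, viaListen) are made up for illustration. It counts how often the source sees a pause when the forwarded stream's listener pauses once.

```dart
import 'dart:async';

/// Returns how often the source subscription is paused when the stream
/// built by [forward] has its own listener paused and resumed once.
Future<int> sourcePauses(
    Stream<int> Function(Stream<int> source) forward) async {
  var pauses = 0;
  final source = StreamController<int>(onPause: () => pauses++);
  final sub = forward(source.stream).listen((_) {});
  sub.pause();
  sub.resume();
  await sub.cancel();
  return pauses;
}

/// Forwarding with addStream: a pause on the result reaches the source.
Stream<int> viaAddStream(Stream<int> source) {
  final controller = StreamController<int>();
  controller.onListen = () => controller.addStream(source);
  return controller.stream;
}

/// Forwarding with a plain listener: the source never sees the pause.
Stream<int> viaListen(Stream<int> source) {
  final controller = StreamController<int>();
  StreamSubscription<int>? sub;
  controller.onListen = () => sub = source.listen(controller.add);
  controller.onCancel = () => sub?.cancel();
  return controller.stream;
}

Future<void> main() async {
  print('addStream: ${await sourcePauses(viaAddStream)} pause(s) at source');
  print('listen:    ${await sourcePauses(viaListen)} pause(s) at source');
}
```

The addStream variant should report a non-zero count and the listen variant zero. That is why the manual forwarding is needed when the source drops events on pause.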

I'll keep this issue open as a bug-report against the HTML library: Sockets should not drop events on pause.