Closed alsonkemp closed 3 years ago
Argh. Sure enough this likely turned out to be a dangling or misunderstood Future. async/await in Dart doesn't seem to work the same as in JS/TS:
I switched the code around to use .then()
and it seems to be working now. In the code below PubNub
is my wrapper for PubNub
(basically, just the stock client with some helpers). .subscribeAll()
subscribes to a few of our channels (but otherwise is pretty standard). Our .publish()
is fairly specialized because it takes a closure as an argument and synthesizes a request/response cycle (via requestId/responseId). The original PubNub library is imported as original
.
Working:
PubNub pnClient = PubNub(pubKey, subKey, secKey, null);
PubNub pnServer = PubNub(pubKey, subKey, secKey, null);
return await pnServer.subscribeAll().then((_) async {
return pnClient.subscribeAll();
}).then((_) async {
return Future<void>.delayed(Duration(seconds: 1));
}).then((_) async {
Ping ping = Ping('This is a ping...');
Completer completer = Completer<bool>();
original.PublishResult pr = await pnClient.publish('ping', ping, (BaseMessage m) async {
// The server's response comes in here...
print("--- Response received: ${m.toMap()}");
expect(m.envelope.payload['kind'], equals('pong'));
completer.complete(true);
return true;
});
return completer.future;
});
Not working (it looks as though the messages is being published by the client
before the server
is subscribed::
PubNub pnClient = PubNub(pubKey, subKey, secKey, null);
PubNub pnServer = PubNub(pubKey, subKey, secKey, null);
await pnClient.subscribeAll();
await pnServer.subscribeAll();
sleep(Duration(seconds: 1));
Ping ping = Ping('This is a ping...');
Completer completer = Completer<bool>();
original.PublishResult pr = await pnClient.publish('ping', ping, (BaseMessage m) async {
// The server's response SOMETIMES comes in here.
print("--- Response received: ${m.toMap()}");
expect(m.envelope.payload['kind'], equals('pong'));
completer.complete(true);
return true;
});
return completer.future;
Actually, this looks like a bug in the PubNub library. While I've got this working fairly consistently, the logs still look wrong.
In the code above, I go to pains to synchronize the two subscribe calls. The logs show:
2021-02-22 15:31:48.546482] (info) root.pubnub.networking.request_handler: (0) Preparing request.
[2021-02-22 15:31:48.551912] (info) root.pubnub.networking.request_handler: (0) Starting request to "https://ps.pndsn.com/v2/subscribe/sub-c-62246348-17ba-11eb-a0a1-be9072d3ef6c/ping,ptag.*/0?pnsdk=PubNub-Dart%2F3.1.0&tt=0&uuid=b454b417-968b-45f5-92cf-279a68ccc833"...
[2021-02-22 15:31:48.559245] (info) root.pubnub.networking.request_handler: (1) Preparing request.
[2021-02-22 15:31:48.559502] (info) root.pubnub.networking.request_handler: (1) Starting request to "https://ps.pndsn.com/v2/subscribe/sub-c-62246348-17ba-11eb-a0a1-be9072d3ef6c/ping,ptag.*/0?pnsdk=PubNub-Dart%2F3.1.0&tt=0&uuid=34cbda10-b003-4cf8-be69-fbde8fa5bc5b"...
[2021-02-22 15:31:48.761463] (info) root.pubnub.networking.request_handler: (0) Request succeed!
[2021-02-22 15:31:48.766870] (info) root.pubnub.networking.request_handler: (0) Resource released...
[2021-02-22 15:31:48.771694] (silly) root.pubnub.subscribe.subscribe_loop: Result: timetoken 16140361649500789, new messages: 0
[2021-02-22 15:31:48.773695] (silly) root.pubnub.subscribe.subscribe_loop: Updating the state...
[2021-02-22 15:31:48.773859] (silly) root.pubnub.subscribe.subscribe_loop: State has been updated.
[2021-02-22 15:31:48.776061] (silly) root.pubnub.subscribe.subscribe_loop: Starting new loop iteration.
[2021-02-22 15:31:48.776386] (info) root.pubnub.networking.request_handler: (2) Preparing request.
[2021-02-22 15:31:48.776571] (info) root.pubnub.networking.request_handler: (2) Starting request to "https://ps.pndsn.com/v2/subscribe/sub-c-62246348-17ba-11eb-a0a1-be9072d3ef6c/ping,ptag.*/0?pnsdk=PubNub-Dart%2F3.1.0&tt=16140361649500789&tr=1&uuid=b454b417-968b-45f5-92cf-279a68ccc833"...
[2021-02-22 15:31:48.778680] (info) root.pubnub.networking.request_handler: (1) Request succeed!
[2021-02-22 15:31:48.778901] (info) root.pubnub.networking.request_handler: (1) Resource released...
Noticed that the two subscriptions start at nearly the same time (net requests 0 & 1). They should be starting sequentially.
The code for .subscribeAll()
looks pretty blameless:
// Helper to subscribe to a range of useful channels.
// Entirely for the server.
Future<PN.Subscription> subscribeAll() async {
PN.Subscription subscription =
await pn.subscribe(channels: {'ping', 'ptag.*'}, withPresence: false);
print("SUB: ${subscription.isPaused} ${subscription.isCancelled}");
if (!listening) {
subscription.messages.listen(this.handler, onDone: () {
print("!!! ON DONE");
}, onError: (Object error) {
print("!!! LISTEN ERROR: ${error}");
});
listening = true;
}
return subscription;
}
I migrated the 'print' in subscribeAll()
over to a logger. Now I see:
[2021-02-22 15:55:22.813390] (silly) root.pubnub.subscribe.subscribe_loop: State has been updated.
[2021-02-22 15:55:22.817847] (verbose) root.pubnub.subscription.subscription: Resuming subscription.
[2021-02-22 15:55:22.818439] (silly) root.pubnub.subscribe.subscribe_loop: State has been updated.
[2021-02-22 15:55:22.819941] (silly) root.pubnub.subscribe.subscribe_loop: Starting new loop iteration.
[2021-02-22 15:55:22.822056] (info) root: SUB: false false ############### AFTER .SUBSCRIBE 1 #####
[2021-02-22 15:55:22.822692] (silly) root.pubnub.subscribe.subscribe_loop: State has been updated.
[2021-02-22 15:55:22.822769] (verbose) root.pubnub.subscription.subscription: Resuming subscription.
[2021-02-22 15:55:22.822861] (silly) root.pubnub.subscribe.subscribe_loop: State has been updated.
[2021-02-22 15:55:22.824716] (silly) root.pubnub.subscribe.subscribe_loop: Starting new loop iteration.
[2021-02-22 15:55:22.824912] (info) root: SUB: false false ##### AFTER SUBSCRIBE 2 #####
[2021-02-22 15:55:22.828987] (info) root.pubnub.networking.request_handler: (0) Preparing request. ### WAIT? SUB 1 IS **NOW** FIRING
[2021-02-22 15:55:22.833934] (info) root.pubnub.networking.request_handler: (0) Starting request to "https://ps.pndsn.com/v2/subscribe/sub-c-62246348-17ba-11eb-a0a1-be9072d3ef6c/ping,ptag.*/0?pnsdk=PubNub-Dart%2F3.1.0&tt=0&uuid=2f79b512-bfc6-48e9-9fb0-d7d87798cca3"...
[2021-02-22 15:55:22.841208] (info) root.pubnub.networking.request_handler: (1) Preparing request.
[2021-02-22 15:55:22.841442] (info) root.pubnub.networking.request_handler: (1) Starting request to "https://ps.pndsn.com/v2/subscribe/sub-c-62246348-17ba-11eb-a0a1-be9072d3ef6c/ping,ptag.*/0?pnsdk=PubNub-Dart%2F3.1.0&tt=0&uuid=0ffc04b0-53c5-475a-acb9-cfa9fdf74b32"...
Snippet of code.
Future<PN.Subscription> subscribeAll() async {
PN.Subscription subscription =
await pn.subscribe(channels: {'ping', 'ptag.*'}, withPresence: false);
logger.info("SUB: ${subscription.isPaused} ${subscription.isCancelled}"); // RUNNING BEFORE .subscribe()?
Hi there! Wow, I see that you put in a lot of work into this! I greatly appreciate it!
Secondly, .subscribe
call is synchronous! Underneath there is an internal subscribe loop that handles all subscriptions and .subscribe
only adds new information to the internal state. That is why only after you call pn.subscribe
and log the message, the subscribe loop starts the actual request. This happens because the actual subscribe loop state gets calculated asynchronously and a request is made after that subscribe loop state is resolved.
If you want to make sure your subscriptions are going before sending any publishes, add a little delay to publishes (1 second should be enough). This is mainly for the PubNub network itself so it can recognize the messages were sent after the subscribes and therefore those messages can be passed back into the results of the subscribe.
Presence should not impact the subscribes because its actually just a subscribe to an additional channel.
This is my small example trying to reproduce what you have been doing:
import 'package:pubnub/logging.dart';
import 'package:pubnub/pubnub.dart';
Keyset client =
Keyset(subscribeKey: 'demo', publishKey: 'demo', uuid: UUID('client'));
Keyset server =
Keyset(subscribeKey: 'demo', publishKey: 'demo', uuid: UUID('server'));
void main() async {
var logger = StreamLogger.root('root', logLevel: Level.warning);
// Subscribe to messages with a default printer
logger.stream.listen(LogRecord.defaultPrinter);
// Create PubNub instance.
var pubnub = PubNub();
await provideLogger(logger, () async {
var subServer = pubnub.subscribe(channels: {'test'}, keyset: server);
var subClient = pubnub.subscribe(channels: {'test'}, keyset: client);
subServer.messages.listen((message) async {
print('Server hears ${message.uuid} say: ${message.payload}');
if (message.uuid == client.uuid) {
print('Server got pong and is done. Bye!');
await subServer.cancel();
}
});
subClient.messages.listen((message) async {
print('Client hears ${message.uuid} say: ${message.payload}');
if (message.uuid == server.uuid) {
print('Client says: pong');
await pubnub.publish('test', 'pong', keyset: client);
print('Client got ping and is done. Bye!');
await subClient.cancel();
}
});
await Future.delayed(Duration(seconds: 1));
print('Server says: ping');
await pubnub.publish('test', 'ping', keyset: server);
});
}
And here is my output:
[2021-02-23 12:38:56.474493] (verbose) root.pubnub.subscription.subscription Resuming subscription.
[2021-02-23 12:38:56.485092] (verbose) root.pubnub.subscription.subscription Resuming subscription.
[2021-02-23 12:38:56.508210] (info) root.pubnub.networking.request_handler (0) Preparing request.
[2021-02-23 12:38:56.518564] (info) root.pubnub.networking.request_handler (0) Starting request to "https://ps.pndsn.com/v2/subscribe/demo/test/0?pnsdk=PubNub-Dart%2F3.1.0&tt=0&uuid=server"...
[2021-02-23 12:38:56.529135] (info) root.pubnub.networking.request_handler (1) Preparing request.
[2021-02-23 12:38:56.529673] (info) root.pubnub.networking.request_handler (1) Starting request to "https://ps.pndsn.com/v2/subscribe/demo/test/0?pnsdk=PubNub-Dart%2F3.1.0&tt=0&uuid=client"...
[2021-02-23 12:38:56.786633] (info) root.pubnub.networking.request_handler (1) Request succeed!
[2021-02-23 12:38:56.796279] (info) root.pubnub.networking.request_handler (1) Resource released...
[2021-02-23 12:38:56.811840] (info) root.pubnub.networking.request_handler (2) Preparing request.
[2021-02-23 12:38:56.812183] (info) root.pubnub.networking.request_handler (2) Starting request to "https://ps.pndsn.com/v2/subscribe/demo/test/0?pnsdk=PubNub-Dart%2F3.1.0&tt=16140802612445317&tr=12&uuid=client"...
[2021-02-23 12:38:56.814270] (info) root.pubnub.networking.request_handler (0) Request succeed!
[2021-02-23 12:38:56.815549] (info) root.pubnub.networking.request_handler (0) Resource released...
[2021-02-23 12:38:56.816475] (info) root.pubnub.networking.request_handler (3) Preparing request.
[2021-02-23 12:38:56.816825] (info) root.pubnub.networking.request_handler (3) Starting request to "https://ps.pndsn.com/v2/subscribe/demo/test/0?pnsdk=PubNub-Dart%2F3.1.0&tt=16140802612445317&tr=12&uuid=server"...
Server says: ping
[2021-02-23 12:38:57.494924] (verbose) root.pubnub.dx.publish Publishing a message to a channel test
[2021-02-23 12:38:57.499727] (info) root.pubnub.networking.request_handler (4) Preparing request.
[2021-02-23 12:38:57.499990] (info) root.pubnub.networking.request_handler (4) Starting request to "https://ps.pndsn.com/publish/demo/demo/0/test/0/%22ping%22?pnsdk=PubNub-Dart%2F3.1.0&uuid=server"...
[2021-02-23 12:38:57.649769] (info) root.pubnub.networking.request_handler (4) Request succeed!
[2021-02-23 12:38:57.650367] (info) root.pubnub.networking.request_handler (4) Resource released...
[2021-02-23 12:38:57.652778] (info) root.pubnub.networking.request_handler (2) Request succeed!
[20021-02-23 12:38:57.652999] (info) root.pubnub.networking.request_handler (2) Resource released...
Client hears server say: ping
Client says: pong
[2021-02-23 12:38:57.657272] (verbose) root.pubnub.dx.publish Publishing a message to a channel test
[2021-02-23 12:38:57.657956] (info) root.pubnub.networking.request_handler (5) Preparing request.
[2021-02-23 12:38:57.658294] (info) root.pubnub.networking.request_handler (5) Starting request to "https://ps.pndsn.com/publish/demo/demo/0/test/0/%22pong%22?pnsdk=PubNub-Dart%2F3.1.0&uuid=client"...
[2021-02-23 12:38:57.658972] (info) root.pubnub.networking.request_handler (6) Preparing request.
[2021-02-23 12:38:57.659666] (info) root.pubnub.networking.request_handler (6) Starting request to "https://ps.pndsn.com/v2/subscribe/demo/test/0?pnsdk=PubNub-Dart%2F3.1.0&tt=16140803376315782&tr=12&uuid=client"...
[2021-02-23 12:38:57.661060] (info) root.pubnub.networking.request_handler (3) Request succeed!
[2021-02-23 12:38:57.661797] (info) root.pubnub.networking.request_handler (3) Resource released...
Server hears server say: ping
[2021-02-23 12:38:57.663367] (info) root.pubnub.networking.request_handler (7) Preparing request.
[2021-02-23 12:38:57.663587] (info) root.pubnub.networking.request_handler (7) Starting request to "https://ps.pndsn.com/v2/subscribe/demo/test/0?pnsdk=PubNub-Dart%2F3.1.0&tt=16140803376315782&tr=12&uuid=server"...
[2021-02-23 12:38:57.811495] (info) root.pubnub.networking.request_handler (5) Request succeed!
[2021-02-23 12:38:57.811925] (info) root.pubnub.networking.request_handler (5) Resource released...
Client got ping and is done. Bye!
[2021-02-23 12:38:57.813194] (verbose) root.pubnub.subscription.subscription Subscription cancelled.
[2021-02-23 12:38:57.815745] (info) root.pubnub.networking.request_handler (6) Request has been cancelled (reason: Null).
[2021-02-23 12:38:57.817201] (info) root.pubnub.networking.request_handler (6) Resource released...
[2021-02-23 12:38:57.818611] (info) root.pubnub.networking.request_handler (7) Request succeed!
[2021-02-23 12:38:57.819163] (info) root.pubnub.networking.request_handler (7) Resource released...
Server hears client say: pong
Server got pong and is done. Bye!
[2021-02-23 12:38:57.819979] (verbose) root.pubnub.subscription.subscription Subscription cancelled.
Exited
Ah. That makes sense.
FWIW, it's a little confusing having sync-but-sorta-not-sync (subscribe) code mixed in with async code. My assumption is that a sync function returns when its job is done. This is (admittedly) a very small use case (testing subscription with immediate send/receive) but it's a gnarly corner case. I would much prefer that subscribe return when it's actually done but that could break existing users... I'll stick with the delay for the moment.
In any case, I should be good to go. Feel free to close this ticket or leave open as a TODO/reminder/gripe.
Just to give you a little more background on this: subscriptions are actually asynchronous, but the asynchronicity is enclosed in a stream, so they return to the call site immediately. Also multiple subscriptions can be handled by the same request, so it is not guaranteed that a new subscription will create a request to the API immediately. So this is why subscriptions are special - they are the only part that is not in one-to-one correspondence with requests.
I've written a simple client/server implementation to do ping/pong with synthesized request/response and occasionally see missing messages. I know this is not a lot to go on but I'm a bit new to Dart and its async implementation and didn't know if there were any quick gotchas that I was missing. (The code is proprietary so it'll take me a good amount of time to put together a minimal, replicating sample...)
This is a good test session:
Once every 5-10 test runs, I get:
In this case, the test sent out a "Ping" message but only got presence type messages (which aren't decoded in the system (hence the "MSG was null")) back. Is there some way that the presence messages could be disrupting the PubNub library? I've run through the code to see if I have any missing/messed-up async. Any suggestions?
Ah, got debug logging going. This is the failing log:
This is the good log: