leonardocustodio / polkadart

Polkadart provides developers the ability to query a node and interact with the Polkadot based chains using Dart.
https://polkadart.dev
Apache License 2.0
39 stars 16 forks source link

How to work with `provider.subscribe` #338

Closed clangenb closed 1 year ago

clangenb commented 1 year ago

I could not find any tests for it, and when I tried to implement a for author_submitAndWatchExtrinsic I noticed that I don't get any events in my subscriptions. I also tried to get events for chain_subscribeFinalizedHeads.

May I ask for help please? This is the commit I am trying to fix: https://github.com/encointer/encointer-wallet-flutter/commit/1f15d2d64c152b6cda96b0c0236b2fda462aec54

clangenb commented 1 year ago

I also added some debug logs into our polkadart fork, so I can see that I get the subscription updates, and that I am indeed fetching the previous StreamController and so on:

received stream msg: {jsonrpc: 2.0, result: Ib9NTZfXSKFVSQO3, id: 0}
Message is valid
Received stream msg: {jsonrpc: 2.0, method: chain_finalizedHead, params: {subscription: Ib9NTZfXSKFVSQO3, result: {parentHash: 0xc7070a09bb916cf27c036a9350e95387aaa2000c200abfc619b7f62b1961474f, number: 0xd, stateRoot: 0x79891160c85d0c71f254559794e5d17ed4504939d6a2668c1fceeb705ded8ca4, extrinsicsRoot: 0xa1258313af8f2c33feb51020fa1e61085d87c1b3433f3b346967d3c9a5da3983, digest: {logs: [0x06617572612087f8dc1000000000, 0x0561757261010134f5fda2f3efb850b3abfe2540a44baae50a18da090abdcf8d250bab80d1b478d6597f06665e2b22344e62c7789bcb93fdf06a9f028a11841e923e09ef0e0d86]}}}}
Message is valid
getting controller: chain_finalizedHead, Ib9NTZfXSKFVSQO3
no existing method subscription found
getting new stream controller
no existing method subscription found
getting new stream controller
Received stream msg: {jsonrpc: 2.0, method: chain_finalizedHead, params: {subscription: Ib9NTZfXSKFVSQO3, result: {parentHash: 0x3946f04a745f92fd3d4f6c87c257a8824a1c22ad28cec2e4076aa38717862dac, number: 0xe, stateRoot: 0x77d3e58d1b3e4052017c0a1827c0c041db69373ac63772a89d5551d46caa0578, extrinsicsRoot: 0x347aa463a27c52f925da5749f04e00b35fc2a00164ea66a84080b9d5658a110e, digest: {logs: [0x06617572612088f8dc1000000000, 0x056175726101012a398fb500ac00e341f1fcb6b0c9d380bf9d24b27601fbf10e6bcdc8ef09c04ec132564e946a8728752dbeb374ae98747d8ff0a8e0cdd9326d7f488c65bde582]}}}}
Message is valid
getting controller: chain_finalizedHead, Ib9NTZfXSKFVSQO3
using previous method subscription
getting existing stream controller

So, I can confirm that controller.add(message) is called, but I don't receive the message on the other end.

clangenb commented 1 year ago

Ok, I found out that it is because the controller thinks that it doesn't have any listeners. I don't know why this is the case yet. I think the code looks ok:

// method under test
Future<StreamSubscription<String>> subscribeFinalizedHeads(void Function(String) onData) async {
    final subscription = await _provider.subscribe(
      'chain_subscribeFinalizedHeads',
      [],
      onCancel: (subscription) async {
        await _provider.send('chain_unsubscribeFinalizedHeads', [subscription]);
      },
    );

    return subscription.stream.map((event) => event.toString()).listen(onData);
}

// test
test('subscribing to finalized heads works', () async {
    final polkadart = Provider.fromUri(Uri.parse('ws://localhost:9944'));
    // Note: bad name, chain api is only used for prototyping
    final author = EWAuthorApi(polkadart);

    final completer = Completer<void>();
    final sub = await author.subscribeFinalizedHeads((event) {
        print('Event: $event');
        completer.complete();
   });

   await completer.future.then((_) => sub.cancel());

   // Fixme: we never get here
   print('End');
});
clangenb commented 1 year ago

Ok, I put some more logs, which uncovered the issue: The method in the subscription does not match the method that is contained in the stream, substrate WTF.

Subscribing to method: chain_subscribeFinalizedHeads
Received stream msg: {jsonrpc: 2.0, result: riaUnOsrR9UWxwIk, id: 0}
Invalid stream msg received
Received stream msg: {jsonrpc: 2.0, method: chain_finalizedHead, params: {subscription: riaUnOsrR9UWxwIk, result: {parentHash: 0xe68f792057febe3631a1db03e0fd1b679731107ee6d96559b94c0eb1532f322a, number: 0x8b, stateRoot: 0x0a362da577d37391547e98168f884d8021a490cbd144707013edc927dd5c3a89, extrinsicsRoot: 0x6ed2b361e27b56abe9c948eeea10d4f19a0fe05bcdc08509bc267a7e3a26a91c, digest: {logs: [0x066175726120a233dd1000000000, 0x05617572610101decf9a99d24e407ea169709169fab26540fbf52985b915b52e9b2619053cc2218e2f86f92adbacf74ec0ad61fc83c1c90b8ebb1cc08536dfcc63f037a40a6f8c]}}}}
Message is valid
getting controller: chain_finalizedHead, riaUnOsrR9UWxwIk
Getting controller for method: chain_finalizedHead, and subscription: riaUnOsrR9UWxwIk
no existing method subscription found
getting new stream controller
controller doesn't have listeners, will not fire event
Getting controller for method: chain_subscribeFinalizedHeads, and subscription: riaUnOsrR9UWxwIk
no existing method subscription found
getting new stream controller
awaiting completion
Received stream msg: {jsonrpc: 2.0, method: chain_finalizedHead, params: {subscription: riaUnOsrR9UWxwIk, result: {parentHash: 0x82d00fc7759d3770cfe098a4f8410d8c574cb86b2f280ee3d58ec2ce49ce53fd, number: 0x8c, stateRoot: 0x0092f850d35a5f1312582f2f3e9eae04b68186ba49276f580f6bac37abdff79c, extrinsicsRoot: 0xf380a83292681906737a0c022204fb851628f6d498ca724f5f1a80a60d5c46d3, digest: {logs: [0x066175726120a333dd1000000000, 0x05617572610101b6acbdc9ac7efd16d90a64a26a5aef7754e17a20b141dde4aa6cc8b8ecb06325913ec0e922f8aaa86cbbc255d40358268a9a3bd2f287777942761e069a35f487]}}}}

This is why two stream controllers are created, one by the method caller and one by the stream and the method caller subscribes to a controller that never receives a message...

clangenb commented 1 year ago

So the current implementation of subscriptions is probably broken for all RPCs with an alias when the old version is used: https://github.com/paritytech/polkadot-sdk/blob/024fce70f4c6c2f87bbbb870e96a269258d9f3a4/substrate/client/rpc-api/src/chain/mod.rs#L69

clangenb commented 1 year ago

Oh, I was wrong. It has noting to do with the aliases. It is intentional that the method to subscribe is not the same as the method of the subscription message, I can't understand the rationale, though. However, I guess we can conclude that the StreamController map in the provider should not be keyed by the subscription method, but rather the subscription ID.

#[subscription(
      name = "chain_subscribeFinalizedHeads" => "chain_finalizedHead",
      aliases = ["chain_subscribeFinalisedHeads"],
      unsubscribe = "chain_unsubscribeFinalizedHeads",
      unsubscribe_aliases = ["chain_unsubscribeFinalisedHeads"],
      item = Header
)]
clangenb commented 1 year ago

So the same is true for autho_submitAndWatchExtrinsic

Subscribing to method: author_submitAndWatchExtrinsic
Received stream msg: {jsonrpc: 2.0, result: bkrTyonMHqS8pUJS, id: 0}
Invalid stream msg received
Received stream msg: {jsonrpc: 2.0, method: author_extrinsicUpdate, params: {subscription: bkrTyonMHqS8pUJS, result: ready}}
Message is valid
getting controller: author_extrinsicUpdate, bkrTyonMHqS8pUJS
Getting controller for method: author_extrinsicUpdate, and subscription: bkrTyonMHqS8pUJS
no existing method subscription found
getting new stream controller
controller doesn't have listeners, will not fire event
Getting controller for method: author_submitAndWatchExtrinsic, and subscription: bkrTyonMHqS8pUJS
no existing method subscription found
getting new stream controller
Received stream msg: {jsonrpc: 2.0, method: author_extrinsicUpdate, params: {subscription: bkrTyonMHqS8pUJS, result: {inBlock: 0x295879228e824f32666da2a8b21999a46f38cd1fc9944d7965455bd9fd607517}}}
Message is valid
getting controller: author_extrinsicUpdate, bkrTyonMHqS8pUJS
Getting controller for method: author_extrinsicUpdate, and subscription: bkrTyonMHqS8pUJS
using previous method subscription
getting existing stream controller
controller doesn't have listeners, will not fire event