gql-dart / gql

Libraries supporting GraphQL in Dart
MIT License

fix(graphql-transport-ws): ensure result message is processed before … #466

Open agufagit opened 2 months ago

agufagit commented 2 months ago

fix(graphql-transport-ws): ensure result message (next or error) is processed before complete message; do not close connection if keepalive is on

knaeckeKami commented 2 months ago

Hi!

Could you explain (on a higher level) what exactly this does, why it's needed/an improvement and document the change in behaviour with a test?

I don't fully understand it yet.

agufagit commented 2 months ago

I'm using the WebSocket connection not only for subscriptions but also for single-result operations. This PR fixes two bugs that occur with single-result operations.

The first bug: this part of the code in graphql_transport_ws.dart causes a race condition. If a "complete" message arrives shortly after a "next" message, the "complete" message may be processed before the "next" message, because of the await in final messageMap = await options.graphQLSocketMessageDecoder(msg);. Processing "complete" closes the sink, but the "next" handler still adds data to the already closed sink. The operation then hangs, and no error is thrown.

try {
  final messageMap = await options.graphQLSocketMessageDecoder(msg);
  message = parseMessage(messageMap!, options.parser);
} catch (e, s) {
  throw WebSocketLinkParserException(
    // TODO: should not be ConnectionAck
    message: ConnectionAck(),
    originalException: e,
    originalStackTrace: s,
  );
}
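
To illustrate the race outside the library, here is a minimal sketch. It assumes a hypothetical asynchronous decoder, slowDecoder, with artificial delays (a "next" payload takes longer to decode than "complete"); none of these names come from gql_websocket_link. An async stream listener does not wait for one event's handler to finish before the next event is delivered, so the two decodes overlap.

```dart
import 'dart:async';

/// Hypothetical stand-in for an asynchronous message decoder.
/// The delays are artificial: "next" takes longer to decode than "complete".
Future<Map<String, dynamic>> slowDecoder(Map<String, dynamic> msg) async {
  final delayMs = msg['type'] == 'next' ? 20 : 1;
  await Future<void>.delayed(Duration(milliseconds: delayMs));
  return msg;
}

/// Feeds "next" and then "complete" through an async listener and records
/// the order in which the handler bodies finish decoding.
Future<List<String>> processingOrder() async {
  final processed = <String>[];
  final incoming = StreamController<Map<String, dynamic>>();

  // Each event starts its own handler; the handlers run concurrently.
  incoming.stream.listen((msg) async {
    final decoded = await slowDecoder(msg);
    processed.add(decoded['type'] as String);
  });

  incoming.add({'type': 'next', 'id': '1'});
  incoming.add({'type': 'complete', 'id': '1'});
  await incoming.close();

  // Give both handlers time to finish.
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return processed;
}

Future<void> main() async {
  print(await processingOrder());
}
```

With these artificial delays, "complete" is recorded before "next", even though "next" arrived first.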

The second bug: this part of the code closes the connection whenever no operations are in progress. That is not desired, because I want to run a number of single-result operations before I start a subscription.

released.then((_) {
  if (locks == 0) {
    // and if no more locks are present, complete the connection
    ...
  }
});

knaeckeKami commented 2 months ago

Thanks! I can see how awaiting the decoded message is problematic.

try {
  final messageMap = await options.graphQLSocketMessageDecoder(msg);
  message = parseMessage(messageMap!, options.parser);
} catch (e, s) {
  throw WebSocketLinkParserException(
    // TODO: should not be ConnectionAck
    message: ConnectionAck(),
    originalException: e,
    originalStackTrace: s,
  );
}

I observe the following:

So, the await is probably unnecessary for the vast majority of users

final messageMapFutureOr = options.graphQLSocketMessageDecoder(msg);
final Map<String, dynamic>? messageMap;
if (messageMapFutureOr is Future) {
  messageMap = await messageMapFutureOr;
} else {
  messageMap = messageMapFutureOr;
}

This would avoid this problem by processing the message synchronously, if possible.

But it would be a breaking change, so let's investigate how to handle the path with a future:

int waitTimeOut = 0;
while (message is CompleteMessage &&
    nextOrErrorMsgWaitMap.containsKey(message.id)) {
  await Future.delayed(const Duration(milliseconds: 100));
  waitTimeOut++;
  // if the next or error message has not arrived or been processed
  // within 60 seconds, break the loop
  if (waitTimeOut >= 600) {
    break;
  }
}

This avoids the endless loop, but I would prefer a solution without polling.

How about making nextOrErrorMsgWaitMap a Map<String, Completer<void>>? Then we could just await the Completer instead of continuously polling for it.

And before removing the id, we'd call complete() on the existing completer.
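
A rough sketch of that idea, under stated assumptions: PendingResults, expect, resolve and settled are hypothetical names for illustration, not part of gql_websocket_link, and the 60-second timeout simply mirrors the limit of the polling loop above.

```dart
import 'dart:async';

/// Hypothetical tracker: one Completer per operation id that is still
/// waiting for its "next" or "error" message to be processed.
class PendingResults {
  final Map<String, Completer<void>> _waiting = {};

  /// Registers that a result for [id] is still being processed.
  void expect(String id) => _waiting[id] = Completer<void>();

  /// Called once the result for [id] has been handled: removes the entry
  /// and completes its completer so that any waiter can continue.
  void resolve(String id) => _waiting.remove(id)?.complete();

  /// Awaited by the "complete" handler. Returns immediately when nothing is
  /// pending for [id]; otherwise waits for [resolve] or for the timeout.
  Future<void> settled(
    String id, {
    Duration timeout = const Duration(seconds: 60),
  }) {
    final completer = _waiting[id];
    if (completer == null) return Future<void>.value();
    return completer.future.timeout(timeout, onTimeout: () {});
  }
}

/// Simulates a "complete" message whose handler runs before the "next"
/// message has finished processing.
Future<List<String>> demo() async {
  final pending = PendingResults();
  final log = <String>[];
  pending.expect('1');

  // "complete" handler: waits for the result instead of polling.
  final completeHandled =
      pending.settled('1').then((_) => log.add('complete'));

  // "next" handler finishes later (simulated decoding delay).
  await Future<void>.delayed(const Duration(milliseconds: 20));
  log.add('next');
  pending.resolve('1');

  await completeHandled;
  return log;
}

Future<void> main() async {
  print(await demo());
}
```

In this sketch the "complete" handler resumes as soon as resolve is called, so "next" is always logged before "complete" and no timer-based polling is needed.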

knaeckeKami commented 2 months ago

this part of code closes connection whenever there are no more operations in process, which is not desired, because I want to do a bunch of single-result operations before I do subscription

Sounds reasonable. Could you split this part into a separate PR and add a test that ensures this behavior doesn't get broken in a future update?

There are already similar tests in test/gql_websocket_link_test.dart. Based on your description, it could just execute two queries sequentially and assert that the same connection is used (if I understood the issue correctly).

agufagit commented 2 months ago

I'll add tests in the next couple of weeks; I'm kind of tied up at the moment.

I can't make a separate PR for the close-connection fix, because my app needs both changes in the same repo to work.

agufagit commented 1 month ago

The new commit, fix(graphql_transport_ws): completer error Future already completed, fixes this error:

Bad state: Future already completed

For single-result operations, an error message can already complete the future. completer.complete() was then called again on the completed future, so the code now checks whether the completer is already completed before calling it.
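
For illustration, a minimal sketch of that kind of guard. completeOnce is a hypothetical helper, not the code from the commit; it relies only on Completer.isCompleted from dart:async.

```dart
import 'dart:async';

/// Completes [completer] only if nothing has completed it yet.
/// Returns true when this call performed the completion.
bool completeOnce(Completer<void> completer) {
  if (completer.isCompleted) return false;
  completer.complete();
  return true;
}

/// Reproduces the failure and then shows the guarded call.
List<String> demo() {
  final log = <String>[];
  final completer = Completer<void>();
  // Nobody awaits the future here, so mark its error as intentionally ignored.
  completer.future.ignore();

  // An error message completes the future first.
  completer.completeError(StateError('operation failed'));

  // An unguarded second completion throws a StateError.
  try {
    completer.complete();
  } on StateError catch (e) {
    log.add(e.message);
  }

  // The guarded call is a no-op on an already completed completer.
  log.add('guarded: ${completeOnce(completer)}');
  return log;
}

void main() {
  demo().forEach(print);
}
```

The unguarded call throws the StateError quoted above, while the guarded call returns false and leaves the already completed completer untouched.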

james-pellow commented 3 weeks ago

We are needing similar things. I'm wondering how this PR is coming? Thanks for all your work on it!

agufagit commented 2 weeks ago

I'm very busy at the moment and need at least one day to write the tests. My estimate is November. In the meantime, you can point to my repo:

gql_websocket_link:
  git:
    url: https://github.com/agufagit/gql
    ref: master
    path: links/gql_websocket_link

If you have time to write the tests, feel free to open a PR against my repo.