gql-dart / gql

Libraries supporting GraphQL in Dart
MIT License
268 stars 124 forks source link

How to use WebSocketMaker with cupertino_http #467

Closed orestesgaolin closed 3 months ago

orestesgaolin commented 3 months ago

I'd like to use cupertino_http as the underlying WebSocket implementation.

It supports websocket connections as described here. In my graphql client I'm using TransportWebSocketLink. How can I use CupertinoWebSocket instead of gql's WebSocketChannel?

import 'package:gql_websocket_link/gql_websocket_link.dart' as gql_ws;

    // Question snippet: builds a TransportWebSocketLink whose socket is meant
    // to come from CupertinoWebSocket. NOTE: incomplete as posted — the
    // generator callback below connects a socket but never returns a channel.
    final wsLink = gql_ws.TransportWebSocketLink(
      gql_ws.TransportWsClientOptions(
        socketMaker: gql_ws.WebSocketMaker.generator(() async {
          final socket = await CupertinoWebSocket.connect(
            Uri.parse(baseWs),
          );
         // not sure how to proceed

        }),
        // Logs the attempt number, then returns a future that completes after
        // a fixed two-second delay.
        retryWait: (retries) {
          _log.i('Retrying websocket connection in 2 sec, attempt $retries');
          return Future.delayed(const Duration(seconds: 2));
        },
        // Awaits `_getToken()` and returns it inside a `headers` map.
        connectionParams: () async {
          var token = await _getToken();

          return {
            'headers': {
              'Authorization': token,
              'Hasura-Client-Name': 'flutter',
              'Content-Type': 'application/json',
            },
          };
        },
      ),
    );
knaeckeKami commented 3 months ago

Hi!

The TransportWebSocketLink relies on web_socket_channel so that a single WebSocket implementation works on both web and IO.

I think you should be able to use that package's adapter (AdapterWebSocketChannel) to plug in the cupertino implementation, like this:

 // Suggested approach: pass the (un-awaited) result of
 // CupertinoWebSocket.connect to AdapterWebSocketChannel and return that
 // channel from the generator callback.
 socketMaker: WebSocketMaker.generator((){
      final socket = CupertinoWebSocket.connect(
        Uri.parse("url"),
      );
      return AdapterWebSocketChannel(socket);
    })
orestesgaolin commented 3 months ago

Thanks, I was able to achieve it as follows:


import 'dart:async';
import 'dart:io';
import 'dart:typed_data';

import 'package:async/async.dart' as async;
import 'package:cupertino_http/cupertino_http.dart';
import 'package:gql_websocket_link/gql_websocket_link.dart' as gql_ws;
import 'package:graphql/client.dart';
import 'package:http/http.dart';
import 'package:stream_channel/stream_channel.dart' as sc;
import 'package:web_socket/web_socket.dart' as ws;
import 'package:web_socket_channel/web_socket_channel.dart' as wsc;

/// WebSocket link whose sockets are opened through `cupertino_http`.
///
/// The generator callback starts a [CupertinoWebSocket] connection to
/// `baseWs`, requesting the `graphql-transport-ws` subprotocol, and wraps the
/// pending socket in an [AdapterWebSocketChannel].
final wsLink = gql_ws.TransportWebSocketLink(
  gql_ws.TransportWsClientOptions(
    socketMaker: gql_ws.WebSocketMaker.generator(() async {
      final endpoint = Uri.parse(baseWs);
      // Not awaited on purpose: the adapter accepts the pending socket and
      // finishes its own setup once the connection resolves.
      final pendingSocket = CupertinoWebSocket.connect(
        endpoint,
        protocols: ['graphql-transport-ws'],
        config: URLSessionConfiguration.defaultSessionConfiguration(),
      );
      return AdapterWebSocketChannel(pendingSocket);
    }),
  ),
);

// adjusted from https://github.com/dart-lang/web_socket_channel/blob/master/lib/adapter_web_socket_channel.dart#L135
/// A [wsc.WebSocketChannel] that is backed by a [ws.WebSocket].
///
/// Messages written to [sink] are sent over the socket; text, binary and
/// close events received from the socket are surfaced through [stream],
/// [closeCode] and [closeReason].
class AdapterWebSocketChannel extends sc.StreamChannelMixin implements wsc.WebSocketChannel {
  /// Subprotocol reported by the socket; stays `null` until the socket has
  /// connected.
  String? _protocol;

  /// Close code received from the peer, if a close event has arrived.
  int? _closeCode;

  /// Close reason received from the peer, if a close event has arrived.
  String? _closeReason;

  /// Close code requested locally through [sink].
  ///
  /// Held here and only passed to the socket once the outgoing side of the
  /// channel reports that it is done, so that closing stays ordered after any
  /// messages still in flight.
  int? _localCloseCode;

  /// Close reason requested locally through [sink].
  ///
  /// Held and forwarded together with [_localCloseCode].
  String? _localCloseReason;

  /// Backs [ready]; completed once the socket connects or fails to connect.
  final _readyCompleter = Completer<void>();

  /// Pairs the user-facing (`foreign`) and socket-facing (`local`) ends of
  /// this channel.
  final _controller = sc.StreamChannelController<Object?>(sync: true, allowForeignErrors: false);

  @override
  late final wsc.WebSocketSink sink = _WebSocketSink(this);

  @override
  String? get protocol => _protocol;

  @override
  int? get closeCode => _closeCode;

  @override
  String? get closeReason => _closeReason;

  @override
  Future<void> get ready => _readyCompleter.future;

  @override
  Stream get stream => _controller.foreign.stream;

  /// Opens a new WebSocket connection to [url] and wraps it in a channel.
  ///
  /// If provided, [protocols] lists the subprotocols that the peer is able to
  /// select. See
  /// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
  ///
  /// The returned channel may not be connected yet. The [ready] future
  /// completes once the connection is established, or completes with an error
  /// if the connection could not be created.
  factory AdapterWebSocketChannel.connect(Uri url, {Iterable<String>? protocols}) {
    final pendingSocket = ws.WebSocket.connect(url, protocols: protocols);
    return AdapterWebSocketChannel(pendingSocket);
  }

  /// Wraps an existing [ws.WebSocket], or one that is still being connected.
  AdapterWebSocketChannel(FutureOr<ws.WebSocket> webSocket) {
    // Normalise to a future so both cases share one code path.
    final Future<ws.WebSocket> pending =
        webSocket is ws.WebSocket ? Future.value(webSocket) : webSocket;
    pending.then(_bind, onError: _failConnection);
  }

  /// Wires [socket] to the channel and then marks the channel as ready.
  void _bind(ws.WebSocket socket) {
    final inbound = _controller.local.sink;

    // Socket -> user: forward payloads; a close event records the peer's
    // code/reason and ends the incoming stream.
    socket.events.listen((event) {
      switch (event) {
        case ws.TextDataReceived(:final text):
          inbound.add(text);
        case ws.BinaryDataReceived(:final data):
          inbound.add(data);
        case ws.CloseReceived(:final code, :final reason):
          _closeCode = code;
          _closeReason = reason;
          inbound.close();
      }
    });

    // User -> socket: send each message; close the socket when the user's
    // sink is done.
    _controller.local.stream.listen(
      (obj) => _send(socket, obj),
      onDone: () => _closeSocket(socket),
    );

    _protocol = socket.protocol;
    _readyCompleter.complete();
  }

  /// Sends one outgoing message over [socket].
  ///
  /// Strings go out as text and byte lists as binary; any other value throws
  /// an [UnsupportedError].
  void _send(ws.WebSocket socket, Object? obj) {
    try {
      if (obj is String) {
        socket.sendText(obj);
      } else if (obj is Uint8List) {
        socket.sendBytes(obj);
      } else if (obj is List<int>) {
        socket.sendBytes(Uint8List.fromList(obj));
      } else {
        throw UnsupportedError('Cannot send ${obj.runtimeType}');
      }
    } on ws.WebSocketConnectionClosed {
      // Nowhere to report this: `_controller.local.sink` is already closed.
    }
  }

  /// Closes [socket] with the locally requested close code and reason.
  Future<void> _closeSocket(ws.WebSocket socket) async {
    try {
      await socket.close(_localCloseCode, _localCloseReason);
    } on ws.WebSocketConnectionClosed {
      // Closing an already-closed `WebSocketChannel` is not an error.
    }
  }

  /// Reports a failed connection attempt through [ready] and [stream], then
  /// ends the incoming stream.
  void _failConnection(Object e) {
    final Exception error;
    if (e is TimeoutException) {
      // Passed through unchanged for backwards compatibility with
      // `IOWebSocketChannel`.
      error = e;
    } else {
      error = wsc.WebSocketChannelException.from(e);
    }
    _readyCompleter.completeError(error);
    _controller.local.sink
      ..addError(error)
      ..close();
  }
}

/// A [WebSocketSink] that tracks the close code and reason passed to [close].
/// The [wsc.WebSocketSink] handed out by an [AdapterWebSocketChannel].
///
/// Delegates to the channel's foreign sink and, on [close], first records the
/// requested close code and reason on the owning channel.
class _WebSocketSink extends async.DelegatingStreamSink implements wsc.WebSocketSink {
  /// The channel that owns this sink.
  final AdapterWebSocketChannel _channel;

  _WebSocketSink(this._channel) : super(_channel._controller.foreign.sink);

  /// Stores [closeCode] and [closeReason] on the channel, then closes the
  /// underlying sink.
  @override
  Future close([int? closeCode, String? closeReason]) {
    _channel
      .._localCloseCode = closeCode
      .._localCloseReason = closeReason;
    return super.close();
  }
}