rsocket / rsocket-js

JavaScript implementation of RSocket
https://github.com/rsocket/rsocket-js
Apache License 2.0
605 stars 97 forks source link

RSocketWebSocketClient is not a constructor #263

Open dev-morph opened 1 year ago

dev-morph commented 1 year ago

First of all, thanks for your nice work! However, when I followed your guide, I ran into trouble.

Expected Behavior

I followed the example code on the guide page (Link) and thought I could easily connect to an RSocketServer.

Actual Behavior

But I got an "RSocketWebSocketClient is not a constructor" error, even though I just copied and pasted the guide code. After searching, I found that I should downgrade both rsocket-core and rsocket-websocket-client to ^0.0.27, and with version 0.0.27 it works like a charm! Maybe the guide is only for version 0.0.27? I am a little bit confused.

Steps to Reproduce

Run npm install rsocket-core rsocket-websocket-client; this installs version 1.0.0-alpha.3. Then copy and paste the guide code.

viglucci commented 1 year ago

Hey @dev-morph, thanks for the heads up. We'll update the guide to help avoid this issue. With that being said, I would recommend using the latest alpha version if possible. We likely won't be supporting the 0.0.27 versions long term once the 1.0.0 versions land, and the 1.0.0 alpha versions are not receiving large breaking changes at the moment, so they should be relatively safe to begin learning with, especially if this is your first introduction to RSocket-js.

The best way to learn the 1.0.0 versions is currently going to be through the examples under packages/RSocket-examples.

saleweaver commented 1 month ago

@dev-morph if you're still looking to get it working, here's the WebSocket client converted to TypeScript:


import type {
  DuplexConnection,
  Frame,
  ISubject,
  ISubscriber,
  ISubscription,
} from 'rsocket-types';
import type { Encoders } from 'rsocket-core';

import { Flowable } from 'rsocket-flowable';
import {
  deserializeFrame,
  deserializeFrameWithLength,
  printFrame,
  serializeFrame,
  serializeFrameWithLength,
  toBuffer,
} from 'rsocket-core';
/**
 * State of the transport connection, modelled as a union discriminated on
 * `kind`. Only the `ERROR` variant carries extra data: the `Error` that
 * caused the failure.
 */
export type ConnectionStatus =
  | { kind: 'ERROR'; error: Error }
  | { kind: 'CLOSED' }
  | { kind: 'CONNECTED' }
  | { kind: 'CONNECTING' }
  | { kind: 'NOT_CONNECTED' };

/**
 * Shared status values for every connection state that carries no payload
 * (i.e. everything except `ERROR`, which needs a per-failure `Error`).
 */
export const CONNECTION_STATUS: Readonly<
  Record<
    'NOT_CONNECTED' | 'CONNECTING' | 'CONNECTED' | 'CLOSED',
    ConnectionStatus
  >
> = {
  NOT_CONNECTED: { kind: 'NOT_CONNECTED' },
  CONNECTING: { kind: 'CONNECTING' },
  CONNECTED: { kind: 'CONNECTED' },
  CLOSED: { kind: 'CLOSED' },
};

/**
 * Settings accepted by the `RSocketWebSocketClient` constructor.
 */
export type ClientOptions = {
  /**
   * Endpoint handed to `wsCreator`, or to `new WebSocket(...)` when no
   * factory is supplied.
   */
  url: string;
  /**
   * When truthy, frames are serialized and deserialized with the
   * length-prefixed variants of the rsocket-core codec functions.
   */
  lengthPrefixedFrames?: boolean;
  /**
   * Diagnostic flag.
   * NOTE(review): presumably enables frame logging — confirm where it is read.
   */
  debug?: boolean;
  /**
   * Optional socket factory used instead of the global `WebSocket`
   * constructor.
   */
  wsCreator?: (url: string) => WebSocket;
};

/**
 * A WebSocket transport client for use in browser environments.
 *
 * The client owns at most one WebSocket, created by `connect()` through
 * `options.wsCreator` when supplied, otherwise through the global `WebSocket`
 * constructor. Incoming messages are decoded into frames and delivered to
 * every subscriber of `receive()`; outgoing frames are serialized and written
 * to the socket. Every status transition is published to the subscribers of
 * `connectionStatus()`.
 */
export default class RSocketWebSocketClient implements DuplexConnection {
  // `any` is kept from the original declaration; the encoders are only
  // forwarded to the rsocket-core (de)serialization functions.
  private readonly _encoders?: Encoders<any>;
  private readonly _options: ClientOptions;
  // Subscribers that receive decoded inbound frames.
  private readonly _receivers = new Set<ISubscriber<Frame>>();
  // Subscriptions to outbound frame streams passed to `send()`.
  private readonly _senders = new Set<ISubscription>();
  private _socket?: WebSocket;
  private _status: ConnectionStatus;
  private readonly _statusSubscribers = new Set<ISubject<ConnectionStatus>>();

  /**
   * @param options Connection settings (URL, optional socket factory, framing).
   * @param encoders Optional encoders forwarded to frame (de)serialization.
   */
  constructor(options: ClientOptions, encoders?: Encoders<any>) {
    this._encoders = encoders;
    this._options = options;
    this._socket = undefined;
    this._status = CONNECTION_STATUS.NOT_CONNECTED;
  }

  /**
   * Closes the connection without an error. Does nothing if the client is
   * already in the `CLOSED` or `ERROR` state.
   */
  close(): void {
    this._close();
  }

  /**
   * Creates the WebSocket and wires up its event handlers.
   *
   * If the socket cannot be created (the factory or constructor throws, or
   * the factory returns nothing) the client moves to the `ERROR` state, so
   * status subscribers are notified, and the original exception is rethrown.
   *
   * @throws Error when the status is anything other than `NOT_CONNECTED`.
   * @throws Whatever the socket factory / `WebSocket` constructor throws.
   */
  connect(): void {
    if (this._status.kind !== 'NOT_CONNECTED') {
      throw new Error(
        'RSocketWebSocketClient: Cannot connect(), a connection is already established.',
      );
    }
    this._setConnectionStatus(CONNECTION_STATUS.CONNECTING);

    const { url, wsCreator } = this._options;
    try {
      const socket = wsCreator ? wsCreator(url) : new WebSocket(url);
      if (!socket) {
        throw new Error('RSocketWebSocketClient: Failed to create WebSocket.');
      }
      // Store the socket first so that `_close()` can release it if any of
      // the setup steps below fails.
      this._socket = socket;
      socket.binaryType = 'arraybuffer';

      socket.addEventListener('close', this._handleClosed);
      socket.addEventListener('error', this._handleError);
      socket.addEventListener('open', this._handleOpened);
      socket.addEventListener('message', this._handleMessage);
    } catch (error) {
      // Without this transition the client would stay in CONNECTING forever.
      this._close(RSocketWebSocketClient._toError(error));
      throw error;
    }
  }

  /**
   * Returns a stream of connection status values. Each `request()` on the
   * subscription registers the subscriber for future transitions and
   * immediately emits the current status; `cancel()` unregisters it.
   */
  connectionStatus(): Flowable<ConnectionStatus> {
    return new Flowable((subscriber) => {
      subscriber.onSubscribe({
        cancel: () => {
          this._statusSubscribers.delete(subscriber);
        },
        request: () => {
          this._statusSubscribers.add(subscriber);
          subscriber.onNext(this._status);
        },
      });
    });
  }

  /**
   * Returns a stream of inbound frames. The subscriber starts receiving
   * frames once it calls `request()` and stops after `cancel()`. The
   * requested amount is not tracked.
   */
  receive(): Flowable<Frame> {
    return new Flowable((subject) => {
      subject.onSubscribe({
        cancel: () => {
          this._receivers.delete(subject);
        },
        request: () => {
          this._receivers.add(subject);
        },
      });
    });
  }

  /**
   * Serializes and writes a single frame. A failure closes the connection
   * with that error instead of throwing to the caller.
   */
  sendOne(frame: Frame): void {
    this._writeFrame(frame);
  }

  /**
   * Subscribes to `frames` with an effectively unbounded request and writes
   * every emitted frame. An error from the stream closes the connection with
   * that error; the subscription is cancelled when the connection closes.
   */
  send(frames: Flowable<Frame>): void {
    let subscription: ISubscription | undefined;
    frames.subscribe({
      onComplete: () => {
        if (subscription) {
          this._senders.delete(subscription);
        }
      },
      onError: (error) => {
        if (subscription) {
          this._senders.delete(subscription);
        }
        this._close(error);
      },
      onNext: (frame) => this._writeFrame(frame),
      onSubscribe: (_subscription) => {
        subscription = _subscription;
        this._senders.add(subscription);
        subscription.request(Number.MAX_SAFE_INTEGER);
      },
    });
  }

  /**
   * Moves the client to `ERROR` (when `error` is given) or `CLOSED`, notifies
   * receivers, cancels outbound subscriptions and releases the socket.
   * Calls made after the first transition to `CLOSED`/`ERROR` are ignored.
   */
  private _close(error?: Error): void {
    if (this._status.kind === 'CLOSED' || this._status.kind === 'ERROR') {
      // already closed
      return;
    }
    const status: ConnectionStatus = error
      ? { error, kind: 'ERROR' }
      : CONNECTION_STATUS.CLOSED;
    this._setConnectionStatus(status);
    this._receivers.forEach((subscriber) => {
      if (error) {
        subscriber.onError(error);
      } else {
        subscriber.onComplete();
      }
    });
    this._receivers.clear();
    this._senders.forEach((subscription) => subscription.cancel());
    this._senders.clear();
    const socket = this._socket;
    if (socket) {
      // Detach the handlers before closing so that the socket's own `close`
      // event does not re-enter this method.
      socket.removeEventListener('close', this._handleClosed);
      socket.removeEventListener('error', this._handleError);
      socket.removeEventListener('open', this._handleOpened);
      socket.removeEventListener('message', this._handleMessage);
      socket.close();
      this._socket = undefined;
    }
  }

  /** Records the new status and publishes it to all status subscribers. */
  private _setConnectionStatus(status: ConnectionStatus): void {
    this._status = status;
    this._statusSubscribers.forEach((subscriber) => subscriber.onNext(status));
  }

  /**
   * Socket `close` handler. It is only attached while the client itself has
   * not closed the socket, so every invocation is reported as an error,
   * using the close reason when one is provided.
   */
  private _handleClosed = (event: CloseEvent): void => {
    this._close(
      new Error(
        event.reason || 'RSocketWebSocketClient: Socket closed unexpectedly.',
      ),
    );
  };

  /**
   * Socket `error` handler. Uses the `Error` attached to the event when the
   * WebSocket implementation provides one; otherwise falls back to a generic
   * error.
   */
  private _handleError = (event: Event): void => {
    const cause: unknown = (event as Partial<ErrorEvent>).error;
    this._close(
      cause instanceof Error
        ? cause
        : new Error('RSocketWebSocketClient: WebSocket encountered an error.'),
    );
  };

  /** Socket `open` handler: the connection is now usable. */
  private _handleOpened = (): void => {
    this._setConnectionStatus(CONNECTION_STATUS.CONNECTED);
  };

  /**
   * Socket `message` handler: decodes the frame and delivers it to every
   * receiver. Any error thrown while decoding or delivering closes the
   * connection with that error.
   */
  private _handleMessage = (message: MessageEvent): void => {
    try {
      const frame = this._readFrame(message);
      this._receivers.forEach((subscriber) => subscriber.onNext(frame));
    } catch (error) {
      this._close(RSocketWebSocketClient._toError(error));
    }
  };

  /**
   * Decodes a message payload into a frame, using the length-prefixed
   * deserializer when `lengthPrefixedFrames` is set.
   */
  private _readFrame(message: MessageEvent): Frame {
    const buffer = toBuffer(message.data);
    const frame = this._options.lengthPrefixedFrames
      ? deserializeFrameWithLength(buffer, this._encoders)
      : deserializeFrame(buffer, this._encoders);

    return frame;
  }

  /**
   * Serializes `frame` and writes it to the socket. When the `debug` option
   * is set the frame is logged first. Any failure, including a missing
   * socket, closes the connection with that error.
   */
  private _writeFrame(frame: Frame): void {
    try {
      if (this._options.debug) {
        console.log(printFrame(frame));
      }
      const buffer = this._options.lengthPrefixedFrames
        ? serializeFrameWithLength(frame, this._encoders)
        : serializeFrame(frame, this._encoders);
      const socket = this._socket;
      if (!socket) {
        throw new Error(
          'RSocketWebSocketClient: Cannot send frame, not connected.',
        );
      }
      socket.send(buffer);
    } catch (error) {
      this._close(RSocketWebSocketClient._toError(error));
    }
  }

  /** Normalizes a caught value of unknown type into an `Error`. */
  private static _toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error));
  }
}