ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0
30.64k stars 3k forks source link

Using rxjs/webSocket on a server #5385

Open kevmo314 opened 4 years ago

kevmo314 commented 4 years ago

Feature Request

Right now WebSocketSubject only accepts a constructor. Can this be changed to also accept an existing socket in the config?

Specifically, my use case is that I'd like to use rxjs with websocket/ws on my server, and their API would allow me to do something like:

wss.on("connection", (ws: WebSocket) => {
  const rxjsWebSocket = webSocket(ws); // Wrap the node WebSocket into a subject.
});

wrt api compatibility, I think it's reasonable to assume the web socket remains w3c compatible instead of worrying about the NodeJS.WebSocket API. There are various compatibility shims to adapt the assorted NodeJS variants to w3c sockets. I'm curious if this was an intentional choice to not allow wrapping an existing socket or if this can be done?

six-edge commented 3 years ago

I'm working on making a transparent WebSocket layer for connecting a browser to a remote server (Raspberry Pi) to be able to subscribe directly to sensors and set outputs etc. from the browser or front-end application. So the idea is to have a WebSocket server that handles incoming connections and subscribes in a seamless manner to the GPIO ports.

This would be a welcomed feature!

abarke commented 3 years ago

@kevmo314 Seems there is a way to do this already:

const WebSocket = require('ws')
const { webSocket } = require('rxjs/webSocket')

const webSocketConnection$ = (
  webSocket({
    protocol: 'v1',
    url: 'http://example.com:3000',
    WebSocketCtor: WebSocket,
  })
)

Ref: https://gist.github.com/Sawtaytoes/fe3d16b1a15aa20eef5d2a41d0b39934 See: https://rxjs.dev/api/webSocket/WebSocketSubjectConfig#WebSocketCtor

kevmo314 commented 3 years ago

@abarke This doesn't work for existing web sockets. I mentioned this in the first line of the FR.

abarke commented 3 years ago

@kevmo314 apologies I see your point. You are right. An already established WebSocket would allow a new WebSocketSubject to be created for each incoming WS connection. The WebSocketSubject would then be used to send and receive messages on the server-side. I suppose one could then subscribe each WebSocketSubject to a shared resource ('hardware input, data stream, logs, etc.') e.g. ResourceSubject that then emits updated values and multicasting it to all subscribers (WebSocket Clients).

I created a demo to simulate a shared resource on a server just to wrap my head around it, as I need something similar for a project: https://stackblitz.com/edit/rxjs-shared-source-example?file=index.ts

It might be simple enough to modify the WebSocketSubject to take a WebSocket as an argument. I might just give it a hack 🙂

six-edge commented 3 years ago

Hey @kevmo314 I managed to create a PoC and published the library on NPM for consumption. I rewrote the WebSocketSubject class as extending it proved difficult due to not being able to set private members of the WebSocketSubject. Works great in my project so far. Any feedback/contributions welcome.

https://www.npmjs.com/package/rxjs-websocket-server-subject

six-edge commented 3 years ago

Any feedback on this @kevmo314 ?

Im looking at opening a PR for this, however I'm not sure if I should merge the WebSocketServerSubject into the WebSocketSubject (could get messy) or leave it as its own class (some duplicate code, but has separation of concerns and is more maintainable IMHO)

@benlesh any ideas or feedback of how to proceed?

kevmo314 commented 3 years ago

Unfortunately my use case has come and gone, the code that needed this is now happily running and stable. I took a look at the package though, it looks pretty good and I'd love to see this PR'd into the main rxjs/webSocket package. One edge case I'm curious about, what happens if there are two subjects on a single socket and one closes due to backpressure? What happens to the other subject?

six-edge commented 3 years ago

@kevmo314 regarding two subjects on a single socket... do you mean something like this?

wsServer.on('connection', (webSocket: WebSocket, req: IncomingMessage) => {
  const wsServerSubject = new WebSocketServerSubject<Message>(webSocket)
  const wsServerSubject2 = new WebSocketServerSubject<Message>(webSocket)

I tried this but it seems only wsServerSubject2 is bound. Not sure what your specific use case is, however in my test project I just create a single wsServerSubject and pipe various listeners to that subject that filter a specific message for each message type emitted. Could you elaborate on your use case as to why having two subjects on a single socket would be useful?

ValentinBossi commented 3 years ago

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}
noririco commented 3 years ago

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

This is very nice idea, did you try it ? once you go rxjs you need it to the vain

ValentinBossi commented 3 years ago

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

This is very nice idea, did you try it ? once you go rxjs you need it to the vain

sure. in the moment one message type is looking like that:

const onConvertMessage$ = fromEvent(websocket, 'message')
        .pipe(
            map(m => WsExchange.fromMessage(JSON.parse((m as WsMessage).data))),
            filter(m => m.action === "convert"),
            map(m => m as ConvertText),
            switchMap(handleConvertion),
            mergeMap(uploadToAws),
            map(bucket => GeneratedAudio.from(bucket)),
            tap(message => websocket.send(JSON.stringify(message))),
            catchError((err, caught) => {
                console.log("error in convert pipeline: ", err)
                return caught;
            }),
        );
bever1337 commented 1 year ago

I don't think this makes sense as part of rxjs because the change is to support a userland socket implementation that only works in a node environment. It would be a large commitment for RxJS development to follow ws development. For example, what versions should RxJS pin as a peer dependency? Who will test changes? ws is my preferred tooling, and this package looks like a great implementation, so why not continue to use this as a package?

If it is included, it should not be part of the dom observable module because the browser runtime is not supported.

kevmo314 commented 1 year ago

It would be a large commitment for RxJS development to follow ws development.

This is not true, as mentioned in the original issue the request is to follow the w3c specification for which the ws websocket is compliant. The change is to support passing in a spec-compliant websocket which also exists in browsers, not a specifically nodejs runtime websocket.

bever1337 commented 1 year ago

The client is spec-compliant in that it can talk to other spec-compliant sockets, absolutely, but the API cannot be used in the browser. For example, the node websocket allows the user to implement ping/pong, whereas browser websockets handle ping/pong automatically. Does RxJS need to expose additional streams on the subject? IMO yes, if the goal is to fully support native sockets.

If the goal is only to pass in a web socket, why not create a dynamic constructor as-needed? Someone suggested this earlier, but without creating a constructor on-the-fly. We could leverage variable scope, for example:

const nativeSocket = /* native `ws` created socket reference */;
const WebsocketCtor = function LocalCtor(url: string, protocol: string[]) {
  this = nativeSocket;
};
const rxjsSocket = new WebSocketSubject({
  WebSocketCtor,
});

In this situation, RxJS won't attach listeners until the developer subscribes to the new subject. It would be possible for the socket to emit events that are lost and de-sync RxJS state. For example, the openObserver might not get called. I need to review your changes better to understand how retry logic works. For example: By passing in a socket instead of a config, how would RxJS know the correct origin or protocol for the new socket?

bever1337 commented 1 year ago

I'm writing an additional comment instead of more edits 😂 @kevmo314 is the intention that your package could be merged right over the WebSocketSubject code? How can we review this as a PR? I'd love to get a proper diff so I can better understand your idea.

lropero commented 1 month ago

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

Hi, I'm trying to access message's data in handleConnection's observables:

import express from 'express'
import { createServer } from 'http'
import { fromEvent, merge } from 'rxjs'
import { map, mergeMap, tap } from 'rxjs/operators'
import { WebSocketServer } from 'ws'

const app = express()
const server = createServer(app)
const wss = new WebSocketServer({ server })

const handleConnection = socket => {
  const onClose$ = fromEvent(socket, 'close').pipe(tap(value => console.log('close', value)))
  const onMessage$ = fromEvent(socket, 'message').pipe(tap(value => console.log('message', value)))
  return merge(onClose$, onMessage$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(value => value[0]),
    mergeMap(handleConnection)
  )
  .subscribe()

...but instead I'm getting ws's event target formatted messages, like so:

close CloseEvent {
  [Symbol(kTarget)]: <ref *1> WebSocket {
    _events: [Object: null prototype] {
      close: [Array],
      error: [Function],
      message: [Function],
      open: [Function]
    },
    _eventsCount: 4,
    _maxListeners: undefined,
    _binaryType: 'nodebuffer',
    _closeCode: 1005,
    _closeFrameReceived: true,
    _closeFrameSent: true,
    _closeMessage: <Buffer >,
    _closeTimer: Timeout {
      _idleTimeout: -1,
      _idlePrev: null,
      _idleNext: null,
      _idleStart: 2493,
      _onTimeout: null,
      _timerArgs: undefined,
      _repeat: null,
      _destroyed: true,
      [Symbol(refed)]: true,
      [Symbol(kHasPrimitive)]: false,
      [Symbol(asyncId)]: 43,
      [Symbol(triggerId)]: 35
    },
    _errorEmitted: false,
    _extensions: {},
    _paused: false,
    _protocol: '',
    _readyState: 3,
    _receiver: Receiver {
      _events: [Object: null prototype] {},
      _writableState: [WritableState],
      _maxListeners: undefined,
      _allowSynchronousEvents: true,
      _binaryType: 'nodebuffer',
      _extensions: {},
      _isServer: true,
      _maxPayload: 104857600,
      _skipUTF8Validation: false,
      _bufferedBytes: 0,
      _buffers: [],
      _compressed: false,
      _payloadLength: 0,
      _mask: <Buffer b0 dc 14 ca>,
      _fragmented: 0,
      _masked: true,
      _fin: true,
      _opcode: 8,
      _totalPayloadLength: 0,
      _messageLength: 0,
      _fragments: [],
      _errored: false,
      _loop: false,
      _state: 0,
      _eventsCount: 0,
      [Symbol(shapeMode)]: false,
      [Symbol(kCapture)]: false,
      [Symbol(websocket)]: [Circular *1]
    },
    _sender: Sender {
      _extensions: {},
      _socket: [Socket],
      _firstFragment: true,
      _compress: false,
      _bufferedBytes: 0,
      _queue: [],
      _state: 0,
      onerror: [Function: senderOnError],
      [Symbol(websocket)]: [Circular *1]
    },
    _socket: Socket {
      connecting: false,
      _hadError: false,
      _parent: null,
      _host: null,
      _closeAfterHandlingError: false,
      _events: [Object],
      _readableState: [ReadableState],
      _writableState: [WritableState],
      allowHalfOpen: true,
      _maxListeners: undefined,
      _eventsCount: 2,
      _sockname: null,
      _pendingData: null,
      _pendingEncoding: '',
      server: [Server],
      _server: [Server],
      parser: null,
      on: [Function (anonymous)],
      addListener: [Function (anonymous)],
      prependListener: [Function: prependListener],
      setEncoding: [Function: socketSetEncoding],
      _paused: false,
      timeout: 0,
      [Symbol(async_id_symbol)]: 35,
      [Symbol(kHandle)]: null,
      [Symbol(lastWriteQueueSize)]: 0,
      [Symbol(timeout)]: null,
      [Symbol(kBuffer)]: null,
      [Symbol(kBufferCb)]: null,
      [Symbol(kBufferGen)]: null,
      [Symbol(shapeMode)]: true,
      [Symbol(kCapture)]: false,
      [Symbol(kSetNoDelay)]: true,
      [Symbol(kSetKeepAlive)]: false,
      [Symbol(kSetKeepAliveInitialDelay)]: 0,
      [Symbol(kBytesRead)]: 229,
      [Symbol(kBytesWritten)]: 131,
      [Symbol(websocket)]: undefined
    },
    _autoPong: true,
    _isServer: true,
    [Symbol(shapeMode)]: false,
    [Symbol(kCapture)]: false
  },
  [Symbol(kType)]: 'close',
  [Symbol(kCode)]: 1005,
  [Symbol(kReason)]: '',
  [Symbol(kWasClean)]: true
}

How can I fix this? Thanks.-