marblejs / marble

Marble.js - functional reactive Node.js framework for building server-side applications, based on TypeScript and RxJS.
https://marblejs.com
MIT License
2.15k stars 71 forks source link

Have a way to listen to the CQRS event bus arbitrarily #385

Closed EthanStandel closed 2 years ago

EthanStandel commented 2 years ago

Is your feature request related to a problem? Please describe. I'm trying to have a websocket listen to the CQRS event bus and I don't see a concise way to do that. Effects which listen to the event bus are registered functions, as are websocket effects. So they're not truly observables which can subscribe to each other.

Describe the solution you'd like It seems like the EventBusClient should have a way to arbitrarily listen and filter for commands and events.

Describe alternatives you've considered I don't really see a way to keep them reliably connected besides just instantiating a global Subject and feeding it from the CQRS side and then listening to it on the websocket side. That definitely feels wrong though.

Additional context N/A

JozefFlakus commented 2 years ago

Hi @EthanStandel,

I'm not sure if I get it correctly. Can you describe in more details the possible use case for this problem statement?

EthanStandel commented 2 years ago

I'm just building out a simple proof of concept game, somewhat similar to Pretend you're Xyzzy. So people can arbitrarily join a game as long as they have a game ID (or they can create a new game). If you're in a game, you would subscribe to all game changes and events over a websocket so that the state of the game for each client is constantly up to date. If another player joins the game, update the game with a CQRS command, and dispatch the change for the game through all websocket sessions that are subscribed to that particular game.

JozefFlakus commented 2 years ago

I would try to inject EventBus directly in each WsEffect that would like to listen to outgoing events, eg.

const eventBus = useContext(EventBusToken)(ctx.ask);

The only one problem is that the injected EventBus uses a raw message stream (as Buffer), which means that you have to parse the data accordingly, eg.

eventBus.message$.pipe(
  map(msg => JSON.parse(msg.data.toString())),
);

Please note that it is just only a short snipped, inside of the framework there are some additional things that are performed underneath. Personally I would think about extending the currently existing EventBusClient interface with such a shortcut for a stream of outgoing (properly parsed) events.

EthanStandel commented 2 years ago

Ah, okay that's what I was missing! I'm cool with closing this out, though I will say that would be super helpful to have in the docs, even that quick little example (and also, it might be worth it to rename messages$ to rawMessage$ and then assign messages$ to this pipe, but that's just opinion and not at all necessary).