openwallet-foundation / credo-ts

TypeScript framework for building decentralized identity and verifiable credential solutions
https://credo.js.org
Apache License 2.0

Mediator instance not consuming messages received by another mediator instance #1625

Open ericvergnaud opened 8 months ago

ericvergnaud commented 8 months ago

Hi,

following standard SRE architecture, we're running 2 mediator instances behind an ALB (application load balancer).

To ensure that queued messages are accessible from both instances (and also survive a restart), we've implemented a PostgreSQLMessageRepository which we use instead of the default InMemoryMessageRepository.

This works well when the mediated wallet connects (i.e. it picks up all messages stored since the last connection), but not during a mediation session.

The failing scenario, as we understand it so far, is as follows:

We have a few questions:

What do you think?

TimoGlastra commented 8 months ago

Hey @ericvergnaud. I think the behaviour you're describing is correct.

I know @genaris has been working on improvements to the mediator implementation, but I'm not sure whether any improvements are being made to this specific functionality.

There's currently no config option to make it check regularly for queued messages. As you describe, I think we should have some sort of push mechanism that notifies all the mediators of a new message, and if there's a mediator session open, it can directly push that message to the client.

We're also currently using Postgres for storing mediated messages, but we're exploring Redis for this as well (it supports notifications via pub/sub). There's no endpoint for this in the mediator, but it's something you could set up as part of your custom PostgreSQLMessageRepository (either set up a new server or add it to the HttpInboundTransport, but make sure it's secured).
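A minimal sketch of the push mechanism described above. A plain in-process subscriber list stands in for Redis pub/sub (or any other broker), and `MediatorInstance`, `openSession`, `receiveForward` and `sharedQueue` are hypothetical names for illustration, not part of the framework:

```typescript
// Stand-in for a shared broker such as Redis pub/sub (hypothetical, in-process only).
type Listener = (connectionId: string) => void
const subscribers: Listener[] = []
const publish = (connectionId: string): void => subscribers.forEach((notify) => notify(connectionId))

// Shared queue that every instance can read, keyed by connection id.
const sharedQueue = new Map<string, string[]>()

class MediatorInstance {
  // connectionId -> messages pushed over that client's live session
  readonly delivered = new Map<string, string[]>()

  constructor() {
    // Every instance hears every notification but only acts on its own sessions.
    subscribers.push((connectionId) => this.onQueued(connectionId))
  }

  openSession(connectionId: string): void {
    this.delivered.set(connectionId, [])
  }

  // Called on whichever instance the load balancer picked for the inbound message.
  receiveForward(connectionId: string, message: string): void {
    const queue = sharedQueue.get(connectionId) ?? []
    queue.push(message)
    sharedQueue.set(connectionId, queue)
    publish(connectionId)
  }

  private onQueued(connectionId: string): void {
    const session = this.delivered.get(connectionId)
    if (!session) return // no live session here; another instance may have one
    const pending = sharedQueue.get(connectionId) ?? []
    session.push(...pending.splice(0)) // drain the queue into the live session
  }
}

const a = new MediatorInstance()
const b = new MediatorInstance()
a.openSession('conn-1') // the wallet is connected to instance a
b.receiveForward('conn-1', 'hi') // but the message lands on instance b
```

With a real broker the notification would be asynchronous and the drain would need to be atomic across instances; the sketch ignores both.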

We could explore some good options to add to the framework by default (or as an extension package), such as Redis and PostgreSQL, but I think it would be good to first implement something and then look at integrating it into the framework.

Thanks for opening this issue, this will be a good thing to tackle. I would also like to hear @genaris's opinion on this, and how it relates to the work he's already doing.

ericvergnaud commented 8 months ago

@TimoGlastra thanks for confirming our findings. Not sure the MessageRepository is the best place for receiving notifications, since it would have to forward them upwards anyway (it knows nothing about active mediated agents)... I'll explore options and share them here

ericvergnaud commented 8 months ago

Poking around, I'm beginning to think that the right place for this feature could be MessagePickupApi. That's where we already pick up messages for a newly connected mobile agent, and we already have the connectivity to the message repo and the message sender.

We could introduce a MessageQueuedEventListener, configured via MediatorModuleConfig. That listener would be invoked by whatever mechanism the application provides. When invoked, it would simply populate a new PickupMessagesOptions instance and call a slightly enhanced pickupMessages (special care needs to be taken not to queue messages twice).
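To make the proposal concrete, here is a rough sketch. `MessageQueuedEventListener` is the name proposed above; everything else (`QueueRepo`, `createListener`, the session map) is hypothetical and only stands in for the real repository and transport sessions. Taking messages out of the queue before sending is what keeps a duplicate notification from delivering them twice:

```typescript
type EncryptedMessage = { id: string; payload: string }

// Hypothetical minimal queue; a real one would be backed by a shared database.
class QueueRepo {
  private readonly queues = new Map<string, EncryptedMessage[]>()

  add(connectionId: string, message: EncryptedMessage): void {
    const queue = this.queues.get(connectionId) ?? []
    queue.push(message)
    this.queues.set(connectionId, queue)
  }

  // Removes and returns everything queued for the connection.
  takeFromQueue(connectionId: string): EncryptedMessage[] {
    const queue = this.queues.get(connectionId) ?? []
    this.queues.delete(connectionId)
    return queue
  }
}

// The listener an application would call from its own notification mechanism.
type MessageQueuedEventListener = (connectionId: string) => void

function createListener(
  repo: QueueRepo,
  activeSessions: Map<string, (message: EncryptedMessage) => void>
): MessageQueuedEventListener {
  return (connectionId) => {
    const send = activeSessions.get(connectionId)
    if (!send) return // client is not connected to this instance: leave messages queued
    for (const message of repo.takeFromQueue(connectionId)) send(message)
  }
}

const repo = new QueueRepo()
const sent: string[] = []
const sessions = new Map<string, (message: EncryptedMessage) => void>([
  ['conn-1', (message) => { sent.push(message.id) }],
])
const onMessageQueued = createListener(repo, sessions)

repo.add('conn-1', { id: 'm1', payload: '...' })
repo.add('conn-2', { id: 'm2', payload: '...' })
onMessageQueued('conn-1')
onMessageQueued('conn-1') // duplicate notification: the queue is already empty
onMessageQueued('conn-2') // no session on this instance: m2 stays queued
```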

What do you think?

ericvergnaud commented 8 months ago

@genaris any thoughts?

TelegramSam commented 8 months ago

If it helps, SocketDock was created to solve this specific issue - websockets are terminated in a cluster of socketdock instances, with messages forwarded to the clustered instances. Callback URIs help any instance send messages back down an individual websocket.

ericvergnaud commented 8 months ago

@TelegramSam thanks! From the docs, it seems SocketDock was created to solve the issue of clients without websocket support? The issue here is not related to web sockets; rather, it's related to a clustered mediator. From the diagram it's not clear whether SocketDock supports multiple mediators. I appreciate it may solve this issue as a side effect, but that's a lot of infra for a problem that the mediator should tackle in the first place.

ericvergnaud commented 8 months ago

@genaris still hoping you can comment on the proposal?

genaris commented 8 months ago

> @genaris still hoping you can comment on the proposal?

Thanks for not losing hope! 😄 I wrote some comments a few days ago, but for some reason they didn't get published.

If I understand correctly, in the architecture you are following, each AFJ instance holds a number of WebSocket sessions, while there is a single "Pickup Message Queue" that all instances can query and post messages to:

image

The approach you are proposing could be relatively straightforward to implement with the current AFJ architecture, as it would just be a matter of adding the listener you mention to each instance and delivering any relevant message to its connected clients. In that sense, I find @TimoGlastra's idea of using a Redis DB with a pub/sub interface very interesting: each instance can subscribe to the messages queued for the clients connected to it and retrieve/deliver them.

I agree that MessagePickupApi is the right place to provide the means to handle this, although I'm not too sure how many responsibilities we should give it, and which ones can be handled externally (e.g. in the custom MessageRepository provided to the agent instance).

For instance, we can manage the subscription to changes in the Message Queue directly on each instance's custom MessageRepository implementation, based on some events fired by the agent when new clients are connected to its WebSocket pool.

Then, when it is notified about new relevant messages, it can call a new MessagePickupApi method to deliver them immediately. The way the agent actually delivers these queued messages depends on the nature of the Message Pickup session (which can be stored within MessagePickupApi or as a new field on WebSocketSession): if it is a "legacy" one (Implicit), the message will be sent as is, while if it is Pickup V2 we can use a delivery message to get a fancy way of clearing the queue when the client acknowledges receipt.
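A small sketch of that branching, under simplifying assumptions. The `Outbound` shapes below are simplified stand-ins rather than the real DIDComm message formats, and `toOutbound` and `PickupSessionType` are hypothetical names:

```typescript
type PickupSessionType = 'implicit' | 'pickup-v2'
type QueuedMessage = { id: string; payload: string }

// Simplified stand-ins for what goes over the wire (not the real message schemas).
type Outbound =
  | { kind: 'forwarded'; payload: string }
  | { kind: 'delivery'; attachments: { id: string; data: string }[] }

function toOutbound(sessionType: PickupSessionType, messages: QueuedMessage[]): Outbound[] {
  if (messages.length === 0) return []
  if (sessionType === 'implicit') {
    // Legacy behaviour: each queued message goes out unchanged, with no way to ack it.
    return messages.map((m): Outbound => ({ kind: 'forwarded', payload: m.payload }))
  }
  // Pickup V2: one delivery message whose attachments carry the queue ids,
  // so the client can later acknowledge exactly those ids.
  return [{ kind: 'delivery', attachments: messages.map((m) => ({ id: m.id, data: m.payload })) }]
}

const queued: QueuedMessage[] = [
  { id: 'm1', payload: 'first' },
  { id: 'm2', payload: 'second' },
]
const implicitOut = toOutbound('implicit', queued) // two separate messages
const v2Out = toOutbound('pickup-v2', queued) // one delivery with two attachments
```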

image

For sure, this leaves more responsibility to the MessageRepository instance, so the framework will be somewhat harder to use. But I think it is good for the framework's core to simply provide all the means needed to achieve a goal, while staying flexible enough to let users do their own customizations. As @TimoGlastra said, I think we can work on a package in aries-framework-javascript-ext that can serve as a reference implementation of such a scalable mediator architecture.

BTW I think we have to refactor MessageRepository to MessagePickupQueue or something meaningful ASAP 😄.

genaris commented 8 months ago

> @TelegramSam thanks! From the docs, it seems SocketDock was created to solve the issue of clients without websocket support? The issue here is not related to web sockets; rather, it's related to a clustered mediator. From the diagram it's not clear whether SocketDock supports multiple mediators. I appreciate it may solve this issue as a side effect, but that's a lot of infra for a problem that the mediator should tackle in the first place.

I think the docs treat the problem in a generic way because the folks who created it figured out that it could have broader use beyond Aries mediator WS handling, but AFAIK it was mainly developed to solve a problem similar to ours (probably more oriented to the ACA-Py architecture).

In my team we are also analyzing how this concept could be used in AFJ, which would give an architecture like this one:

image

So we can see that, besides the Load Balancer that receives HTTP requests for messages sent to clients, we now have the SocketDock (which will also have some load balancing, but I left the diagram as simple as possible).

The good thing about this is that AFJ instances will be simpler, in the sense that they will just receive requests and either send a message to a client or queue it depending on the client's online/offline status. That status can be determined by storing the socketId-connectionId relationship in a table that can be queried and updated by any AFJ instance (not sure where, maybe another Redis-based cache?).

This approach would not need a special notification mechanism for Pickup Message Queue, as all AFJ instances will know the status of clients connected to the SocketDock. Instead, they would implement a SocketDockInboundTransport that exposes endpoints for connect/message/disconnect and creates session objects that send messages to the callback provided by SocketDock.

image

In this diagram I'm showing what I've understood so far about the lifecycle of a client session. Not shown here: incoming messages for a given connection, caught by any AFJ instance, will be sent to the callback provided by SocketDock for that particular connectionId, while they will be added to the Message Queue if no socketId is found for the connectionId in the "Session Table".

As a result, I would say that it can become more complex than the other approach, as we now have two new actors (SocketDock and the "Session Table"), but it can be very interesting considering that we will have more granularity when it comes to scaling components.
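A rough sketch of that routing decision, with in-memory maps standing in for the shared "Session Table" and Message Queue. All names here (`sessionTable`, `onConnect`, `routeInbound`, `postToCallback`) and the opaque callback strings are hypothetical; the sketch makes no assumption about SocketDock's actual endpoints or payloads:

```typescript
// Hypothetical "Session Table" shared by all instances: connectionId -> callback for its socket.
const sessionTable = new Map<string, string>()
// Shared Message Queue for clients that are currently offline.
const messageQueue = new Map<string, string[]>()
// Records what would have been POSTed to the callback (stand-in for an HTTP request).
const posted: { callback: string; message: string }[] = []

function postToCallback(callback: string, message: string): void {
  posted.push({ callback, message })
}

// Called when the socket layer reports a new client connection.
function onConnect(connectionId: string, callback: string): void {
  sessionTable.set(connectionId, callback)
}

// Called when the socket layer reports that the client went away.
function onDisconnect(connectionId: string): void {
  sessionTable.delete(connectionId)
}

// Any instance can run this for an incoming message addressed to a connection.
function routeInbound(connectionId: string, message: string): 'sent' | 'queued' {
  const callback = sessionTable.get(connectionId)
  if (callback) {
    postToCallback(callback, message) // client is online somewhere: push immediately
    return 'sent'
  }
  const queue = messageQueue.get(connectionId) ?? []
  queue.push(message) // client is offline: keep the message for later pickup
  messageQueue.set(connectionId, queue)
  return 'queued'
}

const whileOffline = routeInbound('conn-1', 'm1')
onConnect('conn-1', 'callback-for-socket-1')
const whileOnline = routeInbound('conn-1', 'm2')
onDisconnect('conn-1')
const afterDisconnect = routeInbound('conn-1', 'm3')
```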

Needless to say that we can aim to support both approaches with AFJ, as a good framework should do! 😄

TimoGlastra commented 8 months ago

Just wanted to share our approach to handling multi-instance mediators that we've recently implemented. It's currently built on top of AFJ, and ideally some additional events and hooks would be added so we don't have to implement a custom WsInboundTransport.

We have the following setup:

So ideally:

In addition, message queueing currently works by taking messages from the queue before you know whether the message sending will succeed. Pickup v2 has a solution for this, but we don't really support it at the moment. So ideally we also start supporting a) explicit live delivery mode and b) pickup v2 delivery with acknowledgement (we do support acks, but not based on ids, so it's still prone to issues). Then we deprecate the implicit way (which should have been deprecated a long time ago). Pickup v1 can be kept for now I think, knowing that it may lose messages.
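The difference between the two behaviours can be sketched as follows. `AckQueue` and its methods are hypothetical and in-memory; they only illustrate why removing messages on an id-based acknowledgement is safer than removing them when they are taken:

```typescript
type Queued = { id: string; payload: string }

class AckQueue {
  private messages: Queued[] = []

  add(message: Queued): void {
    this.messages.push(message)
  }

  // Behaviour described above: messages are gone once taken, even if sending later fails.
  take(limit: number): Queued[] {
    return this.messages.splice(0, limit)
  }

  // Ack-based alternative: read without removing...
  peek(limit: number): Queued[] {
    return this.messages.slice(0, limit)
  }

  // ...and only remove the ids the client confirms. Returns how many were removed.
  acknowledge(ids: string[]): number {
    const before = this.messages.length
    this.messages = this.messages.filter((m) => !ids.includes(m.id))
    return before - this.messages.length
  }

  get size(): number {
    return this.messages.length
  }
}

const ackQueue = new AckQueue()
ackQueue.add({ id: 'm1', payload: 'first' })
ackQueue.add({ id: 'm2', payload: 'second' })

const attempt = ackQueue.peek(10) // suppose sending these fails: nothing is lost
const sizeAfterFailedSend = ackQueue.size
const removed = ackQueue.acknowledge(['m1']) // client confirms m1 only
const sizeAfterAck = ackQueue.size
```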

ericvergnaud commented 8 months ago

@genaris @TimoGlastra thanks for your comments. I'm a bit busy with other stuff right now, but I'll come up with a PR shortly after that.

The goal is minimal: make it possible to notify a mediator instance that a message has been queued, and have the mediator process the notification.

I think the notification infrastructure (emitter and listener) should be left entirely to the developers (the same way storage is). FWIW, many AWS database services (Aurora, RDS, DynamoDB) come with CDC support, so it's less infra and 100% ACID to just have the CDC call the new notification entry point (using Redis or any other mechanism on top of PGSQL requires some form of 2-phase commit in order to be bullet-proof).

This enables the following architecture:

image (MessageNotifier diagram)

IMHO the above would, for some implementers, be simpler than using SocketDock, because the notifier can be implemented using serverless services (such as AWS SNS or AWS EventBridge) and thus does not require additional infra.

genaris commented 8 months ago

> In addition, message queueing currently works by taking messages from the queue before you know whether the message sending will succeed. Pickup v2 has a solution for this, but we don't really support it at the moment. So ideally we also start supporting a) explicit live delivery mode and b) pickup v2 delivery with acknowledgement (we do support acks, but not based on ids, so it's still prone to issues). Then we deprecate the implicit way (which should have been deprecated a long time ago). Pickup v1 can be kept for now I think, knowing that it may lose messages.

I agree on deprecating the implicit mode (on the mediator side at least). Not sure how many are using it but I'd even remove it from 0.5.0.

Regarding 'official' Pickup V2 Live Mode support and the MessagePickupQueue refactoring, I'll submit a PR soon with the work I've been doing so far. I have some doubts about where the best place to handle some things is but, you know, a PR is worth more than a thousand words 😛.