spring-projects / spring-integration

Spring Integration provides an extension of the Spring programming model to support the well-known Enterprise Integration Patterns (EIP)
http://projects.spring.io/spring-integration/
Apache License 2.0

Use postgres listen/notify for publish and subscribe. #4004

Open joshiste opened 1 year ago

joshiste commented 1 year ago

Since #3884 we can use postgres' listen/notify in conjunction with the JdbcMessageStore.

The current implementation is only for queues - but the listen/notify could also be used for publish/subscribe to broadcast messages to all listeners.

P.S. I've got a working prototype for this that adds an inbound and an outbound adapter.

artembilan commented 1 year ago

Right, the PostgresSubscribableChannel uses a UnicastingDispatcher, so a received message is handled by only one handler. But this does not stop you from bridging such a solution into a PublishSubscribeChannel:

@BridgeFrom("myPostgresSubscribableChannel")
@Bean
PublishSubscribeChannel myPubSubChannel() {
    return new PublishSubscribeChannel();
}

On the other hand, if you have several instances of your application, all those PostgresSubscribableChannel are going to receive a notification according to the Postgres protocol.

I'm not sure what drove me to stick with the UnicastingDispatcher in this channel, since it really makes sense to let all the handlers process the same message, just as it works in a clustered setup. So we can probably do that within this issue.

Regarding channel adapters: I don't see a reason for the outbound one, since all we need to do on the producer side is an INSERT (or UPDATE) and have a respective trigger configured on the table to initiate a notification via a function.

I am probably OK with an inbound channel adapter (a MessageProducerSupport impl) which would issue the LISTEN command in its start() for any configured table notification, with a respective SELECT against the parameter that has just been notified. Or it could just produce that parameter as a payload... but I guess this may deserve its own GH issue.

WDYT?

joshiste commented 1 year ago

On the other hand, if you have several instances of your application, all those PostgresSubscribableChannel are going to receive a notification according to the Postgres protocol.

Having multiple instances of the same application and having all of them process the message (therefore a broadcast), is what I had in mind.

As you noted, all instances will get the notification via the PostgresSubscribableChannel. But due to the nature of the JdbcChannelMessageStore, only one of the instances will process the message (and all the others will most likely log the warning Message with id '<uuid>' was not deleted).
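The effect described above can be illustrated with a small in-memory simulation. This is only a sketch: `QueueStoreSketch` and its methods are hypothetical names, and the map merely stands in for the message table. It is not how JdbcChannelMessageStore is actually implemented; it only shows why a delete-on-poll store gives the message to exactly one of several notified instances.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a queue-like message table (assumption, not the real store).
final class QueueStoreSketch {

    private final Map<UUID, String> rows = new ConcurrentHashMap<>();

    // Stores a payload and returns the id that would be sent along with the notification.
    UUID add(String payload) {
        UUID id = UUID.randomUUID();
        rows.put(id, payload);
        return id;
    }

    // Mimics a delete-by-id: only the caller that actually removes the row gets the payload.
    String poll(UUID id) {
        return rows.remove(id);
    }

    public static void main(String[] args) {
        QueueStoreSketch store = new QueueStoreSketch();
        UUID id = store.add("hello");
        // Three application instances are all notified about the same message id.
        for (String instance : List.of("app-1", "app-2", "app-3")) {
            String payload = store.poll(id);
            System.out.println(instance
                    + (payload != null ? " processed " + payload : " found nothing to process"));
        }
    }
}
```

Only the first caller receives the payload; the others get `null`, which corresponds to the "was not deleted" situation mentioned above.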

Regarding channel adapters: I don't see a reason for the outbound one, since all we need to do on the producer side is an INSERT (or UPDATE) and have a respective trigger configured on the table to initiate a notification via a function.

What about not persisting the message at all and passing it in the notify instead? This means all currently listening instances will receive the message; if there is no listener, the message is lost. This avoids any housekeeping for the persisted messages. The outbound adapter would be a simple notify call passing the message as a parameter.

I am probably OK with an inbound channel adapter (a MessageProducerSupport impl) which would issue the LISTEN command in its start() for any configured table notification, with a respective SELECT against the parameter that has just been notified. Or it could just produce that parameter as a payload... but I guess this may deserve its own GH issue.

My prototype is a MessageProducerSupport implementation which uses the parameter as payload.

Overall, it would be very similar to camel-pgevent.

artembilan commented 1 year ago

Yeah... My bad, sorry. Yes, the nature of JdbcChannelMessageStore is really a queue, so only one consumer can poll the message from that persistent store, even if they are competing. Pub/sub cannot be implemented around JdbcChannelMessageStore. It is not correct to assume that we can notify with any arbitrary data. See docs for that NOTIFY feature: https://www.postgresql.org/docs/current/sql-notify.html. I don't think the notify was designed to be called from an external service. This is really something DB-specific which is better initiated from a trigger or stored procedure. I don't see a problem with listening for such a notification, but as you can see, the data that can be transferred is limited, so it is probably better to notify with just a key and then SELECT the data from the DB.
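A minimal sketch of that "notify with a key, then SELECT" idea, using plain JDBC, might look like the following. The table `events`, its columns `id` and `payload`, and the class and method names are all assumptions made for illustration; how the notification itself is received (for example through the PostgreSQL JDBC driver) is left out.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

// Hypothetical helper: resolves a notified key into the row data it refers to.
final class KeyNotificationSketch {

    // Assumed table and column names; adjust to the real schema.
    static final String SELECT_BY_KEY = "SELECT payload FROM events WHERE id = ?";

    // Treats the notification parameter as a numeric primary key and rejects anything else.
    static long parseKey(String notificationPayload) {
        try {
            return Long.parseLong(notificationPayload.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Not a key: " + notificationPayload, e);
        }
    }

    // Loads the payload column for the notified key, if the row still exists.
    static Optional<String> load(Connection connection, String notificationPayload)
            throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(SELECT_BY_KEY)) {
            statement.setLong(1, parseKey(notificationPayload));
            try (ResultSet resultSet = statement.executeQuery()) {
                return resultSet.next()
                        ? Optional.ofNullable(resultSet.getString(1))
                        : Optional.empty();
            }
        }
    }
}
```

Because the row stays in the table, every listener can run the same SELECT, so the notification only has to carry a short key.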

joshiste commented 1 year ago

It is not correct to assume that we can notify with any arbitrary data. See docs for that NOTIFY feature: https://www.postgresql.org/docs/current/sql-notify.html.

As long as we can serialise it into a string (shorter than 8000 bytes in the default configuration), we can send everything, right?

I don't think the notify was designed to be called from an external service. This is really something DB-specific which is better initiated from a trigger or stored procedure.
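For illustration, such a notify call could be sketched with plain JDBC as below. The class and method names are hypothetical, and the size check assumes the payload is encoded as UTF-8 on the server; `pg_notify(text, text)` is the built-in PostgreSQL function equivalent to the NOTIFY command.

```java
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper: publishes a string payload through pg_notify.
final class NotifySketch {

    // In the default configuration the payload must be shorter than 8000 bytes.
    static final int MAX_PAYLOAD_BYTES = 7999;

    // Checks the encoded size, assuming a UTF-8 database encoding.
    static boolean fits(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_PAYLOAD_BYTES;
    }

    // Sends the payload on the given channel; it is lost if nobody is listening.
    static void publish(Connection connection, String channel, String payload)
            throws SQLException {
        if (!fits(payload)) {
            throw new IllegalArgumentException("Payload exceeds " + MAX_PAYLOAD_BYTES + " bytes");
        }
        try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
            statement.setString(1, channel);
            statement.setString(2, payload);
            statement.execute();
        }
    }
}
```

Note that the limit applies to bytes, not characters, so a string of 4000 two-byte characters would already be too large.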

Why do you think that?

I don't see a problem with listening for such a notification, but as you can see, the data that can be transferred is limited, so it is probably better to notify with just a key and then SELECT the data from the DB.

So are you proposing an additional store implementation to support broadcast data? Or providing a MessageProducerSupport implementation for receiving notifications and leaving the rest to the user?

artembilan commented 1 year ago

I find this an abuse of a relational database: it is really better to use tools which were designed for messaging: Apache Kafka, RabbitMQ, Pub/Sub, Event Hubs etc., or Hazelcast or ZeroMQ in the end.

Why would one serialize a message to a String just to be able to publish it to other instances via the DB, while losing data if there are no subscribers, which fully eliminates the purpose of a DB?

No, I don't propose any additional message store impls: I don't think it is going to be easy to come up with a persistent topic solution based on a DB.

Since a NOTIFY call from the application has little (or even less) value over a regular INSERT, I will stay against implementing an outbound channel adapter.

I'm OK with a MessageProducerSupport implementation to receive a notification from PostgreSQL, but as a general-purpose listener, not as any additional message store impl. The Message is just a particular type of data for the DB, so it will be up to the end user to implement some specific data promotion solution in their DB.

You can probably implement a simple POJO to call NOTIFY from your application, but I don't want to encourage the community with an outbound channel adapter, only to have people come back to us with unexpected behaviors we don't support.

MarcusCambio commented 1 year ago

Since #3884 we can use postgres' listen/notify in conjunction with the JdbcMessageStore.

The current implementation is only for queues - but the listen/notify could also be used for publish/subscribe to broadcast messages to all listeners.

P.S. I've got a working prototype for this that adds an inbound and an outbound adapter.

Is this code available to look at somewhere? I'm looking to use postgresql as a simple pub/sub server with minimal payloads and no delivery guarantees needed.