spring-projects / spring-integration

Spring Integration provides an extension of the Spring programming model to support the well-known Enterprise Integration Patterns (EIP)
http://projects.spring.io/spring-integration/
Apache License 2.0
1.54k stars, 1.1k forks

Embrace queue APIs in Spring's JDBC message stores for polling messages #3872

raphw closed this issue 2 years ago

raphw commented 2 years ago

Modern databases offer much better queuing APIs than they did years ago, so articles like the one included in the Spring Integration documentation are rather dated.

For example, Postgres offers LISTEN and NOTIFY, and Oracle offers Database Change Notification. Where a database provides such an API, the JDBC-backed queues that Spring Integration offers could be used more efficiently.
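To make the LISTEN/NOTIFY idea concrete without a database, here is a minimal in-memory model of its semantics. It is a sketch, not PostgreSQL or JDBC code; the class, method and channel names are made up for illustration. Every session that has issued LISTEN on a channel gets its own copy of each NOTIFY payload sent on that channel. (In real PostgreSQL, delivery additionally happens only once the notifying transaction commits.)

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of LISTEN/NOTIFY fan-out; no database is involved.
final class NotificationHub {

    // Stands in for one database session; notifications queue up until it reads them.
    static final class Session {
        final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    }

    private final Map<String, List<Session>> listeners = new ConcurrentHashMap<>();

    // Models "LISTEN channel" issued by the given session.
    void listen(Session session, String channel) {
        listeners.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(session);
    }

    // Models "NOTIFY channel, 'payload'": every listening session gets its own copy.
    void sendNotify(String channel, String payload) {
        for (Session session : listeners.getOrDefault(channel, Collections.<Session>emptyList())) {
            session.inbox.add(payload);
        }
    }
}
```

Unlike a JVM-local lock, this fan-out reaches every listening session, regardless of which JVM it belongs to.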

artembilan commented 2 years ago

Yes, we may consider that as a contribution, for example as spring-integration-postgresql and spring-integration-oracle modules implementing those vendor-specific features. It might not be a store implementation, but rather just a PostgresMessageChannel and perhaps a message-driven channel adapter that reacts to notifications and emits the respective data.

Contribution is welcome!

raphw commented 2 years ago

Thanks, I will look into it. I'll try to solve this for my own application first and then extract it into a pull request.

raphw commented 2 years ago

I played around with this on Oracle and Postgres and here are some observations:

  1. The Oracle JDBC driver is rather buggy: with its newest version, the driver fails with an internal assertion error when trying to register a change notification. With an older version, I got it to work nicely, but only if the Oracle database runs on the same host as the JVM. There is a property, NTF_LOCAL_HOST, that defines the host name the Oracle DB should send notifications to, but the JDBC driver submits it incorrectly. I worked around this as well by registering the change notification in a PL/SQL script, run via JDBC, that calls back via TCP. This setup takes some effort to run, and I found it rather limiting because the Oracle DB must be able to call the JVM. In most Kubernetes setups, for example, it is not possible to call back a specific replica. It also often requires changes to the firewall setup.

  2. The official Postgres JDBC driver only supports blocking polls, but I assume that would not be an issue, as that matches the semantics of the queue API. However, it consumes a full connection.

  3. There is an alternative Postgres JDBC driver that does support asynchronous notifications. This would make it possible to use a single JDBC connection for multiple queues. Unfortunately, the project seems abandoned.

  4. The Vert.x Postgres client, which uses Netty and is better maintained, also supports asynchronous notifications.
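For the blocking option in point 2, the usual shape is a dedicated thread that owns the listening connection and loops over a timed wait. The sketch below assumes a hypothetical NotificationSource interface standing in for that connection. With PgJDBC, its poll would wrap PGConnection#getNotifications(int timeoutMillis), which waits up to the timeout and returns the received notifications as an array (check the driver's Javadoc for its exact return contract). Everything else is plain JDK code.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for one dedicated connection that has issued LISTEN.
// poll() blocks up to timeoutMillis and returns received payloads, never null.
interface NotificationSource {
    List<String> poll(int timeoutMillis) throws Exception;
}

// Runs the blocking loop on its own thread: the connection is tied up while waiting.
final class ListenerLoop implements Runnable {
    private final NotificationSource source;
    private final Consumer<String> handler;
    private volatile boolean running = true;

    ListenerLoop(NotificationSource source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
    }

    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                // A bounded timeout lets stop() take effect within about a second.
                for (String payload : source.poll(1000)) {
                    handler.accept(payload);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exits via the loop condition
            } catch (Exception e) {
                // A real implementation would reconnect, re-issue LISTEN and
                // re-read the table for messages missed while disconnected.
                running = false;
            }
        }
    }

    void stop() {
        running = false;
    }
}
```

One such thread per listening connection is the cost point 2 refers to. Sharing that one connection across many queues is then a matter of routing payloads, not opening more connections.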

What would you consider to be in scope? I'd suggest aiming for Postgres support only, at least for a first version.

I am also wondering whether this should be implemented as a pollable channel or as a PublishSubscribeChannel. Both are possible, of course; I wonder which would be the more appropriate abstraction. As the standard JDBC driver for Postgres is blocking, though, I would consider the pollable channel.

artembilan commented 2 years ago

Thank you, @raphw, for the investigation. For Oracle I heard about Advanced Queuing, but it looks like it can work via the standard JMS protocol: https://docs.oracle.com/database/121/ADQUE/aq_intro.htm#ADQUE2440. And for that part we already have a specific MessageChannel implementation: https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms-channel.

What you say about PostgreSQL and its blocking polls does not make much difference compared to what we have so far with the QueueChannel and JdbcChannelMessageStore combination.

So we are probably back to square one: we already have everything we can have, but the providers don't give us enough flexibility to build on their features. Therefore I feel like this one is Won't Fix and Invalid 😄

raphw commented 2 years ago

Indeed, Oracle AQ is JMS-compatible; I don't think it needs explicit support.

As for Postgres, the difference from using a QueueChannel is that the queue channel waits on an explicit lock that is only released when a message is added in the same JVM. If you run replicas of an application, for example, the replicas that do not add messages to the queue themselves will poll forever without their locks ever being released, and remain stale.

The common workaround is to poll the group every X seconds, but with Postgres's LISTEN/NOTIFY API this is not necessary, as all listening JVMs are informed when a new message is sent to the queue. If one replica sends 1000 messages to the queue, all other replicas wake up immediately and start processing, which is the desired behavior in my case.

(I prototyped this now with a custom component and it works like a charm.)
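The difference between JVM-level and database-level wake-ups can be reproduced without a database. In this sketch, all names are hypothetical. Each Replica stands for one JVM with its own lock and condition, and it polls with the same loop shape as the store-backed QueueChannel. SharedTable stands in for the message table. With notifyOnInsert = false, an insert only signals pollers in the sending JVM. With true, every replica is signalled, which is the effect a NOTIFY issued on insert would have.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the shared message table that all replicas read from.
final class SharedTable {
    final Queue<String> rows = new ConcurrentLinkedQueue<>();
    final List<Replica> replicas = new CopyOnWriteArrayList<>();
    // true: every replica is woken on insert (NOTIFY-like); false: only the inserting JVM.
    final boolean notifyOnInsert;

    SharedTable(boolean notifyOnInsert) {
        this.notifyOnInsert = notifyOnInsert;
    }
}

// Hypothetical replica: one JVM with its own lock, as with today's store-backed queue.
final class Replica {
    private final SharedTable table;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    Replica(SharedTable table) {
        this.table = table;
        table.replicas.add(this);
    }

    void send(String message) {
        table.rows.add(message);
        if (table.notifyOnInsert) {
            for (Replica replica : table.replicas) {
                replica.wakeUp(); // database-level: every listening JVM hears about it
            }
        } else {
            wakeUp(); // JVM-level: only pollers in this JVM are signalled
        }
    }

    private void wakeUp() {
        lock.lock();
        try {
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Same loop shape as the store-backed QueueChannel poll.
    String poll(long timeout, TimeUnit unit) throws InterruptedException {
        long timeoutInNanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            String message = table.rows.poll();
            while (message == null && timeoutInNanos > 0) {
                timeoutInNanos = notEmpty.awaitNanos(timeoutInNanos);
                message = table.rows.poll();
            }
            return message;
        } finally {
            lock.unlock();
        }
    }
}
```

With JVM-level signalling, a replica that never sends only sees another replica's message once its own timeout expires and it polls again. That is exactly the "poll every X seconds" workaround. With database-level signalling, it wakes as soon as any replica inserts.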

artembilan commented 2 years ago

I'm confused: you said in your previous comment that the "Postgres JDBC driver only supports blocking polls", so we are in a situation where we re-implement a PollableChannel, which is essentially no different from the currently polling QueueChannel. And now you talk about the LISTEN/NOTIFY API again. So, what am I missing from your investigation? It also doesn't look like you agreed to rely on some outdated libraries...

raphw commented 2 years ago

In a few words: the difference is that the current solution notifies at the JVM level, while the proposed solution notifies at the database level.

If JVM A started polling from a group, it would either block forever or need to poll at intervals, because when JVM B sends a message, there is no mechanism to push that notification to the other JVM. LISTEN/NOTIFY, on the other hand, lets JVM A react immediately when JVM B adds a message.

artembilan commented 2 years ago

I understand what you are saying, and that's why it is called "polling". Our current implementation in the QueueChannel for an external store looks like this:

        try {
            message = doPoll();
            while (message == null && timeoutInNanos > 0) {
                timeoutInNanos = this.messageStoreNotEmpty.awaitNanos(timeoutInNanos);
                message = doPoll();
            }
        }

That is because there is no notification hook from the DB to react to, so we have to poll in a loop.

What you describe with LISTEN/NOTIFY is already a SubscribableChannel, not a polling one. Not sure why we are still not on the same page...

These are your words:

As the standard JDBC driver for Postgres is blocking, though, I would consider the pollable channel.

That's why I want a clear answer from you: what are we going to implement, the polling channel that is already there, or a SubscribableChannel that reacts to some notification?

raphw commented 2 years ago

Well, the implementation I am after is a Java Queue that I can plug into an existing API. In the current implementation, the poll method delegates to the code you're quoting. I wanted to replace the messageStoreNotEmpty lock with a database-based one. But a subscribable channel is of course also possible. I was rather trying to explain why the current API in Spring is not equivalent, as you had suggested it was.

artembilan commented 2 years ago

Sure! It really can be a custom Queue implementation that we inject into the QueueChannel. Or, as I understood from your explanation, a SubscribableChannel that issues a LISTEN when we subscribe to it and a NOTIFY when we send().

raphw commented 2 years ago

Yeah, that's what I currently think would be the best solution. If one wanted to map this onto a Java Queue, that would still be possible with a subscriber that adds to a regular queue.

I will try to make a prototype out of this!
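Mapping the push-style subscription back onto a Java Queue only needs a subscriber that forwards into a BlockingQueue. Here is a minimal sketch. QueueingSubscriber is a hypothetical name, and messages are plain strings in the usage for brevity.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical adapter: a push-style subscriber that feeds a pull-style queue, so code
// written against BlockingQueue (poll/take) can consume notification-driven messages.
final class QueueingSubscriber<T> implements Consumer<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    // Called by whatever delivers messages, e.g. the thread running the LISTEN loop.
    @Override
    public void accept(T message) {
        queue.add(message); // unbounded queue, so add() never rejects
    }

    // Exposes the queue to consumers that expect the java.util.concurrent API.
    BlockingQueue<T> queue() {
        return queue;
    }
}
```

Consumers can then block on queue().take() or queue().poll(timeout, unit) from their own threads. One trade-off: if the subscriber removes rows from the table before handing them over, those messages live only in JVM memory until processed.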

raphw commented 2 years ago

I have created something that hooks into Spring Integration in a way that works for me. I wanted to start with something minimal that shows what I am aiming at, without any details: https://github.com/raphw/spring-integration/tree/postgres-notification-listener

A listener like this wakes up JVM B immediately when it is listening for a group to which JVM A sends a message. As things stand, the run method of PostgresChannelMessageSubscriber must be run by some executor. Listeners are then provided with the received message. It does not matter how the messages are added to the message table, because a trigger on the table invokes NOTIFY on each insert.
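For illustration, the trigger approach described above could look roughly like this, expressed as DDL strings executed over plain JDBC. This is a sketch under several assumptions. It assumes the default INT_CHANNEL_MESSAGE table of the JDBC channel message store, with its REGION and GROUP_KEY columns. The function, trigger and notification channel names are hypothetical. The payload carries the region as well as the group key, separated by a space. Because NOTIFY is transactional, listeners only hear about a row once the inserting transaction commits.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch only: hypothetical names; assumes the INT_CHANNEL_MESSAGE table exists.
final class NotifyTriggerDdl {
    static final String CHANNEL = "int_channel_message_notify";

    // Run once on the dedicated listening connection.
    static final String LISTEN = "LISTEN " + CHANNEL;

    // Trigger function: raises a NOTIFY carrying "REGION GROUP_KEY" for each new row.
    static final String FUNCTION =
          "CREATE OR REPLACE FUNCTION int_channel_message_notify_fct() RETURNS TRIGGER AS $BODY$\n"
        + "BEGIN\n"
        + "  PERFORM pg_notify('" + CHANNEL + "', NEW.REGION || ' ' || NEW.GROUP_KEY);\n"
        + "  RETURN NEW;\n"
        + "END;\n"
        + "$BODY$ LANGUAGE plpgsql";

    // Fires for every insert, whichever JVM or tool performed it.
    static final String TRIGGER =
          "CREATE TRIGGER int_channel_message_notify_trg\n"
        + "AFTER INSERT ON INT_CHANNEL_MESSAGE\n"
        + "FOR EACH ROW EXECUTE PROCEDURE int_channel_message_notify_fct()";

    // Installs both objects; needs a PostgreSQL JDBC driver at runtime.
    static void install(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.execute(FUNCTION);
            statement.execute(TRIGGER);
        }
    }
}
```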

In a first attempt, I tried to implement the SubscribableChannel interface, but it did not really fit. LISTEN is rather expensive as it consumes a full connection, so ideally this should run as a singleton in the Spring context rather than once per subscription. Sharing is possible because the NOTIFY sends the groupKey as its payload.

Any suggestions on how this might fit into the existing landscape? I read quite a lot of code to figure this out myself, but there are many class hierarchies, so I was wondering whether someone more experienced with the code base could make a better judgement.

artembilan commented 2 years ago

At a glance looks cool!

I don't think we need to mix the "poll missed messages" and "notify" concerns in one class. I see it like this:

  1. A single global PostgresChannelMessageTableSubscriber. Since, as you say, it blocks a whole connection, it is indeed better to have one per table.
  2. The notifier must emit not only NEW.GROUP_KEY, but also NEW.REGION (if that is possible, of course). Different regions may have the same group key.
  3. The SubscribableChannel may implement something like a PostgresListener to be added to the provided PostgresChannelMessageTableSubscriber. We will figure out how along the way, probably with some filtering attributes like the group key and region.
  4. This channel may indeed implement that "missed messages" feature in its start() contract.

My point is to keep the JdbcChannelMessageStore and MessageHandler abstractions out of this new PostgresChannelMessageTableSubscriber.

WDYT?
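A rough sketch of the split proposed above follows. All names are hypothetical (TableSubscriber, TableListener and SketchChannel are not Spring Integration types). It assumes the "REGION GROUP_KEY" payload from point 2. The subscriber only parses payloads and routes them by region and group key. The channel owns the store access, abstracted here as a Supplier that removes and returns one message, or null, and it catches up on missed messages in start().

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical listener contract: what a channel exposes to the table subscriber.
interface TableListener {
    String region();
    String groupKey();
    // Signals "there may be new rows"; the listener fetches them itself.
    void onNewRows();
}

// Hypothetical per-table subscriber: knows nothing about stores or message handlers.
final class TableSubscriber {
    private final Set<TableListener> listeners = new CopyOnWriteArraySet<>();

    void add(TableListener listener) { listeners.add(listener); }

    void remove(TableListener listener) { listeners.remove(listener); }

    // Called by the LISTEN loop with each "REGION GROUP_KEY" payload.
    void onNotification(String payload) {
        // Assumes group keys are UUID strings (no spaces), so split at the last space.
        int split = payload.lastIndexOf(' ');
        if (split < 0) {
            return; // not in the expected format; ignore
        }
        String region = payload.substring(0, split);
        String groupKey = payload.substring(split + 1);
        for (TableListener listener : listeners) {
            if (listener.region().equals(region) && listener.groupKey().equals(groupKey)) {
                listener.onNewRows();
            }
        }
    }
}

// Hypothetical channel: owns store access and catches up on missed messages in start().
final class SketchChannel implements TableListener {
    private final String region;
    private final String groupKey;
    private final TableSubscriber subscriber;
    private final Supplier<String> pollStore; // removes and returns one message, or null
    private final Consumer<String> handler;

    SketchChannel(String region, String groupKey, TableSubscriber subscriber,
                  Supplier<String> pollStore, Consumer<String> handler) {
        this.region = region;
        this.groupKey = groupKey;
        this.subscriber = subscriber;
        this.pollStore = pollStore;
        this.handler = handler;
    }

    void start() {
        subscriber.add(this); // register first so no notification falls into a gap
        onNewRows();          // then drain what was inserted before we listened
    }

    void stop() {
        subscriber.remove(this);
    }

    @Override public String region() { return region; }

    @Override public String groupKey() { return groupKey; }

    @Override
    public void onNewRows() {
        // May run concurrently on the caller's and the subscriber's thread,
        // so pollStore must remove each row atomically.
        String message;
        while ((message = pollStore.get()) != null) {
            handler.accept(message);
        }
    }
}
```

Registering before draining in start() means a row inserted during startup is either drained directly or announced by a later notification, so it is not silently skipped.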

raphw commented 2 years ago

Yeah, that sounds like a better idea, as it lets the subscriber decide on the polling.

One issue I found, however, was mapping the groupKey back to the groupId, in case one wanted to provide the latter as an argument to the subscriber.

artembilan commented 2 years ago

Not sure what you mean. The channel must be supplied with that groupKey, and it is stored as is (a string) in the DB. This key has to be part of the contract of the PostgresListener we are going to implement on the PostgresSubscribableChannel. We may also consider basing it on the bean name: see the BeanNameAware DI hook.

raphw commented 2 years ago

When listening to inserts via NOTIFY, only database values can be supplied to the NOTIFY call, GROUP_KEY in this case. On LISTEN, this key is available, but not the object that is normally supplied when sending messages.

I had hoped to offer a listener interface where the region and groupId of a message were provided to the listener, but only the stringified key is available.
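One possible way around the missing groupId, sketched under assumptions: if each listener is configured with its groupId up front, it can derive the key once and compare it with the GROUP_KEY from the payload. Then nothing has to be mapped back. The derivation below is an assumption modelled on how we believe the JDBC store turns a String group id into GROUP_KEY: a name-based UUID, as produced by Spring's UUIDConverter. Other groupId types may be converted differently, so verify this against the store version in use. GroupKeys is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper; the derivation is an assumption, not Spring's actual code.
final class GroupKeys {

    // Assumed derivation for UUID and String group ids only.
    static String keyFor(Object groupId) {
        if (groupId instanceof UUID) {
            return groupId.toString();
        }
        String id = String.valueOf(groupId);
        try {
            return UUID.fromString(id).toString(); // already a UUID string
        } catch (IllegalArgumentException notAUuid) {
            // Name-based (version 3, MD5) UUID of the id's UTF-8 bytes.
            return UUID.nameUUIDFromBytes(id.getBytes(StandardCharsets.UTF_8)).toString();
        }
    }

    // Compare in the forward direction: groupId -> key, never key -> groupId.
    static boolean matches(String notifiedKey, Object groupId) {
        return keyFor(groupId).equals(notifiedKey);
    }
}
```

Because a name-based UUID is a one-way hash, this forward comparison is the only direction that works in general.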

raphw commented 2 years ago

I redid the suggestion; maybe this is a better approach. Please have another look!