quarkiverse / quarkus-artemis

Quarkus Artemis extensions
Apache License 2.0

Support Rebalancing connections #257

Closed: gastaldi closed this issue 9 months ago

gastaldi commented 1 year ago

The activemq-artemis JCA resource adapter archive (RAR) supports rebalancing all inbound MDB connections when the underlying Artemis cluster topology changes, so that when nodes are added to or removed from the cluster, the MDB can connect to them instead of being stuck to the topology when the MDB initially connected to the cluster.

More info here:

Since MDBs are not supported in this extension, it would be nice if the extension could support this feature for JMS consumers (perhaps by having a quarkus.artemis.rebalance-connections=true config option).
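To make the requested behaviour concrete, here is a minimal, stdlib-only Java model of rebalancing. It is not the Artemis RA code or API: `RebalancingConsumerPool`, `nodeUp`, `nodeDown` and `placements` are hypothetical names chosen for illustration, loosely mirroring the nodeUp topology event that appears in the AMQ151006 log line later in this thread. On every topology change, the model drops all consumer placements and reassigns them round-robin over the nodes currently known, so a newly added node picks up its share instead of consumers staying pinned to the nodes they first connected to.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of "rebalance on topology change" for JMS consumers.
 * Nodes are plain strings (e.g. "tcp://myhost:61616"); no broker connections are opened.
 */
class RebalancingConsumerPool {
    private final int consumerCount;
    // Insertion-ordered so the round-robin assignment is deterministic.
    private final Set<String> liveNodes = new LinkedHashSet<>();
    // placements.get(i) is the node that consumer i is currently attached to.
    private final List<String> placements = new ArrayList<>();

    RebalancingConsumerPool(int consumerCount, List<String> initialNodes) {
        this.consumerCount = consumerCount;
        liveNodes.addAll(initialNodes);
        rebalance();
    }

    /** A node joined the cluster: redistribute every consumer so the new node gets a share. */
    synchronized void nodeUp(String node) {
        if (liveNodes.add(node)) {
            rebalance();
        }
    }

    /** A node left the cluster: move its consumers to the remaining nodes. */
    synchronized void nodeDown(String node) {
        if (liveNodes.remove(node)) {
            rebalance();
        }
    }

    synchronized List<String> placements() {
        return List.copyOf(placements);
    }

    // Drop all placements and reassign consumers round-robin over the live nodes.
    // A real implementation would close and recreate the JMS connections here.
    private void rebalance() {
        placements.clear();
        if (liveNodes.isEmpty()) {
            return; // nothing to connect to; consumers wait for the next nodeUp
        }
        List<String> nodes = new ArrayList<>(liveNodes);
        for (int i = 0; i < consumerCount; i++) {
            placements.add(nodes.get(i % nodes.size()));
        }
    }
}
```

Reassigning every consumer on each event is the simplest policy; a gentler variant would only move surplus consumers off the busiest nodes, at the cost of more bookkeeping.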

vsevel commented 1 year ago

instead of being stuck to the topology when the MDB initially connected to the cluster.

It is not just about the initial connection. The typical use case is:

Another (maybe simpler) approach to solving this problem is to recognize a clustered Artemis URL (tcp://myhost:61616,tcp://myhost2:61616) and never create more than 5 connections on node 1, assuming we target a pool of 10. So when node 2 is down, you accept working in degraded mode with 5 connections in total, and when node 2 restarts, 5 new connections get created on that node.

This assumes we work closely with the underlying pool.
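vsevel's alternative can be sketched in plain Java, assuming the pool size is split evenly across the hosts listed in the clustered URL. `PerNodeCapPool`, `parseNodes`, `topUp` and `nodeLost` are hypothetical names; the parser only handles the simple `(tcp://a:port,tcp://b:port)` shape shown above (any trailing query string is stripped); and no real connections are created, the class only counts them per node.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical pool that caps open connections per cluster node at poolSize / nodeCount. */
class PerNodeCapPool {
    // Insertion-ordered map: node URL -> number of connections currently open on it.
    private final Map<String, Integer> openPerNode = new LinkedHashMap<>();
    private final int capPerNode;

    PerNodeCapPool(String clusteredUrl, int poolSize) {
        List<String> nodes = parseNodes(clusteredUrl);
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes in " + clusteredUrl);
        }
        for (String node : nodes) {
            openPerNode.put(node, 0);
        }
        // A pool of 10 over 2 nodes allows at most 5 connections per node.
        this.capPerNode = poolSize / nodes.size();
    }

    /** Splits "(tcp://h1:61616,tcp://h2:61616)" into its individual node URLs. */
    static List<String> parseNodes(String url) {
        int query = url.indexOf('?');
        String body = query >= 0 ? url.substring(0, query) : url; // strip any query string
        body = body.replace("(", "").replace(")", "");
        List<String> nodes = new ArrayList<>();
        for (String part : body.split(",")) {
            if (!part.isBlank()) {
                nodes.add(part.trim());
            }
        }
        return nodes;
    }

    /** Opens connections on reachable nodes up to the cap; returns how many were opened. */
    int topUp(Set<String> reachableNodes) {
        int opened = 0;
        for (Map.Entry<String, Integer> entry : openPerNode.entrySet()) {
            if (reachableNodes.contains(entry.getKey())) {
                opened += capPerNode - entry.getValue();
                entry.setValue(capPerNode); // a real pool would create the connections here
            }
        }
        return opened;
    }

    /** A node went down: its connections are gone, so its count drops to zero. */
    void nodeLost(String node) {
        openPerNode.computeIfPresent(node, (n, count) -> 0);
    }

    int totalOpen() {
        return openPerNode.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

With node 2 unreachable, `topUp` opens only node 1's 5 connections (degraded mode); once node 2 is reachable again, the next `topUp` opens 5 more, all on node 2. Integer division leaves slots unused when the pool size is not a multiple of the node count; how to hand out the remainder is a policy choice.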

Since MDBs are not supported in this extension

Could a functional equivalent be supported? We do not need an MDB, but we do need a managed listener with JTA integration.

turing85 commented 1 year ago

Aside from the fact that I lack a deeper understanding of that particular feature of JMS, I currently do not have time to work on this issue.

I do, however, have a few thoughts about this, especially with regard to pooling. For pooling, we rely on https://github.com/quarkiverse/quarkus-pooled-jms (cc @zhfeng), so I am not really sure how much control we have over how many connections go where. I am not even sure we can easily integrate cluster connections into pooled-jms.

@gastaldi, @vsevel, is either of you working on a PR?

gastaldi commented 1 year ago

I'm playing around with https://github.com/gastaldi/quarkus-jca, which enables you to deploy the Artemis JCA adapter in Quarkus. It's an early prototype, so don't get your hopes too high (it's in my organization because I am not yet sure if or when it should be promoted to the Quarkiverse org) :) BTW, I can see it works with the rebalance connections flag set (the warnings in the log were displayed when I shut down the Artemis server), and it recovered when I brought the server back up:

https://github.com/gastaldi/quarkus-jca/blob/4bf0619b03c5b724564ddb7db96347ea100d094f/integration-tests/src/main/java/io/quarkiverse/jca/it/message/MyMessageEndpoint.java#L11-L16


__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2023-08-06 20:17:02,958 INFO  [io.qua.jca.run.imp.JCAVerticle] (vert.x-eventloop-thread-0) Starting JCA Resource Adapter org.apache.activemq.artemis.ra.ActiveMQResourceAdapter@e2ba9d55

2023-08-06 20:17:02,960 INFO  [org.apa.act.art.ra.ActiveMQRALogger] (vert.x-eventloop-thread-0) AMQ151007: Resource adaptor started
2023-08-06 20:17:02,975 INFO  [org.apa.act.art.ra.ActiveMQRALogger] (vert.x-eventloop-thread-2) AMQ151004: Instantiating jakarta.jms.Queue "MyQueue" directly since UseJNDI=false.
2023-08-06 20:17:03,157 INFO  [io.quarkus] (Quarkus Main Thread) quarkus-jca-integration-tests 999-SNAPSHOT on JVM (powered by Quarkus 3.2.3.Final) started in 1.493s. Listening on: http://localhost:8080
2023-08-06 20:17:03,159 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2023-08-06 20:17:03,159 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, jca, narayana-jta, resteasy, smallrye-context-propagation, vertx]
2023-08-06 20:17:33,180 INFO  [io.qua.jca.it.mes.MyMessageEndpoint] (Thread-0 (ActiveMQ-client-global-threads)) Received message: Hello, World!
2023-08-06 20:17:38,916 WARN  [org.apa.act.art.cor.client] (Thread-0 (ActiveMQ-client-global-threads)) AMQ212037: Connection failure to localhost/127.0.0.1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2023-08-06 20:17:38,921 WARN  [org.apa.act.art.cor.client] (Thread-1 (ActiveMQ-client-global-threads)) AMQ212037: Connection failure to localhost/127.0.0.1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2023-08-06 20:17:38,921 WARN  [org.apa.act.art.cor.client] (Thread-2 (ActiveMQ-client-global-threads)) AMQ212037: Connection failure to localhost/127.0.0.1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2023-08-06 20:18:11,006 INFO  [org.apa.act.art.ra.ActiveMQRALogger] (Thread-3 (ActiveMQ-client-global-threads)) AMQ151006: Cluster topology change detected. Re-balancing connections on even nodeUp TopologyMember[id=718d70c1-34af-11ee-befe-0242ac110002, connector=Pair[a=TransportConfiguration(name=8298ffa3-34af-11ee-8d6e-a2222f8c3c71, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?host=localhost&port=61616, b=null], backupGroupName=null, scaleDownGroupName=null].
2023-08-06 20:18:11,009 ERROR [org.apa.act.art.cor.client] (Thread-0 (ActiveMQ-client-global-threads)) AMQ214003: Failed to handle failover: ActiveMQUnBlockedException[errorType=UNBLOCKED message=AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response]
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:560)
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:452)
    at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext.recreateSession(ActiveMQSessionContext.java:909)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.handleFailover(ClientSessionImpl.java:1444)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.reconnectSessions(ClientSessionFactoryImpl.java:933)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.failoverOrReconnect(ClientSessionFactoryImpl.java:700)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.handleConnectionFailure(ClientSessionFactoryImpl.java:566)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingFailureListener.connectionFailed(ClientSessionFactoryImpl.java:1407)
    at org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection.callFailureListeners(AbstractRemotingConnection.java:98)
    at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.fail(RemotingConnectionImpl.java:212)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$CloseRunnable.run(ClientSessionFactoryImpl.java:1172)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)

2023-08-06 20:18:11,011 ERROR [org.apa.act.art.cor.client] (Thread-1 (ActiveMQ-client-global-threads)) AMQ214003: Failed to handle failover: ActiveMQUnBlockedException[errorType=UNBLOCKED message=AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response]
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:560)
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:452)
    at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext.recreateSession(ActiveMQSessionContext.java:909)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.handleFailover(ClientSessionImpl.java:1444)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.reconnectSessions(ClientSessionFactoryImpl.java:933)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.failoverOrReconnect(ClientSessionFactoryImpl.java:700)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.handleConnectionFailure(ClientSessionFactoryImpl.java:566)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingFailureListener.connectionFailed(ClientSessionFactoryImpl.java:1407)
    at org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection.callFailureListeners(AbstractRemotingConnection.java:98)
    at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.fail(RemotingConnectionImpl.java:212)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$CloseRunnable.run(ClientSessionFactoryImpl.java:1172)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)

2023-08-06 20:18:11,013 ERROR [org.apa.act.art.cor.client] (Thread-2 (ActiveMQ-client-global-threads)) AMQ214003: Failed to handle failover: ActiveMQUnBlockedException[errorType=UNBLOCKED message=AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response]
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:560)
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:452)
    at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext.recreateSession(ActiveMQSessionContext.java:909)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.handleFailover(ClientSessionImpl.java:1444)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.reconnectSessions(ClientSessionFactoryImpl.java:933)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.failoverOrReconnect(ClientSessionFactoryImpl.java:700)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.handleConnectionFailure(ClientSessionFactoryImpl.java:566)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingFailureListener.connectionFailed(ClientSessionFactoryImpl.java:1407)
    at org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection.callFailureListeners(AbstractRemotingConnection.java:98)
    at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.fail(RemotingConnectionImpl.java:212)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$CloseRunnable.run(ClientSessionFactoryImpl.java:1172)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)

2023-08-06 20:18:13,016 INFO  [org.apa.act.art.ra.ActiveMQRALogger] (vert.x-eventloop-thread-5) AMQ151001: Attempting to reconnect org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec(ra=org.apache.activemq.artemis.ra.ActiveMQResourceAdapter@757a2641 destination=MyQueue destinationType=jakarta.jms.Queue ack=Auto-acknowledge durable=false clientID=null user=null maxSession=3)
2023-08-06 20:18:13,022 INFO  [org.apa.act.art.ra.ActiveMQRALogger] (vert.x-eventloop-thread-5) AMQ151004: Instantiating jakarta.jms.Queue "MyQueue" directly since UseJNDI=false.
2023-08-06 20:18:13,102 INFO  [org.apa.act.art.ra.ActiveMQRALogger] (vert.x-eventloop-thread-5) AMQ151002: Reconnected with broker
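The end of this log shows the inbound activation failing to recover its existing sessions through failover (AMQ214003) and then setting itself up again from scratch (AMQ151001, followed by AMQ151002). For a plain JMS consumer outside the RA, the same recover-by-recreating idea could be approximated with a generic retry loop like the minimal sketch below. It is not the RA's implementation: `Reconnector`, `reconnect`, `maxAttempts` and `intervalMillis` are hypothetical names, and the `connect` callback stands in for whatever code actually creates the connection and consumer.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper: keep trying to (re)connect, pausing between attempts. */
final class Reconnector {
    private Reconnector() {
    }

    /**
     * Calls {@code connect} until it succeeds or {@code maxAttempts} is reached.
     * A negative maxAttempts means "retry forever".
     *
     * @return the value produced by the first successful attempt
     * @throws Exception the last failure, if every attempt failed
     */
    static <T> T reconnect(Callable<T> connect, int maxAttempts, long intervalMillis) throws Exception {
        if (maxAttempts == 0) {
            throw new IllegalArgumentException("maxAttempts must be non-zero");
        }
        Exception last = null;
        for (int attempt = 1; maxAttempts < 0 || attempt <= maxAttempts; attempt++) {
            try {
                return connect.call();
            } catch (Exception e) {
                last = e;
                // Pause before the next attempt, unless this was the final one.
                if (maxAttempts < 0 || attempt < maxAttempts) {
                    Thread.sleep(intervalMillis);
                }
            }
        }
        throw last; // every attempt failed: surface the most recent cause
    }
}
```

A fixed interval keeps the sketch short; exponential backoff with a cap is a common alternative when many clients reconnect to the same broker at once.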

gastaldi commented 1 year ago

@quarkiverse/quarkiverse-artemis, do you think it would make sense to have an extension here for cases where you need the Artemis RA? The extension should provide a ResourceAdapterFactory implementation like this one: https://github.com/quarkiverse/quarkus-ironjacamar/blob/main/integration-tests/artemis-common/src/main/java/io/quarkiverse/ironjacamar/artemis/ArtemisResourceAdapterFactory.java

turing85 commented 1 year ago

It should be possible (and relatively straight-forward).

Counter-question: is there a reason why we should have a separate extension? Why not simply always provide the ResourceAdapterFactory when ironjacamar is detected?

The main concern I have is that the feature should also work with quarkus-pooled-jms. As I said before: I am not familiar (enough) with JCA to understand whether providing the factory here is sufficient, or whether a separate integration on quarkus-pooled-jms would be needed.
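One way to read the "provide the factory when ironjacamar is detected" suggestion is as an optional-dependency check. The stdlib-only sketch below shows such a probe with `Class.forName`; `OptionalDependency` is a hypothetical helper, and the class name you would pass to it is an assumption that has to be checked against quarkus-ironjacamar. A real Quarkus extension would more likely make this decision at build time (for example through the extension capabilities mechanism) rather than at runtime.

```java
/** Hypothetical helper: detects an optional dependency without linking against it. */
final class OptionalDependency {
    private OptionalDependency() {
    }

    /** Returns true if the named class can be loaded; the class is not initialized. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, OptionalDependency.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or present but unloadable: treat the dependency as absent.
            return false;
        }
    }
}
```

Loading without initialization (`false` as the second argument) avoids running static initializers of the probed class just to answer the question.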

gastaldi commented 1 year ago

It should be possible (and relatively straight-forward).

Great!

Counter-question: is there a reason why we should have a separate extension? Why not simply always provide the ResourceAdapterFactory when ironjacamar is detected?

I am not sure that would work, because the new extension would be in charge of producing ConnectionFactories and other features that may overlap with the current extension, but I'm happy to be proven wrong.

The main concern I have is that the feature should also work with quarkus-pooled-jms. As I said before: I am not familiar (enough) with JCA to understand whether providing the factory here is sufficient, or whether a separate integration on quarkus-pooled-jms would be needed.

I think the RA already does this job, so TBH I am not sure quarkus-pooled-jms is necessary, but again, happy to be proven wrong 😉

zhfeng commented 10 months ago

IMO, quarkus-ironjacamar (JCA) can provide pooling and XA support, so I think there is no need for quarkus-pooled-jms. Having a separate extension, quarkus-artemis-jms-ra, looks good to me.

I'd like to provide a PR.

turing85 commented 10 months ago

@gastaldi, is ironjacamar available for Quarkus 3.2.x LTS?

gastaldi commented 10 months ago

@turing85 it is built against Quarkus 3.2.x, so I believe yes

turing85 commented 10 months ago

@turing85 it is built against Quarkus 3.2.x, so I believe yes

Nice! This means we can backport it to quarkus-artemis 3.0.x (i.e. the LTS-support version).