wso2 / andes

Apache License 2.0
39 stars 129 forks source link

Fix for creating and re-using secured durable subscriptions of topics in a JMS inbound endpoint #1026

Closed akramkammoun closed 4 years ago

akramkammoun commented 5 years ago

Purpose

In a JMS inbound endoint, I want to set a username and a password in the parameters to be able to create or re-use a secured durable subscription of a topic, as in the following example:

<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="MX00.securedurabletopic.inep" onError="MX00.securedurabletopic.seq" protocol="jms" sequence="MX00.securedurabletopic.seq" statistics="enable" suspend="false" trace="enable" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="transport.jms.Destination">SecureTopic</parameter>
        <parameter name="transport.jms.CacheLevel">3</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
        <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">conf/jndi.properties</parameter>
        <parameter name="transport.jms.UserName">subscriber_user</parameter>
        <parameter name="transport.jms.SessionAcknowledgement">SESSION_TRANSACTED</parameter>
        <parameter name="transport.jms.Password">subscriber_user</parameter>
        <parameter name="transport.jms.SessionTransacted">true</parameter>
        <parameter name="transport.jms.SubscriptionDurable">true</parameter>
        <parameter name="transport.jms.ConnectionFactoryType">topic</parameter>
        <parameter name="transport.jms.DurableSubscriberName">myDurableSubClientName</parameter>
        <parameter name="transport.jms.SharedSubscription">false</parameter>
        <parameter name="transport.jms.ResetConnectionOnPollingSuspension">false</parameter>
    </parameters>
</inboundEndpoint>

However, WSO2 cannot create or re-use a durable subscription of a topic and throws two types of exception, respectively:

First exception type: [2019-07-24 15:31:54,201] [EI-Core] ERROR - JMSConnectionFactory JMS Exception while creating connection through factory 'TopicConnectionFactory' Client name cannot be changed after being set [2019-07-24 15:31:54,202] [EI-Core] DEBUG - AMQProtocolHandler (391791501)Method frame received: [ConnectionCloseOkBodyImpl: ] [2019-07-24 15:31:54,203] [EI-Core] DEBUG - AMQProtocolHandler Session closed called by client [2019-07-24 15:31:54,203] [EI-Core] DEBUG - AMQProtocolHandler Session closed called by client [2019-07-24 15:31:54,203] [EI-Core] DEBUG - AMQProtocolHandler Protocol Session [ org.wso2.andes.client.protocol.AMQProtocolHandler@175a438d ] closed [2019-07-24 15:31:54,203] [EI-Core] DEBUG - AMQProtocolHandler Protocol Session [ org.wso2.andes.client.protocol.AMQProtocolHandler@175a438d ] closed [2019-07-24 15:31:54,203] [EI-Core] WARN - JMSPollingConsumer Inbound JMS endpoint unable to get a connection.

Second exception type: [2019-07-24 15:53:45,992] [EI-Core] DEBUG - AMQDestination Based on topic://amq.topic/SecureTopic/carbon:myDurableSubClientName123?routingkey='SecureTopic'&durable='true'&exclusive='true' the selected destination syntax is BURL [2019-07-24 15:53:45,993] [EI-Core] DEBUG - AMQProtocolHandler (457622435)Method frame received: [ExchangeDeclareOkBodyImpl: ] [2019-07-24 15:53:45,997] [EI-Core] DEBUG - AMQProtocolHandler (457622435)Method frame received: [ChannelCloseBodyImpl: replyCode=541, replyText=Cannot declare queue('carbon:mydurablesubclientname123'), as exclusive queue with same name declared on another client ID('clientID') your clientID('1563998025986'), classId=50, methodId=10] [2019-07-24 15:53:45,998] [EI-Core] ERROR - JMSConnectionFactory JMS Exception while creating consumer. Error registering consumer: org.wso2.andes.AMQChannelClosedException: Error: Cannot declare queue('carbon:mydurablesubclientname123'), as exclusive queue with same name declared on another client ID('clientID') your clientID('1563998025986') [error code 541: internal error] javax.jms.JMSException: Error registering consumer: org.wso2.andes.AMQChannelClosedException: Error: Cannot declare queue('carbon:mydurablesubclientname123'), as exclusive queue with same name declared on another client ID('clientID') your clientID('1563998025986') [error code 541: internal error] at org.wso2.andes.client.AMQSession$6.execute(AMQSession.java:2178) at org.wso2.andes.client.AMQSession$6.execute(AMQSession.java:2121) at org.wso2.andes.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:339) at org.wso2.andes.client.AMQConnection$3.run(AMQConnection.java:663) at java.security.AccessController.doPrivileged(Native Method) at org.wso2.andes.client.AMQConnection.executeRetrySupport(AMQConnection.java:660) at org.wso2.andes.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:102) at org.wso2.andes.client.AMQSession.createConsumerImpl(AMQSession.java:2186) at org.wso2.andes.client.AMQSession.createConsumer(AMQSession.java:1091) at org.wso2.andes.client.AMQSession.createDurableSubscriber(AMQSession.java:1233) at org.wso2.carbon.inbound.endpoint.protocol.jms.factory.JMSConnectionFactory.createMessageConsumer(JMSConnectionFactory.java:373) at org.wso2.carbon.inbound.endpoint.protocol.jms.factory.CachedJMSConnectionFactory.createMessageConsumer(CachedJMSConnectionFactory.java:126) at org.wso2.carbon.inbound.endpoint.protocol.jms.factory.CachedJMSConnectionFactory.getMessageConsumer(CachedJMSConnectionFactory.java:118) at org.wso2.carbon.inbound.endpoint.protocol.jms.JMSPollingConsumer.poll(JMSPollingConsumer.java:267) at org.wso2.carbon.inbound.endpoint.protocol.jms.JMSPollingConsumer.execute(JMSPollingConsumer.java:204) at org.wso2.carbon.inbound.endpoint.protocol.jms.JMSTask.taskExecute(JMSTask.java:47) at org.wso2.carbon.inbound.endpoint.common.InboundTask.execute(InboundTask.java:45) at org.wso2.carbon.mediation.ntask.NTaskAdapter.execute(NTaskAdapter.java:98) at org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67) at org.quartz.core.JobRunShell.run(JobRunShell.java:213) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Approach

As a solution, we need to edit the method Connection createConnection(String userName, String password, String id) existing in the class org.wso2.andes.client.AMQConnectionFactory, in two steps:

1) Adding this code, which it is necessary for WSO2 to create a secured durable subcription, as shown in the commit:

        /**
         * In AMQP it is not possible to change the client ID. If one is not specified upon connection
         * construction, an id is generated automatically. Therefore we can always throw an exception.
         *
         * We can bypass this illegal state exception by setting the following system property. So we
         * are setting it up here to fix the issue https://wso2.org/jira/browse/MB-162. 
         * */

        // Requires permission java.util.PropertyPermission "ignore_setclientID", "write";
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            public Void run() {
                System.setProperty(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME, "true");
                return null; // nothing to return
            }
        });

It should be noted that this code is implemented in another method, named Connection createConnection(), that exists in the same class.

2) Deleting this code line so WSO2 can re-use an existing secured durable subcription, as shown in the commit:

         _connectionDetails.setClientName(id);

Release note

Fix for creating and re-using secured durable subscriptions of topics in a JMS inbound endpoint

Documentation

N/A This PR is a fix which I do not think it needs a documentation.

Certification

N/A This PR is a fix which I do not think it affects the certification.

Security checks

Migrations (if applicable)

Tested on WSO2 6.4.0

Test environment

JDK: java 1.8.0 Operating system: Windows7

CLAassistant commented 5 years ago

CLA assistant check
Thank you for your submission, we really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Akram.Kammoun seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

jpbelangerfromquebec commented 5 years ago

Why is this still in this state ? Can someone do something to complete this ?

sdkottegoda commented 4 years ago

Hi @akramkammoun thank you very much for your contribution. We will evaluate this and update the issue as soon as possible.

sdkottegoda commented 4 years ago

Hi @akramkammoun, we do not allow re-using a durable topic subscription while another active one exists with the same subscription Id by default.

If you want to re-use a durable subscription with the same id, you might need to look into the concept of Sharing a Durable Topic Subscription in the official documentation

Also, setting the username and the password can be done in the connection URL defined in the jndi.properties file

Please get back to us for further clarifications.

sdkottegoda commented 4 years ago

Closing this PR with the previous comment. Please re-open for any further concerns.