Open justanu opened 6 years ago
Good that you mention this. I've already developed modules that communicate with Apache ActiveMQ Artemis like this. The problem is that OpenAS2 isn't quite ready for this, so the solution is a bit hacky. But we're planning to put it into production. One day, soon.
@jsmucr Please feel free to suggest how OpenAS2 can be adapted to make it easier to create a message queue module
@justanu You can follow the suggestions on how to add the inbound and outbound handlers as discussed here: https://sourceforge.net/p/openas2/discussion/265567/thread/75b5def5/ Essentially, for inbound messages, implement a class that replaces org.openas2.processor.storage.MessageFileModule and passes received messages into a queue. Declare the module in the same way as the MessageFileModule is declared.
Similarly, for outbound messages, implement a class that replaces org.openas2.processor.receiver.AS2DirectoryPollingModule and receives the messages to be sent via AS2 from the message queue.
You should be able to copy the 2 classes listed above and then replace the relevant code in the class with code that reads/writes message queues.
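To illustrate the "copy the class, swap the file code for queue code" idea, here is a minimal sketch. It does not use the OpenAS2 API: PayloadSink and QueueSink are hypothetical stand-ins, and an in-memory BlockingQueue stands in for the broker. In a real module the store step would sit wherever the copied MessageFileModule currently writes the file.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-ins only; none of these types are OpenAS2 classes. */
final class QueueStorageSketch {

    /** What a storage module does with a received payload (assumed shape). */
    interface PayloadSink {
        void store(String messageId, byte[] payload);
    }

    /** Queue-backed sink: the part written in place of the file-writing code. */
    static final class QueueSink implements PayloadSink {
        private final BlockingQueue<byte[]> queue;

        QueueSink(BlockingQueue<byte[]> queue) {
            this.queue = queue;
        }

        @Override
        public void store(String messageId, byte[] payload) {
            // A real module would publish to the message broker here.
            queue.add(payload);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> inbound = new LinkedBlockingQueue<>();
        PayloadSink sink = new QueueSink(inbound);
        sink.store("msg-1", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(inbound.poll(), StandardCharsets.UTF_8));
    }
}
```

The outbound direction is the mirror image: the copied polling class would take payloads from the queue instead of listing a directory.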
@uhurusurfa will follow the instructions and update the ticket. Thanks everyone.
@uhurusurfa Sure. I didn't mean to be rude. I appreciate the software very much and will gladly help with its development (as far as my boss allows me to :-)).
This is an example of my code (Groovy). There's a nasty thing I had to use to be able to create an AS2Message
instance with partnership details preset (since there's no input file path to extract these things from):
import cz.aimtec.mq.MapUtils
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.openas2.OpenAS2Exception
import org.openas2.Session
import org.openas2.message.AS2Message
import org.openas2.message.Message

class MessageSourceModule extends ClouEDIMQSourceModule {

    private ThreadLocal<Map> messageProperties = new ThreadLocal<>()

    @Override
    void init(Session session, Map<String, String> parameters) throws OpenAS2Exception {
        super.init(session, parameters)
        consumer.receiveMessageAsync(this.&receive)
    }

    @Override
    protected Message createMessage() {
        final msg = new AS2Message()
        final props = messageProperties.get()
        msg.partnership.senderIDs.name = props.message.sender.toString()
        msg.partnership.receiverIDs.name = props.message.receiver.toString()
        return msg
    }

    @Override
    void doStart() throws OpenAS2Exception {
        consumer.start()
    }

    @Override
    void doStop() throws OpenAS2Exception {
        consumer.stop()
    }

    @Override
    boolean healthcheck(List<String> failures) {
        return true
    }

    protected boolean receive(ClientMessage msg) {
        // HACK: I need to propagate certain received params to the createMessage() method
        messageProperties.set(MapUtils.explodeKeys(msg.toPropertyMap(), '.' as char))
        processDocument(msg.bodyInputStream, msg.getStringProperty('message.file.name'))
        messageProperties.set(null)
    }
}
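The hack above hands per-message properties to a parameterless createMessage() through a ThreadLocal. Below is a minimal, self-contained sketch of that pattern with the cleanup moved into a finally block, so the properties are cleared even when processing throws. The class and method names are hypothetical, and the pattern only works if the callback runs synchronously on the thread that set the value, which is assumed here.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical illustration of the ThreadLocal hand-off; not OpenAS2 code. */
final class ThreadLocalContextSketch {

    private static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

    /** Makes props visible further down the call stack while the callback runs. */
    static <T> T withContext(Map<String, String> props, Supplier<T> callback) {
        CONTEXT.set(props);
        try {
            return callback.get();
        } finally {
            CONTEXT.remove(); // always clear, even if the callback throws
        }
    }

    /** Plays the role of createMessage(): takes no parameters, reads the thread-local. */
    static String describePartnership() {
        Map<String, String> props = CONTEXT.get();
        if (props == null) {
            throw new IllegalStateException("no message properties bound to this thread");
        }
        return props.get("sender") + " -> " + props.get("receiver");
    }

    public static void main(String[] args) {
        String result = withContext(
                Map.of("sender", "OpenAS2A_OID", "receiver", "OpenAS2B_OID"),
                ThreadLocalContextSketch::describePartnership);
        System.out.println(result);
    }
}
```

Failing loudly when nothing is bound makes it obvious if createMessage() is ever reached from a different thread.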
Is there a way to easily debug the server? I've built MessageAMQPModule to send AS2 messages to an AMQP queue instead of the file system and set the new module in config.xml. I would now like to start the server in debug mode; can someone briefly explain how I can do that? In the meantime, I will try on my own.
Try adding the server itself as a Maven dependency to your module. The only downside is that the latest version available there is OpenAS2 2.4.1.
I would suggest your first step should be a unit test, but you can also use the unit test that does a full system test (/src/test/java/org/openas2/app/OpenAS2ServerTest.java) and run it in debug mode in your IDE. This is easily accomplished in Eclipse, and I can provide more details on how to do this if you are using Eclipse.
I got the inbound working nicely:
<module classname="org.openas2.processor.storage.MessageAMQPModule"
    mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol"
    mq_exchange="as2"
    mq_routingkey="as2"/>
Header:
AS2-From: | OpenAS2A_OID
AS2-To: | OpenAS2B_OID
AS2-Version: | 1.1
....
Payload: Cg8wh2Qu.....
Will work on outbound as well.
Got the outbound working as well; however, there are a few other things to consider: the retry and the MDN.
So far here is an outbound configuration module:
<module classname="org.openas2.processor.receiver.AS2AMQPPollingModule" interval="20" mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol" mq_queue="AS2_Outbound" format="sender.as2_id, receiver.as2_id, attributes.fileid" errordir="%home%/../data/toOpenAS2B/error"/>
The interval is irrelevant since, once the module starts, it listens for messages on the AMQP queue. This is good since it acts immediately on receiving a message.
The MQ message must have AS2-From and AS2-To set to match the AS2 IDs as defined in the partnership.xml configuration file.
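As a rough illustration of that requirement, a consumer could check the headers before handing the message on. This is only a sketch: the class and method are hypothetical, the headers are modelled as a plain Map rather than a real AMQP client type, and exact-case header names are an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-check for queue messages; not part of OpenAS2. */
final class As2HeaderCheckSketch {

    /** Header names taken from the thread; exact-case matching is an assumption. */
    static final List<String> REQUIRED = List.of("AS2-From", "AS2-To");

    /** Returns the required headers that are absent or blank. */
    static List<String> missingHeaders(Map<String, ?> headers) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            Object value = headers.get(name);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // AS2-To is deliberately left out to show the check reporting it.
        System.out.println(missingHeaders(Map.of("AS2-From", "OpenAS2A_OID")));
    }
}
```

A message that fails this check cannot be matched to a partnership, so rejecting it early seems preferable to letting it fail later in the send path.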
I still need to see how I can replace the "errorDir" with a different mechanism, and probably create a retry module that works from a queue instead of the file system.
Changed it slightly as follows:
INBOUND
<module classname="org.openas2.processor.storage.MessageAMQPModule" mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol" mq_queue="AS2_Inbound"/>
OUTBOUND
<module classname="org.openas2.processor.receiver.AS2AMQPPollingModule" interval="9999" mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol" mq_queue="AS2_Outbound" mq_error_queue="AS2_Error" format="sender.as2_id, receiver.as2_id, attributes.fileid" errordir=""/>
The interval and errordir are not used. Will build test cases and provide the code
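For readers wondering what an error queue in place of errordir might look like, here is a minimal sketch of the routing logic. It is not the module discussed in this thread: the names are hypothetical, in-memory queues stand in for AS2_Outbound and AS2_Error, and a Consumer stands in for the AS2 send step.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical error-queue routing; in-memory queues stand in for the broker. */
final class ErrorQueueSketch {

    /**
     * Drains outbound, passing each message to sender. Messages whose send
     * throws are moved to errors instead of being written to an error directory.
     * Returns the number of messages sent successfully.
     */
    static int drain(BlockingQueue<String> outbound,
                     BlockingQueue<String> errors,
                     Consumer<String> sender) {
        int sent = 0;
        String message;
        while ((message = outbound.poll()) != null) {
            try {
                sender.accept(message);
                sent++;
            } catch (RuntimeException e) {
                errors.add(message); // keep the failed message for inspection or retry
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> outbound = new LinkedBlockingQueue<>();
        BlockingQueue<String> errors = new LinkedBlockingQueue<>();
        outbound.add("good");
        outbound.add("bad");
        int sent = drain(outbound, errors, m -> {
            if (m.equals("bad")) {
                throw new IllegalStateException("simulated send failure");
            }
        });
        System.out.println(sent + " sent, " + errors.size() + " routed to the error queue");
    }
}
```

A retry module could later consume from the error queue in the same way, which would cover the retry case without touching the file system.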
Make sure you test with the latest version in the "dev" branch. There are significant changes to the base code for handling HTTP transport as well as fixes for resend functionality but no change to the module interface API so it should work the same
@justanu Have you managed to complete this feature to include in the next release?
@justanu Are you still going to provide the solution back to the project or shall I close this issue?
If there is no progress I'll pick up on this.
I'm picking up on this. After some work, I'll have to do some refactoring and code reorganization of the packages. Is that possible?
Basically, I want to extract the DirectoryPollingModule.java Interface and relocate it into its own package to simplify the creation of similar extensions as external libraries to the project.
I am probably missing something, but I am not sure what exactly you want to do in terms of refactoring. The DirectoryPollingModule is already a pluggable module. A RabbitMQ implementation would simply use the same interface as the DirectoryPollingModule to pass files into OpenAS2.
I.e., you simply extend MessageBuilderModule.
Any module you plug in would then pass files it has to the AS2SenderModule via the MessageBuilder implementation.
This is a new feature not a bug. I would like to create a new handler for both inbound and outbound messages: