smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

JMS publisher and subscriber in one JMSContext #477

Open andrejpetras opened 4 years ago

andrejpetras commented 4 years ago

Hi, would it be possible to let a JMS publisher and subscriber share the same JMSContext? I would like to use SESSION_TRANSACTED mode.

I created a test app and extended the incoming/outgoing channel configuration with a "transaction-group" attribute. Channels in the same group share a JMSContext, and the JmsSink does the commit. Example: https://github.com/andrejpetras/quarkus-jms-stream/blob/master/impl/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java

cescoffier commented 4 years ago

That sounds like a great addition!

What we can do is pass the context in the (incoming) message metadata. So, if we get the metadata in the outbound connector (JmsSink), we can use this context and commit. WDYT?
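
A minimal sketch of that idea, using self-contained stand-in types rather than the real connector classes: `TxContext`, `Msg`, `incoming` and `sink` are hypothetical names invented for illustration, and `TxContext` only records sends and commits instead of talking to a broker. The inbound side attaches its context as metadata, derived messages keep that metadata, and the sink prefers the context it finds there over its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SharedContextSketch {

    /** Hypothetical stand-in for a transacted JMSContext: records sends and commits. */
    static final class TxContext {
        final List<String> sent = new ArrayList<>();
        int commits = 0;

        void send(String body) { sent.add(body); }
        void commit() { commits++; }
    }

    /** Hypothetical stand-in for a message that carries typed metadata. */
    static final class Msg<T> {
        final T payload;
        private final Map<Class<?>, Object> metadata;

        Msg(T payload, Map<Class<?>, Object> metadata) {
            this.payload = payload;
            this.metadata = metadata;
        }

        <M> Optional<M> getMetadata(Class<M> type) {
            return Optional.ofNullable(type.cast(metadata.get(type)));
        }

        /** Derives a new message that keeps the metadata of this one. */
        <R> Msg<R> withPayload(R newPayload) {
            return new Msg<>(newPayload, metadata);
        }
    }

    /** Inbound side: attach the context the message was received on. */
    static <T> Msg<T> incoming(T payload, TxContext ctx) {
        Map<Class<?>, Object> metadata = new HashMap<>();
        metadata.put(TxContext.class, ctx);
        return new Msg<>(payload, metadata);
    }

    /** Outbound side: reuse the context from the metadata if present, else fall back. */
    static void sink(Msg<String> outgoing, TxContext ownContext) {
        TxContext ctx = outgoing.getMetadata(TxContext.class).orElse(ownContext);
        ctx.send(outgoing.payload);
        ctx.commit();
    }
}
```

With this shape, a message that did not originate from a JMS channel simply has no such metadata, and the sink falls back to its own context.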

andrejpetras commented 4 years ago

Does this depend on @Acknowledgment? I am not sure my test was correct, but with a method without @Acknowledgment, the incoming message was acked first and the commit in the JmsSink (JMS publisher) happened afterwards. This is why I do the commit in the JmsSink, where the JMSContext is available.

cescoffier commented 4 years ago

That's a good question - should it be committed in the acknowledgment callback?

Let's imagine I receive a JMS message. The connector creates the Reactive Messaging Message with an acknowledgment callback acknowledging the JMS message. Should the commit happen at the same time?

This callback is invoked when the message gets acknowledged. This can be triggered by the outbound connector (JMS or other), or earlier depending on the acknowledgment strategy.
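
To make the callback chain concrete, here is a small self-contained sketch. It does not use the real Reactive Messaging classes: `Msg`, `Tx`, `receive`, `process` and `sink` are hypothetical stand-ins, and `Tx` only logs what happens. It assumes the commit is placed in the acknowledgment callback and that the processing step forwards the acknowledgment of its result to the incoming message, so the commit runs only once the outbound side acknowledges.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

final class AckChainSketch {

    /** Hypothetical stand-in for a message with an acknowledgment callback. */
    static final class Msg<T> {
        final T payload;
        private final Supplier<CompletionStage<Void>> ack;

        Msg(T payload, Supplier<CompletionStage<Void>> ack) {
            this.payload = payload;
            this.ack = ack;
        }

        CompletionStage<Void> ack() {
            return ack.get();
        }
    }

    /** Hypothetical stand-in for a transacted session: only logs events. */
    static final class Tx {
        final List<String> log = new ArrayList<>();

        CompletionStage<Void> commit() {
            log.add("COMMIT");
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Inbound connector: acknowledging the message commits the session. */
    static Msg<String> receive(String body, Tx tx) {
        return new Msg<>(body, tx::commit);
    }

    /** Processing step: the result's ack delegates to the input's ack. */
    static Msg<String> process(Msg<String> input) {
        return new Msg<>("RESULT " + input.payload, input::ack);
    }

    /** Outbound connector: send first, acknowledge afterwards. */
    static CompletionStage<Void> sink(Msg<String> outgoing, Tx tx) {
        tx.log.add("SEND " + outgoing.payload);
        return outgoing.ack();
    }
}
```

If the incoming message is acknowledged before processing instead, the commit in this model would run before the send, which is the ordering problem the question above is about.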

andrejpetras commented 4 years ago

In my case, I would like a single acknowledgment after the message has been received and all outgoing messages have been sent:

  1. receive message - consumer.receive()
  2. send messages - producer.send(...)
  3. commit
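
The three steps above can be modeled with a small in-memory sketch. This is not the JMS API: `Session` and `handle` are hypothetical stand-ins that mimic transacted semantics (sends become visible only on commit, and a rollback puts received messages back). In JMS 2.0 the corresponding calls would be made on a `JMSContext` created with `JMSContext.SESSION_TRANSACTED`, using `JMSConsumer.receive()`, `JMSProducer.send(...)`, and `JMSContext.commit()` or `rollback()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

final class TransactedSessionSketch {

    /** In-memory stand-in for a session in transacted mode (hypothetical). */
    static final class Session {
        final Deque<String> inbound = new ArrayDeque<>();
        final List<String> delivered = new ArrayList<>(); // visible only after commit
        private final List<String> pendingReceives = new ArrayList<>();
        private final List<String> pendingSends = new ArrayList<>();

        String receive() {
            String message = inbound.poll();
            if (message != null) {
                pendingReceives.add(message);
            }
            return message;
        }

        void send(String message) {
            pendingSends.add(message);
        }

        void commit() {
            delivered.addAll(pendingSends);
            pendingSends.clear();
            pendingReceives.clear();
        }

        void rollback() {
            pendingSends.clear();
            // Put received messages back at the head, preserving their order.
            for (int i = pendingReceives.size() - 1; i >= 0; i--) {
                inbound.addFirst(pendingReceives.get(i));
            }
            pendingReceives.clear();
        }
    }

    /** Receive one message, send every result, then commit; roll back on failure. */
    static void handle(Session session, Function<String, List<String>> processor) {
        String input = session.receive();          // 1. receive message
        if (input == null) {
            return;
        }
        try {
            for (String result : processor.apply(input)) {
                session.send(result);              // 2. send messages
            }
            session.commit();                      // 3. commit
        } catch (RuntimeException e) {
            session.rollback();
        }
    }
}
```

The point of the model is that the receive and all sends succeed or fail together, which a per-message acknowledgment on each side cannot express.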

I am not sure if this is possible with the current specification of @Acknowledgment.

I did some manual tests:

public Message<String> test1(Message<String> input) {
    return Message.of("RESULT " + input.getPayload());
}

  1. INCOMING MESSAGE COMMIT/ACK
  2. JMS-SINK COMMIT/ACK

public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data));
}

  1. INCOMING MESSAGE COMMIT/ACK
  2. PRODUCER SEND
  3. JMS-SINK COMMIT/ACK
  4. PRODUCER SEND
  5. JMS-SINK COMMIT/ACK

@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
            .onComplete(() -> {
                System.out.println("ON_COMPLETE");
            });
}

  1. INCOMING MESSAGE COMMIT/ACK
  2. PRODUCER SEND null
  3. JMS-SINK COMMIT/ACK
  4. PRODUCER SEND null
  5. JMS-SINK COMMIT/ACK
  6. ON_COMPLETE

Would it be possible, for the MANUAL @Acknowledgment strategy, to call context.commit() in the onComplete method and skip the acks of the incoming and outgoing messages?

@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
            .onComplete(() -> {
                context.commit();
            });
}

andrejpetras commented 4 years ago

@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
            .onComplete(() -> {
                // input.commit();
                // input.ack() -> JMSContext.commit()
                input.ack();
            });
}

andrejpetras commented 4 years ago

In the end, I arrived at this solution:

@Incoming("input")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> message(IncomingJmsTxMessage<Data> input) {
    try {
        ...
        // optional: send a message
        input.send(Message.of("Output"));

        // session commit
        return input.ack();
    } catch (Exception ex) {
        // session rollback
        return input.rollback();
    }
}

This extension is based on smallrye-reactive-messaging-jms version 2.0.0: https://github.com/lorislab/quarkus-reactive-jms-tx

cescoffier commented 4 years ago

Nice! And yes, SmallRye Reactive Messaging is bringing a few more things around acknowledgment (and we have more to come).

Note that you can still receive a regular message and extract the "TX part" in specific metadata.

cescoffier commented 4 years ago

BTW, I would love to have this here, so if you'd like to contribute this feature, feel free to open a PR.