Closed (ctron closed this issue 5 years ago)
https://issues.apache.org/jira/browse/PROTON-2029
Obvious workaround: don't call it multiple times. Less obvious workaround: vertx-proton's ProtonDeliveryImpl has an isSettled() method, so you could cast the delivery to that class and check it before sending the disposition.
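A minimal sketch of the check-before-sending idea, kept self-contained so it runs without vertx-proton on the classpath. `Delivery`, `RecordingDelivery` and `SettleGuard` are hypothetical stand-ins invented for illustration, not vertx-proton types, and the state is a plain `String` rather than a proton-j `DeliveryState`.

~~~java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the small part of a delivery used here;
// this is NOT the real vertx-proton ProtonDelivery interface.
interface Delivery {
  boolean isSettled();

  void disposition(String state, boolean settle);
}

// Stand-in that records every disposition it "sends". Like the behaviour
// reported in this issue, it does not protect itself against repeated calls.
class RecordingDelivery implements Delivery {
  final List<String> sent = new ArrayList<>();
  private boolean settled;

  @Override
  public boolean isSettled() {
    return settled;
  }

  @Override
  public void disposition(String state, boolean settle) {
    sent.add(state);
    if (settle) {
      settled = true;
    }
  }
}

// Caller-side guard: skip the disposition if the delivery is already settled.
final class SettleGuard {
  private SettleGuard() {
  }

  // Returns true if the disposition was sent, false if it was skipped.
  static boolean dispositionIfUnsettled(Delivery delivery, String state, boolean settle) {
    if (delivery.isSettled()) {
      return false;
    }
    delivery.disposition(state, settle);
    return true;
  }
}
~~~

With the real library, the `isSettled()` check would presumably go through the cast to ProtonDeliveryImpl described above; the guard logic itself would stay the same.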
The underlying issue is in proton-j, as tracked above, and a fix will be made there soon. A similar fix can be achieved inside vertx-proton along the lines of the workaround described earlier, so I've made that change in 9cd3cd5f2113da4e80fc2a6da30dd2c1c50cf604 (master branch) and 17fab8cde4d04923063c71753dcbc1668ba74738 (3.7 branch), and also exposed the isSettled() method via the interface.
When `io.vertx.proton.ProtonDelivery.disposition(DeliveryState, boolean)` gets called multiple times with `settled=true`, the disposition gets sent out multiple times.

Reproducer
~~~java
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.message.Message;

import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonServer;

public class Application {

    public static void main(final String[] args) throws InterruptedException {

        final var vertx = Vertx.factory.vertx();

        final var server = ProtonServer.create(vertx)
                .connectHandler(con -> {
                    con.sessionOpenHandler(session -> session.open());
                    con.openHandler(res -> con.open());
                    con.disconnectHandler(res -> con.disconnect());
                    con.closeHandler(res -> {
                        con.close();
                        con.disconnect();
                    });
                    con.senderOpenHandler(sender -> {
                        sender.setSource(sender.getRemoteSource());
                        sender.open();

                        final var msg = Message.Factory.create();
                        msg.setSubject("foo");
                        sender.send(msg, del -> {
                            System.out.println("Delivery: " + del.getRemoteState());
                        });
                    });
                })
                .listen(5672);

        Thread.sleep(250);

        final var client = ProtonClient.create(vertx);
        client.connect("localhost", server.actualPort(), con -> {
            if (con.succeeded()) {
                con.result()
                        .openHandler(open -> {
                            System.out.println("opened");
                            open.result().createReceiver("foo")
                                    .handler((del, msg) -> {
                                        System.out.println("Message");
                                        del.disposition(Accepted.getInstance(), true);
                                        del.disposition(Released.getInstance(), true);
                                    })
                                    .open();
                        })
                        .open();
            }
        });

        Thread.sleep(Long.MAX_VALUE);
    }
}
~~~

amqp-double-disposition.pcapng.gz