quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Provide a proper way to register quarkus default message codec for custom types with `EventBus` not using `@ConsumeEvent` #43691

Open azurvii opened 1 month ago

azurvii commented 1 month ago

Description

https://quarkus.io/guides/reactive-event-bus#using-codecs states that Quarkus provides a default codec for local delivery. It works well, but only when the type is specified in a @ConsumeEvent method.

I need to subscribe to and unsubscribe from message queues dynamically. @ConsumeEvent does not seem to provide a way to "unregister" from events, so I resorted to using EventBus directly, which does not benefit from Quarkus' default message codec.

Implementation ideas

There was https://github.com/quarkusio/quarkus/issues/23971, but unfortunately it did not take off. There was a quick proposal from @cescoffier: @EventBusCodec(target=MyClass.class).

I think it could also be done without adding new annotations, by exposing the register/unregister methods publicly. I have not looked at how Quarkus does the registration, but whatever @ConsumeEvent uses to register types should be exposable so that a user can register one too.

cescoffier commented 1 month ago

The event bus has the method to register codecs. If you use the event bus API directly, you should use this method (https://vertx.io/docs/apidocs/io/vertx/rxjava/core/eventbus/EventBus.html#registerCodec-io.vertx.core.eventbus.MessageCodec-)

azurvii commented 1 month ago

Is there a way to obtain the Quarkus default message codec so that I can call this method for my message types?

For one app, I have close to a hundred different message types, and I'd like to leverage Quarkus' built-in custom message type support for them, if available, instead of writing a codec for each of them manually or with a custom annotation processor.

cescoffier commented 1 month ago

The implementation is super simple: https://github.com/quarkusio/quarkus/blob/main/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/LocalEventBusCodec.java.

azurvii commented 1 month ago

Thanks for the pointer. I found this to be where the codec registration happens. For me, the key point is that after eventBus.registerCodec(), it needs a call to eventBus.codecSelector() to link the registered codec with the desired types. The generic type is erased at runtime, so registerCodec alone is not helpful for linking to the desired type.
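To illustrate the erasure point, here is a minimal plain-Java sketch. PassThroughCodec, Order and Invoice are hypothetical stand-ins (not the Quarkus or Vert.x classes); the sketch only shows that two differently parameterized instances share one runtime class, so the bus has nothing to match a message body against unless a Class token or a selector is supplied separately.

```java
// Hypothetical stand-in for a generic pass-through codec; NOT the Quarkus class.
final class PassThroughCodec<T> {
    T transform(T body) {
        return body; // local delivery: hand the same object to the consumer
    }
}

final class ErasureDemo {
    // Hypothetical message types, used only for illustration.
    static final class Order {}
    static final class Invoice {}

    // Returns true: the type argument is erased, so both instances
    // have exactly the same runtime class.
    static boolean sameRuntimeClass() {
        PassThroughCodec<Order> orders = new PassThroughCodec<>();
        PassThroughCodec<Invoice> invoices = new PassThroughCodec<>();
        return orders.getClass() == invoices.getClass();
    }
}
```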

However, eventBus.codecSelector() replaces the current selector rather than adding to it, i.e. if I call it, it will remove the Quarkus selector; if my call comes first, my selector will be removed. This is not desirable.

I found registerDefaultCodec. Note this is from the Vert.x core EventBus. Not sure why, but it is not present in the Mutiny EventBus. The Mutiny EventBus can call .getDelegate() to obtain the Vert.x core EventBus.

Again, this approach has some drawbacks. To do it properly, one LocalEventBusCodec has to be created for each message type. This creates many duplicate instances that differ only in name (LocalEventBusCodec has an internal counter to avoid duplicate names), while a single instance would be enough. When attempting to use one instance to register for all types, I found no way to stop javac from complaining about generic type safety.
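Since only one selector can be active at a time, one way to picture "a selector that covers all needed types" is a single lookup table behind one function. The sketch below is plain Java and purely illustrative: CodecSelectorRegistry is a hypothetical helper, not part of Quarkus or Vert.x, and it assumes that returning null from the selector means "no codec chosen for this body".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper, not part of Quarkus or Vert.x.
// One instance acts as the single selector for every registered type.
final class CodecSelectorRegistry implements Function<Object, String> {

    // Message body class -> name of a codec already registered on the bus.
    private final Map<Class<?>, String> codecNames = new ConcurrentHashMap<>();

    void register(Class<?> bodyType, String codecName) {
        codecNames.put(bodyType, codecName);
    }

    void unregister(Class<?> bodyType) {
        codecNames.remove(bodyType);
    }

    @Override
    public String apply(Object body) {
        // Assumption: null means "no codec selected for this body".
        return body == null ? null : codecNames.get(body.getClass());
    }
}
```

Such a registry would be passed once to eventBus.codecSelector(...), and types could then be added or removed at runtime without calling codecSelector() again. It does not solve the conflict with the selector Quarkus installs: whichever call comes last still wins.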

Correct me if any of the above is not true. Otherwise, I think the best approach would still be to create some mechanism to register types together with the @ConsumeEvent types. And I take back my initial comment about "not needing a new annotation". I suspect VertxEventBusConsumerRecorder might be called during build time, though the actual registration is happening at runtime. To avoid calling codecSelector() more than once, the selectorTypes should include all types needed to register, which should be done via annotations.

darren-bell-optiva commented 2 weeks ago

Hi @azurvii, thanks for your investigation into this; I was facing a similar issue, and I agree with the need to be able to specify this dynamically. In the meantime, I was able to achieve this using the "magic" string from the code that you linked here. I ended up with something like this:

bus.send(messageId, messagePayload, new DeliveryOptions().setCodecName("quarkus_default_local_codec").setLocalOnly(true));

Regards