quarkiverse / quarkus-discord4j

A JVM-based REST/WS wrapper for the official Discord Bot API, written in Java
https://discord4j.com/
Apache License 2.0

Mutiny/Reactor error in tests #21

Closed gsmet closed 8 months ago

gsmet commented 9 months ago

Hey @jponge,

Discord4J uses Reactor's Mono, and when I run the tests I get this error:

[INFO] Running io.quarkiverse.discord4j.test.Discord4jGatewayEventListenerTest
2024-02-10 20:11:26,846 INFO  [dis.cor.DiscordClientBuilder] (main) Discord4J 3.2.6 (https://discord4j.com)
2024-02-10 20:11:27,962 INFO  [dis.gat.DefaultGatewayClient] (vert.x-eventloop-thread-2) [G:e24c710, S:0] Connected to Gateway
2024-02-10 20:11:27,962 INFO  [dis.cor.sha.LocalShardCoordinator] (vert.x-eventloop-thread-2) [G:e24c710, S:0] Shard connected
2024-02-10 20:11:27,987 INFO  [io.quarkus] (main) quarkus-discord4j-deployment 999-SNAPSHOT on JVM (powered by Quarkus 3.7.2) started in 1.530s. Listening on: http://localhost:8081
2024-02-10 20:11:27,987 INFO  [io.quarkus] (main) Profile test activated. 
2024-02-10 20:11:27,987 INFO  [io.quarkus] (main) Installed features: [cdi, discord4j, smallrye-context-propagation, smallrye-health, smallrye-metrics, vertx]
2024-02-10 20:11:28,150 INFO  [io.qua.dis.tes.Discord4jGatewayEventListenerTest$MyBean] (d4j-events-3) Received MessageCreateEvent
2024-02-10 20:11:28,151 INFO  [io.qua.dis.tes.Discord4jGatewayEventListenerTest$MyBean] (d4j-events-1) Received MessageCreateEvent
2024-02-10 20:11:28,156 ERROR [rea.cor.pub.Operators] (d4j-events-4) Operator called default onErrorDropped: reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ClassCastException: class io.smallrye.mutiny.groups.MultiCreate$1 cannot be cast to class org.reactivestreams.Publisher (io.smallrye.mutiny.groups.MultiCreate$1 and org.reactivestreams.Publisher are in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @654b9f0)
Caused by: java.lang.ClassCastException: class io.smallrye.mutiny.groups.MultiCreate$1 cannot be cast to class org.reactivestreams.Publisher (io.smallrye.mutiny.groups.MultiCreate$1 and org.reactivestreams.Publisher are in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @654b9f0)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:388)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.tryOnNext(FluxContextWrite.java:115)
    at reactor.core.publisher.FluxDoFinally$DoFinallyConditionalSubscriber.tryOnNext(FluxDoFinally.java:172)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.tryOnNext(FluxPeekFuseable.java:530)
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.tryOnNext(FluxHandleFuseable.java:562)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.tryOnNext(FluxMapFuseable.java:317)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.tryOnNext(FluxFilterFuseable.java:367)
    at reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:1004)
    at reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:1079)
    at reactor.scheduler.forkjoin.ForkJoinPoolScheduler$DisposableWorkerTask.run(ForkJoinPoolScheduler.java:443)
    at reactor.scheduler.forkjoin.ForkJoinPoolScheduler$Worker.processTaskQueue(ForkJoinPoolScheduler.java:407)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

There's probably something wrong somewhere and your insights would be welcome!

jponge commented 8 months ago

Reactor is stuck on the Java 6 era Reactive Streams APIs (org.reactivestreams.Publisher) while we're on the JDK Flow APIs (java.util.concurrent.Flow.Publisher), hence the cast fails.

There are 2 options:

  1. use https://github.com/smallrye/smallrye-mutiny/tree/main/reactor, or
  2. use the thin adapter from https://smallrye.io/smallrye-mutiny-zero/1.0.0/flow-adapters/ to convert between a Publisher and a Flow.Publisher
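
To make the mismatch concrete, here is a minimal JDK-only sketch of what a thin adapter of this kind does. The `Legacy*` interfaces are hypothetical stand-ins for the `org.reactivestreams` types, declared locally so the example runs without extra dependencies, and `fromList`, `toLegacy` and `collect` are illustrative helper names rather than anything from Mutiny or Reactor. The point is that two interfaces with the same shape are still unrelated types, so a `Flow.Publisher` has to be wrapped before code expecting the legacy type can accept it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class FlowAdapterSketch {

    // Hypothetical stand-ins for the org.reactivestreams interfaces (not the real library types).
    interface LegacySubscription { void request(long n); void cancel(); }
    interface LegacySubscriber<T> {
        void onSubscribe(LegacySubscription s);
        void onNext(T item);
        void onError(Throwable t);
        void onComplete();
    }
    interface LegacyPublisher<T> { void subscribe(LegacySubscriber<? super T> s); }

    // Thin adapter: wraps a Flow.Publisher and forwards every signal unchanged.
    static <T> LegacyPublisher<T> toLegacy(Flow.Publisher<T> source) {
        return legacySubscriber -> source.subscribe(new Flow.Subscriber<T>() {
            public void onSubscribe(Flow.Subscription s) {
                legacySubscriber.onSubscribe(new LegacySubscription() {
                    public void request(long n) { s.request(n); }
                    public void cancel() { s.cancel(); }
                });
            }
            public void onNext(T item) { legacySubscriber.onNext(item); }
            public void onError(Throwable t) { legacySubscriber.onError(t); }
            public void onComplete() { legacySubscriber.onComplete(); }
        });
    }

    // Simplified synchronous Flow.Publisher over a list, standing in for a Multi.
    // It honors demand but skips spec details such as rejecting non-positive requests.
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;
            public void request(long n) {
                while (n-- > 0 && !done && index < items.size()) {
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            public void cancel() { done = true; }
        });
    }

    // Subscribes through the legacy-style API and gathers everything that is emitted.
    static <T> List<T> collect(LegacyPublisher<T> publisher) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new LegacySubscriber<T>() {
            public void onSubscribe(LegacySubscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(T item) { out.add(item); }
            public void onError(Throwable t) { throw new IllegalStateException(t); }
            public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        Object multiLike = fromList(List.of("a", "b"));
        // Same shape, unrelated type: this is why a direct cast fails.
        System.out.println(multiLike instanceof LegacyPublisher);
        // Wrapped in the adapter, the same source is consumable through the legacy API.
        System.out.println(collect(toLegacy(fromList(List.of("a", "b")))));
    }
}
```

In a real project the adapter would come from one of the two libraries listed above instead of being handwritten; the sketch only shows why a wrapper is needed at all.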

gsmet commented 8 months ago

@jponge what I don't know is how to figure out where to fix this, given that the stack trace doesn't help. Do you have any advice? Also, maybe we should translate the Reactor code directly to Mutiny in the project itself.

Note that it's not my project; I was just trying to help move it to Quarkus 3.7 and release a first version, as people were asking for it.

jponge commented 8 months ago

I feel the pain with these kinds of stack traces.

If you know reactive streams and the 2 APIs in the wild, the class cast issue makes sense; if you don't, the error is baffling.

I'll have a look at Discord4J to see if it'd be a good candidate for Mutiny/Mutiny Zero, or if it's yet another library that uses Reactor to be "reactive" while it just builds streams from non-backpressured in-memory data chunks.

gsmet commented 8 months ago

@jponge yeah, I'm not sure we'll be able to push changes to the upstream library, but if we could at least be clean on the Quarkus extension side and make sure we don't do anything wrong, that would be awesome. I don't think there is a lot of code to check; it looks like a relatively small project on the Quarkus side.

Thanks!

jponge commented 8 months ago

I'm looking at the code.

The stack trace comes from a flatMap on a Reactor Flux where the flatMap function returns a Multi (e.g., flux.flatMap(n -> functionThatReturnsMulti(n))). This used to work when Mutiny was based on the legacy / Java 6 Reactive Streams APIs, but not since we moved to the JDK Flow APIs.
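
A small JDK-only sketch of that failure mode, under stated assumptions: `LegacyPublisher` is a hypothetical stand-in for `org.reactivestreams.Publisher`, `applyMapper` stands in for the step where a flatMap operator receives the mapper's result, and `Adapted` is a placeholder wrapper that only carries the type. The raw `Function` is one assumed way the mismatch can get past the compiler (for instance via generated code or bytecode compiled against an older API); the thread does not establish that this is what happens in the extension.

```java
import java.util.concurrent.Flow;
import java.util.function.Function;

class FlatMapCastSketch {

    // Hypothetical stand-in for org.reactivestreams.Publisher (marker only).
    interface LegacyPublisher<T> { }

    // Placeholder adapter: carries the legacy type; signal forwarding is omitted here.
    static final class Adapted<T> implements LegacyPublisher<T> {
        final Flow.Publisher<T> source;
        Adapted(Flow.Publisher<T> source) { this.source = source; }
    }

    // Stand-in for a flatMap operator receiving the mapper's result.
    // Generics are erased, so the result is checked with a runtime cast.
    static <T, R> LegacyPublisher<? extends R> applyMapper(
            Function<? super T, ? extends LegacyPublisher<? extends R>> mapper, T item) {
        return mapper.apply(item);
    }

    // Stand-in for a function returning a Multi: an empty Flow.Publisher.
    static Flow.Publisher<String> functionThatReturnsMulti(int n) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                public void request(long count) { }
                public void cancel() { }
            });
            subscriber.onComplete();
        };
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static String tryFlatMap() {
        // The raw type bypasses the compile-time check on the mapper's return type.
        Function raw = n -> functionThatReturnsMulti((Integer) n);
        try {
            applyMapper(raw, 1);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    static String tryFlatMapWithAdapter() {
        // Converting inside the mapper hands the operator the type it expects.
        Function<Integer, LegacyPublisher<String>> mapper =
                n -> new Adapted<>(functionThatReturnsMulti(n));
        applyMapper(mapper, 1);
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(tryFlatMap());
        System.out.println(tryFlatMapWithAdapter());
    }
}
```

The fix belongs inside the mapper function, at the point where the Multi is returned. With the real libraries, that conversion would go through one of the adapters mentioned earlier; Reactor's own `JdkFlowAdapter.flowPublisherToFlux` may also be an option, though that is untested against this extension.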

I'm still unsure whether it's just the test case that needs a fix or whether there's something more fundamental to fix, especially as there's lots of code generation (and I'm not sure why this is all needed when we have a client library, but...). Also note that this client uses reactor-netty, which means more event loops in the Quarkus applications that use it.