AxonFramework / AxonFramework

Framework for Evolutionary Message-Driven Microservices on the JVM
https://axoniq.io/
Apache License 2.0
3.32k stars 790 forks source link

Exceptions in QueryHandler methods are swallowed for subscription queries #3123

Closed nils-christian closed 1 month ago

nils-christian commented 1 month ago

Hi,

I noticed that exceptions occurring in the QueryHandler annotated methods are swallowed, when the queries are used as subscription queries. This is not the case for "simple" (non-subscribed) queries whose results are retrieved via get.

Thank you and best regards

Nils

Basic information

Please take a look at the following example consisting of three files.

public record MyQuery( ) {

    public static final ResponseType<List<Integer>> INITIAL_RESPONSE_TYPE = ResponseTypes.multipleInstancesOf( Integer.class );
    public static final ResponseType<Integer> UPDATE_RESPONSE_TYPE = ResponseTypes.instanceOf( Integer.class );

}
@Component
public class MyQueryHandler {

    private final List<Integer> internalList = new CopyOnWriteArrayList<>( List.of( 1, 2 ) );
    private final QueryUpdateEmitter queryUpdateEmitter;

    public MyQueryHandler( final QueryUpdateEmitter queryUpdateEmitter ) {
        this.queryUpdateEmitter = queryUpdateEmitter;
    }

    @QueryHandler
    public List<Integer> handle( final MyQuery query ) {
        // During the handling an arbitrary exception occurrs
        throw new NullPointerException( "Something went wrong!" );
        // return List.copyOf( internalList );
    }

    public void addToList( final Integer newElement ) {
        internalList.add( newElement );
        queryUpdateEmitter.emit( MyQuery.class, x -> true, newElement );
    }

}
@SpringBootApplication
public class DemoApplication {

    public static void main( String[] args ) {
        final ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( )
                .sources( DemoApplication.class )
                .logStartupInfo( false )
                .bannerMode( Mode.OFF )
                .run( args );
        final QueryGateway queryGateway = applicationContext.getBean( QueryGateway.class );
        final SubscriptionQueryResult<List<Integer>, Integer> subscription = queryGateway.subscriptionQuery( new MyQuery( ), MyQuery.INITIAL_RESPONSE_TYPE, MyQuery.UPDATE_RESPONSE_TYPE );

        final MyQueryHandler myQueryHandler = applicationContext.getBean( MyQueryHandler.class );

        subscription.handle(
                initialResult -> System.out.println( "Initial result: " + initialResult ),
                updatedResult -> System.out.println( "Updated result: " + updatedResult ) );

        myQueryHandler.addToList( 3 );

        subscription.cancel( );
        applicationContext.close( );
    }

}

Steps to reproduce

Start the shown Spring Boot application. You will notice that an exception occurs only during canceling the subscription.

Exception in thread "main" reactor.core.publisher.Sinks$EmissionException: Sink emission failed with FAIL_TERMINATED at reactor.core.publisher.Sinks$EmitResult.orThrow(Sinks.java:175) at org.axonframework.queryhandling.SinksManyWrapper.complete(SinksManyWrapper.java:50) at org.axonframework.queryhandling.UpdateHandlerRegistration.complete(UpdateHandlerRegistration.java:73) at org.axonframework.queryhandling.SimpleQueryBus.lambda$getSubscriptionQueryResult$21(SimpleQueryBus.java:519) at org.axonframework.queryhandling.DefaultSubscriptionQueryResult.cancel(DefaultSubscriptionQueryResult.java:62) at org.axonframework.queryhandling.DefaultSubscriptionQueryResult.cancel(DefaultSubscriptionQueryResult.java:62) at com.example.demo.DemoApplication.main(DemoApplication.java:34)

The actual null pointer exception with the message "Something went wrong!" is swallowed by the framework.

Expected behaviour

The exception, which occurs in the QueryHandler annotated method, should be thrown during the handle method of the subscription.

Actual behaviour

The exception is swallowed.

smcvb commented 1 month ago

As always, thanks for filing an issue with us, @nils-christian!

Concerning the predicament you're stating, I want to point out the following from the API documentation on the QueryBus#subscriptionQuery(SubscriptionQueryMessage)

... If there is an error during retrieving or consuming initial result, stream for incremental updates is NOT interrupted. ...

But, although the stream is not interrupted, you still have the opportunity to deal with the exception from the initialResult. The AbstractSubscriptiionQueryTestSuite#subscribingQueryHandlerFailing test method ([here's](https://github.com/AxonFramework/AxonFramework/blob/b387d83e71ba8dafd27c3eeaf745010571fbe292/integrationtests/src/test/java/org/axonframework/integrationtests/queryhandling/AbstractSubscriptionQueryTestSuite.java#L461] the code for it) Pulls the exception from the ResultMessage that's returned.

Granted, this isn't the case with the QueryGateway, which performs the following operation to retrieve a result, if present:

private <I, U> DefaultSubscriptionQueryResult<I, U> getSubscriptionQueryResult(
        SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> result) {
    return new DefaultSubscriptionQueryResult<>(
            result.initialResult()
                  .filter(initialResult -> Objects.nonNull(initialResult.getPayload()))
                  .map(Message::getPayload)
                  .onErrorMap(e -> e instanceof IllegalPayloadAccessException ? e.getCause() : e),
            result.updates()
                  .filter(update -> Objects.nonNull(update.getPayload()))
                  .map(SubscriptionQueryUpdateMessage::getPayload),
            result
    );
}

Since the initialResult#getPayload() is null, the initial result will resolve to null. The fact these results are ignored is further shown in the JavaDoc of the QueryGateway#subscriptionQuery with the following statement:

... Note: Any null results, on the initial result or the updates, wil lbe filtered out by the QueryGateway. If you require the null to be returned for the initial and update results, we suggest using the QueryBus instead. ...

Again, granted, it might be somewhat cryptic how exceptions are dealt with.

However, I don't think we're dealing with a bug per se. If anything, there is an unclarity in the documentation on this subject.

So, after sharing all that, what's your take, @nils-christian? Would things have been resolved for you if the QueryGateway documentation is clearer on the subject?

nils-christian commented 1 month ago

Hi @smcvb,

To be honest I don't think that this is just a documentation issue. I would assume that most people would just use the gateway classes and (almost) never the bus classes directly (at least in my experience). Especially when it comes to the reactive stuff in the query bus. So in my opinion the QueryGateway should handle this situation better.

A swallowed exception makes it very difficult to track down an error. Even if the gateway method doesn't rethrow the exception, it should still log it somehow and make it visible. The fact that only the not called handler methods point to an error is very dangerous.

How about using the doOnError method in getSubscriptionQueryResult to log the exceptions occurring in initialResult and updates? Granted, I am not an expert in reactive programming, but it should do the trick and help users of the framework to track down errors.

result.initialResult( )
  .filter( initialResult -> Objects.nonNull( initialResult.getPayload( ) ) )
  .map( Message::getPayload )
  .onErrorMap( e -> e instanceof IllegalPayloadAccessException ? e.getCause( ) : e )
  .doOnError( e -> {
    logger.warn( "An exception occurred while calling a query handler method.", e );
   } ),

Best regards,

Nils

smcvb commented 1 month ago

I would assume that most people would just use the gateway classes and (almost) never the bus classes directly (at least in my experience).

This is most definitely true. Hence why I was leaning towards a discrepancy between the JavaDoc of both in this subject. Any documentation on the QueryBus that impacts the gateway thusly is paramount to exist there.

Granted, I get where you're coming from here. Although I find calling incorrect query results dangerous debatable, especially given that the updates are most important from the subscription query, I do acknowledge a discrepancy here. In all honesty, before proceeding forward, I want to check with the team members that concocted the subscription query implementation, to validate that I am not missing any important reasoning for this. With that outcome, we can deduce whether we go for (1) sticking to this solution (doubt that'll be the outcome though), (2) start logging a warning, or (3) bubble up the exception.

smcvb commented 1 month ago

After checking in with the colleague that created this, we concluded there wasn't really any intent why SubscriptionQueryResult#handle would swallow the exception. As such, I went ahead and made a PR that adds another handle method to the SubscriptionQueryResult, which expects a Consumer<Throwable> to be given. Furthermore, the original handle method has been deprecated. If you're curious, you can find the PR here.

smcvb commented 1 month ago

Closing this issue as it as been resolved in pull request #3124.