spring-projects / spring-data-mongodb

Provides support to increase developer productivity in Java when using MongoDB. Uses familiar Spring concepts such as template classes for core API usage and lightweight repository-style data access.
https://spring.io/projects/spring-data-mongodb/
Apache License 2.0
1.61k stars 1.08k forks

Exceptions from MessageListeners are not caught by change stream container's error handler [DATAMONGO-2322] #3179

Closed spring-projects-issues closed 5 years ago

spring-projects-issues commented 5 years ago

Daniel John Gomez opened DATAMONGO-2322 and commented

When a DefaultMessageListenerContainer is running, exceptions thrown from MessageListener.onMessage() are not caught by the error handler passed to the container's constructor. Instead, they propagate to the uncaught exception handler of the container thread.
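
To illustrate where such an exception ends up, here is a minimal sketch using only the JDK. It does not reproduce the container's internals; it only assumes that listener code runs as a task on an executor thread, as in the test case. The class name, thread name, and message are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class UncaughtHandlerSketch {

    /** Runs a task that always throws and returns the message seen by the thread's uncaught exception handler. */
    public static String runFailingTask() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);

        // Thread factory that installs an uncaught exception handler on each worker thread.
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "container-thread"); // hypothetical name
            thread.setUncaughtExceptionHandler((t, ex) -> {
                seen.set(ex.getMessage());
                latch.countDown();
            });
            return thread;
        });

        try {
            // Nothing catches this exception inside the task, so it escapes
            // the worker and reaches the thread's uncaught exception handler.
            executor.execute(() -> {
                throw new RuntimeException("Exception from task");
            });
            latch.await(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Uncaught handler saw: " + runFailingTask());
    }
}
```

Any error handler that is meant to see the exception has to be invoked from a try/catch around the task body; otherwise only the thread-level handler is notified.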

This bug can be replicated using this test case:

package io.toro.integrate.endpoint;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.data.mongodb.core.messaging.Subscription;
import org.springframework.util.ErrorHandler;
import org.testng.annotations.Test;

import com.mongodb.client.MongoClients;
import com.mongodb.client.model.changestream.ChangeStreamDocument;

public class SpringContainerBugTest {

    @Test
    public void testExceptionFromMessageListenerIsCaughtByContainerErrorHandler() throws InterruptedException {
        String databaseName = "testDb";
        String collectionName = "testCollection";
        MongoTemplate template = new MongoTemplate( MongoClients.create(), databaseName );

        CountDownLatch errorLatch = new CountDownLatch( 1 );
        ErrorHandler errorHandler = t -> {
            System.out.println( "Exception caught by error handler: " + t.getMessage() );
            errorLatch.countDown();
        };

        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer( template,
                Executors.newSingleThreadExecutor(), errorHandler );

        AlwaysFailingMessageListener messageListener = new AlwaysFailingMessageListener();
        ChangeStreamRequestOptions requestOptions = new ChangeStreamRequestOptions( databaseName, collectionName,
                ChangeStreamOptions.empty() );
        Subscription subscription = container.register( new ChangeStreamRequest<>( messageListener, requestOptions ),
                Document.class );
        container.start();

        if ( !subscription.await( Duration.ofSeconds( 5 ) ) )
            throw new RuntimeException( "Subscription not registered" );

        template.getCollection( collectionName ).insertOne( new Document( "key", "value" ) );

        if ( !errorLatch.await( 5, TimeUnit.SECONDS ) )
            throw new RuntimeException( "Exception not caught by error handler" );
    }

    class AlwaysFailingMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {

        @Override
        public void onMessage( Message<ChangeStreamDocument<Document>, Document> message ) {
            throw new RuntimeException( "Exception from MessageListener.onMessage()" );
        }
    }
}

The test case uses a message listener that throws an exception upon receiving a message. It then asserts that the exception is caught by the container's error handler.

As a workaround, the error handler can be passed to the MessageListener, which then catches any exception thrown in onMessage() and hands it to the handler itself.
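
A minimal sketch of that workaround follows. To stay self-contained it uses simplified local stand-ins for Spring's MessageListener and ErrorHandler interfaces (the real MessageListener takes two type parameters and receives a Message), and ErrorHandlingMessageListener is a hypothetical name, not a class shipped by Spring Data MongoDB.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerErrorHandlingSketch {

    /** Simplified stand-in for org.springframework.data.mongodb.core.messaging.MessageListener. */
    public interface MessageListener<M> {
        void onMessage(M message);
    }

    /** Stand-in for org.springframework.util.ErrorHandler. */
    public interface ErrorHandler {
        void handleError(Throwable t);
    }

    /** Hypothetical decorator that routes exceptions from the delegate to the error handler. */
    public static final class ErrorHandlingMessageListener<M> implements MessageListener<M> {

        private final MessageListener<M> delegate;
        private final ErrorHandler errorHandler;

        public ErrorHandlingMessageListener(MessageListener<M> delegate, ErrorHandler errorHandler) {
            this.delegate = delegate;
            this.errorHandler = errorHandler;
        }

        @Override
        public void onMessage(M message) {
            try {
                delegate.onMessage(message);
            } catch (RuntimeException e) {
                // Handle the failure here instead of letting it escape to the container thread.
                errorHandler.handleError(e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        MessageListener<String> failing = message -> {
            throw new RuntimeException("Exception from MessageListener.onMessage()");
        };
        MessageListener<String> safe =
                new ErrorHandlingMessageListener<>(failing, t -> handled.add(t.getMessage()));

        safe.onMessage("change event"); // does not throw; the handler records the failure
        System.out.println(handled);
    }
}
```

With the real Spring types, the same decorator shape would wrap the listener passed to ChangeStreamRequest, so the container only ever sees a listener that does not throw.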


Affects: 2.1.9 (Lovelace SR9)

Backported to: 2.1.10 (Lovelace SR10)

1 vote, 2 watchers

spring-projects-issues commented 5 years ago

Mark Paluch commented

That's fixed now