apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

camel.source.endpoint.exceptionHandler for the sftp connector not working as expected #1282

Open m20b27 opened 3 years ago

m20b27 commented 3 years ago

I've implemented a custom exceptionHandler for experimentation, shown below. It is essentially the same as LoggingExceptionHandler, except that it uses System.out.println() and adds a prefix to every log message:

package org.apache.camel.kafkaconnector;
import org.apache.camel.*;
import org.apache.camel.spi.ExceptionHandler;

public class CustomLoggingExceptionHandler implements ExceptionHandler {
    private final CamelContext camelContext;
    public CustomLoggingExceptionHandler(CamelContext camelContext, Class<?> ownerType) {
        this(camelContext);
    }

    public CustomLoggingExceptionHandler(CamelContext camelContext, Class<?> ownerType, LoggingLevel level) {
        this(camelContext);
    }

    public CustomLoggingExceptionHandler(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    @Override
    public void handleException(Throwable exception) {
        handleException(null, null, exception);
    }

    @Override
    public void handleException(String message, Throwable exception) {
        handleException(message, null, exception);
    }

    @Override
    public void handleException(String message, Exchange exchange, Throwable exception) {
        try {
            if (!isSuppressLogging()) {
                String msg = CamelExchangeException.createExceptionMessage(message, exchange, exception);
                msg = "##### From CustomLoggingExceptionHandler #####: " + msg;
                if (isCausedByRollbackExchangeException(exception)) {
                    // do not log stack trace for intended rollbacks
                    System.out.println(msg);
                } else {
                    if (exception != null) {
                        System.out.println(msg + exception);
                    } else {
                        System.out.println(msg);
                    }
                }
            }
        } catch (Throwable e) {
            // the logging exception handler must not cause new exceptions to occur
        }
    }
  ...
}

I've configured the sftp source connector to use my own exceptionHandler:

camel.source.endpoint.exceptionHandler=org.apache.camel.kafkaconnector.CustomLoggingExceptionHandler
camel.source.endpoint.throwExceptionOnConnectFailed=true
camel.source.endpoint.onCompletionExceptionHandler=org.apache.camel.kafkaconnector.CustomLoggingExceptionHandler

After starting the connector, I removed the read permission on the local sftp directory to trigger an exception. I expected every error message to carry the prefix ##### From CustomLoggingExceptionHandler #####:, but it seems my CustomLoggingExceptionHandler is not being used at all. Below is the actual error message:

[2021-11-01 13:47:06,445] WARN Consumer SftpConsumer[sftp://localhost:9090/data/?exceptionHandler=org.apache.camel.kafkaconnector.CustomLoggingExceptionHandler&fileName=%24%7Bfile%3Aname.noext%7D.csv&move=finished&password=xxxxxx] failed polling endpoint: sftp://localhost:9090/data/?exceptionHandler=org.apache.camel.kafkaconnector.CustomLoggingExceptionHandler&fileName=%24%7Bfile%3Aname.noext%7D.csv&move=finished&password=xxxxxx. Will try again at next poll. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot list directory: .] (org.apache.camel.component.file.remote.SftpConsumer:214)
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot list directory: .
    at org.apache.camel.component.file.remote.SftpOperations.listFiles(SftpOperations.java:732)
    at org.apache.camel.component.file.remote.SftpOperations.listFiles(SftpOperations.java:708)
    at org.apache.camel.component.file.remote.SftpConsumer.doPollDirectory(SftpConsumer.java:139)
    at org.apache.camel.component.file.remote.SftpConsumer.pollDirectory(SftpConsumer.java:97)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:140)
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: 3: Permission denied
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1633)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1553)
    at org.apache.camel.component.file.remote.SftpOperations.listFiles(SftpOperations.java:723)
    ... 12 more

Any idea what's wrong with my approach, or what the correct way is to set up camel.source.endpoint.exceptionHandler for the sftp connector? Thanks.

oscerd commented 3 years ago

The following line

camel.source.endpoint.onCompletionExceptionHandler=org.apache.camel.kafkaconnector.CustomLoggingExceptionHandler

should be

camel.source.endpoint.onCompletionExceptionHandler=#class:org.apache.camel.kafkaconnector.CustomLoggingExceptionHandler
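For anyone wondering why the prefix matters: in Camel's property binding, a value starting with #class: asks for a new instance of the named class, while a bare class name is just text. The sketch below is only an illustration of that distinction in plain Java. It is not Camel's implementation, ClassRefSketch and resolve are hypothetical names, and it assumes the target class has a public no-argument constructor.

```java
// Illustrative only: NOT Camel's implementation.
// ClassRefSketch and resolve(...) are hypothetical names used for this sketch.
public class ClassRefSketch {
    private static final String CLASS_PREFIX = "#class:";

    /**
     * Returns a new instance when the value looks like "#class:<fully.qualified.Name>",
     * otherwise returns the value unchanged as a plain String.
     */
    public static Object resolve(String value) throws ReflectiveOperationException {
        if (value.startsWith(CLASS_PREFIX)) {
            String className = value.substring(CLASS_PREFIX.length());
            // Assumption in this sketch: the class has a public no-argument constructor.
            return Class.forName(className).getDeclaredConstructor().newInstance();
        }
        // Without the prefix the value is only a string, not a handler object.
        return value;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Object plain = resolve("java.util.ArrayList");
        Object bean = resolve("#class:java.util.ArrayList");
        System.out.println(plain.getClass().getName()); // java.lang.String
        System.out.println(bean.getClass().getName());  // java.util.ArrayList
    }
}
```

java.util.ArrayList stands in for the handler class here so the sketch runs without Camel on the classpath. With the real connector, the same idea would apply to the handler class name in the property value.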

m20b27 commented 3 years ago

@oscerd It works like a charm! Thanks. It would also be great if this were documented somewhere.

oscerd commented 3 years ago

Let's keep this open so we can update the docs. Thanks for pointing this out.