apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Dead Letter Queue not working in custom sink due to ClassNotFoundException #21138

Open fmiguelez opened 1 year ago

fmiguelez commented 1 year ago


Version

Pulsar 2.8.4

Minimal reproduce step

We are experiencing a ClassNotFoundException when a Pulsar custom sink tries to produce to a DLQ topic after processing of a message has failed and all retries have been exhausted. The missing class is contained in the pulsar-client-original JAR file, which is present both in the Pulsar Docker image and in the packaged NAR file of our sink.
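
For context, the retry-then-DLQ flow described above can be modelled roughly as follows. This is a minimal, hypothetical sketch in plain Java: `DeadLetterRouter`, `onFailure` and `defaultDeadLetterTopic` are illustrative names rather than Pulsar APIs, and the `<topic>-<subscription>-DLQ` naming is an assumption about the client's default convention. In the real client this routing happens inside `ConsumerImpl`, which is the class that fails in the log below.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of dead-letter routing; NOT Pulsar's ConsumerImpl.
// Pulsar's actual redelivery accounting may differ in detail.
final class DeadLetterRouter {
    private final int maxRedeliverCount;
    // How many times each message ID has been redelivered so far.
    private final Map<String, Integer> redeliveries = new HashMap<>();

    DeadLetterRouter(int maxRedeliverCount) {
        this.maxRedeliverCount = maxRedeliverCount;
    }

    // Assumed naming convention: <topic>-<subscription>-DLQ.
    static String defaultDeadLetterTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    // Records a processing failure. Returns true if the message should be
    // published to the dead letter topic instead of being redelivered.
    boolean onFailure(String messageId) {
        int count = redeliveries.getOrDefault(messageId, 0);
        if (count >= maxRedeliverCount) {
            redeliveries.remove(messageId); // retries exhausted: hand off to the DLQ
            return true;
        }
        redeliveries.put(messageId, count + 1); // schedule another redelivery
        return false;
    }

    public static void main(String[] args) {
        DeadLetterRouter router = new DeadLetterRouter(2);
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + " -> DLQ? " + router.onFailure("msg-1"));
        }
        System.out.println(defaultDeadLetterTopic("persistent://public/default/orders", "my-sub"));
    }
}
```

With `maxRedeliverCount` set to 2, the first two failures of `msg-1` trigger redeliveries and the third routes it to the DLQ. That hand-off is the point where the real consumer has to create a DLQ producer, which is what fails in this issue.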

The error is the following:

    17:19:24.006 [ForkJoinPool.commonPool-worker-3] ERROR org.apache.pulsar.client.impl.ConsumerImpl - Dead letter producer exception with topic: persistent://dbus/test/dummy-objects-sub-test-rtsink-postgresql-DLQ
    java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
            at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479) ~[?:?]
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
            at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?]
            at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?]
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?]
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema
            at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:45) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.DefaultImplementation.newAutoProduceSchema(DefaultImplementation.java:295) ~[java-instance.jar:?]
            at org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES(Schema.java:426) ~[java-instance.jar:?]
            at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$39(ConsumerImpl.java:1706) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
            ... 6 more
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema
            at org.apache.pulsar.client.internal.ReflectionUtils.newClassInstance(ReflectionUtils.java:62) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.ReflectionUtils.getConstructor(ReflectionUtils.java:68) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newAutoProduceSchema$34(DefaultImplementation.java:296) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:34) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.DefaultImplementation.newAutoProduceSchema(DefaultImplementation.java:295) ~[java-instance.jar:?]
            at org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES(Schema.java:426) ~[java-instance.jar:?]
            at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$39(ConsumerImpl.java:1706) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
            ... 6 more
    Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema
            at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
            at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
            at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
            at java.lang.Class.forName0(Native Method) ~[?:?]
            at java.lang.Class.forName(Class.java:398) ~[?:?]
            at org.apache.pulsar.client.internal.ReflectionUtils.newClassInstance(ReflectionUtils.java:59) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.ReflectionUtils.getConstructor(ReflectionUtils.java:68) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newAutoProduceSchema$34(DefaultImplementation.java:296) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:34) ~[java-instance.jar:?]
            at org.apache.pulsar.client.internal.DefaultImplementation.newAutoProduceSchema(DefaultImplementation.java:295) ~[java-instance.jar:?]
            at org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES(Schema.java:426) ~[java-instance.jar:?]
            at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$39(ConsumerImpl.java:1706) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
            ... 6 more
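
The nested `Caused by` sections come from reflection: `Schema.AUTO_PRODUCE_BYTES()` delegates to `DefaultImplementation`, which looks the implementation class up by name (the trace shows `ReflectionUtils.newClassInstance` calling `Class.forName`), and each layer wraps the failure in another `RuntimeException`. The sketch below reproduces that shape with a hypothetical `ReflectiveFactory`; its method names echo the frames in the trace, but it is not Pulsar's source. It also highlights the key detail: `Class.forName(String)` resolves names through the class loader that defined the calling class, so if that loader (the trace shows the lookup going through the `AppClassLoader`) cannot see the client implementation classes, the lookup fails even when another loader could find them.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.Callable;

// Hypothetical reflective factory. It mirrors the *shape* of the call chain
// in the trace (catchExceptions -> newClassInstance -> Class.forName),
// not Pulsar's actual ReflectionUtils implementation.
final class ReflectiveFactory {

    // Runs a reflective action and wraps every failure in a RuntimeException,
    // so an already-wrapped failure ends up wrapped twice.
    static <T> T catchExceptions(Callable<T> action) {
        try {
            return action.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Class.forName(String) uses the defining class loader of ReflectiveFactory.
    static Class<?> newClassInstance(String className) {
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // Loads the class by name and calls its no-arg constructor.
    static Object create(String className) {
        return catchExceptions(() -> {
            Constructor<?> ctor = newClassInstance(className).getDeclaredConstructor();
            return ctor.newInstance();
        });
    }

    public static void main(String[] args) {
        System.out.println(create("java.util.ArrayList")); // prints []
        try {
            create("org.example.Missing");
        } catch (RuntimeException e) {
            // Walk the cause chain down to the original ClassNotFoundException.
            Throwable root = e;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            System.out.println("root cause: " + root);
        }
    }
}
```

Walking the `getCause()` chain from the outermost exception, as `main` does, is a quick way to reach the root `ClassNotFoundException` when the wrapping obscures it.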

This can be easily reproduced by trying to load the missing class (AutoProduceBytesSchema) in the sink's open() method through the same class loader (the system class loader), using the following piece of code:

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        // This throws ClassNotFoundException
        Class<?> clazz = ClassLoader.getSystemClassLoader().loadClass("org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema");
        logger.info("AutoProduceBytesSchema loaded from: {}", clazz.getProtectionDomain().getCodeSource().getLocation());

        // Rest of the code
    }
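
When reproducing this, it can help to check more than one class loader, because the system class loader is only one of several loaders active in a function instance. The hypothetical `ClassLoaderProbe` below (not part of Pulsar) tries to resolve a class name through the system loader, the thread context loader and the loader that defined a given class, and reports which succeed. In the sink's `open()` it could be called as `ClassLoaderProbe.probe("org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema", getClass())` and the result logged; comparing the answers shows whether the class is missing everywhere or only invisible to the system loader.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical diagnostic helper: reports which class loaders can resolve a class name.
final class ClassLoaderProbe {

    static Map<String, Boolean> probe(String className, Class<?> anchor) {
        Map<String, ClassLoader> loaders = new LinkedHashMap<>();
        loaders.put("system", ClassLoader.getSystemClassLoader());
        loaders.put("context", Thread.currentThread().getContextClassLoader());
        loaders.put("anchor", anchor.getClassLoader()); // e.g. the sink class's own loader

        Map<String, Boolean> result = new LinkedHashMap<>();
        for (Map.Entry<String, ClassLoader> e : loaders.entrySet()) {
            result.put(e.getKey(), canLoad(className, e.getValue()));
        }
        return result;
    }

    private static boolean canLoad(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the name only, without running static initializers.
            // A null loader means the bootstrap loader.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(probe("java.util.ArrayList", ClassLoaderProbe.class));
        System.out.println(probe("org.example.DoesNotExist", ClassLoaderProbe.class));
    }
}
```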

The resulting error is:

    18:16:47.173 [dbus/test/rtsink-postgresql-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - Sink open produced uncaught exception:
    java.lang.ClassNotFoundException: org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema
            at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
            at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
            at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
            at com.kapschtraffic.oneatms.databus.rtsink.RealtimeSink.open(RealtimeSink.java:54) ~[EJP2V0kUmSDLN4DXH0z-jw/:?]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:814) ~[?:?]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:223) ~[?:?]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:249) ~[?:?]
            at java.lang.Thread.run(Thread.java:829) ~[?:?]
    18:16:47.248 [dbus/test/rtsink-postgresql-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [dbus/test/rtsink-postgresql:0] Uncaught exception in Java Instance
    java.lang.ClassNotFoundException: org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema
            at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
            at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
            at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
            at com.kapschtraffic.oneatms.databus.rtsink.RealtimeSink.open(RealtimeSink.java:54) ~[?:?]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:814) ~[org.apache.pulsar-pulsar-functions-instance-2.8.4.jar:2.8.4]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:223) ~[org.apache.pulsar-pulsar-functions-instance-2.8.4.jar:2.8.4]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:249) ~[org.apache.pulsar-pulsar-functions-instance-2.8.4.jar:2.8.4]
            at java.lang.Thread.run(Thread.java:829) ~[?:?]         
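
This second trace confirms that the `AppClassLoader` (the system class loader) cannot see `AutoProduceBytesSchema` in this deployment, even though the class is packaged in the NAR. A common general-purpose pattern for code that must load implementation classes by name under class-loader isolation is to try the thread context class loader first and only then fall back to the loader of the calling class. The sketch below shows that pattern with a hypothetical `FallbackClassResolver`; it illustrates the technique only and does not describe how Pulsar resolves its client implementation or how this issue was fixed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating context-class-loader-first lookup; not a Pulsar API.
final class FallbackClassResolver {

    static Class<?> resolve(String className, Class<?> caller) throws ClassNotFoundException {
        List<ClassLoader> candidates = new ArrayList<>();
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        if (context != null) {
            candidates.add(context); // loader a runtime may install for user code
        }
        candidates.add(caller.getClassLoader()); // may be null (bootstrap) for JDK classes

        ClassNotFoundException last = null;
        for (ClassLoader loader : candidates) {
            try {
                // initialize=false keeps the lookup free of static-initializer side effects.
                return Class.forName(className, false, loader);
            } catch (ClassNotFoundException e) {
                last = e; // remember the failure and try the next loader
            }
        }
        throw last; // every candidate failed; candidates is never empty
    }

    public static void main(String[] args) throws Exception {
        System.out.println(resolve("java.util.ArrayList", FallbackClassResolver.class).getName());
        try {
            resolve("org.example.DoesNotExist", FallbackClassResolver.class);
        } catch (ClassNotFoundException e) {
            System.out.println("not found: " + e.getMessage());
        }
    }
}
```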

What did you expect to see?

No exception is thrown and the message is published to the DLQ topic.

What did you see instead?

A ClassNotFoundException

Anything else?

No response


RobertIndie commented 1 year ago

@fmiguelez Thanks for reporting this. Unfortunately, I'm not able to reproduce it on my side.

Could you share which mode you are running the connector in: localrun or cluster? Could you also try using the latest Pulsar function worker version?

fmiguelez commented 1 year ago

@RobertIndie The error occurs in Kubernetes mode, where each Pulsar function instance runs inside its own pod.

github-actions[bot] commented 11 months ago

The issue had no activity for 30 days, mark with Stale label.